Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
kyriediculous committed Jun 17, 2021
1 parent 8d692d8 commit 9f55ef6
Show file tree
Hide file tree
Showing 13 changed files with 121 additions and 314 deletions.
58 changes: 18 additions & 40 deletions cmd/devtool/devtool.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,17 @@ func ethSetup(ethAcctAddr, keystoreDir string, isBroadcaster bool) {

gpm := eth.NewGasPriceMonitor(backend, 5*time.Second, big.NewInt(0))

client, err := eth.NewClient(ethcommon.HexToAddress(ethAcctAddr), keystoreDir, passphrase, backend, gpm,
ethcommon.HexToAddress(ethController), ethTxTimeout, nil)
clientCfg := eth.LivepeerEthClientConfig{
AccountAddr: ethcommon.HexToAddress(ethAcctAddr),
KeystoreDir: keystoreDir,
Password: passphrase,
ControllerAddr: ethcommon.HexToAddress(ethController),
TxTimeout: ethTxTimeout,
MaxGasPrice: nil,
ReplaceTransactions: false,
}

client, err := eth.NewClient(backend, gpm, clientCfg)
if err != nil {
glog.Errorf("Failed to create client: %v", err)
return
Expand All @@ -179,31 +188,22 @@ func ethSetup(ethAcctAddr, keystoreDir string, isBroadcaster bool) {
glog.Infof("Funding deposit with %v", amount)
glog.Infof("Funding reserve with %v", amount)

tx, err := client.FundDepositAndReserve(amount, amount)
_, err := client.FundDepositAndReserve(amount, amount)
if err != nil {
glog.Error(err)
return
}
if err := client.CheckTx(tx); err != nil {
glog.Error(err)
return
}

glog.Info("Done funding deposit and reserve")
} else {
glog.Infof("Requesting tokens from faucet")

tx, err := client.Request()
_, err := client.Request()
if err != nil {
glog.Errorf("Error requesting tokens from faucet: %v", err)
return
}

err = client.CheckTx(tx)
if err != nil {
glog.Errorf("Error requesting tokens from faucet: %v", err)
return
}
glog.Info("Done requesting tokens.")
time.Sleep(4 * time.Second)

Expand All @@ -222,20 +222,15 @@ func ethSetup(ethAcctAddr, keystoreDir string, isBroadcaster bool) {
glog.Info("Waiting will first round ended.")
time.Sleep(4 * time.Second)
}
tx, err = client.InitializeRound()
_, err = client.InitializeRound()
// ErrRoundInitialized
if err != nil {
if err.Error() != "ErrRoundInitialized" {
glog.Errorf("Error initializing round: %v", err)
return
}
} else {
err = client.CheckTx(tx)
if err != nil {
glog.Errorf("Error initializng round: %v", err)
return
}
}

glog.Info("Done initializing round.")
glog.Info("Activating transcoder")
// curl -d "blockRewardCut=10&feeShare=5&amount=500" --data-urlencode "serviceURI=https://$transcoderServiceAddr" \
Expand All @@ -244,21 +239,15 @@ func ethSetup(ethAcctAddr, keystoreDir string, isBroadcaster bool) {
var amount *big.Int = big.NewInt(int64(500))
glog.Infof("Bonding %v to %s", amount, ethAcctAddr)

tx, err = client.Bond(amount, ethcommon.HexToAddress(ethAcctAddr))
_, err = client.Bond(amount, ethcommon.HexToAddress(ethAcctAddr))
if err != nil {
glog.Error(err)
return
}

err = client.CheckTx(tx)
if err != nil {
glog.Error("=== Bonding failed")
glog.Error(err)
return
}
glog.Infof("Registering transcoder %v", ethAcctAddr)

tx, err = client.Transcoder(eth.FromPerc(10), eth.FromPerc(5))
_, err = client.Transcoder(eth.FromPerc(10), eth.FromPerc(5))
if err == eth.ErrCurrentRoundLocked {
// wait for next round and retry
}
Expand All @@ -267,24 +256,13 @@ func ethSetup(ethAcctAddr, keystoreDir string, isBroadcaster bool) {
return
}

err = client.CheckTx(tx)
if err != nil {
glog.Error(err)
return
}

glog.Infof("Storing service URI %v in service registry...", serviceURI)

tx, err = client.SetServiceURI(serviceURI)
_, err = client.SetServiceURI(serviceURI)
if err != nil {
glog.Error(err)
return
}

err = client.CheckTx(tx)
if err != nil {
glog.Error(err)
}
}
}

Expand Down
13 changes: 12 additions & 1 deletion cmd/livepeer/livepeer.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ func main() {
ethKeystorePath := flag.String("ethKeystorePath", "", "Path for the Eth Key")
ethOrchAddr := flag.String("ethOrchAddr", "", "ETH address of an on-chain registered orchestrator")
ethUrl := flag.String("ethUrl", "", "Ethereum node JSON-RPC URL")
replaceTx := flag.Bool("replaceTransactions", true, "Set to true to replace ethereum transactions with a higher gas price if they time out")
gasLimit := flag.Int("gasLimit", 0, "Gas limit for ETH transactions")
maxGasPrice := flag.Int("maxGasPrice", 0, "Maximum gas price for ETH transactions")
ethController := flag.String("ethController", "", "Protocol smart contract address")
Expand Down Expand Up @@ -399,7 +400,17 @@ func main() {
}
defer gpm.Stop()

client, err := eth.NewClient(ethcommon.HexToAddress(*ethAcctAddr), keystoreDir, *ethPassword, backend, gpm, ethcommon.HexToAddress(*ethController), EthTxTimeout, bigMaxGasPrice)
ethCfg := eth.LivepeerEthClientConfig{
AccountAddr: ethcommon.HexToAddress(*ethAcctAddr),
KeystoreDir: keystoreDir,
Password: *ethPassword,
ControllerAddr: ethcommon.HexToAddress(*ethController),
TxTimeout: EthTxTimeout,
MaxGasPrice: bigMaxGasPrice,
ReplaceTransactions: *replaceTx,
}

client, err := eth.NewClient(backend, gpm, ethCfg)
if err != nil {
glog.Errorf("Failed to create Livepeer Ethereum client: %v", err)
return
Expand Down
8 changes: 7 additions & 1 deletion eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type Backend interface {
ethereum.LogFilterer
ethereum.ChainReader
ChainID(ctx context.Context) (*big.Int, error)
GasPriceMonitor() *GasPriceMonitor
}

type backend struct {
Expand All @@ -56,12 +57,13 @@ type backend struct {
sync.RWMutex
}

func NewBackend(client *ethclient.Client, signer types.Signer, gpm *GasPriceMonitor) Backend {
func NewBackend(client *ethclient.Client, signer types.Signer, gpm *GasPriceMonitor, tm *TransactionManager) Backend {
return &backend{
Client: client,
nonceManager: NewNonceManager(client),
signer: signer,
gpm: gpm,
tm: tm,
}
}

Expand Down Expand Up @@ -108,6 +110,10 @@ func (b *backend) SuggestGasPrice(ctx context.Context) (*big.Int, error) {
return gp, nil
}

func (b *backend) GasPriceMonitor() *GasPriceMonitor {
return b.gpm
}

type txLog struct {
method string
inputs string
Expand Down
4 changes: 3 additions & 1 deletion eth/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,9 @@ func TestSendTransaction_SendErr_DontUpdateNonce(t *testing.T) {
}, 1*time.Second, big.NewInt(0))
gpm.gasPrice = big.NewInt(1)

bi := NewBackend(client, signer, gpm)
tm := NewTransactionManager(client, gpm, &accountManager{}, 3*time.Second, true)

bi := NewBackend(client, signer, gpm, tm)

nonceLockBefore := bi.(*backend).nonceManager.getNonceLock(fromAddress)

Expand Down
116 changes: 20 additions & 96 deletions eth/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,8 @@ import (
"fmt"
"math/big"
"sort"
"strings"
"time"

ethereum "github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
ethcommon "github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -112,8 +110,6 @@ type LivepeerEthClient interface {

// Helpers
ContractAddresses() map[string]ethcommon.Address
CheckTx(*types.Transaction) error
ReplaceTransaction(*types.Transaction, string, *big.Int) (*types.Transaction, error)
Sign([]byte) ([]byte, error)
SetGasInfo(uint64) error
}
Expand Down Expand Up @@ -148,30 +144,42 @@ type client struct {
txTimeout time.Duration
}

func NewClient(accountAddr ethcommon.Address, keystoreDir, password string, eth *ethclient.Client, gpm *GasPriceMonitor, controllerAddr ethcommon.Address, txTimeout time.Duration, maxGasPrice *big.Int) (LivepeerEthClient, error) {
type LivepeerEthClientConfig struct {
AccountAddr ethcommon.Address
KeystoreDir string
Password string
ControllerAddr ethcommon.Address
TxTimeout time.Duration
MaxGasPrice *big.Int
ReplaceTransactions bool
}

func NewClient(eth *ethclient.Client, gpm *GasPriceMonitor, cfg LivepeerEthClientConfig) (LivepeerEthClient, error) {
chainID, err := eth.ChainID(context.Background())
if err != nil {
return nil, err
}

signer := types.NewEIP155Signer(chainID)

backend := NewBackend(eth, signer, gpm)

am, err := NewAccountManager(accountAddr, keystoreDir, signer)
am, err := NewAccountManager(cfg.AccountAddr, cfg.KeystoreDir, signer)
if err != nil {
return nil, err
}

if err := am.Unlock(password); err != nil {
tm := NewTransactionManager(eth, gpm, am, cfg.TxTimeout, true)

backend := NewBackend(eth, signer, gpm, tm)

if err := am.Unlock(cfg.Password); err != nil {
return nil, err
}

return &client{
accountManager: am,
backend: backend,
controllerAddr: controllerAddr,
txTimeout: txTimeout,
controllerAddr: cfg.ControllerAddr,
txTimeout: cfg.TxTimeout,
}, nil
}

Expand Down Expand Up @@ -404,12 +412,7 @@ func (c *client) Bond(amount *big.Int, to ethcommon.Address) (*types.Transaction
// If existing allowance set by account for BondingManager is
// less than the bond amount, approve the necessary amount
if allowance.Cmp(amount) == -1 {
tx, err := c.Approve(c.bondingManagerAddr, amount)
if err != nil {
return nil, err
}

err = c.CheckTx(tx)
_, err := c.Approve(c.bondingManagerAddr, amount)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -878,85 +881,6 @@ func (c *client) ContractAddresses() map[string]ethcommon.Address {
return addrMap
}

func (c *client) CheckTx(tx *types.Transaction) error {
ctx, cancel := context.WithTimeout(context.Background(), c.txTimeout)
defer cancel()

receipt, err := bind.WaitMined(ctx, c.backend, tx)
if err != nil {
return err
}

if receipt.Status == uint64(0) {
return fmt.Errorf("tx %v failed", tx.Hash().Hex())
} else {
return nil
}
}

func (c *client) Sign(msg []byte) ([]byte, error) {
return c.accountManager.Sign(msg)
}

func (c *client) ReplaceTransaction(tx *types.Transaction, method string, gasPrice *big.Int) (*types.Transaction, error) {
_, pending, err := c.backend.TransactionByHash(context.Background(), tx.Hash())
// Only return here if the error is not related to the tx not being found
// Presumably the provided tx was already broadcasted at some point, so even if for some reason the
// node being used cannot find it, the originally broadcasted tx is still valid and might be sitting somewhere
if err != nil && err != ethereum.NotFound {
return nil, err
}
// If tx was found
// If `pending` is false, the tx was mined and included in a block
if err == nil && !pending {
return nil, ErrReplacingMinedTx
}

// Updated gas price must be at least 10% greater than the gas price used for the original transaction in order
// to submit a replacement transaction with the same nonce. 10% is not defined by the protocol, but is the default required price bump
// used by many clients: https://github.com/ethereum/go-ethereum/blob/01a7e267dc6d7bbef94882542bbd01bd712f5548/core/tx_pool.go#L148
// We add a little extra in addition to the 10% price bump just to be sure
minGasPrice := big.NewInt(0).Add(big.NewInt(0).Add(tx.GasPrice(), big.NewInt(0).Div(tx.GasPrice(), big.NewInt(10))), big.NewInt(10))

// If gas price is not provided, use minimum gas price that satisfies the 10% required price bump
if gasPrice == nil {
gasPrice = minGasPrice

suggestedGasPrice, err := c.backend.SuggestGasPrice(context.Background())
if err != nil {
return nil, err
}

// If the suggested gas price is higher than the bumped gas price, use the suggested gas price
// This is to account for any wild market gas price increases between the time of the original tx submission and time
// of replacement tx submission
// Note: If the suggested gas price is lower than the bumped gas price because market gas prices have dropped
// since the time of the original tx submission we cannot use the lower suggested gas price and we still need to use
// the bumped gas price in order to properly replace a still pending tx
if suggestedGasPrice.Cmp(gasPrice) == 1 {
gasPrice = suggestedGasPrice
}
}

// Check that gas price meets minimum price bump requirement
if gasPrice.Cmp(minGasPrice) == -1 {
return nil, fmt.Errorf("Provided gas price does not satisfy required price bump to replace transaction %v", tx.Hash())
}

// Replacement raw tx uses same fields as old tx (reusing the same nonce is crucial) except the gas price is updated
newRawTx := types.NewTransaction(tx.Nonce(), *tx.To(), tx.Value(), tx.Gas(), gasPrice, tx.Data())

newSignedTx, err := c.accountManager.SignTx(newRawTx)
if err != nil {
return nil, err
}

err = c.backend.SendTransaction(context.Background(), newSignedTx)
if err == nil {
glog.Infof("\n%vEth Transaction%v\n\nReplacement transaction: \"%v\". Hash: \"%v\". Gas Price: %v \n\n%v\n", strings.Repeat("*", 30), strings.Repeat("*", 30), method, newSignedTx.Hash().String(), newSignedTx.GasPrice().String(), strings.Repeat("*", 75))
} else {
glog.Infof("\n%vEth Transaction%v\n\nReplacement transaction: \"%v\". Gas Price: %v \nTransaction Failed: %v\n\n%v\n", strings.Repeat("*", 30), strings.Repeat("*", 30), method, newSignedTx.GasPrice().String(), err, strings.Repeat("*", 75))
}

return newSignedTx, err
}
20 changes: 1 addition & 19 deletions eth/rewardservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,29 +86,11 @@ func (s *RewardService) tryReward() error {
}

if t.LastRewardRound.Cmp(currentRound) == -1 && t.Active {
tx, err := s.client.Reward()
_, err := s.client.Reward()
if err != nil {
return err
}

err = s.client.CheckTx(tx)
if err != nil {
if err == context.DeadlineExceeded {
glog.Infof("Reward tx did not confirm within defined time window - will try to replace pending tx once")
// Previous attempt to call reward() still pending
// Replace pending tx by bumping gas price
tx, err = s.client.ReplaceTransaction(tx, "reward", nil)
if err != nil {
return err
}
if err := s.client.CheckTx(tx); err != nil {
return err
}
} else {
return err
}
}

tp, err := s.client.GetTranscoderEarningsPoolForRound(s.client.Account().Address, currentRound)
if err != nil {
return err
Expand Down

0 comments on commit 9f55ef6

Please sign in to comment.