Skip to content
This repository has been archived by the owner on May 11, 2024. It is now read-only.

Commit

Permalink
feat(proposer): handle transaction replacement underpriced error (#322)
Browse files Browse the repository at this point in the history
  • Loading branch information
davidtaikocha committed Jul 21, 2023
1 parent 9eefc8d commit 2273d10
Show file tree
Hide file tree
Showing 8 changed files with 279 additions and 50 deletions.
8 changes: 8 additions & 0 deletions cmd/flags/proposer.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,13 @@ var (
}
ProposeBlockTxGasLimit = &cli.Uint64Flag{
Name: "proposeBlockTxGasLimit",
Usage: "Gas limit will be used for TaikoL1.proposeBlock transactions",
Category: proposerCategory,
}
ProposeBlockTxReplacementMultiplier = &cli.Uint64Flag{
Name: "proposeBlockTxReplacementMultiplier",
Value: 2,
Usage: "Gas tip multiplier when replacing a TaikoL1.proposeBlock transaction with same nonce",
Category: proposerCategory,
}
)
Expand All @@ -73,4 +80,5 @@ var ProposerFlags = MergeFlags(CommonFlags, []cli.Flag{
MinBlockGasLimit,
MaxProposedTxListsPerEpoch,
ProposeBlockTxGasLimit,
ProposeBlockTxReplacementMultiplier,
})
41 changes: 41 additions & 0 deletions pkg/rpc/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"math/big"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -119,6 +120,46 @@ func NeedNewProof(
return false, nil
}

type AccountPoolContent map[string]map[string]*types.Transaction

// ContentFrom fetches a given account's transactions list from a node's transactions pool.
func ContentFrom(
ctx context.Context,
rawRPC *rpc.Client,
address common.Address,
) (AccountPoolContent, error) {
var result AccountPoolContent
return result, rawRPC.CallContext(
ctx,
&result,
"txpool_contentFrom",
address,
)
}

// GetPendingTxByNonce tries to retrieve a pending transaction with a given nonce in a node's mempool.
func GetPendingTxByNonce(
ctx context.Context,
cli *Client,
address common.Address,
nonce uint64,
) (*types.Transaction, error) {
content, err := ContentFrom(ctx, cli.L1RawRPC, address)
if err != nil {
return nil, err
}

for _, txMap := range content {
for txNonce, tx := range txMap {
if txNonce == strconv.Itoa(int(nonce)) {
return tx, nil
}
}
}

return nil, nil
}

// SetHead makes a `debug_setHead` RPC call to set the chain's head, should only be used
// for testing purpose.
func SetHead(ctx context.Context, rpc *rpc.Client, headNum *big.Int) error {
Expand Down
38 changes: 38 additions & 0 deletions pkg/rpc/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@ package rpc

import (
"context"
"os"
"strconv"
"testing"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/stretchr/testify/require"
)

Expand All @@ -31,3 +34,38 @@ func TestStringToBytes32(t *testing.T) {
require.Equal(t, [32]byte{}, StringToBytes32(""))
require.Equal(t, [32]byte{0x61, 0x62, 0x63}, StringToBytes32("abc"))
}

func TestL1ContentFrom(t *testing.T) {
client := newTestClient(t)
l2Head, err := client.L2.HeaderByNumber(context.Background(), nil)
require.Nil(t, err)

baseFee, err := client.TaikoL2.GetBasefee(nil, 0, 60000000, uint32(l2Head.GasUsed))
require.Nil(t, err)

testAddrPrivKey, err := crypto.ToECDSA(common.Hex2Bytes(os.Getenv("L1_PROPOSER_PRIVATE_KEY")))
require.Nil(t, err)

testAddr := crypto.PubkeyToAddress(testAddrPrivKey.PublicKey)

nonce, err := client.L2.PendingNonceAt(context.Background(), testAddr)
require.Nil(t, err)

tx := types.NewTransaction(
nonce,
testAddr,
common.Big1,
100000,
baseFee,
[]byte{},
)
signedTx, err := types.SignTx(tx, types.LatestSignerForChainID(client.L2ChainID), testAddrPrivKey)
require.Nil(t, err)
require.Nil(t, client.L2.SendTransaction(context.Background(), signedTx))

content, err := ContentFrom(context.Background(), client.L2RawRPC, testAddr)
require.Nil(t, err)

require.NotZero(t, len(content["pending"]))
require.Equal(t, signedTx.Nonce(), content["pending"][strconv.Itoa(int(signedTx.Nonce()))].Nonce())
}
66 changes: 38 additions & 28 deletions proposer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,21 @@ import (

// Config contains all configurations to initialize a Taiko proposer.
type Config struct {
L1Endpoint string
L2Endpoint string
TaikoL1Address common.Address
TaikoL2Address common.Address
L1ProposerPrivKey *ecdsa.PrivateKey
L2SuggestedFeeRecipient common.Address
ProposeInterval *time.Duration
CommitSlot uint64
LocalAddresses []common.Address
ProposeEmptyBlocksInterval *time.Duration
MinBlockGasLimit uint64
MaxProposedTxListsPerEpoch uint64
ProposeBlockTxGasLimit *uint64
BackOffRetryInterval time.Duration
L1Endpoint string
L2Endpoint string
TaikoL1Address common.Address
TaikoL2Address common.Address
L1ProposerPrivKey *ecdsa.PrivateKey
L2SuggestedFeeRecipient common.Address
ProposeInterval *time.Duration
CommitSlot uint64
LocalAddresses []common.Address
ProposeEmptyBlocksInterval *time.Duration
MinBlockGasLimit uint64
MaxProposedTxListsPerEpoch uint64
ProposeBlockTxGasLimit *uint64
BackOffRetryInterval time.Duration
ProposeBlockTxReplacementMultiplier uint64
}

// NewConfigFromCliContext initializes a Config instance from
Expand Down Expand Up @@ -80,20 +81,29 @@ func NewConfigFromCliContext(c *cli.Context) (*Config, error) {
proposeBlockTxGasLimit = &gasLimit
}

proposeBlockTxReplacementMultiplier := c.Uint64(flags.ProposeBlockTxReplacementMultiplier.Name)
if proposeBlockTxReplacementMultiplier == 0 {
return nil, fmt.Errorf(
"invalid --proposeBlockTxReplacementMultiplier value: %d",
proposeBlockTxReplacementMultiplier,
)
}

return &Config{
L1Endpoint: c.String(flags.L1WSEndpoint.Name),
L2Endpoint: c.String(flags.L2HTTPEndpoint.Name),
TaikoL1Address: common.HexToAddress(c.String(flags.TaikoL1Address.Name)),
TaikoL2Address: common.HexToAddress(c.String(flags.TaikoL2Address.Name)),
L1ProposerPrivKey: l1ProposerPrivKey,
L2SuggestedFeeRecipient: common.HexToAddress(l2SuggestedFeeRecipient),
ProposeInterval: proposingInterval,
CommitSlot: c.Uint64(flags.CommitSlot.Name),
LocalAddresses: localAddresses,
ProposeEmptyBlocksInterval: proposeEmptyBlocksInterval,
MinBlockGasLimit: c.Uint64(flags.MinBlockGasLimit.Name),
MaxProposedTxListsPerEpoch: c.Uint64(flags.MaxProposedTxListsPerEpoch.Name),
ProposeBlockTxGasLimit: proposeBlockTxGasLimit,
BackOffRetryInterval: time.Duration(c.Uint64(flags.BackOffRetryInterval.Name)) * time.Second,
L1Endpoint: c.String(flags.L1WSEndpoint.Name),
L2Endpoint: c.String(flags.L2HTTPEndpoint.Name),
TaikoL1Address: common.HexToAddress(c.String(flags.TaikoL1Address.Name)),
TaikoL2Address: common.HexToAddress(c.String(flags.TaikoL2Address.Name)),
L1ProposerPrivKey: l1ProposerPrivKey,
L2SuggestedFeeRecipient: common.HexToAddress(l2SuggestedFeeRecipient),
ProposeInterval: proposingInterval,
CommitSlot: c.Uint64(flags.CommitSlot.Name),
LocalAddresses: localAddresses,
ProposeEmptyBlocksInterval: proposeEmptyBlocksInterval,
MinBlockGasLimit: c.Uint64(flags.MinBlockGasLimit.Name),
MaxProposedTxListsPerEpoch: c.Uint64(flags.MaxProposedTxListsPerEpoch.Name),
ProposeBlockTxGasLimit: proposeBlockTxGasLimit,
BackOffRetryInterval: time.Duration(c.Uint64(flags.BackOffRetryInterval.Name)) * time.Second,
ProposeBlockTxReplacementMultiplier: proposeBlockTxReplacementMultiplier,
}, nil
}
3 changes: 3 additions & 0 deletions proposer/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func (s *ProposerTestSuite) TestNewConfigFromCliContext() {
&cli.StringFlag{Name: flags.ProposeInterval.Name},
&cli.Uint64Flag{Name: flags.CommitSlot.Name},
&cli.StringFlag{Name: flags.TxPoolLocals.Name},
&cli.Uint64Flag{Name: flags.ProposeBlockTxReplacementMultiplier.Name},
}
app.Action = func(ctx *cli.Context) error {
c, err := NewConfigFromCliContext(ctx)
Expand All @@ -50,6 +51,7 @@ func (s *ProposerTestSuite) TestNewConfigFromCliContext() {
s.Equal(uint64(commitSlot), c.CommitSlot)
s.Equal(1, len(c.LocalAddresses))
s.Equal(goldenTouchAddress, c.LocalAddresses[0])
s.Equal(uint64(5), c.ProposeBlockTxReplacementMultiplier)
s.Nil(new(Proposer).InitFromCli(context.Background(), ctx))

return err
Expand All @@ -66,5 +68,6 @@ func (s *ProposerTestSuite) TestNewConfigFromCliContext() {
"-" + flags.ProposeInterval.Name, proposeInterval,
"-" + flags.CommitSlot.Name, strconv.Itoa(commitSlot),
"-" + flags.TxPoolLocals.Name, goldenTouchAddress.Hex(),
"-" + flags.ProposeBlockTxReplacementMultiplier.Name, "5",
}))
}
96 changes: 85 additions & 11 deletions proposer/proposer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@ import (
"math"
"math/big"
"math/rand"
"strings"
"sync"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
Expand All @@ -27,8 +29,9 @@ import (
)

var (
errNoNewTxs = errors.New("no new transactions")
waitReceiptTimeout = 1 * time.Minute
errNoNewTxs = errors.New("no new transactions")
waitReceiptTimeout = 1 * time.Minute
maxSendProposeBlockTxRetry = 10
)

// Proposer keep proposing new transactions from L2 execution engine's tx pool at a fixed interval.
Expand All @@ -50,6 +53,7 @@ type Proposer struct {
minBlockGasLimit *uint64
maxProposedTxListsPerEpoch uint64
proposeBlockTxGasLimit *uint64
txReplacementTipMultiplier uint64

// Protocol configurations
protocolConfigs *bindings.TaikoDataConfig
Expand Down Expand Up @@ -84,6 +88,7 @@ func InitFromConfig(ctx context.Context, p *Proposer, cfg *Config) (err error) {
p.locals = cfg.LocalAddresses
p.commitSlot = cfg.CommitSlot
p.maxProposedTxListsPerEpoch = cfg.MaxProposedTxListsPerEpoch
p.txReplacementTipMultiplier = cfg.ProposeBlockTxReplacementMultiplier
p.ctx = ctx

// RPC clients
Expand Down Expand Up @@ -270,44 +275,113 @@ func (p *Proposer) ProposeOp(ctx context.Context) error {
return nil
}

// ProposeTxList proposes the given transactions list to TaikoL1 smart contract.
func (p *Proposer) ProposeTxList(
// sendProposeBlockTx tries to send a TaikoL1.proposeBlock transaction.
func (p *Proposer) sendProposeBlockTx(
ctx context.Context,
meta *encoding.TaikoL1BlockMetadataInput,
txListBytes []byte,
txNum uint,
nonce *uint64,
) error {
isReplacement bool,
) (*types.Transaction, error) {
if p.minBlockGasLimit != nil && meta.GasLimit < uint32(*p.minBlockGasLimit) {
meta.GasLimit = uint32(*p.minBlockGasLimit)
}

// Propose the transactions list
inputs, err := encoding.EncodeProposeBlockInput(meta)
if err != nil {
return err
return nil, err
}

opts, err := getTxOpts(ctx, p.rpc.L1, p.l1ProposerPrivKey, p.rpc.L1ChainID)
if err != nil {
return err
return nil, err
}
if nonce != nil {
opts.Nonce = new(big.Int).SetUint64(*nonce)
}
if p.proposeBlockTxGasLimit != nil {
opts.GasLimit = *p.proposeBlockTxGasLimit
}
if isReplacement {
log.Info("Try replacing a transaction with same nonce", "sender", p.l1ProposerAddress, "nonce", nonce)
originalTx, err := rpc.GetPendingTxByNonce(ctx, p.rpc, p.l1ProposerAddress, *nonce)
if err != nil || originalTx == nil {
log.Warn(
"Original transaction not found",
"sender", p.l1ProposerAddress,
"nonce", nonce,
"error", err,
)

opts.GasTipCap = new(big.Int).Mul(opts.GasTipCap, new(big.Int).SetUint64(p.txReplacementTipMultiplier))
} else {
log.Info(
"Original transaction to replace",
"sender", p.l1ProposerAddress,
"nonce", nonce,
"tx", originalTx,
)

opts.GasTipCap = new(big.Int).Mul(
originalTx.GasTipCap(),
new(big.Int).SetUint64(p.txReplacementTipMultiplier),
)
}
}

proposeTx, err := p.rpc.TaikoL1.ProposeBlock(opts, inputs, txListBytes)
if err != nil {
return encoding.TryParsingCustomError(err)
return nil, encoding.TryParsingCustomError(err)
}

return proposeTx, nil
}

// ProposeTxList proposes the given transactions list to TaikoL1 smart contract.
func (p *Proposer) ProposeTxList(
ctx context.Context,
meta *encoding.TaikoL1BlockMetadataInput,
txListBytes []byte,
txNum uint,
nonce *uint64,
) error {
var (
isReplacement bool
tx *types.Transaction
err error
)
if err := backoff.Retry(
func() error {
if ctx.Err() != nil {
return nil
}
if tx, err = p.sendProposeBlockTx(ctx, meta, txListBytes, nonce, isReplacement); err != nil {
log.Warn("Failed to send propose block transaction, retrying", "error", err)
if strings.Contains(err.Error(), "replacement transaction underpriced") {
isReplacement = true
} else {
isReplacement = false
}
return err
}

return nil
},
backoff.WithMaxRetries(backoff.NewExponentialBackOff(), uint64(maxSendProposeBlockTxRetry)),
); err != nil {
return err
}
if ctx.Err() != nil {
return ctx.Err()
}
if err != nil {
return err
}

ctxWithTimeout, cancel := context.WithTimeout(ctx, waitReceiptTimeout)
defer cancel()

if _, err := rpc.WaitReceipt(ctxWithTimeout, p.rpc.L1, proposeTx); err != nil {
if _, err := rpc.WaitReceipt(ctxWithTimeout, p.rpc.L1, tx); err != nil {
return err
}

Expand Down
Loading

0 comments on commit 2273d10

Please sign in to comment.