Skip to content

Commit

Permalink
core/txpool: move some validation to outside of mutex (ethereum#27006)
Browse files Browse the repository at this point in the history
Currently, most of transaction validation while holding the txpool mutex: one exception being an early-on signature check. 

This PR changes that, so that we do all non-stateful checks before we entering the mutex area. This means they can be performed in parallel, and to enable that, certain fields have been made atomic bools and uint64.
  • Loading branch information
holiman authored and shekhirin committed Jun 6, 2023
1 parent 6022c02 commit 025cf10
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 51 deletions.
81 changes: 45 additions & 36 deletions core/txpool/txpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"math/big"
"sort"
"sync"
"sync/atomic"
"time"

"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -250,14 +251,14 @@ type TxPool struct {
signer types.Signer
mu sync.RWMutex

istanbul bool // Fork indicator whether we are in the istanbul stage.
eip2718 bool // Fork indicator whether we are using EIP-2718 type transactions.
eip1559 bool // Fork indicator whether we are using EIP-1559 type transactions.
shanghai bool // Fork indicator whether we are in the Shanghai stage.
istanbul atomic.Bool // Fork indicator whether we are in the istanbul stage.
eip2718 atomic.Bool // Fork indicator whether we are using EIP-2718 type transactions.
eip1559 atomic.Bool // Fork indicator whether we are using EIP-1559 type transactions.
shanghai atomic.Bool // Fork indicator whether we are in the Shanghai stage.

currentState *state.StateDB // Current state in the blockchain head
pendingNonces *noncer // Pending state tracking virtual nonces
currentMaxGas uint64 // Current gas limit for transaction caps
currentMaxGas atomic.Uint64 // Current gas limit for transaction caps

locals *accountSet // Set of local transaction to exempt from eviction rules
journal *journal // Journal of local transaction to back up to disk
Expand Down Expand Up @@ -592,23 +593,25 @@ func (pool *TxPool) local() map[common.Address]types.Transactions {
return txs
}

// validateTx checks whether a transaction is valid according to the consensus
// rules and adheres to some heuristic limits of the local node (price and size).
func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error {
// validateTxBasics checks whether a transaction is valid according to the consensus
// rules, but does not check state-dependent validation such as sufficient balance.
// This check is meant as an early check which only needs to be performed once,
// and does not require the pool mutex to be held.
func (pool *TxPool) validateTxBasics(tx *types.Transaction, local bool) error {
// Accept only legacy transactions until EIP-2718/2930 activates.
if !pool.eip2718 && tx.Type() != types.LegacyTxType {
if !pool.eip2718.Load() && tx.Type() != types.LegacyTxType {
return core.ErrTxTypeNotSupported
}
// Reject dynamic fee transactions until EIP-1559 activates.
if !pool.eip1559 && tx.Type() == types.DynamicFeeTxType {
if !pool.eip1559.Load() && tx.Type() == types.DynamicFeeTxType {
return core.ErrTxTypeNotSupported
}
// Reject transactions over defined size to prevent DOS attacks
if tx.Size() > txMaxSize {
return ErrOversizedData
}
// Check whether the init code size has been exceeded.
if pool.shanghai && tx.To() == nil && len(tx.Data()) > params.MaxInitCodeSize {
if pool.shanghai.Load() && tx.To() == nil && len(tx.Data()) > params.MaxInitCodeSize {
return fmt.Errorf("%w: code size %v limit %v", core.ErrMaxInitCodeSizeExceeded, len(tx.Data()), params.MaxInitCodeSize)
}
// Transactions can't be negative. This may never happen using RLP decoded
Expand All @@ -617,7 +620,7 @@ func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error {
return ErrNegativeValue
}
// Ensure the transaction doesn't exceed the current block limit gas.
if pool.currentMaxGas < tx.Gas() {
if pool.currentMaxGas.Load() < tx.Gas() {
return ErrGasLimit
}
// Sanity check for extremely large numbers
Expand All @@ -632,14 +635,29 @@ func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error {
return core.ErrTipAboveFeeCap
}
// Make sure the transaction is signed properly.
from, err := types.Sender(pool.signer, tx)
if err != nil {
if _, err := types.Sender(pool.signer, tx); err != nil {
return ErrInvalidSender
}
// Drop non-local transactions under our own minimal accepted gas price or tip
if !local && tx.GasTipCapIntCmp(pool.gasPrice) < 0 {
return ErrUnderpriced
}
// Ensure the transaction has more gas than the basic tx fee.
intrGas, err := core.IntrinsicGas(tx.Data(), tx.AccessList(), tx.To() == nil, true, pool.istanbul.Load(), pool.shanghai.Load())
if err != nil {
return err
}
if tx.Gas() < intrGas {
return core.ErrIntrinsicGas
}
return nil
}

// validateTx checks whether a transaction is valid according to the consensus
// rules and adheres to some heuristic limits of the local node (price and size).
func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error {
// Signature has been checked already, this cannot error.
from, _ := types.Sender(pool.signer, tx)
// Ensure the transaction adheres to nonce ordering
if pool.currentState.GetNonce(from) > tx.Nonce() {
return core.ErrNonceTooLow
Expand All @@ -664,15 +682,6 @@ func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error {
return ErrOverdraft
}
}

// Ensure the transaction has more gas than the basic tx fee.
intrGas, err := core.IntrinsicGas(tx.Data(), tx.AccessList(), tx.To() == nil, true, pool.istanbul, pool.shanghai)
if err != nil {
return err
}
if tx.Gas() < intrGas {
return core.ErrIntrinsicGas
}
return nil
}

Expand Down Expand Up @@ -969,12 +978,12 @@ func (pool *TxPool) addTxs(txs []*types.Transaction, local, sync bool) []error {
knownTxMeter.Mark(1)
continue
}
// Exclude transactions with invalid signatures as soon as
// possible and cache senders in transactions before
// obtaining lock
_, err := types.Sender(pool.signer, tx)
if err != nil {
errs[i] = ErrInvalidSender
// Exclude transactions with basic errors, e.g invalid signatures and
// insufficient intrinsic gas as soon as possible and cache senders
// in transactions before obtaining lock

if err := pool.validateTxBasics(tx, local); err != nil {
errs[i] = err
invalidTxMeter.Mark(1)
continue
}
Expand Down Expand Up @@ -1364,7 +1373,7 @@ func (pool *TxPool) reset(oldHead, newHead *types.Header) {
}
pool.currentState = statedb
pool.pendingNonces = newNoncer(statedb)
pool.currentMaxGas = newHead.GasLimit
pool.currentMaxGas.Store(newHead.GasLimit)

// Inject any transactions discarded due to reorgs
log.Debug("Reinjecting stale transactions", "count", len(reinject))
Expand All @@ -1373,10 +1382,10 @@ func (pool *TxPool) reset(oldHead, newHead *types.Header) {

// Update all fork indicator by next pending block number.
next := new(big.Int).Add(newHead.Number, big.NewInt(1))
pool.istanbul = pool.chainconfig.IsIstanbul(next)
pool.eip2718 = pool.chainconfig.IsBerlin(next)
pool.eip1559 = pool.chainconfig.IsLondon(next)
pool.shanghai = pool.chainconfig.IsShanghai(uint64(time.Now().Unix()))
pool.istanbul.Store(pool.chainconfig.IsIstanbul(next))
pool.eip2718.Store(pool.chainconfig.IsBerlin(next))
pool.eip1559.Store(pool.chainconfig.IsLondon(next))
pool.shanghai.Store(pool.chainconfig.IsShanghai(uint64(time.Now().Unix())))
}

// promoteExecutables moves transactions that have become processable from the
Expand All @@ -1400,7 +1409,7 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) []*types.Trans
}
log.Trace("Removed old queued transactions", "count", len(forwards))
// Drop all transactions that are too costly (low balance or out of gas)
drops, _ := list.Filter(pool.currentState.GetBalance(addr), pool.currentMaxGas)
drops, _ := list.Filter(pool.currentState.GetBalance(addr), pool.currentMaxGas.Load())
for _, tx := range drops {
hash := tx.Hash()
pool.all.Remove(hash)
Expand Down Expand Up @@ -1597,7 +1606,7 @@ func (pool *TxPool) demoteUnexecutables() {
log.Trace("Removed old pending transaction", "hash", hash)
}
// Drop all transactions that are too costly (low balance or out of gas), and queue any invalids back for later
drops, invalids := list.Filter(pool.currentState.GetBalance(addr), pool.currentMaxGas)
drops, invalids := list.Filter(pool.currentState.GetBalance(addr), pool.currentMaxGas.Load())
for _, tx := range drops {
hash := tx.Hash()
log.Trace("Removed unpayable pending transaction", "hash", hash)
Expand Down
31 changes: 16 additions & 15 deletions core/txpool/txpool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,28 +293,29 @@ func TestInvalidTransactions(t *testing.T) {
tx := transaction(0, 100, key)
from, _ := deriveSender(tx)

// Intrinsic gas too low
testAddBalance(pool, from, big.NewInt(1))
if err := pool.AddRemote(tx); !errors.Is(err, core.ErrInsufficientFunds) {
t.Error("expected", core.ErrInsufficientFunds)
if err, want := pool.AddRemote(tx), core.ErrIntrinsicGas; !errors.Is(err, want) {
t.Errorf("want %v have %v", want, err)
}

balance := new(big.Int).Add(tx.Value(), new(big.Int).Mul(new(big.Int).SetUint64(tx.Gas()), tx.GasPrice()))
testAddBalance(pool, from, balance)
if err := pool.AddRemote(tx); !errors.Is(err, core.ErrIntrinsicGas) {
t.Error("expected", core.ErrIntrinsicGas, "got", err)
// Insufficient funds
tx = transaction(0, 100000, key)
if err, want := pool.AddRemote(tx), core.ErrInsufficientFunds; !errors.Is(err, want) {
t.Errorf("want %v have %v", want, err)
}

testSetNonce(pool, from, 1)
testAddBalance(pool, from, big.NewInt(0xffffffffffffff))
tx = transaction(0, 100000, key)
if err := pool.AddRemote(tx); !errors.Is(err, core.ErrNonceTooLow) {
t.Error("expected", core.ErrNonceTooLow)
if err, want := pool.AddRemote(tx), core.ErrNonceTooLow; !errors.Is(err, want) {
t.Errorf("want %v have %v", want, err)
}

tx = transaction(1, 100000, key)
pool.gasPrice = big.NewInt(1000)
if err := pool.AddRemote(tx); err != ErrUnderpriced {
t.Error("expected", ErrUnderpriced, "got", err)
if err, want := pool.AddRemote(tx), ErrUnderpriced; !errors.Is(err, want) {
t.Errorf("want %v have %v", want, err)
}
if err := pool.AddLocal(tx); err != nil {
t.Error("expected", nil, "got", err)
Expand Down Expand Up @@ -1217,22 +1218,22 @@ func TestAllowedTxSize(t *testing.T) {
// All those fields are summed up to at most 213 bytes.
baseSize := uint64(213)
dataSize := txMaxSize - baseSize

maxGas := pool.currentMaxGas.Load()
// Try adding a transaction with maximal allowed size
tx := pricedDataTransaction(0, pool.currentMaxGas, big.NewInt(1), key, dataSize)
tx := pricedDataTransaction(0, maxGas, big.NewInt(1), key, dataSize)
if err := pool.addRemoteSync(tx); err != nil {
t.Fatalf("failed to add transaction of size %d, close to maximal: %v", int(tx.Size()), err)
}
// Try adding a transaction with random allowed size
if err := pool.addRemoteSync(pricedDataTransaction(1, pool.currentMaxGas, big.NewInt(1), key, uint64(rand.Intn(int(dataSize))))); err != nil {
if err := pool.addRemoteSync(pricedDataTransaction(1, maxGas, big.NewInt(1), key, uint64(rand.Intn(int(dataSize))))); err != nil {
t.Fatalf("failed to add transaction of random allowed size: %v", err)
}
// Try adding a transaction of minimal not allowed size
if err := pool.addRemoteSync(pricedDataTransaction(2, pool.currentMaxGas, big.NewInt(1), key, txMaxSize)); err == nil {
if err := pool.addRemoteSync(pricedDataTransaction(2, maxGas, big.NewInt(1), key, txMaxSize)); err == nil {
t.Fatalf("expected rejection on slightly oversize transaction")
}
// Try adding a transaction of random not allowed size
if err := pool.addRemoteSync(pricedDataTransaction(2, pool.currentMaxGas, big.NewInt(1), key, dataSize+1+uint64(rand.Intn(10*txMaxSize)))); err == nil {
if err := pool.addRemoteSync(pricedDataTransaction(2, maxGas, big.NewInt(1), key, dataSize+1+uint64(rand.Intn(10*txMaxSize)))); err == nil {
t.Fatalf("expected rejection on oversize transaction")
}
// Run some sanity checks on the pool internals
Expand Down

0 comments on commit 025cf10

Please sign in to comment.