Skip to content

Commit

Permalink
core, txpool: less allocations when handling transactions (#21232)
Browse files Browse the repository at this point in the history
* core: use uint64 for total tx costs instead of big.Int

* core: added local tx pool test case

* core, crypto: various allocation savings regarding tx handling

* Update core/tx_list.go

* core: added tx.GasPriceIntCmp for comparison without allocation

adds a method to remove unneeded allocation in comparison to tx.gasPrice

* core: handle pools full of locals better

* core/tests: benchmark for tx_list

* core/txlist, txpool: save a reheap operation, avoid some bigint allocs

Co-authored-by: Martin Holst Swende <martin@swende.se>
  • Loading branch information
MariusVanDerWijden and holiman committed Jul 1, 2020
1 parent 8dfd66f commit af5c97a
Show file tree
Hide file tree
Showing 6 changed files with 151 additions and 54 deletions.
10 changes: 5 additions & 5 deletions common/math/integer.go
Expand Up @@ -18,6 +18,7 @@ package math

import (
"fmt"
"math/bits"
"strconv"
)

Expand Down Expand Up @@ -87,13 +88,12 @@ func SafeSub(x, y uint64) (uint64, bool) {

// SafeAdd returns the result and whether overflow occurred.
func SafeAdd(x, y uint64) (uint64, bool) {
return x + y, y > MaxUint64-x
sum, carry := bits.Add64(x, y, 0)
return sum, carry != 0
}

// SafeMul returns multiplication result and whether overflow occurred.
func SafeMul(x, y uint64) (uint64, bool) {
if x == 0 || y == 0 {
return 0, false
}
return x * y, y > MaxUint64/x
hi, lo := bits.Mul64(x, y)
return lo, hi != 0
}
129 changes: 99 additions & 30 deletions core/tx_list.go
Expand Up @@ -99,7 +99,30 @@ func (m *txSortedMap) Forward(threshold uint64) types.Transactions {

// Filter iterates over the list of transactions and removes all of them for which
// the specified function evaluates to true.
// Filter, as opposed to 'filter', re-initialises the heap after the operation is done.
// If you want to do several consecutive filterings, it's therefore better to first
// do a .filter(func1) followed by .Filter(func2) or reheap()
func (m *txSortedMap) Filter(filter func(*types.Transaction) bool) types.Transactions {
removed := m.filter(filter)
// If transactions were removed, the heap and cache are ruined
if len(removed) > 0 {
m.reheap()
}
return removed
}

func (m *txSortedMap) reheap() {
*m.index = make([]uint64, 0, len(m.items))
for nonce := range m.items {
*m.index = append(*m.index, nonce)
}
heap.Init(m.index)
m.cache = nil
}

// filter is identical to Filter, but **does not** regenerate the heap. This method
// should only be used if followed immediately by a call to Filter or reheap()
func (m *txSortedMap) filter(filter func(*types.Transaction) bool) types.Transactions {
var removed types.Transactions

// Collect all the transactions to filter out
Expand All @@ -109,14 +132,7 @@ func (m *txSortedMap) Filter(filter func(*types.Transaction) bool) types.Transac
delete(m.items, nonce)
}
}
// If transactions were removed, the heap and cache are ruined
if len(removed) > 0 {
*m.index = make([]uint64, 0, len(m.items))
for nonce := range m.items {
*m.index = append(*m.index, nonce)
}
heap.Init(m.index)

m.cache = nil
}
return removed
Expand Down Expand Up @@ -197,10 +213,7 @@ func (m *txSortedMap) Len() int {
return len(m.items)
}

// Flatten creates a nonce-sorted slice of transactions based on the loosely
// sorted internal representation. The result of the sorting is cached in case
// it's requested again before any modifications are made to the contents.
func (m *txSortedMap) Flatten() types.Transactions {
func (m *txSortedMap) flatten() types.Transactions {
// If the sorting was not cached yet, create and cache it
if m.cache == nil {
m.cache = make(types.Transactions, 0, len(m.items))
Expand All @@ -209,12 +222,27 @@ func (m *txSortedMap) Flatten() types.Transactions {
}
sort.Sort(types.TxByNonce(m.cache))
}
return m.cache
}

// Flatten creates a nonce-sorted slice of transactions based on the loosely
// sorted internal representation. The result of the sorting is cached in case
// it's requested again before any modifications are made to the contents.
func (m *txSortedMap) Flatten() types.Transactions {
// Copy the cache to prevent accidental modifications
txs := make(types.Transactions, len(m.cache))
copy(txs, m.cache)
cache := m.flatten()
txs := make(types.Transactions, len(cache))
copy(txs, cache)
return txs
}

// LastElement returns the last element of a flattened list, thus, the
// transaction with the highest nonce
func (m *txSortedMap) LastElement() *types.Transaction {
cache := m.flatten()
return cache[len(cache)-1]
}

// txList is a "list" of transactions belonging to an account, sorted by account
// nonce. The same type can be used both for storing contiguous transactions for
// the executable/pending queue; and for storing gapped transactions for the non-
Expand All @@ -223,17 +251,16 @@ type txList struct {
strict bool // Whether nonces are strictly continuous or not
txs *txSortedMap // Heap indexed sorted hash map of the transactions

costcap *big.Int // Price of the highest costing transaction (reset only if exceeds balance)
gascap uint64 // Gas limit of the highest spending transaction (reset only if exceeds block limit)
costcap uint64 // Price of the highest costing transaction (reset only if exceeds balance)
gascap uint64 // Gas limit of the highest spending transaction (reset only if exceeds block limit)
}

// newTxList create a new transaction list for maintaining nonce-indexable fast,
// gapped, sortable transaction lists.
func newTxList(strict bool) *txList {
return &txList{
strict: strict,
txs: newTxSortedMap(),
costcap: new(big.Int),
strict: strict,
txs: newTxSortedMap(),
}
}

Expand All @@ -252,17 +279,26 @@ func (l *txList) Add(tx *types.Transaction, priceBump uint64) (bool, *types.Tran
// If there's an older better transaction, abort
old := l.txs.Get(tx.Nonce())
if old != nil {
threshold := new(big.Int).Div(new(big.Int).Mul(old.GasPrice(), big.NewInt(100+int64(priceBump))), big.NewInt(100))
// threshold = oldGP * (100 + priceBump) / 100
a := big.NewInt(100 + int64(priceBump))
a = a.Mul(a, old.GasPrice())
b := big.NewInt(100)
threshold := a.Div(a, b)
// Have to ensure that the new gas price is higher than the old gas
// price as well as checking the percentage threshold to ensure that
// this is accurate for low (Wei-level) gas price replacements
if old.GasPriceCmp(tx) >= 0 || tx.GasPriceIntCmp(threshold) < 0 {
return false, nil
}
}
cost, overflow := tx.CostU64()
if overflow {
log.Warn("transaction cost overflown, txHash: %v txCost: %v", tx.Hash(), cost)
return false, nil
}
// Otherwise overwrite the old transaction with the current one
l.txs.Put(tx)
if cost := tx.Cost(); l.costcap.Cmp(cost) < 0 {
if l.costcap < cost {
l.costcap = cost
}
if gas := tx.Gas(); l.gascap < gas {
Expand All @@ -287,29 +323,35 @@ func (l *txList) Forward(threshold uint64) types.Transactions {
// a point in calculating all the costs or if the balance covers all. If the threshold
// is lower than the costgas cap, the caps will be reset to a new high after removing
// the newly invalidated transactions.
func (l *txList) Filter(costLimit *big.Int, gasLimit uint64) (types.Transactions, types.Transactions) {
func (l *txList) Filter(costLimit uint64, gasLimit uint64) (types.Transactions, types.Transactions) {
// If all transactions are below the threshold, short circuit
if l.costcap.Cmp(costLimit) <= 0 && l.gascap <= gasLimit {
if l.costcap <= costLimit && l.gascap <= gasLimit {
return nil, nil
}
l.costcap = new(big.Int).Set(costLimit) // Lower the caps to the thresholds
l.costcap = costLimit // Lower the caps to the thresholds
l.gascap = gasLimit

// Filter out all the transactions above the account's funds
removed := l.txs.Filter(func(tx *types.Transaction) bool { return tx.Cost().Cmp(costLimit) > 0 || tx.Gas() > gasLimit })
removed := l.txs.filter(func(tx *types.Transaction) bool {
cost, _ := tx.CostU64()
return cost > costLimit || tx.Gas() > gasLimit
})

// If the list was strict, filter anything above the lowest nonce
if len(removed) == 0 {
return nil, nil
}
var invalids types.Transactions

if l.strict && len(removed) > 0 {
// If the list was strict, filter anything above the lowest nonce
if l.strict {
lowest := uint64(math.MaxUint64)
for _, tx := range removed {
if nonce := tx.Nonce(); lowest > nonce {
lowest = nonce
}
}
invalids = l.txs.Filter(func(tx *types.Transaction) bool { return tx.Nonce() > lowest })
invalids = l.txs.filter(func(tx *types.Transaction) bool { return tx.Nonce() > lowest })
}
l.txs.reheap()
return removed, invalids
}

Expand Down Expand Up @@ -363,6 +405,12 @@ func (l *txList) Flatten() types.Transactions {
return l.txs.Flatten()
}

// LastElement returns the last element of a flattened list, thus, the
// transaction with the highest nonce
func (l *txList) LastElement() *types.Transaction {
return l.txs.LastElement()
}

// priceHeap is a heap.Interface implementation over transactions for retrieving
// price-sorted transactions to discard when the pool fills up.
type priceHeap []*types.Transaction
Expand Down Expand Up @@ -495,8 +543,29 @@ func (l *txPricedList) Underpriced(tx *types.Transaction, local *accountSet) boo
// Discard finds a number of most underpriced transactions, removes them from the
// priced list and returns them for further removal from the entire pool.
func (l *txPricedList) Discard(slots int, local *accountSet) types.Transactions {
drop := make(types.Transactions, 0, slots) // Remote underpriced transactions to drop
save := make(types.Transactions, 0, 64) // Local underpriced transactions to keep
// If we have some local accountset, those will not be discarded
if !local.empty() {
// In case the list is filled to the brim with 'local' txs, we do this
// little check to avoid unpacking / repacking the heap later on, which
// is very expensive
discardable := 0
for _, tx := range *l.items {
if !local.containsTx(tx) {
discardable++
}
if discardable >= slots {
break
}
}
if slots > discardable {
slots = discardable
}
}
if slots == 0 {
return nil
}
drop := make(types.Transactions, 0, slots) // Remote underpriced transactions to drop
save := make(types.Transactions, 0, len(*l.items)-slots) // Local underpriced transactions to keep

for len(*l.items) > 0 && slots > 0 {
// Discard stale transactions if found during cleanup
Expand Down
19 changes: 10 additions & 9 deletions core/tx_list_test.go
Expand Up @@ -17,7 +17,6 @@
package core

import (
"math/big"
"math/rand"
"testing"

Expand Down Expand Up @@ -51,20 +50,22 @@ func TestStrictTxListAdd(t *testing.T) {
}
}

func BenchmarkTxListAdd(t *testing.B) {
func BenchmarkTxListAdd(b *testing.B) {
// Generate a list of transactions to insert
key, _ := crypto.GenerateKey()

txs := make(types.Transactions, 100000)
txs := make(types.Transactions, 2000)
for i := 0; i < len(txs); i++ {
txs[i] = transaction(uint64(i), 0, key)
}
// Insert the transactions in a random order
list := newTxList(true)
priceLimit := big.NewInt(int64(DefaultTxPoolConfig.PriceLimit))
t.ResetTimer()
for _, v := range rand.Perm(len(txs)) {
list.Add(txs[v], DefaultTxPoolConfig.PriceBump)
list.Filter(priceLimit, DefaultTxPoolConfig.PriceBump)
b.ResetTimer()
priceLimit := DefaultTxPoolConfig.PriceLimit
for i := 0; i < b.N; i++ {
list := newTxList(true)
for _, v := range rand.Perm(len(txs)) {
list.Add(txs[v], DefaultTxPoolConfig.PriceBump)
list.Filter(priceLimit, DefaultTxPoolConfig.PriceBump)
}
}
}
18 changes: 13 additions & 5 deletions core/tx_pool.go
Expand Up @@ -543,7 +543,11 @@ func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error {
}
// Transactor should have enough funds to cover the costs
// cost == V + GP * GL
if pool.currentState.GetBalance(from).Cmp(tx.Cost()) < 0 {
cost, overflow := tx.CostU64()
if overflow {
return ErrInsufficientFunds
}
if pool.currentState.GetBalance(from).Uint64() < cost {
return ErrInsufficientFunds
}
// Ensure the transaction has more gas than the basic tx fee.
Expand Down Expand Up @@ -1059,8 +1063,8 @@ func (pool *TxPool) runReorg(done chan struct{}, reset *txpoolResetRequest, dirt

// Update all accounts to the latest known pending nonce
for addr, list := range pool.pending {
txs := list.Flatten() // Heavy but will be cached and is needed by the miner anyway
pool.pendingNonces.set(addr, txs[len(txs)-1].Nonce()+1)
highestPending := list.LastElement()
pool.pendingNonces.set(addr, highestPending.Nonce()+1)
}
pool.mu.Unlock()

Expand Down Expand Up @@ -1190,7 +1194,7 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) []*types.Trans
}
log.Trace("Removed old queued transactions", "count", len(forwards))
// Drop all transactions that are too costly (low balance or out of gas)
drops, _ := list.Filter(pool.currentState.GetBalance(addr), pool.currentMaxGas)
drops, _ := list.Filter(pool.currentState.GetBalance(addr).Uint64(), pool.currentMaxGas)
for _, tx := range drops {
hash := tx.Hash()
pool.all.Remove(hash)
Expand Down Expand Up @@ -1382,7 +1386,7 @@ func (pool *TxPool) demoteUnexecutables() {
log.Trace("Removed old pending transaction", "hash", hash)
}
// Drop all transactions that are too costly (low balance or out of gas), and queue any invalids back for later
drops, invalids := list.Filter(pool.currentState.GetBalance(addr), pool.currentMaxGas)
drops, invalids := list.Filter(pool.currentState.GetBalance(addr).Uint64(), pool.currentMaxGas)
for _, tx := range drops {
hash := tx.Hash()
log.Trace("Removed unpayable pending transaction", "hash", hash)
Expand Down Expand Up @@ -1457,6 +1461,10 @@ func (as *accountSet) contains(addr common.Address) bool {
return exist
}

func (as *accountSet) empty() bool {
return len(as.accounts) == 0
}

// containsTx checks if the sender of a given tx is within the set. If the sender
// cannot be derived, this method returns false.
func (as *accountSet) containsTx(tx *types.Transaction) bool {
Expand Down
18 changes: 13 additions & 5 deletions core/tx_pool_test.go
Expand Up @@ -1889,11 +1889,15 @@ func benchmarkFuturePromotion(b *testing.B, size int) {
}

// Benchmarks the speed of batched transaction insertion.
func BenchmarkPoolBatchInsert100(b *testing.B) { benchmarkPoolBatchInsert(b, 100) }
func BenchmarkPoolBatchInsert1000(b *testing.B) { benchmarkPoolBatchInsert(b, 1000) }
func BenchmarkPoolBatchInsert10000(b *testing.B) { benchmarkPoolBatchInsert(b, 10000) }
func BenchmarkPoolBatchInsert100(b *testing.B) { benchmarkPoolBatchInsert(b, 100, false) }
func BenchmarkPoolBatchInsert1000(b *testing.B) { benchmarkPoolBatchInsert(b, 1000, false) }
func BenchmarkPoolBatchInsert10000(b *testing.B) { benchmarkPoolBatchInsert(b, 10000, false) }

func benchmarkPoolBatchInsert(b *testing.B, size int) {
func BenchmarkPoolBatchLocalInsert100(b *testing.B) { benchmarkPoolBatchInsert(b, 100, true) }
func BenchmarkPoolBatchLocalInsert1000(b *testing.B) { benchmarkPoolBatchInsert(b, 1000, true) }
func BenchmarkPoolBatchLocalInsert10000(b *testing.B) { benchmarkPoolBatchInsert(b, 10000, true) }

func benchmarkPoolBatchInsert(b *testing.B, size int, local bool) {
// Generate a batch of transactions to enqueue into the pool
pool, key := setupTxPool()
defer pool.Stop()
Expand All @@ -1911,6 +1915,10 @@ func benchmarkPoolBatchInsert(b *testing.B, size int) {
// Benchmark importing the transactions into the queue
b.ResetTimer()
for _, batch := range batches {
pool.AddRemotes(batch)
if local {
pool.AddLocals(batch)
} else {
pool.AddRemotes(batch)
}
}
}

0 comments on commit af5c97a

Please sign in to comment.