Skip to content

Commit

Permalink
broadcast sorted txs by price and time
Browse files Browse the repository at this point in the history
  • Loading branch information
yoomee1313 committed May 10, 2024
1 parent 10d70b1 commit d78751b
Show file tree
Hide file tree
Showing 7 changed files with 117 additions and 384 deletions.
74 changes: 26 additions & 48 deletions blockchain/types/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"fmt"
"io"
"math/big"
"sort"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -898,54 +899,6 @@ func (s TxByNonce) Less(i, j int) bool {
}
func (s TxByNonce) Swap(i, j int) { s[i], s[j] = s[j], s[i] }

type TxByTime Transactions

func (s TxByTime) Len() int { return len(s) }
func (s TxByTime) Less(i, j int) bool {
// Use the time the transaction was first seen for deterministic sorting
return s[i].time.Before(s[j].time)
}
func (s TxByTime) Swap(i, j int) { s[i], s[j] = s[j], s[i] }

func (s *TxByTime) Push(x interface{}) {
*s = append(*s, x.(*Transaction))
}

func (s *TxByTime) Pop() interface{} {
old := *s
n := len(old)
x := old[n-1]
*s = old[0 : n-1]
return x
}

// TxByPriceAndTime implements both the sort and the heap interface, making it useful
// for all at once sorting as well as individually adding and removing elements.
type TxByPriceAndTime Transactions

func (s TxByPriceAndTime) Len() int { return len(s) }
func (s TxByPriceAndTime) Less(i, j int) bool {
// Use the time the transaction was first seen for deterministic sorting
cmp := s[i].GasPrice().Cmp(s[j].GasPrice())
if cmp == 0 {
return s[i].time.Before(s[j].time)
}
return cmp > 0
}
func (s TxByPriceAndTime) Swap(i, j int) { s[i], s[j] = s[j], s[i] }

func (s *TxByPriceAndTime) Push(x interface{}) {
*s = append(*s, x.(*Transaction))
}

func (s *TxByPriceAndTime) Pop() interface{} {
old := *s
n := len(old)
x := old[n-1]
*s = old[0 : n-1]
return x
}

// txWithMinerFee wraps a transaction with its gas price or effective miner gasTipCap
type txWithMinerFee struct {
tx *Transaction
Expand Down Expand Up @@ -1003,6 +956,31 @@ func (s *txByEffectivePriceAndTime) Pop() interface{} {
return x
}

// SortTxsByPriceAndTime is used to sort the txs by expected effectiveGasTip and arrival time.
// It is called on the process of txs broadcasting. There's three points when this function called.
// (1) BroadcastTxs: before broadcasting txs to the peers
// (2) RebroadcastTxs: before rebroadcasting the remaining pending txs to the peers
// (3) syncTransactions: before sending the all pending txs to the newly connected peer
func SortTxsByPriceAndTime(txs Transactions, baseFee *big.Int) Transactions {
sortedTxsWithMinerFee := make(txByEffectivePriceAndTime, len(txs))
for i, tx := range txs {
sortedTxsWithMinerFee[i] = &txWithMinerFee{tx, common.Address{}, tx.EffectiveGasTip(baseFee)}
}

// If already sorted, just return original txs.
if sort.IsSorted(sortedTxsWithMinerFee) {
return txs
}

// Sort the batch of txs and derive sortedTxs to return it.
sort.Sort(sortedTxsWithMinerFee)
sortedTxs := make(Transactions, len(txs))
for i, tx := range sortedTxsWithMinerFee {
sortedTxs[i] = tx.tx
}
return sortedTxs
}

// TransactionsByPriceAndNonce represents a set of transactions that can return
// transactions in a profit-maximizing sorted order, while supporting removing
// entire batches of transactions for non-executable accounts.
Expand Down
102 changes: 46 additions & 56 deletions blockchain/types/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -960,101 +960,91 @@ func TestFilterTransactionWithBaseFee(t *testing.T) {
assert.Equal(t, len(pending[from3]), 0)
}

func BenchmarkTxSortByTime30000(b *testing.B) { benchmarkTxSortByTime(b, 30000) }
func BenchmarkTxSortByTime20000(b *testing.B) { benchmarkTxSortByTime(b, 20000) }
func benchmarkTxSortByTime(b *testing.B, size int) {
// go test -bench=BenchmarkTxSortByTimeLegacy -benchtime=1000x
func BenchmarkSortTxsByPriceAndTime20000(b *testing.B) { benchmarkSortTxsByPriceAndTime(b, 20000) }
func BenchmarkSortTxsByPriceAndTime10000(b *testing.B) { benchmarkSortTxsByPriceAndTime(b, 10000) }
func BenchmarkSortTxsByPriceAndTime100(b *testing.B) { benchmarkSortTxsByPriceAndTime(b, 100) }
func benchmarkSortTxsByPriceAndTime(b *testing.B, size int) {
signer := LatestSignerForChainID(big.NewInt(1))

key, _ := crypto.GenerateKey()
batches := make(Transactions, size)

for i := 0; i < size; i++ {
batches[i], _ = SignTx(NewTransaction(uint64(i), common.Address{}, big.NewInt(100), 100, big.NewInt(int64(i)), nil), signer, key)
}

// Shuffle transactions.
rand.Shuffle(len(batches), func(i, j int) {
batches[i], batches[j] = batches[j], batches[i]
})

// Benchmark importing the transactions into the queue
b.ResetTimer()

for i := 0; i < b.N; i++ {
sort.Sort(TxByPriceAndTime(batches))
// make size to be even
if size%2 == 1 {
size += 1
}
}
txs := make(Transactions, size)

func BenchmarkTxSortByTimeDynamicFee30000(b *testing.B) { benchmarkTxSortByTimeDynamicFee(b, 30000) }
func BenchmarkTxSortByTimeDynamicFee20000(b *testing.B) { benchmarkTxSortByTimeDynamicFee(b, 20000) }
func benchmarkTxSortByTimeDynamicFee(b *testing.B, size int) {
signer := LatestSignerForChainID(big.NewInt(1))

key, _ := crypto.GenerateKey()
batches := make(Transactions, size)

for i := 0; i < size; i++ {
gasFeeCap := rand.Intn(50)
tx, _ := SignTx(NewTx(&TxInternalDataEthereumDynamicFee{
for i := 0; i < size; i += 2 {
gasFeeCap := rand.Int63n(50 * params.Ston)
txs[i], _ = SignTx(NewTransaction(uint64(i), common.Address{}, big.NewInt(100), 100, big.NewInt(25*params.Ston), nil), signer, key)
txs[i+1], _ = SignTx(NewTx(&TxInternalDataEthereumDynamicFee{
AccountNonce: uint64(i),
Recipient: &common.Address{},
Amount: big.NewInt(100),
GasLimit: 100,
GasFeeCap: big.NewInt(int64(gasFeeCap)),
GasTipCap: big.NewInt(int64(rand.Intn(gasFeeCap + 1))),
GasFeeCap: big.NewInt(int64(25*params.Ston) + gasFeeCap),
GasTipCap: big.NewInt(gasFeeCap),
Payload: nil,
}), signer, key)
batches[i] = tx
}

// Should be already shuffled, but shuffle transactions anyway.
rand.Seed(time.Now().Unix())
rand.Shuffle(len(batches), func(i, j int) {
batches[i], batches[j] = batches[j], batches[i]
})

// Benchmark importing the transactions into the queue
b.ResetTimer()

for i := 0; i < b.N; i++ {
sort.Sort(TxByPriceAndTime(batches))
start := time.Now()
rand.Shuffle(size, func(i, j int) {
txs[i], txs[j] = txs[j], txs[i]
})
b.Log("[", size, "], newBatch shuffle:", time.Since(start))

start = time.Now()
txs = SortTxsByPriceAndTime(txs, big.NewInt(25*params.Ston))
b.Log("[", size, "], sorting time:", time.Since(start))
}
}

func BenchmarkTxSortByPrice30000(b *testing.B) { benchmarkTxSortByPrice(b, 30000) }
func BenchmarkTxSortByPrice20000(b *testing.B) { benchmarkTxSortByPrice(b, 20000) }
func benchmarkTxSortByPrice(b *testing.B, size int) {
// go test -bench=BenchmarkTxSortByPriceAndTime -benchtime=10000x - 2s
func BenchmarkTxSortByEffectivePriceAndTime20000(b *testing.B) {
benchmarkTxSortByEffectivePriceAndTime(b, 20000)
}

func BenchmarkTxSortByEffectivePriceAndTime10000(b *testing.B) {
benchmarkTxSortByEffectivePriceAndTime(b, 10000)
}

func BenchmarkTxSortByEffectivePriceAndTime100(b *testing.B) {
benchmarkTxSortByEffectivePriceAndTime(b, 100)
}

func benchmarkTxSortByEffectivePriceAndTime(b *testing.B, size int) {
signer := LatestSignerForChainID(big.NewInt(1))

key, _ := crypto.GenerateKey()
batches := make(txByEffectivePriceAndTime, size)

for i := 0; i < size; i++ {
gasFeeCap := rand.Intn(50)
gasFeeCap := rand.Int63n(50)
tx, _ := SignTx(NewTx(&TxInternalDataEthereumDynamicFee{
AccountNonce: uint64(i),
Recipient: &common.Address{},
Amount: big.NewInt(100),
GasLimit: 100,
GasFeeCap: big.NewInt(int64(gasFeeCap)),
GasTipCap: big.NewInt(int64(rand.Intn(gasFeeCap + 1))),
GasFeeCap: big.NewInt(int64(25*params.Ston) + gasFeeCap),
GasTipCap: big.NewInt(gasFeeCap),
Payload: nil,
}), signer, key)
txWithFee, _ := newTxWithMinerFee(tx, common.Address{}, nil)
txWithFee, _ := newTxWithMinerFee(tx, common.Address{}, big.NewInt(25*params.Ston))
batches[i] = txWithFee
}

// Should be already shuffled, but shuffle transactions anyway.
rand.Seed(time.Now().Unix())
rand.Shuffle(len(batches), func(i, j int) {
batches[i], batches[j] = batches[j], batches[i]
})

// Benchmark importing the transactions into the queue
b.ResetTimer()

for i := 0; i < b.N; i++ {
sort.Sort(txByEffectivePriceAndTime(batches))
rand.Shuffle(len(batches), func(i, j int) {
batches[i], batches[j] = batches[j], batches[i]
})
sort.Sort(batches)
}
}

Expand Down
17 changes: 8 additions & 9 deletions node/cn/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"math/big"
"math/rand"
"runtime/debug"
"sort"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -1295,10 +1294,11 @@ func (pm *ProtocolManager) BroadcastTxs(txs types.Transactions) {
// This function calls sendTransaction() to broadcast the transactions for each peer.
// In that case, transactions are sorted for each peer in sendTransaction().
// Therefore, it prevents sorting transactions by each peer.
if !sort.IsSorted(types.TxByTime(txs)) {
sort.Sort(types.TxByTime(txs))
baseFee := big.NewInt(int64(params.DefaultLowerBoundBaseFee))
if pm.blockchain != nil && pm.blockchain.CurrentHeader() != nil && pm.blockchain.CurrentHeader().BaseFee != nil {
baseFee = pm.blockchain.CurrentHeader().BaseFee
}

txs = types.SortTxsByPriceAndTime(txs, baseFee)
switch pm.nodetype {
case common.CONSENSUSNODE:
pm.broadcastTxsFromCN(txs)
Expand Down Expand Up @@ -1381,12 +1381,11 @@ func (pm *ProtocolManager) ReBroadcastTxs(txs types.Transactions) {
return
}

// This function calls sendTransaction() to broadcast the transactions for each peer.
// In that case, transactions are sorted for each peer in sendTransaction().
// Therefore, it prevents sorting transactions by each peer.
if !sort.IsSorted(types.TxByTime(txs)) {
sort.Sort(types.TxByTime(txs))
baseFee := big.NewInt(int64(params.DefaultLowerBoundBaseFee))
if pm.blockchain != nil && pm.blockchain.CurrentHeader() != nil && pm.blockchain.CurrentHeader().BaseFee != nil {
baseFee = pm.blockchain.CurrentHeader().BaseFee
}
txs = types.SortTxsByPriceAndTime(txs, baseFee)

peersWithoutTxs := make(map[Peer]types.Transactions)
for _, tx := range txs {
Expand Down
48 changes: 36 additions & 12 deletions node/cn/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"fmt"
"math/big"
"math/rand"
"sort"
"testing"
"time"

Expand Down Expand Up @@ -110,6 +109,7 @@ func newBlock(blockNum int) *types.Block {
Extra: addrs[0][:],
Governance: addrs[0][:],
Vote: addrs[0][:],
BaseFee: big.NewInt(25 * params.Ston),
}
header.Hash()
block := types.NewBlockWithHeader(header)
Expand Down Expand Up @@ -1020,19 +1020,31 @@ func TestProtocolManager_SetWsEndPoint(t *testing.T) {
assert.Equal(t, wsep, pm.wsendpoint)
}

func TestBroadcastTxsSortedByTime(t *testing.T) {
func TestBroadcastTxsSortedByPriceAndTime(t *testing.T) {
// Generate a batch of accounts to start with
keys := make([]*ecdsa.PrivateKey, 5)
keys := make([]*ecdsa.PrivateKey, 10)
for i := 0; i < len(keys); i++ {
keys[i], _ = crypto.GenerateKey()
}
signer := types.LatestSignerForChainID(big.NewInt(1))

// Generate a batch of transactions.
// txs[0:4] - legacyTxType with gasPrice 25ston. txs[5:10] - dynamicFeeTxType with gasPrice 25ston + tip.
txs := types.Transactions{}
for _, key := range keys {
tx, _ := types.SignTx(types.NewTransaction(0, common.Address{}, big.NewInt(100), 100, big.NewInt(1), nil), signer, key)

for _, key := range keys[0:5] {
tx, _ := types.SignTx(types.NewTransaction(0, common.Address{}, big.NewInt(100), 100, big.NewInt(25*params.Ston), nil), signer, key)
txs = append(txs, tx)
}
for i, key := range keys[5:10] {
tx, _ := types.SignTx(types.NewTx(&types.TxInternalDataEthereumDynamicFee{
AccountNonce: uint64(0),
Recipient: &common.Address{},
Amount: big.NewInt(100),
GasLimit: 100,
GasFeeCap: big.NewInt(int64(25*params.Ston + i + 1)),
GasTipCap: big.NewInt(int64(i + 1)),
Payload: nil,
}), signer, key)
txs = append(txs, tx)
}

Expand All @@ -1045,7 +1057,7 @@ func TestBroadcastTxsSortedByTime(t *testing.T) {
copy(sortedTxs, txs)

// Sort transaction by time.
sort.Sort(types.TxByTime(sortedTxs))
sortedTxs = types.SortTxsByPriceAndTime(sortedTxs, big.NewInt(25*params.Ston))

pm := &ProtocolManager{}
pm.nodetype = common.ENDPOINTNODE
Expand Down Expand Up @@ -1082,17 +1094,29 @@ func TestBroadcastTxsSortedByTime(t *testing.T) {

func TestReBroadcastTxsSortedByTime(t *testing.T) {
// Generate a batch of accounts to start with
keys := make([]*ecdsa.PrivateKey, 5)
keys := make([]*ecdsa.PrivateKey, 10)
for i := 0; i < len(keys); i++ {
keys[i], _ = crypto.GenerateKey()
}
signer := types.LatestSignerForChainID(big.NewInt(1))

// Generate a batch of transactions.
// txs[0:4] - legacyTxType with gasPrice 25ston. txs[5:10] - dynamicFeeTxType with gasPrice 25ston + tip.
txs := types.Transactions{}
for _, key := range keys {
tx, _ := types.SignTx(types.NewTransaction(0, common.Address{}, big.NewInt(100), 100, big.NewInt(1), nil), signer, key)

for _, key := range keys[0:4] {
tx, _ := types.SignTx(types.NewTransaction(0, common.Address{}, big.NewInt(100), 100, big.NewInt(25*params.Ston), nil), signer, key)
txs = append(txs, tx)
}
for i, key := range keys[5:10] {
tx, _ := types.SignTx(types.NewTx(&types.TxInternalDataEthereumDynamicFee{
AccountNonce: uint64(0),
Recipient: &common.Address{},
Amount: big.NewInt(100),
GasLimit: 100,
GasFeeCap: big.NewInt(int64(25*params.Ston + i + 1)),
GasTipCap: big.NewInt(int64(i + 1)),
Payload: nil,
}), signer, key)
txs = append(txs, tx)
}

Expand All @@ -1105,7 +1129,7 @@ func TestReBroadcastTxsSortedByTime(t *testing.T) {
copy(sortedTxs, txs)

// Sort transaction by time.
sort.Sort(types.TxByTime(sortedTxs))
sortedTxs = types.SortTxsByPriceAndTime(sortedTxs, big.NewInt(25*params.Ston))

pm := &ProtocolManager{}
pm.nodetype = common.ENDPOINTNODE
Expand Down
Loading

0 comments on commit d78751b

Please sign in to comment.