diff --git a/blockchain/types/transaction.go b/blockchain/types/transaction.go index 4cd0397b10..86f30af98d 100644 --- a/blockchain/types/transaction.go +++ b/blockchain/types/transaction.go @@ -29,6 +29,7 @@ import ( "fmt" "io" "math/big" + "sort" "sync" "sync/atomic" "time" @@ -898,54 +899,6 @@ func (s TxByNonce) Less(i, j int) bool { } func (s TxByNonce) Swap(i, j int) { s[i], s[j] = s[j], s[i] } -type TxByTime Transactions - -func (s TxByTime) Len() int { return len(s) } -func (s TxByTime) Less(i, j int) bool { - // Use the time the transaction was first seen for deterministic sorting - return s[i].time.Before(s[j].time) -} -func (s TxByTime) Swap(i, j int) { s[i], s[j] = s[j], s[i] } - -func (s *TxByTime) Push(x interface{}) { - *s = append(*s, x.(*Transaction)) -} - -func (s *TxByTime) Pop() interface{} { - old := *s - n := len(old) - x := old[n-1] - *s = old[0 : n-1] - return x -} - -// TxByPriceAndTime implements both the sort and the heap interface, making it useful -// for all at once sorting as well as individually adding and removing elements. -type TxByPriceAndTime Transactions - -func (s TxByPriceAndTime) Len() int { return len(s) } -func (s TxByPriceAndTime) Less(i, j int) bool { - // Use the time the transaction was first seen for deterministic sorting - cmp := s[i].GasPrice().Cmp(s[j].GasPrice()) - if cmp == 0 { - return s[i].time.Before(s[j].time) - } - return cmp > 0 -} -func (s TxByPriceAndTime) Swap(i, j int) { s[i], s[j] = s[j], s[i] } - -func (s *TxByPriceAndTime) Push(x interface{}) { - *s = append(*s, x.(*Transaction)) -} - -func (s *TxByPriceAndTime) Pop() interface{} { - old := *s - n := len(old) - x := old[n-1] - *s = old[0 : n-1] - return x -} - // txWithMinerFee wraps a transaction with its gas price or effective miner gasTipCap type txWithMinerFee struct { tx *Transaction @@ -1003,6 +956,31 @@ func (s *txByEffectivePriceAndTime) Pop() interface{} { return x } +// SortTxsByPriceAndTime is used to sort the txs by expected effectiveGasTip and arrival time. +// It is called on the process of txs broadcasting. There's three points when this function called. +// (1) BroadcastTxs: before broadcasting txs to the peers +// (2) RebroadcastTxs: before rebroadcasting the remaining pending txs to the peers +// (3) syncTransactions: before sending the all pending txs to the newly connected peer +func SortTxsByPriceAndTime(txs Transactions, baseFee *big.Int) Transactions { + sortedTxsWithMinerFee := make(txByEffectivePriceAndTime, len(txs)) + for i, tx := range txs { + sortedTxsWithMinerFee[i] = &txWithMinerFee{tx, common.Address{}, tx.EffectiveGasTip(baseFee)} + } + + // If already sorted, just return original txs. + if sort.IsSorted(sortedTxsWithMinerFee) { + return txs + } + + // Sort the batch of txs and derive sortedTxs to return it. + sort.Sort(sortedTxsWithMinerFee) + sortedTxs := make(Transactions, len(txs)) + for i, tx := range sortedTxsWithMinerFee { + sortedTxs[i] = tx.tx + } + return sortedTxs +} + // TransactionsByPriceAndNonce represents a set of transactions that can return // transactions in a profit-maximizing sorted order, while supporting removing // entire batches of transactions for non-executable accounts. diff --git a/blockchain/types/transaction_test.go b/blockchain/types/transaction_test.go index eba4458e4a..6bfec71859 100644 --- a/blockchain/types/transaction_test.go +++ b/blockchain/types/transaction_test.go @@ -960,101 +960,91 @@ func TestFilterTransactionWithBaseFee(t *testing.T) { assert.Equal(t, len(pending[from3]), 0) } -func BenchmarkTxSortByTime30000(b *testing.B) { benchmarkTxSortByTime(b, 30000) } -func BenchmarkTxSortByTime20000(b *testing.B) { benchmarkTxSortByTime(b, 20000) } -func benchmarkTxSortByTime(b *testing.B, size int) { +// go test -bench=BenchmarkTxSortByTimeLegacy -benchtime=1000x +func BenchmarkSortTxsByPriceAndTime20000(b *testing.B) { benchmarkSortTxsByPriceAndTime(b, 20000) } +func BenchmarkSortTxsByPriceAndTime10000(b *testing.B) { benchmarkSortTxsByPriceAndTime(b, 10000) } +func BenchmarkSortTxsByPriceAndTime100(b *testing.B) { benchmarkSortTxsByPriceAndTime(b, 100) } +func benchmarkSortTxsByPriceAndTime(b *testing.B, size int) { signer := LatestSignerForChainID(big.NewInt(1)) key, _ := crypto.GenerateKey() - batches := make(Transactions, size) - for i := 0; i < size; i++ { - batches[i], _ = SignTx(NewTransaction(uint64(i), common.Address{}, big.NewInt(100), 100, big.NewInt(int64(i)), nil), signer, key) - } - - // Shuffle transactions. - rand.Shuffle(len(batches), func(i, j int) { - batches[i], batches[j] = batches[j], batches[i] - }) - - // Benchmark importing the transactions into the queue - b.ResetTimer() - - for i := 0; i < b.N; i++ { - sort.Sort(TxByPriceAndTime(batches)) + // make size to be even + if size%2 == 1 { + size += 1 } -} + txs := make(Transactions, size) -func BenchmarkTxSortByTimeDynamicFee30000(b *testing.B) { benchmarkTxSortByTimeDynamicFee(b, 30000) } -func BenchmarkTxSortByTimeDynamicFee20000(b *testing.B) { benchmarkTxSortByTimeDynamicFee(b, 20000) } -func benchmarkTxSortByTimeDynamicFee(b *testing.B, size int) { - signer := LatestSignerForChainID(big.NewInt(1)) - - key, _ := crypto.GenerateKey() - batches := make(Transactions, size) - - for i := 0; i < size; i++ { - gasFeeCap := rand.Intn(50) - tx, _ := SignTx(NewTx(&TxInternalDataEthereumDynamicFee{ + for i := 0; i < size; i += 2 { + gasFeeCap := rand.Int63n(50 * params.Ston) + txs[i], _ = SignTx(NewTransaction(uint64(i), common.Address{}, big.NewInt(100), 100, big.NewInt(25*params.Ston), nil), signer, key) + txs[i+1], _ = SignTx(NewTx(&TxInternalDataEthereumDynamicFee{ AccountNonce: uint64(i), Recipient: &common.Address{}, Amount: big.NewInt(100), GasLimit: 100, - GasFeeCap: big.NewInt(int64(gasFeeCap)), - GasTipCap: big.NewInt(int64(rand.Intn(gasFeeCap + 1))), + GasFeeCap: big.NewInt(int64(25*params.Ston) + gasFeeCap), + GasTipCap: big.NewInt(gasFeeCap), Payload: nil, }), signer, key) - batches[i] = tx } - // Should be already shuffled, but shuffle transactions anyway. - rand.Seed(time.Now().Unix()) - rand.Shuffle(len(batches), func(i, j int) { - batches[i], batches[j] = batches[j], batches[i] - }) - // Benchmark importing the transactions into the queue b.ResetTimer() - for i := 0; i < b.N; i++ { - sort.Sort(TxByPriceAndTime(batches)) + start := time.Now() + rand.Shuffle(size, func(i, j int) { + txs[i], txs[j] = txs[j], txs[i] + }) + b.Log("[", size, "], newBatch shuffle:", time.Since(start)) + + start = time.Now() + txs = SortTxsByPriceAndTime(txs, big.NewInt(25*params.Ston)) + b.Log("[", size, "], sorting time:", time.Since(start)) } } -func BenchmarkTxSortByPrice30000(b *testing.B) { benchmarkTxSortByPrice(b, 30000) } -func BenchmarkTxSortByPrice20000(b *testing.B) { benchmarkTxSortByPrice(b, 20000) } -func benchmarkTxSortByPrice(b *testing.B, size int) { +// go test -bench=BenchmarkTxSortByPriceAndTime -benchtime=10000x - 2s +func BenchmarkTxSortByEffectivePriceAndTime20000(b *testing.B) { + benchmarkTxSortByEffectivePriceAndTime(b, 20000) +} + +func BenchmarkTxSortByEffectivePriceAndTime10000(b *testing.B) { + benchmarkTxSortByEffectivePriceAndTime(b, 10000) +} + +func BenchmarkTxSortByEffectivePriceAndTime100(b *testing.B) { + benchmarkTxSortByEffectivePriceAndTime(b, 100) +} + +func benchmarkTxSortByEffectivePriceAndTime(b *testing.B, size int) { signer := LatestSignerForChainID(big.NewInt(1)) key, _ := crypto.GenerateKey() batches := make(txByEffectivePriceAndTime, size) for i := 0; i < size; i++ { - gasFeeCap := rand.Intn(50) + gasFeeCap := rand.Int63n(50) tx, _ := SignTx(NewTx(&TxInternalDataEthereumDynamicFee{ AccountNonce: uint64(i), Recipient: &common.Address{}, Amount: big.NewInt(100), GasLimit: 100, - GasFeeCap: big.NewInt(int64(gasFeeCap)), - GasTipCap: big.NewInt(int64(rand.Intn(gasFeeCap + 1))), + GasFeeCap: big.NewInt(int64(25*params.Ston) + gasFeeCap), + GasTipCap: big.NewInt(gasFeeCap), Payload: nil, }), signer, key) - txWithFee, _ := newTxWithMinerFee(tx, common.Address{}, nil) + txWithFee, _ := newTxWithMinerFee(tx, common.Address{}, big.NewInt(25*params.Ston)) batches[i] = txWithFee } - // Should be already shuffled, but shuffle transactions anyway. - rand.Seed(time.Now().Unix()) - rand.Shuffle(len(batches), func(i, j int) { - batches[i], batches[j] = batches[j], batches[i] - }) - // Benchmark importing the transactions into the queue b.ResetTimer() - for i := 0; i < b.N; i++ { - sort.Sort(txByEffectivePriceAndTime(batches)) + rand.Shuffle(len(batches), func(i, j int) { + batches[i], batches[j] = batches[j], batches[i] + }) + sort.Sort(batches) } } diff --git a/node/cn/handler.go b/node/cn/handler.go index 73175b29c2..8cfa5c41c8 100644 --- a/node/cn/handler.go +++ b/node/cn/handler.go @@ -28,7 +28,6 @@ import ( "math/big" "math/rand" "runtime/debug" - "sort" "sync" "sync/atomic" "time" @@ -1295,10 +1294,11 @@ func (pm *ProtocolManager) BroadcastTxs(txs types.Transactions) { // This function calls sendTransaction() to broadcast the transactions for each peer. // In that case, transactions are sorted for each peer in sendTransaction(). // Therefore, it prevents sorting transactions by each peer. - if !sort.IsSorted(types.TxByTime(txs)) { - sort.Sort(types.TxByTime(txs)) + baseFee := big.NewInt(int64(params.DefaultLowerBoundBaseFee)) + if pm.blockchain != nil && pm.blockchain.CurrentHeader() != nil && pm.blockchain.CurrentHeader().BaseFee != nil { + baseFee = pm.blockchain.CurrentHeader().BaseFee } - + txs = types.SortTxsByPriceAndTime(txs, baseFee) switch pm.nodetype { case common.CONSENSUSNODE: pm.broadcastTxsFromCN(txs) @@ -1381,12 +1381,11 @@ func (pm *ProtocolManager) ReBroadcastTxs(txs types.Transactions) { return } - // This function calls sendTransaction() to broadcast the transactions for each peer. - // In that case, transactions are sorted for each peer in sendTransaction(). - // Therefore, it prevents sorting transactions by each peer. - if !sort.IsSorted(types.TxByTime(txs)) { - sort.Sort(types.TxByTime(txs)) + baseFee := big.NewInt(int64(params.DefaultLowerBoundBaseFee)) + if pm.blockchain != nil && pm.blockchain.CurrentHeader() != nil && pm.blockchain.CurrentHeader().BaseFee != nil { + baseFee = pm.blockchain.CurrentHeader().BaseFee } + txs = types.SortTxsByPriceAndTime(txs, baseFee) peersWithoutTxs := make(map[Peer]types.Transactions) for _, tx := range txs { diff --git a/node/cn/handler_test.go b/node/cn/handler_test.go index 7a467591b1..db4db8c4b8 100644 --- a/node/cn/handler_test.go +++ b/node/cn/handler_test.go @@ -21,7 +21,6 @@ import ( "fmt" "math/big" "math/rand" - "sort" "testing" "time" @@ -110,6 +109,7 @@ func newBlock(blockNum int) *types.Block { Extra: addrs[0][:], Governance: addrs[0][:], Vote: addrs[0][:], + BaseFee: big.NewInt(25 * params.Ston), } header.Hash() block := types.NewBlockWithHeader(header) @@ -1020,19 +1020,31 @@ func TestProtocolManager_SetWsEndPoint(t *testing.T) { assert.Equal(t, wsep, pm.wsendpoint) } -func TestBroadcastTxsSortedByTime(t *testing.T) { +func TestBroadcastTxsSortedByPriceAndTime(t *testing.T) { // Generate a batch of accounts to start with - keys := make([]*ecdsa.PrivateKey, 5) + keys := make([]*ecdsa.PrivateKey, 10) for i := 0; i < len(keys); i++ { keys[i], _ = crypto.GenerateKey() } signer := types.LatestSignerForChainID(big.NewInt(1)) // Generate a batch of transactions. + // txs[0:4] - legacyTxType with gasPrice 25ston. txs[5:10] - dynamicFeeTxType with gasPrice 25ston + tip. txs := types.Transactions{} - for _, key := range keys { - tx, _ := types.SignTx(types.NewTransaction(0, common.Address{}, big.NewInt(100), 100, big.NewInt(1), nil), signer, key) - + for _, key := range keys[0:5] { + tx, _ := types.SignTx(types.NewTransaction(0, common.Address{}, big.NewInt(100), 100, big.NewInt(25*params.Ston), nil), signer, key) + txs = append(txs, tx) + } + for i, key := range keys[5:10] { + tx, _ := types.SignTx(types.NewTx(&types.TxInternalDataEthereumDynamicFee{ + AccountNonce: uint64(0), + Recipient: &common.Address{}, + Amount: big.NewInt(100), + GasLimit: 100, + GasFeeCap: big.NewInt(int64(25*params.Ston + i + 1)), + GasTipCap: big.NewInt(int64(i + 1)), + Payload: nil, + }), signer, key) txs = append(txs, tx) } @@ -1045,7 +1057,7 @@ func TestBroadcastTxsSortedByTime(t *testing.T) { copy(sortedTxs, txs) // Sort transaction by time. - sort.Sort(types.TxByTime(sortedTxs)) + sortedTxs = types.SortTxsByPriceAndTime(sortedTxs, big.NewInt(25*params.Ston)) pm := &ProtocolManager{} pm.nodetype = common.ENDPOINTNODE @@ -1082,17 +1094,29 @@ func TestBroadcastTxsSortedByTime(t *testing.T) { func TestReBroadcastTxsSortedByTime(t *testing.T) { // Generate a batch of accounts to start with - keys := make([]*ecdsa.PrivateKey, 5) + keys := make([]*ecdsa.PrivateKey, 10) for i := 0; i < len(keys); i++ { keys[i], _ = crypto.GenerateKey() } signer := types.LatestSignerForChainID(big.NewInt(1)) // Generate a batch of transactions. + // txs[0:4] - legacyTxType with gasPrice 25ston. txs[5:10] - dynamicFeeTxType with gasPrice 25ston + tip. txs := types.Transactions{} - for _, key := range keys { - tx, _ := types.SignTx(types.NewTransaction(0, common.Address{}, big.NewInt(100), 100, big.NewInt(1), nil), signer, key) - + for _, key := range keys[0:4] { + tx, _ := types.SignTx(types.NewTransaction(0, common.Address{}, big.NewInt(100), 100, big.NewInt(25*params.Ston), nil), signer, key) + txs = append(txs, tx) + } + for i, key := range keys[5:10] { + tx, _ := types.SignTx(types.NewTx(&types.TxInternalDataEthereumDynamicFee{ + AccountNonce: uint64(0), + Recipient: &common.Address{}, + Amount: big.NewInt(100), + GasLimit: 100, + GasFeeCap: big.NewInt(int64(25*params.Ston + i + 1)), + GasTipCap: big.NewInt(int64(i + 1)), + Payload: nil, + }), signer, key) txs = append(txs, tx) } @@ -1105,7 +1129,7 @@ func TestReBroadcastTxsSortedByTime(t *testing.T) { copy(sortedTxs, txs) // Sort transaction by time. - sort.Sort(types.TxByTime(sortedTxs)) + sortedTxs = types.SortTxsByPriceAndTime(sortedTxs, big.NewInt(25*params.Ston)) pm := &ProtocolManager{} pm.nodetype = common.ENDPOINTNODE diff --git a/node/cn/peer.go b/node/cn/peer.go index 09ac4a180d..538004076e 100644 --- a/node/cn/peer.go +++ b/node/cn/peer.go @@ -24,7 +24,6 @@ import ( "errors" "fmt" "math/big" - "sort" "sync" "sync/atomic" "time" @@ -455,11 +454,6 @@ func (p *basePeer) Send(msgcode uint64, data interface{}) error { // SendTransactions sends transactions to the peer and includes the hashes // in its transaction hash set for future reference. func (p *basePeer) SendTransactions(txs types.Transactions) error { - // Before sending transactions, sort transactions in ascending order by time. - if !sort.IsSorted(types.TxByTime(txs)) { - sort.Sort(types.TxByTime(txs)) - } - for _, tx := range txs { p.AddToKnownTxs(tx.Hash()) } @@ -468,11 +462,6 @@ func (p *basePeer) SendTransactions(txs types.Transactions) error { // ReSendTransactions sends txs to a peer in order to prevent the txs from missing. func (p *basePeer) ReSendTransactions(txs types.Transactions) error { - // Before sending transactions, sort transactions in ascending order by time. - if !sort.IsSorted(types.TxByTime(txs)) { - sort.Sort(types.TxByTime(txs)) - } - return p2p.Send(p.rw, TxMsg, txs) } @@ -856,11 +845,6 @@ func (p *multiChannelPeer) Broadcast() { // SendTransactions sends transactions to the peer and includes the hashes // in its transaction hash set for future reference. func (p *multiChannelPeer) SendTransactions(txs types.Transactions) error { - // Before sending transactions, sort transactions in ascending order by time. - if !sort.IsSorted(types.TxByTime(txs)) { - sort.Sort(types.TxByTime(txs)) - } - for _, tx := range txs { p.AddToKnownTxs(tx.Hash()) } @@ -869,11 +853,6 @@ func (p *multiChannelPeer) SendTransactions(txs types.Transactions) error { // ReSendTransactions sends txs to a peer in order to prevent the txs from missing. func (p *multiChannelPeer) ReSendTransactions(txs types.Transactions) error { - // Before sending transactions, sort transactions in ascending order by time. - if !sort.IsSorted(types.TxByTime(txs)) { - sort.Sort(types.TxByTime(txs)) - } - return p.msgSender(TxMsg, txs) } diff --git a/node/cn/peer_test.go b/node/cn/peer_test.go index c1c88e5fa3..fdaa57adf8 100644 --- a/node/cn/peer_test.go +++ b/node/cn/peer_test.go @@ -17,16 +17,12 @@ package cn import ( - "crypto/ecdsa" "math/big" - "math/rand" - "sort" "strings" "testing" "github.com/klaytn/klaytn/blockchain/types" "github.com/klaytn/klaytn/common" - "github.com/klaytn/klaytn/crypto" "github.com/klaytn/klaytn/networks/p2p" "github.com/stretchr/testify/assert" ) @@ -365,237 +361,3 @@ func TestBasePeer_RequestReceipts(t *testing.T) { assert.Equal(t, sentHashes, receivedHashes) } - -func TestBasePeer_SendTransactionWithSortedByTime(t *testing.T) { - // Generate a batch of accounts to start with - keys := make([]*ecdsa.PrivateKey, 5) - for i := 0; i < len(keys); i++ { - keys[i], _ = crypto.GenerateKey() - } - signer := types.LatestSignerForChainID(big.NewInt(1)) - - // Generate a batch of transactions. - txs := types.Transactions{} - for _, key := range keys { - tx, _ := types.SignTx(types.NewTransaction(0, common.Address{}, big.NewInt(100), 100, big.NewInt(1), nil), signer, key) - - txs = append(txs, tx) - } - - // Shuffle transactions. - rand.Shuffle(len(txs), func(i, j int) { - txs[i], txs[j] = txs[j], txs[i] - }) - - sortedTxs := make(types.Transactions, len(txs)) - copy(sortedTxs, txs) - - // Sort transaction by time. - sort.Sort(types.TxByTime(sortedTxs)) - - basePeer, _, oppositePipe := newBasePeer() - for _, tx := range txs { - assert.False(t, basePeer.KnowsTx(tx.Hash())) - } - - go func(t *testing.T) { - if err := basePeer.SendTransactions(txs); err != nil { - t.Error(t) - return - } - }(t) - - receivedMsg, err := oppositePipe.ReadMsg() - if err != nil { - t.Fatal(err) - } - - var receivedTxs types.Transactions - if err := receivedMsg.Decode(&receivedTxs); err != nil { - t.Fatal(err) - } - - assert.Equal(t, len(txs), len(receivedTxs)) - - // It should be received transaction with sorted by times. - for i, tx := range receivedTxs { - assert.True(t, basePeer.KnowsTx(tx.Hash())) - assert.Equal(t, sortedTxs[i].Hash(), tx.Hash()) - assert.False(t, sortedTxs[i].Time().Equal(tx.Time())) - } -} - -func TestBasePeer_ReSendTransactionWithSortedByTime(t *testing.T) { - // Generate a batch of accounts to start with - keys := make([]*ecdsa.PrivateKey, 5) - for i := 0; i < len(keys); i++ { - keys[i], _ = crypto.GenerateKey() - } - signer := types.LatestSignerForChainID(big.NewInt(1)) - - // Generate a batch of transactions. - txs := types.Transactions{} - for _, key := range keys { - tx, _ := types.SignTx(types.NewTransaction(0, common.Address{}, big.NewInt(100), 100, big.NewInt(1), nil), signer, key) - - txs = append(txs, tx) - } - - // Shuffle transactions. - rand.Shuffle(len(txs), func(i, j int) { - txs[i], txs[j] = txs[j], txs[i] - }) - - sortedTxs := make(types.Transactions, len(txs)) - copy(sortedTxs, txs) - - // Sort transaction by time. - sort.Sort(types.TxByTime(sortedTxs)) - - basePeer, _, oppositePipe := newBasePeer() - go func(t *testing.T) { - if err := basePeer.ReSendTransactions(txs); err != nil { - t.Error(t) - return - } - }(t) - - receivedMsg, err := oppositePipe.ReadMsg() - if err != nil { - t.Fatal(err) - } - - var receivedTxs types.Transactions - if err := receivedMsg.Decode(&receivedTxs); err != nil { - t.Fatal(err) - } - - assert.Equal(t, len(txs), len(receivedTxs)) - - // It should be received transaction with sorted by times. - for i, tx := range receivedTxs { - assert.Equal(t, sortedTxs[i].Hash(), tx.Hash()) - assert.False(t, sortedTxs[i].Time().Equal(tx.Time())) - } -} - -func TestMultiChannelPeer_SendTransactionWithSortedByTime(t *testing.T) { - // Generate a batch of accounts to start with - keys := make([]*ecdsa.PrivateKey, 5) - for i := 0; i < len(keys); i++ { - keys[i], _ = crypto.GenerateKey() - } - signer := types.LatestSignerForChainID(big.NewInt(1)) - - // Generate a batch of transactions. - txs := types.Transactions{} - for _, key := range keys { - tx, _ := types.SignTx(types.NewTransaction(0, common.Address{}, big.NewInt(100), 100, big.NewInt(1), nil), signer, key) - - txs = append(txs, tx) - } - - // Shuffle transactions. - rand.Shuffle(len(txs), func(i, j int) { - txs[i], txs[j] = txs[j], txs[i] - }) - - sortedTxs := make(types.Transactions, len(txs)) - copy(sortedTxs, txs) - - // Sort transaction by time. - sort.Sort(types.TxByTime(sortedTxs)) - - _, oppositePipe1, oppositePipe2 := newBasePeer() - multiPeer, _ := newPeerWithRWs(version, p2pPeers[0], []p2p.MsgReadWriter{oppositePipe1, oppositePipe2}) - - for _, tx := range txs { - assert.False(t, multiPeer.KnowsTx(tx.Hash())) - } - - go func(t *testing.T) { - if err := multiPeer.SendTransactions(txs); err != nil { - t.Error(t) - return - } - }(t) - - receivedMsg, err := oppositePipe1.ReadMsg() - if err != nil { - t.Fatal(err) - } - - var receivedTxs types.Transactions - if err := receivedMsg.Decode(&receivedTxs); err != nil { - t.Fatal(err) - } - - assert.Equal(t, len(txs), len(receivedTxs)) - - // It should be received transaction with sorted by times. - for i, tx := range receivedTxs { - assert.True(t, multiPeer.KnowsTx(tx.Hash())) - assert.Equal(t, sortedTxs[i].Hash(), tx.Hash()) - assert.False(t, sortedTxs[i].Time().Equal(tx.Time())) - } -} - -func TestMultiChannelPeer_ReSendTransactionWithSortedByTime(t *testing.T) { - // Generate a batch of accounts to start with - keys := make([]*ecdsa.PrivateKey, 5) - for i := 0; i < len(keys); i++ { - keys[i], _ = crypto.GenerateKey() - } - signer := types.LatestSignerForChainID(big.NewInt(1)) - - // Generate a batch of transactions. - txs := types.Transactions{} - for _, key := range keys { - tx, _ := types.SignTx(types.NewTransaction(0, common.Address{}, big.NewInt(100), 100, big.NewInt(1), nil), signer, key) - - txs = append(txs, tx) - } - - // Shuffle transactions. - rand.Shuffle(len(txs), func(i, j int) { - txs[i], txs[j] = txs[j], txs[i] - }) - - sortedTxs := make(types.Transactions, len(txs)) - copy(sortedTxs, txs) - - // Sort transaction by time. - sort.Sort(types.TxByTime(sortedTxs)) - - _, oppositePipe1, oppositePipe2 := newBasePeer() - multiPeer, _ := newPeerWithRWs(version, p2pPeers[0], []p2p.MsgReadWriter{oppositePipe1, oppositePipe2}) - - for _, tx := range txs { - assert.False(t, multiPeer.KnowsTx(tx.Hash())) - } - - go func(t *testing.T) { - if err := multiPeer.ReSendTransactions(txs); err != nil { - t.Error(t) - return - } - }(t) - - receivedMsg, err := oppositePipe1.ReadMsg() - if err != nil { - t.Fatal(err) - } - - var receivedTxs types.Transactions - if err := receivedMsg.Decode(&receivedTxs); err != nil { - t.Fatal(err) - } - - assert.Equal(t, len(txs), len(receivedTxs)) - - // It should be received transaction with sorted by times. - for i, tx := range receivedTxs { - assert.Equal(t, sortedTxs[i].Hash(), tx.Hash()) - assert.False(t, sortedTxs[i].Time().Equal(tx.Time())) - } -} diff --git a/node/cn/sync.go b/node/cn/sync.go index 981142abe8..c271ce1282 100644 --- a/node/cn/sync.go +++ b/node/cn/sync.go @@ -55,6 +55,7 @@ func (pm *ProtocolManager) syncTransactions(p Peer) { if len(txs) == 0 { return } + txs = types.SortTxsByPriceAndTime(txs, pm.blockchain.CurrentHeader().BaseFee) select { case pm.txsyncCh <- &txsync{p, txs}: case <-pm.quitSync: