From 0064836f047996e87ef22949fad6a293e8b24da8 Mon Sep 17 00:00:00 2001 From: Arijit Das Date: Fri, 17 Sep 2021 18:33:11 +0530 Subject: [PATCH] fix(dot/core): Batch process transaction message. (#1780) * Batch process transaction message. Co-authored-by: noot --- dot/network/notifications.go | 42 ++++++++++-- dot/network/notifications_test.go | 107 +++++++++++++++++++++++++++++- dot/network/service.go | 11 ++- dot/network/transaction.go | 33 +++++++++ dot/sync/interface.go | 4 +- lib/babe/state.go | 4 +- lib/grandpa/network.go | 1 + lib/grandpa/round_test.go | 17 ++--- lib/grandpa/state.go | 1 + 9 files changed, 198 insertions(+), 22 deletions(-) diff --git a/dot/network/notifications.go b/dot/network/notifications.go index 2c62fcff63..6aa5af7ba8 100644 --- a/dot/network/notifications.go +++ b/dot/network/notifications.go @@ -55,8 +55,16 @@ type ( // NotificationsMessageHandler is called when a (non-handshake) message is received over a notifications stream. NotificationsMessageHandler = func(peer peer.ID, msg NotificationsMessage) (propagate bool, err error) + + // NotificationsMessageBatchHandler is called when a (non-handshake) message is received over a notifications stream in batch processing mode. + NotificationsMessageBatchHandler = func(peer peer.ID, msg NotificationsMessage) (batchMsgs []*batchMessage, err error) ) +type batchMessage struct { + msg NotificationsMessage + peer peer.ID +} + type handshakeReader struct { hs Handshake err error @@ -141,7 +149,7 @@ func createDecoder(info *notificationsProtocol, handshakeDecoder HandshakeDecode } } -func (s *Service) createNotificationsMessageHandler(info *notificationsProtocol, messageHandler NotificationsMessageHandler) messageHandler { +func (s *Service) createNotificationsMessageHandler(info *notificationsProtocol, messageHandler NotificationsMessageHandler, batchHandler NotificationsMessageBatchHandler) messageHandler { return func(stream libp2pnetwork.Stream, m Message) error { if m == nil || info == nil || info.handshakeValidator == nil || messageHandler == nil { return nil @@ -210,18 +218,38 @@ func (s *Service) createNotificationsMessageHandler(info *notificationsProtocol, "peer", stream.Conn().RemotePeer(), ) - propagate, err := messageHandler(peer, msg) - if err != nil { - return err + var ( + propagate bool + err error + msgs []*batchMessage + ) + if batchHandler != nil { + msgs, err = batchHandler(peer, msg) + if err != nil { + return err + } + + propagate = len(msgs) > 0 + } else { + propagate, err = messageHandler(peer, msg) + if err != nil { + return err + } + msgs = append(msgs, &batchMessage{ + msg: msg, + peer: peer, + }) } if !propagate || s.noGossip { return nil } - seen := s.gossip.hasSeen(msg) - if !seen { - s.broadcastExcluding(info, peer, msg) + for _, data := range msgs { + seen := s.gossip.hasSeen(data.msg) + if !seen { + s.broadcastExcluding(info, data.peer, data.msg) + } } return nil diff --git a/dot/network/notifications_test.go b/dot/network/notifications_test.go index b71f1a6faa..d971cbf0f2 100644 --- a/dot/network/notifications_test.go +++ b/dot/network/notifications_test.go @@ -30,6 +30,7 @@ import ( "github.com/ChainSafe/gossamer/lib/utils" libp2pnetwork "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" ) @@ -143,7 +144,7 @@ func TestCreateNotificationsMessageHandler_BlockAnnounce(t *testing.T) { inboundHandshakeData: new(sync.Map), outboundHandshakeData: new(sync.Map), } - handler := s.createNotificationsMessageHandler(info, s.handleBlockAnnounceMessage) + handler := s.createNotificationsMessageHandler(info, s.handleBlockAnnounceMessage, nil) // set handshake data to received info.inboundHandshakeData.Store(testPeerID, handshakeData{ @@ -176,7 +177,7 @@ func TestCreateNotificationsMessageHandler_BlockAnnounceHandshake(t *testing.T) inboundHandshakeData: new(sync.Map), outboundHandshakeData: new(sync.Map), } - handler := s.createNotificationsMessageHandler(info, s.handleBlockAnnounceMessage) + handler := s.createNotificationsMessageHandler(info, s.handleBlockAnnounceMessage, nil) configB := &Config{ BasePath: utils.NewTestBasePath(t, "nodeB"), @@ -316,6 +317,108 @@ func Test_HandshakeTimeout(t *testing.T) { require.Len(t, connAToB[0].GetStreams(), 0) } +func TestCreateNotificationsMessageHandler_HandleTransaction(t *testing.T) { + basePath := utils.NewTestBasePath(t, "nodeA") + mockhandler := &MockTransactionHandler{} + mockhandler.On("HandleTransactionMessage", mock.AnythingOfType("*network.TransactionMessage")).Return(true, nil) + mockhandler.On("TransactionsCount").Return(0) + config := &Config{ + BasePath: basePath, + Port: 7001, + NoBootstrap: true, + NoMDNS: true, + TransactionHandler: mockhandler, + } + + s := createTestService(t, config) + s.batchSize = 5 + + configB := &Config{ + BasePath: utils.NewTestBasePath(t, "nodeB"), + Port: 7002, + NoBootstrap: true, + NoMDNS: true, + } + + b := createTestService(t, configB) + + txnBatch := make(chan *batchMessage, s.batchSize) + txnBatchHandler := s.createBatchMessageHandler(txnBatch) + + // don't set handshake data ie. this stream has just been opened + testPeerID := b.host.id() + + // connect nodes + addrInfoB := b.host.addrInfo() + err := s.host.connect(addrInfoB) + if failedToDial(err) { + time.Sleep(TestBackoffTimeout) + err = s.host.connect(addrInfoB) + } + require.NoError(t, err) + + stream, err := s.host.h.NewStream(s.ctx, b.host.id(), s.host.protocolID+transactionsID) + require.NoError(t, err) + require.Len(t, txnBatch, 0) + + // create info and handler + info := ¬ificationsProtocol{ + protocolID: s.host.protocolID + transactionsID, + getHandshake: s.getTransactionHandshake, + handshakeValidator: validateTransactionHandshake, + inboundHandshakeData: new(sync.Map), + outboundHandshakeData: new(sync.Map), + } + handler := s.createNotificationsMessageHandler(info, s.handleTransactionMessage, txnBatchHandler) + + // set handshake data to received + info.inboundHandshakeData.Store(testPeerID, handshakeData{ + received: true, + validated: true, + }) + msg := &TransactionMessage{ + Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}}, + } + err = handler(stream, msg) + require.NoError(t, err) + require.Len(t, txnBatch, 1) + + msg = &TransactionMessage{ + Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}}, + } + err = handler(stream, msg) + require.NoError(t, err) + require.Len(t, txnBatch, 2) + + msg = &TransactionMessage{ + Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}}, + } + err = handler(stream, msg) + require.NoError(t, err) + require.Len(t, txnBatch, 3) + + msg = &TransactionMessage{ + Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}}, + } + err = handler(stream, msg) + require.NoError(t, err) + require.Len(t, txnBatch, 4) + + msg = &TransactionMessage{ + Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}}, + } + err = handler(stream, msg) + require.NoError(t, err) + require.Len(t, txnBatch, 0) + + msg = &TransactionMessage{ + Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}}, + } + err = handler(stream, msg) + require.NoError(t, err) + require.Len(t, txnBatch, 1) +} + func TestBlockAnnounceHandshakeSize(t *testing.T) { require.Equal(t, unsafe.Sizeof(BlockAnnounceHandshake{}), reflect.TypeOf(BlockAnnounceHandshake{}).Size()) } diff --git a/dot/network/service.go b/dot/network/service.go index 3b9ce545b5..2a11bcb4fb 100644 --- a/dot/network/service.go +++ b/dot/network/service.go @@ -98,6 +98,8 @@ type Service struct { // telemetry telemetryInterval time.Duration closeCh chan interface{} + + batchSize int } // NewService creates a new network service from the configuration and message channels @@ -171,6 +173,7 @@ func NewService(cfg *Config) (*Service, error) { closeCh: make(chan interface{}), bufPool: bufPool, streamManager: newStreamManager(ctx), + batchSize: 100, } network.syncQueue = newSyncQueue(network) @@ -218,11 +221,15 @@ func (s *Service) Start() error { s.validateBlockAnnounceHandshake, decodeBlockAnnounceMessage, s.handleBlockAnnounceMessage, + nil, ) if err != nil { logger.Warn("failed to register notifications protocol", "sub-protocol", blockAnnounceID, "error", err) } + txnBatch := make(chan *batchMessage, s.batchSize) + txnBatchHandler := s.createBatchMessageHandler(txnBatch) + // register transactions protocol err = s.RegisterNotificationsProtocol( s.host.protocolID+transactionsID, @@ -232,6 +239,7 @@ func (s *Service) Start() error { validateTransactionHandshake, decodeTransactionMessage, s.handleTransactionMessage, + txnBatchHandler, ) if err != nil { logger.Warn("failed to register notifications protocol", "sub-protocol", blockAnnounceID, "error", err) @@ -420,6 +428,7 @@ func (s *Service) RegisterNotificationsProtocol( handshakeValidator HandshakeValidator, messageDecoder MessageDecoder, messageHandler NotificationsMessageHandler, + batchHandler NotificationsMessageBatchHandler, ) error { s.notificationsMu.Lock() defer s.notificationsMu.Unlock() @@ -462,7 +471,7 @@ func (s *Service) RegisterNotificationsProtocol( info := s.notificationsProtocols[messageID] decoder := createDecoder(info, handshakeDecoder, messageDecoder) - handlerWithValidate := s.createNotificationsMessageHandler(info, messageHandler) + handlerWithValidate := s.createNotificationsMessageHandler(info, messageHandler, batchHandler) s.host.registerStreamHandler(protocolID, func(stream libp2pnetwork.Stream) { logger.Trace("received stream", "sub-protocol", protocolID) diff --git a/dot/network/transaction.go b/dot/network/transaction.go index 127aab2d14..90301e17db 100644 --- a/dot/network/transaction.go +++ b/dot/network/transaction.go @@ -119,6 +119,39 @@ func decodeTransactionHandshake(_ []byte) (Handshake, error) { return &transactionHandshake{}, nil } +func (s *Service) createBatchMessageHandler(txnBatch chan *batchMessage) NotificationsMessageBatchHandler { + return func(peer peer.ID, msg NotificationsMessage) (msgs []*batchMessage, err error) { + data := &batchMessage{ + msg: msg, + peer: peer, + } + txnBatch <- data + + if len(txnBatch) < s.batchSize { + return nil, nil + } + + var propagateMsgs []*batchMessage + for txnData := range txnBatch { + propagate, err := s.handleTransactionMessage(txnData.peer, txnData.msg) + if err != nil { + continue + } + if propagate { + propagateMsgs = append(propagateMsgs, &batchMessage{ + msg: txnData.msg, + peer: txnData.peer, + }) + } + if len(txnBatch) == 0 { + break + } + } + // May be use error to compute peer score. + return propagateMsgs, nil + } +} + func validateTransactionHandshake(_ peer.ID, _ Handshake) error { return nil } diff --git a/dot/sync/interface.go b/dot/sync/interface.go index f99fd4d86f..5136591afb 100644 --- a/dot/sync/interface.go +++ b/dot/sync/interface.go @@ -18,6 +18,7 @@ package sync import ( "math/big" + "sync" "github.com/ChainSafe/gossamer/dot/types" "github.com/ChainSafe/gossamer/lib/common" @@ -56,8 +57,7 @@ type StorageState interface { TrieState(root *common.Hash) (*rtstorage.TrieState, error) LoadCodeHash(*common.Hash) (common.Hash, error) SetSyncing(bool) - Lock() - Unlock() + sync.Locker } // CodeSubstitutedState interface to handle storage of code substitute state diff --git a/lib/babe/state.go b/lib/babe/state.go index 51722cf0c8..6008ff25bf 100644 --- a/lib/babe/state.go +++ b/lib/babe/state.go @@ -18,6 +18,7 @@ package babe import ( "math/big" + "sync" "time" "github.com/ChainSafe/gossamer/dot/types" @@ -52,8 +53,7 @@ type BlockState interface { // StorageState interface for storage state methods type StorageState interface { TrieState(hash *common.Hash) (*rtstorage.TrieState, error) - Lock() - Unlock() + sync.Locker } // TransactionState is the interface for transaction queue methods diff --git a/lib/grandpa/network.go b/lib/grandpa/network.go index fedd4597a7..a9174e0e05 100644 --- a/lib/grandpa/network.go +++ b/lib/grandpa/network.go @@ -102,6 +102,7 @@ func (s *Service) registerProtocol() error { s.validateHandshake, s.decodeMessage, s.handleNetworkMessage, + nil, ) } diff --git a/lib/grandpa/round_test.go b/lib/grandpa/round_test.go index cf0bc89b4a..d96410d28c 100644 --- a/lib/grandpa/round_test.go +++ b/lib/grandpa/round_test.go @@ -82,14 +82,15 @@ func (n *testNetwork) SendJustificationRequest(to peer.ID, num uint32) { } } -func (n *testNetwork) RegisterNotificationsProtocol( - pid protocol.ID, - messageID byte, - handshakeGetter network.HandshakeGetter, - handshakeDecoder network.HandshakeDecoder, - handshakeValidator network.HandshakeValidator, - messageDecoder network.MessageDecoder, - messageHandler network.NotificationsMessageHandler, +func (*testNetwork) RegisterNotificationsProtocol( + _ protocol.ID, + _ byte, + _ network.HandshakeGetter, + _ network.HandshakeDecoder, + _ network.HandshakeValidator, + _ network.MessageDecoder, + _ network.NotificationsMessageHandler, + _ network.NotificationsMessageBatchHandler, ) error { return nil } diff --git a/lib/grandpa/state.go b/lib/grandpa/state.go index d35c99021c..cd6a24a357 100644 --- a/lib/grandpa/state.go +++ b/lib/grandpa/state.go @@ -84,5 +84,6 @@ type Network interface { handshakeValidator network.HandshakeValidator, messageDecoder network.MessageDecoder, messageHandler network.NotificationsMessageHandler, + batchHandler network.NotificationsMessageBatchHandler, ) error }