Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Source messenger on ProcessReceivedMessage #5365

Merged
merged 8 commits into from
Jun 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion consensus/mock/sposWorkerMock.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (sposWorkerMock *SposWorkerMock) RemoveAllReceivedMessagesCalls() {
}

// ProcessReceivedMessage delegates to the configured ProcessReceivedMessageCalled
// handler; the peer ID and message-handler arguments are intentionally ignored by
// the mock (blank identifiers in the signature).
func (sposWorkerMock *SposWorkerMock) ProcessReceivedMessage(message p2p.MessageP2P, _ core.PeerID) error {
func (sposWorkerMock *SposWorkerMock) ProcessReceivedMessage(message p2p.MessageP2P, _ core.PeerID, _ p2p.MessageHandler) error {
return sposWorkerMock.ProcessReceivedMessageCalled(message)
}

Expand Down
2 changes: 1 addition & 1 deletion consensus/spos/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ type WorkerHandler interface {
// RemoveAllReceivedMessagesCalls removes all the functions handlers
RemoveAllReceivedMessagesCalls()
// ProcessReceivedMessage method redirects the received message to the channel which should handle it
ProcessReceivedMessage(message p2p.MessageP2P, fromConnectedPeer core.PeerID) error
ProcessReceivedMessage(message p2p.MessageP2P, fromConnectedPeer core.PeerID, source p2p.MessageHandler) error
// Extend does an extension for the subround with subroundId
Extend(subroundId int)
// GetConsensusStateChangedChannel gets the channel for the consensusStateChanged
Expand Down
2 changes: 1 addition & 1 deletion consensus/spos/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ func (wrk *Worker) getCleanedList(cnsDataList []*consensus.Message) []*consensus
}

// ProcessReceivedMessage method redirects the received message to the channel which should handle it
func (wrk *Worker) ProcessReceivedMessage(message p2p.MessageP2P, fromConnectedPeer core.PeerID) error {
func (wrk *Worker) ProcessReceivedMessage(message p2p.MessageP2P, fromConnectedPeer core.PeerID, _ p2p.MessageHandler) error {
if check.IfNil(message) {
return ErrNilMessage
}
Expand Down
38 changes: 24 additions & 14 deletions consensus/spos/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ func TestWorker_ProcessReceivedMessageShouldErrIfFloodIsDetectedOnTopic(t *testi
TopicField: "topic1",
SignatureField: []byte("signature"),
}
err := wrk.ProcessReceivedMessage(msg, "peer")
err := wrk.ProcessReceivedMessage(msg, "peer", &p2pmocks.MessengerStub{})
assert.Equal(t, expectedErr, err)
}

Expand Down Expand Up @@ -515,15 +515,15 @@ func TestWorker_ProcessReceivedMessageTxBlockBodyShouldRetNil(t *testing.T) {
PeerField: currentPid,
SignatureField: []byte("signature"),
}
err := wrk.ProcessReceivedMessage(msg, fromConnectedPeerId)
err := wrk.ProcessReceivedMessage(msg, fromConnectedPeerId, &p2pmocks.MessengerStub{})

assert.Nil(t, err)
}

func TestWorker_ProcessReceivedMessageNilMessageShouldErr(t *testing.T) {
t.Parallel()
wrk := *initWorker(&statusHandlerMock.AppStatusHandlerStub{})
err := wrk.ProcessReceivedMessage(nil, fromConnectedPeerId)
err := wrk.ProcessReceivedMessage(nil, fromConnectedPeerId, &p2pmocks.MessengerStub{})
time.Sleep(time.Second)

assert.Equal(t, 0, len(wrk.ReceivedMessages()[bls.MtBlockBody]))
Expand All @@ -533,7 +533,7 @@ func TestWorker_ProcessReceivedMessageNilMessageShouldErr(t *testing.T) {
func TestWorker_ProcessReceivedMessageNilMessageDataFieldShouldErr(t *testing.T) {
t.Parallel()
wrk := *initWorker(&statusHandlerMock.AppStatusHandlerStub{})
err := wrk.ProcessReceivedMessage(&p2pmocks.P2PMessageMock{}, fromConnectedPeerId)
err := wrk.ProcessReceivedMessage(&p2pmocks.P2PMessageMock{}, fromConnectedPeerId, &p2pmocks.MessengerStub{})
time.Sleep(time.Second)

assert.Equal(t, 0, len(wrk.ReceivedMessages()[bls.MtBlockBody]))
Expand All @@ -548,6 +548,7 @@ func TestWorker_ProcessReceivedMessageEmptySignatureFieldShouldErr(t *testing.T)
DataField: []byte("data field"),
},
fromConnectedPeerId,
&p2pmocks.MessengerStub{},
)
time.Sleep(time.Second)

Expand Down Expand Up @@ -575,6 +576,7 @@ func TestWorker_ProcessReceivedMessageRedundancyNodeShouldResetInactivityIfNeede
SignatureField: []byte("signature"),
},
fromConnectedPeerId,
&p2pmocks.MessengerStub{},
)

assert.True(t, wasCalled)
Expand Down Expand Up @@ -608,6 +610,7 @@ func TestWorker_ProcessReceivedMessageNodeNotInEligibleListShouldErr(t *testing.
SignatureField: []byte("signature"),
},
fromConnectedPeerId,
&p2pmocks.MessengerStub{},
)
time.Sleep(time.Second)

Expand Down Expand Up @@ -720,7 +723,7 @@ func testWorkerProcessReceivedMessageComputeReceivedProposedBlockMetric(
PeerField: currentPid,
SignatureField: []byte("signature"),
}
_ = wrk.ProcessReceivedMessage(msg, "")
_ = wrk.ProcessReceivedMessage(msg, "", &p2pmocks.MessengerStub{})

return receivedValue
}
Expand Down Expand Up @@ -754,6 +757,7 @@ func TestWorker_ProcessReceivedMessageInconsistentChainIDInConsensusMessageShoul
SignatureField: []byte("signature"),
},
fromConnectedPeerId,
&p2pmocks.MessengerStub{},
)

assert.True(t, errors.Is(err, spos.ErrInvalidChainID))
Expand Down Expand Up @@ -787,6 +791,7 @@ func TestWorker_ProcessReceivedMessageTypeInvalidShouldErr(t *testing.T) {
SignatureField: []byte("signature"),
},
fromConnectedPeerId,
&p2pmocks.MessengerStub{},
)
time.Sleep(time.Second)

Expand Down Expand Up @@ -822,6 +827,7 @@ func TestWorker_ProcessReceivedHeaderHashSizeInvalidShouldErr(t *testing.T) {
SignatureField: []byte("signature"),
},
fromConnectedPeerId,
&p2pmocks.MessengerStub{},
)
time.Sleep(time.Second)

Expand Down Expand Up @@ -857,6 +863,7 @@ func TestWorker_ProcessReceivedMessageForFutureRoundShouldErr(t *testing.T) {
SignatureField: []byte("signature"),
},
fromConnectedPeerId,
&p2pmocks.MessengerStub{},
)
time.Sleep(time.Second)

Expand Down Expand Up @@ -892,6 +899,7 @@ func TestWorker_ProcessReceivedMessageForPastRoundShouldErr(t *testing.T) {
SignatureField: []byte("signature"),
},
fromConnectedPeerId,
&p2pmocks.MessengerStub{},
)
time.Sleep(time.Second)

Expand Down Expand Up @@ -927,17 +935,17 @@ func TestWorker_ProcessReceivedMessageTypeLimitReachedShouldErr(t *testing.T) {
SignatureField: []byte("signature"),
}

err := wrk.ProcessReceivedMessage(msg, fromConnectedPeerId)
err := wrk.ProcessReceivedMessage(msg, fromConnectedPeerId, &p2pmocks.MessengerStub{})
time.Sleep(time.Second)
assert.Equal(t, 1, len(wrk.ReceivedMessages()[bls.MtBlockBody]))
assert.Nil(t, err)

err = wrk.ProcessReceivedMessage(msg, fromConnectedPeerId)
err = wrk.ProcessReceivedMessage(msg, fromConnectedPeerId, &p2pmocks.MessengerStub{})
time.Sleep(time.Second)
assert.Equal(t, 1, len(wrk.ReceivedMessages()[bls.MtBlockBody]))
assert.True(t, errors.Is(err, spos.ErrMessageTypeLimitReached))

err = wrk.ProcessReceivedMessage(msg, fromConnectedPeerId)
err = wrk.ProcessReceivedMessage(msg, fromConnectedPeerId, &p2pmocks.MessengerStub{})
time.Sleep(time.Second)
assert.Equal(t, 1, len(wrk.ReceivedMessages()[bls.MtBlockBody]))
assert.True(t, errors.Is(err, spos.ErrMessageTypeLimitReached))
Expand Down Expand Up @@ -971,6 +979,7 @@ func TestWorker_ProcessReceivedMessageInvalidSignatureShouldErr(t *testing.T) {
SignatureField: []byte("signature"),
},
fromConnectedPeerId,
&p2pmocks.MessengerStub{},
)
time.Sleep(time.Second)

Expand Down Expand Up @@ -1005,7 +1014,7 @@ func TestWorker_ProcessReceivedMessageReceivedMessageIsFromSelfShouldRetNilAndNo
PeerField: currentPid,
SignatureField: []byte("signature"),
}
err := wrk.ProcessReceivedMessage(msg, fromConnectedPeerId)
err := wrk.ProcessReceivedMessage(msg, fromConnectedPeerId, &p2pmocks.MessengerStub{})
time.Sleep(time.Second)

assert.Equal(t, 0, len(wrk.ReceivedMessages()[bls.MtBlockBody]))
Expand Down Expand Up @@ -1040,7 +1049,7 @@ func TestWorker_ProcessReceivedMessageWhenRoundIsCanceledShouldRetNilAndNotProce
PeerField: currentPid,
SignatureField: []byte("signature"),
}
err := wrk.ProcessReceivedMessage(msg, fromConnectedPeerId)
err := wrk.ProcessReceivedMessage(msg, fromConnectedPeerId, &p2pmocks.MessengerStub{})
time.Sleep(time.Second)

assert.Equal(t, 0, len(wrk.ReceivedMessages()[bls.MtBlockBody]))
Expand Down Expand Up @@ -1092,6 +1101,7 @@ func TestWorker_ProcessReceivedMessageWrongChainIDInProposedBlockShouldError(t *
SignatureField: []byte("signature"),
},
fromConnectedPeerId,
&p2pmocks.MessengerStub{},
)
time.Sleep(time.Second)

Expand Down Expand Up @@ -1146,7 +1156,7 @@ func TestWorker_ProcessReceivedMessageWithABadOriginatorShouldErr(t *testing.T)
PeerField: "other originator",
SignatureField: []byte("signature"),
}
err := wrk.ProcessReceivedMessage(msg, fromConnectedPeerId)
err := wrk.ProcessReceivedMessage(msg, fromConnectedPeerId, &p2pmocks.MessengerStub{})
time.Sleep(time.Second)

assert.Equal(t, 0, len(wrk.ReceivedMessages()[bls.MtBlockHeader]))
Expand Down Expand Up @@ -1215,7 +1225,7 @@ func TestWorker_ProcessReceivedMessageOkValsShouldWork(t *testing.T) {
PeerField: currentPid,
SignatureField: []byte("signature"),
}
err := wrk.ProcessReceivedMessage(msg, fromConnectedPeerId)
err := wrk.ProcessReceivedMessage(msg, fromConnectedPeerId, &p2pmocks.MessengerStub{})
time.Sleep(time.Second)

assert.Equal(t, 1, len(wrk.ReceivedMessages()[bls.MtBlockHeader]))
Expand Down Expand Up @@ -1741,7 +1751,7 @@ func TestWorker_ProcessReceivedMessageWrongHeaderShouldErr(t *testing.T) {
PeerField: currentPid,
SignatureField: []byte("signature"),
}
err := wrk.ProcessReceivedMessage(msg, "")
err := wrk.ProcessReceivedMessage(msg, "", &p2pmocks.MessengerStub{})
assert.True(t, errors.Is(err, spos.ErrInvalidHeader))
}

Expand Down Expand Up @@ -1786,7 +1796,7 @@ func TestWorker_ProcessReceivedMessageWithSignature(t *testing.T) {
PeerField: currentPid,
SignatureField: []byte("signature"),
}
err = wrk.ProcessReceivedMessage(msg, "")
err = wrk.ProcessReceivedMessage(msg, "", &p2pmocks.MessengerStub{})
assert.Nil(t, err)

p2pMsgWithSignature, ok := wrk.ConsensusState().GetMessageWithSignature(string(pubKey))
Expand Down
4 changes: 2 additions & 2 deletions dataRetriever/factory/requestersContainer/args.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ import (
type FactoryArgs struct {
RequesterConfig config.RequesterConfig
ShardCoordinator sharding.Coordinator
MainMessenger dataRetriever.TopicMessageHandler
FullArchiveMessenger dataRetriever.TopicMessageHandler
MainMessenger p2p.Messenger
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should evaluate whether we can get rid of providing the messenger on the resolvers, as the messenger is now provided on each data request. (next PR)

FullArchiveMessenger p2p.Messenger
Marshaller marshal.Marshalizer
Uint64ByteSliceConverter typeConverters.Uint64ByteSliceConverter
OutputAntifloodHandler dataRetriever.P2PAntifloodHandler
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/multiversx/mx-chain-go/dataRetriever"
"github.com/multiversx/mx-chain-go/dataRetriever/requestHandlers/requesters"
topicsender "github.com/multiversx/mx-chain-go/dataRetriever/topicSender"
"github.com/multiversx/mx-chain-go/p2p"
"github.com/multiversx/mx-chain-go/process/factory"
"github.com/multiversx/mx-chain-go/sharding"
logger "github.com/multiversx/mx-chain-logger-go"
Expand All @@ -24,8 +25,8 @@ var log = logger.GetOrCreate("dataRetriever/factory/requesterscontainer")
type baseRequestersContainerFactory struct {
container dataRetriever.RequestersContainer
shardCoordinator sharding.Coordinator
mainMessenger dataRetriever.TopicMessageHandler
fullArchiveMessenger dataRetriever.TopicMessageHandler
mainMessenger p2p.Messenger
fullArchiveMessenger p2p.Messenger
marshaller marshal.Marshalizer
uint64ByteSliceConverter typeConverters.Uint64ByteSliceConverter
intRandomizer dataRetriever.IntRandomizer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,18 @@ import (
"github.com/multiversx/mx-chain-go/dataRetriever"
"github.com/multiversx/mx-chain-go/dataRetriever/factory/requestersContainer"
"github.com/multiversx/mx-chain-go/dataRetriever/mock"
"github.com/multiversx/mx-chain-go/p2p"
"github.com/multiversx/mx-chain-go/testscommon/p2pmocks"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

var errExpected = errors.New("expected error")

func createStubTopicMessageHandler(matchStrToErrOnCreate string) dataRetriever.TopicMessageHandler {
tmhs := mock.NewTopicMessageHandlerStub()
func createMessengerStub(matchStrToErrOnCreate string) p2p.Messenger {
stub := &p2pmocks.MessengerStub{}

tmhs.CreateTopicCalled = func(name string, createChannelForTopic bool) error {
stub.CreateTopicCalled = func(name string, createChannelForTopic bool) error {
if matchStrToErrOnCreate == "" {
return nil
}
Expand All @@ -30,7 +31,7 @@ func createStubTopicMessageHandler(matchStrToErrOnCreate string) dataRetriever.T
return nil
}

return tmhs
return stub
}

func TestNewShardRequestersContainerFactory_NilShardCoordinatorShouldErr(t *testing.T) {
Expand Down Expand Up @@ -277,8 +278,8 @@ func getArguments() requesterscontainer.FactoryArgs {
NumFullHistoryPeers: 3,
},
ShardCoordinator: mock.NewOneShardCoordinatorMock(),
MainMessenger: createStubTopicMessageHandler(""),
FullArchiveMessenger: createStubTopicMessageHandler(""),
MainMessenger: createMessengerStub(""),
FullArchiveMessenger: createMessengerStub(""),
Marshaller: &mock.MarshalizerMock{},
Uint64ByteSliceConverter: &mock.Uint64ByteSliceConverterMock{},
OutputAntifloodHandler: &mock.P2PAntifloodHandlerStub{},
Expand Down
4 changes: 2 additions & 2 deletions dataRetriever/factory/resolverscontainer/args.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ import (
type FactoryArgs struct {
NumConcurrentResolvingJobs int32
ShardCoordinator sharding.Coordinator
MainMessenger dataRetriever.TopicMessageHandler
FullArchiveMessenger dataRetriever.TopicMessageHandler
MainMessenger p2p.Messenger
FullArchiveMessenger p2p.Messenger
Store dataRetriever.StorageService
Marshalizer marshal.Marshalizer
DataPools dataRetriever.PoolsHolder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/multiversx/mx-chain-go/dataRetriever"
"github.com/multiversx/mx-chain-go/dataRetriever/resolvers"
"github.com/multiversx/mx-chain-go/dataRetriever/topicSender"
"github.com/multiversx/mx-chain-go/p2p"
"github.com/multiversx/mx-chain-go/process/factory"
"github.com/multiversx/mx-chain-go/sharding"
logger "github.com/multiversx/mx-chain-logger-go"
Expand All @@ -24,8 +25,8 @@ var log = logger.GetOrCreate("dataRetriever/factory/resolverscontainer")
type baseResolversContainerFactory struct {
container dataRetriever.ResolversContainer
shardCoordinator sharding.Coordinator
mainMessenger dataRetriever.TopicMessageHandler
fullArchiveMessenger dataRetriever.TopicMessageHandler
mainMessenger p2p.Messenger
fullArchiveMessenger p2p.Messenger
store dataRetriever.StorageService
marshalizer marshal.Marshalizer
dataPools dataRetriever.PoolsHolder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ import (
"github.com/stretchr/testify/assert"
)

func createStubTopicMessageHandlerForMeta(matchStrToErrOnCreate string, matchStrToErrOnRegister string) dataRetriever.TopicMessageHandler {
tmhs := mock.NewTopicMessageHandlerStub()
func createStubMessengerForMeta(matchStrToErrOnCreate string, matchStrToErrOnRegister string) p2p.Messenger {
stub := &p2pmocks.MessengerStub{}

tmhs.CreateTopicCalled = func(name string, createChannelForTopic bool) error {
stub.CreateTopicCalled = func(name string, createChannelForTopic bool) error {
if matchStrToErrOnCreate == "" {
return nil
}
Expand All @@ -36,7 +36,7 @@ func createStubTopicMessageHandlerForMeta(matchStrToErrOnCreate string, matchStr
return nil
}

tmhs.RegisterMessageProcessorCalled = func(topic string, identifier string, handler p2p.MessageProcessor) error {
stub.RegisterMessageProcessorCalled = func(topic string, identifier string, handler p2p.MessageProcessor) error {
if matchStrToErrOnRegister == "" {
return nil
}
Expand All @@ -47,7 +47,7 @@ func createStubTopicMessageHandlerForMeta(matchStrToErrOnCreate string, matchStr
return nil
}

return tmhs
return stub
}

func createDataPoolsForMeta() dataRetriever.PoolsHolder {
Expand Down Expand Up @@ -261,7 +261,7 @@ func TestMetaResolversContainerFactory_CreateRegisterShardHeadersForMetachainOnM
t.Parallel()

args := getArgumentsMeta()
args.MainMessenger = createStubTopicMessageHandlerForMeta("", factory.ShardBlocksTopic)
args.MainMessenger = createStubMessengerForMeta("", factory.ShardBlocksTopic)
rcf, _ := resolverscontainer.NewMetaResolversContainerFactory(args)

container, err := rcf.Create()
Expand All @@ -274,7 +274,7 @@ func TestMetaResolversContainerFactory_CreateRegisterShardHeadersForMetachainOnF
t.Parallel()

args := getArgumentsMeta()
args.FullArchiveMessenger = createStubTopicMessageHandlerForMeta("", factory.ShardBlocksTopic)
args.FullArchiveMessenger = createStubMessengerForMeta("", factory.ShardBlocksTopic)
rcf, _ := resolverscontainer.NewMetaResolversContainerFactory(args)

container, err := rcf.Create()
Expand Down Expand Up @@ -358,8 +358,8 @@ func TestMetaResolversContainerFactory_IsInterfaceNil(t *testing.T) {
func getArgumentsMeta() resolverscontainer.FactoryArgs {
return resolverscontainer.FactoryArgs{
ShardCoordinator: mock.NewOneShardCoordinatorMock(),
MainMessenger: createStubTopicMessageHandlerForMeta("", ""),
FullArchiveMessenger: createStubTopicMessageHandlerForMeta("", ""),
MainMessenger: createStubMessengerForMeta("", ""),
FullArchiveMessenger: createStubMessengerForMeta("", ""),
Store: createStoreForMeta(),
Marshalizer: &mock.MarshalizerMock{},
DataPools: createDataPoolsForMeta(),
Expand Down