Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Peers rating handler integration test #4781

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 5 additions & 0 deletions common/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package common
import (
"math"
"time"

"github.com/ElrondNetwork/elrond-go/p2p"
)

// PeerType represents the type of a peer
Expand Down Expand Up @@ -646,6 +648,9 @@ const MetricP2PUnknownPeers = "erd_p2p_unknown_shard_peers"
// MetricP2PNumConnectedPeersClassification is the metric for monitoring the number of connected peers split on the connection type
const MetricP2PNumConnectedPeersClassification = "erd_p2p_num_connected_peers_classification"

// MetricP2PPeersRating is the metric that outputs the peers ratings
const MetricP2PPeersRating = p2p.MetricP2PPeersRating

// MetricAreVMQueriesReady will hold the string representation of the boolean that indicated if the node is ready
// to process VM queries
const MetricAreVMQueriesReady = "erd_are_vm_queries_ready"
Expand Down
5 changes: 3 additions & 2 deletions factory/network/networkComponents.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,9 @@ func (ncf *networkComponentsFactory) Create() (*networkComponents, error) {
return nil, err
}
argsPeersRatingHandler := p2pFactory.ArgPeersRatingHandler{
TopRatedCache: topRatedCache,
BadRatedCache: badRatedCache,
TopRatedCache: topRatedCache,
BadRatedCache: badRatedCache,
AppStatusHandler: ncf.statusHandler,
}
peersRatingHandler, err := p2pFactory.NewPeersRatingHandler(argsPeersRatingHandler)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/ElrondNetwork/elrond-go-core v1.1.26
github.com/ElrondNetwork/elrond-go-crypto v1.2.2
github.com/ElrondNetwork/elrond-go-logger v1.0.10
github.com/ElrondNetwork/elrond-go-p2p v1.0.5
github.com/ElrondNetwork/elrond-go-p2p v1.0.6-0.20221208084139-c0d2bb66e5ed
github.com/ElrondNetwork/elrond-go-storage v1.0.4
github.com/ElrondNetwork/elrond-vm-common v1.3.28
github.com/ElrondNetwork/wasm-vm-v1_2 v1.2.48
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ github.com/ElrondNetwork/elrond-go-crypto v1.2.2 h1:Q59dZUeyibuskq5vjgk3ng/87ifO
github.com/ElrondNetwork/elrond-go-crypto v1.2.2/go.mod h1:MyQPKUKti7Axnx/eihhL0F2jLTalvSV/Ytv1mIxvYyM=
github.com/ElrondNetwork/elrond-go-logger v1.0.10 h1:2xQOWZErcHW5sl9qSRO+7mGNw+QhFhqiUlLLtOgvuuk=
github.com/ElrondNetwork/elrond-go-logger v1.0.10/go.mod h1:+rMODFw4yQptTi5WuLUBzvl/AE26V+2YJtc52wX30Eg=
github.com/ElrondNetwork/elrond-go-p2p v1.0.5 h1:XzuieXEKrVSQ9gKKO3sq60RRZC29IRYgmn7RsPmgUOA=
github.com/ElrondNetwork/elrond-go-p2p v1.0.5/go.mod h1:yCdKTQWNHNKrrQAK5xx1uQT02BEiumjz88I/fk5m1f8=
github.com/ElrondNetwork/elrond-go-p2p v1.0.6-0.20221208084139-c0d2bb66e5ed h1:eY8kelRwVHLBRyZadIs3hqGcznLXPqnhHIL0aH8hUbs=
github.com/ElrondNetwork/elrond-go-p2p v1.0.6-0.20221208084139-c0d2bb66e5ed/go.mod h1:yCdKTQWNHNKrrQAK5xx1uQT02BEiumjz88I/fk5m1f8=
github.com/ElrondNetwork/elrond-go-storage v1.0.2/go.mod h1:SRsv4hUtL1BCiQe0eADth3C0EZH9baijzIJDCUitR34=
github.com/ElrondNetwork/elrond-go-storage v1.0.4 h1:esyXbHQvlR6m4HGeC86Nq0xAJ74+QG9EnUgfG+wQDYQ=
github.com/ElrondNetwork/elrond-go-storage v1.0.4/go.mod h1:SRsv4hUtL1BCiQe0eADth3C0EZH9baijzIJDCUitR34=
Expand Down
187 changes: 187 additions & 0 deletions integrationTests/p2p/peersRating/peersRating_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
package peersRating

import (
"encoding/json"
"testing"
"time"

"github.com/ElrondNetwork/elrond-go-core/core"
"github.com/ElrondNetwork/elrond-go-core/data/block"
"github.com/ElrondNetwork/elrond-go/common"
"github.com/ElrondNetwork/elrond-go/integrationTests"
"github.com/ElrondNetwork/elrond-go/statusHandler"
statusHandlerMock "github.com/ElrondNetwork/elrond-go/testscommon/statusHandler"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

const (
increaseFactor = 2
decreaseFactor = -1
)

type ratingInfo struct {
Rating int32 `json:"rating"`
TimestampLastRequestToPid int64 `json:"timestampLastRequestToPid"`
TimestampLastResponseFromPid int64 `json:"timestampLastResponseFromPid"`
}

func TestPeersRatingAndResponsiveness(t *testing.T) {
if testing.Short() {
t.Skip("this is not a short test")
}

var numOfShards uint32 = 1
var shardID uint32 = 0
resolverNode := createNodeWithStatusHandler(shardID, numOfShards)
maliciousNode := createNodeWithStatusHandler(shardID, numOfShards)
requesterNode := createNodeWithStatusHandler(core.MetachainShardId, numOfShards)

defer func() {
_ = resolverNode.Messenger.Close()
_ = maliciousNode.Messenger.Close()
_ = requesterNode.Messenger.Close()
}()

time.Sleep(time.Second)
require.Nil(t, resolverNode.ConnectTo(maliciousNode))
require.Nil(t, resolverNode.ConnectTo(requesterNode))
require.Nil(t, maliciousNode.ConnectTo(requesterNode))
time.Sleep(time.Second)

hdr, hdrHash := getHeader()

numOfRequests := 10
// Add header to the resolver node's cache
resolverNode.DataPool.Headers().AddHeader(hdrHash, hdr)
requestHeader(requesterNode, numOfRequests, hdrHash)

peerRatingsMap := getRatingsMapFromMetric(t, requesterNode)
// resolver node should have received and responded to numOfRequests
initialResolverRating, exists := peerRatingsMap[resolverNode.Messenger.ID().Pretty()]
require.True(t, exists)
initialResolverExpectedRating := numOfRequests * (decreaseFactor + increaseFactor)
assert.Equal(t, int32(initialResolverExpectedRating), initialResolverRating.Rating)
testTimestampsForRespondingNode(t, initialResolverRating.TimestampLastResponseFromPid, initialResolverRating.TimestampLastRequestToPid)
// malicious node should have only received numOfRequests
initialMaliciousRating, exists := peerRatingsMap[maliciousNode.Messenger.ID().Pretty()]
require.True(t, exists)
initialMaliciousExpectedRating := numOfRequests * decreaseFactor
assert.Equal(t, int32(initialMaliciousExpectedRating), initialMaliciousRating.Rating)
testTimestampsForNotRespondingNode(t, initialMaliciousRating.TimestampLastResponseFromPid, initialMaliciousRating.TimestampLastRequestToPid, 0)

// Reach max limits
numOfRequests = 120
requestHeader(requesterNode, numOfRequests, hdrHash)

peerRatingsMap = getRatingsMapFromMetric(t, requesterNode)
// Resolver should have reached max limit and timestamps still update
initialResolverRating, exists = peerRatingsMap[resolverNode.Messenger.ID().Pretty()]
require.True(t, exists)
assert.Equal(t, int32(100), initialResolverRating.Rating)
testTimestampsForRespondingNode(t, initialResolverRating.TimestampLastResponseFromPid, initialResolverRating.TimestampLastRequestToPid)

// Malicious should have reached min limit and timestamps still update
initialMaliciousRating, exists = peerRatingsMap[maliciousNode.Messenger.ID().Pretty()]
require.True(t, exists)
assert.Equal(t, int32(-100), initialMaliciousRating.Rating)
testTimestampsForNotRespondingNode(t, initialMaliciousRating.TimestampLastResponseFromPid, initialMaliciousRating.TimestampLastRequestToPid, 0)

// Add header to the malicious node's cache and remove it from the resolver's cache
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

L98 - L116 also test that requests are still performed towards the low rating peer 👍

maliciousNode.DataPool.Headers().AddHeader(hdrHash, hdr)
resolverNode.DataPool.Headers().RemoveHeaderByHash(hdrHash)
numOfRequests = 10
requestHeader(requesterNode, numOfRequests, hdrHash)

peerRatingsMap = getRatingsMapFromMetric(t, requesterNode)
// resolver node should have the max rating + numOfRequests that didn't answer to
resolverRating, exists := peerRatingsMap[resolverNode.Messenger.ID().Pretty()]
require.True(t, exists)
finalResolverExpectedRating := 100 + decreaseFactor*numOfRequests
assert.Equal(t, int32(finalResolverExpectedRating), resolverRating.Rating)
testTimestampsForNotRespondingNode(t, resolverRating.TimestampLastResponseFromPid, resolverRating.TimestampLastRequestToPid, initialResolverRating.TimestampLastResponseFromPid)
// malicious node should have the min rating + numOfRequests that received and responded to
maliciousRating, exists := peerRatingsMap[maliciousNode.Messenger.ID().Pretty()]
require.True(t, exists)
finalMaliciousExpectedRating := -100 + numOfRequests*increaseFactor + (numOfRequests-1)*decreaseFactor
assert.Equal(t, int32(finalMaliciousExpectedRating), maliciousRating.Rating)
testTimestampsForRespondingNode(t, maliciousRating.TimestampLastResponseFromPid, maliciousRating.TimestampLastRequestToPid)
}

func createNodeWithStatusHandler(shardID uint32, numShards uint32) *integrationTests.TestProcessorNode {
statusMetrics := statusHandler.NewStatusMetrics()
appStatusHandler := &statusHandlerMock.AppStatusHandlerStub{
SetStringValueHandler: func(key string, value string) {
statusMetrics.SetStringValue(key, value)
},
}
return integrationTests.NewTestProcessorNode(integrationTests.ArgTestProcessorNode{
MaxShards: numShards,
NodeShardId: shardID,
AppStatusHandler: appStatusHandler,
StatusMetrics: statusMetrics,
})
}

func getHeader() (*block.Header, []byte) {
hdr := &block.Header{
Nonce: 0,
PubKeysBitmap: []byte{255, 0},
Signature: []byte("signature"),
PrevHash: []byte("prev hash"),
TimeStamp: uint64(time.Now().Unix()),
Round: 1,
Epoch: 2,
ShardID: 0,
BlockBodyType: block.TxBlock,
RootHash: []byte{255, 255},
PrevRandSeed: make([]byte, 1),
RandSeed: make([]byte, 1),
MiniBlockHeaders: nil,
ChainID: integrationTests.ChainID,
SoftwareVersion: []byte("version"),
}
hdrBuff, _ := integrationTests.TestMarshalizer.Marshal(hdr)
hdrHash := integrationTests.TestHasher.Compute(string(hdrBuff))
return hdr, hdrHash
}

func getRatingsMapFromMetric(t *testing.T, node *integrationTests.TestProcessorNode) map[string]*ratingInfo {
statusMetrics := node.Node.GetStatusCoreComponents().StatusMetrics()
p2pMetricsMap, err := statusMetrics.StatusP2pMetricsMap()
require.Nil(t, err)

metricPeersRating := p2pMetricsMap[common.MetricP2PPeersRating]
metricPeersRatingString, ok := metricPeersRating.(string)
require.True(t, ok)

peerRatingsMap := map[string]*ratingInfo{}
err = json.Unmarshal([]byte(metricPeersRatingString), &peerRatingsMap)
require.Nil(t, err)

return peerRatingsMap
}

func requestHeader(requesterNode *integrationTests.TestProcessorNode, numOfRequests int, hdrHash []byte) {
for i := 0; i < numOfRequests; i++ {
requesterNode.RequestHandler.RequestShardHeader(0, hdrHash)
time.Sleep(time.Second) // allow nodes to respond
}
}

func testTimestampsForRespondingNode(t *testing.T, timestampLastResponse int64, timestampLastRequest int64) {
expectedMaxTimestamp := time.Now().Unix()
expectedMinTimestamp := time.Now().Unix() - 1
assert.LessOrEqual(t, timestampLastRequest, expectedMaxTimestamp)
assert.GreaterOrEqual(t, timestampLastRequest, expectedMinTimestamp)
assert.LessOrEqual(t, timestampLastResponse, expectedMaxTimestamp)
assert.GreaterOrEqual(t, timestampLastResponse, expectedMinTimestamp)
}

func testTimestampsForNotRespondingNode(t *testing.T, timestampLastResponse int64, timestampLastRequest int64, expectedTimestampLastResponse int64) {
expectedMaxTimestamp := time.Now().Unix()
expectedMinTimestamp := time.Now().Unix() - 1
assert.LessOrEqual(t, timestampLastRequest, expectedMaxTimestamp)
assert.GreaterOrEqual(t, timestampLastRequest, expectedMinTimestamp)
assert.Equal(t, expectedTimestampLastResponse, timestampLastResponse)
}
35 changes: 31 additions & 4 deletions integrationTests/testProcessorNode.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,8 @@ type ArgTestProcessorNode struct {
HeaderIntegrityVerifier process.HeaderIntegrityVerifier
OwnAccount *TestWalletAccount
EpochStartSubscriber notifier.EpochStartNotifier
AppStatusHandler core.AppStatusHandler
StatusMetrics external.StatusMetricsHandler
}

// TestProcessorNode represents a container type of class used in integration tests
Expand Down Expand Up @@ -373,6 +375,8 @@ type TestProcessorNode struct {
TransactionLogProcessor process.TransactionLogProcessor
PeersRatingHandler p2p.PeersRatingHandler
HardforkTrigger node.HardforkTrigger
AppStatusHandler core.AppStatusHandler
StatusMetrics external.StatusMetricsHandler
}

// CreatePkBytes creates 'numShards' public key-like byte slices
Expand Down Expand Up @@ -409,10 +413,16 @@ func newBaseTestProcessorNode(args ArgTestProcessorNode) *TestProcessorNode {
nodesCoordinatorInstance = getDefaultNodesCoordinator(args.MaxShards, pksBytes)
}

appStatusHandler := args.AppStatusHandler
if check.IfNil(args.AppStatusHandler) {
appStatusHandler = TestAppStatusHandler
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this OK? Having the same pointer TestAppStatusHandler will make debugging incredibly hard because nodes will tend to alter the same instance. Maybe not for this PR I will consider removing this TestAppStatusHandler and create different instances always.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

indeed, shouldn't use the same pointer

}

peersRatingHandler, _ := p2pFactory.NewPeersRatingHandler(
p2pFactory.ArgPeersRatingHandler{
TopRatedCache: testscommon.NewCacherMock(),
BadRatedCache: testscommon.NewCacherMock(),
TopRatedCache: testscommon.NewCacherMock(),
BadRatedCache: testscommon.NewCacherMock(),
AppStatusHandler: appStatusHandler,
})

messenger := CreateMessengerWithNoDiscoveryAndPeersRatingHandler(peersRatingHandler)
Expand Down Expand Up @@ -631,6 +641,16 @@ func (tpn *TestProcessorNode) initGenesisBlocks(args ArgTestProcessorNode) {
}

func (tpn *TestProcessorNode) initTestNodeWithArgs(args ArgTestProcessorNode) {
tpn.AppStatusHandler = args.AppStatusHandler
if check.IfNil(args.AppStatusHandler) {
tpn.AppStatusHandler = TestAppStatusHandler
}

tpn.StatusMetrics = args.StatusMetrics
if check.IfNil(args.StatusMetrics) {
args.StatusMetrics = &testscommon.StatusMetricsStub{}
}

tpn.initChainHandler()
tpn.initHeaderValidator()
tpn.initRoundHandler()
Expand Down Expand Up @@ -1261,6 +1281,7 @@ func (tpn *TestProcessorNode) initResolvers() {

_ = tpn.Messenger.CreateTopic(common.ConsensusTopic+tpn.ShardCoordinator.CommunicationIdentifier(tpn.ShardCoordinator.SelfId()), true)
payloadValidator, _ := validator.NewPeerAuthenticationPayloadValidator(60)
preferredPeersHolder, _ := p2pFactory.NewPeersHolder([]string{})

resolverContainerFactory := resolverscontainer.FactoryArgs{
ShardCoordinator: tpn.ShardCoordinator,
Expand All @@ -1276,7 +1297,7 @@ func (tpn *TestProcessorNode) initResolvers() {
OutputAntifloodHandler: &mock.NilAntifloodHandler{},
NumConcurrentResolvingJobs: 10,
CurrentNetworkEpochProvider: &mock.CurrentNetworkEpochProviderStub{},
PreferredPeersHolder: &p2pmocks.PeersHolderStub{},
PreferredPeersHolder: preferredPeersHolder,
ResolverConfig: config.ResolverConfig{
NumCrossShardPeers: 2,
NumTotalPeers: 3,
Expand Down Expand Up @@ -2174,6 +2195,11 @@ func (tpn *TestProcessorNode) setGenesisBlock() {
func (tpn *TestProcessorNode) initNode() {
var err error

statusCoreComponents := &testFactory.StatusCoreComponentsStub{
StatusMetricsField: tpn.StatusMetrics,
AppStatusHandlerField: tpn.AppStatusHandler,
}

coreComponents := GetDefaultCoreComponents()
coreComponents.InternalMarshalizerField = TestMarshalizer
coreComponents.VmMarshalizerField = TestVmMarshalizer
Expand Down Expand Up @@ -2262,6 +2288,7 @@ func (tpn *TestProcessorNode) initNode() {
node.WithNetworkComponents(networkComponents),
node.WithStateComponents(stateComponents),
node.WithPeerDenialEvaluator(&mock.PeerDenialEvaluatorStub{}),
node.WithStatusCoreComponents(statusCoreComponents),
)
log.LogIfError(err)

Expand Down Expand Up @@ -2805,7 +2832,7 @@ func (tpn *TestProcessorNode) createHeartbeatWithHardforkTrigger() {
processComponents.HardforkTriggerField = tpn.HardforkTrigger

statusCoreComponents := &testFactory.StatusCoreComponentsStub{
AppStatusHandlerField: TestAppStatusHandler,
AppStatusHandlerField: tpn.AppStatusHandler,
}

err = tpn.Node.ApplyOptions(
Expand Down
1 change: 1 addition & 0 deletions node/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ func InitBaseMetrics(appStatusHandler core.AppStatusHandler) error {
appStatusHandler.SetStringValue(common.MetricP2PCrossShardObservers, initString)
appStatusHandler.SetStringValue(common.MetricP2PFullHistoryObservers, initString)
appStatusHandler.SetStringValue(common.MetricP2PUnknownPeers, initString)
appStatusHandler.SetStringValue(common.MetricP2PPeersRating, initString)
appStatusHandler.SetStringValue(common.MetricAccountsSnapshotInProgress, initString)
appStatusHandler.SetStringValue(common.MetricPeersSnapshotInProgress, initString)

Expand Down
1 change: 1 addition & 0 deletions node/metrics/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ func TestInitBaseMetrics(t *testing.T) {
common.MetricP2PCrossShardObservers,
common.MetricP2PFullHistoryObservers,
common.MetricP2PUnknownPeers,
common.MetricP2PPeersRating,
common.MetricInflation,
common.MetricDevRewardsInEpoch,
common.MetricTotalFees,
Expand Down
3 changes: 3 additions & 0 deletions p2p/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,6 @@ const ListenAddrWithIp4AndTcp = "/ip4/0.0.0.0/tcp/"

// ListenLocalhostAddrWithIp4AndTcp defines the local host listening ip v.4 address and TCP
const ListenLocalhostAddrWithIp4AndTcp = "/ip4/127.0.0.1/tcp/"

// MetricP2PPeersRating is the metric that outputs the peers ratings
const MetricP2PPeersRating = p2p.MetricP2PPeersRating