Skip to content

Commit

Permalink
added managed peers monitor
Browse files Browse the repository at this point in the history
  • Loading branch information
sstanculeanu committed Jun 16, 2023
1 parent 03438af commit f8a1f1b
Show file tree
Hide file tree
Showing 5 changed files with 379 additions and 0 deletions.
8 changes: 8 additions & 0 deletions common/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,3 +426,11 @@ type StateSyncNotifierSubscriber interface {
MissingDataTrieNodeFound(hash []byte)
IsInterfaceNil() bool
}

// ManagedPeersMonitor defines the operations of an entity that monitors the managed peers holder
type ManagedPeersMonitor interface {
	// GetManagedKeysCount returns the number of keys managed by the current node
	GetManagedKeysCount() int
	// GetEligibleManagedKeys returns the managed keys found among the eligible validators of the given epoch
	GetEligibleManagedKeys(epoch uint32) ([][]byte, error)
	// GetWaitingManagedKeys returns the managed keys found among the waiting validators of the given epoch
	GetWaitingManagedKeys(epoch uint32) ([][]byte, error)
	// IsInterfaceNil returns true if there is no value under the interface
	IsInterfaceNil() bool
}
6 changes: 6 additions & 0 deletions keysManagement/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,9 @@ var ErrEmptyPeerID = errors.New("empty peer ID")

// ErrNilP2PKeyConverter signals that a nil p2p key converter has been provided
var ErrNilP2PKeyConverter = errors.New("nil p2p key converter")

// ErrNilNodesCoordinator signals that a nil nodes coordinator has been provided
var ErrNilNodesCoordinator = errors.New("nil nodes coordinator")

// ErrNilShardProvider signals that a nil shard provider has been provided
var ErrNilShardProvider = errors.New("nil shard provider")
14 changes: 14 additions & 0 deletions keysManagement/interface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package keysManagement

// NodesCoordinator provides Validator methods needed for the peer processing
type NodesCoordinator interface {
	// GetAllEligibleValidatorsPublicKeys returns the eligible validators public keys of the given epoch, grouped by shard ID
	GetAllEligibleValidatorsPublicKeys(epoch uint32) (map[uint32][][]byte, error)
	// GetAllWaitingValidatorsPublicKeys returns the waiting validators public keys of the given epoch, grouped by shard ID
	GetAllWaitingValidatorsPublicKeys(epoch uint32) (map[uint32][][]byte, error)
	// IsInterfaceNil returns true if there is no value under the interface
	IsInterfaceNil() bool
}

// ShardProvider defines a component able to provide self shard
type ShardProvider interface {
	// SelfId returns the shard ID of the current node
	SelfId() uint32
	// IsInterfaceNil returns true if there is no value under the interface
	IsInterfaceNil() bool
}
101 changes: 101 additions & 0 deletions keysManagement/managedPeersMonitor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package keysManagement

import (
"fmt"
"sort"

"github.com/multiversx/mx-chain-core-go/core/check"
"github.com/multiversx/mx-chain-go/common"
)

// ArgManagedPeersMonitor is the DTO used to create a new instance of managedPeersMonitor
type ArgManagedPeersMonitor struct {
	ManagedPeersHolder common.ManagedPeersHolder // holds the keys managed by the current node
	NodesCoordinator   NodesCoordinator          // provides eligible/waiting validators public keys per epoch
	ShardProvider      ShardProvider             // provides the shard ID of the current node
}

// managedPeersMonitor monitors the managed peers holder, reporting only the
// managed keys that belong to the current node's shard
type managedPeersMonitor struct {
	managedPeersHolder common.ManagedPeersHolder
	nodesCoordinator   NodesCoordinator
	shardProvider      ShardProvider
}

// NewManagedPeersMonitor returns a new instance of managedPeersMonitor,
// validating the provided arguments first
func NewManagedPeersMonitor(args ArgManagedPeersMonitor) (*managedPeersMonitor, error) {
	if err := checkArgs(args); err != nil {
		return nil, err
	}

	monitor := &managedPeersMonitor{
		managedPeersHolder: args.ManagedPeersHolder,
		nodesCoordinator:   args.NodesCoordinator,
		shardProvider:      args.ShardProvider,
	}

	return monitor, nil
}

// checkArgs verifies that no component of the arguments DTO is nil
func checkArgs(args ArgManagedPeersMonitor) error {
	switch {
	case check.IfNil(args.ManagedPeersHolder):
		return ErrNilManagedPeersHolder
	case check.IfNil(args.NodesCoordinator):
		return ErrNilNodesCoordinator
	case check.IfNil(args.ShardProvider):
		return ErrNilShardProvider
	default:
		return nil
	}
}

// GetManagedKeysCount returns the number of keys managed by the current node
func (monitor *managedPeersMonitor) GetManagedKeysCount() int {
	managedKeys := monitor.managedPeersHolder.GetManagedKeysByCurrentNode()
	return len(managedKeys)
}

// GetEligibleManagedKeys returns eligible keys that are managed by the current node
func (monitor *managedPeersMonitor) GetEligibleManagedKeys(epoch uint32) ([][]byte, error) {
	validatorsPerShard, err := monitor.nodesCoordinator.GetAllEligibleValidatorsPublicKeys(epoch)
	if err != nil {
		return nil, err
	}

	return monitor.extractManagedIntraShardKeys(validatorsPerShard)
}

// GetWaitingManagedKeys returns waiting keys that are managed by the current node
func (monitor *managedPeersMonitor) GetWaitingManagedKeys(epoch uint32) ([][]byte, error) {
	validatorsPerShard, err := monitor.nodesCoordinator.GetAllWaitingValidatorsPublicKeys(epoch)
	if err != nil {
		return nil, err
	}

	return monitor.extractManagedIntraShardKeys(validatorsPerShard)
}

// extractManagedIntraShardKeys keeps, from the keys of the current node's shard,
// only those managed by the current node and returns them in ascending order.
// Returns ErrInvalidValue when the map holds no entry for the self shard.
func (monitor *managedPeersMonitor) extractManagedIntraShardKeys(keysMap map[uint32][][]byte) ([][]byte, error) {
	selfShardID := monitor.shardProvider.SelfId()
	intraShardKeys, ok := keysMap[selfShardID]
	if !ok {
		return nil, fmt.Errorf("%w for shard %d, no validators found", ErrInvalidValue, selfShardID)
	}

	// pre-size with the upper bound to avoid repeated slice growth while filtering
	managedKeys := make([][]byte, 0, len(intraShardKeys))
	for _, key := range intraShardKeys {
		if monitor.managedPeersHolder.IsKeyManagedByCurrentNode(key) {
			managedKeys = append(managedKeys, key)
		}
	}

	// sort for a deterministic result regardless of the order received from the nodes coordinator
	sort.Slice(managedKeys, func(i, j int) bool {
		return string(managedKeys[i]) < string(managedKeys[j])
	})

	return managedKeys, nil
}

// IsInterfaceNil returns true if there is no value under the interface
func (monitor *managedPeersMonitor) IsInterfaceNil() bool {
	// guards callers holding the instance behind an interface against the typed-nil trap
	return monitor == nil
}
250 changes: 250 additions & 0 deletions keysManagement/managedPeersMonitor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,250 @@
package keysManagement

import (
"errors"
"testing"

crypto "github.com/multiversx/mx-chain-crypto-go"
"github.com/multiversx/mx-chain-go/testscommon"
"github.com/multiversx/mx-chain-go/testscommon/cryptoMocks"
"github.com/multiversx/mx-chain-go/testscommon/shardingMocks"
"github.com/stretchr/testify/require"
)

// expectedErr is a sentinel error returned by stubs to verify error propagation
var expectedErr = errors.New("expected error")

// createMockArgManagedPeersMonitor returns a fully populated, valid arguments DTO for tests
func createMockArgManagedPeersMonitor() ArgManagedPeersMonitor {
	return ArgManagedPeersMonitor{
		ManagedPeersHolder: &testscommon.ManagedPeersHolderStub{},
		NodesCoordinator:   &shardingMocks.NodesCoordinatorStub{},
		ShardProvider:      &testscommon.ShardsCoordinatorMock{},
	}
}

// TestNewManagedPeersMonitor checks constructor argument validation and the happy path
func TestNewManagedPeersMonitor(t *testing.T) {
	t.Parallel()

	t.Run("nil ManagedPeersHolder should error", func(t *testing.T) {
		t.Parallel()

		args := createMockArgManagedPeersMonitor()
		args.ManagedPeersHolder = nil
		monitor, err := NewManagedPeersMonitor(args)
		require.Equal(t, ErrNilManagedPeersHolder, err)
		require.Nil(t, monitor)
	})
	t.Run("nil NodesCoordinator should error", func(t *testing.T) {
		t.Parallel()

		args := createMockArgManagedPeersMonitor()
		args.NodesCoordinator = nil
		monitor, err := NewManagedPeersMonitor(args)
		require.Equal(t, ErrNilNodesCoordinator, err)
		require.Nil(t, monitor)
	})
	t.Run("nil ShardProvider should error", func(t *testing.T) {
		t.Parallel()

		args := createMockArgManagedPeersMonitor()
		args.ShardProvider = nil
		monitor, err := NewManagedPeersMonitor(args)
		require.Equal(t, ErrNilShardProvider, err)
		require.Nil(t, monitor)
	})
	t.Run("should work", func(t *testing.T) {
		t.Parallel()

		monitor, err := NewManagedPeersMonitor(createMockArgManagedPeersMonitor())
		require.NoError(t, err)
		require.NotNil(t, monitor)
	})
}

// TestManagedPeersMonitor_IsInterfaceNil checks both the typed-nil and the constructed instance cases
func TestManagedPeersMonitor_IsInterfaceNil(t *testing.T) {
	t.Parallel()

	// a nil typed pointer must report true to avoid the typed-nil interface trap
	var monitor *managedPeersMonitor
	require.True(t, monitor.IsInterfaceNil())

	monitor, _ = NewManagedPeersMonitor(createMockArgManagedPeersMonitor())
	require.False(t, monitor.IsInterfaceNil())
}

// TestManagedPeersMonitor_GetManagedKeysCount checks the count mirrors the holder's managed keys map size
func TestManagedPeersMonitor_GetManagedKeysCount(t *testing.T) {
	t.Parallel()

	managedKeys := map[string]crypto.PrivateKey{
		"pk1": &cryptoMocks.PrivateKeyStub{},
		"pk2": &cryptoMocks.PrivateKeyStub{},
		"pk3": &cryptoMocks.PrivateKeyStub{},
		"pk4": &cryptoMocks.PrivateKeyStub{},
	}
	args := createMockArgManagedPeersMonitor()
	args.ManagedPeersHolder = &testscommon.ManagedPeersHolderStub{
		GetManagedKeysByCurrentNodeCalled: func() map[string]crypto.PrivateKey {
			return managedKeys
		},
	}
	monitor, err := NewManagedPeersMonitor(args)
	require.NoError(t, err)

	require.Equal(t, len(managedKeys), monitor.GetManagedKeysCount())
}

// TestManagedPeersMonitor_GetEligibleManagedKeys covers error propagation from the nodes
// coordinator, the missing self-shard entry case and the happy path (filter + sort)
func TestManagedPeersMonitor_GetEligibleManagedKeys(t *testing.T) {
	t.Parallel()

	t.Run("nodes coordinator returns error should error", func(t *testing.T) {
		t.Parallel()

		args := createMockArgManagedPeersMonitor()
		args.NodesCoordinator = &shardingMocks.NodesCoordinatorStub{
			GetAllEligibleValidatorsPublicKeysCalled: func(epoch uint32) (map[uint32][][]byte, error) {
				return nil, expectedErr
			},
		}
		monitor, err := NewManagedPeersMonitor(args)
		require.NoError(t, err)

		keys, err := monitor.GetEligibleManagedKeys(0)
		require.Equal(t, expectedErr, err)
		require.Nil(t, keys)
	})
	t.Run("shard not found should error", func(t *testing.T) {
		t.Parallel()

		args := createMockArgManagedPeersMonitor()
		args.NodesCoordinator = &shardingMocks.NodesCoordinatorStub{
			GetAllEligibleValidatorsPublicKeysCalled: func(epoch uint32) (map[uint32][][]byte, error) {
				// empty map: no entry for the self shard
				return map[uint32][][]byte{}, nil
			},
		}
		monitor, err := NewManagedPeersMonitor(args)
		require.NoError(t, err)

		keys, err := monitor.GetEligibleManagedKeys(0)
		require.True(t, errors.Is(err, ErrInvalidValue))
		require.Nil(t, keys)
	})
	t.Run("should work", func(t *testing.T) {
		t.Parallel()

		selfShard := uint32(1)
		eligibleValidators := map[uint32][][]byte{
			selfShard: {
				[]byte("managed 1"),
				[]byte("managed 2"),
				[]byte("not managed 1"),
				[]byte("not managed 2"),
			},
			// keys of a different shard must not be returned, even if managed
			100: {
				[]byte("managed 3"),
				[]byte("not managed 3"),
			},
		}
		args := createMockArgManagedPeersMonitor()
		args.NodesCoordinator = &shardingMocks.NodesCoordinatorStub{
			GetAllEligibleValidatorsPublicKeysCalled: func(epoch uint32) (map[uint32][][]byte, error) {
				return eligibleValidators, nil
			},
		}
		args.ShardProvider = &testscommon.ShardsCoordinatorMock{
			SelfIDCalled: func() uint32 {
				return selfShard
			},
		}
		args.ManagedPeersHolder = &testscommon.ManagedPeersHolderStub{
			IsKeyManagedByCurrentNodeCalled: func(pkBytes []byte) bool {
				return string(pkBytes) == "managed 1" ||
					string(pkBytes) == "managed 2" ||
					string(pkBytes) == "managed 3"
			},
		}
		monitor, err := NewManagedPeersMonitor(args)
		require.NoError(t, err)

		keys, err := monitor.GetEligibleManagedKeys(0)
		require.NoError(t, err)

		// only intra-shard managed keys, in ascending order
		require.Equal(t, [][]byte{[]byte("managed 1"), []byte("managed 2")}, keys)
	})
}

// TestManagedPeersMonitor_GetWaitingManagedKeys covers error propagation from the nodes
// coordinator, the missing self-shard entry case and the happy path (filter + sort)
// NOTE: the function name had a doubled "GetGet" typo, fixed here; the fixture map was
// also renamed from the misleading eligibleValidators to waitingValidators
func TestManagedPeersMonitor_GetWaitingManagedKeys(t *testing.T) {
	t.Parallel()

	t.Run("nodes coordinator returns error should error", func(t *testing.T) {
		t.Parallel()

		args := createMockArgManagedPeersMonitor()
		args.NodesCoordinator = &shardingMocks.NodesCoordinatorStub{
			GetAllWaitingValidatorsPublicKeysCalled: func(epoch uint32) (map[uint32][][]byte, error) {
				return nil, expectedErr
			},
		}
		monitor, err := NewManagedPeersMonitor(args)
		require.NoError(t, err)

		keys, err := monitor.GetWaitingManagedKeys(0)
		require.Equal(t, expectedErr, err)
		require.Nil(t, keys)
	})
	t.Run("shard not found should error", func(t *testing.T) {
		t.Parallel()

		args := createMockArgManagedPeersMonitor()
		args.NodesCoordinator = &shardingMocks.NodesCoordinatorStub{
			GetAllWaitingValidatorsPublicKeysCalled: func(epoch uint32) (map[uint32][][]byte, error) {
				// empty map: no entry for the self shard
				return map[uint32][][]byte{}, nil
			},
		}
		monitor, err := NewManagedPeersMonitor(args)
		require.NoError(t, err)

		keys, err := monitor.GetWaitingManagedKeys(0)
		require.True(t, errors.Is(err, ErrInvalidValue))
		require.Nil(t, keys)
	})
	t.Run("should work", func(t *testing.T) {
		t.Parallel()

		selfShard := uint32(1)
		waitingValidators := map[uint32][][]byte{
			selfShard: {
				[]byte("managed 1"),
				[]byte("managed 2"),
				[]byte("not managed 1"),
				[]byte("not managed 2"),
			},
			// keys of a different shard must not be returned, even if managed
			100: {
				[]byte("managed 3"),
				[]byte("not managed 3"),
			},
		}
		args := createMockArgManagedPeersMonitor()
		args.NodesCoordinator = &shardingMocks.NodesCoordinatorStub{
			GetAllWaitingValidatorsPublicKeysCalled: func(epoch uint32) (map[uint32][][]byte, error) {
				return waitingValidators, nil
			},
		}
		args.ShardProvider = &testscommon.ShardsCoordinatorMock{
			SelfIDCalled: func() uint32 {
				return selfShard
			},
		}
		args.ManagedPeersHolder = &testscommon.ManagedPeersHolderStub{
			IsKeyManagedByCurrentNodeCalled: func(pkBytes []byte) bool {
				return string(pkBytes) == "managed 1" ||
					string(pkBytes) == "managed 2" ||
					string(pkBytes) == "managed 3"
			},
		}
		monitor, err := NewManagedPeersMonitor(args)
		require.NoError(t, err)

		keys, err := monitor.GetWaitingManagedKeys(0)
		require.NoError(t, err)

		// only intra-shard managed keys, in ascending order
		require.Equal(t, [][]byte{[]byte("managed 1"), []byte("managed 2")}, keys)
	})
}

0 comments on commit f8a1f1b

Please sign in to comment.