Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Peer authentication sender factory #4414

Merged
merged 8 commits into from
Sep 1, 2022
3 changes: 3 additions & 0 deletions heartbeat/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,3 +140,6 @@ var ErrShouldSkipValidator = errors.New("validator should be skipped")

// ErrNilKeysHolder signals that a nil keys holder has been provided
var ErrNilKeysHolder = errors.New("nil keys holder")

// ErrInvalidConfiguration signals that an invalid configuration has been provided
var ErrInvalidConfiguration = errors.New("invalid configuration")
5 changes: 5 additions & 0 deletions heartbeat/sender/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ type hardforkHandler interface {
Close()
}

type peerAuthenticationSenderHandler interface {
senderHandler
hardforkHandler
}

type timerHandler interface {
CreateNewTimer(duration time.Duration)
ExecutionReadyChannel() <-chan time.Time
Expand Down
74 changes: 74 additions & 0 deletions heartbeat/sender/peerAuthenticationSenderFactory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package sender

import (
"time"

"github.com/ElrondNetwork/covalent-indexer-go/process"
crypto "github.com/ElrondNetwork/elrond-go-crypto"
"github.com/ElrondNetwork/elrond-go/heartbeat"
)

type argPeerAuthenticationSenderFactory struct {
argBaseSender
nodesCoordinator heartbeat.NodesCoordinator
peerSignatureHandler crypto.PeerSignatureHandler
hardforkTrigger heartbeat.HardforkTrigger
hardforkTimeBetweenSends time.Duration
hardforkTriggerPubKey []byte
keysHolder heartbeat.KeysHolder
timeBetweenChecks time.Duration
shardCoordinator process.ShardCoordinator
privKey crypto.PrivateKey
redundancyHandler heartbeat.NodeRedundancyHandler
}

func createPeerAuthenticationSender(args argPeerAuthenticationSenderFactory) (peerAuthenticationSenderHandler, error) {
pk := args.privKey.GeneratePublic()
pkBytes, err := pk.ToByteArray()
if err != nil {
return nil, err
}

_, _, err = args.nodesCoordinator.GetValidatorWithPublicKey(pkBytes)
if err == nil {
return createRegularSender(args)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please add a check here for len(keysMap) and allow to create the regular peer only if that len is 0. Otherwise we should output error

}

keysMap := args.keysHolder.GetManagedKeysByCurrentNode()
if len(keysMap) == 0 {
return createRegularSender(args)
}

return createMultikeySender(args)
}

func createRegularSender(args argPeerAuthenticationSenderFactory) (*peerAuthenticationSender, error) {
argsSender := argPeerAuthenticationSender{
argBaseSender: args.argBaseSender,
nodesCoordinator: args.nodesCoordinator,
peerSignatureHandler: args.peerSignatureHandler,
privKey: args.privKey,
redundancyHandler: args.redundancyHandler,
hardforkTrigger: args.hardforkTrigger,
hardforkTimeBetweenSends: args.hardforkTimeBetweenSends,
hardforkTriggerPubKey: args.hardforkTriggerPubKey,
}

return newPeerAuthenticationSender(argsSender)
}

func createMultikeySender(args argPeerAuthenticationSenderFactory) (*multikeyPeerAuthenticationSender, error) {
argsSender := argMultikeyPeerAuthenticationSender{
argBaseSender: args.argBaseSender,
nodesCoordinator: args.nodesCoordinator,
peerSignatureHandler: args.peerSignatureHandler,
hardforkTrigger: args.hardforkTrigger,
hardforkTimeBetweenSends: args.hardforkTimeBetweenSends,
hardforkTriggerPubKey: args.hardforkTriggerPubKey,
keysHolder: args.keysHolder,
timeBetweenChecks: args.timeBetweenChecks,
shardCoordinator: args.shardCoordinator,
}

return newMultikeyPeerAuthenticationSender(argsSender)
}
120 changes: 120 additions & 0 deletions heartbeat/sender/peerAuthenticationSenderFactory_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package sender

import (
"errors"
"fmt"
"testing"
"time"

"github.com/ElrondNetwork/elrond-go-core/core/check"
"github.com/ElrondNetwork/elrond-go-crypto"
"github.com/ElrondNetwork/elrond-go-crypto/signing"
"github.com/ElrondNetwork/elrond-go-crypto/signing/mcl"
"github.com/ElrondNetwork/elrond-go/heartbeat/mock"
"github.com/ElrondNetwork/elrond-go/sharding/nodesCoordinator"
"github.com/ElrondNetwork/elrond-go/testscommon"
"github.com/ElrondNetwork/elrond-go/testscommon/cryptoMocks"
"github.com/ElrondNetwork/elrond-go/testscommon/shardingMocks"
"github.com/stretchr/testify/assert"
)

func createMockPeerAuthenticationSenderFactoryArgs() argPeerAuthenticationSenderFactory {
return argPeerAuthenticationSenderFactory{
argBaseSender: createMockBaseArgs(),
nodesCoordinator: &shardingMocks.NodesCoordinatorStub{},
peerSignatureHandler: &cryptoMocks.PeerSignatureHandlerStub{},
hardforkTrigger: &testscommon.HardforkTriggerStub{},
hardforkTimeBetweenSends: time.Second,
hardforkTriggerPubKey: providedHardforkPubKey,
keysHolder: &testscommon.KeysHolderStub{},
timeBetweenChecks: time.Second,
shardCoordinator: createShardCoordinatorInShard(0),
privKey: &cryptoMocks.PrivateKeyStub{},
redundancyHandler: &mock.RedundancyHandlerStub{},
}
}

func TestPeerAuthenticationSenderFactory_Create(t *testing.T) {
t.Parallel()

t.Run("ToByteArray fails should error", func(t *testing.T) {
t.Parallel()

args := createMockPeerAuthenticationSenderFactoryArgs()
args.privKey = &cryptoMocks.PrivateKeyStub{
GeneratePublicStub: func() crypto.PublicKey {
return &cryptoMocks.PublicKeyStub{
ToByteArrayStub: func() ([]byte, error) {
return nil, expectedErr
},
}
},
}
peerAuthSender, err := createPeerAuthenticationSender(args)
assert.Equal(t, expectedErr, err)
assert.True(t, check.IfNil(peerAuthSender))
})
t.Run("validator should create regular sender", func(t *testing.T) {
t.Parallel()

args := createMockPeerAuthenticationSenderFactoryArgs()
args.nodesCoordinator = &shardingMocks.NodesCoordinatorStub{
GetValidatorWithPublicKeyCalled: func(publicKey []byte) (validator nodesCoordinator.Validator, shardId uint32, err error) {
return nil, 0, nil
},
}
peerAuthSender, err := createPeerAuthenticationSender(args)
assert.Nil(t, err)
assert.False(t, check.IfNil(peerAuthSender))
assert.Equal(t, "*sender.peerAuthenticationSender", fmt.Sprintf("%T", peerAuthSender))
})
t.Run("regular observer should create regular sender", func(t *testing.T) {
t.Parallel()

args := createMockPeerAuthenticationSenderFactoryArgs()
args.nodesCoordinator = &shardingMocks.NodesCoordinatorStub{
GetValidatorWithPublicKeyCalled: func(publicKey []byte) (validator nodesCoordinator.Validator, shardId uint32, err error) {
return nil, 0, errors.New("not validator")
},
}
args.keysHolder = &testscommon.KeysHolderStub{
GetManagedKeysByCurrentNodeCalled: func() map[string]crypto.PrivateKey {
return make(map[string]crypto.PrivateKey)
},
}
peerAuthSender, err := createPeerAuthenticationSender(args)
assert.Nil(t, err)
assert.False(t, check.IfNil(peerAuthSender))
assert.Equal(t, "*sender.peerAuthenticationSender", fmt.Sprintf("%T", peerAuthSender))
})
t.Run("not validator with keys managed should create multikey sender", func(t *testing.T) {
t.Parallel()

args := createMockPeerAuthenticationSenderFactoryArgs()
args.nodesCoordinator = &shardingMocks.NodesCoordinatorStub{
GetValidatorWithPublicKeyCalled: func(publicKey []byte) (validator nodesCoordinator.Validator, shardId uint32, err error) {
return nil, 0, errors.New("not validator")
},
}
args.keysHolder = &testscommon.KeysHolderStub{
GetManagedKeysByCurrentNodeCalled: func() map[string]crypto.PrivateKey {
keygen := signing.NewKeyGenerator(&mcl.SuiteBLS12{})
sk1, pk1 := keygen.GeneratePair()
pk1Bytes, err := pk1.ToByteArray()
assert.Nil(t, err)
sk2, pk2 := keygen.GeneratePair()
pk2Bytes, err := pk2.ToByteArray()
assert.Nil(t, err)
keysMap := make(map[string]crypto.PrivateKey)
keysMap[string(pk1Bytes)] = sk1
keysMap[string(pk2Bytes)] = sk2
return keysMap
},
}
peerAuthSender, err := createPeerAuthenticationSender(args)
assert.Nil(t, err)
assert.False(t, check.IfNil(peerAuthSender))
assert.Equal(t, "*sender.multikeyPeerAuthenticationSender", fmt.Sprintf("%T", peerAuthSender))
})

}
39 changes: 19 additions & 20 deletions heartbeat/sender/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package sender
import (
"time"

"github.com/ElrondNetwork/covalent-indexer-go/process"
"github.com/ElrondNetwork/elrond-go-core/core"
"github.com/ElrondNetwork/elrond-go-core/marshal"
crypto "github.com/ElrondNetwork/elrond-go-crypto"
Expand Down Expand Up @@ -33,6 +34,9 @@ type ArgSender struct {
HardforkTrigger heartbeat.HardforkTrigger
HardforkTimeBetweenSends time.Duration
HardforkTriggerPubKey []byte
KeysHolder heartbeat.KeysHolder
PeerAuthenticationTimeBetweenChecks time.Duration
ShardCoordinator process.ShardCoordinator
}

// sender defines the component which sends authentication and heartbeat messages
Expand All @@ -47,7 +51,7 @@ func NewSender(args ArgSender) (*sender, error) {
return nil, err
}

pas, err := newPeerAuthenticationSender(argPeerAuthenticationSender{
pas, err := createPeerAuthenticationSender(argPeerAuthenticationSenderFactory{
argBaseSender: argBaseSender{
messenger: args.Messenger,
marshaller: args.Marshaller,
Expand All @@ -58,11 +62,14 @@ func NewSender(args ArgSender) (*sender, error) {
},
nodesCoordinator: args.NodesCoordinator,
peerSignatureHandler: args.PeerSignatureHandler,
privKey: args.PrivateKey,
redundancyHandler: args.RedundancyHandler,
hardforkTrigger: args.HardforkTrigger,
hardforkTimeBetweenSends: args.HardforkTimeBetweenSends,
hardforkTriggerPubKey: args.HardforkTriggerPubKey,
keysHolder: args.KeysHolder,
timeBetweenChecks: args.PeerAuthenticationTimeBetweenChecks,
shardCoordinator: args.ShardCoordinator,
privKey: args.PrivateKey,
redundancyHandler: args.RedundancyHandler,
})
if err != nil {
return nil, err
Expand Down Expand Up @@ -93,24 +100,16 @@ func NewSender(args ArgSender) (*sender, error) {
}

func checkSenderArgs(args ArgSender) error {
pasArg := argPeerAuthenticationSender{
argBaseSender: argBaseSender{
messenger: args.Messenger,
marshaller: args.Marshaller,
topic: args.PeerAuthenticationTopic,
timeBetweenSends: args.PeerAuthenticationTimeBetweenSends,
timeBetweenSendsWhenError: args.PeerAuthenticationTimeBetweenSendsWhenError,
thresholdBetweenSends: args.PeerAuthenticationThresholdBetweenSends,
},
nodesCoordinator: args.NodesCoordinator,
peerSignatureHandler: args.PeerSignatureHandler,
privKey: args.PrivateKey,
redundancyHandler: args.RedundancyHandler,
hardforkTrigger: args.HardforkTrigger,
hardforkTimeBetweenSends: args.HardforkTimeBetweenSends,
hardforkTriggerPubKey: args.HardforkTriggerPubKey,
// Only check base sender args, as further checks are done on constructors, based on the type of sender
baseSenderArgs := argBaseSender{
messenger: args.Messenger,
marshaller: args.Marshaller,
topic: args.PeerAuthenticationTopic,
timeBetweenSends: args.PeerAuthenticationTimeBetweenSends,
timeBetweenSendsWhenError: args.PeerAuthenticationTimeBetweenSendsWhenError,
thresholdBetweenSends: args.PeerAuthenticationThresholdBetweenSends,
}
err := checkPeerAuthenticationSenderArgs(pasArg)
err := checkBaseSenderArgs(baseSenderArgs)
if err != nil {
return err
}
Expand Down
75 changes: 3 additions & 72 deletions heartbeat/sender/sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ func createMockSenderArgs() ArgSender {
HardforkTrigger: &testscommon.HardforkTriggerStub{},
HardforkTimeBetweenSends: time.Second,
HardforkTriggerPubKey: providedHardforkPubKey,
KeysHolder: nil, // default peer authentication sender
PeerAuthenticationTimeBetweenChecks: time.Second,
ShardCoordinator: createShardCoordinatorInShard(0),
}
}

Expand Down Expand Up @@ -176,78 +179,6 @@ func TestNewSender(t *testing.T) {
assert.Nil(t, sender)
assert.Equal(t, heartbeat.ErrNilCurrentBlockProvider, err)
})
t.Run("nil nodes coordinator should error", func(t *testing.T) {
t.Parallel()

args := createMockSenderArgs()
args.NodesCoordinator = nil
sender, err := NewSender(args)

assert.Nil(t, sender)
assert.Equal(t, heartbeat.ErrNilNodesCoordinator, err)
})
t.Run("nil peer signature handler should error", func(t *testing.T) {
t.Parallel()

args := createMockSenderArgs()
args.PeerSignatureHandler = nil
sender, err := NewSender(args)

assert.Nil(t, sender)
assert.Equal(t, heartbeat.ErrNilPeerSignatureHandler, err)
})
t.Run("nil private key should error", func(t *testing.T) {
t.Parallel()

args := createMockSenderArgs()
args.PrivateKey = nil
sender, err := NewSender(args)

assert.Nil(t, sender)
assert.Equal(t, heartbeat.ErrNilPrivateKey, err)
})
t.Run("nil redundancy handler should error", func(t *testing.T) {
t.Parallel()

args := createMockSenderArgs()
args.RedundancyHandler = nil
sender, err := NewSender(args)

assert.Nil(t, sender)
assert.Equal(t, heartbeat.ErrNilRedundancyHandler, err)
})
t.Run("nil hardfork trigger should error", func(t *testing.T) {
t.Parallel()

args := createMockSenderArgs()
args.HardforkTrigger = nil
sender, err := NewSender(args)

assert.Nil(t, sender)
assert.Equal(t, heartbeat.ErrNilHardforkTrigger, err)
})
t.Run("invalid time between hardforks should error", func(t *testing.T) {
t.Parallel()

args := createMockSenderArgs()
args.HardforkTimeBetweenSends = time.Second - time.Nanosecond
sender, err := NewSender(args)

assert.Nil(t, sender)
assert.True(t, errors.Is(err, heartbeat.ErrInvalidTimeDuration))
assert.True(t, strings.Contains(err.Error(), "hardforkTimeBetweenSends"))
})
t.Run("invalid hardfork pub key should error", func(t *testing.T) {
t.Parallel()

args := createMockSenderArgs()
args.HardforkTriggerPubKey = make([]byte, 0)
sender, err := NewSender(args)

assert.Nil(t, sender)
assert.True(t, errors.Is(err, heartbeat.ErrInvalidValue))
assert.True(t, strings.Contains(err.Error(), "hardfork"))
})
t.Run("should work", func(t *testing.T) {
t.Parallel()

Expand Down