Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Heartbeat sender factory #4455

Merged
merged 5 commits into from
Sep 13, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/node/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ func startNodeRunner(c *cli.Context, log logger.Logger, version string) error {
log.Debug("initialized memory ballast object", "size", core.ConvertBytes(uint64(len(memoryBallastObject))))
}

cfgs.FlagsConfig.BaseVersion = fmt.Sprintf("%s-base", version)
cfgs.FlagsConfig.Version = version

nodeRunner, errRunner := node.NewNodeRunner(cfgs)
Expand Down
1 change: 1 addition & 0 deletions config/contextFlagsConfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type ContextFlagsConfig struct {
UseLogView bool
ValidatorKeyIndex int
EnableRestAPIServerDebugMode bool
BaseVersion string
Version string
ForceStartFromNetwork bool
DisableConsensusWatchdog bool
Expand Down
4 changes: 4 additions & 0 deletions factory/heartbeatV2Components.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
type ArgHeartbeatV2ComponentsFactory struct {
Config config.Config
Prefs config.Preferences
BaseVersion string
AppVersion string
BoostrapComponents BootstrapComponentsHolder
CoreComponents CoreComponentsHolder
Expand All @@ -33,6 +34,7 @@ type ArgHeartbeatV2ComponentsFactory struct {
type heartbeatV2ComponentsFactory struct {
config config.Config
prefs config.Preferences
baseVersion string
version string
boostrapComponents BootstrapComponentsHolder
coreComponents CoreComponentsHolder
Expand Down Expand Up @@ -60,6 +62,7 @@ func NewHeartbeatV2ComponentsFactory(args ArgHeartbeatV2ComponentsFactory) (*hea
return &heartbeatV2ComponentsFactory{
config: args.Config,
prefs: args.Prefs,
baseVersion: args.BaseVersion,
version: args.AppVersion,
boostrapComponents: args.BoostrapComponents,
coreComponents: args.CoreComponents,
Expand Down Expand Up @@ -143,6 +146,7 @@ func (hcf *heartbeatV2ComponentsFactory) Create() (*heartbeatV2Components, error
HeartbeatTimeBetweenSends: time.Second * time.Duration(cfg.HeartbeatTimeBetweenSendsInSec),
HeartbeatTimeBetweenSendsWhenError: time.Second * time.Duration(cfg.HeartbeatTimeBetweenSendsWhenErrorInSec),
HeartbeatThresholdBetweenSends: cfg.HeartbeatThresholdBetweenSends,
BaseVersionNumber: hcf.baseVersion,
VersionNumber: hcf.version,
NodeDisplayName: hcf.prefs.Preferences.NodeDisplayName,
Identity: hcf.prefs.Preferences.Identity,
Expand Down
1 change: 1 addition & 0 deletions factory/heartbeatV2Components_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func createMockHeartbeatV2ComponentsFactoryArgs() factory.ArgHeartbeatV2Componen
Identity: "identity",
},
},
BaseVersion: "test-base",
AppVersion: "test",
BoostrapComponents: bootstrapC,
CoreComponents: coreC,
Expand Down
1 change: 1 addition & 0 deletions heartbeat/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,5 +147,6 @@ type KeysHolder interface {
// ShardCoordinator defines the operations of a shard coordinator
type ShardCoordinator interface {
SelfId() uint32
ComputeId(address []byte) uint32
IsInterfaceNil() bool
}
10 changes: 5 additions & 5 deletions heartbeat/mock/heartbeatSenderInfoProviderStub.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ import "github.com/ElrondNetwork/elrond-go-core/core"

// HeartbeatSenderInfoProviderStub -
type HeartbeatSenderInfoProviderStub struct {
GetSenderInfoCalled func() (string, core.P2PPeerSubType, error)
GetCurrentNodeTypeCalled func() (string, core.P2PPeerSubType, error)
}

// GetSenderInfo -
func (stub *HeartbeatSenderInfoProviderStub) GetSenderInfo() (string, core.P2PPeerSubType, error) {
if stub.GetSenderInfoCalled != nil {
return stub.GetSenderInfoCalled()
// GetCurrentNodeType -
func (stub *HeartbeatSenderInfoProviderStub) GetCurrentNodeType() (string, core.P2PPeerSubType, error) {
if stub.GetCurrentNodeTypeCalled != nil {
return stub.GetCurrentNodeTypeCalled()
}

return "", 0, nil
Expand Down
30 changes: 30 additions & 0 deletions heartbeat/sender/commonHeartbeatSender.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,19 @@ import (
"fmt"
"time"

"github.com/ElrondNetwork/elrond-go-core/core"
"github.com/ElrondNetwork/elrond-go/common"
"github.com/ElrondNetwork/elrond-go/heartbeat"
)

type commonHeartbeatSender struct {
baseSender
versionNumber string
nodeDisplayName string
identity string
peerSubType core.P2PPeerSubType
currentBlockProvider heartbeat.CurrentBlockProvider
peerTypeProvider heartbeat.PeerTypeProviderHandler
}

func (chs *commonHeartbeatSender) generateMessageBytes(
Expand Down Expand Up @@ -59,3 +66,26 @@ func (chs *commonHeartbeatSender) generateMessageBytes(

return chs.marshaller.Marshal(msg)
}

// GetCurrentNodeType will return the current sender type and subtype
func (chs *commonHeartbeatSender) GetCurrentNodeType() (string, core.P2PPeerSubType, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 less duplicated code

_, pk := chs.getCurrentPrivateAndPublicKeys()
pkBytes, err := pk.ToByteArray()
if err != nil {
return "", 0, err
}

peerType := chs.computePeerList(pkBytes)

return peerType, chs.peerSubType, nil
}

func (chs *commonHeartbeatSender) computePeerList(pubkey []byte) string {
peerType, _, err := chs.peerTypeProvider.ComputeForPubKey(pubkey)
if err != nil {
log.Warn("heartbeatSender: compute peer type", "error", err)
return string(common.ObserverList)
}

return string(peerType)
}
39 changes: 5 additions & 34 deletions heartbeat/sender/heartbeatSender.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (

"github.com/ElrondNetwork/elrond-go-core/core"
"github.com/ElrondNetwork/elrond-go-core/core/check"
"github.com/ElrondNetwork/elrond-go/common"
"github.com/ElrondNetwork/elrond-go/heartbeat"
)

Expand All @@ -24,11 +23,6 @@ type argHeartbeatSender struct {

type heartbeatSender struct {
commonHeartbeatSender
versionNumber string
nodeDisplayName string
identity string
peerSubType core.P2PPeerSubType
peerTypeProvider heartbeat.PeerTypeProviderHandler
}

// newHeartbeatSender creates a new instance of type heartbeatSender
Expand All @@ -42,12 +36,12 @@ func newHeartbeatSender(args argHeartbeatSender) (*heartbeatSender, error) {
commonHeartbeatSender: commonHeartbeatSender{
baseSender: createBaseSender(args.argBaseSender),
currentBlockProvider: args.currentBlockProvider,
peerTypeProvider: args.peerTypeProvider,
versionNumber: args.versionNumber,
nodeDisplayName: args.nodeDisplayName,
identity: args.identity,
peerSubType: args.peerSubType,
},
versionNumber: args.versionNumber,
nodeDisplayName: args.nodeDisplayName,
identity: args.identity,
peerSubType: args.peerSubType,
peerTypeProvider: args.peerTypeProvider,
}, nil
}

Expand Down Expand Up @@ -109,29 +103,6 @@ func (sender *heartbeatSender) execute() error {
return nil
}

// getSenderInfo will return the current sender info
func (sender *heartbeatSender) getSenderInfo() (string, core.P2PPeerSubType, error) {
_, pk := sender.getCurrentPrivateAndPublicKeys()
pkBytes, err := pk.ToByteArray()
if err != nil {
return "", 0, err
}

peerType := sender.computePeerList(pkBytes)

return peerType, sender.peerSubType, nil
}

func (sender *heartbeatSender) computePeerList(pubkey []byte) string {
peerType, _, err := sender.peerTypeProvider.ComputeForPubKey(pubkey)
if err != nil {
log.Warn("heartbeatSender: compute peer type", "error", err)
return string(common.ObserverList)
}

return string(peerType)
}

// IsInterfaceNil returns true if there is no value under the interface
func (sender *heartbeatSender) IsInterfaceNil() bool {
return sender == nil
Expand Down
104 changes: 104 additions & 0 deletions heartbeat/sender/heartbeatSenderFactory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package sender

import (
"fmt"

"github.com/ElrondNetwork/covalent-indexer-go/process"
"github.com/ElrondNetwork/elrond-go-core/core"
crypto "github.com/ElrondNetwork/elrond-go-crypto"
"github.com/ElrondNetwork/elrond-go/heartbeat"
)

type argHeartbeatSenderFactory struct {
argBaseSender
baseVersionNumber string
versionNumber string
nodeDisplayName string
identity string
peerSubType core.P2PPeerSubType
currentBlockProvider heartbeat.CurrentBlockProvider
peerTypeProvider heartbeat.PeerTypeProviderHandler
keysHolder heartbeat.KeysHolder
shardCoordinator process.ShardCoordinator
nodesCoordinator heartbeat.NodesCoordinator
}

func createHeartbeatSender(args argHeartbeatSenderFactory) (heartbeatSenderHandler, error) {
isMultikey, err := isMultikeyMode(args.privKey, args.keysHolder, args.nodesCoordinator)
if err != nil {
return nil, fmt.Errorf("%w while creating heartbeat sender", err)
}

if isMultikey {
return createMultikeyHeartbeatSender(args)
}

return createRegularHeartbeatSender(args)
}

func createRegularHeartbeatSender(args argHeartbeatSenderFactory) (*heartbeatSender, error) {
argsSender := argHeartbeatSender{
argBaseSender: argBaseSender{
messenger: args.messenger,
marshaller: args.marshaller,
topic: args.topic,
timeBetweenSends: args.timeBetweenSends,
timeBetweenSendsWhenError: args.timeBetweenSendsWhenError,
thresholdBetweenSends: args.thresholdBetweenSends,
redundancyHandler: args.redundancyHandler,
privKey: args.privKey,
},
versionNumber: args.versionNumber,
nodeDisplayName: args.nodeDisplayName,
identity: args.identity,
peerSubType: args.peerSubType,
currentBlockProvider: args.currentBlockProvider,
peerTypeProvider: args.peerTypeProvider,
}

return newHeartbeatSender(argsSender)
}

func createMultikeyHeartbeatSender(args argHeartbeatSenderFactory) (*multikeyHeartbeatSender, error) {
argsSender := argMultikeyHeartbeatSender{
argBaseSender: argBaseSender{
messenger: args.messenger,
marshaller: args.marshaller,
topic: args.topic,
timeBetweenSends: args.timeBetweenSends,
timeBetweenSendsWhenError: args.timeBetweenSendsWhenError,
thresholdBetweenSends: args.thresholdBetweenSends,
redundancyHandler: args.redundancyHandler,
privKey: args.privKey,
},
peerTypeProvider: args.peerTypeProvider,
versionNumber: args.versionNumber,
baseVersionNumber: args.baseVersionNumber,
nodeDisplayName: args.nodeDisplayName,
identity: args.identity,
peerSubType: args.peerSubType,
currentBlockProvider: args.currentBlockProvider,
keysHolder: args.keysHolder,
shardCoordinator: args.shardCoordinator,
}

return newMultikeyHeartbeatSender(argsSender)
}

func isMultikeyMode(privKey crypto.PrivateKey, keysHolder heartbeat.KeysHolder, nodesCoordinator heartbeat.NodesCoordinator) (bool, error) {
pk := privKey.GeneratePublic()
pkBytes, err := pk.ToByteArray()
if err != nil {
return false, err
}

keysMap := keysHolder.GetManagedKeysByCurrentNode()
isMultikey := len(keysMap) > 0

_, _, err = nodesCoordinator.GetValidatorWithPublicKey(pkBytes)
if err == nil && isMultikey {
return false, heartbeat.ErrInvalidConfiguration
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we might return more info here, for easier debugging.
Something like:

return false, fmt.Errorf("%w, len(keysMap) = %d, isValidator = %v", heartbeat.ErrInvalidConfiguration, len(keysMap) > 0, err == nil)

}

return isMultikey, nil
}