Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add feedback for peer honesty in consensus #1979

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 8 additions & 0 deletions cmd/node/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import (
"github.com/ElrondNetwork/elrond-go/marshal"
"github.com/ElrondNetwork/elrond-go/node"
"github.com/ElrondNetwork/elrond-go/node/external"
"github.com/ElrondNetwork/elrond-go/node/mock"
"github.com/ElrondNetwork/elrond-go/node/nodeDebugFactory"
"github.com/ElrondNetwork/elrond-go/ntp"
"github.com/ElrondNetwork/elrond-go/process"
Expand Down Expand Up @@ -1201,6 +1202,10 @@ func startNode(ctx *cli.Context, log logger.Logger, version string) error {
elasticIndexer.SetTxLogsProcessor(processComponents.TxLogsProcessor)
processComponents.TxLogsProcessor.EnableLogToBeSavedInCache()
}

//TODO: This should be set with a real instance which implements PeerHonestyHandler interface
peerHonestyHandler := &mock.PeerHonestyHandlerStub{}

log.Trace("creating node structure")
currentNode, err := createNode(
generalConfig,
Expand Down Expand Up @@ -1228,6 +1233,7 @@ func startNode(ctx *cli.Context, log logger.Logger, version string) error {
whiteListerVerifiedTxs,
chanStopNodeProcess,
hardForkTrigger,
peerHonestyHandler,
)
if err != nil {
return err
Expand Down Expand Up @@ -1993,6 +1999,7 @@ func createNode(
whiteListerVerifiedTxs process.WhiteListHandler,
chanStopNodeProcess chan endProcess.ArgEndProcess,
hardForkTrigger node.HardforkTrigger,
peerHonestyHandler consensus.PeerHonestyHandler,
) (*node.Node, error) {
var err error
var consensusGroupSize uint32
Expand Down Expand Up @@ -2088,6 +2095,7 @@ func createNode(
node.WithPublicKeySize(config.ValidatorPubkeyConverter.Length),
node.WithNodeStopChannel(chanStopNodeProcess),
node.WithApiTransactionByHashThrottler(apiTxsByHashThrottler),
node.WithPeerHonestyHandler(peerHonestyHandler),
)
if err != nil {
return nil, errors.New("error creating node: " + err.Error())
Expand Down
8 changes: 8 additions & 0 deletions consensus/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,3 +97,11 @@ type HeadersPoolSubscriber interface {
RegisterHandler(handler func(headerHandler data.HeaderHandler, headerHash []byte))
IsInterfaceNil() bool
}

// PeerHonestyHandler defines the behaivour of a component able to handle/monitor the peer honesty of nodes which are
// participating in consensus
type PeerHonestyHandler interface {
Increase(pk string, topic string, value float64)
Decrease(pk string, topic string, value float64)
IsInterfaceNil() bool
}
6 changes: 6 additions & 0 deletions consensus/mock/consensusDataContainerMock.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type ConsensusCoreMock struct {
validatorGroupSelector sharding.NodesCoordinator
epochStartNotifier epochStart.RegistrationHandler
antifloodHandler consensus.P2PAntifloodHandler
peerHonestyHandler consensus.PeerHonestyHandler
}

// GetAntiFloodHandler -
Expand Down Expand Up @@ -183,6 +184,11 @@ func (ccm *ConsensusCoreMock) SingleSigner() crypto.SingleSigner {
return ccm.blsSingleSigner
}

// PeerHonestyHandler -
func (ccm *ConsensusCoreMock) PeerHonestyHandler() consensus.PeerHonestyHandler {
return ccm.peerHonestyHandler
}

// IsInterfaceNil returns true if there is no value under the interface
func (ccm *ConsensusCoreMock) IsInterfaceNil() bool {
return ccm == nil
Expand Down
2 changes: 2 additions & 0 deletions consensus/mock/mockTestInitializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ func InitConsensusCore() *ConsensusCoreMock {
epochStartSubscriber := &EpochStartNotifierStub{}
antifloodHandler := &P2PAntifloodHandlerStub{}
headerPoolSubscriber := &HeadersCacherStub{}
peerHonestyHandler := &PeerHonestyHandlerStub{}

container := &ConsensusCoreMock{
blockChain: blockChain,
Expand All @@ -149,6 +150,7 @@ func InitConsensusCore() *ConsensusCoreMock {
validatorGroupSelector: validatorGroupSelector,
epochStartNotifier: epochStartSubscriber,
antifloodHandler: antifloodHandler,
peerHonestyHandler: peerHonestyHandler,
}

return container
Expand Down
28 changes: 28 additions & 0 deletions consensus/mock/peerHonestyHandlerStub.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package mock

// PeerHonestyHandlerStub -
type PeerHonestyHandlerStub struct {
IncreaseCalled func(pk string, topic string, value float64)
DecreaseCalled func(pk string, topic string, value float64)
}

// Increase -
func (phhs *PeerHonestyHandlerStub) Increase(pk string, topic string, value float64) {
if phhs.IncreaseCalled != nil {
phhs.IncreaseCalled(pk, topic, value)
return
}
}

// Decrease -
func (phhs *PeerHonestyHandlerStub) Decrease(pk string, topic string, value float64) {
if phhs.DecreaseCalled != nil {
phhs.DecreaseCalled(pk, topic, value)
return
}
}

// IsInterfaceNil -
func (phhs *PeerHonestyHandlerStub) IsInterfaceNil() bool {
return phhs == nil
}
44 changes: 37 additions & 7 deletions consensus/spos/bls/subroundBlock.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,15 +314,20 @@ func (sr *subroundBlock) receivedBlockBodyAndHeader(cnsDta *consensus.Message) b
return false
}

if sr.IsBlockBodyAlreadyReceived() {
if !sr.IsNodeLeaderInCurrentRound(node) { // is NOT this node leader in current round?
sr.PeerHonestyHandler().Decrease(
node,
spos.GetConsensusTopicID(sr.ShardCoordinator()),
spos.LeaderPeerHonestyDecreaseFactor)

return false
}

if sr.IsHeaderAlreadyReceived() {
if sr.IsBlockBodyAlreadyReceived() {
return false
}

if !sr.IsNodeLeaderInCurrentRound(node) { // is NOT this node leader in current round?
if sr.IsHeaderAlreadyReceived() {
return false
}

Expand All @@ -346,18 +351,28 @@ func (sr *subroundBlock) receivedBlockBodyAndHeader(cnsDta *consensus.Message) b
blockProcessedWithSuccess := sr.processReceivedBlock(cnsDta)
sw.Stop("processReceivedBlock")

sr.PeerHonestyHandler().Increase(
node,
spos.GetConsensusTopicID(sr.ShardCoordinator()),
spos.LeaderPeerHonestyIncreaseFactor)

return blockProcessedWithSuccess
}

// receivedBlockBody method is called when a block body is received through the block body channel
func (sr *subroundBlock) receivedBlockBody(cnsDta *consensus.Message) bool {
node := string(cnsDta.PubKey)

if sr.IsBlockBodyAlreadyReceived() {
if !sr.IsNodeLeaderInCurrentRound(node) { // is NOT this node leader in current round?
sasurobert marked this conversation as resolved.
Show resolved Hide resolved
sr.PeerHonestyHandler().Decrease(
node,
spos.GetConsensusTopicID(sr.ShardCoordinator()),
spos.LeaderPeerHonestyDecreaseFactor)

return false
}

if !sr.IsNodeLeaderInCurrentRound(node) { // is NOT this node leader in current round?
if sr.IsBlockBodyAlreadyReceived() {
return false
}

Expand All @@ -375,6 +390,11 @@ func (sr *subroundBlock) receivedBlockBody(cnsDta *consensus.Message) bool {

blockProcessedWithSuccess := sr.processReceivedBlock(cnsDta)

sr.PeerHonestyHandler().Increase(
node,
spos.GetConsensusTopicID(sr.ShardCoordinator()),
spos.LeaderPeerHonestyIncreaseFactor)

return blockProcessedWithSuccess
}

Expand All @@ -388,11 +408,16 @@ func (sr *subroundBlock) receivedBlockHeader(cnsDta *consensus.Message) bool {
return false
}

if sr.IsHeaderAlreadyReceived() {
if !sr.IsNodeLeaderInCurrentRound(node) { // is NOT this node leader in current round?
sasurobert marked this conversation as resolved.
Show resolved Hide resolved
sr.PeerHonestyHandler().Decrease(
node,
spos.GetConsensusTopicID(sr.ShardCoordinator()),
spos.LeaderPeerHonestyDecreaseFactor)

return false
}

if !sr.IsNodeLeaderInCurrentRound(node) { // is NOT this node leader in current round?
if sr.IsHeaderAlreadyReceived() {
return false
}

Expand All @@ -412,6 +437,11 @@ func (sr *subroundBlock) receivedBlockHeader(cnsDta *consensus.Message) bool {
"hash", cnsDta.BlockHeaderHash)
blockProcessedWithSuccess := sr.processReceivedBlock(cnsDta)

sr.PeerHonestyHandler().Increase(
node,
spos.GetConsensusTopicID(sr.ShardCoordinator()),
spos.LeaderPeerHonestyIncreaseFactor)

return blockProcessedWithSuccess
}

Expand Down
20 changes: 15 additions & 5 deletions consensus/spos/bls/subroundEndRound.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,21 +81,26 @@ func checkNewSubroundEndRoundParams(

// receivedBlockHeaderFinalInfo method is called when a block header final info is received
func (sr *subroundEndRound) receivedBlockHeaderFinalInfo(cnsDta *consensus.Message) bool {
if sr.IsSelfLeaderInCurrentRound() {
node := string(cnsDta.PubKey)

if !sr.IsConsensusDataSet() {
return false
}

node := string(cnsDta.PubKey)
if !sr.IsNodeLeaderInCurrentRound(node) { // is NOT this node leader in current round?
sr.PeerHonestyHandler().Decrease(
node,
spos.GetConsensusTopicID(sr.ShardCoordinator()),
spos.LeaderPeerHonestyDecreaseFactor)

if !sr.IsConsensusDataSet() {
return false
}

if !sr.IsConsensusDataEqual(cnsDta.BlockHeaderHash) {
if sr.IsSelfLeaderInCurrentRound() {
return false
}

if !sr.IsNodeLeaderInCurrentRound(node) { // is NOT this node leader in current round?
if !sr.IsConsensusDataEqual(cnsDta.BlockHeaderHash) {
return false
}

Expand All @@ -108,6 +113,11 @@ func (sr *subroundEndRound) receivedBlockHeaderFinalInfo(cnsDta *consensus.Messa
"AggregateSignature", cnsDta.AggregateSignature,
"LeaderSignature", cnsDta.LeaderSignature)

sr.PeerHonestyHandler().Increase(
node,
spos.GetConsensusTopicID(sr.ShardCoordinator()),
spos.LeaderPeerHonestyIncreaseFactor)

return sr.doEndRoundJobByParticipant(cnsDta)
}

Expand Down
20 changes: 17 additions & 3 deletions consensus/spos/bls/subroundSignature.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,13 +125,22 @@ func (sr *subroundSignature) doSignatureJob() bool {
// If the signature is valid, than the jobDone map corresponding to the node which sent it,
// is set on true for the subround Signature
func (sr *subroundSignature) receivedSignature(cnsDta *consensus.Message) bool {
if !sr.IsSelfLeaderInCurrentRound() {
node := string(cnsDta.PubKey)

if !sr.IsConsensusDataSet() {
return false
}

node := string(cnsDta.PubKey)
if !sr.IsNodeInConsensusGroup(node) {
sr.PeerHonestyHandler().Decrease(
node,
spos.GetConsensusTopicID(sr.ShardCoordinator()),
spos.ValidatorPeerHonestyDecreaseFactor)

if !sr.IsConsensusDataSet() {
return false
}

if !sr.IsSelfLeaderInCurrentRound() {
return false
}

Expand Down Expand Up @@ -169,6 +178,11 @@ func (sr *subroundSignature) receivedSignature(cnsDta *consensus.Message) bool {
return false
}

sr.PeerHonestyHandler().Increase(
node,
spos.GetConsensusTopicID(sr.ShardCoordinator()),
spos.ValidatorPeerHonestyIncreaseFactor)

sr.appStatusHandler.SetStringValue(core.MetricConsensusRoundState, "signed")
return true
}
Expand Down
2 changes: 1 addition & 1 deletion consensus/spos/bls/subroundStartRound.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (sr *subroundStartRound) doStartRoundJob() bool {
sr.ResetConsensusState()
sr.RoundIndex = sr.Rounder().Index()
sr.RoundTimeStamp = sr.Rounder().TimeStamp()
topic := spos.GetConsensusTopicIDFromShardCoordinator(sr.ShardCoordinator())
topic := spos.GetConsensusTopicID(sr.ShardCoordinator())
sr.GetAntiFloodHandler().ResetForTopic(topic)
return true
}
Expand Down
7 changes: 3 additions & 4 deletions consensus/spos/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@ import (
"github.com/ElrondNetwork/elrond-go/sharding"
)

// GetConsensusTopicIDFromShardCoordinator will construct and return the topic ID based on shard coordinator
func GetConsensusTopicIDFromShardCoordinator(shardCoord sharding.Coordinator) string {
return core.ConsensusTopic +
shardCoord.CommunicationIdentifier(shardCoord.SelfId())
// GetConsensusTopicID will construct and return the topic ID based on shard coordinator
func GetConsensusTopicID(shardCoordinator sharding.Coordinator) string {
iulianpascalau marked this conversation as resolved.
Show resolved Hide resolved
return core.ConsensusTopic + shardCoordinator.CommunicationIdentifier(shardCoordinator.SelfId())
}
8 changes: 8 additions & 0 deletions consensus/spos/consensusCore.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type ConsensusCore struct {
syncTimer ntp.SyncTimer
epochStartRegistrationHandler epochStart.RegistrationHandler
antifloodHandler consensus.P2PAntifloodHandler
peerHonestyHandler consensus.PeerHonestyHandler
}

// ConsensusCoreArgs store all arguments that are needed to create a ConsensusCore object
Expand All @@ -51,6 +52,7 @@ type ConsensusCoreArgs struct {
SyncTimer ntp.SyncTimer
EpochStartRegistrationHandler epochStart.RegistrationHandler
AntifloodHandler consensus.P2PAntifloodHandler
PeerHonestyHandler consensus.PeerHonestyHandler
}

// NewConsensusCore creates a new ConsensusCore instance
Expand All @@ -74,6 +76,7 @@ func NewConsensusCore(
syncTimer: args.SyncTimer,
epochStartRegistrationHandler: args.EpochStartRegistrationHandler,
antifloodHandler: args.AntifloodHandler,
peerHonestyHandler: args.PeerHonestyHandler,
}

err := ValidateConsensusCore(consensusCore)
Expand Down Expand Up @@ -164,6 +167,11 @@ func (cc *ConsensusCore) SingleSigner() crypto.SingleSigner {
return cc.blsSingleSigner
}

// PeerHonestyHandler will return the peer honesty handler which will be used in subrounds
func (cc *ConsensusCore) PeerHonestyHandler() consensus.PeerHonestyHandler {
return cc.peerHonestyHandler
}

// IsInterfaceNil returns true if there is no value under the interface
func (cc *ConsensusCore) IsInterfaceNil() bool {
return cc == nil
Expand Down
3 changes: 3 additions & 0 deletions consensus/spos/consensusCoreValidator.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ func ValidateConsensusCore(container ConsensusCoreHandler) error {
if check.IfNil(container.GetAntiFloodHandler()) {
return ErrNilAntifloodHandler
}
if check.IfNil(container.PeerHonestyHandler()) {
return ErrNilPeerHonestyHandler
}

return nil
}
17 changes: 16 additions & 1 deletion consensus/spos/consensusCore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ func createDefaultConsensusCoreArgs() *spos.ConsensusCoreArgs {
NodesCoordinator: consensusCoreMock.NodesCoordinator(),
SyncTimer: consensusCoreMock.SyncTimer(),
EpochStartRegistrationHandler: consensusCoreMock.EpochStartRegistrationHandler(),
AntifloodHandler: consensusCoreMock.GetAntiFloodHandler(),
AntifloodHandler: consensusCoreMock.GetAntiFloodHandler(),
PeerHonestyHandler: consensusCoreMock.PeerHonestyHandler(),
}
return args
}
Expand Down Expand Up @@ -240,6 +241,20 @@ func TestConsensusCore_WithNilAntifloodHandlerShouldFail(t *testing.T) {
assert.Equal(t, spos.ErrNilAntifloodHandler, err)
}

func TestConsensusCore_WithNilPeerHonestyHandlerShouldFail(t *testing.T) {
t.Parallel()

args := createDefaultConsensusCoreArgs()
args.PeerHonestyHandler = nil

consensusCore, err := spos.NewConsensusCore(
args,
)

assert.Nil(t, consensusCore)
assert.Equal(t, spos.ErrNilPeerHonestyHandler, err)
}

func TestConsensusCore_CreateConsensusCoreShouldWork(t *testing.T) {
t.Parallel()

Expand Down