Skip to content

Commit

Permalink
* Refactored PeerHonestyHandler interface
Browse files Browse the repository at this point in the history
* Refactored Increase adn Decrease mechanism
* Fixed a blcklisted situation which should not be done
  • Loading branch information
SebastianMarian committed Jun 19, 2020
1 parent e04aeb0 commit aabbad1
Show file tree
Hide file tree
Showing 9 changed files with 96 additions and 79 deletions.
4 changes: 2 additions & 2 deletions consensus/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ type HeadersPoolSubscriber interface {
// PeerHonestyHandler defines the behaivour of a component able to handle/monitor the peer honesty of nodes which are
// participating in consensus
type PeerHonestyHandler interface {
Increase(round int64, pk string, topic string, value float64)
Decrease(round int64, pk string, topic string, value float64)
Increase(pk string, topic string, value float64)
Decrease(pk string, topic string, value float64)
IsInterfaceNil() bool
}
12 changes: 6 additions & 6 deletions consensus/mock/peerHonestyHandlerStub.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,22 @@ package mock

// PeerHonestyHandlerStub -
type PeerHonestyHandlerStub struct {
IncreaseCalled func(round int64, pk string, topic string, value float64)
DecreaseCalled func(round int64, pk string, topic string, value float64)
IncreaseCalled func(pk string, topic string, value float64)
DecreaseCalled func(pk string, topic string, value float64)
}

// Increase -
func (phhs *PeerHonestyHandlerStub) Increase(round int64, pk string, topic string, value float64) {
func (phhs *PeerHonestyHandlerStub) Increase(pk string, topic string, value float64) {
if phhs.IncreaseCalled != nil {
phhs.IncreaseCalled(round, pk, topic, value)
phhs.IncreaseCalled(pk, topic, value)
return
}
}

// Decrease -
func (phhs *PeerHonestyHandlerStub) Decrease(round int64, pk string, topic string, value float64) {
func (phhs *PeerHonestyHandlerStub) Decrease(pk string, topic string, value float64) {
if phhs.DecreaseCalled != nil {
phhs.DecreaseCalled(round, pk, topic, value)
phhs.DecreaseCalled(pk, topic, value)
return
}
}
Expand Down
44 changes: 34 additions & 10 deletions consensus/spos/bls/subroundBlock.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,17 +314,20 @@ func (sr *subroundBlock) receivedBlockBodyAndHeader(cnsDta *consensus.Message) b
return false
}

sr.UpdateLeaderPeerHonesty(node)
if !sr.IsNodeLeaderInCurrentRound(node) { // is NOT this node leader in current round?
sr.PeerHonestyHandler().Decrease(
node,
spos.GetConsensusTopicID(sr.ShardCoordinator()),
spos.LeaderPeerHonestyDecreaseFactor)

if sr.IsBlockBodyAlreadyReceived() {
return false
}

if sr.IsHeaderAlreadyReceived() {
if sr.IsBlockBodyAlreadyReceived() {
return false
}

if !sr.IsNodeLeaderInCurrentRound(node) { // is NOT this node leader in current round?
if sr.IsHeaderAlreadyReceived() {
return false
}

Expand All @@ -348,20 +351,28 @@ func (sr *subroundBlock) receivedBlockBodyAndHeader(cnsDta *consensus.Message) b
blockProcessedWithSuccess := sr.processReceivedBlock(cnsDta)
sw.Stop("processReceivedBlock")

sr.PeerHonestyHandler().Increase(
node,
spos.GetConsensusTopicID(sr.ShardCoordinator()),
spos.LeaderPeerHonestyIncreaseFactor)

return blockProcessedWithSuccess
}

// receivedBlockBody method is called when a block body is received through the block body channel
func (sr *subroundBlock) receivedBlockBody(cnsDta *consensus.Message) bool {
node := string(cnsDta.PubKey)

sr.UpdateLeaderPeerHonesty(node)
if !sr.IsNodeLeaderInCurrentRound(node) { // is NOT this node leader in current round?
sr.PeerHonestyHandler().Decrease(
node,
spos.GetConsensusTopicID(sr.ShardCoordinator()),
spos.LeaderPeerHonestyDecreaseFactor)

if sr.IsBlockBodyAlreadyReceived() {
return false
}

if !sr.IsNodeLeaderInCurrentRound(node) { // is NOT this node leader in current round?
if sr.IsBlockBodyAlreadyReceived() {
return false
}

Expand All @@ -379,6 +390,11 @@ func (sr *subroundBlock) receivedBlockBody(cnsDta *consensus.Message) bool {

blockProcessedWithSuccess := sr.processReceivedBlock(cnsDta)

sr.PeerHonestyHandler().Increase(
node,
spos.GetConsensusTopicID(sr.ShardCoordinator()),
spos.LeaderPeerHonestyIncreaseFactor)

return blockProcessedWithSuccess
}

Expand All @@ -392,13 +408,16 @@ func (sr *subroundBlock) receivedBlockHeader(cnsDta *consensus.Message) bool {
return false
}

sr.UpdateLeaderPeerHonesty(node)
if !sr.IsNodeLeaderInCurrentRound(node) { // is NOT this node leader in current round?
sr.PeerHonestyHandler().Decrease(
node,
spos.GetConsensusTopicID(sr.ShardCoordinator()),
spos.LeaderPeerHonestyDecreaseFactor)

if sr.IsHeaderAlreadyReceived() {
return false
}

if !sr.IsNodeLeaderInCurrentRound(node) { // is NOT this node leader in current round?
if sr.IsHeaderAlreadyReceived() {
return false
}

Expand All @@ -418,6 +437,11 @@ func (sr *subroundBlock) receivedBlockHeader(cnsDta *consensus.Message) bool {
"hash", cnsDta.BlockHeaderHash)
blockProcessedWithSuccess := sr.processReceivedBlock(cnsDta)

sr.PeerHonestyHandler().Increase(
node,
spos.GetConsensusTopicID(sr.ShardCoordinator()),
spos.LeaderPeerHonestyIncreaseFactor)

return blockProcessedWithSuccess
}

Expand Down
16 changes: 12 additions & 4 deletions consensus/spos/bls/subroundEndRound.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,17 +87,20 @@ func (sr *subroundEndRound) receivedBlockHeaderFinalInfo(cnsDta *consensus.Messa
return false
}

sr.UpdateLeaderPeerHonesty(node)
if !sr.IsNodeLeaderInCurrentRound(node) { // is NOT this node leader in current round?
sr.PeerHonestyHandler().Decrease(
node,
spos.GetConsensusTopicID(sr.ShardCoordinator()),
spos.LeaderPeerHonestyDecreaseFactor)

if sr.IsSelfLeaderInCurrentRound() {
return false
}

if !sr.IsConsensusDataEqual(cnsDta.BlockHeaderHash) {
if sr.IsSelfLeaderInCurrentRound() {
return false
}

if !sr.IsNodeLeaderInCurrentRound(node) { // is NOT this node leader in current round?
if !sr.IsConsensusDataEqual(cnsDta.BlockHeaderHash) {
return false
}

Expand All @@ -110,6 +113,11 @@ func (sr *subroundEndRound) receivedBlockHeaderFinalInfo(cnsDta *consensus.Messa
"AggregateSignature", cnsDta.AggregateSignature,
"LeaderSignature", cnsDta.LeaderSignature)

sr.PeerHonestyHandler().Increase(
node,
spos.GetConsensusTopicID(sr.ShardCoordinator()),
spos.LeaderPeerHonestyIncreaseFactor)

return sr.doEndRoundJobByParticipant(cnsDta)
}

Expand Down
14 changes: 11 additions & 3 deletions consensus/spos/bls/subroundSignature.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,13 +131,16 @@ func (sr *subroundSignature) receivedSignature(cnsDta *consensus.Message) bool {
return false
}

sr.UpdateValidatorPeerHonesty(node)
if !sr.IsNodeInConsensusGroup(node) {
sr.PeerHonestyHandler().Decrease(
node,
spos.GetConsensusTopicID(sr.ShardCoordinator()),
spos.ValidatorPeerHonestyDecreaseFactor)

if !sr.IsSelfLeaderInCurrentRound() {
return false
}

if !sr.IsNodeInConsensusGroup(node) {
if !sr.IsSelfLeaderInCurrentRound() {
return false
}

Expand Down Expand Up @@ -175,6 +178,11 @@ func (sr *subroundSignature) receivedSignature(cnsDta *consensus.Message) bool {
return false
}

sr.PeerHonestyHandler().Increase(
node,
spos.GetConsensusTopicID(sr.ShardCoordinator()),
spos.ValidatorPeerHonestyIncreaseFactor)

sr.appStatusHandler.SetStringValue(core.MetricConsensusRoundState, "signed")
return true
}
Expand Down
38 changes: 0 additions & 38 deletions consensus/spos/subround.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,44 +200,6 @@ func (sr *Subround) ConsensusChannel() chan bool {
return sr.consensusStateChangedChannel
}

// UpdateValidatorPeerHonesty will increase or decrease the honesty of the given validator public key
func (sr *Subround) UpdateValidatorPeerHonesty(pk string) {
if !sr.IsNodeInConsensusGroup(pk) {
sr.PeerHonestyHandler().Decrease(
sr.Rounder().Index(),
pk,
GetConsensusTopicID(sr.ShardCoordinator()),
ValidatorPeerHonestyDecreaseFactor)

return
}

sr.PeerHonestyHandler().Increase(
sr.Rounder().Index(),
pk,
GetConsensusTopicID(sr.ShardCoordinator()),
ValidatorPeerHonestyIncreaseFactor)
}

// UpdateLeaderPeerHonesty will increase or decrease the honesty of the given leader public key
func (sr *Subround) UpdateLeaderPeerHonesty(pk string) {
if !sr.IsNodeLeaderInCurrentRound(pk) { // is NOT this node leader in current round?
sr.PeerHonestyHandler().Decrease(
sr.Rounder().Index(),
pk,
GetConsensusTopicID(sr.ShardCoordinator()),
LeaderPeerHonestyDecreaseFactor)

return
}

sr.PeerHonestyHandler().Increase(
sr.Rounder().Index(),
pk,
GetConsensusTopicID(sr.ShardCoordinator()),
LeaderPeerHonestyIncreaseFactor)
}

// IsInterfaceNil returns true if there is no value under the interface
func (sr *Subround) IsInterfaceNil() bool {
return sr == nil
Expand Down
22 changes: 19 additions & 3 deletions consensus/spos/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,12 +324,11 @@ func (wrk *Worker) ProcessReceivedMessage(message p2p.MessageP2P, fromConnectedP
}

defer func() {
shouldBlacklistPeer := err != nil && !errors.Is(err, ErrMessageForPastRound)
if shouldBlacklistPeer {
if wrk.shouldBlacklistPeer(err) {
//this situation is so severe that we have to black list both the message originator and the connected peer
//that disseminated this message.

reason := "blacklisted due to invalid consensus message"
reason := fmt.Sprintf("blacklisted due to invalid consensus message: %s", err.Error())
wrk.antifloodHandler.BlacklistPeer(message.Peer(), reason, core.InvalidMessageBlacklistDuration)
wrk.antifloodHandler.BlacklistPeer(fromConnectedPeer, reason, core.InvalidMessageBlacklistDuration)
}
Expand Down Expand Up @@ -390,6 +389,23 @@ func (wrk *Worker) ProcessReceivedMessage(message p2p.MessageP2P, fromConnectedP
return nil
}

func (wrk *Worker) shouldBlacklistPeer(err error) bool {
if err == nil {
return false
}

if errors.Is(err, ErrMessageForPastRound) {
return false
}

isNodeSynced := wrk.bootstrapper.GetNodeState() == core.NsSynchronized
if errors.Is(err, ErrNodeIsNotInEligibleList) && !isNodeSynced {
return false
}

return true
}

func (wrk *Worker) doJobOnMessageWithBlockBody(cnsMsg *consensus.Message) {
wrk.addBlockToPool(cnsMsg.GetBody())
}
Expand Down
12 changes: 6 additions & 6 deletions integrationTests/mock/peerHonestyHandlerStub.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,22 @@ package mock

// PeerHonestyHandlerStub -
type PeerHonestyHandlerStub struct {
IncreaseCalled func(round int64, pk string, topic string, value float64)
DecreaseCalled func(round int64, pk string, topic string, value float64)
IncreaseCalled func(pk string, topic string, value float64)
DecreaseCalled func(pk string, topic string, value float64)
}

// Increase -
func (phhs *PeerHonestyHandlerStub) Increase(round int64, pk string, topic string, value float64) {
func (phhs *PeerHonestyHandlerStub) Increase(pk string, topic string, value float64) {
if phhs.IncreaseCalled != nil {
phhs.IncreaseCalled(round, pk, topic, value)
phhs.IncreaseCalled(pk, topic, value)
return
}
}

// Decrease -
func (phhs *PeerHonestyHandlerStub) Decrease(round int64, pk string, topic string, value float64) {
func (phhs *PeerHonestyHandlerStub) Decrease(pk string, topic string, value float64) {
if phhs.DecreaseCalled != nil {
phhs.DecreaseCalled(round, pk, topic, value)
phhs.DecreaseCalled(pk, topic, value)
return
}
}
Expand Down
13 changes: 6 additions & 7 deletions node/mock/peerHonestyHandlerStub.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,27 +11,26 @@ var log = logger.GetOrCreate("node/mock")

// PeerHonestyHandlerStub -
type PeerHonestyHandlerStub struct {
IncreaseCalled func(round int64, pk string, topic string, value float64)
DecreaseCalled func(round int64, pk string, topic string, value float64)
IncreaseCalled func(pk string, topic string, value float64)
DecreaseCalled func(pk string, topic string, value float64)
}

// Increase -
func (phhs *PeerHonestyHandlerStub) Increase(round int64, pk string, topic string, value float64) {
func (phhs *PeerHonestyHandlerStub) Increase(pk string, topic string, value float64) {
if phhs.IncreaseCalled != nil {
phhs.IncreaseCalled(round, pk, topic, value)
phhs.IncreaseCalled(pk, topic, value)
return
}
}

// Decrease -
func (phhs *PeerHonestyHandlerStub) Decrease(round int64, pk string, topic string, value float64) {
func (phhs *PeerHonestyHandlerStub) Decrease(pk string, topic string, value float64) {
if phhs.DecreaseCalled != nil {
phhs.DecreaseCalled(round, pk, topic, value)
phhs.DecreaseCalled(pk, topic, value)
return
}

log.Warn("PeerHonestyHandlerStub.Decrease",
"round", round,
"topic", topic,
"pk", core.GetTrimmedPk(hex.EncodeToString([]byte(pk))),
"value", value)
Expand Down

0 comments on commit aabbad1

Please sign in to comment.