Skip to content

Commit

Permalink
fix: increase failed counter when stream got error (#489)
Browse files Browse the repository at this point in the history
  • Loading branch information
b00f committed May 29, 2023
1 parent b087fe1 commit 570c7b5
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 19 deletions.
38 changes: 20 additions & 18 deletions consensus/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,16 +67,6 @@ func TestManager(t *testing.T) {

assert.False(t, mgr.HasActiveInstance())

mgr.MoveToNewHeight()

checkHeightRoundWait(t, consA, 1, 0)
checkHeightRoundWait(t, consB, 1, 0)
checkHeightRoundWait(t, consC, 1, 0)
checkHeightRoundWait(t, consD, 1, 0)
checkHeightRoundWait(t, consE, 1, 0)

assert.True(t, mgr.HasActiveInstance())

t.Run("Check if keys are assigned properly", func(t *testing.T) {
instances := mgr.Instances()

Expand All @@ -87,13 +77,16 @@ func TestManager(t *testing.T) {
assert.Equal(t, signers[4].PublicKey(), instances[4].SignerKey())
})

t.Run("Check if one instance publishes a proposal, the other instances receive it", func(t *testing.T) {
p := shouldPublishProposal(t, consA, 1, 0)
t.Run("Check if all instances move to new height", func(t *testing.T) {
mgr.MoveToNewHeight()

assert.Equal(t, consA.RoundProposal(0), p)
assert.Equal(t, consB.RoundProposal(0), p)
assert.Nil(t, consC.RoundProposal(0))
assert.Nil(t, consD.RoundProposal(0))
checkHeightRoundWait(t, consA, 1, 0)
checkHeightRoundWait(t, consB, 1, 0)
checkHeightRoundWait(t, consC, 1, 0)
checkHeightRoundWait(t, consD, 1, 0)
checkHeightRoundWait(t, consE, 1, 0)

assert.True(t, mgr.HasActiveInstance())
})

t.Run("Testing add vote", func(t *testing.T) {
Expand All @@ -109,8 +102,8 @@ func TestManager(t *testing.T) {
})

t.Run("Testing set proposal", func(t *testing.T) {
blk, _ := state.ProposeBlock(committeeSigners[2], committeeSigners[2].Address(), 2)
p := proposal.NewProposal(1, 2, blk)
b, _ := state.ProposeBlock(committeeSigners[2], committeeSigners[2].Address(), 2)
p := proposal.NewProposal(1, 2, b)
committeeSigners[2].SignMsg(p)

mgr.SetProposal(p)
Expand All @@ -121,6 +114,15 @@ func TestManager(t *testing.T) {
assert.Nil(t, consD.RoundProposal(2))
})

t.Run("Check if one instance publishes a proposal, the other instances receive it", func(t *testing.T) {
p := shouldPublishProposal(t, consA, 1, 0)

assert.Equal(t, consA.RoundProposal(0), p)
assert.Equal(t, consB.RoundProposal(0), p)
assert.Nil(t, consC.RoundProposal(0))
assert.Nil(t, consD.RoundProposal(0))
})

t.Run("Testing moving to the next round proposal", func(t *testing.T) {
v3 := vote.NewVote(vote.VoteTypeChangeProposer, 1, 0, hash.UndefHash, committeeSigners[2].Address())
committeeSigners[2].SignMsg(v3)
Expand Down
4 changes: 4 additions & 0 deletions sync/firewall/firewall_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,10 @@ func TestGossipMessage(t *testing.T) {
bdl.Flags = util.SetFlag(bdl.Flags, bundle.BundleFlagNetworkTestnet)
d, _ := bdl.Encode()

assert.False(t, tNetwork.IsClosed(tUnknownPeerID))
assert.False(t, tNetwork.IsClosed(tBadPeerID))
assert.Nil(t, tFirewall.OpenGossipBundle(d, tUnknownPeerID, tBadPeerID))
assert.False(t, tNetwork.IsClosed(tUnknownPeerID))
assert.True(t, tNetwork.IsClosed(tBadPeerID))
})

Expand All @@ -99,8 +101,10 @@ func TestGossipMessage(t *testing.T) {
d, _ := bdl.Encode()

assert.False(t, tNetwork.IsClosed(tBadPeerID))
assert.False(t, tNetwork.IsClosed(tUnknownPeerID))
assert.Nil(t, tFirewall.OpenGossipBundle(d, tBadPeerID, tUnknownPeerID))
assert.True(t, tNetwork.IsClosed(tBadPeerID))
assert.True(t, tNetwork.IsClosed(tUnknownPeerID))
})

t.Run("Message initiator is not the same as source => should close the connection", func(t *testing.T) {
Expand Down
4 changes: 3 additions & 1 deletion sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,8 @@ func (sync *synchronizer) receiveLoop() {
se := e.(*network.StreamMessage)
bdl = sync.firewall.OpenStreamBundle(se.Reader, se.Source)
if err := se.Reader.Close(); err != nil {
// TODO: write test for me
sync.peerSet.IncreaseSendFailedCounter(se.Source)
sync.logger.Warn("error on closing stream", "err", err)
}
}
Expand Down Expand Up @@ -306,7 +308,7 @@ func (sync *synchronizer) sendTo(msg message.Message, to peer.ID, sessionID int)
data, _ := bdl.Encode()
err := sync.network.SendTo(data, to)
if err != nil {
sync.logger.Error("error on sending bundle", "bundle", bdl, "err", err, "to", to)
sync.logger.Warn("error on sending bundle", "bundle", bdl, "err", err, "to", to)
sync.peerSet.IncreaseSendFailedCounter(to)

// Let's close the session with this peer because we couldn't establish a connection.
Expand Down

0 comments on commit 570c7b5

Please sign in to comment.