Skip to content

Commit

Permalink
fix: boosting syncing process (#482)
Browse files Browse the repository at this point in the history
* fix: boosting syncing process

* chore: decreasing SessionTimeout to 10 seconds

* test: updating tests

* test: adding more tests for peerset

* chore: fixing linting issues

* test: fixing a broken test

* test: fixing review comments
  • Loading branch information
b00f committed May 28, 2023
1 parent efdf5e4 commit b087fe1
Show file tree
Hide file tree
Showing 32 changed files with 974 additions and 201 deletions.
4 changes: 2 additions & 2 deletions config/example_config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@
## heartbeat_timer = "5s"

# `session_timeout` is a timeout for a session to be opened.
# Default is 30 seconds
## session_timeout = "30s"
# Default is 10 seconds
## session_timeout = "10s"

# `max_open_sessions` is the maximum number of open sessions.
# Default is 8
Expand Down
4 changes: 4 additions & 0 deletions network/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type MockNetwork struct {
EventCh chan Event
ID peer.ID
OtherNets []*MockNetwork
SendError error
}

func MockingNetwork(id peer.ID) *MockNetwork {
Expand Down Expand Up @@ -50,6 +51,9 @@ func (mock *MockNetwork) SelfID() peer.ID {
return mock.ID
}
func (mock *MockNetwork) SendTo(data []byte, pid lp2pcore.PeerID) error {
if mock.SendError != nil {
return mock.SendError
}
mock.BroadcastCh <- BroadcastData{
Data: data,
Target: &pid,
Expand Down
3 changes: 3 additions & 0 deletions network/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ func (s *streamService) SendRequest(msg []byte, pid lp2peer.ID) error {
lp2pnetwork.WithNoDial(s.ctx, "should already have connection"), pid, s.protocolID)
if err != nil {
s.logger.Debug("unable to open direct stream", "pid", pid, "err", err)
if len(s.relayAddrs) == 0 {
return err
}

// We don't have a direct connection to the destination node,
// so we try to connect via a relay node.
Expand Down
18 changes: 11 additions & 7 deletions sync/bundle/message/blocks_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,23 +9,27 @@ import (
type BlocksRequestMessage struct {
SessionID int `cbor:"1,keyasint"`
From uint32 `cbor:"2,keyasint"`
To uint32 `cbor:"3,keyasint"`
Count uint32 `cbor:"3,keyasint"`
}

func NewBlocksRequestMessage(sid int, from, to uint32) *BlocksRequestMessage {
func NewBlocksRequestMessage(sid int, from, count uint32) *BlocksRequestMessage {
return &BlocksRequestMessage{
SessionID: sid,
From: from,
To: to,
Count: count,
}
}

func (m *BlocksRequestMessage) To() uint32 {
return m.From + m.Count - 1
}

func (m *BlocksRequestMessage) SanityCheck() error {
if m.From == 0 {
return errors.Errorf(errors.ErrInvalidHeight, "invalid height")
return errors.Errorf(errors.ErrInvalidHeight, "height is zero")
}
if m.From > m.To {
return errors.Errorf(errors.ErrInvalidHeight, "invalid range")
if m.Count == 0 {
return errors.Errorf(errors.ErrInvalidMessage, "count is zero")
}
return nil
}
Expand All @@ -35,5 +39,5 @@ func (m *BlocksRequestMessage) Type() Type {
}

func (m *BlocksRequestMessage) Fingerprint() string {
return fmt.Sprintf("{⚓ %d %v:%v}", m.SessionID, m.From, m.To)
return fmt.Sprintf("{⚓ %d %v:%v}", m.SessionID, m.From, m.To())
}
9 changes: 5 additions & 4 deletions sync/bundle/message/blocks_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,17 @@ func TestBlocksRequestMessage(t *testing.T) {

assert.Equal(t, errors.Code(m.SanityCheck()), errors.ErrInvalidHeight)
})
t.Run("Invalid range", func(t *testing.T) {
m := NewBlocksRequestMessage(1, 200, 100)
t.Run("Invalid count", func(t *testing.T) {
m := NewBlocksRequestMessage(1, 200, 0)

assert.Equal(t, errors.Code(m.SanityCheck()), errors.ErrInvalidHeight)
assert.Equal(t, errors.Code(m.SanityCheck()), errors.ErrInvalidMessage)
})

t.Run("OK", func(t *testing.T) {
m := NewBlocksRequestMessage(1, 100, 200)
m := NewBlocksRequestMessage(1, 100, 7)

assert.NoError(t, m.SanityCheck())
assert.Equal(t, m.To(), uint32(106))
assert.Contains(t, m.Fingerprint(), "100")
})
}
18 changes: 17 additions & 1 deletion sync/bundle/message/blocks_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"

"github.com/pactus-project/pactus/types/block"
"github.com/pactus-project/pactus/util/errors"
)

const LatestBlocksResponseCodeOK = 0
Expand Down Expand Up @@ -33,6 +34,9 @@ func (m *BlocksResponseMessage) SanityCheck() error {
return err
}
}
if m.From == 0 && len(m.Blocks) != 0 {
return errors.Errorf(errors.ErrInvalidHeight, "unexpected block for height zero")
}
if m.LastCertificate != nil {
if err := m.LastCertificate.SanityCheck(); err != nil {
return err
Expand All @@ -46,11 +50,23 @@ func (m *BlocksResponseMessage) Type() Type {
return MessageTypeBlocksResponse
}

func (m *BlocksResponseMessage) Count() uint32 {
return uint32(len(m.Blocks))
}

func (m *BlocksResponseMessage) To() uint32 {
// response message without any block
if len(m.Blocks) == 0 {
return 0
}
return m.From + m.Count() - 1
}

func (m *BlocksResponseMessage) LastCertificateHeight() uint32 {
if m.LastCertificate != nil {
return m.From
}
return m.From + uint32(len(m.Blocks)-1)
return 0
}

func (m *BlocksResponseMessage) Fingerprint() string {
Expand Down
39 changes: 34 additions & 5 deletions sync/bundle/message/blocks_response_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package message
import (
"testing"

"github.com/pactus-project/pactus/crypto/hash"
"github.com/pactus-project/pactus/types/block"
"github.com/pactus-project/pactus/util/errors"
"github.com/stretchr/testify/assert"
Expand All @@ -14,20 +15,29 @@ func TestLatestBlocksResponseType(t *testing.T) {
}

func TestBlocksResponseMessage(t *testing.T) {
sid := 123
t.Run("Invalid certificate", func(t *testing.T) {
b := block.GenerateTestBlock(nil, nil)
c := block.NewCertificate(-1, nil, nil, nil)
m := NewBlocksResponseMessage(ResponseCodeMoreBlocks, 1, 100, []*block.Block{b}, c)
m := NewBlocksResponseMessage(ResponseCodeMoreBlocks, sid, 100, []*block.Block{b}, c)

assert.Equal(t, errors.Code(m.SanityCheck()), errors.ErrInvalidRound)
})

t.Run("Unexpected block for height zero", func(t *testing.T) {
b := block.GenerateTestBlock(nil, nil)
m := NewBlocksResponseMessage(ResponseCodeMoreBlocks, sid, 0, []*block.Block{b}, nil)

assert.Equal(t, errors.Code(m.SanityCheck()), errors.ErrInvalidHeight)
})

t.Run("OK", func(t *testing.T) {
b1 := block.GenerateTestBlock(nil, nil)
b2 := block.GenerateTestBlock(nil, nil)
m := NewBlocksResponseMessage(ResponseCodeMoreBlocks, 1, 100, []*block.Block{b1, b2}, nil)
m := NewBlocksResponseMessage(ResponseCodeMoreBlocks, sid, 100, []*block.Block{b1, b2}, nil)

assert.NoError(t, m.SanityCheck())
assert.Zero(t, m.LastCertificateHeight())
assert.Contains(t, m.Fingerprint(), "100")
})
}
Expand All @@ -37,25 +47,44 @@ func TestLatestBlocksResponseCode(t *testing.T) {
m := NewBlocksResponseMessage(ResponseCodeBusy, 1, 0, nil, nil)

assert.NoError(t, m.SanityCheck())
assert.Equal(t, m.To(), uint32(0))
assert.Zero(t, m.From)
assert.Zero(t, m.To())
assert.Zero(t, m.Count())
assert.True(t, m.IsRequestRejected())
})

t.Run("rejected", func(t *testing.T) {
m := NewBlocksResponseMessage(ResponseCodeRejected, 1, 0, nil, nil)

assert.NoError(t, m.SanityCheck())
assert.Equal(t, m.To(), uint32(0))
assert.Zero(t, m.From)
assert.Zero(t, m.To())
assert.Zero(t, m.Count())
assert.True(t, m.IsRequestRejected())
})

t.Run("OK", func(t *testing.T) {
t.Run("OK - MoreBlocks", func(t *testing.T) {
b1 := block.GenerateTestBlock(nil, nil)
b2 := block.GenerateTestBlock(nil, nil)

m := NewBlocksResponseMessage(ResponseCodeMoreBlocks, 1, 100, []*block.Block{b1, b2}, nil)
assert.NoError(t, m.SanityCheck())
assert.Equal(t, m.From, uint32(100))
assert.Equal(t, m.To(), uint32(101))
assert.Equal(t, m.Count(), uint32(2))
assert.Zero(t, m.LastCertificateHeight())
assert.False(t, m.IsRequestRejected())
})

t.Run("OK - Synced", func(t *testing.T) {
cert := block.GenerateTestCertificate(hash.GenerateTestHash())

m := NewBlocksResponseMessage(ResponseCodeSynced, 1, 100, nil, cert)
assert.NoError(t, m.SanityCheck())
assert.Equal(t, m.From, uint32(100))
assert.Zero(t, m.To())
assert.Zero(t, m.Count())
assert.Equal(t, m.LastCertificateHeight(), uint32(100))
assert.False(t, m.IsRequestRejected())
})
}
2 changes: 1 addition & 1 deletion sync/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type Config struct {
func DefaultConfig() *Config {
return &Config{
HeartBeatTimer: time.Second * 5,
SessionTimeout: time.Second * 30,
SessionTimeout: time.Second * 10,
NodeNetwork: true,
BlockPerMessage: 60,
MaxOpenSessions: 8,
Expand Down
20 changes: 18 additions & 2 deletions sync/firewall/firewall_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,20 @@ func TestInvalidBundlesCounter(t *testing.T) {
}

func TestGossipMessage(t *testing.T) {
t.Run("Message source: unknown, from: unknown => should NOT close the connection", func(t *testing.T) {
setup(t)

bdl := bundle.NewBundle(tUnknownPeerID, message.NewQueryProposalMessage(100, 1))
bdl.Flags = util.SetFlag(bdl.Flags, bundle.BundleFlagNetworkTestnet)
d, _ := bdl.Encode()

assert.False(t, tFirewall.isPeerBanned(tUnknownPeerID))
assert.False(t, tNetwork.IsClosed(tUnknownPeerID))
// TODO: should only accepts hello from unknown peers?
assert.NotNil(t, tFirewall.OpenGossipBundle(d, tUnknownPeerID, tUnknownPeerID))
assert.False(t, tNetwork.IsClosed(tUnknownPeerID))
})

t.Run("Message source: unknown, from: bad => should close the connection", func(t *testing.T) {
setup(t)

Expand Down Expand Up @@ -160,8 +174,10 @@ func TestUpdateLastSeen(t *testing.T) {
now := time.Now().UnixNano()
assert.Nil(t, tFirewall.OpenGossipBundle(d, tUnknownPeerID, tGoodPeerID))

assert.GreaterOrEqual(t, tFirewall.peerSet.GetPeer(tUnknownPeerID).LastSeen.UnixNano(), now)
assert.GreaterOrEqual(t, tFirewall.peerSet.GetPeer(tGoodPeerID).LastSeen.UnixNano(), now)
peerUnknown := tFirewall.peerSet.GetPeer(tUnknownPeerID)
peerGood := tFirewall.peerSet.GetPeer(tGoodPeerID)
assert.GreaterOrEqual(t, peerUnknown.LastSeen.UnixNano(), now)
assert.GreaterOrEqual(t, peerGood.LastSeen.UnixNano(), now)
}

func TestNetworkFlags(t *testing.T) {
Expand Down
8 changes: 3 additions & 5 deletions sync/handler_block_announce_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,18 @@ import (
func TestParsingBlockAnnounceMessages(t *testing.T) {
setup(t)

lastBlockHash := tState.LastBlockHash()
lastBlockHeight := tState.LastBlockHeight()
b1 := block.GenerateTestBlock(nil, &lastBlockHash)
b1 := block.GenerateTestBlock(nil, nil)
c1 := block.GenerateTestCertificate(b1.Hash())
b1Hash := b1.Hash()
b2 := block.GenerateTestBlock(nil, &b1Hash)
b2 := block.GenerateTestBlock(nil, nil)
c2 := block.GenerateTestCertificate(b2.Hash())

pid := network.TestRandomPeerID()
msg1 := message.NewBlockAnnounceMessage(lastBlockHeight+1, b1, c1)
msg2 := message.NewBlockAnnounceMessage(lastBlockHeight+2, b2, c2)

pub, _ := bls.GenerateTestKeyPair()
testAddPeer(t, pub, pid)
testAddPeer(t, pub, pid, false)

t.Run("Receiving new block announce message, without committing previous block", func(t *testing.T) {
assert.NoError(t, testReceivingNewMessage(tSync, msg2, pid))
Expand Down
38 changes: 25 additions & 13 deletions sync/handler_blocks_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
"github.com/pactus-project/pactus/sync/bundle"
"github.com/pactus-project/pactus/sync/bundle/message"
"github.com/pactus-project/pactus/util"
"github.com/pactus-project/pactus/util/errors"
)

Expand All @@ -25,7 +26,7 @@ func (handler *blocksRequestHandler) ParsMessage(m message.Message, initiator pe
handler.logger.Warn("we are busy", "message", msg, "pid", initiator)
response := message.NewBlocksResponseMessage(message.ResponseCodeBusy,
msg.SessionID, 0, nil, nil)
handler.sendTo(response, initiator)
handler.sendTo(response, initiator, msg.SessionID)

return nil
}
Expand All @@ -34,7 +35,7 @@ func (handler *blocksRequestHandler) ParsMessage(m message.Message, initiator pe
if !peer.IsKnownOrTrusty() {
response := message.NewBlocksResponseMessage(message.ResponseCodeRejected,
msg.SessionID, 0, nil, nil)
handler.sendTo(response, initiator)
handler.sendTo(response, initiator, msg.SessionID)

return errors.Errorf(errors.ErrInvalidMessage, "peer status is %v", peer.Status)
}
Expand All @@ -56,43 +57,54 @@ func (handler *blocksRequestHandler) ParsMessage(m message.Message, initiator pe
if msg.From < ourHeight-LatestBlockInterval {
response := message.NewBlocksResponseMessage(message.ResponseCodeRejected,
msg.SessionID, 0, nil, nil)
handler.sendTo(response, initiator)
handler.sendTo(response, initiator, msg.SessionID)

return errors.Errorf(errors.ErrInvalidMessage, "the request height is not acceptable: %v", msg.From)
}
}
height := msg.From
count := handler.config.BlockPerMessage
count := msg.Count

// Help peer to catch up
if count > LatestBlockInterval {
response := message.NewBlocksResponseMessage(message.ResponseCodeRejected,
msg.SessionID, 0, nil, nil)
handler.sendTo(response, initiator, msg.SessionID)

return errors.Errorf(errors.ErrInvalidMessage, "too many blocks requested: %v-%v", msg.From, msg.Count)
}

// Help this peer to sync up
for {
blocks := handler.prepareBlocks(height, count)
blockToRead := util.MinU32(handler.config.BlockPerMessage, count)
blocks := handler.prepareBlocks(height, blockToRead)
if len(blocks) == 0 {
break
}

response := message.NewBlocksResponseMessage(message.ResponseCodeMoreBlocks,
msg.SessionID, height, blocks, nil)
handler.sendTo(response, initiator)
handler.sendTo(response, initiator, msg.SessionID)

height += uint32(len(blocks))
if height >= msg.To {
count -= uint32(len(blocks))
if count <= 0 {
break
}
}
// To avoid sending blocks again, we update height for this peer
// Height is always greater than zeo.
handler.peerSet.UpdateHeight(initiator, height-1)
peerHeight := height - 1
handler.peerSet.UpdateHeight(initiator, peerHeight)

if msg.To >= handler.state.LastBlockHeight() {
if msg.To() >= handler.state.LastBlockHeight() {
lastCertificate := handler.state.LastCertificate()
response := message.NewBlocksResponseMessage(message.ResponseCodeSynced,
msg.SessionID, handler.state.LastBlockHeight(), nil, lastCertificate)
handler.sendTo(response, initiator)
msg.SessionID, peerHeight, nil, lastCertificate)
handler.sendTo(response, initiator, msg.SessionID)
} else {
response := message.NewBlocksResponseMessage(message.ResponseCodeNoMoreBlocks,
msg.SessionID, 0, nil, nil)
handler.sendTo(response, initiator)
handler.sendTo(response, initiator, msg.SessionID)
}

return nil
Expand Down

0 comments on commit b087fe1

Please sign in to comment.