Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: boosting syncing process #482

Merged
merged 7 commits into from
May 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions config/example_config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@
## heartbeat_timer = "5s"

# `session_timeout` is a timeout for a session to be opened.
# Default is 30 seconds
## session_timeout = "30s"
# Default is 10 seconds
## session_timeout = "10s"

# `max_open_sessions` is the maximum number of open sessions.
# Default is 8
Expand Down
4 changes: 4 additions & 0 deletions network/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type MockNetwork struct {
EventCh chan Event
ID peer.ID
OtherNets []*MockNetwork
SendError error
}

func MockingNetwork(id peer.ID) *MockNetwork {
Expand Down Expand Up @@ -50,6 +51,9 @@ func (mock *MockNetwork) SelfID() peer.ID {
return mock.ID
}
func (mock *MockNetwork) SendTo(data []byte, pid lp2pcore.PeerID) error {
if mock.SendError != nil {
return mock.SendError
}
mock.BroadcastCh <- BroadcastData{
Data: data,
Target: &pid,
Expand Down
3 changes: 3 additions & 0 deletions network/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ func (s *streamService) SendRequest(msg []byte, pid lp2peer.ID) error {
lp2pnetwork.WithNoDial(s.ctx, "should already have connection"), pid, s.protocolID)
if err != nil {
s.logger.Debug("unable to open direct stream", "pid", pid, "err", err)
if len(s.relayAddrs) == 0 {
return err
}

// We don't have a direct connection to the destination node,
// so we try to connect via a relay node.
Expand Down
18 changes: 11 additions & 7 deletions sync/bundle/message/blocks_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,23 +9,27 @@ import (
type BlocksRequestMessage struct {
SessionID int `cbor:"1,keyasint"`
From uint32 `cbor:"2,keyasint"`
To uint32 `cbor:"3,keyasint"`
Count uint32 `cbor:"3,keyasint"`
}

func NewBlocksRequestMessage(sid int, from, to uint32) *BlocksRequestMessage {
func NewBlocksRequestMessage(sid int, from, count uint32) *BlocksRequestMessage {
return &BlocksRequestMessage{
SessionID: sid,
From: from,
To: to,
Count: count,
}
}

func (m *BlocksRequestMessage) To() uint32 {
return m.From + m.Count - 1
}

func (m *BlocksRequestMessage) SanityCheck() error {
if m.From == 0 {
return errors.Errorf(errors.ErrInvalidHeight, "invalid height")
return errors.Errorf(errors.ErrInvalidHeight, "height is zero")
}
if m.From > m.To {
return errors.Errorf(errors.ErrInvalidHeight, "invalid range")
if m.Count == 0 {
return errors.Errorf(errors.ErrInvalidMessage, "count is zero")
}
return nil
}
Expand All @@ -35,5 +39,5 @@ func (m *BlocksRequestMessage) Type() Type {
}

func (m *BlocksRequestMessage) Fingerprint() string {
return fmt.Sprintf("{⚓ %d %v:%v}", m.SessionID, m.From, m.To)
return fmt.Sprintf("{⚓ %d %v:%v}", m.SessionID, m.From, m.To())
}
9 changes: 5 additions & 4 deletions sync/bundle/message/blocks_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,17 @@ func TestBlocksRequestMessage(t *testing.T) {

assert.Equal(t, errors.Code(m.SanityCheck()), errors.ErrInvalidHeight)
})
t.Run("Invalid range", func(t *testing.T) {
m := NewBlocksRequestMessage(1, 200, 100)
t.Run("Invalid count", func(t *testing.T) {
m := NewBlocksRequestMessage(1, 200, 0)

assert.Equal(t, errors.Code(m.SanityCheck()), errors.ErrInvalidHeight)
assert.Equal(t, errors.Code(m.SanityCheck()), errors.ErrInvalidMessage)
})

t.Run("OK", func(t *testing.T) {
m := NewBlocksRequestMessage(1, 100, 200)
m := NewBlocksRequestMessage(1, 100, 7)

assert.NoError(t, m.SanityCheck())
assert.Equal(t, m.To(), uint32(106))
assert.Contains(t, m.Fingerprint(), "100")
})
}
18 changes: 17 additions & 1 deletion sync/bundle/message/blocks_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"

"github.com/pactus-project/pactus/types/block"
"github.com/pactus-project/pactus/util/errors"
)

const LatestBlocksResponseCodeOK = 0
Expand Down Expand Up @@ -33,6 +34,9 @@ func (m *BlocksResponseMessage) SanityCheck() error {
return err
}
}
if m.From == 0 && len(m.Blocks) != 0 {
return errors.Errorf(errors.ErrInvalidHeight, "unexpected block for height zero")
}
if m.LastCertificate != nil {
if err := m.LastCertificate.SanityCheck(); err != nil {
return err
Expand All @@ -46,11 +50,23 @@ func (m *BlocksResponseMessage) Type() Type {
return MessageTypeBlocksResponse
}

func (m *BlocksResponseMessage) Count() uint32 {
return uint32(len(m.Blocks))
}

func (m *BlocksResponseMessage) To() uint32 {
// response message without any block
if len(m.Blocks) == 0 {
return 0
}
return m.From + m.Count() - 1
}

func (m *BlocksResponseMessage) LastCertificateHeight() uint32 {
if m.LastCertificate != nil {
return m.From
}
return m.From + uint32(len(m.Blocks)-1)
return 0
}

func (m *BlocksResponseMessage) Fingerprint() string {
Expand Down
39 changes: 34 additions & 5 deletions sync/bundle/message/blocks_response_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package message
import (
"testing"

"github.com/pactus-project/pactus/crypto/hash"
"github.com/pactus-project/pactus/types/block"
"github.com/pactus-project/pactus/util/errors"
"github.com/stretchr/testify/assert"
Expand All @@ -14,20 +15,29 @@ func TestLatestBlocksResponseType(t *testing.T) {
}

func TestBlocksResponseMessage(t *testing.T) {
sid := 123
t.Run("Invalid certificate", func(t *testing.T) {
b := block.GenerateTestBlock(nil, nil)
c := block.NewCertificate(-1, nil, nil, nil)
m := NewBlocksResponseMessage(ResponseCodeMoreBlocks, 1, 100, []*block.Block{b}, c)
m := NewBlocksResponseMessage(ResponseCodeMoreBlocks, sid, 100, []*block.Block{b}, c)

assert.Equal(t, errors.Code(m.SanityCheck()), errors.ErrInvalidRound)
})

t.Run("Unexpected block for height zero", func(t *testing.T) {
b := block.GenerateTestBlock(nil, nil)
m := NewBlocksResponseMessage(ResponseCodeMoreBlocks, sid, 0, []*block.Block{b}, nil)

assert.Equal(t, errors.Code(m.SanityCheck()), errors.ErrInvalidHeight)
})

t.Run("OK", func(t *testing.T) {
b1 := block.GenerateTestBlock(nil, nil)
b2 := block.GenerateTestBlock(nil, nil)
m := NewBlocksResponseMessage(ResponseCodeMoreBlocks, 1, 100, []*block.Block{b1, b2}, nil)
m := NewBlocksResponseMessage(ResponseCodeMoreBlocks, sid, 100, []*block.Block{b1, b2}, nil)

assert.NoError(t, m.SanityCheck())
assert.Zero(t, m.LastCertificateHeight())
assert.Contains(t, m.Fingerprint(), "100")
})
}
Expand All @@ -37,25 +47,44 @@ func TestLatestBlocksResponseCode(t *testing.T) {
m := NewBlocksResponseMessage(ResponseCodeBusy, 1, 0, nil, nil)

assert.NoError(t, m.SanityCheck())
assert.Equal(t, m.To(), uint32(0))
assert.Zero(t, m.From)
assert.Zero(t, m.To())
assert.Zero(t, m.Count())
assert.True(t, m.IsRequestRejected())
})

t.Run("rejected", func(t *testing.T) {
m := NewBlocksResponseMessage(ResponseCodeRejected, 1, 0, nil, nil)

assert.NoError(t, m.SanityCheck())
assert.Equal(t, m.To(), uint32(0))
assert.Zero(t, m.From)
assert.Zero(t, m.To())
assert.Zero(t, m.Count())
assert.True(t, m.IsRequestRejected())
})

t.Run("OK", func(t *testing.T) {
t.Run("OK - MoreBlocks", func(t *testing.T) {
b1 := block.GenerateTestBlock(nil, nil)
b2 := block.GenerateTestBlock(nil, nil)

m := NewBlocksResponseMessage(ResponseCodeMoreBlocks, 1, 100, []*block.Block{b1, b2}, nil)
assert.NoError(t, m.SanityCheck())
assert.Equal(t, m.From, uint32(100))
assert.Equal(t, m.To(), uint32(101))
assert.Equal(t, m.Count(), uint32(2))
assert.Zero(t, m.LastCertificateHeight())
assert.False(t, m.IsRequestRejected())
})

t.Run("OK - Synced", func(t *testing.T) {
cert := block.GenerateTestCertificate(hash.GenerateTestHash())

m := NewBlocksResponseMessage(ResponseCodeSynced, 1, 100, nil, cert)
assert.NoError(t, m.SanityCheck())
assert.Equal(t, m.From, uint32(100))
assert.Zero(t, m.To())
assert.Zero(t, m.Count())
assert.Equal(t, m.LastCertificateHeight(), uint32(100))
assert.False(t, m.IsRequestRejected())
})
}
2 changes: 1 addition & 1 deletion sync/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type Config struct {
func DefaultConfig() *Config {
return &Config{
HeartBeatTimer: time.Second * 5,
SessionTimeout: time.Second * 30,
SessionTimeout: time.Second * 10,
NodeNetwork: true,
BlockPerMessage: 60,
MaxOpenSessions: 8,
Expand Down
20 changes: 18 additions & 2 deletions sync/firewall/firewall_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,20 @@ func TestInvalidBundlesCounter(t *testing.T) {
}

func TestGossipMessage(t *testing.T) {
t.Run("Message source: unknown, from: unknown => should NOT close the connection", func(t *testing.T) {
setup(t)

bdl := bundle.NewBundle(tUnknownPeerID, message.NewQueryProposalMessage(100, 1))
bdl.Flags = util.SetFlag(bdl.Flags, bundle.BundleFlagNetworkTestnet)
d, _ := bdl.Encode()

assert.False(t, tFirewall.isPeerBanned(tUnknownPeerID))
assert.False(t, tNetwork.IsClosed(tUnknownPeerID))
// TODO: should only accepts hello from unknown peers?
assert.NotNil(t, tFirewall.OpenGossipBundle(d, tUnknownPeerID, tUnknownPeerID))
assert.False(t, tNetwork.IsClosed(tUnknownPeerID))
})

t.Run("Message source: unknown, from: bad => should close the connection", func(t *testing.T) {
setup(t)

Expand Down Expand Up @@ -160,8 +174,10 @@ func TestUpdateLastSeen(t *testing.T) {
now := time.Now().UnixNano()
assert.Nil(t, tFirewall.OpenGossipBundle(d, tUnknownPeerID, tGoodPeerID))

assert.GreaterOrEqual(t, tFirewall.peerSet.GetPeer(tUnknownPeerID).LastSeen.UnixNano(), now)
assert.GreaterOrEqual(t, tFirewall.peerSet.GetPeer(tGoodPeerID).LastSeen.UnixNano(), now)
peerUnknown := tFirewall.peerSet.GetPeer(tUnknownPeerID)
peerGood := tFirewall.peerSet.GetPeer(tGoodPeerID)
assert.GreaterOrEqual(t, peerUnknown.LastSeen.UnixNano(), now)
assert.GreaterOrEqual(t, peerGood.LastSeen.UnixNano(), now)
}

func TestNetworkFlags(t *testing.T) {
Expand Down
8 changes: 3 additions & 5 deletions sync/handler_block_announce_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,18 @@ import (
func TestParsingBlockAnnounceMessages(t *testing.T) {
setup(t)

lastBlockHash := tState.LastBlockHash()
lastBlockHeight := tState.LastBlockHeight()
b1 := block.GenerateTestBlock(nil, &lastBlockHash)
b1 := block.GenerateTestBlock(nil, nil)
c1 := block.GenerateTestCertificate(b1.Hash())
b1Hash := b1.Hash()
b2 := block.GenerateTestBlock(nil, &b1Hash)
b2 := block.GenerateTestBlock(nil, nil)
c2 := block.GenerateTestCertificate(b2.Hash())

pid := network.TestRandomPeerID()
msg1 := message.NewBlockAnnounceMessage(lastBlockHeight+1, b1, c1)
msg2 := message.NewBlockAnnounceMessage(lastBlockHeight+2, b2, c2)

pub, _ := bls.GenerateTestKeyPair()
testAddPeer(t, pub, pid)
testAddPeer(t, pub, pid, false)

t.Run("Receiving new block announce message, without committing previous block", func(t *testing.T) {
assert.NoError(t, testReceivingNewMessage(tSync, msg2, pid))
Expand Down
38 changes: 25 additions & 13 deletions sync/handler_blocks_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
"github.com/libp2p/go-libp2p/core/peer"
"github.com/pactus-project/pactus/sync/bundle"
"github.com/pactus-project/pactus/sync/bundle/message"
"github.com/pactus-project/pactus/util"
"github.com/pactus-project/pactus/util/errors"
)

Expand All @@ -25,7 +26,7 @@
handler.logger.Warn("we are busy", "message", msg, "pid", initiator)
response := message.NewBlocksResponseMessage(message.ResponseCodeBusy,
msg.SessionID, 0, nil, nil)
handler.sendTo(response, initiator)
handler.sendTo(response, initiator, msg.SessionID)

return nil
}
Expand All @@ -34,7 +35,7 @@
if !peer.IsKnownOrTrusty() {
response := message.NewBlocksResponseMessage(message.ResponseCodeRejected,
msg.SessionID, 0, nil, nil)
handler.sendTo(response, initiator)
handler.sendTo(response, initiator, msg.SessionID)

return errors.Errorf(errors.ErrInvalidMessage, "peer status is %v", peer.Status)
}
Expand All @@ -56,43 +57,54 @@
if msg.From < ourHeight-LatestBlockInterval {
response := message.NewBlocksResponseMessage(message.ResponseCodeRejected,
msg.SessionID, 0, nil, nil)
handler.sendTo(response, initiator)
handler.sendTo(response, initiator, msg.SessionID)

return errors.Errorf(errors.ErrInvalidMessage, "the request height is not acceptable: %v", msg.From)
}
}
height := msg.From
count := handler.config.BlockPerMessage
count := msg.Count

// Help peer to catch up
if count > LatestBlockInterval {
response := message.NewBlocksResponseMessage(message.ResponseCodeRejected,
msg.SessionID, 0, nil, nil)
handler.sendTo(response, initiator, msg.SessionID)

Check warning on line 71 in sync/handler_blocks_request.go

View check run for this annotation

Codecov / codecov/patch

sync/handler_blocks_request.go#L69-L71

Added lines #L69 - L71 were not covered by tests

return errors.Errorf(errors.ErrInvalidMessage, "too many blocks requested: %v-%v", msg.From, msg.Count)

Check warning on line 73 in sync/handler_blocks_request.go

View check run for this annotation

Codecov / codecov/patch

sync/handler_blocks_request.go#L73

Added line #L73 was not covered by tests
}

// Help this peer to sync up
for {
blocks := handler.prepareBlocks(height, count)
blockToRead := util.MinU32(handler.config.BlockPerMessage, count)
blocks := handler.prepareBlocks(height, blockToRead)
if len(blocks) == 0 {
break
}

response := message.NewBlocksResponseMessage(message.ResponseCodeMoreBlocks,
msg.SessionID, height, blocks, nil)
handler.sendTo(response, initiator)
handler.sendTo(response, initiator, msg.SessionID)

height += uint32(len(blocks))
if height >= msg.To {
count -= uint32(len(blocks))
if count <= 0 {
break
}
}
// To avoid sending blocks again, we update height for this peer
// Height is always greater than zeo.
handler.peerSet.UpdateHeight(initiator, height-1)
peerHeight := height - 1
handler.peerSet.UpdateHeight(initiator, peerHeight)

if msg.To >= handler.state.LastBlockHeight() {
if msg.To() >= handler.state.LastBlockHeight() {
lastCertificate := handler.state.LastCertificate()
response := message.NewBlocksResponseMessage(message.ResponseCodeSynced,
msg.SessionID, handler.state.LastBlockHeight(), nil, lastCertificate)
handler.sendTo(response, initiator)
msg.SessionID, peerHeight, nil, lastCertificate)
handler.sendTo(response, initiator, msg.SessionID)
} else {
response := message.NewBlocksResponseMessage(message.ResponseCodeNoMoreBlocks,
msg.SessionID, 0, nil, nil)
handler.sendTo(response, initiator)
handler.sendTo(response, initiator, msg.SessionID)
}

return nil
Expand Down