consensus: optimize vote and block part gossip with HasProposalBlockPartMessage and random sleeps (cometbft#904)

Addresses tendermint/tendermint#627 (finally!)

This PR is based off cometbft#896 which added new metrics to measure duplicate votes and block parts received from peers.

This PR introduces two optimizations to the consensus gossip: HasProposalBlockPartMessage and random sleeps in the fast loop in the gossip routines.

Controls for both were added to the config file for testing purposes, but at least the HasProposalBlockPartMessage should probably not be in the config; it should just be hardcoded active once we're happy with it.

On another note, we may want to consider some changes so this can be rolled out in a non-breaking way. If I'm not mistaken, since the message was added to an existing p2p channel, it will be interpreted by peers on older versions as malicious. My understanding is we could move it to a new channel to solve this, so the change would be backwards compatible and nodes could roll it out progressively.

The first is a new HasProposalBlockPartMessage. This follows identical logic to the HasVoteMessage. When a node receives a ProposalBlockPart, it broadcasts the HasProposalBlockPartMessage to all its peers. This informs them that they don't need to send the node the part, thus reducing the amount of redundant block parts gossiped over the network.
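The core of the suppression logic can be sketched as follows. This is a minimal, self-contained Go sketch with simplified, hypothetical types (`peerState` keyed by a flat part index) — the actual reactor keys peer state by height/round and uses bit arrays, so treat this only as an illustration of why announcing received parts cuts redundant sends:

```go
package main

import "fmt"

// hasProposalBlockPart mirrors the wire message: just three integers
// identifying one part of the proposal block (hypothetical simplified type).
type hasProposalBlockPart struct {
	Height int64
	Round  int32
	Index  int32
}

// peerState tracks which parts we believe a peer already has.
type peerState struct {
	parts map[int32]bool
}

// applyHasPart records an announcement received from the peer.
func (ps *peerState) applyHasPart(m hasProposalBlockPart) {
	ps.parts[m.Index] = true
}

// pickPartToSend returns the first part the peer has NOT announced,
// skipping redundant sends entirely.
func pickPartToSend(ps *peerState, total int32) (int32, bool) {
	for i := int32(0); i < total; i++ {
		if !ps.parts[i] {
			return i, true
		}
	}
	return 0, false
}

func main() {
	ps := &peerState{parts: map[int32]bool{}}
	// Peer announces it already has part 0.
	ps.applyHasPart(hasProposalBlockPart{Height: 10, Round: 0, Index: 0})
	if i, ok := pickPartToSend(ps, 3); ok {
		fmt.Println("send part", i) // part 0 is skipped; part 1 is sent
	}
}
```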

The second change is the introduction of a random sleep in the fast loops of the gossipVotes and gossipData routines. We run these routines for each peer, and when there are things to send, the routines run in a tight loop picking and sending votes or parts to the peer. This reduces the effectiveness of the "HasXXXMessage" mechanism, since there is minimal opportunity for the peer state to be updated from received HasXXXMessages before the next votes/parts are selected to be sent. Adding a small random sleep gives the reactor a chance to breathe and process inbound HasXXXMessages before picking new votes/parts to send. Randomizing the sleep ensures the peers don't all sleep for the same amount of time, which could negate its effect.

Combining the new HasProposalBlockPartMessage with the random sleep in the gossipData routine results in a ~20% bandwidth reduction on BlockPart messages in 4-node and 7-node networks under load, as measured in a local testnet on my laptop as well as in 4-node networks in the cloud. Larger tests are still to be conducted, but these initial results are very promising.

The random sleep in the gossipVotes routine also results in a ~15% bandwidth reduction on Vote messages in 7-node networks, as measured in a local testnet on my laptop. Again, promising, but more tests to be done.

The largest consumers of bandwidth in the comet system are mempool txs, votes, and block parts. While mempool txs are not addressed here, the latter two are, and should lead to significant reductions in bandwidth usage overall.

Note that the new HasProposalBlockPartMessage adds minimal bandwidth itself, as each message contains only 3 integers (one 64-bit, two 32-bit, so 16 bytes total), compared to hundreds of bytes in a block part message. This was also confirmed experimentally.
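For reference, the fixed-width payload works out as follows (ignoring protobuf field tags and varint encoding, which in practice make the wire message even smaller for typical heights):

```go
package main

import "fmt"

func main() {
	// HasProposalBlockPart carries Height (int64), Round (int32), Index (int32).
	const height, round, index = 8, 4, 4
	fmt.Println(height+round+index, "bytes") // 16 bytes of payload
}
```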

Some teaser screenshots below on small local networks using the cometbft `e2e` testing system on my laptop.

With and without the random sleep (upper bounded at 50ms) on a 7 node network, with no load:

<img width="1426" alt="image" src="https://github.com/cometbft/cometbft/assets/2300911/816f86a7-49d7-4988-ae57-e52d9bd196f8">

The main lines visible are the bandwidth from the Vote (the larger one) and BlockPart messages. Note how both are reduced by ~15% by adding the sleep (left side). The effect of the sleep on block times is negligible (they rise from 1.6 to 1.7 seconds per block in this experiment).

On a 4 node network, with and without the new HasProposalBlockPartMessage and with and without the random sleep on the gossipDataRoutine:

![image](https://github.com/cometbft/cometbft/assets/2300911/70c616e9-40ff-4f6c-b3aa-fc7b5e9adb73)

Ignore the spikes on the right for now; that's the system acting up under load while running all 4 nodes on my laptop. The image shows 4 experiments. The purple line is bandwidth from mempool txs (roughly the same in all experiments); the green is bandwidth from block part messages (what we're focusing on). The experiments, and the approximate block part bandwidth when it first flattens out, are:

1. has block parts, sleep (680k)
2. no has block parts, sleep (830k)
3. has block parts, no sleep (730k)
4. no has block parts, no sleep (870k)

The last experiment is the current baseline. Adding the sleep alone, without the new message type (2nd experiment), has some minimal impact (870 -> 830). Adding just the new message type without the sleep (3rd experiment) has a more significant impact (870 -> 730). But adding both together (1st experiment) has the most significant impact (870 -> 680), a reduction of ~20%.

While we've done some preliminary experiments here, we still need to run larger experiments on cloud deployments, as well as write some actual tests :)

---

- [ ] Tests written/updated
- [ ] Changelog entry added in `.changelog` (we use [unclog](https://github.com/informalsystems/unclog) to manage our changelog)
- [ ] Updated relevant documentation (`docs/` or `spec/`) and code comments
ebuchman authored and nivasan1 committed Nov 8, 2023
1 parent e58f2de commit 96d32ca
Showing 16 changed files with 611 additions and 152 deletions.
@@ -0,0 +1,2 @@
- `[consensus]` New metrics (counters) to track duplicate votes and block parts.
([\#896](https://github.com/cometbft/cometbft/pull/896))
@@ -0,0 +1,6 @@
- `[consensus]` Optimize vote and block part gossip with new message `HasProposalBlockPartMessage`,
which is similar to `HasVoteMessage`; and random sleep in the loop broadcasting those messages.
The sleep can be configured with new config `peer_gossip_intraloop_sleep_duration`, which is set to 0
by default as this is experimental.
Our scale tests show substantial bandwidth improvement with a value of 50 ms.
([\#904](https://github.com/cometbft/cometbft/pull/904))
34 changes: 18 additions & 16 deletions config/config.go
@@ -944,29 +944,31 @@ type ConsensusConfig struct {
CreateEmptyBlocksInterval time.Duration `mapstructure:"create_empty_blocks_interval"`

// Reactor sleep duration parameters
PeerGossipSleepDuration time.Duration `mapstructure:"peer_gossip_sleep_duration"`
PeerQueryMaj23SleepDuration time.Duration `mapstructure:"peer_query_maj23_sleep_duration"`
PeerGossipSleepDuration time.Duration `mapstructure:"peer_gossip_sleep_duration"`
PeerQueryMaj23SleepDuration time.Duration `mapstructure:"peer_query_maj23_sleep_duration"`
PeerGossipIntraloopSleepDuration time.Duration `mapstructure:"peer_gossip_intraloop_sleep_duration"` // upper bound on randomly selected values

DoubleSignCheckHeight int64 `mapstructure:"double_sign_check_height"`
}

// DefaultConsensusConfig returns a default configuration for the consensus service
func DefaultConsensusConfig() *ConsensusConfig {
return &ConsensusConfig{
WalPath: filepath.Join(DefaultDataDir, "cs.wal", "wal"),
TimeoutPropose: 3000 * time.Millisecond,
TimeoutProposeDelta: 500 * time.Millisecond,
TimeoutPrevote: 1000 * time.Millisecond,
TimeoutPrevoteDelta: 500 * time.Millisecond,
TimeoutPrecommit: 1000 * time.Millisecond,
TimeoutPrecommitDelta: 500 * time.Millisecond,
TimeoutCommit: 1000 * time.Millisecond,
SkipTimeoutCommit: false,
CreateEmptyBlocks: true,
CreateEmptyBlocksInterval: 0 * time.Second,
PeerGossipSleepDuration: 100 * time.Millisecond,
PeerQueryMaj23SleepDuration: 2000 * time.Millisecond,
DoubleSignCheckHeight: int64(0),
WalPath: filepath.Join(DefaultDataDir, "cs.wal", "wal"),
TimeoutPropose: 3000 * time.Millisecond,
TimeoutProposeDelta: 500 * time.Millisecond,
TimeoutPrevote: 1000 * time.Millisecond,
TimeoutPrevoteDelta: 500 * time.Millisecond,
TimeoutPrecommit: 1000 * time.Millisecond,
TimeoutPrecommitDelta: 500 * time.Millisecond,
TimeoutCommit: 1000 * time.Millisecond,
SkipTimeoutCommit: false,
CreateEmptyBlocks: true,
CreateEmptyBlocksInterval: 0 * time.Second,
PeerGossipSleepDuration: 100 * time.Millisecond,
PeerQueryMaj23SleepDuration: 2000 * time.Millisecond,
PeerGossipIntraloopSleepDuration: 0 * time.Second,
DoubleSignCheckHeight: int64(0),
}
}

1 change: 1 addition & 0 deletions config/toml.go
@@ -467,6 +467,7 @@ create_empty_blocks_interval = "{{ .Consensus.CreateEmptyBlocksInterval }}"
# Reactor sleep duration parameters
peer_gossip_sleep_duration = "{{ .Consensus.PeerGossipSleepDuration }}"
peer_gossip_intraloop_sleep_duration = "{{ .Consensus.PeerGossipIntraloopSleepDuration }}"
peer_query_maj23_sleep_duration = "{{ .Consensus.PeerQueryMaj23SleepDuration }}"
#######################################################
13 changes: 13 additions & 0 deletions consensus/msgs.go
@@ -84,6 +84,13 @@ func MsgToProto(msg Message) (proto.Message, error) {
Index: msg.Index,
}

case *HasProposalBlockPartMessage:
pb = &cmtcons.HasProposalBlockPart{
Height: msg.Height,
Round: msg.Round,
Index: msg.Index,
}

case *VoteSetMaj23Message:
bi := msg.BlockID.ToProto()
pb = &cmtcons.VoteSetMaj23{
@@ -199,6 +206,12 @@ func MsgFromProto(p proto.Message) (Message, error) {
Type: msg.Type,
Index: msg.Index,
}
case *cmtcons.HasProposalBlockPart:
pb = &HasProposalBlockPartMessage{
Height: msg.Height,
Round: msg.Round,
Index: msg.Index,
}
case *cmtcons.VoteSetMaj23:
bi, err := types.BlockIDFromProto(&msg.BlockID)
if err != nil {
98 changes: 95 additions & 3 deletions consensus/reactor.go
@@ -12,6 +12,7 @@ import (
cmtevents "github.com/cometbft/cometbft/libs/events"
cmtjson "github.com/cometbft/cometbft/libs/json"
"github.com/cometbft/cometbft/libs/log"
cmtrand "github.com/cometbft/cometbft/libs/rand"
cmtsync "github.com/cometbft/cometbft/libs/sync"
"github.com/cometbft/cometbft/p2p"
cmtcons "github.com/cometbft/cometbft/proto/tendermint/consensus"
@@ -271,6 +272,8 @@ func (conR *Reactor) Receive(e p2p.Envelope) {
ps.ApplyNewValidBlockMessage(msg)
case *HasVoteMessage:
ps.ApplyHasVoteMessage(msg)
case *HasProposalBlockPartMessage:
ps.ApplyHasProposalBlockPartMessage(msg)
case *VoteSetMaj23Message:
cs := conR.conS
cs.mtx.Lock()
@@ -414,21 +417,28 @@ func (conR *Reactor) subscribeToBroadcastEvents() {
func(data cmtevents.EventData) {
conR.broadcastNewRoundStepMessage(data.(*cstypes.RoundState))
}); err != nil {
conR.Logger.Error("Error adding listener for events", "err", err)
conR.Logger.Error("Error adding listener for events (NewRoundStep)", "err", err)
}

if err := conR.conS.evsw.AddListenerForEvent(subscriber, types.EventValidBlock,
func(data cmtevents.EventData) {
conR.broadcastNewValidBlockMessage(data.(*cstypes.RoundState))
}); err != nil {
conR.Logger.Error("Error adding listener for events", "err", err)
conR.Logger.Error("Error adding listener for events (ValidBlock)", "err", err)
}

if err := conR.conS.evsw.AddListenerForEvent(subscriber, types.EventVote,
func(data cmtevents.EventData) {
conR.broadcastHasVoteMessage(data.(*types.Vote))
}); err != nil {
conR.Logger.Error("Error adding listener for events", "err", err)
conR.Logger.Error("Error adding listener for events (Vote)", "err", err)
}

if err := conR.conS.evsw.AddListenerForEvent(subscriber, types.EventProposalBlockPart,
func(data cmtevents.EventData) {
conR.broadcastHasProposalBlockPartMessage(data.(*BlockPartMessage))
}); err != nil {
conR.Logger.Error("Error adding listener for events (ProposalBlockPart)", "err", err)
}
}

@@ -496,6 +506,19 @@ func (conR *Reactor) broadcastHasVoteMessage(vote *types.Vote) {
*/
}

// Broadcasts HasProposalBlockPartMessage to peers that care.
func (conR *Reactor) broadcastHasProposalBlockPartMessage(partMsg *BlockPartMessage) {
msg := &cmtcons.HasProposalBlockPart{
Height: partMsg.Height,
Round: partMsg.Round,
Index: int32(partMsg.Part.Index),
}
conR.Switch.Broadcast(p2p.Envelope{
ChannelID: StateChannel,
Message: msg,
})
}

func makeRoundStepMessage(rs *cstypes.RoundState) (nrsMsg *cmtcons.NewRoundStep) {
nrsMsg = &cmtcons.NewRoundStep{
Height: rs.Height,
@@ -545,6 +568,15 @@ OUTER_LOOP:
if !peer.IsRunning() || !conR.IsRunning() {
return
}

// sleep random amount to give reactor a chance to receive HasProposalBlockPart messages
// so we can reduce the amount of redundant block parts we send
if conR.conS.config.PeerGossipIntraloopSleepDuration > 0 {
// the config sets an upper bound for how long we sleep.
randDuration := cmtrand.Int63n(int64(conR.conS.config.PeerGossipIntraloopSleepDuration))
time.Sleep(time.Duration(randDuration))
}

rs := conR.getRoundState()
prs := ps.GetRoundState()

@@ -703,10 +735,20 @@ func (conR *Reactor) gossipVotesRoutine(peer p2p.Peer, ps *PeerState) {

OUTER_LOOP:
for {

// Manage disconnects from self or peer.
if !peer.IsRunning() || !conR.IsRunning() {
return
}

// sleep random amount to give reactor a chance to receive HasVote messages
// so we can reduce the amount of redundant votes we send
if conR.conS.config.PeerGossipIntraloopSleepDuration > 0 {
// the config sets an upper bound for how long we sleep.
randDuration := cmtrand.Int63n(int64(conR.conS.config.PeerGossipIntraloopSleepDuration))
time.Sleep(time.Duration(randDuration))
}

rs := conR.getRoundState()
prs := ps.GetRoundState()

@@ -1135,6 +1177,17 @@ func (ps *PeerState) SetHasProposalBlockPart(height int64, round int32, index in
ps.mtx.Lock()
defer ps.mtx.Unlock()

ps.setHasProposalBlockPart(height, round, index)
}

func (ps *PeerState) setHasProposalBlockPart(height int64, round int32, index int) {
ps.logger.Debug("setHasProposalBlockPart",
"peerH/R",
log.NewLazySprintf("%d/%d", ps.PRS.Height, ps.PRS.Round),
"H/R",
log.NewLazySprintf("%d/%d", height, round),
"index", index)

if ps.PRS.Height != height || ps.PRS.Round != round {
return
}
@@ -1457,6 +1510,18 @@ func (ps *PeerState) ApplyHasVoteMessage(msg *HasVoteMessage) {
ps.setHasVote(msg.Height, msg.Round, msg.Type, msg.Index)
}

// ApplyHasProposalBlockPartMessage updates the peer state for the new block part.
func (ps *PeerState) ApplyHasProposalBlockPartMessage(msg *HasProposalBlockPartMessage) {
ps.mtx.Lock()
defer ps.mtx.Unlock()

if ps.PRS.Height != msg.Height {
return
}

ps.setHasProposalBlockPart(msg.Height, msg.Round, int(msg.Index))
}

// ApplyVoteSetBitsMessage updates the peer state for the bit-array of votes
// it claims to have for the corresponding BlockID.
// `ourVotes` is a BitArray of votes we have for msg.BlockID
@@ -1514,6 +1579,7 @@ func init() {
cmtjson.RegisterType(&BlockPartMessage{}, "tendermint/BlockPart")
cmtjson.RegisterType(&VoteMessage{}, "tendermint/Vote")
cmtjson.RegisterType(&HasVoteMessage{}, "tendermint/HasVote")
cmtjson.RegisterType(&HasProposalBlockPartMessage{}, "tendermint/HasProposalBlockPart")
cmtjson.RegisterType(&VoteSetMaj23Message{}, "tendermint/VoteSetMaj23")
cmtjson.RegisterType(&VoteSetBitsMessage{}, "tendermint/VoteSetBits")
}
@@ -1813,3 +1879,29 @@ func (m *VoteSetBitsMessage) String() string {
}

//-------------------------------------

// HasProposalBlockPartMessage is sent to indicate that a particular block part has been received.
type HasProposalBlockPartMessage struct {
Height int64
Round int32
Index int32
}

// ValidateBasic performs basic validation.
func (m *HasProposalBlockPartMessage) ValidateBasic() error {
if m.Height < 1 {
return errors.New("invalid Height (< 1)")
}
if m.Round < 0 {
return errors.New("negative Round")
}
if m.Index < 0 {
return errors.New("negative Index")
}
return nil
}

// String returns a string representation.
func (m *HasProposalBlockPartMessage) String() string {
return fmt.Sprintf("[HasProposalBlockPart PI:%v HR:{%v/%02d}]", m.Index, m.Height, m.Round)
}
2 changes: 2 additions & 0 deletions consensus/state.go
@@ -1975,6 +1975,8 @@ func (cs *State) addProposalBlockPart(msg *BlockPartMessage, peerID p2p.ID) (add
// NOTE: we are disregarding possible duplicates above where heights dont match or we're not expecting block parts yet
// but between the matches_current = true and false, we have all the info.
cs.metrics.DuplicateBlockPart.Add(1)
} else {
cs.evsw.FireEvent(types.EventProposalBlockPart, msg)
}

maxBytes := cs.state.ConsensusParams.Block.MaxBytes
4 changes: 3 additions & 1 deletion consensus/types/peer_round_state.go
@@ -23,11 +23,13 @@ type PeerRoundState struct {
// True if peer has proposal for this round
Proposal bool `json:"proposal"`
ProposalBlockPartSetHeader types.PartSetHeader `json:"proposal_block_part_set_header"`
ProposalBlockParts *bits.BitArray `json:"proposal_block_parts"`
// This bit array is length(# of block parts)
ProposalBlockParts *bits.BitArray `json:"proposal_block_parts"`
// Proposal's POL round. -1 if none.
ProposalPOLRound int32 `json:"proposal_pol_round"`

// nil until ProposalPOLMessage received.
// these bitarrays are length(validator set)
ProposalPOL *bits.BitArray `json:"proposal_pol"`
Prevotes *bits.BitArray `json:"prevotes"` // All votes peer has for this round
Precommits *bits.BitArray `json:"precommits"` // All precommits peer has for this round
10 changes: 10 additions & 0 deletions proto/tendermint/consensus/message.go
@@ -16,6 +16,7 @@ var _ p2p.Wrapper = &Proposal{}
var _ p2p.Wrapper = &NewValidBlock{}
var _ p2p.Wrapper = &NewRoundStep{}
var _ p2p.Wrapper = &HasVote{}
var _ p2p.Wrapper = &HasProposalBlockPart{}
var _ p2p.Wrapper = &BlockPart{}

func (m *VoteSetBits) Wrap() proto.Message {
@@ -37,6 +38,12 @@ func (m *HasVote) Wrap() proto.Message {
return cm
}

func (m *HasProposalBlockPart) Wrap() proto.Message {
cm := &Message{}
cm.Sum = &Message_HasProposalBlockPart{HasProposalBlockPart: m}
return cm
}

func (m *Vote) Wrap() proto.Message {
cm := &Message{}
cm.Sum = &Message_Vote{Vote: m}
@@ -98,6 +105,9 @@ func (m *Message) Unwrap() (proto.Message, error) {
case *Message_HasVote:
return m.GetHasVote(), nil

case *Message_HasProposalBlockPart:
return m.GetHasProposalBlockPart(), nil

case *Message_VoteSetMaj23:
return m.GetVoteSetMaj23(), nil
