Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

p2p: add a per-message type send and receive metric #9622

Merged
merged 31 commits into from
Oct 27, 2022
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
cbe179c
p2p: ressurrect the p2p envelope and use to calculate message metric
williambanfield Oct 18, 2022
0ea99d0
p2p: re-implement the the wrapper abstraction to de-duplicate logic
williambanfield Oct 20, 2022
63a6058
p2p/metrics: add the metricsLabelCache to prevent name recalculation
williambanfield Oct 21, 2022
c5abbe6
cleanup: remove error from function that doesn't error
williambanfield Oct 24, 2022
624505a
cleanup: remove superflous re-wrap in consensus
williambanfield Oct 24, 2022
94f7df7
cleanup: remove superflous error in wrap
williambanfield Oct 24, 2022
0b0d921
cleanup: panic in unmarshal error on receive
williambanfield Oct 24, 2022
d4fcfa1
cleanup: use logger in TrySend and Send
williambanfield Oct 24, 2022
60373be
cleanup: test fixup after refactor
williambanfield Oct 24, 2022
d3920fe
cleanup: fix TestMsgToProto
williambanfield Oct 24, 2022
72c0e66
fix comment on wrap/unwrap
williambanfield Oct 24, 2022
7db1901
add type name to panic
williambanfield Oct 25, 2022
53502b2
Merge branch 'main' into wb/final-metric-impl
williambanfield Oct 25, 2022
8e24fab
Update p2p/base_reactor.go
williambanfield Oct 25, 2022
40992b6
Merge branch 'main' into wb/final-metric-impl
williambanfield Oct 26, 2022
1db0ee8
use metricsLabelCache constructor
williambanfield Oct 26, 2022
074f5f1
gofmt -s
williambanfield Oct 26, 2022
e561129
lint fixes
williambanfield Oct 26, 2022
059e637
make the linter happier
williambanfield Oct 26, 2022
10d3eb1
Merge branch 'main' into wb/final-metric-impl
williambanfield Oct 26, 2022
36d094e
update golangci-lint version
williambanfield Oct 26, 2022
c330e49
Merge branch 'main' into wb/final-metric-impl
williambanfield Oct 27, 2022
d35f1b4
wrap mempool message correctly
williambanfield Oct 27, 2022
80f3aa8
defer stop peers in mempool test
williambanfield Oct 27, 2022
14cd282
mempool message conforms to other types
williambanfield Oct 27, 2022
fe429ce
Merge remote-tracking branch 'origin/main' into wb/final-metric-impl
williambanfield Oct 27, 2022
95d0c31
no lint errcheck
williambanfield Oct 27, 2022
4818ae8
stop peer to prevent loop hanging forever
williambanfield Oct 27, 2022
0163e86
add github issue for bad test
williambanfield Oct 27, 2022
2477e3d
send bad message instead
williambanfield Oct 27, 2022
d33942b
add stop peer for error
williambanfield Oct 27, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 0 additions & 52 deletions blocksync/msgs.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,58 +19,6 @@ const (
BlockResponseMessageFieldKeySize
)

// EncodeMsg encodes a Protobuf message
func EncodeMsg(pb proto.Message) ([]byte, error) {
msg := bcproto.Message{}

switch pb := pb.(type) {
case *bcproto.BlockRequest:
msg.Sum = &bcproto.Message_BlockRequest{BlockRequest: pb}
case *bcproto.BlockResponse:
msg.Sum = &bcproto.Message_BlockResponse{BlockResponse: pb}
case *bcproto.NoBlockResponse:
msg.Sum = &bcproto.Message_NoBlockResponse{NoBlockResponse: pb}
case *bcproto.StatusRequest:
msg.Sum = &bcproto.Message_StatusRequest{StatusRequest: pb}
case *bcproto.StatusResponse:
msg.Sum = &bcproto.Message_StatusResponse{StatusResponse: pb}
default:
return nil, fmt.Errorf("unknown message type %T", pb)
}

bz, err := proto.Marshal(&msg)
if err != nil {
return nil, fmt.Errorf("unable to marshal %T: %w", pb, err)
}

return bz, nil
}

// DecodeMsg decodes a Protobuf message.
func DecodeMsg(bz []byte) (proto.Message, error) {
pb := &bcproto.Message{}

err := proto.Unmarshal(bz, pb)
if err != nil {
return nil, err
}

switch msg := pb.Sum.(type) {
case *bcproto.Message_BlockRequest:
return msg.BlockRequest, nil
case *bcproto.Message_BlockResponse:
return msg.BlockResponse, nil
case *bcproto.Message_NoBlockResponse:
return msg.NoBlockResponse, nil
case *bcproto.Message_StatusRequest:
return msg.StatusRequest, nil
case *bcproto.Message_StatusResponse:
return msg.StatusResponse, nil
default:
return nil, fmt.Errorf("unknown message type %T", msg)
}
}

// ValidateMsg validates a message.
func ValidateMsg(pb proto.Message) error {
if pb == nil {
Expand Down
107 changes: 41 additions & 66 deletions blocksync/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,21 +143,20 @@ func (bcR *Reactor) GetChannels() []*p2p.ChannelDescriptor {
SendQueueCapacity: 1000,
RecvBufferCapacity: 50 * 4096,
RecvMessageCapacity: MaxMsgSize,
MessageType: &bcproto.Message{},
},
}
}

// AddPeer implements Reactor by sending our state to peer.
func (bcR *Reactor) AddPeer(peer p2p.Peer) {
msgBytes, err := EncodeMsg(&bcproto.StatusResponse{
Base: bcR.store.Base(),
Height: bcR.store.Height()})
if err != nil {
bcR.Logger.Error("could not convert msg to protobuf", "err", err)
return
}

peer.Send(BlocksyncChannel, msgBytes)
peer.Send(p2p.Envelope{
ChannelID: BlocksyncChannel,
Message: &bcproto.StatusResponse{
Base: bcR.store.Base(),
Height: bcR.store.Height(),
},
})
// it's OK if send fails. will try later in poolRoutine

// peer is added to the pool once we receive the first
Expand All @@ -182,69 +181,53 @@ func (bcR *Reactor) respondToPeer(msg *bcproto.BlockRequest,
return false
}

msgBytes, err := EncodeMsg(&bcproto.BlockResponse{Block: bl})
if err != nil {
bcR.Logger.Error("could not marshal msg", "err", err)
return false
}

return src.TrySend(BlocksyncChannel, msgBytes)
return src.TrySend(p2p.Envelope{
ChannelID: BlocksyncChannel,
Message: &bcproto.BlockResponse{Block: bl},
})
}

bcR.Logger.Info("Peer asking for a block we don't have", "src", src, "height", msg.Height)

msgBytes, err := EncodeMsg(&bcproto.NoBlockResponse{Height: msg.Height})
if err != nil {
bcR.Logger.Error("could not convert msg to protobuf", "err", err)
return false
}

return src.TrySend(BlocksyncChannel, msgBytes)
return src.TrySend(p2p.Envelope{
ChannelID: BlocksyncChannel,
Message: &bcproto.NoBlockResponse{Height: msg.Height},
})
}

// Receive implements Reactor by handling 4 types of messages (look below).
func (bcR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
msg, err := DecodeMsg(msgBytes)
if err != nil {
bcR.Logger.Error("Error decoding message", "src", src, "chId", chID, "err", err)
bcR.Switch.StopPeerForError(src, err)
func (bcR *Reactor) Receive(e p2p.Envelope) {
if err := ValidateMsg(e.Message); err != nil {
bcR.Logger.Error("Peer sent us invalid msg", "peer", e.Src, "msg", e.Message, "err", err)
bcR.Switch.StopPeerForError(e.Src, err)
return
}

if err = ValidateMsg(msg); err != nil {
bcR.Logger.Error("Peer sent us invalid msg", "peer", src, "msg", msg, "err", err)
bcR.Switch.StopPeerForError(src, err)
return
}
bcR.Logger.Debug("Receive", "e.Src", e.Src, "chID", e.ChannelID, "msg", e.Message)

bcR.Logger.Debug("Receive", "src", src, "chID", chID, "msg", msg)

switch msg := msg.(type) {
switch msg := e.Message.(type) {
case *bcproto.BlockRequest:
bcR.respondToPeer(msg, src)
bcR.respondToPeer(msg, e.Src)
case *bcproto.BlockResponse:
bi, err := types.BlockFromProto(msg.Block)
if err != nil {
bcR.Logger.Error("Block content is invalid", "err", err)
return
}
bcR.pool.AddBlock(src.ID(), bi, len(msgBytes))
bcR.pool.AddBlock(e.Src.ID(), bi, msg.Block.Size())
case *bcproto.StatusRequest:
// Send peer our state.
msgBytes, err := EncodeMsg(&bcproto.StatusResponse{
Height: bcR.store.Height(),
Base: bcR.store.Base(),
e.Src.TrySend(p2p.Envelope{
ChannelID: BlocksyncChannel,
Message: &bcproto.StatusResponse{
Height: bcR.store.Height(),
Base: bcR.store.Base(),
},
})
if err != nil {
bcR.Logger.Error("could not convert msg to protobut", "err", err)
return
}
src.TrySend(BlocksyncChannel, msgBytes)
case *bcproto.StatusResponse:
// Got a peer status. Unverified.
bcR.pool.SetPeerRange(src.ID(), msg.Base, msg.Height)
bcR.pool.SetPeerRange(e.Src.ID(), msg.Base, msg.Height)
case *bcproto.NoBlockResponse:
bcR.Logger.Debug("Peer does not have requested block", "peer", src, "height", msg.Height)
bcR.Logger.Debug("Peer does not have requested block", "peer", e.Src, "height", msg.Height)
default:
bcR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
}
Expand Down Expand Up @@ -285,13 +268,10 @@ func (bcR *Reactor) poolRoutine(stateSynced bool) {
if peer == nil {
continue
}
msgBytes, err := EncodeMsg(&bcproto.BlockRequest{Height: request.Height})
if err != nil {
bcR.Logger.Error("could not convert msg to proto", "err", err)
continue
}

queued := peer.TrySend(BlocksyncChannel, msgBytes)
queued := peer.TrySend(p2p.Envelope{
ChannelID: BlocksyncChannel,
Message: &bcproto.BlockRequest{Height: request.Height},
})
if !queued {
bcR.Logger.Debug("Send queue is full, drop block request", "peer", peer.ID(), "height", request.Height)
}
Expand Down Expand Up @@ -429,14 +409,9 @@ FOR_LOOP:
}

// BroadcastStatusRequest broadcasts `BlockStore` base and height.
func (bcR *Reactor) BroadcastStatusRequest() error {
bm, err := EncodeMsg(&bcproto.StatusRequest{})
if err != nil {
bcR.Logger.Error("could not convert msg to proto", "err", err)
return fmt.Errorf("could not convert msg to proto: %w", err)
}

bcR.Switch.Broadcast(BlocksyncChannel, bm)

return nil
func (bcR *Reactor) BroadcastStatusRequest() {
bcR.Switch.Broadcast(p2p.Envelope{
ChannelID: BlocksyncChannel,
Message: &bcproto.StatusRequest{},
})
}
48 changes: 34 additions & 14 deletions consensus/byzantine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
mempoolv0 "github.com/tendermint/tendermint/mempool/v0"
mempoolv1 "github.com/tendermint/tendermint/mempool/v1"
"github.com/tendermint/tendermint/p2p"
tmcons "github.com/tendermint/tendermint/proto/tendermint/consensus"
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/store"
Expand Down Expand Up @@ -165,10 +166,16 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
for i, peer := range peerList {
if i < len(peerList)/2 {
bcs.Logger.Info("Signed and pushed vote", "vote", prevote1, "peer", peer)
peer.Send(VoteChannel, MustEncode(&VoteMessage{prevote1}))
peer.Send(p2p.Envelope{
Message: &tmcons.Vote{prevote1.ToProto()},
ChannelID: VoteChannel,
})
} else {
bcs.Logger.Info("Signed and pushed vote", "vote", prevote2, "peer", peer)
peer.Send(VoteChannel, MustEncode(&VoteMessage{prevote2}))
peer.Send(p2p.Envelope{
Message: &tmcons.Vote{prevote2.ToProto()},
ChannelID: VoteChannel,
})
}
}
} else {
Expand Down Expand Up @@ -520,28 +527,41 @@ func sendProposalAndParts(
parts *types.PartSet,
) {
// proposal
msg := &ProposalMessage{Proposal: proposal}
peer.Send(DataChannel, MustEncode(msg))
peer.Send(p2p.Envelope{
ChannelID: DataChannel,
Message: &tmcons.Proposal{Proposal: *proposal.ToProto()},
})

// parts
for i := 0; i < int(parts.Total()); i++ {
part := parts.GetPart(i)
msg := &BlockPartMessage{
Height: height, // This tells peer that this part applies to us.
Round: round, // This tells peer that this part applies to us.
Part: part,
pp, err := part.ToProto()
if err != nil {
panic(err) // TODO: wbanfield better error handling
}
peer.Send(DataChannel, MustEncode(msg))
peer.Send(p2p.Envelope{
ChannelID: DataChannel,
Message: &tmcons.BlockPart{
Height: height, // This tells peer that this part applies to us.
Round: round, // This tells peer that this part applies to us.
Part: *pp,
},
})
}

// votes
cs.mtx.Lock()
prevote, _ := cs.signVote(tmproto.PrevoteType, blockHash, parts.Header())
precommit, _ := cs.signVote(tmproto.PrecommitType, blockHash, parts.Header())
cs.mtx.Unlock()

peer.Send(VoteChannel, MustEncode(&VoteMessage{prevote}))
peer.Send(VoteChannel, MustEncode(&VoteMessage{precommit}))
peer.Send(p2p.Envelope{
ChannelID: VoteChannel,
Message: &tmcons.Vote{prevote.ToProto()},
})
peer.Send(p2p.Envelope{
ChannelID: VoteChannel,
Message: &tmcons.Vote{precommit.ToProto()},
})
}

//----------------------------------------
Expand Down Expand Up @@ -579,7 +599,7 @@ func (br *ByzantineReactor) AddPeer(peer p2p.Peer) {
func (br *ByzantineReactor) RemovePeer(peer p2p.Peer, reason interface{}) {
br.reactor.RemovePeer(peer, reason)
}
func (br *ByzantineReactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {
br.reactor.Receive(chID, peer, msgBytes)
func (br *ByzantineReactor) Receive(e p2p.Envelope) {
br.reactor.Receive(e)
}
func (br *ByzantineReactor) InitPeer(peer p2p.Peer) p2p.Peer { return peer }
6 changes: 5 additions & 1 deletion consensus/invalid_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/tendermint/tendermint/libs/log"
tmrand "github.com/tendermint/tendermint/libs/rand"
"github.com/tendermint/tendermint/p2p"
tmcons "github.com/tendermint/tendermint/proto/tendermint/consensus"
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
"github.com/tendermint/tendermint/types"
)
Expand Down Expand Up @@ -94,7 +95,10 @@ func invalidDoPrevoteFunc(t *testing.T, height int64, round int32, cs *State, sw
peers := sw.Peers().List()
for _, peer := range peers {
cs.Logger.Info("Sending bad vote", "block", blockHash, "peer", peer)
peer.Send(VoteChannel, MustEncode(&VoteMessage{precommit}))
peer.Send(p2p.Envelope{
Message: &tmcons.Vote{precommit.ToProto()},
ChannelID: VoteChannel,
})
}
}()
}
Loading