Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

message validation #1289

Merged
merged 14 commits into from
Sep 22, 2021
24 changes: 13 additions & 11 deletions network/message/message.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 8 additions & 8 deletions network/message/message.proto
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ package message;

// Run protoc --gofast_out=. message.proto to generate message.pb.go

// Message models a single message that is supposed to get exchanged by the gossip network
// Message models a single message that is supposed to get exchanged by the
// gossip network
message Message {
string ChannelID = 1;
bytes EventID = 2;
bytes OriginID = 3;
repeated bytes TargetIDs = 4;
bytes Payload = 5;
string Type = 6;
string ChannelID = 1;
bytes EventID = 2;
bytes OriginID = 3 [deprecated = true];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we just deprecate this, we're not removing the field and are still allowing its use.

Which means, technically, that we could still have OriginID claims in this field, which would still be ser/deser-ialized, and that we should still report if different from the result of the validation logic.

Is it possible to just remove this field and mark the release that would include this PR as breaking? I think it's worth it.

/cc @ramtinms @vishalchangrani for advice on the Message format change.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, although I'm hoping we can eventually remove all of these fields except Payload, and I think this is something that can wait until later and we can remove them all in one fell swoop.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK. Do we have an issue for that?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not exactly, but it is part of https://github.com/dapperlabs/flow-go/issues/5847

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As long as the network layer is passing the application layer the origin ID (which we are), it doesn't matter if we remove the field now or later.

repeated bytes TargetIDs = 4;
bytes Payload = 5;
string Type = 6;
}

27 changes: 11 additions & 16 deletions network/p2p/libp2pNode.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/onflow/flow-go/network/message"
"github.com/onflow/flow-go/network/p2p/dns"
"github.com/onflow/flow-go/network/p2p/keyutils"
validator "github.com/onflow/flow-go/network/validator/pubsub"
"github.com/onflow/flow-go/utils/logging"
)

Expand Down Expand Up @@ -474,35 +475,29 @@ func (n *Node) GetIPPort() (string, string, error) {
// Subscribe subscribes the node to the given topic and returns the subscription
// Currently only one subscriber is allowed per topic.
// NOTE: A node will receive its own published messages.
func (n *Node) Subscribe(ctx context.Context, topic flownet.Topic, validators ...pubsub.ValidatorEx) (*pubsub.Subscription, error) {
func (n *Node) Subscribe(ctx context.Context, topic flownet.Topic, validators ...validator.MessageValidator) (*pubsub.Subscription, error) {
n.Lock()
defer n.Unlock()

if len(validators) > 1 {
return nil, errors.New("only one topic validator is allowed")
}

// Check if the topic has been already created and is in the cache
n.pubSub.GetTopics()
tp, found := n.topics[topic]
var err error
if !found {
if len(validators) > 0 {
if err := n.pubSub.RegisterTopicValidator(
topic.String(), validators[0], pubsub.WithValidatorInline(true),
); err != nil {
n.logger.Err(err).Str("topic", topic.String()).Msg("failed to register topic validator, aborting subscription")
return nil, fmt.Errorf("failed to register topic validator: %w", err)
}
topic_validator := validator.TopicValidator(validators...)
if err := n.pubSub.RegisterTopicValidator(
topic.String(), topic_validator, pubsub.WithValidatorInline(true),
); err != nil {
n.logger.Err(err).Str("topic", topic.String()).Msg("failed to register topic validator, aborting subscription")
return nil, fmt.Errorf("failed to register topic validator: %w", err)
}

tp, err = n.pubSub.Join(topic.String())
if err != nil {
if len(validators) > 0 {
if err := n.pubSub.UnregisterTopicValidator(topic.String()); err != nil {
n.logger.Err(err).Str("topic", topic.String()).Msg("failed to unregister topic validator")
}
if err := n.pubSub.UnregisterTopicValidator(topic.String()); err != nil {
n.logger.Err(err).Str("topic", topic.String()).Msg("failed to unregister topic validator")
}

return nil, fmt.Errorf("could not join topic (%s): %w", topic, err)
}

Expand Down
40 changes: 0 additions & 40 deletions network/p2p/libp2pUtils.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
libp2pnetwork "github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/multiformats/go-multiaddr"
"github.com/rs/zerolog"

Expand Down Expand Up @@ -233,42 +232,3 @@ func flowStream(conn network.Conn) network.Stream {
}
return nil
}

// messagePubKey extracts the public key of the envelope signer from a libp2p message.
// The location of that key depends on the type of the key, see:
// https://github.com/libp2p/specs/blob/master/peer-ids/peer-ids.md
// This reproduces the exact logic of the private function doing the same decoding in libp2p:
// https://github.com/libp2p/go-libp2p-pubsub/blob/ba28f8ecfc551d4d916beb748d3384951bce3ed0/sign.go#L77
func messageSigningID(m *pubsub.Message) (peer.ID, error) {
var pubk crypto.PubKey

// m.From is the original sender of the message (versus `m.ReceivedFrom` which is the last hop which sent us this message)
pid, err := peer.IDFromBytes(m.From)
if err != nil {
return "", err
}

if m.Key == nil {
// no attached key, it must be extractable from the source ID
pubk, err = pid.ExtractPublicKey()
if err != nil {
return "", fmt.Errorf("cannot extract signing key: %s", err.Error())
}
if pubk == nil {
return "", fmt.Errorf("cannot extract signing key")
}
} else {
pubk, err = crypto.UnmarshalPublicKey(m.Key)
if err != nil {
return "", fmt.Errorf("cannot unmarshal signing key: %s", err.Error())
}

// verify that the source ID matches the attached key
if !pid.MatchesPublicKey(pubk) {
return "", fmt.Errorf("bad signing key; source ID %s doesn't match key", pid)
}
}

// the pid either contains or matches the signing pubKey
return pid, nil
}
19 changes: 5 additions & 14 deletions network/p2p/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
libp2pnetwork "github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/rs/zerolog"

"github.com/onflow/flow-go/engine"
Expand All @@ -24,6 +23,7 @@ import (
"github.com/onflow/flow-go/network"
"github.com/onflow/flow-go/network/message"
"github.com/onflow/flow-go/network/validator"
psValidator "github.com/onflow/flow-go/network/validator/pubsub"
_ "github.com/onflow/flow-go/utils/binstat"
)

Expand Down Expand Up @@ -83,7 +83,6 @@ type Middleware struct {
idTranslator IDTranslator
idProvider id.IdentifierProvider
previousProtocolStatePeers []peer.AddrInfo
stakedTopicValidator *StakedValidator
}

type MiddlewareOption func(*Middleware)
Expand Down Expand Up @@ -246,8 +245,6 @@ func (m *Middleware) Start(ov network.Overlay) error {

m.UpdateNodeAddresses()

m.stakedTopicValidator = &StakedValidator{m.ov.Identity}

if m.connectionGating {
m.libP2PNode.UpdateAllowList(m.allPeers())
}
Expand Down Expand Up @@ -390,10 +387,10 @@ func (m *Middleware) Subscribe(channel network.Channel) error {

topic := engine.TopicFromChannel(channel, m.rootBlockID)

var validators []pubsub.ValidatorEx
var validators []psValidator.MessageValidator
if !engine.UnstakedChannels().Contains(channel) {
// for channels used by the staked nodes, add the topic validator to filter out messages from non-staked nodes
validators = append(validators, m.stakedTopicValidator.Validate)
validators = append(validators, psValidator.StakedValidator(m.ov.Identity))
}

s, err := m.libP2PNode.Subscribe(m.ctx, topic, validators...)
Expand Down Expand Up @@ -429,7 +426,7 @@ func (m *Middleware) Unsubscribe(channel network.Channel) error {
}

// processAuthenticatedMessage processes a message and a source (indicated by its peer ID) and eventually passes it to the overlay
// In particular, it checks the claim of protocol authorship situated in the message against `peerID`
// In particular, it populates the `OriginID` field of the message with a Flow ID.
synzhu marked this conversation as resolved.
Show resolved Hide resolved
// The assumption is that the message has been authenticated at the network level (libp2p) to originate from the peer with ID `peerID`
// this requirement is fulfilled by e.g. the output of readConnection and readSubscription
func (m *Middleware) processAuthenticatedMessage(msg *message.Message, peerID peer.ID) {
Expand All @@ -439,13 +436,7 @@ func (m *Middleware) processAuthenticatedMessage(msg *message.Message, peerID pe
return
}

// check the origin of the message corresponds to the one claimed in the OriginID
originID := flow.HashToID(msg.OriginID)

if flowID != originID {
m.log.Warn().Msgf("received message claiming to be from nodeID %v was actually from %v and dropped", originID, flowID)
return
}
msg.OriginID = flowID[:]

m.processMessage(msg)
}
Expand Down
21 changes: 6 additions & 15 deletions network/p2p/readSubscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (

"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/network/message"
_ "github.com/onflow/flow-go/utils/binstat"
validator "github.com/onflow/flow-go/network/validator/pubsub"
)

// readSubscription reads the messages coming in on the subscription and calls the given callback until
Expand Down Expand Up @@ -79,26 +79,17 @@ func (r *readSubscription) receiveLoop(wg *sync.WaitGroup) {
return
}

pid, err := messageSigningID(rawMsg)
if err != nil {
r.log.Err(err).Msg("failed to validate peer ID of incoming message")
return
validatorData, ok := rawMsg.ValidatorData.(validator.ValidatorData)
if !ok {
r.log.Fatal().Str("raw_msg", rawMsg.String()).Msg("validator data missing")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not a fan of the log fatal here.

This would be triggered by an independent modification of TopicValidator that e.g. made it return before populating the data in some cases. We should definitely log a spectacular Error, but why not behave like the above failure cases and return?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can, but personally I would consider it a bug if the validator returned ValidationAccept without populating the data. At the bare minimum, the validator should check that the message can be deserialized properly and that the origin id can be extracted. If it cannot do even this, then we should definitely reject the message.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My issue is neither with checking that the validation data is here, nor with considering this a bug. We should certainly transmit the message that "something is rotten in the state of Denmark" far and wide.

My issue is with crashing the node based on the input of one (externally controlled) message. A validator that accepts without populating the data has a bug, yes, but should its impact be to lose the entire blockchain?

}

var msg message.Message
// convert the incoming raw message payload to Message type
//bs := binstat.EnterTimeVal(binstat.BinNet+":wire>1protobuf2message", int64(len(rawMsg.Data)))
err = msg.Unmarshal(rawMsg.Data)
//binstat.Leave(bs)
if err != nil {
r.log.Err(err).Str("topic_message", msg.String()).Msg("failed to unmarshal message")
return
}
msg := validatorData.Message

// log metrics
r.metrics.NetworkMessageReceived(msg.Size(), msg.ChannelID, msg.Type)

// call the callback
r.callback(&msg, pid)
r.callback(msg, validatorData.From)
}
}
26 changes: 0 additions & 26 deletions network/p2p/topic_validator.go

This file was deleted.

16 changes: 11 additions & 5 deletions network/p2p/topic_validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"github.com/onflow/flow-go/crypto"
"github.com/onflow/flow-go/engine"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/network/message"
validator "github.com/onflow/flow-go/network/validator/pubsub"
"github.com/onflow/flow-go/utils/unittest"
)

Expand All @@ -32,13 +34,13 @@ func TestTopicValidator(t *testing.T) {
translator, err := NewFixedTableIdentityTranslator(ids)
require.NoError(t, err)

validator := StakedValidator{func(pid peer.ID) (*flow.Identity, bool) {
stakedValidator := validator.StakedValidator(func(pid peer.ID) (*flow.Identity, bool) {
fid, err := translator.GetFlowID(pid)
if err != nil {
return &flow.Identity{}, false
}
return ids.ByNodeID(fid)
}}
})

unstakedKey, err := unittest.NetworkingKey()
require.NoError(t, err)
Expand All @@ -52,9 +54,9 @@ func TestTopicValidator(t *testing.T) {
require.NoError(t, unstakedNode.AddPeer(context.TODO(), *host.InfoFromHost(node1.host)))

// node1 and node2 subscribe to the topic with the topic validator
sub1, err := node1.Subscribe(context.TODO(), badTopic, validator.Validate)
sub1, err := node1.Subscribe(context.TODO(), badTopic, stakedValidator)
require.NoError(t, err)
sub2, err := node2.Subscribe(context.TODO(), badTopic, validator.Validate)
sub2, err := node2.Subscribe(context.TODO(), badTopic, stakedValidator)
require.NoError(t, err)
// the unstaked node subscribes to the topic WITHOUT the topic validator
unstakedSub, err := unstakedNode.Subscribe(context.TODO(), badTopic)
Expand All @@ -67,7 +69,11 @@ func TestTopicValidator(t *testing.T) {
len(unstakedNode.pubSub.ListPeers(badTopic.String())) > 0
}, 3*time.Second, 100*time.Millisecond)

data := []byte("hello")
m := message.Message{
Payload: []byte("hello"),
}
data, err := m.Marshal()
require.NoError(t, err)

timedCtx, cancel5s := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel5s()
Expand Down
20 changes: 20 additions & 0 deletions network/validator/pubsub/staked_validator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package validator

import (
"context"

"github.com/libp2p/go-libp2p-core/peer"
pubsub "github.com/libp2p/go-libp2p-pubsub"

"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/network/message"
)

func StakedValidator(getIdentity func(peer.ID) (*flow.Identity, bool)) MessageValidator {
return func(ctx context.Context, from peer.ID, msg *message.Message) pubsub.ValidationResult {
huitseeker marked this conversation as resolved.
Show resolved Hide resolved
if _, ok := getIdentity(from); ok {
return pubsub.ValidationAccept
}
return pubsub.ValidationReject
}
}