Skip to content

Commit

Permalink
Merge #4103
Browse files Browse the repository at this point in the history
4103: Khalil/6474 Gossipsub RPC control message Spam protection: GRAFT & PRUNE r=kc1116 a=gomisha

This PR adds spam protection for gossipsub RPC control messages (GRAFT & PRUNE). It adds a new [ControlMsgValidationInspector](https://github.com/dapperlabs/flow-go/compare/khalil/6474-graft-prune-spam?expand=1#diff-875c4aae39d07a22e184608be05abd0dce2447e593716de23cb70495d8c3ab2fR52) which is a gossipsub RPC inspector that performs the following validation on control messages for each of the control types (GRAFT & PRUNE). These protections are important due to the fact that RPC messages are processed synchronously by libp2p and a malicious actor could exhaust the nodes resources or degrade the nodes network performance by spamming costly control messages.

- Ensure RPC messages with a count > configured upper threshold are immediately rejected
- Ensure RPC messages for specific control type are not rate limited for peer
- Ensure RPC messages for specific control type < safety threshold < upper threshold have valid topic ID's
- Ensure RPC messages with a count < safety threshold bypass validation

I suggest you start your review in the [inspector package](https://github.com/dapperlabs/flow-go/tree/khalil/6474-graft-prune-spam/network/p2p/inspector) which contains all the new inspector logic and the [control message validation inspector gossip spammer tests.](https://github.com/dapperlabs/flow-go/blob/khalil/6474-graft-prune-spam/insecure/rpc_inspector_test/control_message_validation_test.go) 

ref: https://github.com/dapperlabs/flow-go/pull/6555
author: `@kc1116` 

Co-authored-by: Khalil Claybon <khalil.claybon@dapperlabs.com>
Co-authored-by: Misha <misha.rybalov@dapperlabs.com>
  • Loading branch information
3 people committed Mar 27, 2023
2 parents 31eef46 + 09a5704 commit b500369
Show file tree
Hide file tree
Showing 58 changed files with 2,268 additions and 538 deletions.
9 changes: 1 addition & 8 deletions cmd/access/node_builder/access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ import (
"github.com/onflow/flow-go/module/executiondatasync/execution_data"
finalizer "github.com/onflow/flow-go/module/finalizer/consensus"
"github.com/onflow/flow-go/module/id"
"github.com/onflow/flow-go/module/mempool/queue"
"github.com/onflow/flow-go/module/mempool/stdmap"
"github.com/onflow/flow-go/module/metrics"
"github.com/onflow/flow-go/module/metrics/unstaked"
Expand All @@ -68,7 +67,6 @@ import (
"github.com/onflow/flow-go/network/p2p/cache"
"github.com/onflow/flow-go/network/p2p/connection"
"github.com/onflow/flow-go/network/p2p/dht"
"github.com/onflow/flow-go/network/p2p/distributor"
"github.com/onflow/flow-go/network/p2p/middleware"
"github.com/onflow/flow-go/network/p2p/p2pbuilder"
"github.com/onflow/flow-go/network/p2p/subscription"
Expand Down Expand Up @@ -699,12 +697,7 @@ func (builder *FlowAccessNodeBuilder) InitIDProviders() {
}
builder.IDTranslator = translator.NewHierarchicalIDTranslator(idCache, translator.NewPublicNetworkIDTranslator())

heroStoreOpts := []queue.HeroStoreConfigOption{queue.WithHeroStoreSizeLimit(builder.DisallowListNotificationCacheSize)}
if builder.HeroCacheMetricsEnable {
collector := metrics.DisallowListNotificationQueueMetricFactory(builder.MetricsRegisterer)
heroStoreOpts = append(heroStoreOpts, queue.WithHeroStoreCollector(collector))
}
builder.NodeDisallowListDistributor = distributor.DefaultDisallowListNotificationDistributor(builder.Logger, heroStoreOpts...)
builder.NodeDisallowListDistributor = cmd.BuildDisallowListNotificationDisseminator(builder.DisallowListNotificationCacheSize, builder.MetricsRegisterer, builder.Logger, builder.MetricsEnabled)

// The following wrapper allows to disallow-list byzantine nodes via an admin command:
// the wrapper overrides the 'Ejected' flag of disallow-listed nodes to true
Expand Down
80 changes: 50 additions & 30 deletions cmd/node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/onflow/flow-go/network/p2p"
"github.com/onflow/flow-go/network/p2p/connection"
"github.com/onflow/flow-go/network/p2p/dns"
"github.com/onflow/flow-go/network/p2p/inspector/validation"
"github.com/onflow/flow-go/network/p2p/middleware"
"github.com/onflow/flow-go/network/p2p/unicast"
"github.com/onflow/flow-go/state/protocol"
Expand Down Expand Up @@ -188,33 +189,36 @@ type NetworkConfig struct {
// PreferredUnicastProtocols list of unicast protocols in preferred order
PreferredUnicastProtocols []string
NetworkReceivedMessageCacheSize uint32
// UnicastRateLimitDryRun will disable connection disconnects and gating when unicast rate limiters are configured
UnicastRateLimitDryRun bool
//UnicastRateLimitLockoutDuration the number of seconds a peer will be forced to wait before being allowed to successful reconnect to the node
// after being rate limited.
UnicastRateLimitLockoutDuration time.Duration
// UnicastMessageRateLimit amount of unicast messages that can be sent by a peer per second.
UnicastMessageRateLimit int
// UnicastBandwidthRateLimit bandwidth size in bytes a peer is allowed to send via unicast streams per second.
UnicastBandwidthRateLimit int
// UnicastBandwidthBurstLimit bandwidth size in bytes a peer is allowed to send via unicast streams at once.
UnicastBandwidthBurstLimit int
// PeerUpdateInterval interval used by the libp2p node peer manager component to periodically request peer updates.
PeerUpdateInterval time.Duration
// UnicastMessageTimeout how long a unicast transmission can take to complete.
UnicastMessageTimeout time.Duration

PeerUpdateInterval time.Duration
UnicastMessageTimeout time.Duration
DNSCacheTTL time.Duration
LibP2PResourceManagerConfig *p2pbuilder.ResourceManagerConfig
ConnectionManagerConfig *connection.ManagerConfig
// UnicastCreateStreamRetryDelay initial delay used in the exponential backoff for create stream retries
UnicastCreateStreamRetryDelay time.Duration
// DNSCacheTTL time to live for DNS cache
DNSCacheTTL time.Duration
// LibP2PResourceManagerConfig configuration for p2pbuilder.ResourceManagerConfig
LibP2PResourceManagerConfig *p2pbuilder.ResourceManagerConfig
// ConnectionManagerConfig configuration for connection.ManagerConfig=
ConnectionManagerConfig *connection.ManagerConfig
// size of the queue for notifications about new peers in the disallow list.
DisallowListNotificationCacheSize uint32
// size of the queue for notifications about gossipsub RPC inspections.
GossipSubRPCInspectorNotificationCacheSize uint32
GossipSubRPCInspectorCacheSize uint32
UnicastRateLimitersConfig *UnicastRateLimitersConfig
GossipSubRPCValidationConfigs *p2pbuilder.GossipSubRPCValidationConfigs
}

// UnicastRateLimitersConfig unicast rate limiter configuration for the message and bandwidth rate limiters.
type UnicastRateLimitersConfig struct {
// DryRun setting this to true will disable connection disconnects and gating when unicast rate limiters are configured
DryRun bool
// LockoutDuration the number of seconds a peer will be forced to wait before being allowed to successful reconnect to the node
// after being rate limited.
LockoutDuration time.Duration
// MessageRateLimit amount of unicast messages that can be sent by a peer per second.
MessageRateLimit int
// BandwidthRateLimit bandwidth size in bytes a peer is allowed to send via unicast streams per second.
BandwidthRateLimit int
// BandwidthBurstLimit bandwidth size in bytes a peer is allowed to send via unicast streams at once.
BandwidthBurstLimit int
}

// NodeConfig contains all the derived parameters such the NodeID, private keys etc. and initialized instances of
Expand Down Expand Up @@ -272,6 +276,8 @@ type NodeConfig struct {
UnicastRateLimiterDistributor p2p.UnicastRateLimiterDistributor
// NodeDisallowListDistributor notifies consumers of updates to disallow listing of nodes.
NodeDisallowListDistributor p2p.DisallowListNotificationDistributor
// GossipSubInspectorNotifDistributor notifies consumers when an invalid RPC message is encountered.
GossipSubInspectorNotifDistributor p2p.GossipSubInspectorNotificationDistributor
}

func DefaultBaseConfig() *BaseConfig {
Expand All @@ -288,19 +294,33 @@ func DefaultBaseConfig() *BaseConfig {
PeerUpdateInterval: connection.DefaultPeerUpdateInterval,
UnicastMessageTimeout: middleware.DefaultUnicastTimeout,
NetworkReceivedMessageCacheSize: p2p.DefaultReceiveCacheSize,
// By default we let networking layer trim connections to all nodes that
// are no longer part of protocol state.
NetworkConnectionPruning: connection.ConnectionPruningEnabled,
GossipSubConfig: p2pbuilder.DefaultGossipSubConfig(),
UnicastMessageRateLimit: 0,
UnicastBandwidthRateLimit: 0,
UnicastBandwidthBurstLimit: middleware.LargeMsgMaxUnicastMsgSize,
UnicastRateLimitLockoutDuration: 10,
UnicastRateLimitDryRun: true,
UnicastRateLimitersConfig: &UnicastRateLimitersConfig{
DryRun: true,
LockoutDuration: 10,
MessageRateLimit: 0,
BandwidthRateLimit: 0,
BandwidthBurstLimit: middleware.LargeMsgMaxUnicastMsgSize,
},
GossipSubRPCValidationConfigs: &p2pbuilder.GossipSubRPCValidationConfigs{
NumberOfWorkers: validation.DefaultNumberOfWorkers,
GraftLimits: map[string]int{
validation.DiscardThresholdMapKey: validation.DefaultGraftDiscardThreshold,
validation.SafetyThresholdMapKey: validation.DefaultGraftSafetyThreshold,
validation.RateLimitMapKey: validation.DefaultGraftRateLimit,
},
PruneLimits: map[string]int{
validation.DiscardThresholdMapKey: validation.DefaultPruneDiscardThreshold,
validation.SafetyThresholdMapKey: validation.DefaultPruneSafetyThreshold,
validation.RateLimitMapKey: validation.DefaultPruneRateLimit,
},
},
DNSCacheTTL: dns.DefaultTimeToLive,
LibP2PResourceManagerConfig: p2pbuilder.DefaultResourceManagerConfig(),
ConnectionManagerConfig: connection.DefaultConnManagerConfig(),
NetworkConnectionPruning: connection.ConnectionPruningEnabled,
GossipSubConfig: p2pbuilder.DefaultGossipSubConfig(),
GossipSubRPCInspectorNotificationCacheSize: distributor.DefaultGossipSubInspectorNotificationQueueCacheSize,
GossipSubRPCInspectorCacheSize: validation.DefaultControlMsgValidationInspectorQueueCacheSize,
DisallowListNotificationCacheSize: distributor.DefaultDisallowListNotificationQueueCacheSize,
},
nodeIDHex: NotSet,
Expand Down
18 changes: 9 additions & 9 deletions cmd/observer/node_builder/observer_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ import (
finalizer "github.com/onflow/flow-go/module/finalizer/consensus"
"github.com/onflow/flow-go/module/id"
"github.com/onflow/flow-go/module/local"
"github.com/onflow/flow-go/module/mempool/queue"
"github.com/onflow/flow-go/module/metrics"
"github.com/onflow/flow-go/module/state_synchronization"
edrequester "github.com/onflow/flow-go/module/state_synchronization/requester"
Expand All @@ -62,7 +61,6 @@ import (
"github.com/onflow/flow-go/network/p2p/blob"
"github.com/onflow/flow-go/network/p2p/cache"
p2pdht "github.com/onflow/flow-go/network/p2p/dht"
"github.com/onflow/flow-go/network/p2p/distributor"
"github.com/onflow/flow-go/network/p2p/keyutils"
"github.com/onflow/flow-go/network/p2p/middleware"
"github.com/onflow/flow-go/network/p2p/p2pbuilder"
Expand Down Expand Up @@ -729,13 +727,7 @@ func (builder *ObserverServiceBuilder) InitIDProviders() {
}
builder.IDTranslator = translator.NewHierarchicalIDTranslator(idCache, translator.NewPublicNetworkIDTranslator())

heroStoreOpts := []queue.HeroStoreConfigOption{queue.WithHeroStoreSizeLimit(builder.DisallowListNotificationCacheSize)}
if builder.HeroCacheMetricsEnable {
collector := metrics.DisallowListNotificationQueueMetricFactory(builder.MetricsRegisterer)
heroStoreOpts = append(heroStoreOpts, queue.WithHeroStoreCollector(collector))
}

builder.NodeDisallowListDistributor = distributor.DefaultDisallowListNotificationDistributor(builder.Logger, heroStoreOpts...)
builder.NodeDisallowListDistributor = cmd.BuildDisallowListNotificationDisseminator(builder.DisallowListNotificationCacheSize, builder.MetricsRegisterer, builder.Logger, builder.MetricsEnabled)

// The following wrapper allows to black-list byzantine nodes via an admin command:
// the wrapper overrides the 'Ejected' flag of disallow-listed nodes to true
Expand Down Expand Up @@ -866,6 +858,13 @@ func (builder *ObserverServiceBuilder) initLibP2PFactory(networkKey crypto.Priva
builder.IdentityProvider,
builder.GossipSubConfig.LocalMeshLogInterval)

builder.GossipSubInspectorNotifDistributor = cmd.BuildGossipsubRPCValidationInspectorNotificationDisseminator(builder.GossipSubRPCInspectorNotificationCacheSize, builder.MetricsRegisterer, builder.Logger, builder.MetricsEnabled)
heroStoreOpts := cmd.BuildGossipsubRPCValidationInspectorHeroStoreOpts(builder.GossipSubRPCInspectorCacheSize, builder.MetricsRegisterer, builder.MetricsEnabled)
rpcValidationInspector, err := p2pbuilder.BuildGossipSubRPCValidationInspector(builder.Logger, builder.SporkID, builder.GossipSubRPCValidationConfigs, builder.GossipSubInspectorNotifDistributor, heroStoreOpts...)
if err != nil {
return nil, fmt.Errorf("failed to create gossipsub rpc validation inspector: %w", err)
}

node, err := p2pbuilder.NewNodeBuilder(
builder.Logger,
builder.Metrics.Network,
Expand All @@ -889,6 +888,7 @@ func (builder *ObserverServiceBuilder) initLibP2PFactory(networkKey crypto.Priva
SetStreamCreationRetryInterval(builder.UnicastCreateStreamRetryDelay).
SetGossipSubTracer(meshTracer).
SetGossipSubScoreTracerInterval(builder.GossipSubConfig.ScoreTracerInterval).
SetGossipSubValidationInspector(rpcValidationInspector).
Build()

if err != nil {
Expand Down

0 comments on commit b500369

Please sign in to comment.