From 5da9257a96d06a6154a891445b8d0c98d73209bb Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Tue, 7 Apr 2026 16:53:09 -0700 Subject: [PATCH] Override role based unicast stream authorization on public network --- .../node_builder/access_node_builder.go | 2 + cmd/observer/node_builder/observer_builder.go | 2 + follower/follower_builder.go | 2 + network/message/authorization.go | 39 ++++++ network/message/authorization_test.go | 119 ++++++++++++++++++ network/underlay/network.go | 12 ++ 6 files changed, 176 insertions(+) create mode 100644 network/message/authorization_test.go diff --git a/cmd/access/node_builder/access_node_builder.go b/cmd/access/node_builder/access_node_builder.go index 75662764628..6abd896fff9 100644 --- a/cmd/access/node_builder/access_node_builder.go +++ b/cmd/access/node_builder/access_node_builder.go @@ -102,6 +102,7 @@ import ( netcache "github.com/onflow/flow-go/network/cache" "github.com/onflow/flow-go/network/channels" cborcodec "github.com/onflow/flow-go/network/codec/cbor" + "github.com/onflow/flow-go/network/message" "github.com/onflow/flow-go/network/p2p" "github.com/onflow/flow-go/network/p2p/blob" p2pbuilder "github.com/onflow/flow-go/network/p2p/builder" @@ -2643,6 +2644,7 @@ func (builder *FlowAccessNodeBuilder) enqueuePublicNetworkInit() { SlashingViolationConsumerFactory: func(adapter network.ConduitAdapter) network.ViolationsConsumer { return slashing.NewSlashingViolationsConsumer(builder.Logger, builder.Metrics.Network, adapter) }, + UnicastStreamAuthorizer: message.AlwaysAuthorizedUnicastSenderRole, }, underlay.WithMessageValidators(msgValidators...)) if err != nil { return nil, fmt.Errorf("could not initialize network: %w", err) diff --git a/cmd/observer/node_builder/observer_builder.go b/cmd/observer/node_builder/observer_builder.go index a6bee57590c..311effb1465 100644 --- a/cmd/observer/node_builder/observer_builder.go +++ b/cmd/observer/node_builder/observer_builder.go @@ -98,6 +98,7 @@ import ( netcache "github.com/onflow/flow-go/network/cache" "github.com/onflow/flow-go/network/channels" "github.com/onflow/flow-go/network/converter" + "github.com/onflow/flow-go/network/message" "github.com/onflow/flow-go/network/p2p" "github.com/onflow/flow-go/network/p2p/blob" "github.com/onflow/flow-go/network/p2p/cache" @@ -1857,6 +1858,7 @@ func (builder *ObserverServiceBuilder) enqueuePublicNetworkInit() { SlashingViolationConsumerFactory: func(adapter network.ConduitAdapter) network.ViolationsConsumer { return slashing.NewSlashingViolationsConsumer(builder.Logger, builder.Metrics.Network, adapter) }, + UnicastStreamAuthorizer: message.AlwaysAuthorizedUnicastSenderRole, }, underlay.WithMessageValidators(publicNetworkMsgValidators(node.Logger, node.IdentityProvider, node.NodeID)...)) if err != nil { return nil, fmt.Errorf("could not initialize network: %w", err) diff --git a/follower/follower_builder.go b/follower/follower_builder.go index 7bc46b95950..a1947ff3818 100644 --- a/follower/follower_builder.go +++ b/follower/follower_builder.go @@ -40,6 +40,7 @@ import ( "github.com/onflow/flow-go/network/channels" cborcodec "github.com/onflow/flow-go/network/codec/cbor" "github.com/onflow/flow-go/network/converter" + "github.com/onflow/flow-go/network/message" "github.com/onflow/flow-go/network/p2p" "github.com/onflow/flow-go/network/p2p/cache" "github.com/onflow/flow-go/network/p2p/conduit" @@ -626,6 +627,7 @@ func (builder *FollowerServiceBuilder) enqueuePublicNetworkInit() { SlashingViolationConsumerFactory: func(adapter network.ConduitAdapter) network.ViolationsConsumer { return slashing.NewSlashingViolationsConsumer(builder.Logger, builder.Metrics.Network, adapter) }, + UnicastStreamAuthorizer: message.AlwaysAuthorizedUnicastSenderRole, }, underlay.WithMessageValidators(publicNetworkMsgValidators(node.Logger, node.IdentityProvider, node.NodeID)...)) if err != nil { return nil, fmt.Errorf("could not initialize network: %w", err) diff --git a/network/message/authorization.go b/network/message/authorization.go index a964f2cea24..02bcd3b29d7 100644 --- a/network/message/authorization.go +++ b/network/message/authorization.go @@ -398,6 +398,45 @@ func initializeMessageAuthConfigsMap() { } } +// unicastRoleAuthorization defines which sender roles are authorized to open +// unicast streams to each receiver role. This is a more restrictive check than +// channel-based authorization, applied before any message data is read. +// +// The map key is the receiver role, and the value is the list of sender roles +// allowed to initiate unicast streams to that receiver. +var unicastRoleAuthorization = map[flow.Role]flow.RoleList{ + // Consensus nodes can receive unicasts from: Consensus (sync), Execution (receipts), Verification (approvals) + flow.RoleConsensus: {flow.RoleConsensus, flow.RoleExecution, flow.RoleVerification}, + // Collection nodes can receive unicasts from: Consensus (sync), Execution (state requests), Collection (cluster sync), Access (collection requests) + flow.RoleCollection: {flow.RoleConsensus, flow.RoleExecution, flow.RoleCollection, flow.RoleAccess}, + // Execution nodes can receive unicasts from: Consensus (sync), Collection (collections) + flow.RoleExecution: {flow.RoleConsensus, flow.RoleCollection}, + // Verification nodes can receive unicasts from: Consensus (sync), Execution (chunk data) + flow.RoleVerification: {flow.RoleConsensus, flow.RoleExecution}, + // Access nodes can receive unicasts from: Consensus (sync), Collection (collection responses) + flow.RoleAccess: {flow.RoleConsensus, flow.RoleCollection}, +} + +// IsAuthorizedUnicastSenderRole checks whether the given sender role is authorized to open a unicast +// stream to the given receiver role. This is used for pre-authorization of unicast streams before +// any message data is read. +// +// This authorization is intentionally more restrictive than channel-based authorization. It is +// rather than derived from channel subscriptions, because channel subscriptions are broader than +// what is correct for unicast (e.g. Access nodes subscribe to RequestCollections to receive +// responses, but should not be unicast targets from other Access nodes). +func IsAuthorizedUnicastSenderRole(sender flow.Role, receiver flow.Role) bool { + senders, ok := unicastRoleAuthorization[receiver] + return ok && senders.Contains(sender) +} + +// AlwaysAuthorizedUnicastSenderRole is unicast stream authorizer that always returns true. +// +// This is used for the public network where peers are not authorized based on role. +func AlwaysAuthorizedUnicastSenderRole(sender, receiver flow.Role) bool { + return true +} + // GetMessageAuthConfig checks the underlying type and returns the correct // message auth Config. // Expected error returns during normal operations: diff --git a/network/message/authorization_test.go b/network/message/authorization_test.go new file mode 100644 index 00000000000..d88a8b25932 --- /dev/null +++ b/network/message/authorization_test.go @@ -0,0 +1,119 @@ +package message + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/onflow/flow-go/model/flow" +) + +func TestIsAuthorizedUnicastSender(t *testing.T) { + // Consensus nodes can send unicast to all roles (sync protocol) + for _, receiver := range flow.Roles() { + require.True(t, IsAuthorizedUnicastSenderRole(flow.RoleConsensus, receiver), "consensus -> %s should be authorized", receiver) + } + + // Execution nodes can unicast to: Consensus, Collection, Verification + require.True(t, IsAuthorizedUnicastSenderRole(flow.RoleExecution, flow.RoleConsensus)) + require.True(t, IsAuthorizedUnicastSenderRole(flow.RoleExecution, flow.RoleCollection)) + require.True(t, IsAuthorizedUnicastSenderRole(flow.RoleExecution, flow.RoleVerification)) + require.False(t, IsAuthorizedUnicastSenderRole(flow.RoleExecution, flow.RoleExecution)) + require.False(t, IsAuthorizedUnicastSenderRole(flow.RoleExecution, flow.RoleAccess)) + + // Collection nodes can unicast to: Collection, Execution, Access + require.True(t, IsAuthorizedUnicastSenderRole(flow.RoleCollection, flow.RoleCollection)) + require.True(t, IsAuthorizedUnicastSenderRole(flow.RoleCollection, flow.RoleExecution)) + require.True(t, IsAuthorizedUnicastSenderRole(flow.RoleCollection, flow.RoleAccess)) + require.False(t, IsAuthorizedUnicastSenderRole(flow.RoleCollection, flow.RoleConsensus)) + require.False(t, IsAuthorizedUnicastSenderRole(flow.RoleCollection, flow.RoleVerification)) + + // Verification nodes can unicast to: Consensus only + require.True(t, IsAuthorizedUnicastSenderRole(flow.RoleVerification, flow.RoleConsensus)) + require.False(t, IsAuthorizedUnicastSenderRole(flow.RoleVerification, flow.RoleCollection)) + require.False(t, IsAuthorizedUnicastSenderRole(flow.RoleVerification, flow.RoleExecution)) + require.False(t, IsAuthorizedUnicastSenderRole(flow.RoleVerification, flow.RoleVerification)) + require.False(t, IsAuthorizedUnicastSenderRole(flow.RoleVerification, flow.RoleAccess)) + + // Access nodes can unicast to: Collection only + require.True(t, IsAuthorizedUnicastSenderRole(flow.RoleAccess, flow.RoleCollection)) + require.False(t, IsAuthorizedUnicastSenderRole(flow.RoleAccess, flow.RoleConsensus)) + require.False(t, IsAuthorizedUnicastSenderRole(flow.RoleAccess, flow.RoleExecution)) + require.False(t, IsAuthorizedUnicastSenderRole(flow.RoleAccess, flow.RoleVerification)) + require.False(t, IsAuthorizedUnicastSenderRole(flow.RoleAccess, flow.RoleAccess)) +} + +// TestAlwaysAuthorizedUnicastSenderRole verifies that the public-network authorizer permits all role pairs. +func TestAlwaysAuthorizedUnicastSenderRole(t *testing.T) { + for _, sender := range flow.Roles() { + for _, receiver := range flow.Roles() { + require.True(t, AlwaysAuthorizedUnicastSenderRole(sender, receiver), + "AlwaysAuthorizedUnicastSenderRole should permit %s -> %s", sender, receiver) + } + } +} + +// TestIsAuthorizedUnicastSender_CrossValidation ensures that the explicit unicast role authorization +// is consistent with the message-level authorization configs. Specifically: +// - Any sender->receiver pair permitted by IsAuthorizedUnicastSenderRole should have at least one +// unicast message config that permits that pair. +// - Any sender->receiver pair permitted by a unicast message config should be permitted by +// IsAuthorizedUnicastSenderRole, unless explicitly intentionally restricted. +func TestIsAuthorizedUnicastSender_CrossValidation(t *testing.T) { + // Build a map of sender -> set of receivers based on unicast message configs + derived := make(map[flow.Role]flow.RoleList) + for _, cfg := range GetAllMessageAuthConfigs() { + for _, channelCfg := range cfg.Config { + if !channelCfg.AllowedProtocols.Contains(ProtocolTypeUnicast) { + continue + } + for _, sender := range channelCfg.AuthorizedRoles { + for _, receiver := range flow.Roles() { + // If sender is authorized to send this unicast message type, + // they could potentially target any receiver who subscribes + // For now, assume all roles could be receivers + if !derived[sender].Contains(receiver) { + derived[sender] = append(derived[sender], receiver) + } + } + } + } + } + + // Check that IsAuthorizedUnicastSenderRole is at least as restrictive as message configs + for _, sender := range flow.Roles() { + derivedReceivers, ok := derived[sender] + for _, receiver := range flow.Roles() { + if IsAuthorizedUnicastSenderRole(sender, receiver) { + require.True(t, ok, "sender role %s is authorized but has no unicast message configs", sender) + require.True(t, derivedReceivers.Contains(receiver), + "IsAuthorizedUnicastSender allows %s -> %s but no unicast message config supports this", sender, receiver) + } + } + } + + // Check that message configs don't permit more than IsAuthorizedUnicastSenderRole + // (with documented exceptions) + intentionalRestrictions := map[flow.Role]flow.RoleList{ + // Execution nodes can send ChunkDataResponse to Verification, but we intentionally + // don't allow Execution -> Execution unicast streams + flow.RoleExecution: {flow.RoleExecution, flow.RoleAccess}, + // Collection nodes can send ClusterBlockResponse to other Collection nodes, + // but we intentionally restrict some paths + flow.RoleCollection: {flow.RoleConsensus, flow.RoleVerification}, + // Verification can send ApprovalResponse to Consensus, which is allowed + flow.RoleVerification: {flow.RoleCollection, flow.RoleExecution, flow.RoleVerification, flow.RoleAccess}, + // Access nodes only need to request collections from Collection nodes + flow.RoleAccess: {flow.RoleConsensus, flow.RoleExecution, flow.RoleVerification, flow.RoleAccess}, + } + + for senderRole, derivedReceivers := range derived { + for _, receiver := range derivedReceivers { + if intentionalRestrictions[senderRole].Contains(receiver) { + continue // intentionally restricted + } + require.True(t, IsAuthorizedUnicastSenderRole(senderRole, receiver), + "message configs allow %s -> %s via unicast but IsAuthorizedUnicastSender rejects it — update the authorization map or add to intentionalRestrictions", senderRole, receiver) + } + } +} diff --git a/network/underlay/network.go b/network/underlay/network.go index f001cfe84ec..6885b226c4b 100644 --- a/network/underlay/network.go +++ b/network/underlay/network.go @@ -162,6 +162,7 @@ type NetworkConfig struct { Libp2pNode p2p.LibP2PNode BitSwapMetrics module.BitswapMetrics SlashingViolationConsumerFactory func(network.ConduitAdapter) network.ViolationsConsumer + UnicastStreamAuthorizer func(flow.Role, flow.Role) bool } // Validate validates the configuration, and sets default values for any missing fields. @@ -169,6 +170,9 @@ func (cfg *NetworkConfig) Validate() { if cfg.UnicastMessageTimeout <= 0 { cfg.UnicastMessageTimeout = DefaultUnicastTimeout } + if cfg.UnicastStreamAuthorizer == nil { + cfg.UnicastStreamAuthorizer = message.IsAuthorizedUnicastSenderRole + } } // NetworkConfigOption is a function that can be used to override network config parmeters. @@ -200,6 +204,14 @@ func WithSlashingViolationConsumerFactory(factory func(adapter network.ConduitAd } } +// WithUnicastStreamAuthorizer sets a custom unicast stream authorizer function. +// This function is called to authorize incoming unicast streams based on sender and receiver roles. +func WithUnicastStreamAuthorizer(authorizer func(flow.Role, flow.Role) bool) NetworkConfigOption { + return func(params *NetworkConfig) { + params.UnicastStreamAuthorizer = authorizer + } +} + // NetworkOption is a function that can be used to override network attributes. // It is mostly used for testing purposes. // Note: do not override network attributes in production unless you know what you are doing.