Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cmd/access/node_builder/access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ import (
netcache "github.com/onflow/flow-go/network/cache"
"github.com/onflow/flow-go/network/channels"
cborcodec "github.com/onflow/flow-go/network/codec/cbor"
"github.com/onflow/flow-go/network/message"
"github.com/onflow/flow-go/network/p2p"
"github.com/onflow/flow-go/network/p2p/blob"
p2pbuilder "github.com/onflow/flow-go/network/p2p/builder"
Expand Down Expand Up @@ -2643,6 +2644,7 @@ func (builder *FlowAccessNodeBuilder) enqueuePublicNetworkInit() {
SlashingViolationConsumerFactory: func(adapter network.ConduitAdapter) network.ViolationsConsumer {
return slashing.NewSlashingViolationsConsumer(builder.Logger, builder.Metrics.Network, adapter)
},
UnicastStreamAuthorizer: message.AlwaysAuthorizedUnicastSenderRole,
}, underlay.WithMessageValidators(msgValidators...))
if err != nil {
return nil, fmt.Errorf("could not initialize network: %w", err)
Expand Down
2 changes: 2 additions & 0 deletions cmd/observer/node_builder/observer_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ import (
netcache "github.com/onflow/flow-go/network/cache"
"github.com/onflow/flow-go/network/channels"
"github.com/onflow/flow-go/network/converter"
"github.com/onflow/flow-go/network/message"
"github.com/onflow/flow-go/network/p2p"
"github.com/onflow/flow-go/network/p2p/blob"
"github.com/onflow/flow-go/network/p2p/cache"
Expand Down Expand Up @@ -1857,6 +1858,7 @@ func (builder *ObserverServiceBuilder) enqueuePublicNetworkInit() {
SlashingViolationConsumerFactory: func(adapter network.ConduitAdapter) network.ViolationsConsumer {
return slashing.NewSlashingViolationsConsumer(builder.Logger, builder.Metrics.Network, adapter)
},
UnicastStreamAuthorizer: message.AlwaysAuthorizedUnicastSenderRole,
}, underlay.WithMessageValidators(publicNetworkMsgValidators(node.Logger, node.IdentityProvider, node.NodeID)...))
if err != nil {
return nil, fmt.Errorf("could not initialize network: %w", err)
Expand Down
2 changes: 2 additions & 0 deletions follower/follower_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/onflow/flow-go/network/channels"
cborcodec "github.com/onflow/flow-go/network/codec/cbor"
"github.com/onflow/flow-go/network/converter"
"github.com/onflow/flow-go/network/message"
"github.com/onflow/flow-go/network/p2p"
"github.com/onflow/flow-go/network/p2p/cache"
"github.com/onflow/flow-go/network/p2p/conduit"
Expand Down Expand Up @@ -626,6 +627,7 @@ func (builder *FollowerServiceBuilder) enqueuePublicNetworkInit() {
SlashingViolationConsumerFactory: func(adapter network.ConduitAdapter) network.ViolationsConsumer {
return slashing.NewSlashingViolationsConsumer(builder.Logger, builder.Metrics.Network, adapter)
},
UnicastStreamAuthorizer: message.AlwaysAuthorizedUnicastSenderRole,
}, underlay.WithMessageValidators(publicNetworkMsgValidators(node.Logger, node.IdentityProvider, node.NodeID)...))
if err != nil {
return nil, fmt.Errorf("could not initialize network: %w", err)
Expand Down
39 changes: 39 additions & 0 deletions network/message/authorization.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,45 @@ func initializeMessageAuthConfigsMap() {
}
}

// unicastRoleAuthorization defines which sender roles are authorized to open
// unicast streams to each receiver role. This is a more restrictive check than
// channel-based authorization, applied before any message data is read.
//
// The map key is the receiver role, and the value is the list of sender roles
// allowed to initiate unicast streams to that receiver.
var unicastRoleAuthorization = map[flow.Role]flow.RoleList{
// Consensus nodes can receive unicasts from: Consensus (sync), Execution (receipts), Verification (approvals)
flow.RoleConsensus: {flow.RoleConsensus, flow.RoleExecution, flow.RoleVerification},
// Collection nodes can receive unicasts from: Consensus (sync), Execution (state requests), Collection (cluster sync), Access (collection requests)
flow.RoleCollection: {flow.RoleConsensus, flow.RoleExecution, flow.RoleCollection, flow.RoleAccess},
// Execution nodes can receive unicasts from: Consensus (sync), Collection (collections)
flow.RoleExecution: {flow.RoleConsensus, flow.RoleCollection},
// Verification nodes can receive unicasts from: Consensus (sync), Execution (chunk data)
flow.RoleVerification: {flow.RoleConsensus, flow.RoleExecution},
// Access nodes can receive unicasts from: Consensus (sync), Collection (collection responses)
flow.RoleAccess: {flow.RoleConsensus, flow.RoleCollection},
}

// IsAuthorizedUnicastSenderRole checks whether the given sender role is authorized to open a unicast
// stream to the given receiver role. This is used for pre-authorization of unicast streams before
// any message data is read.
//
// This authorization is intentionally more restrictive than channel-based authorization. It is
// rather than derived from channel subscriptions, because channel subscriptions are broader than
// what is correct for unicast (e.g. Access nodes subscribe to RequestCollections to receive
// responses, but should not be unicast targets from other Access nodes).
func IsAuthorizedUnicastSenderRole(sender flow.Role, receiver flow.Role) bool {
senders, ok := unicastRoleAuthorization[receiver]
return ok && senders.Contains(sender)
}

// AlwaysAuthorizedUnicastSenderRole is unicast stream authorizer that always returns true.
//
// This is used for the public network where peers are not authorized based on role.
func AlwaysAuthorizedUnicastSenderRole(sender, receiver flow.Role) bool {
return true
}

// GetMessageAuthConfig checks the underlying type and returns the correct
// message auth Config.
// Expected error returns during normal operations:
Expand Down
119 changes: 119 additions & 0 deletions network/message/authorization_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package message

import (
"testing"

"github.com/stretchr/testify/require"

"github.com/onflow/flow-go/model/flow"
)

func TestIsAuthorizedUnicastSender(t *testing.T) {
// Consensus nodes can send unicast to all roles (sync protocol)
for _, receiver := range flow.Roles() {
require.True(t, IsAuthorizedUnicastSenderRole(flow.RoleConsensus, receiver), "consensus -> %s should be authorized", receiver)
}

// Execution nodes can unicast to: Consensus, Collection, Verification
require.True(t, IsAuthorizedUnicastSenderRole(flow.RoleExecution, flow.RoleConsensus))
require.True(t, IsAuthorizedUnicastSenderRole(flow.RoleExecution, flow.RoleCollection))
require.True(t, IsAuthorizedUnicastSenderRole(flow.RoleExecution, flow.RoleVerification))
require.False(t, IsAuthorizedUnicastSenderRole(flow.RoleExecution, flow.RoleExecution))
require.False(t, IsAuthorizedUnicastSenderRole(flow.RoleExecution, flow.RoleAccess))

// Collection nodes can unicast to: Collection, Execution, Access
require.True(t, IsAuthorizedUnicastSenderRole(flow.RoleCollection, flow.RoleCollection))
require.True(t, IsAuthorizedUnicastSenderRole(flow.RoleCollection, flow.RoleExecution))
require.True(t, IsAuthorizedUnicastSenderRole(flow.RoleCollection, flow.RoleAccess))
require.False(t, IsAuthorizedUnicastSenderRole(flow.RoleCollection, flow.RoleConsensus))
require.False(t, IsAuthorizedUnicastSenderRole(flow.RoleCollection, flow.RoleVerification))

// Verification nodes can unicast to: Consensus only
require.True(t, IsAuthorizedUnicastSenderRole(flow.RoleVerification, flow.RoleConsensus))
require.False(t, IsAuthorizedUnicastSenderRole(flow.RoleVerification, flow.RoleCollection))
require.False(t, IsAuthorizedUnicastSenderRole(flow.RoleVerification, flow.RoleExecution))
require.False(t, IsAuthorizedUnicastSenderRole(flow.RoleVerification, flow.RoleVerification))
require.False(t, IsAuthorizedUnicastSenderRole(flow.RoleVerification, flow.RoleAccess))

// Access nodes can unicast to: Collection only
require.True(t, IsAuthorizedUnicastSenderRole(flow.RoleAccess, flow.RoleCollection))
require.False(t, IsAuthorizedUnicastSenderRole(flow.RoleAccess, flow.RoleConsensus))
require.False(t, IsAuthorizedUnicastSenderRole(flow.RoleAccess, flow.RoleExecution))
require.False(t, IsAuthorizedUnicastSenderRole(flow.RoleAccess, flow.RoleVerification))
require.False(t, IsAuthorizedUnicastSenderRole(flow.RoleAccess, flow.RoleAccess))
}

// TestAlwaysAuthorizedUnicastSenderRole verifies that the public-network authorizer permits all role pairs.
func TestAlwaysAuthorizedUnicastSenderRole(t *testing.T) {
for _, sender := range flow.Roles() {
for _, receiver := range flow.Roles() {
require.True(t, AlwaysAuthorizedUnicastSenderRole(sender, receiver),
"AlwaysAuthorizedUnicastSenderRole should permit %s -> %s", sender, receiver)
}
}
}

// TestIsAuthorizedUnicastSender_CrossValidation ensures that the explicit unicast role authorization
// is consistent with the message-level authorization configs. Specifically:
// - Any sender->receiver pair permitted by IsAuthorizedUnicastSenderRole should have at least one
// unicast message config that permits that pair.
// - Any sender->receiver pair permitted by a unicast message config should be permitted by
// IsAuthorizedUnicastSenderRole, unless explicitly intentionally restricted.
func TestIsAuthorizedUnicastSender_CrossValidation(t *testing.T) {
// Build a map of sender -> set of receivers based on unicast message configs
derived := make(map[flow.Role]flow.RoleList)
for _, cfg := range GetAllMessageAuthConfigs() {
for _, channelCfg := range cfg.Config {
if !channelCfg.AllowedProtocols.Contains(ProtocolTypeUnicast) {
continue
}
for _, sender := range channelCfg.AuthorizedRoles {
for _, receiver := range flow.Roles() {
// If sender is authorized to send this unicast message type,
// they could potentially target any receiver who subscribes
// For now, assume all roles could be receivers
if !derived[sender].Contains(receiver) {
derived[sender] = append(derived[sender], receiver)
}
}
}
}
}

// Check that IsAuthorizedUnicastSenderRole is at least as restrictive as message configs
for _, sender := range flow.Roles() {
derivedReceivers, ok := derived[sender]
for _, receiver := range flow.Roles() {
if IsAuthorizedUnicastSenderRole(sender, receiver) {
require.True(t, ok, "sender role %s is authorized but has no unicast message configs", sender)
require.True(t, derivedReceivers.Contains(receiver),
"IsAuthorizedUnicastSender allows %s -> %s but no unicast message config supports this", sender, receiver)
}
}
}

// Check that message configs don't permit more than IsAuthorizedUnicastSenderRole
// (with documented exceptions)
intentionalRestrictions := map[flow.Role]flow.RoleList{
// Execution nodes can send ChunkDataResponse to Verification, but we intentionally
// don't allow Execution -> Execution unicast streams
flow.RoleExecution: {flow.RoleExecution, flow.RoleAccess},
// Collection nodes can send ClusterBlockResponse to other Collection nodes,
// but we intentionally restrict some paths
flow.RoleCollection: {flow.RoleConsensus, flow.RoleVerification},
// Verification can send ApprovalResponse to Consensus, which is allowed
flow.RoleVerification: {flow.RoleCollection, flow.RoleExecution, flow.RoleVerification, flow.RoleAccess},
// Access nodes only need to request collections from Collection nodes
flow.RoleAccess: {flow.RoleConsensus, flow.RoleExecution, flow.RoleVerification, flow.RoleAccess},
}

for senderRole, derivedReceivers := range derived {
for _, receiver := range derivedReceivers {
if intentionalRestrictions[senderRole].Contains(receiver) {
continue // intentionally restricted
}
require.True(t, IsAuthorizedUnicastSenderRole(senderRole, receiver),
"message configs allow %s -> %s via unicast but IsAuthorizedUnicastSender rejects it — update the authorization map or add to intentionalRestrictions", senderRole, receiver)
}
}
}
12 changes: 12 additions & 0 deletions network/underlay/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,13 +162,17 @@ type NetworkConfig struct {
Libp2pNode p2p.LibP2PNode
BitSwapMetrics module.BitswapMetrics
SlashingViolationConsumerFactory func(network.ConduitAdapter) network.ViolationsConsumer
UnicastStreamAuthorizer func(flow.Role, flow.Role) bool
}

// Validate validates the configuration, and sets default values for any missing fields.
func (cfg *NetworkConfig) Validate() {
if cfg.UnicastMessageTimeout <= 0 {
cfg.UnicastMessageTimeout = DefaultUnicastTimeout
}
if cfg.UnicastStreamAuthorizer == nil {
cfg.UnicastStreamAuthorizer = message.IsAuthorizedUnicastSenderRole
}
}

// NetworkConfigOption is a function that can be used to override network config parmeters.
Expand Down Expand Up @@ -200,6 +204,14 @@ func WithSlashingViolationConsumerFactory(factory func(adapter network.ConduitAd
}
}

// WithUnicastStreamAuthorizer sets a custom unicast stream authorizer function.
// This function is called to authorize incoming unicast streams based on sender and receiver roles.
func WithUnicastStreamAuthorizer(authorizer func(flow.Role, flow.Role) bool) NetworkConfigOption {
return func(params *NetworkConfig) {
params.UnicastStreamAuthorizer = authorizer
}
}

// NetworkOption is a function that can be used to override network attributes.
// It is mostly used for testing purposes.
// Note: do not override network attributes in production unless you know what you are doing.
Expand Down
Loading