Skip to content

Commit

Permalink
Merge branch 'auto-cadence-upgrade/1631553065/supun/script-params' of…
Browse files Browse the repository at this point in the history
… github.com:onflow/flow-go into auto-cadence-upgrade/1631554923/supun/port-pr29
  • Loading branch information
janezpodhostnik committed Sep 15, 2021
2 parents e5fa1ea + 5405ee6 commit d2320be
Show file tree
Hide file tree
Showing 161 changed files with 10,189 additions and 2,676 deletions.
148 changes: 52 additions & 96 deletions cmd/access/node_builder/access_node_builder.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
package node_builder

import (
"context"
"encoding/json"
"fmt"
"strings"
"time"

dht "github.com/libp2p/go-libp2p-kad-dht"
"github.com/onflow/flow/protobuf/go/flow/access"
"github.com/rs/zerolog"
"github.com/spf13/pflag"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
Expand Down Expand Up @@ -36,12 +35,13 @@ import (
"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/module/buffer"
finalizer "github.com/onflow/flow-go/module/finalizer/consensus"
"github.com/onflow/flow-go/module/id"
"github.com/onflow/flow-go/module/mempool/stdmap"
"github.com/onflow/flow-go/module/metrics"
"github.com/onflow/flow-go/module/signature"
"github.com/onflow/flow-go/module/synchronization"
"github.com/onflow/flow-go/network"
jsoncodec "github.com/onflow/flow-go/network/codec/json"
cborcodec "github.com/onflow/flow-go/network/codec/cbor"
"github.com/onflow/flow-go/network/p2p"
"github.com/onflow/flow-go/network/validator"
"github.com/onflow/flow-go/state/protocol"
Expand Down Expand Up @@ -73,10 +73,6 @@ type AccessNodeBuilder interface {
// IsStaked returns True is this is a staked Access Node, False otherwise
IsStaked() bool

// ParticipatesInUnstakedNetwork returns True if this is a staked Access node which also participates
// in the unstaked network acting as an upstream for other unstaked access nodes, False otherwise.
ParticipatesInUnstakedNetwork() bool

// Build defines all of the Access node's components and modules.
Build() AccessNodeBuilder
}
Expand All @@ -88,8 +84,9 @@ type AccessNodeConfig struct {
staked bool
bootstrapNodeAddresses []string
bootstrapNodePublicKeys []string
bootstrapIdentites flow.IdentityList // the identity list of bootstrap peers the node uses to discover other nodes
unstakedNetworkBindAddr string
bootstrapIdentities flow.IdentityList // the identity list of bootstrap peers the node uses to discover other nodes
NetworkKey crypto.PrivateKey // the networking key passed in by the caller when being used as a library
supportsUnstakedFollower bool // True if this is a staked Access node which also supports unstaked access nodes/unstaked consensus follower engines
collectionGRPCPort uint
executionGRPCPort uint
pingEnabled bool
Expand All @@ -113,9 +110,9 @@ func DefaultAccessNodeConfig() *AccessNodeConfig {
collectionGRPCPort: 9000,
executionGRPCPort: 9000,
rpcConf: rpc.Config{
UnsecureGRPCListenAddr: "localhost:9000",
SecureGRPCListenAddr: "localhost:9001",
HTTPListenAddr: "localhost:8000",
UnsecureGRPCListenAddr: "0.0.0.0:9000",
SecureGRPCListenAddr: "0.0.0.0:9001",
HTTPListenAddr: "0.0.0.0:8000",
CollectionAddr: "",
HistoricalAccessAddrs: "",
CollectionClientTimeout: 3 * time.Second,
Expand All @@ -137,7 +134,7 @@ func DefaultAccessNodeConfig() *AccessNodeConfig {
staked: true,
bootstrapNodeAddresses: []string{},
bootstrapNodePublicKeys: []string{},
unstakedNetworkBindAddr: cmd.NotSet,
supportsUnstakedFollower: false,
}
}

Expand All @@ -149,9 +146,7 @@ type FlowAccessNodeBuilder struct {
*AccessNodeConfig

// components
UnstakedLibP2PNode *p2p.Node
UnstakedNetwork *p2p.Network
unstakedMiddleware *p2p.Middleware
LibP2PNode *p2p.Node
FollowerState protocol.MutableState
SyncCore *synchronization.Core
RpcEng *rpc.Engine
Expand All @@ -168,6 +163,10 @@ type FlowAccessNodeBuilder struct {
Finalized *flow.Header
Pending []*flow.Header
FollowerCore module.HotStuffFollower
// for the untsaked access node, the sync engine participants provider is the libp2p peer store which is not
// available until after the network has started. Hence, a factory function that needs to be called just before
// creating the sync engine
SyncEngineParticipantsProviderFactory func() id.IdentifierProvider

// engines
IngestEng *ingestion.Engine
Expand Down Expand Up @@ -267,7 +266,7 @@ func (builder *FlowAccessNodeBuilder) buildFollowerCore() *FlowAccessNodeBuilder
func (builder *FlowAccessNodeBuilder) buildFollowerEngine() *FlowAccessNodeBuilder {
builder.Component("follower engine", func(_ cmd.NodeBuilder, node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
// initialize cleaner for DB
cleaner := storage.NewCleaner(node.Logger, node.DB, metrics.NewCleanerCollector(), flow.DefaultValueLogGCFrequency)
cleaner := storage.NewCleaner(node.Logger, node.DB, builder.Metrics.CleanCollector, flow.DefaultValueLogGCFrequency)
conCache := buffer.NewPendingBlocks()

followerEng, err := follower.New(
Expand Down Expand Up @@ -320,7 +319,7 @@ func (builder *FlowAccessNodeBuilder) buildSyncEngine() *FlowAccessNodeBuilder {
builder.FollowerEng,
builder.SyncCore,
builder.FinalizedHeader,
node.State,
builder.SyncEngineParticipantsProviderFactory(),
)
if err != nil {
return nil, fmt.Errorf("could not create synchronization engine: %w", err)
Expand Down Expand Up @@ -473,10 +472,13 @@ func (anb *FlowAccessNodeBuilder) Build() AccessNodeBuilder {

anb.IngestEng, err = ingestion.New(node.Logger, node.Network, node.State, node.Me, anb.RequestEng, node.Storage.Blocks, node.Storage.Headers, node.Storage.Collections, node.Storage.Transactions, node.Storage.Results, node.Storage.Receipts, anb.TransactionMetrics,
anb.CollectionsToMarkFinalized, anb.CollectionsToMarkExecuted, anb.BlocksToMarkExecuted, anb.RpcEng)
if err != nil {
return nil, err
}
anb.RequestEng.WithHandle(anb.IngestEng.OnCollection)
anb.FinalizationDistributor.AddConsumer(anb.IngestEng)

return anb.IngestEng, err
return anb.IngestEng, nil
}).
Component("requester engine", func(builder cmd.NodeBuilder, node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
// We initialize the requester engine inside the ingestion engine due to the mutual dependency. However, in
Expand All @@ -492,13 +494,19 @@ type Option func(*AccessNodeConfig)

func WithBootStrapPeers(bootstrapNodes ...*flow.Identity) Option {
return func(config *AccessNodeConfig) {
config.bootstrapIdentites = bootstrapNodes
config.bootstrapIdentities = bootstrapNodes
}
}

func SupportsUnstakedNode(enable bool) Option {
return func(config *AccessNodeConfig) {
config.supportsUnstakedFollower = enable
}
}

func WithUnstakedNetworkBindAddr(bindAddr string) Option {
func WithNetworkKey(key crypto.PrivateKey) Option {
return func(config *AccessNodeConfig) {
config.unstakedNetworkBindAddr = bindAddr
config.NetworkKey = key
}
}

Expand All @@ -524,23 +532,14 @@ func (builder *FlowAccessNodeBuilder) IsStaked() bool {
return builder.staked
}

func (builder *FlowAccessNodeBuilder) ParticipatesInUnstakedNetwork() bool {
// unstaked access nodes can't be upstream of other unstaked access nodes for now
if !builder.IsStaked() {
return false
}
// if an unstaked network bind address is provided, then this staked access node will act as the upstream for
// unstaked access nodes
return builder.unstakedNetworkBindAddr != cmd.NotSet
}

func (builder *FlowAccessNodeBuilder) ParseFlags() {

builder.BaseFlags()

builder.extraFlags()

builder.ParseAndPrintFlags()

}

func (builder *FlowAccessNodeBuilder) extraFlags() {
Expand Down Expand Up @@ -572,95 +571,52 @@ func (builder *FlowAccessNodeBuilder) extraFlags() {
flags.BoolVar(&builder.staked, "staked", defaultConfig.staked, "whether this node is a staked access node or not")
flags.StringSliceVar(&builder.bootstrapNodeAddresses, "bootstrap-node-addresses", defaultConfig.bootstrapNodeAddresses, "the network addresses of the bootstrap access node if this is an unstaked access node e.g. access-001.mainnet.flow.org:9653,access-002.mainnet.flow.org:9653")
flags.StringSliceVar(&builder.bootstrapNodePublicKeys, "bootstrap-node-public-keys", defaultConfig.bootstrapNodePublicKeys, "the networking public key of the bootstrap access node if this is an unstaked access node (in the same order as the bootstrap node addresses) e.g. \"d57a5e9c5.....\",\"44ded42d....\"")
flags.StringVar(&builder.unstakedNetworkBindAddr, "unstaked-bind-addr", defaultConfig.unstakedNetworkBindAddr, "address to bind on for the unstaked network")
flags.BoolVar(&builder.supportsUnstakedFollower, "supports-unstaked-node", defaultConfig.supportsUnstakedFollower, "true if this staked access node supports unstaked node")
})
}

// initLibP2PFactory creates the LibP2P factory function for the given node ID and network key.
// The factory function is later passed into the initMiddleware function to eventually instantiate the p2p.LibP2PNode instance
func (builder *FlowAccessNodeBuilder) initLibP2PFactory(ctx context.Context,
nodeID flow.Identifier,
networkKey crypto.PrivateKey) (p2p.LibP2PFactoryFunc, error) {

// The staked nodes act as the DHT servers
dhtOptions := []dht.Option{p2p.AsServer(builder.IsStaked())}

// if this is an unstaked access node, then seed the DHT with the boostrap identities
if !builder.IsStaked() {
bootstrapPeersOpt, err := p2p.WithBootstrapPeers(builder.bootstrapIdentites)
builder.MustNot(err)
dhtOptions = append(dhtOptions, bootstrapPeersOpt)
}

return func() (*p2p.Node, error) {
libp2pNode, err := p2p.NewDefaultLibP2PNodeBuilder(nodeID, builder.unstakedNetworkBindAddr, networkKey).
SetRootBlockID(builder.RootBlock.ID().String()).
// unlike the staked network where currently all the node addresses are known upfront,
// for the unstaked network the nodes need to discover each other using DHT Discovery.
SetPubsubOptions(p2p.WithDHTDiscovery(dhtOptions...)).
SetLogger(builder.Logger).
Build(ctx)
if err != nil {
return nil, err
}
builder.UnstakedLibP2PNode = libp2pNode
return builder.UnstakedLibP2PNode, nil
}, nil
}

// initMiddleware creates the network.Middleware implementation with the libp2p factory function, metrics, peer update
// interval, and validators. The network.Middleware is then passed into the initNetwork function.
func (builder *FlowAccessNodeBuilder) initMiddleware(nodeID flow.Identifier,
networkMetrics module.NetworkMetrics,
factoryFunc p2p.LibP2PFactoryFunc,
validators ...network.MessageValidator) *p2p.Middleware {
builder.unstakedMiddleware = p2p.NewMiddleware(builder.Logger,
factoryFunc,
nodeID,
networkMetrics,
builder.RootBlock.ID().String(),
time.Hour, // TODO: this is pretty meaningless since there is no peermanager in play.
p2p.DefaultUnicastTimeout,
false, // no connection gating for the unstaked network
false, // no peer management for the unstaked network (peer discovery will be done via LibP2P discovery mechanism)
validators...)
return builder.unstakedMiddleware
}

// initNetwork creates the network.Network implementation with the given metrics, middleware, initial list of network
// participants and topology used to choose peers from the list of participants. The list of participants can later be
// updated by calling network.SetIDs.
func (builder *FlowAccessNodeBuilder) initNetwork(nodeID module.Local,
networkMetrics module.NetworkMetrics,
middleware *p2p.Middleware,
participants flow.IdentityList,
topology network.Topology) (*p2p.Network, error) {

codec := jsoncodec.NewCodec()
middleware network.Middleware,
topology network.Topology,
) (*p2p.Network, error) {

subscriptionManager := p2p.NewChannelSubscriptionManager(middleware)
codec := cborcodec.NewCodec()

// creates network instance
net, err := p2p.NewNetwork(builder.Logger,
net, err := p2p.NewNetwork(
builder.Logger,
codec,
participants,
nodeID,
builder.unstakedMiddleware,
builder.Middleware,
p2p.DefaultCacheSize,
topology,
subscriptionManager,
networkMetrics)
p2p.NewChannelSubscriptionManager(middleware),
networkMetrics,
builder.IdentityProvider,
)
if err != nil {
return nil, fmt.Errorf("could not initialize network: %w", err)
}

return net, nil
}

func unstakedNetworkMsgValidators(selfID flow.Identifier) []network.MessageValidator {
func unstakedNetworkMsgValidators(log zerolog.Logger, idProvider id.IdentityProvider, selfID flow.Identifier) []network.MessageValidator {
return []network.MessageValidator{
// filter out messages sent by this node itself
validator.ValidateNotSender(selfID),
validator.NewAnyValidator(
// message should be either from a valid staked node
validator.NewOriginValidator(
id.NewFilteredIdentifierProvider(filter.IsValidCurrentEpochParticipant, idProvider),
),
// or the message should be specifically targeted for this node
validator.ValidateTarget(log, selfID),
),
}
}

Expand Down

0 comments on commit d2320be

Please sign in to comment.