Skip to content

Commit

Permalink
Merge #1355
Browse files Browse the repository at this point in the history
1355: [Network] Middleware component r=smnzhu a=smnzhu

* Refactor middleware to implement the `Component` interface.
* Introduces new `ComponentManager` struct to help implement `Component` interface
* Various refactoring in network layer and scaffold to enable the changes above.

### TODO

- [x] As mentioned in #1167 (comment), we should probably explicitly throw an error when `Start` is called multiple times, instead of simply ignoring subsequent calls
- [x] Update the godoc for Startable to reflect this
- [x] Add tests for ComponentManager

Co-authored-by: Simon Zhu <simon.zsiyan@gmail.com>
  • Loading branch information
bors[bot] and synzhu committed Oct 12, 2021
2 parents be64190 + 9bcb5d7 commit 361ba2d
Show file tree
Hide file tree
Showing 105 changed files with 2,439 additions and 1,155 deletions.
5 changes: 3 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,11 @@ generate-proto:

.PHONY: generate-mocks
generate-mocks:
GO111MODULE=on mockery -name '(ReadyDoneAwareNetwork|Connector|PingInfoProvider)' -dir=network/p2p -case=underscore -output="./network/mocknetwork" -outpkg="mocknetwork"
GO111MODULE=on mockery -name '(Connector|PingInfoProvider)' -dir=network/p2p -case=underscore -output="./network/mocknetwork" -outpkg="mocknetwork"
GO111MODULE=on mockgen -destination=storage/mocks/storage.go -package=mocks github.com/onflow/flow-go/storage Blocks,Headers,Payloads,Collections,Commits,Events,ServiceEvents,TransactionResults
GO111MODULE=on mockgen -destination=module/mocks/network.go -package=mocks github.com/onflow/flow-go/module Network,Local,Requester
GO111MODULE=on mockgen -destination=module/mocks/network.go -package=mocks github.com/onflow/flow-go/module Local,Requester
GO111MODULE=on mockgen -destination=network/mocknetwork/engine.go -package=mocknetwork github.com/onflow/flow-go/network Engine
GO111MODULE=on mockgen -destination=network/mocknetwork/mock_network.go -package=mocknetwork github.com/onflow/flow-go/network Network
GO111MODULE=on mockery -name 'ExecutionState' -dir=engine/execution/state -case=underscore -output="engine/execution/state/mock" -outpkg="mock"
GO111MODULE=on mockery -name 'BlockComputer' -dir=engine/execution/computation/computer -case=underscore -output="engine/execution/computation/computer/mock" -outpkg="mock"
GO111MODULE=on mockery -name 'ComputationManager' -dir=engine/execution/computation -case=underscore -output="engine/execution/computation/mock" -outpkg="mock"
Expand Down
25 changes: 10 additions & 15 deletions admin/command_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,12 @@ import (
"google.golang.org/grpc/status"

pb "github.com/onflow/flow-go/admin/admin"
"github.com/onflow/flow-go/module/component"
"github.com/onflow/flow-go/module/irrecoverable"
)

var _ component.Component = (*CommandRunner)(nil)

const CommandRunnerShutdownTimeout = 5 * time.Second

type CommandHandler func(ctx context.Context, request *CommandRequest) error
Expand Down Expand Up @@ -63,7 +67,6 @@ func (r *CommandRunnerBootstrapper) Bootstrap(logger zerolog.Logger, bindAddress
httpAddress: bindAddress,
logger: logger.With().Str("admin", "command_runner").Logger(),
startupCompleted: make(chan struct{}),
errors: make(chan error),
}

for _, opt := range opts {
Expand Down Expand Up @@ -97,8 +100,6 @@ type CommandRunner struct {
tlsConfig *tls.Config
logger zerolog.Logger

errors chan error

// wait for worker routines to be ready
workersStarted sync.WaitGroup

Expand All @@ -117,14 +118,12 @@ func (r *CommandRunner) getValidator(command string) CommandValidator {
return r.validators[command]
}

func (r *CommandRunner) Start(ctx context.Context) error {
func (r *CommandRunner) Start(ctx irrecoverable.SignalerContext) {
if err := r.runAdminServer(ctx); err != nil {
return fmt.Errorf("failed to start admin server: %w", err)
ctx.Throw(fmt.Errorf("failed to start admin server: %w", err))
}

close(r.startupCompleted)

return nil
}

func (r *CommandRunner) Ready() <-chan struct{} {
Expand All @@ -151,11 +150,7 @@ func (r *CommandRunner) Done() <-chan struct{} {
return done
}

func (r *CommandRunner) Errors() <-chan error {
return r.errors
}

func (r *CommandRunner) runAdminServer(ctx context.Context) error {
func (r *CommandRunner) runAdminServer(ctx irrecoverable.SignalerContext) error {
select {
case <-ctx.Done():
return ctx.Err()
Expand All @@ -180,7 +175,7 @@ func (r *CommandRunner) runAdminServer(ctx context.Context) error {

if err := grpcServer.Serve(listener); err != nil {
r.logger.Err(err).Msg("gRPC server encountered fatal error")
r.errors <- err
ctx.Throw(err)
}
}()

Expand Down Expand Up @@ -215,7 +210,7 @@ func (r *CommandRunner) runAdminServer(ctx context.Context) error {

if err != nil && !errors.Is(err, http.ErrServerClosed) {
r.logger.Err(err).Msg("HTTP server encountered error")
r.errors <- err
ctx.Throw(err)
}
}()

Expand All @@ -236,7 +231,7 @@ func (r *CommandRunner) runAdminServer(ctx context.Context) error {

if err := httpServer.Shutdown(shutdownCtx); err != nil {
r.logger.Err(err).Msg("failed to shutdown http server")
r.errors <- err
ctx.Throw(err)
}
}
}()
Expand Down
8 changes: 5 additions & 3 deletions admin/command_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"google.golang.org/protobuf/types/known/structpb"

pb "github.com/onflow/flow-go/admin/admin"
"github.com/onflow/flow-go/module/irrecoverable"
)

type CommandRunnerSuite struct {
Expand Down Expand Up @@ -63,16 +64,17 @@ func (suite *CommandRunnerSuite) SetupCommandRunner(opts ...CommandRunnerOption)
ctx, cancel := context.WithCancel(context.Background())
suite.cancel = cancel

signalerCtx, errChan := irrecoverable.WithSignaler(ctx)

logger := zerolog.New(zerolog.NewConsoleWriter())
suite.runner = suite.bootstrapper.Bootstrap(logger, suite.httpAddress, opts...)
err := suite.runner.Start(ctx)
suite.NoError(err)
suite.runner.Start(signalerCtx)
<-suite.runner.Ready()
go func() {
select {
case <-ctx.Done():
return
case err := <-suite.runner.Errors():
case err := <-errChan:
suite.Fail("encountered unexpected error", err)
}
}()
Expand Down
2 changes: 1 addition & 1 deletion cmd/access/node_builder/access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -592,7 +592,7 @@ func (builder *FlowAccessNodeBuilder) initNetwork(nodeID module.Local,
builder.Logger,
codec,
nodeID,
builder.Middleware,
func() (network.Middleware, error) { return builder.Middleware, nil },
p2p.DefaultCacheSize,
topology,
p2p.NewChannelSubscriptionManager(middleware),
Expand Down
29 changes: 9 additions & 20 deletions cmd/access/node_builder/staked_access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,18 +70,14 @@ func (fnb *StakedAccessNodeBuilder) InitIDProviders() {
}

func (builder *StakedAccessNodeBuilder) Initialize() error {

ctx, cancel := context.WithCancel(context.Background())
builder.Cancel = cancel

builder.InitIDProviders()

// if this is an access node that supports unstaked followers, enqueue the unstaked network
if builder.supportsUnstakedFollower {
builder.enqueueUnstakedNetworkInit(ctx)
builder.enqueueUnstakedNetworkInit()
} else {
// otherwise, enqueue the regular network
builder.EnqueueNetworkInit(ctx)
builder.EnqueueNetworkInit()
}

builder.EnqueueMetricsServerInit()
Expand All @@ -90,7 +86,7 @@ func (builder *StakedAccessNodeBuilder) Initialize() error {
return err
}

builder.EnqueueAdminServerInit(ctx)
builder.EnqueueAdminServerInit()

builder.EnqueueTracer()

Expand Down Expand Up @@ -158,16 +154,11 @@ func (anb *StakedAccessNodeBuilder) Build() AccessNodeBuilder {
}

// enqueueUnstakedNetworkInit enqueues the unstaked network component initialized for the staked node
func (builder *StakedAccessNodeBuilder) enqueueUnstakedNetworkInit(ctx context.Context) {
func (builder *StakedAccessNodeBuilder) enqueueUnstakedNetworkInit() {

builder.Component("unstaked network", func(_ cmd.NodeBuilder, node *cmd.NodeConfig) (module.ReadyDoneAware, error) {

libP2PFactory, err := builder.initLibP2PFactory(ctx,
builder.NodeID,
builder.NodeConfig.NetworkKey)
if err != nil {
return nil, err
}
libP2PFactory := builder.initLibP2PFactory(builder.NodeID, builder.NodeConfig.NetworkKey)

msgValidators := unstakedNetworkMsgValidators(node.Logger, node.IdentityProvider, builder.NodeID)

Expand All @@ -188,7 +179,7 @@ func (builder *StakedAccessNodeBuilder) enqueueUnstakedNetworkInit(ctx context.C
builder.ProtocolEvents.AddConsumer(idEvents)

node.Logger.Info().Msgf("network will run on address: %s", builder.BindAddr)
return builder.Network, err
return builder.Network, nil
})
}

Expand All @@ -200,9 +191,7 @@ func (builder *StakedAccessNodeBuilder) enqueueUnstakedNetworkInit(ctx context.C
// The passed in private key as the libp2p key
// No connection gater
// Default Flow libp2p pubsub options
func (builder *StakedAccessNodeBuilder) initLibP2PFactory(ctx context.Context,
nodeID flow.Identifier,
networkKey crypto.PrivateKey) (p2p.LibP2PFactoryFunc, error) {
func (builder *StakedAccessNodeBuilder) initLibP2PFactory(nodeID flow.Identifier, networkKey crypto.PrivateKey) p2p.LibP2PFactoryFunc {

// The staked nodes act as the DHT servers
dhtOptions := []dht.Option{p2p.AsServer(true)}
Expand All @@ -216,7 +205,7 @@ func (builder *StakedAccessNodeBuilder) initLibP2PFactory(ctx context.Context,

resolver := dns.NewResolver(builder.Metrics.Network, dns.WithTTL(builder.BaseConfig.DNSCacheTTL))

return func() (*p2p.Node, error) {
return func(ctx context.Context) (*p2p.Node, error) {
psOpts := p2p.DefaultPubsubOptions(p2p.DefaultMaxPubSubMsgSize)
psOpts = append(psOpts, func(_ context.Context, h host.Host) (pubsub.Option, error) {
return pubsub.WithSubscriptionFilter(p2p.NewRoleBasedFilter(
Expand All @@ -238,7 +227,7 @@ func (builder *StakedAccessNodeBuilder) initLibP2PFactory(ctx context.Context,
}
builder.LibP2PNode = libp2pNode
return builder.LibP2PNode, nil
}, nil
}
}

// initMiddleware creates the network.Middleware implementation with the libp2p factory function, metrics, peer update
Expand Down
20 changes: 7 additions & 13 deletions cmd/access/node_builder/unstaked_access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,6 @@ func (anb *UnstakedAccessNodeBuilder) InitIDProviders() {
}

func (anb *UnstakedAccessNodeBuilder) Initialize() error {

ctx, cancel := context.WithCancel(context.Background())
anb.Cancel = cancel

if err := anb.deriveBootstrapPeerIdentities(); err != nil {
return err
}
Expand All @@ -106,9 +102,9 @@ func (anb *UnstakedAccessNodeBuilder) Initialize() error {

anb.InitIDProviders()

anb.enqueueMiddleware(ctx)
anb.enqueueMiddleware()

anb.enqueueUnstakedNetworkInit(ctx)
anb.enqueueUnstakedNetworkInit()

anb.enqueueConnectWithStakedAN()

Expand Down Expand Up @@ -162,9 +158,7 @@ func (anb *UnstakedAccessNodeBuilder) validateParams() error {
// No connection gater
// No connection manager
// Default libp2p pubsub options
func (builder *UnstakedAccessNodeBuilder) initLibP2PFactory(ctx context.Context,
nodeID flow.Identifier,
networkKey crypto.PrivateKey) (p2p.LibP2PFactoryFunc, error) {
func (builder *UnstakedAccessNodeBuilder) initLibP2PFactory(nodeID flow.Identifier, networkKey crypto.PrivateKey) (p2p.LibP2PFactoryFunc, error) {

// the unstaked nodes act as the DHT clients
dhtOptions := []dht.Option{p2p.AsServer(false)}
Expand All @@ -180,7 +174,7 @@ func (builder *UnstakedAccessNodeBuilder) initLibP2PFactory(ctx context.Context,

resolver := dns.NewResolver(builder.Metrics.Network, dns.WithTTL(builder.BaseConfig.DNSCacheTTL))

return func() (*p2p.Node, error) {
return func(ctx context.Context) (*p2p.Node, error) {
libp2pNode, err := p2p.NewDefaultLibP2PNodeBuilder(nodeID, builder.BaseConfig.BindAddr, networkKey).
SetRootBlockID(builder.RootBlock.ID()).
SetConnectionManager(connManager).
Expand Down Expand Up @@ -220,7 +214,7 @@ func (anb *UnstakedAccessNodeBuilder) initUnstakedLocal() func(builder cmd.NodeB

// enqueueMiddleware enqueues the creation of the network middleware
// this needs to be done before sync engine participants module
func (anb *UnstakedAccessNodeBuilder) enqueueMiddleware(ctx context.Context) {
func (anb *UnstakedAccessNodeBuilder) enqueueMiddleware() {
anb.
Module("network middleware", func(_ cmd.NodeBuilder, node *cmd.NodeConfig) error {

Expand All @@ -234,7 +228,7 @@ func (anb *UnstakedAccessNodeBuilder) enqueueMiddleware(ctx context.Context) {
// for now we use the empty metrics NoopCollector till we have defined the new unstaked network metrics
unstakedNetworkMetrics := metrics.NewNoopCollector()

libP2PFactory, err := anb.initLibP2PFactory(ctx, unstakedNodeID, unstakedNetworkKey)
libP2PFactory, err := anb.initLibP2PFactory(unstakedNodeID, unstakedNetworkKey)
if err != nil {
return err
}
Expand All @@ -255,7 +249,7 @@ func (anb *UnstakedAccessNodeBuilder) Build() AccessNodeBuilder {
}

// enqueueUnstakedNetworkInit enqueues the unstaked network component initialized for the unstaked node
func (anb *UnstakedAccessNodeBuilder) enqueueUnstakedNetworkInit(ctx context.Context) {
func (anb *UnstakedAccessNodeBuilder) enqueueUnstakedNetworkInit() {

anb.Component("unstaked network", func(_ cmd.NodeBuilder, node *cmd.NodeConfig) (module.ReadyDoneAware, error) {

Expand Down
4 changes: 2 additions & 2 deletions cmd/node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type NodeBuilder interface {
InitIDProviders()

// EnqueueNetworkInit enqueues the default network component with the given context
EnqueueNetworkInit(ctx context.Context)
EnqueueNetworkInit()

// EnqueueMetricsServerInit enqueues the metrics component
EnqueueMetricsServerInit()
Expand Down Expand Up @@ -149,7 +149,7 @@ type NodeConfig struct {
ProtocolEvents *events.Distributor
State protocol.State
Middleware network.Middleware
Network module.ReadyDoneAwareNetwork
Network network.Network
MsgValidators []network.MessageValidator
FvmOptions []fvm.Option
StakingKey crypto.PrivateKey
Expand Down

0 comments on commit 361ba2d

Please sign in to comment.