Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Network] Middleware component #1355

Merged
merged 53 commits into from
Oct 12, 2021
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
53 commits
Select commit Hold shift + click to select a range
6fb43c7
Merge branch 'smnzhu/error-handling' into smnzhu/middleware-component
synzhu Sep 26, 2021
6398154
middle ware component
synzhu Sep 26, 2021
1508d0d
Merge branch 'smnzhu/error-handling' into smnzhu/middleware-component
synzhu Sep 26, 2021
4a1893f
Merge branch 'master' into smnzhu/middleware-component
synzhu Sep 30, 2021
7ff2dde
add comments
synzhu Sep 30, 2021
e307226
added comments and added AddComponent method to component manager
synzhu Sep 30, 2021
a604495
Networking and scaffold changes
synzhu Sep 30, 2021
24ac4e9
fix build errors
synzhu Sep 30, 2021
aaeb09b
fix build errors
synzhu Sep 30, 2021
a95d253
remove unnecessary contexts.
synzhu Sep 30, 2021
aef82be
fix tests
synzhu Sep 30, 2021
2d85760
generate mocks
synzhu Sep 30, 2021
5eec03b
fix more tests
synzhu Sep 30, 2021
82cc9c5
fix tests
synzhu Sep 30, 2021
c98be62
more test fixes
synzhu Sep 30, 2021
526fc78
Update network.go
synzhu Sep 30, 2021
db9f74c
remove mocks
synzhu Sep 30, 2021
b03b632
fix more tests
synzhu Sep 30, 2021
c4b8280
fix tesets
synzhu Sep 30, 2021
1971d89
modify component implementations
synzhu Sep 30, 2021
de8f135
fix build error
synzhu Sep 30, 2021
9c3a857
fix lint
synzhu Sep 30, 2021
42923a0
address comments
synzhu Sep 30, 2021
1f903f4
go mod tidy
synzhu Sep 30, 2021
a67311b
Add insanely long rapid test
synzhu Oct 3, 2021
bc6efcd
added comments
synzhu Oct 3, 2021
48c1ea1
update test
synzhu Oct 4, 2021
d4d0364
better channel closed checks
synzhu Oct 4, 2021
1891c06
better error checking
synzhu Oct 4, 2021
fe3196a
fix subtle race condition
synzhu Oct 4, 2021
8950da9
reduce channel close latency allowance
synzhu Oct 4, 2021
9bc8c6d
update signaler implementation and runcomponent
synzhu Oct 5, 2021
277b651
Update irrecoverable API to match semantics of Context API
synzhu Oct 5, 2021
8ca40ec
Update component manager
synzhu Oct 5, 2021
4a8fdc5
Update startable
synzhu Oct 5, 2021
74165f0
update network and middleware
synzhu Oct 6, 2021
7939fa1
Merge branch 'master' into smnzhu/middleware-component
synzhu Oct 6, 2021
00ddf2c
fix command runner and scaffold
synzhu Oct 6, 2021
e7487f8
fixing subtle race condition
synzhu Oct 6, 2021
f971f19
Update run_component_test.go
synzhu Oct 6, 2021
80007d9
Delete failures
synzhu Oct 6, 2021
ca5f4b7
fix component tests
synzhu Oct 6, 2021
879f39f
fix test failures
synzhu Oct 6, 2021
b261b80
Update irrecoverable_example_test.go
synzhu Oct 6, 2021
12db255
fix lint
synzhu Oct 6, 2021
323af9b
Merge branch 'master' into smnzhu/middleware-component
synzhu Oct 6, 2021
a9783e4
fix test bug
synzhu Oct 6, 2021
24406ed
Update component.go
synzhu Oct 6, 2021
b7b411b
run_component_test minor updates to tests and documentation
gomisha Oct 7, 2021
640fa28
address comments
synzhu Oct 9, 2021
d1aef23
Merge branch 'master' into smnzhu/middleware-component
synzhu Oct 9, 2021
0fd659c
Merge branch 'master' into smnzhu/middleware-component
synzhu Oct 12, 2021
9bcb5d7
fix lint
synzhu Oct 12, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 3 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,11 @@ generate-proto:

.PHONY: generate-mocks
generate-mocks:
GO111MODULE=on mockery -name '(ReadyDoneAwareNetwork|Connector|PingInfoProvider)' -dir=network/p2p -case=underscore -output="./network/mocknetwork" -outpkg="mocknetwork"
GO111MODULE=on mockery -name '(Connector|PingInfoProvider)' -dir=network/p2p -case=underscore -output="./network/mocknetwork" -outpkg="mocknetwork"
GO111MODULE=on mockgen -destination=storage/mocks/storage.go -package=mocks github.com/onflow/flow-go/storage Blocks,Headers,Payloads,Collections,Commits,Events,ServiceEvents,TransactionResults
GO111MODULE=on mockgen -destination=module/mocks/network.go -package=mocks github.com/onflow/flow-go/module Network,Local,Requester
GO111MODULE=on mockgen -destination=module/mocks/network.go -package=mocks github.com/onflow/flow-go/module Local,Requester
GO111MODULE=on mockgen -destination=network/mocknetwork/engine.go -package=mocknetwork github.com/onflow/flow-go/network Engine
GO111MODULE=on mockgen -destination=network/mocknetwork/mock_network.go -package=mocknetwork github.com/onflow/flow-go/network Network
GO111MODULE=on mockery -name 'ExecutionState' -dir=engine/execution/state -case=underscore -output="engine/execution/state/mock" -outpkg="mock"
GO111MODULE=on mockery -name 'BlockComputer' -dir=engine/execution/computation/computer -case=underscore -output="engine/execution/computation/computer/mock" -outpkg="mock"
GO111MODULE=on mockery -name 'ComputationManager' -dir=engine/execution/computation -case=underscore -output="engine/execution/computation/mock" -outpkg="mock"
Expand Down
21 changes: 9 additions & 12 deletions admin/command_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,12 @@ import (
"google.golang.org/grpc/status"

pb "github.com/onflow/flow-go/admin/admin"
"github.com/onflow/flow-go/module/component"
"github.com/onflow/flow-go/module/irrecoverable"
)

var _ component.Component = (*CommandRunner)(nil)

const (
CommandRunnerMaxQueueLength = 128
CommandRunnerNumWorkers = 1
Expand Down Expand Up @@ -63,7 +67,6 @@ func (r *CommandRunnerBootstrapper) Bootstrap(logger zerolog.Logger, bindAddress
httpAddress: bindAddress,
logger: logger.With().Str("admin", "command_runner").Logger(),
startupCompleted: make(chan struct{}),
errors: make(chan error),
}

for _, opt := range opts {
Expand Down Expand Up @@ -98,8 +101,6 @@ type CommandRunner struct {
tlsConfig *tls.Config
logger zerolog.Logger

errors chan error

// wait for worker routines to be ready
workersStarted sync.WaitGroup

Expand All @@ -118,7 +119,7 @@ func (r *CommandRunner) getValidator(command string) CommandValidator {
return r.validators[command]
}

func (r *CommandRunner) Start(ctx context.Context) error {
func (r *CommandRunner) Start(ctx irrecoverable.SignalerContext) error {
if err := r.runAdminServer(ctx); err != nil {
return fmt.Errorf("failed to start admin server: %w", err)
}
Expand Down Expand Up @@ -158,11 +159,7 @@ func (r *CommandRunner) Done() <-chan struct{} {
return done
}

func (r *CommandRunner) Errors() <-chan error {
return r.errors
}

func (r *CommandRunner) runAdminServer(ctx context.Context) error {
func (r *CommandRunner) runAdminServer(ctx irrecoverable.SignalerContext) error {
select {
case <-ctx.Done():
return ctx.Err()
Expand All @@ -187,7 +184,7 @@ func (r *CommandRunner) runAdminServer(ctx context.Context) error {

if err := grpcServer.Serve(listener); err != nil {
r.logger.Err(err).Msg("gRPC server encountered fatal error")
r.errors <- err
ctx.Throw(err)
}
}()

Expand Down Expand Up @@ -222,7 +219,7 @@ func (r *CommandRunner) runAdminServer(ctx context.Context) error {

if err != nil && !errors.Is(err, http.ErrServerClosed) {
r.logger.Err(err).Msg("HTTP server encountered error")
r.errors <- err
ctx.Throw(err)
}
}()

Expand All @@ -244,7 +241,7 @@ func (r *CommandRunner) runAdminServer(ctx context.Context) error {

if err := httpServer.Shutdown(shutdownCtx); err != nil {
r.logger.Err(err).Msg("failed to shutdown http server")
r.errors <- err
ctx.Throw(err)
}
}
}()
Expand Down
7 changes: 5 additions & 2 deletions admin/command_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"google.golang.org/protobuf/types/known/structpb"

pb "github.com/onflow/flow-go/admin/admin"
"github.com/onflow/flow-go/module/irrecoverable"
)

type CommandRunnerSuite struct {
Expand Down Expand Up @@ -66,17 +67,19 @@ func (suite *CommandRunnerSuite) TearDownTest() {
func (suite *CommandRunnerSuite) SetupCommandRunner(opts ...CommandRunnerOption) {
ctx, cancel := context.WithCancel(context.Background())
suite.cancel = cancel
signaler := irrecoverable.NewSignaler()
signalerCtx := irrecoverable.WithSignaler(ctx, signaler)

logger := zerolog.New(zerolog.NewConsoleWriter())
suite.runner = suite.bootstrapper.Bootstrap(logger, suite.httpAddress, opts...)
err := suite.runner.Start(ctx)
err := suite.runner.Start(signalerCtx)
suite.NoError(err)
<-suite.runner.Ready()
go func() {
select {
case <-ctx.Done():
return
case err := <-suite.runner.Errors():
case err := <-signaler.Error():
suite.Fail("encountered unexpected error", err)
}
}()
Expand Down
2 changes: 1 addition & 1 deletion cmd/access/node_builder/access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -592,7 +592,7 @@ func (builder *FlowAccessNodeBuilder) initNetwork(nodeID module.Local,
builder.Logger,
codec,
nodeID,
builder.Middleware,
func() (network.Middleware, error) { return builder.Middleware, nil },
p2p.DefaultCacheSize,
topology,
p2p.NewChannelSubscriptionManager(middleware),
Expand Down
27 changes: 8 additions & 19 deletions cmd/access/node_builder/staked_access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,18 +70,14 @@ func (fnb *StakedAccessNodeBuilder) InitIDProviders() {
}

func (builder *StakedAccessNodeBuilder) Initialize() error {

ctx, cancel := context.WithCancel(context.Background())
builder.Cancel = cancel

builder.InitIDProviders()

// if this is an access node that supports unstaked followers, enqueue the unstaked network
if builder.supportsUnstakedFollower {
builder.enqueueUnstakedNetworkInit(ctx)
builder.enqueueUnstakedNetworkInit()
} else {
// otherwise, enqueue the regular network
builder.EnqueueNetworkInit(ctx)
builder.EnqueueNetworkInit()
}

builder.EnqueueMetricsServerInit()
Expand Down Expand Up @@ -156,16 +152,11 @@ func (anb *StakedAccessNodeBuilder) Build() AccessNodeBuilder {
}

// enqueueUnstakedNetworkInit enqueues the unstaked network component initialized for the staked node
func (builder *StakedAccessNodeBuilder) enqueueUnstakedNetworkInit(ctx context.Context) {
func (builder *StakedAccessNodeBuilder) enqueueUnstakedNetworkInit() {

builder.Component("unstaked network", func(_ cmd.NodeBuilder, node *cmd.NodeConfig) (module.ReadyDoneAware, error) {

libP2PFactory, err := builder.initLibP2PFactory(ctx,
builder.NodeID,
builder.NodeConfig.NetworkKey)
if err != nil {
return nil, err
}
libP2PFactory := builder.initLibP2PFactory(builder.NodeID, builder.NodeConfig.NetworkKey)

msgValidators := unstakedNetworkMsgValidators(node.Logger, node.IdentityProvider, builder.NodeID)

Expand All @@ -186,7 +177,7 @@ func (builder *StakedAccessNodeBuilder) enqueueUnstakedNetworkInit(ctx context.C
builder.ProtocolEvents.AddConsumer(idEvents)

node.Logger.Info().Msgf("network will run on address: %s", builder.BindAddr)
return builder.Network, err
return builder.Network, nil
})
}

Expand All @@ -198,9 +189,7 @@ func (builder *StakedAccessNodeBuilder) enqueueUnstakedNetworkInit(ctx context.C
// The passed in private key as the libp2p key
// No connection gater
// Default Flow libp2p pubsub options
func (builder *StakedAccessNodeBuilder) initLibP2PFactory(ctx context.Context,
nodeID flow.Identifier,
networkKey crypto.PrivateKey) (p2p.LibP2PFactoryFunc, error) {
func (builder *StakedAccessNodeBuilder) initLibP2PFactory(nodeID flow.Identifier, networkKey crypto.PrivateKey) p2p.LibP2PFactoryFunc {

// The staked nodes act as the DHT servers
dhtOptions := []dht.Option{p2p.AsServer(true)}
Expand All @@ -214,7 +203,7 @@ func (builder *StakedAccessNodeBuilder) initLibP2PFactory(ctx context.Context,

resolver := dns.NewResolver(builder.Metrics.Network, dns.WithTTL(builder.BaseConfig.DNSCacheTTL))

return func() (*p2p.Node, error) {
return func(ctx context.Context) (*p2p.Node, error) {
psOpts := p2p.DefaultPubsubOptions(p2p.DefaultMaxPubSubMsgSize)
psOpts = append(psOpts, func(_ context.Context, h host.Host) (pubsub.Option, error) {
return pubsub.WithSubscriptionFilter(p2p.NewRoleBasedFilter(
Expand All @@ -236,7 +225,7 @@ func (builder *StakedAccessNodeBuilder) initLibP2PFactory(ctx context.Context,
}
builder.LibP2PNode = libp2pNode
return builder.LibP2PNode, nil
}, nil
}
}

// initMiddleware creates the network.Middleware implementation with the libp2p factory function, metrics, peer update
Expand Down
20 changes: 7 additions & 13 deletions cmd/access/node_builder/unstaked_access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,6 @@ func (anb *UnstakedAccessNodeBuilder) InitIDProviders() {
}

func (anb *UnstakedAccessNodeBuilder) Initialize() error {

ctx, cancel := context.WithCancel(context.Background())
anb.Cancel = cancel

if err := anb.deriveBootstrapPeerIdentities(); err != nil {
return err
}
Expand All @@ -106,9 +102,9 @@ func (anb *UnstakedAccessNodeBuilder) Initialize() error {

anb.InitIDProviders()

anb.enqueueMiddleware(ctx)
anb.enqueueMiddleware()

anb.enqueueUnstakedNetworkInit(ctx)
anb.enqueueUnstakedNetworkInit()

anb.enqueueConnectWithStakedAN()

Expand Down Expand Up @@ -162,9 +158,7 @@ func (anb *UnstakedAccessNodeBuilder) validateParams() error {
// No connection gater
// No connection manager
// Default libp2p pubsub options
func (builder *UnstakedAccessNodeBuilder) initLibP2PFactory(ctx context.Context,
nodeID flow.Identifier,
networkKey crypto.PrivateKey) (p2p.LibP2PFactoryFunc, error) {
func (builder *UnstakedAccessNodeBuilder) initLibP2PFactory(nodeID flow.Identifier, networkKey crypto.PrivateKey) (p2p.LibP2PFactoryFunc, error) {

// the unstaked nodes act as the DHT clients
dhtOptions := []dht.Option{p2p.AsServer(false)}
Expand All @@ -180,7 +174,7 @@ func (builder *UnstakedAccessNodeBuilder) initLibP2PFactory(ctx context.Context,

resolver := dns.NewResolver(builder.Metrics.Network, dns.WithTTL(builder.BaseConfig.DNSCacheTTL))

return func() (*p2p.Node, error) {
return func(ctx context.Context) (*p2p.Node, error) {
libp2pNode, err := p2p.NewDefaultLibP2PNodeBuilder(nodeID, builder.BaseConfig.BindAddr, networkKey).
SetRootBlockID(builder.RootBlock.ID()).
SetConnectionManager(connManager).
Expand Down Expand Up @@ -220,7 +214,7 @@ func (anb *UnstakedAccessNodeBuilder) initUnstakedLocal() func(builder cmd.NodeB

// enqueueMiddleware enqueues the creation of the network middleware
// this needs to be done before sync engine participants module
func (anb *UnstakedAccessNodeBuilder) enqueueMiddleware(ctx context.Context) {
func (anb *UnstakedAccessNodeBuilder) enqueueMiddleware() {
anb.
Module("network middleware", func(_ cmd.NodeBuilder, node *cmd.NodeConfig) error {

Expand All @@ -234,7 +228,7 @@ func (anb *UnstakedAccessNodeBuilder) enqueueMiddleware(ctx context.Context) {
// for now we use the empty metrics NoopCollector till we have defined the new unstaked network metrics
unstakedNetworkMetrics := metrics.NewNoopCollector()

libP2PFactory, err := anb.initLibP2PFactory(ctx, unstakedNodeID, unstakedNetworkKey)
peterargue marked this conversation as resolved.
Show resolved Hide resolved
libP2PFactory, err := anb.initLibP2PFactory(unstakedNodeID, unstakedNetworkKey)
if err != nil {
return err
}
Expand All @@ -255,7 +249,7 @@ func (anb *UnstakedAccessNodeBuilder) Build() AccessNodeBuilder {
}

// enqueueUnstakedNetworkInit enqueues the unstaked network component initialized for the unstaked node
func (anb *UnstakedAccessNodeBuilder) enqueueUnstakedNetworkInit(ctx context.Context) {
func (anb *UnstakedAccessNodeBuilder) enqueueUnstakedNetworkInit() {

anb.Component("unstaked network", func(_ cmd.NodeBuilder, node *cmd.NodeConfig) (module.ReadyDoneAware, error) {

Expand Down
4 changes: 2 additions & 2 deletions cmd/node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type NodeBuilder interface {
InitIDProviders()

// EnqueueNetworkInit enqueues the default network component with the given context
EnqueueNetworkInit(ctx context.Context)
EnqueueNetworkInit()

// EnqueueMetricsServerInit enqueues the metrics component
EnqueueMetricsServerInit()
Expand Down Expand Up @@ -149,7 +149,7 @@ type NodeConfig struct {
ProtocolEvents *events.Distributor
State protocol.State
Middleware network.Middleware
Network module.ReadyDoneAwareNetwork
Network network.Network
MsgValidators []network.MessageValidator
FvmOptions []fvm.Option
StakingKey crypto.PrivateKey
Expand Down