Skip to content

Commit

Permalink
Try #1133:
Browse files Browse the repository at this point in the history
  • Loading branch information
bors[bot] committed Aug 13, 2021
2 parents c5109dd + 1e3f160 commit 031eade
Show file tree
Hide file tree
Showing 18 changed files with 612 additions and 297 deletions.
34 changes: 18 additions & 16 deletions cmd/node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/onflow/flow-go/fvm"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/module/id"
"github.com/onflow/flow-go/module/local"
"github.com/onflow/flow-go/network"
"github.com/onflow/flow-go/network/p2p"
Expand Down Expand Up @@ -114,22 +115,23 @@ type BaseConfig struct {
type NodeConfig struct {
Cancel context.CancelFunc // cancel function for the context that is passed to the networking layer
BaseConfig
Logger zerolog.Logger
NodeID flow.Identifier
Me *local.Local
Tracer module.Tracer
MetricsRegisterer prometheus.Registerer
Metrics Metrics
DB *badger.DB
Storage Storage
ProtocolEvents *events.Distributor
State protocol.State
Middleware *p2p.Middleware
Network *p2p.Network
MsgValidators []network.MessageValidator
FvmOptions []fvm.Option
StakingKey crypto.PrivateKey
NetworkKey crypto.PrivateKey
Logger zerolog.Logger
NodeID flow.Identifier
Me *local.Local
Tracer module.Tracer
MetricsRegisterer prometheus.Registerer
Metrics Metrics
DB *badger.DB
Storage Storage
ProtocolEvents *events.Distributor
State protocol.State
Middleware *p2p.Middleware
Network *p2p.Network
MsgValidators []network.MessageValidator
FvmOptions []fvm.Option
StakingKey crypto.PrivateKey
NetworkKey crypto.PrivateKey
IdentifierProvider id.IdentityProvider // TODO: initialize these in scaffold and unstaked node

// root state information
RootBlock *flow.Block
Expand Down
10 changes: 2 additions & 8 deletions cmd/scaffold.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,11 +175,6 @@ func (fnb *FlowNodeBuilder) EnqueueNetworkInit(ctx context.Context) {
true,
fnb.MsgValidators...)

participants, err := fnb.State.Final().Identities(p2p.NetworkingSetFilter)
if err != nil {
return nil, fmt.Errorf("could not get network identities: %w", err)
}

subscriptionManager := p2p.NewChannelSubscriptionManager(fnb.Middleware)
top, err := topology.NewTopicBasedTopology(fnb.NodeID, fnb.Logger, fnb.State)
if err != nil {
Expand All @@ -190,7 +185,7 @@ func (fnb *FlowNodeBuilder) EnqueueNetworkInit(ctx context.Context) {
// creates network instance
net, err := p2p.NewNetwork(fnb.Logger,
codec,
participants,
fnb.IdentifierProvider,
fnb.Me,
fnb.Middleware,
p2p.DefaultCacheSize,
Expand All @@ -203,8 +198,7 @@ func (fnb *FlowNodeBuilder) EnqueueNetworkInit(ctx context.Context) {

fnb.Network = net

idRefresher := p2p.NewNodeIDRefresher(fnb.Logger, fnb.State, net.SetIDs)
idEvents := gadgets.NewIdentityDeltas(idRefresher.OnIdentityTableChanged)
idEvents := gadgets.NewIdentityDeltas(net.RefreshConnectionRules)
fnb.ProtocolEvents.AddConsumer(idEvents)

return net, err
Expand Down
64 changes: 25 additions & 39 deletions engine/common/synchronization/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,13 @@ import (
"github.com/onflow/flow-go/engine/common/fifoqueue"
"github.com/onflow/flow-go/model/events"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/model/flow/filter"
"github.com/onflow/flow-go/model/messages"
"github.com/onflow/flow-go/module"
identifier "github.com/onflow/flow-go/module/id"
"github.com/onflow/flow-go/module/lifecycle"
"github.com/onflow/flow-go/module/metrics"
synccore "github.com/onflow/flow-go/module/synchronization"
"github.com/onflow/flow-go/network"
"github.com/onflow/flow-go/state/protocol"
"github.com/onflow/flow-go/storage"
)

Expand All @@ -43,11 +42,11 @@ type Engine struct {
blocks storage.Blocks
comp network.Engine // compliance layer engine

pollInterval time.Duration
scanInterval time.Duration
core module.SyncCore
state protocol.State
finalizedHeader *FinalizedHeaderCache
pollInterval time.Duration
scanInterval time.Duration
core module.SyncCore
participantsProvider identifier.IdentifierProvider
finalizedHeader *FinalizedHeaderCache

requestHandler *RequestHandlerEngine // component responsible for handling requests

Expand All @@ -66,7 +65,7 @@ func New(
comp network.Engine,
core module.SyncCore,
finalizedHeader *FinalizedHeaderCache,
state protocol.State,
participantsProvider identifier.IdentifierProvider,
opts ...OptionFunc,
) (*Engine, error) {

Expand All @@ -81,18 +80,18 @@ func New(

// initialize the propagation engine with its dependencies
e := &Engine{
unit: engine.NewUnit(),
lm: lifecycle.NewLifecycleManager(),
log: log.With().Str("engine", "synchronization").Logger(),
metrics: metrics,
me: me,
blocks: blocks,
comp: comp,
core: core,
pollInterval: opt.pollInterval,
scanInterval: opt.scanInterval,
finalizedHeader: finalizedHeader,
state: state,
unit: engine.NewUnit(),
lm: lifecycle.NewLifecycleManager(),
log: log.With().Str("engine", "synchronization").Logger(),
metrics: metrics,
me: me,
blocks: blocks,
comp: comp,
core: core,
pollInterval: opt.pollInterval,
scanInterval: opt.scanInterval,
finalizedHeader: finalizedHeader,
participantsProvider: participantsProvider,
}

err := e.setupResponseMessageHandler()
Expand Down Expand Up @@ -332,7 +331,7 @@ CheckLoop:
e.pollHeight()
case <-scan.C:
head := e.finalizedHeader.Get()
participants := e.getParticipants(head.ID())
participants := e.participantsProvider.Identifiers()
ranges, batches := e.core.ScanPending(head)
e.sendRequests(participants, ranges, batches)
}
Expand All @@ -342,30 +341,17 @@ CheckLoop:
scan.Stop()
}

// getParticipants gets all of the consensus nodes from the state at the given block ID.
func (e *Engine) getParticipants(blockID flow.Identifier) flow.IdentityList {
participants, err := e.state.AtBlockID(blockID).Identities(filter.And(
filter.HasRole(flow.RoleConsensus),
filter.Not(filter.HasNodeID(e.me.NodeID())),
))
if err != nil {
e.log.Fatal().Err(err).Msgf("could not get consensus participants at block ID %v", blockID)
}

return participants
}

// pollHeight will send a synchronization request to three random nodes.
func (e *Engine) pollHeight() {
head := e.finalizedHeader.Get()
participants := e.getParticipants(head.ID())
participants := e.participantsProvider.Identifiers()

// send the request for synchronization
req := &messages.SyncRequest{
Nonce: rand.Uint64(),
Height: head.Height,
}
err := e.con.Multicast(req, synccore.DefaultPollNodes, participants.NodeIDs()...)
err := e.con.Multicast(req, synccore.DefaultPollNodes, participants...)
if err != nil {
e.log.Warn().Err(err).Msg("sending sync request to poll heights failed")
return
Expand All @@ -374,7 +360,7 @@ func (e *Engine) pollHeight() {
}

// sendRequests sends a request for each range and batch using consensus participants from last finalized snapshot.
func (e *Engine) sendRequests(participants flow.IdentityList, ranges []flow.Range, batches []flow.Batch) {
func (e *Engine) sendRequests(participants flow.IdentifierList, ranges []flow.Range, batches []flow.Batch) {
var errs *multierror.Error

for _, ran := range ranges {
Expand All @@ -383,7 +369,7 @@ func (e *Engine) sendRequests(participants flow.IdentityList, ranges []flow.Rang
FromHeight: ran.From,
ToHeight: ran.To,
}
err := e.con.Multicast(req, synccore.DefaultBlockRequestNodes, participants.NodeIDs()...)
err := e.con.Multicast(req, synccore.DefaultBlockRequestNodes, participants...)
if err != nil {
errs = multierror.Append(errs, fmt.Errorf("could not submit range request: %w", err))
continue
Expand All @@ -402,7 +388,7 @@ func (e *Engine) sendRequests(participants flow.IdentityList, ranges []flow.Rang
Nonce: rand.Uint64(),
BlockIDs: batch.BlockIDs,
}
err := e.con.Multicast(req, synccore.DefaultBlockRequestNodes, participants.NodeIDs()...)
err := e.con.Multicast(req, synccore.DefaultBlockRequestNodes, participants...)
if err != nil {
errs = multierror.Append(errs, fmt.Errorf("could not submit batch request: %w", err))
continue
Expand Down
27 changes: 15 additions & 12 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,31 +13,33 @@ require (
github.com/fxamacker/cbor/v2 v2.2.1-0.20210510192846-c3f3c69e7bc8
github.com/gammazero/workerpool v1.1.2
github.com/gogo/protobuf v1.3.2
github.com/golang/mock v1.5.0
github.com/golang/mock v1.6.0
github.com/golang/protobuf v1.5.2
github.com/google/go-cmp v0.5.5
github.com/google/uuid v1.1.2
github.com/google/uuid v1.3.0
github.com/grpc-ecosystem/go-grpc-middleware/providers/zerolog/v2 v2.0.0-rc.2
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.0-20200501113911-9a95f0fdbfea
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/hashicorp/go-multierror v1.0.0
github.com/hashicorp/go-multierror v1.1.1
github.com/hashicorp/golang-lru v0.5.4
github.com/improbable-eng/grpc-web v0.12.0
github.com/ipfs/go-log v1.0.4
github.com/ipfs/go-log v1.0.5
github.com/jrick/bitset v1.0.0
github.com/kr/text v0.2.0 // indirect
github.com/libp2p/go-addr-util v0.0.2
github.com/libp2p/go-libp2p v0.14.1
github.com/libp2p/go-libp2p-core v0.8.5
github.com/libp2p/go-addr-util v0.1.0
github.com/libp2p/go-libp2p v0.14.4
github.com/libp2p/go-libp2p-core v0.9.0
github.com/libp2p/go-libp2p-discovery v0.5.0
github.com/libp2p/go-libp2p-peerstore v0.2.8
github.com/libp2p/go-libp2p-pubsub v0.4.1
github.com/libp2p/go-libp2p-swarm v0.5.0
github.com/libp2p/go-libp2p-swarm v0.5.3
github.com/libp2p/go-libp2p-tls v0.1.3
github.com/libp2p/go-libp2p-transport-upgrader v0.4.2
github.com/libp2p/go-tcp-transport v0.2.1
github.com/libp2p/go-libp2p-transport-upgrader v0.4.6
github.com/libp2p/go-tcp-transport v0.2.7
github.com/m4ksio/wal v1.0.0
github.com/mitchellh/mapstructure v1.3.3 // indirect
github.com/multiformats/go-multiaddr v0.3.1
github.com/multiformats/go-multiaddr v0.3.3
github.com/multiformats/go-multihash v0.0.15
github.com/onflow/cadence v0.18.1-0.20210729032058-d9eb6683d6ed
github.com/onflow/flow-core-contracts/lib/go/contracts v0.7.5
github.com/onflow/flow-core-contracts/lib/go/templates v0.7.5
Expand All @@ -48,7 +50,7 @@ require (
github.com/opentracing/opentracing-go v1.2.0
github.com/pelletier/go-toml v1.7.0 // indirect
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.7.1
github.com/prometheus/client_golang v1.10.0
github.com/rs/zerolog v1.19.0
github.com/spf13/cobra v1.1.3
github.com/spf13/pflag v1.0.5
Expand All @@ -59,6 +61,7 @@ require (
github.com/vmihailenco/msgpack v4.0.4+incompatible
github.com/vmihailenco/msgpack/v4 v4.3.11
go.uber.org/atomic v1.7.0
go.uber.org/zap v1.18.1 // indirect
golang.org/x/crypto v0.0.0-20210513164829-c07d793c2f9a
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6
golang.org/x/time v0.0.0-20191024005414-555d28b269f0
Expand Down

0 comments on commit 031eade

Please sign in to comment.