Skip to content

Commit

Permalink
Merge branch 'master' into jordan/5873/generate-enc-key-files
Browse files Browse the repository at this point in the history
  • Loading branch information
Kay-Zee committed Sep 29, 2021
2 parents 7a05ab1 + 13ef07a commit 63ca09f
Show file tree
Hide file tree
Showing 57 changed files with 1,663 additions and 238 deletions.
18 changes: 13 additions & 5 deletions cmd/access/node_builder/staked_access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import (
"context"
"fmt"

"github.com/libp2p/go-libp2p-core/host"
dht "github.com/libp2p/go-libp2p-kad-dht"
pubsub "github.com/libp2p/go-libp2p-pubsub"

"github.com/onflow/flow-go/cmd"
"github.com/onflow/flow-go/crypto"
Expand Down Expand Up @@ -102,11 +104,11 @@ func (anb *StakedAccessNodeBuilder) Build() AccessNodeBuilder {

anb.
Component("unstaked sync request proxy", func(builder cmd.NodeBuilder, node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
proxyEngine = splitter.New(node.Logger, engine.UnstakedSyncCommittee)
proxyEngine = splitter.New(node.Logger, engine.PublicSyncCommittee)

// register the proxy engine with the unstaked network
var err error
unstakedNetworkConduit, err = node.Network.Register(engine.UnstakedSyncCommittee, proxyEngine)
unstakedNetworkConduit, err = node.Network.Register(engine.PublicSyncCommittee, proxyEngine)
if err != nil {
return nil, fmt.Errorf("could not register unstaked sync request proxy: %w", err)
}
Expand Down Expand Up @@ -213,13 +215,19 @@ func (builder *StakedAccessNodeBuilder) initLibP2PFactory(ctx context.Context,
resolver := dns.NewResolver(builder.Metrics.Network, dns.WithTTL(builder.BaseConfig.DNSCacheTTL))

return func() (*p2p.Node, error) {
psOpts := p2p.DefaultPubsubOptions(p2p.DefaultMaxPubSubMsgSize)
psOpts = append(psOpts, func(_ context.Context, h host.Host) (pubsub.Option, error) {
return pubsub.WithSubscriptionFilter(p2p.NewRoleBasedFilter(
h.ID(), builder.RootBlock.ID(), builder.IdentityProvider,
)), nil
})
libp2pNode, err := p2p.NewDefaultLibP2PNodeBuilder(nodeID, myAddr, networkKey).
SetRootBlockID(builder.RootBlock.ID().String()).
SetRootBlockID(builder.RootBlock.ID()).
// no connection gater
SetConnectionManager(connManager).
// act as a DHT server
SetDHTOptions(dhtOptions...).
SetPubsubOptions(p2p.DefaultPubsubOptions(p2p.DefaultMaxPubSubMsgSize)...).
SetPubsubOptions(psOpts...).
SetLogger(builder.Logger).
SetResolver(resolver).
Build(ctx)
Expand All @@ -246,7 +254,7 @@ func (builder *StakedAccessNodeBuilder) initMiddleware(nodeID flow.Identifier,
factoryFunc,
nodeID,
networkMetrics,
builder.RootBlock.ID().String(),
builder.RootBlock.ID(),
p2p.DefaultUnicastTimeout,
false, // no connection gating to allow unstaked nodes to connect
builder.IDTranslator,
Expand Down
6 changes: 3 additions & 3 deletions cmd/access/node_builder/unstaked_access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func (builder *UnstakedAccessNodeBuilder) initLibP2PFactory(ctx context.Context,

return func() (*p2p.Node, error) {
libp2pNode, err := p2p.NewDefaultLibP2PNodeBuilder(nodeID, builder.BaseConfig.BindAddr, networkKey).
SetRootBlockID(builder.RootBlock.ID().String()).
SetRootBlockID(builder.RootBlock.ID()).
SetConnectionManager(connManager).
// unlike the staked side of the network where currently all the node addresses are known upfront,
// for the unstaked side of the network, the nodes need to discover each other using DHT Discovery.
Expand Down Expand Up @@ -269,7 +269,7 @@ func (anb *UnstakedAccessNodeBuilder) enqueueUnstakedNetworkInit(ctx context.Con
return nil, err
}

anb.Network = converter.NewNetwork(network, engine.SyncCommittee, engine.UnstakedSyncCommittee)
anb.Network = converter.NewNetwork(network, engine.SyncCommittee, engine.PublicSyncCommittee)

anb.Logger.Info().Msgf("network will run on address: %s", anb.BindAddr)

Expand Down Expand Up @@ -304,7 +304,7 @@ func (anb *UnstakedAccessNodeBuilder) initMiddleware(nodeID flow.Identifier,
factoryFunc,
nodeID,
networkMetrics,
anb.RootBlock.ID().String(),
anb.RootBlock.ID(),
p2p.DefaultUnicastTimeout,
false, // no connection gating for the unstaked nodes
anb.IDTranslator,
Expand Down
34 changes: 28 additions & 6 deletions cmd/collection/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ func main() {
hotstuffTimeoutDecreaseFactor float64
hotstuffTimeoutVoteAggregationFraction float64
blockRateDelay time.Duration
startupTimeString string
startupTime time.Time

followerState protocol.MutableState
ingestConf ingest.Config
Expand Down Expand Up @@ -141,6 +143,7 @@ func main() {
"additional fraction of replica timeout that the primary will wait for votes")
flags.DurationVar(&blockRateDelay, "block-rate-delay", 250*time.Millisecond,
"the delay to broadcast block proposal in order to control block production rate")
flags.StringVar(&startupTimeString, "hotstuff-startup-time", cmd.NotSet, "specifies date and time (in ISO 8601 format) after which the consensus participant may enter the first view (e.g 2006-01-02T15:04:05Z07:00)")

// epoch qc contract flags
flags.StringVar(&accessAddress, "access-address", "", "the address of an access node")
Expand All @@ -153,6 +156,16 @@ func main() {
}

nodeBuilder.
ValidateFlags(func() error {
if startupTimeString != cmd.NotSet {
t, err := time.Parse(time.RFC3339, startupTimeString)
if err != nil {
return fmt.Errorf("invalid start-time value: %w", err)
}
startupTime = t
}
return nil
}).
Module("mutable follower state", func(builder cmd.NodeBuilder, node *cmd.NodeConfig) error {
// For now, we only support state implementations from package badger.
// If we ever support different implementations, the following can be replaced by a type-aware factory
Expand Down Expand Up @@ -433,19 +446,28 @@ func main() {
return metrics.NewHotstuffCollector(chainID)
}
staking := signature.NewAggregationProvider(encoding.CollectorVoteTag, node.Me)

opts := []consensus.Option{
consensus.WithBlockRateDelay(blockRateDelay),
consensus.WithInitialTimeout(hotstuffTimeout),
consensus.WithMinTimeout(hotstuffMinTimeout),
consensus.WithVoteAggregationTimeoutFraction(hotstuffTimeoutVoteAggregationFraction),
consensus.WithTimeoutIncreaseFactor(hotstuffTimeoutIncreaseFactor),
consensus.WithTimeoutDecreaseFactor(hotstuffTimeoutDecreaseFactor),
}

if !startupTime.IsZero() {
opts = append(opts, consensus.WithStartupTime(startupTime))
}

hotstuffFactory, err := factories.NewHotStuffFactory(
node.Logger,
node.Me,
staking,
node.DB,
node.State,
createMetrics,
consensus.WithBlockRateDelay(blockRateDelay),
consensus.WithInitialTimeout(hotstuffTimeout),
consensus.WithMinTimeout(hotstuffMinTimeout),
consensus.WithVoteAggregationTimeoutFraction(hotstuffTimeoutVoteAggregationFraction),
consensus.WithTimeoutIncreaseFactor(hotstuffTimeoutIncreaseFactor),
consensus.WithTimeoutDecreaseFactor(hotstuffTimeoutDecreaseFactor),
opts...,
)
if err != nil {
return nil, err
Expand Down
38 changes: 32 additions & 6 deletions cmd/consensus/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ func main() {
requiredApprovalsForSealVerification uint
requiredApprovalsForSealConstruction uint
emergencySealing bool
dkgControllerConfig dkgmodule.ControllerConfig
startupTimeString string
startupTime time.Time

// DKG contract client
machineAccountInfo *bootstrap.NodeMachineAccountInfo
Expand Down Expand Up @@ -145,13 +148,27 @@ func main() {
flags.StringVar(&accessAddress, "access-address", "", "the address of an access node")
flags.StringVar(&secureAccessNodeID, "secure-access-node-id", "", "the node ID of the secure access GRPC server")
flags.BoolVar(&insecureAccessAPI, "insecure-access-api", true, "required if insecure GRPC connection should be used")
flags.DurationVar(&dkgControllerConfig.BaseStartDelay, "dkg-controller-base-start-delay", dkgmodule.DefaultBaseStartDelay, "used to define the range for jitter prior to DKG start (eg. 500µs) - the base value is scaled quadratically with the # of DKG participants")
flags.DurationVar(&dkgControllerConfig.BaseHandleFirstBroadcastDelay, "dkg-controller-base-handle-first-broadcast-delay", dkgmodule.DefaultBaseHandleFirstBroadcastDelay, "used to define the range for jitter prior to DKG handling the first broadcast messages (eg. 50ms) - the base value is scaled quadratically with the # of DKG participants")
flags.DurationVar(&dkgControllerConfig.HandleSubsequentBroadcastDelay, "dkg-controller-handle-subsequent-broadcast-delay", dkgmodule.DefaultHandleSubsequentBroadcastDelay, "used to define the constant delay introduced prior to DKG handling subsequent broadcast messages (eg. 2s)")
flags.StringVar(&startupTimeString, "hotstuff-startup-time", cmd.NotSet, "specifies date and time (in ISO 8601 format) after which the consensus participant may enter the first view (e.g 2006-01-02T15:04:05Z07:00)")
})

if err = nodeBuilder.Initialize(); err != nil {
nodeBuilder.Logger.Fatal().Err(err).Send()
}

nodeBuilder.
ValidateFlags(func() error {
if startupTimeString != cmd.NotSet {
t, err := time.Parse(time.RFC3339, startupTimeString)
if err != nil {
return fmt.Errorf("invalid start-time value: %w", err)
}
startupTime = t
}
return nil
}).
Module("consensus node metrics", func(builder cmd.NodeBuilder, node *cmd.NodeConfig) error {
conMetrics = metrics.NewConsensusCollector(node.Tracer, node.MetricsRegisterer)
return nil
Expand Down Expand Up @@ -632,6 +649,19 @@ func main() {
return nil, fmt.Errorf("could not find latest finalized block and pending blocks: %w", err)
}

opts := []consensus.Option{
consensus.WithInitialTimeout(hotstuffTimeout),
consensus.WithMinTimeout(hotstuffMinTimeout),
consensus.WithVoteAggregationTimeoutFraction(hotstuffTimeoutVoteAggregationFraction),
consensus.WithTimeoutIncreaseFactor(hotstuffTimeoutIncreaseFactor),
consensus.WithTimeoutDecreaseFactor(hotstuffTimeoutDecreaseFactor),
consensus.WithBlockRateDelay(blockRateDelay),
}

if !startupTime.IsZero() {
opts = append(opts, consensus.WithStartupTime(startupTime))
}

// initialize hotstuff consensus algorithm
hot, err := consensus.NewParticipant(
node.Logger,
Expand All @@ -648,12 +678,7 @@ func main() {
node.RootQC,
finalized,
pending,
consensus.WithInitialTimeout(hotstuffTimeout),
consensus.WithMinTimeout(hotstuffMinTimeout),
consensus.WithVoteAggregationTimeoutFraction(hotstuffTimeoutVoteAggregationFraction),
consensus.WithTimeoutIncreaseFactor(hotstuffTimeoutIncreaseFactor),
consensus.WithTimeoutDecreaseFactor(hotstuffTimeoutDecreaseFactor),
consensus.WithBlockRateDelay(blockRateDelay),
opts...,
)
if err != nil {
return nil, fmt.Errorf("could not initialize hotstuff engine: %w", err)
Expand Down Expand Up @@ -736,6 +761,7 @@ func main() {
node.Me,
dkgContractClient,
dkgBrokerTunnel,
dkgControllerConfig,
),
viewsObserver,
)
Expand Down
2 changes: 1 addition & 1 deletion cmd/execution/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ func main() {
if err != nil {
return nil, fmt.Errorf("cannot create checkpointer: %w", err)
}
compactor := wal.NewCompactor(checkpointer, 10*time.Second, checkpointDistance, checkpointsToKeep)
compactor := wal.NewCompactor(checkpointer, 10*time.Second, checkpointDistance, checkpointsToKeep, node.Logger.With().Str("subcomponent", "checkpointer").Logger())

return compactor, nil
}).
Expand Down
10 changes: 7 additions & 3 deletions cmd/scaffold.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,11 +179,14 @@ func (fnb *FlowNodeBuilder) EnqueueNetworkInit(ctx context.Context) {
fnb.Me.NodeID(),
myAddr,
fnb.NetworkKey,
fnb.RootBlock.ID().String(),
fnb.RootBlock.ID(),
fnb.RootChainID,
fnb.IdentityProvider,
p2p.DefaultMaxPubSubMsgSize,
fnb.Metrics.Network,
pingProvider,
fnb.BaseConfig.DNSCacheTTL)
fnb.BaseConfig.DNSCacheTTL,
fnb.BaseConfig.NodeRole)

if err != nil {
return nil, fmt.Errorf("could not generate libp2p node factory: %w", err)
Expand All @@ -205,7 +208,7 @@ func (fnb *FlowNodeBuilder) EnqueueNetworkInit(ctx context.Context) {
libP2PNodeFactory,
fnb.Me.NodeID(),
fnb.Metrics.Network,
fnb.RootBlock.ID().String(),
fnb.RootBlock.ID(),
fnb.BaseConfig.UnicastMessageTimeout,
true,
fnb.IDTranslator,
Expand Down Expand Up @@ -481,6 +484,7 @@ func (fnb *FlowNodeBuilder) initSecretsDB() {

opts := badger.DefaultOptions(fnb.BaseConfig.secretsdir).WithLogger(log)
// attempt to read an encryption key for the secrets DB from the canonical path
// TODO enforce encryption in an upcoming spork https://github.com/dapperlabs/flow-go/issues/5893
encryptionKey, err := loadSecretsEncryptionKey(fnb.BootstrapDir, fnb.NodeID)
if errors.Is(err, os.ErrNotExist) {
fnb.Logger.Warn().Msg("starting with secrets database encryption disabled")
Expand Down
4 changes: 3 additions & 1 deletion cmd/util/cmd/execution-state-extract/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,9 @@ func run(*cobra.Command, []string) {
if err != nil {
log.Fatal().Err(err).Msg("invalid state commitment length")
}
} else if !flagNoMigration {
}

if len(flagBlockHash) == 0 && len(flagStateCommitment) == 0 {
// read state commitment from root checkpoint

f, err := os.Open(path.Join(flagExecutionStateDir, bootstrap.FilenameWALRootCheckpoint))
Expand Down
2 changes: 1 addition & 1 deletion cmd/util/ledger/migrations/accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func AddMissingKeysMigration(payloads []ledger.Payload) ([]ledger.Payload, error
return l.Payloads(), nil
}

func appendKeyForAccount(accounts *state.Accounts, addressInHex string, encodedKeyInHex string) error {
func appendKeyForAccount(accounts state.Accounts, addressInHex string, encodedKeyInHex string) error {
address := flow.HexToAddress(addressInHex)
ok, err := accounts.Exists(address)
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions cmd/util/ledger/migrations/storage_v5.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type storageFormatV5MigrationResult struct {
type StorageFormatV5Migration struct {
Log zerolog.Logger
OutputDir string
accounts *state.Accounts
accounts state.Accounts
programs *programs.Programs
brokenTypeIDs map[common.TypeID]brokenTypeCause
reportFile *os.File
Expand Down Expand Up @@ -160,7 +160,7 @@ func (m *StorageFormatV5Migration) Migrate(payloads []ledger.Payload) ([]ledger.
return migratedPayloads, nil
}

func (m StorageFormatV5Migration) getContractsOnlyAccounts(payloads []ledger.Payload) *state.Accounts {
func (m StorageFormatV5Migration) getContractsOnlyAccounts(payloads []ledger.Payload) state.Accounts {
var filteredPayloads []ledger.Payload

for _, payload := range payloads {
Expand Down Expand Up @@ -1761,7 +1761,7 @@ func (m StorageFormatV5Migration) loadProgram(
}

type migrationRuntimeInterface struct {
accounts *state.Accounts
accounts state.Accounts
programs *programs.Programs
}

Expand Down
7 changes: 7 additions & 0 deletions consensus/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
)

type ParticipantConfig struct {
StartupTime time.Time // the time when consensus participant enters first view
TimeoutInitial time.Duration // the initial timeout for the pacemaker
TimeoutMinimum time.Duration // the minimum timeout for the pacemaker
TimeoutAggregationFraction float64 // the percentage part of the timeout period reserved for vote aggregation
Expand All @@ -15,6 +16,12 @@ type ParticipantConfig struct {

type Option func(*ParticipantConfig)

func WithStartupTime(time time.Time) Option {
return func(cfg *ParticipantConfig) {
cfg.StartupTime = time
}
}

func WithInitialTimeout(timeout time.Duration) Option {
return func(cfg *ParticipantConfig) {
cfg.TimeoutInitial = timeout
Expand Down
12 changes: 12 additions & 0 deletions consensus/hotstuff/committees/consensus_committee_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,13 +140,25 @@ func TestConsensus_LeaderForView(t *testing.T) {

t.Run("after current epoch", func(t *testing.T) {
t.SkipNow()
// REASON FOR SKIPPING TEST:
// We have a temporary fallback to continue with the current consensus committee, if the
// setup for the next epoch failed (aka emergency epoch chain continuation -- EECC).
// This test covers with behaviour _without_ EECC and is therefore skipped.
// The behaviour _with EECC_ is covered by the following test:
// "after current epoch - with emergency epoch chain continuation"
// TODO: for the mature implementation, remove EECC, enable this test, and remove the following test

// get leader for view in next epoch when it is not set up yet
_, err := committee.LeaderForView(250)
assert.Error(t, err)
assert.True(t, errors.Is(err, protocol.ErrNextEpochNotSetup))
})

t.Run("after current epoch - with emergency epoch chain continuation", func(t *testing.T) {
// This test covers the TEMPORARY emergency epoch chain continuation (EECC) fallback
// TODO: for the mature implementation, remove this test,
// enable the previous test "after current epoch"

// get leader for view in next epoch when it is not set up yet
_, err := committee.LeaderForView(250)
// emergency epoch chain continuation should kick in and return a valid leader
Expand Down

0 comments on commit 63ca09f

Please sign in to comment.