Merge pull request #1182 from onflow/m4ksio/port-en-dps-uploader
Port of block data uploader
Kay-Zee committed Sep 1, 2021
2 parents b75db19 + 42acf98 commit 0714d5a
Showing 42 changed files with 1,180 additions and 171 deletions.
137 changes: 94 additions & 43 deletions cmd/execution/main.go
@@ -1,6 +1,7 @@
package main

import (
"context"
"fmt"
"io"
"os"
@@ -10,6 +11,8 @@ import (

"github.com/spf13/pflag"

"github.com/onflow/flow-go/engine/execution/computation/computer/uploader"

"github.com/onflow/flow-go/cmd"
"github.com/onflow/flow-go/consensus"
"github.com/onflow/flow-go/consensus/hotstuff/committees"
@@ -54,49 +57,54 @@ import (
func main() {

var (
followerState protocol.MutableState
ledgerStorage *ledger.Ledger
events *storage.Events
serviceEvents *storage.ServiceEvents
txResults *storage.TransactionResults
results *storage.ExecutionResults
myReceipts *storage.MyExecutionReceipts
providerEngine *exeprovider.Engine
checkerEng *checker.Engine
syncCore *chainsync.Core
pendingBlocks *buffer.PendingBlocks // used in follower engine
deltas *ingestion.Deltas
syncEngine *synchronization.Engine
followerEng *followereng.Engine // to sync blocks from consensus nodes
computationManager *computation.Manager
collectionRequester *requester.Engine
ingestionEng *ingestion.Engine
finalizationDistributor *pubsub.FinalizationDistributor
finalizedHeader *synchronization.FinalizedHeaderCache
rpcConf rpc.Config
err error
executionState state.ExecutionState
triedir string
collector module.ExecutionMetrics
mTrieCacheSize uint32
transactionResultsCacheSize uint
checkpointDistance uint
checkpointsToKeep uint
stateDeltasLimit uint
cadenceExecutionCache uint
chdpCacheSize uint
requestInterval time.Duration
preferredExeNodeIDStr string
syncByBlocks bool
syncFast bool
syncThreshold int
extensiveLog bool
pauseExecution bool
checkStakedAtBlock func(blockID flow.Identifier) (bool, error)
diskWAL *wal.DiskWAL
scriptLogThreshold time.Duration
chdpQueryTimeout uint
chdpDeliveryTimeout uint
followerState protocol.MutableState
ledgerStorage *ledger.Ledger
events *storage.Events
serviceEvents *storage.ServiceEvents
txResults *storage.TransactionResults
results *storage.ExecutionResults
myReceipts *storage.MyExecutionReceipts
providerEngine *exeprovider.Engine
checkerEng *checker.Engine
syncCore *chainsync.Core
pendingBlocks *buffer.PendingBlocks // used in follower engine
deltas *ingestion.Deltas
syncEngine *synchronization.Engine
followerEng *followereng.Engine // to sync blocks from consensus nodes
computationManager *computation.Manager
collectionRequester *requester.Engine
ingestionEng *ingestion.Engine
finalizationDistributor *pubsub.FinalizationDistributor
finalizedHeader *synchronization.FinalizedHeaderCache
rpcConf rpc.Config
err error
executionState state.ExecutionState
triedir string
collector module.ExecutionMetrics
mTrieCacheSize uint32
transactionResultsCacheSize uint
checkpointDistance uint
checkpointsToKeep uint
stateDeltasLimit uint
cadenceExecutionCache uint
chdpCacheSize uint
requestInterval time.Duration
preferredExeNodeIDStr string
syncByBlocks bool
syncFast bool
syncThreshold int
extensiveLog bool
pauseExecution bool
checkStakedAtBlock func(blockID flow.Identifier) (bool, error)
diskWAL *wal.DiskWAL
scriptLogThreshold time.Duration
chdpQueryTimeout uint
chdpDeliveryTimeout uint
enableBlockDataUpload bool
gcpBucketName string
blockDataUploader uploader.Uploader
blockDataUploaderMaxRetry uint64 = 5
blockdataUploaderRetryTimeout = 1 * time.Second
)

cmd.FlowNode(flow.RoleExecution.String()).
@@ -124,6 +132,16 @@ func main() {
flags.UintVar(&chdpQueryTimeout, "chunk-data-pack-query-timeout-sec", 10, "number of seconds to determine a chunk data pack query being slow")
flags.UintVar(&chdpDeliveryTimeout, "chunk-data-pack-delivery-timeout-sec", 10, "number of seconds to determine a chunk data pack response delivery being slow")
flags.BoolVar(&pauseExecution, "pause-execution", false, "pause the execution. when set to true, no block will be executed, but still be able to serve queries")
flags.BoolVar(&enableBlockDataUpload, "enable-blockdata-upload", false, "enable uploading block data to GCP Bucket")
flags.StringVar(&gcpBucketName, "gcp-bucket-name", "", "GCP Bucket name for block data uploader")
}).
ValidateFlags(func() error {
if enableBlockDataUpload {
if gcpBucketName == "" {
return fmt.Errorf("invalid flag. gcp-bucket-name required when blockdata-uploader is enabled")
}
}
return nil
}).
Initialize().
Module("mutable follower state", func(builder cmd.NodeBuilder, node *cmd.NodeConfig) error {
@@ -160,6 +178,38 @@ func main() {
pendingBlocks = buffer.NewPendingBlocks() // for following main chain consensus
return nil
}).
Component("Block data uploader", func(builder cmd.NodeBuilder, node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
if enableBlockDataUpload {

logger := node.Logger.With().Str("component_name", "block_data_uploader").Logger()
gcpBucketUploader, err := uploader.NewGCPBucketUploader(
context.Background(),
gcpBucketName,
logger,
)
if err != nil {
return nil, fmt.Errorf("cannot create GCP Bucket uploader: %w", err)
}

asyncUploader := uploader.NewAsyncUploader(
gcpBucketUploader,
blockdataUploaderRetryTimeout,
blockDataUploaderMaxRetry,
logger,
collector,
)

blockDataUploader = asyncUploader

return asyncUploader, nil
}

// Since we don't have conditional component creation, we just use a Noop one.
// Its functions will only be called once per startup/shutdown - a non-measurable performance penalty.
// blockDataUploader will stay nil, disabling any calls to the uploader.
return &module.NoopReadDoneAware{}, nil

}).
Module("state deltas mempool", func(builder cmd.NodeBuilder, node *cmd.NodeConfig) error {
deltas, err = ingestion.NewDeltas(stateDeltasLimit)
return err
@@ -248,6 +298,7 @@ func main() {
cadenceExecutionCache,
committer,
scriptLogThreshold,
blockDataUploader,
)
if err != nil {
return nil, err
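For context on the values declared above (5 retries, 1 second apart): the async uploader retries a failed upload a bounded number of times before giving up. Below is a minimal, self-contained sketch of that retry pattern under illustrative names (Uploader, uploadWithRetry, and flakyUploader are hypothetical stand-ins, not the flow-go uploader package API):

package main

import (
	"errors"
	"fmt"
	"time"
)

// Uploader is an illustrative stand-in for anything that can upload one
// piece of block data; it is not the flow-go uploader.Uploader interface.
type Uploader interface {
	Upload(data []byte) error
}

// uploadWithRetry attempts u.Upload up to maxRetry times, waiting
// retryTimeout between attempts, and returns the last error on failure.
func uploadWithRetry(u Uploader, data []byte, maxRetry uint64, retryTimeout time.Duration) error {
	var lastErr error
	for attempt := uint64(0); attempt < maxRetry; attempt++ {
		if lastErr = u.Upload(data); lastErr == nil {
			return nil
		}
		time.Sleep(retryTimeout)
	}
	return fmt.Errorf("upload failed after %d attempts: %w", maxRetry, lastErr)
}

// flakyUploader fails a fixed number of times before succeeding, to exercise
// the retry loop.
type flakyUploader struct{ remainingFailures int }

func (f *flakyUploader) Upload(_ []byte) error {
	if f.remainingFailures > 0 {
		f.remainingFailures--
		return errors.New("transient upload error")
	}
	return nil
}

func main() {
	u := &flakyUploader{remainingFailures: 2}
	// Mirrors the defaults declared in main.go above: 5 retries, 1s apart.
	if err := uploadWithRetry(u, []byte("block data"), 5, time.Second); err != nil {
		fmt.Println("giving up:", err)
		return
	}
	fmt.Println("uploaded")
}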
4 changes: 4 additions & 0 deletions cmd/node_builder.go
@@ -84,6 +84,10 @@ type NodeBuilder interface {

// RegisterBadgerMetrics registers all badger related metrics
RegisterBadgerMetrics()

// ValidateFlags is an extra method called after parsing flags, intended for additional checks of flag validity,
// for example where certain combinations aren't allowed
ValidateFlags(func() error) NodeBuilder
}

// BaseConfig is the general config for the NodeBuilder and the command line params
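The new ValidateFlags hook follows the builder's fluent style: it registers an extra check and returns the builder so the call chains with the rest of the setup, as the execution node's main.go does above. A hypothetical, stripped-down builder showing just that shape (not the real cmd.NodeBuilder):

package main

import (
	"errors"
	"fmt"
)

// builder is a toy stand-in for cmd.NodeBuilder; only the fluent
// ValidateFlags hook is modeled here.
type builder struct {
	flagCheck func() error
}

// ValidateFlags stores the extra check and returns the builder so the call
// chains with the other builder methods, mirroring the NodeBuilder interface.
func (b *builder) ValidateFlags(f func() error) *builder {
	b.flagCheck = f
	return b
}

// Initialize runs the stored check; the real scaffold does this right after
// parsing the command-line flags.
func (b *builder) Initialize() error {
	if b.flagCheck != nil {
		return b.flagCheck()
	}
	return nil
}

func main() {
	gcpBucketName := "" // pretend this flag was left unset on the command line
	err := (&builder{}).
		ValidateFlags(func() error {
			if gcpBucketName == "" {
				return errors.New("gcp-bucket-name is required when block data upload is enabled")
			}
			return nil
		}).
		Initialize()
	fmt.Println("flag validation:", err)
}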
33 changes: 25 additions & 8 deletions cmd/scaffold.go
@@ -94,14 +94,15 @@ type namedDoneObject struct {
// of the process in case of nodes such as the unstaked access node where the NodeInfo is not part of the genesis data
type FlowNodeBuilder struct {
*NodeConfig
flags *pflag.FlagSet
modules []namedModuleFunc
components []namedComponentFunc
doneObject []namedDoneObject
sig chan os.Signal
preInitFns []func(NodeBuilder, *NodeConfig)
postInitFns []func(NodeBuilder, *NodeConfig)
lm *lifecycle.LifecycleManager
flags *pflag.FlagSet
modules []namedModuleFunc
components []namedComponentFunc
doneObject []namedDoneObject
sig chan os.Signal
preInitFns []func(NodeBuilder, *NodeConfig)
postInitFns []func(NodeBuilder, *NodeConfig)
lm *lifecycle.LifecycleManager
extraFlagCheck func() error
}

func (fnb *FlowNodeBuilder) BaseFlags() {
@@ -245,6 +246,11 @@ func (fnb *FlowNodeBuilder) ParseAndPrintFlags() {
log.Msg("flags loaded")
}

func (fnb *FlowNodeBuilder) ValidateFlags(f func() error) NodeBuilder {
fnb.extraFlagCheck = f
return fnb
}

func (fnb *FlowNodeBuilder) PrintBuildVersionDetails() {
fnb.Logger.Info().Str("version", build.Semver()).Str("commit", build.Commit()).Msg("build details")
}
@@ -715,6 +721,8 @@ func (fnb *FlowNodeBuilder) Initialize() NodeBuilder {

fnb.ParseAndPrintFlags()

fnb.extraFlagsValidation()

fnb.EnqueueNetworkInit(ctx)

if fnb.metricsEnabled {
@@ -843,6 +851,15 @@ func (fnb *FlowNodeBuilder) closeDatabase() {
}
}

func (fnb *FlowNodeBuilder) extraFlagsValidation() {
if fnb.extraFlagCheck != nil {
err := fnb.extraFlagCheck()
if err != nil {
fnb.Logger.Fatal().Err(err).Msg("invalid flags")
}
}
}

// loadRootProtocolSnapshot loads the root protocol snapshot from disk
func loadRootProtocolSnapshot(dir string) (*inmem.Snapshot, error) {
data, err := io.ReadFile(filepath.Join(dir, bootstrap.PathRootProtocolStateSnapshot))
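Putting the scaffold changes together: the function passed to ValidateFlags is stored in extraFlagCheck and run by extraFlagsValidation right after ParseAndPrintFlags, aborting startup if it fails. A hedged, standalone sketch of that ordering, using spf13/pflag as the scaffold does but the standard log package in place of zerolog:

package main

import (
	"fmt"
	"log"

	"github.com/spf13/pflag"
)

func main() {
	var (
		enableUpload bool
		bucketName   string
	)

	flags := pflag.NewFlagSet("execution", pflag.ContinueOnError)
	flags.BoolVar(&enableUpload, "enable-blockdata-upload", false, "enable uploading block data")
	flags.StringVar(&bucketName, "gcp-bucket-name", "", "GCP bucket name for block data uploader")

	// Extra check registered up front, like ValidateFlags(...) in main.go.
	extraFlagCheck := func() error {
		if enableUpload && bucketName == "" {
			return fmt.Errorf("gcp-bucket-name required when blockdata upload is enabled")
		}
		return nil
	}

	// 1. Parse flags (the scaffold does this in ParseAndPrintFlags).
	if err := flags.Parse([]string{"--enable-blockdata-upload"}); err != nil {
		log.Fatalf("cannot parse flags: %v", err)
	}

	// 2. Run the extra validation right after parsing (extraFlagsValidation);
	//    the nil check mirrors the scaffold, which skips it when no check was registered.
	if extraFlagCheck != nil {
		if err := extraFlagCheck(); err != nil {
			log.Fatalf("invalid flags: %v", err) // the real builder uses zerolog's Fatal
		}
	}
}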
@@ -92,7 +92,7 @@ func TestExtractExecutionState(t *testing.T) {
update, err := ledger.NewUpdate(stateCommitment, keys, values)
require.NoError(t, err)

stateCommitment, err = f.Set(update)
stateCommitment, _, err = f.Set(update)
//stateCommitment, err = f.UpdateRegisters(keys, values, stateCommitment)
require.NoError(t, err)

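This one-line change tracks the PR's wider ledger API change: Set now also returns the trie update it produced, and callers that only need the new state discard it with a blank identifier (the committer mock below expects the same three-value return). A hypothetical sketch of that shape, with placeholder types rather than the real ledger package:

package main

import "fmt"

// update, ledgerState, and trieUpdate are illustrative placeholders for
// ledger.Update, ledger.State, and *ledger.TrieUpdate; only the method shape
// implied by this diff is modeled, not the real flow-go types.
type update struct{ keys int }
type ledgerState [32]byte
type trieUpdate struct{ paths int }

type fakeLedger struct{}

// Set applies an update and now returns three values: the new state, the
// trie update that produced it, and an error.
func (fakeLedger) Set(u update) (ledgerState, *trieUpdate, error) {
	return ledgerState{1, 2, 3}, &trieUpdate{paths: u.keys}, nil
}

func main() {
	var f fakeLedger
	// Callers that only need the resulting state discard the trie update,
	// exactly as the test above does.
	stateCommitment, _, err := f.Set(update{keys: 2})
	fmt.Println(stateCommitment[:3], err)
}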
6 changes: 3 additions & 3 deletions engine/execution/computation/committer/committer.go
@@ -22,7 +22,7 @@ func NewLedgerViewCommitter(ldg ledger.Ledger, tracer module.Tracer) *LedgerView
return &LedgerViewCommitter{ldg: ldg, tracer: tracer}
}

func (s *LedgerViewCommitter) CommitView(view state.View, baseState flow.StateCommitment) (newCommit flow.StateCommitment, proof []byte, err error) {
func (s *LedgerViewCommitter) CommitView(view state.View, baseState flow.StateCommitment) (newCommit flow.StateCommitment, proof []byte, trieUpdate *ledger.TrieUpdate, err error) {
var err1, err2 error
var wg sync.WaitGroup
wg.Add(1)
@@ -31,7 +31,7 @@ func (s *LedgerViewCommitter) CommitView(view state.View, baseState flow.StateCo
wg.Done()
}()

newCommit, err1 = s.commitView(view, baseState)
newCommit, trieUpdate, err1 = s.commitView(view, baseState)
wg.Wait()

if err1 != nil {
@@ -43,7 +43,7 @@ func (s *LedgerViewCommitter) CommitView(view state.View, baseState flow.StateCo
return
}

func (s *LedgerViewCommitter) commitView(view state.View, baseState flow.StateCommitment) (newCommit flow.StateCommitment, err error) {
func (s *LedgerViewCommitter) commitView(view state.View, baseState flow.StateCommitment) (newCommit flow.StateCommitment, update *ledger.TrieUpdate, err error) {
return execState.CommitDelta(s.ldg, view, baseState)
}

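CommitView's new signature threads the trie update through, but its structure is unchanged: the proof is collected in a goroutine while the main path commits the delta, and a WaitGroup joins the two before returning. A self-contained sketch of that concurrency pattern with placeholder types and stubbed ledger calls (not the real ledger API):

package main

import (
	"fmt"
	"sync"
)

// Placeholder result types standing in for flow.StateCommitment, the proof
// bytes, and *ledger.TrieUpdate.
type commitment string
type trieUpdate struct{ size int }

func commitDelta() (commitment, *trieUpdate, error) {
	// Pretend we applied the view's register delta to the ledger.
	return commitment("new-state"), &trieUpdate{size: 42}, nil
}

func collectProofs() ([]byte, error) {
	// Pretend we queried the ledger for inclusion proofs.
	return []byte{2, 3, 4}, nil
}

// commitView mirrors the shape of LedgerViewCommitter.CommitView: proofs are
// gathered concurrently with the commit, and both results are returned.
func commitView() (newCommit commitment, proof []byte, update *trieUpdate, err error) {
	var err1, err2 error
	var wg sync.WaitGroup

	wg.Add(1)
	go func() {
		proof, err2 = collectProofs()
		wg.Done()
	}()

	newCommit, update, err1 = commitDelta()
	wg.Wait()

	if err1 != nil {
		return newCommit, proof, update, err1
	}
	return newCommit, proof, update, err2
}

func main() {
	c, p, u, err := commitView()
	fmt.Println(c, p, u.size, err)
}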
4 changes: 2 additions & 2 deletions engine/execution/computation/committer/committer_test.go
@@ -25,7 +25,7 @@ func TestLedgerViewCommitter(t *testing.T) {
var expectedStateCommitment led.State
copy(expectedStateCommitment[:], []byte{1, 2, 3})
ledger.On("Set", mock.Anything).
Return(expectedStateCommitment, nil).
Return(expectedStateCommitment, nil, nil).
Once()

expectedProof := led.Proof([]byte{2, 3, 4})
@@ -43,7 +43,7 @@ func TestLedgerViewCommitter(t *testing.T) {
)
require.NoError(t, err)

newState, proof, err := com.CommitView(view, utils.StateCommitmentFixture())
newState, proof, _, err := com.CommitView(view, utils.StateCommitmentFixture())
require.NoError(t, err)
require.Equal(t, flow.StateCommitment(expectedStateCommitment), newState)
require.Equal(t, []uint8(expectedProof), proof)
5 changes: 3 additions & 2 deletions engine/execution/computation/committer/noop.go
@@ -2,6 +2,7 @@ package committer

import (
"github.com/onflow/flow-go/fvm/state"
"github.com/onflow/flow-go/ledger"
"github.com/onflow/flow-go/model/flow"
)

@@ -12,6 +13,6 @@ func NewNoopViewCommitter() *NoopViewCommitter {
return &NoopViewCommitter{}
}

func (n NoopViewCommitter) CommitView(_ state.View, s flow.StateCommitment) (flow.StateCommitment, []byte, error) {
return s, nil, nil
func (n NoopViewCommitter) CommitView(_ state.View, s flow.StateCommitment) (flow.StateCommitment, []byte, *ledger.TrieUpdate, error) {
return s, nil, nil, nil
}
15 changes: 10 additions & 5 deletions engine/execution/computation/computer/computer.go
@@ -11,6 +11,8 @@ import (
"github.com/rs/zerolog"
"github.com/uber/jaeger-client-go"

"github.com/onflow/flow-go/ledger"

"github.com/onflow/flow-go/engine/execution"
"github.com/onflow/flow-go/engine/execution/state/delta"
"github.com/onflow/flow-go/fvm"
@@ -32,7 +34,7 @@ type VirtualMachine interface {
// ViewCommitter commits a view's deltas to the ledger and collects the proofs
type ViewCommitter interface {
// CommitView commits a view's register delta and collects proofs
CommitView(state.View, flow.StateCommitment) (flow.StateCommitment, []byte, error)
CommitView(state.View, flow.StateCommitment) (flow.StateCommitment, []byte, *ledger.TrieUpdate, error)
}

// A BlockComputer executes the transactions in a block.
@@ -141,19 +143,21 @@ func (e *blockComputer) executeBlock(

stateCommitments := make([]flow.StateCommitment, 0, len(collections)+1)
proofs := make([][]byte, 0, len(collections)+1)
trieUpdates := make([]*ledger.TrieUpdate, len(collections)+1)

bc := blockCommitter{
committer: e.committer,
blockSpan: blockSpan,
tracer: e.tracer,
state: *block.StartState,
views: make(chan state.View, len(collections)+1),
callBack: func(state flow.StateCommitment, proof []byte, err error) {
callBack: func(state flow.StateCommitment, proof []byte, trieUpdate *ledger.TrieUpdate, err error) {
if err != nil {
panic(err)
}
stateCommitments = append(stateCommitments, state)
proofs = append(proofs, proof)
trieUpdates = append(trieUpdates, trieUpdate)
},
}

@@ -217,6 +221,7 @@ func (e *blockComputer) executeBlock(
res.StateReads = stateView.(*delta.View).ReadsCount()
res.StateCommitments = stateCommitments
res.Proofs = proofs
res.TrieUpdates = trieUpdates

return res, nil
}
@@ -399,7 +404,7 @@ func (e *blockComputer) executeTransaction(
type blockCommitter struct {
tracer module.Tracer
committer ViewCommitter
callBack func(state flow.StateCommitment, proof []byte, err error)
callBack func(state flow.StateCommitment, proof []byte, update *ledger.TrieUpdate, err error)
state flow.StateCommitment
views chan state.View
blockSpan opentracing.Span
@@ -408,8 +413,8 @@ type blockCommitter struct {
func (bc *blockCommitter) Run() {
for view := range bc.views {
span := bc.tracer.StartSpanFromParent(bc.blockSpan, trace.EXECommitDelta)
stateCommit, proof, err := bc.committer.CommitView(view, bc.state)
bc.callBack(stateCommit, proof, err)
stateCommit, proof, trieUpdate, err := bc.committer.CommitView(view, bc.state)
bc.callBack(stateCommit, proof, trieUpdate, err)
bc.state = stateCommit
span.Finish()
}
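The blockCommitter change is the consumer side of the same plumbing: each committed view now also yields a trie update, and the callback appends it alongside the state commitment and proof so the block data uploader can ship it later. A minimal sketch of that worker/callback pattern, with toy types in place of state.View and the real committer:

package main

import "fmt"

// Toy stand-ins for state.View, flow.StateCommitment, and *ledger.TrieUpdate.
type view struct{ txCount int }
type commitment string
type trieUpdate struct{ registers int }

// committer stands in for the ViewCommitter interface: commit one view on top
// of the given state and return the new state, a proof, and the trie update.
type committer interface {
	CommitView(v view, base commitment) (commitment, []byte, *trieUpdate, error)
}

type fakeCommitter struct{}

func (fakeCommitter) CommitView(v view, base commitment) (commitment, []byte, *trieUpdate, error) {
	return base + "+", []byte{1}, &trieUpdate{registers: v.txCount}, nil
}

// blockCommitter consumes views from a channel, commits them in order, and
// hands each result to a callback, the same shape as the Run loop above.
type blockCommitter struct {
	committer committer
	state     commitment
	views     chan view
	callBack  func(state commitment, proof []byte, update *trieUpdate, err error)
}

func (bc *blockCommitter) Run() {
	for v := range bc.views {
		stateCommit, proof, update, err := bc.committer.CommitView(v, bc.state)
		bc.callBack(stateCommit, proof, update, err)
		bc.state = stateCommit
	}
}

func main() {
	var commitments []commitment
	var updates []*trieUpdate

	bc := &blockCommitter{
		committer: fakeCommitter{},
		state:     "genesis",
		views:     make(chan view, 2),
		callBack: func(state commitment, proof []byte, update *trieUpdate, err error) {
			if err != nil {
				panic(err)
			}
			commitments = append(commitments, state)
			updates = append(updates, update)
		},
	}

	bc.views <- view{txCount: 3}
	bc.views <- view{txCount: 1}
	close(bc.views)
	bc.Run()

	fmt.Println(commitments, len(updates))
}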
