Skip to content

Commit

Permalink
Merge #3205 #3211 #3219
Browse files Browse the repository at this point in the history
3205: Refactor ProgramsCache into ChainPrograms r=pattyshack a=pattyshack

The main difference between the two is in how we create BlockPrograms for block execution.  The HasChanges optimization no longer makes sense since cloning BlockPrograms now deep copied the map in order to support fine grain eviction.

3211: Backport "Optimize GetEventsByBlockIDs" to master r=zhangchiqing a=zhangchiqing

Backport #3200

3219: [FVM] Remove old quick fixes r=janezpodhostnik a=janezpodhostnik

Both of these were already properly fixed either in the FVM or in cadence. 

The reason I am removing them now is because the fuzz tests test exactly what these quick fixes were for: a scenario where the node crashes but shouldn't, because of reaching the limits at a bad location in the code.

I ran the fuzzer for an entire day after removing this. 

Co-authored-by: Patrick Lee <patrick.lee@dapperlabs.com>
Co-authored-by: Leo Zhang (zhangchiqing) <zhangchiqing@gmail.com>
Co-authored-by: Maks Pawlak <120831+m4ksio@users.noreply.github.com>
Co-authored-by: Leo Zhang <zhangchiqing@gmail.com>
Co-authored-by: Janez Podhostnik <janez.podhostnik@gmail.com>
  • Loading branch information
5 people committed Sep 13, 2022
4 parents 8fea4c6 + 872fb92 + bf38092 + 0212dd5 commit c58f31f
Show file tree
Hide file tree
Showing 18 changed files with 298 additions and 141 deletions.
2 changes: 1 addition & 1 deletion cmd/execution_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -884,12 +884,12 @@ func (exeNode *ExecutionNode) LoadGrpcServer(
node.Logger,
exeNode.exeConf.rpcConf,
exeNode.ingestionEng,
node.Storage.Blocks,
node.Storage.Headers,
node.State,
exeNode.events,
exeNode.results,
exeNode.txResults,
exeNode.commits,
node.RootChainID,
signature.NewBlockSignerDecoder(exeNode.committee),
exeNode.exeConf.apiRatelimits,
Expand Down
3 changes: 2 additions & 1 deletion cmd/execution_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/onflow/flow-go/engine/execution/computation"
"github.com/onflow/flow-go/engine/execution/rpc"
"github.com/onflow/flow-go/fvm/programs"
storage "github.com/onflow/flow-go/storage/badger"
)

Expand Down Expand Up @@ -63,7 +64,7 @@ func (exeConf *ExecutionConfig) SetupFlags(flags *pflag.FlagSet) {
flags.UintVar(&exeConf.checkpointDistance, "checkpoint-distance", 20, "number of WAL segments between checkpoints")
flags.UintVar(&exeConf.checkpointsToKeep, "checkpoints-to-keep", 5, "number of recent checkpoints to keep (0 to keep all)")
flags.UintVar(&exeConf.stateDeltasLimit, "state-deltas-limit", 100, "maximum number of state deltas in the memory pool")
flags.UintVar(&exeConf.computationConfig.ProgramsCacheSize, "cadence-execution-cache", computation.DefaultProgramsCacheSize,
flags.UintVar(&exeConf.computationConfig.ProgramsCacheSize, "cadence-execution-cache", programs.DefaultProgramsCacheSize,
"cache size for Cadence execution")
flags.BoolVar(&exeConf.computationConfig.ExtensiveTracing, "extensive-tracing", false, "adds high-overhead tracing to execution")
flags.BoolVar(&exeConf.computationConfig.CadenceTracing, "cadence-tracing", false, "enables cadence runtime level tracing")
Expand Down
6 changes: 4 additions & 2 deletions cmd/scaffold.go
Original file line number Diff line number Diff line change
Expand Up @@ -708,8 +708,9 @@ func (fnb *FlowNodeBuilder) initStorage() {
transactions := bstorage.NewTransactions(fnb.Metrics.Cache, fnb.DB)
collections := bstorage.NewCollections(fnb.DB, transactions)
setups := bstorage.NewEpochSetups(fnb.Metrics.Cache, fnb.DB)
commits := bstorage.NewEpochCommits(fnb.Metrics.Cache, fnb.DB)
epochCommits := bstorage.NewEpochCommits(fnb.Metrics.Cache, fnb.DB)
statuses := bstorage.NewEpochStatuses(fnb.Metrics.Cache, fnb.DB)
commits := bstorage.NewCommits(fnb.Metrics.Cache, fnb.DB)

fnb.Storage = Storage{
Headers: headers,
Expand All @@ -723,8 +724,9 @@ func (fnb *FlowNodeBuilder) initStorage() {
Transactions: transactions,
Collections: collections,
Setups: setups,
EpochCommits: commits,
EpochCommits: epochCommits,
Statuses: statuses,
Commits: commits,
}
}

Expand Down
1 change: 1 addition & 0 deletions engine/common/follower/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ func (e *Engine) onBlockProposal(originID flow.Identifier, proposal *messages.Bl
header := proposal.Header

log := e.log.With().
Hex("origin_id", originID[:]).
Str("chain_id", header.ChainID.String()).
Uint64("block_height", header.Height).
Uint64("block_view", header.View).
Expand Down
1 change: 1 addition & 0 deletions engine/consensus/compliance/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ func (c *Core) OnBlockProposal(originID flow.Identifier, proposal *messages.Bloc

header := proposal.Header
log := c.log.With().
Hex("origin_id", originID[:]).
Str("chain_id", header.ChainID.String()).
Uint64("block_height", header.Height).
Uint64("block_view", header.View).
Expand Down
37 changes: 8 additions & 29 deletions engine/execution/computation/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ type Manager struct {
vm computer.VirtualMachine
vmCtx fvm.Context
blockComputer computer.BlockComputer
programsCache *ProgramsCache
programsCache *programs.ChainPrograms
scriptLogThreshold time.Duration
scriptExecutionTimeLimit time.Duration
uploaders []uploader.Uploader
Expand Down Expand Up @@ -145,7 +145,7 @@ func New(
return nil, fmt.Errorf("cannot create block computer: %w", err)
}

programsCache, err := NewProgramsCache(params.ProgramsCacheSize)
programsCache, err := programs.NewChainPrograms(params.ProgramsCacheSize)
if err != nil {
return nil, fmt.Errorf("cannot create programs cache: %w", err)
}
Expand Down Expand Up @@ -174,14 +174,6 @@ func (e *Manager) VM() computer.VirtualMachine {
return e.vm
}

func (e *Manager) getChildProgramsOrEmpty(blockID flow.Identifier) *programs.Programs {
blockPrograms := e.programsCache.Get(blockID)
if blockPrograms == nil {
return programs.NewEmptyPrograms()
}
return blockPrograms.ChildPrograms()
}

func (e *Manager) ExecuteScript(
ctx context.Context,
code []byte,
Expand Down Expand Up @@ -215,7 +207,8 @@ func (e *Manager) ExecuteScript(
blockCtx := fvm.NewContextFromParent(
e.vmCtx,
fvm.WithBlockHeader(blockHeader),
fvm.WithBlockPrograms(e.getChildProgramsOrEmpty(blockHeader.ID())))
fvm.WithBlockPrograms(
e.programsCache.NewBlockProgramsForScript(blockHeader.ID())))

err := func() (err error) {

Expand Down Expand Up @@ -292,14 +285,9 @@ func (e *Manager) ComputeBlock(
Hex("block_id", logging.Entity(block.Block)).
Msg("received complete block")

var blockPrograms *programs.Programs
fromCache := e.programsCache.Get(block.ParentID())

if fromCache == nil {
blockPrograms = programs.NewEmptyPrograms()
} else {
blockPrograms = fromCache.ChildPrograms()
}
blockPrograms := e.programsCache.GetOrCreateBlockPrograms(
block.ID(),
block.ParentID())

result, err := e.blockComputer.ExecuteBlock(ctx, block, view, blockPrograms)
if err != nil {
Expand All @@ -312,15 +300,6 @@ func (e *Manager) ComputeBlock(

e.log.Debug().Hex("block_id", logging.Entity(block.Block)).Msg("block result computed")

toInsert := blockPrograms

// if we have item from cache and there were no changes
// insert it under new block, to prevent long chains
if fromCache != nil && !blockPrograms.HasChanges() {
toInsert = fromCache
}

e.programsCache.Set(block.ID(), toInsert)
e.log.Debug().Hex("block_id", logging.Entity(block.Block)).Msg("programs cache updated")

if uploadEnabled {
Expand Down Expand Up @@ -354,7 +333,7 @@ func (e *Manager) ComputeBlock(
func (e *Manager) GetAccount(address flow.Address, blockHeader *flow.Header, view state.View) (*flow.Account, error) {
blockCtx := fvm.NewContextFromParent(e.vmCtx, fvm.WithBlockHeader(blockHeader))

programs := e.getChildProgramsOrEmpty(blockHeader.ID())
programs := e.programsCache.NewBlockProgramsForScript(blockHeader.ID())

account, err := e.vm.GetAccount(blockCtx, address, view, programs)
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion engine/execution/computation/manager_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,8 @@ func BenchmarkComputeBlock(b *testing.B) {
blockComputer, err := computer.NewBlockComputer(vm, execCtx, metrics.NewNoopCollector(), tracer, zerolog.Nop(), committer.NewNoopViewCommitter(), prov)
require.NoError(b, err)

programsCache, err := NewProgramsCache(1000)
programsCache, err := programs.NewChainPrograms(
programs.DefaultProgramsCacheSize)
require.NoError(b, err)

engine := &Manager{
Expand Down
20 changes: 10 additions & 10 deletions engine/execution/computation/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func TestComputeBlockWithStorage(t *testing.T) {
blockComputer, err := computer.NewBlockComputer(vm, execCtx, metrics.NewNoopCollector(), trace.NewNoopTracer(), zerolog.Nop(), committer.NewNoopViewCommitter(), prov)
require.NoError(t, err)

programsCache, err := NewProgramsCache(10)
programsCache, err := programs.NewChainPrograms(10)
require.NoError(t, err)

engine := &Manager{
Expand Down Expand Up @@ -183,7 +183,7 @@ func TestComputeBlock_Uploader(t *testing.T) {
computationResult: computationResult,
}

programsCache, err := NewProgramsCache(10)
programsCache, err := programs.NewChainPrograms(10)
require.NoError(t, err)

fakeUploader := &FakeUploader{}
Expand Down Expand Up @@ -258,7 +258,7 @@ func TestExecuteScript(t *testing.T) {
nil,
prov,
ComputationConfig{
ProgramsCacheSize: DefaultProgramsCacheSize,
ProgramsCacheSize: programs.DefaultProgramsCacheSize,
ScriptLogThreshold: scriptLogThreshold,
ScriptExecutionTimeLimit: DefaultScriptExecutionTimeLimit,
},
Expand Down Expand Up @@ -320,7 +320,7 @@ func TestExecuteScript_BalanceScriptFailsIfViewIsEmpty(t *testing.T) {
nil,
prov,
ComputationConfig{
ProgramsCacheSize: DefaultProgramsCacheSize,
ProgramsCacheSize: programs.DefaultProgramsCacheSize,
ScriptLogThreshold: scriptLogThreshold,
ScriptExecutionTimeLimit: DefaultScriptExecutionTimeLimit,
},
Expand Down Expand Up @@ -365,7 +365,7 @@ func TestExecuteScripPanicsAreHandled(t *testing.T) {
nil,
prov,
ComputationConfig{
ProgramsCacheSize: DefaultProgramsCacheSize,
ProgramsCacheSize: programs.DefaultProgramsCacheSize,
ScriptLogThreshold: scriptLogThreshold,
ScriptExecutionTimeLimit: DefaultScriptExecutionTimeLimit,
NewCustomVirtualMachine: func() computer.VirtualMachine {
Expand Down Expand Up @@ -415,7 +415,7 @@ func TestExecuteScript_LongScriptsAreLogged(t *testing.T) {
nil,
prov,
ComputationConfig{
ProgramsCacheSize: DefaultProgramsCacheSize,
ProgramsCacheSize: programs.DefaultProgramsCacheSize,
ScriptLogThreshold: 1 * time.Millisecond,
ScriptExecutionTimeLimit: DefaultScriptExecutionTimeLimit,
NewCustomVirtualMachine: func() computer.VirtualMachine {
Expand Down Expand Up @@ -465,7 +465,7 @@ func TestExecuteScript_ShortScriptsAreNotLogged(t *testing.T) {
nil,
prov,
ComputationConfig{
ProgramsCacheSize: DefaultProgramsCacheSize,
ProgramsCacheSize: programs.DefaultProgramsCacheSize,
ScriptLogThreshold: 1 * time.Second,
ScriptExecutionTimeLimit: DefaultScriptExecutionTimeLimit,
NewCustomVirtualMachine: func() computer.VirtualMachine {
Expand Down Expand Up @@ -558,7 +558,7 @@ func TestExecuteScriptTimeout(t *testing.T) {
nil,
nil,
ComputationConfig{
ProgramsCacheSize: DefaultProgramsCacheSize,
ProgramsCacheSize: programs.DefaultProgramsCacheSize,
ScriptLogThreshold: DefaultScriptLogThreshold,
ScriptExecutionTimeLimit: timeout,
},
Expand Down Expand Up @@ -598,7 +598,7 @@ func TestExecuteScriptCancelled(t *testing.T) {
nil,
nil,
ComputationConfig{
ProgramsCacheSize: DefaultProgramsCacheSize,
ProgramsCacheSize: programs.DefaultProgramsCacheSize,
ScriptLogThreshold: DefaultScriptLogThreshold,
ScriptExecutionTimeLimit: timeout,
},
Expand Down Expand Up @@ -649,7 +649,7 @@ func TestScriptStorageMutationsDiscarded(t *testing.T) {
nil,
nil,
ComputationConfig{
ProgramsCacheSize: DefaultProgramsCacheSize,
ProgramsCacheSize: programs.DefaultProgramsCacheSize,
ScriptLogThreshold: DefaultScriptLogThreshold,
ScriptExecutionTimeLimit: timeout,
},
Expand Down
38 changes: 0 additions & 38 deletions engine/execution/computation/programs.go

This file was deleted.

8 changes: 2 additions & 6 deletions engine/execution/computation/programs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func TestPrograms_TestContractUpdates(t *testing.T) {
blockComputer, err := computer.NewBlockComputer(vm, execCtx, metrics.NewNoopCollector(), trace.NewNoopTracer(), zerolog.Nop(), committer.NewNoopViewCommitter(), prov)
require.NoError(t, err)

programsCache, err := NewProgramsCache(10)
programsCache, err := programs.NewChainPrograms(10)
require.NoError(t, err)

engine := &Manager{
Expand Down Expand Up @@ -226,7 +226,7 @@ func TestPrograms_TestBlockForks(t *testing.T) {
blockComputer, err := computer.NewBlockComputer(vm, execCtx, metrics.NewNoopCollector(), trace.NewNoopTracer(), zerolog.Nop(), committer.NewNoopViewCommitter(), prov)
require.NoError(t, err)

programsCache, err := NewProgramsCache(10)
programsCache, err := programs.NewChainPrograms(10)
require.NoError(t, err)

engine := &Manager{
Expand Down Expand Up @@ -276,8 +276,6 @@ func TestPrograms_TestBlockForks(t *testing.T) {
block11, res = createTestBlockAndRun(t, engine, block1, col11, block11View)
// cache should include value for this block
require.NotNil(t, programsCache.Get(block11.ID()))
// cache should have changes
require.True(t, programsCache.Get(block11.ID()).HasChanges())
// 1st event should be contract deployed
assert.EqualValues(t, "flow.AccountContractAdded", res.Events[0][0].Type)
})
Expand All @@ -297,8 +295,6 @@ func TestPrograms_TestBlockForks(t *testing.T) {
block111, res = createTestBlockAndRun(t, engine, block11, col111, block111View)
// cache should include a program for this block
require.NotNil(t, programsCache.Get(block111.ID()))
// cache should have changes
require.True(t, programsCache.Get(block111.ID()).HasChanges())

require.Len(t, res.Events, 2)

Expand Down

0 comments on commit c58f31f

Please sign in to comment.