Skip to content

Commit

Permalink
Merge pull request #924 from onflow/m4ksio/queueing-fix
Browse files Browse the repository at this point in the history
Fix execution halt when reloading blocks
  • Loading branch information
m4ksio committed Jul 7, 2021
2 parents 9f526ca + a407825 commit d457915
Show file tree
Hide file tree
Showing 4 changed files with 251 additions and 104 deletions.
48 changes: 35 additions & 13 deletions engine/execution/ingestion/engine.go
Expand Up @@ -324,9 +324,20 @@ func (e *Engine) reloadUnexecutedBlocks() error {

isRoot := rootBlock.ID() == last.ID()
if !isRoot {
err = e.reloadBlock(blockByCollection, executionQueues, lastExecutedID)
executed, err := state.IsBlockExecuted(e.unit.Ctx(), e.execState, lastExecutedID)
if err != nil {
return fmt.Errorf("could not reload the last executed final block: %v, %w", lastExecutedID, err)
return fmt.Errorf("cannot check is last exeucted final block has been executed %v: %w", lastExecutedID, err)
}
if !executed {
// this should not happen, but if it does, execution should still work
e.log.Warn().
Hex("block_id", lastExecutedID[:]).
Msg("block marked as highest executed one, but not executable - internal inconsistency")

err = e.reloadBlock(blockByCollection, executionQueues, lastExecutedID)
if err != nil {
return fmt.Errorf("could not reload the last executed final block: %v, %w", lastExecutedID, err)
}
}
}

Expand Down Expand Up @@ -461,12 +472,13 @@ func (e *Engine) enqueueBlockAndCheckExecutable(
Logger()

// adding the block to the queue,
queue, added := enqueue(executableBlock, executionQueues)
queue, added, head := enqueue(executableBlock, executionQueues)

// if it's not added, it means the block is not a new block, it already
// exists in the queue, then bail
if !added {
log.Debug().Hex("block_id", logging.Entity(executableBlock)).
Int("block_height", int(executableBlock.Height())).
Msg("block already exists in the execution queue")
return nil
}
Expand Down Expand Up @@ -514,14 +526,21 @@ func (e *Engine) enqueueBlockAndCheckExecutable(
return fmt.Errorf("cannot send collection requests: %w", err)
}

// execute the block if the block is ready to be executed
completed := e.executeBlockIfComplete(executableBlock)
complete := false

// if newly enqueued block is inside any existing queue, we should skip now and wait
// for parent to finish execution
if head {
// execute the block if the block is ready to be executed
complete = e.executeBlockIfComplete(executableBlock)
}

lg.Info().
// if the execution is halted, but the queue keeps growing, we can check which block
// hasn't been executed.
Uint64("first_unexecuted_in_queue", firstUnexecutedHeight).
Bool("completed", completed).
Bool("complete", complete).
Bool("head_of_queue", head).
Msg("block is enqueued")

return nil
Expand Down Expand Up @@ -843,9 +862,11 @@ func newQueue(blockify queue.Blockify, queues *stdmap.QueuesBackdata) (*queue.Qu
return q, queues.Add(q)
}

// enqueue adds a block to the queues, return the queue that includes the block and a bool
// indicating whether the block was a new block.
// queues are chained blocks. Since a block can't be executable until its parent has been
// enqueue adds a block to the queues and returns the queue that includes the block, plus two booleans:
// * whether the block is new (it was not already enqueued, i.e. it is not a duplicate)
// * whether the block is the head of its queue (a new queue has been created for it)
//
// Queues are chained blocks. Since a block can't be executable until its parent has been
// executed, the chained structure allows us to only check the head of each queue to see if
// any block becomes executable.
// for instance we have one queue whose head is A:
Expand All @@ -855,18 +876,19 @@ func newQueue(blockify queue.Blockify, queues *stdmap.QueuesBackdata) (*queue.Qu
// A <- B <- C
// ^- D <- E <- F
// Even though there are 6 blocks, we only need to check if block A becomes executable.
// when the parent block isn't in the queue, we add it as a new queue. for instace, if
// when the parent block isn't in the queue, we add it as a new queue. for instance, if
// we receive H <- G, then the queues will become:
// A <- B <- C
// ^- D <- E
// G
func enqueue(blockify queue.Blockify, queues *stdmap.QueuesBackdata) (*queue.Queue, bool) {
func enqueue(blockify queue.Blockify, queues *stdmap.QueuesBackdata) (*queue.Queue, bool, bool) {
for _, queue := range queues.All() {
if stored, isNew := queue.TryAdd(blockify); stored {
return queue, isNew
return queue, isNew, false
}
}
return newQueue(blockify, queues)
queue, isNew := newQueue(blockify, queues)
return queue, isNew, true
}

// check if the block's collections have been received,
Expand Down
160 changes: 156 additions & 4 deletions engine/execution/ingestion/engine_test.go
Expand Up @@ -201,7 +201,7 @@ func (ctx *testingContext) assertSuccessfulBlockComputation(
previousExecutionResultID flow.Identifier,
expectBroadcast bool,
newStateCommitment flow.StateCommitment,
computationResult *execution.ComputationResult) {
computationResult *execution.ComputationResult) *protocol.Snapshot {

if computationResult == nil {
computationResult = executionUnittest.ComputationResultForBlockFixture(executableBlock)
Expand Down Expand Up @@ -307,15 +307,16 @@ func (ctx *testingContext) assertSuccessfulBlockComputation(
}).
Return(nil)

ctx.mockStakedAtBlockID(executableBlock.ID(), expectBroadcast)
protocolSnapshot := ctx.mockStakedAtBlockID(executableBlock.ID(), expectBroadcast)

if !expectBroadcast {
broadcastMock.Maybe()
}

return protocolSnapshot
}

func (ctx testingContext) mockStakedAtBlockID(blockID flow.Identifier, staked bool) {
func (ctx testingContext) mockStakedAtBlockID(blockID flow.Identifier, staked bool) *protocol.Snapshot {
identity := *ctx.identity
identity.Stake = 0
if staked {
Expand All @@ -324,6 +325,8 @@ func (ctx testingContext) mockStakedAtBlockID(blockID flow.Identifier, staked bo
snap := new(protocol.Snapshot)
snap.On("Identity", identity.NodeID).Return(&identity, nil)
ctx.state.On("AtBlockID", blockID).Return(snap)

return snap
}

func (ctx *testingContext) stateCommitmentExist(blockID flow.Identifier, commit flow.StateCommitment) {
Expand Down Expand Up @@ -409,6 +412,156 @@ func TestExecuteOneBlock(t *testing.T) {
})
}

// Test_OnlyHeadOfTheQueueIsExecuted checks that only the head of an execution
// queue is executed.
//
// Restarting a node, or errors in the consensus module, could cause a block
// (or its parent) which has already been executed to be enqueued again. As a
// state commitment already exists for it, it would be executed right away. If
// it then finishes execution before its parent, dequeuing it fails, since only
// queue heads are checked.
//
// Similarly, queues can be rebuilt when a block connecting two heads is added.
// For example, blocks 1 and 3 are handled and both start executing; in the
// meantime block 2 is added. This must not cause block 3 to be re-queued as a
// child of 2 (which is a child of 1), because 3 is already being executed.
//
// Should any of this happen, the execution will halt.
func Test_OnlyHeadOfTheQueueIsExecuted(t *testing.T) {
	runWithEngine(t, func(ctx testingContext) {

		// A <- B <- C <- D

		// root block
		blockA := unittest.BlockHeaderFixture(func(header *flow.Header) {
			header.Height = 920
		})

		// last executed block - it will be re-queued regardless of state commitment
		blockB := unittest.ExecutableBlockFixtureWithParent(nil, &blockA)
		blockB.StartState = unittest.StateCommitmentPointerFixture()

		// finalized block - without the fix it could be executed in parallel with B,
		// because B is reported as already executed
		blockC := unittest.ExecutableBlockFixtureWithParent(nil, blockB.Block.Header)
		blockC.StartState = blockB.StartState

		// expected to be executed afterwards
		blockD := unittest.ExecutableBlockFixtureWithParent(nil, blockC.Block.Header)
		blockD.StartState = blockC.StartState

		logBlocks(map[string]*entity.ExecutableBlock{
			"B": blockB,
			"C": blockC,
			"D": blockD,
		})

		commits := make(map[flow.Identifier]flow.StateCommitment)
		commits[blockB.Block.Header.ParentID] = *blockB.StartState
		commits[blockC.Block.Header.ParentID] = *blockC.StartState

		wg := sync.WaitGroup{}

		// This behaviour is intentionally faulty (a block cannot first have no state
		// commitment and later have one without being executed). It is a hack to get
		// past the first "is block executed" check, so that the next check
		// (executing only the queue head) can be tested.
		bFirstTime := true
		bStateCommitment := ctx.executionState.On("StateCommitmentByBlockID", mock.Anything, blockB.ID())
		bStateCommitment.RunFn = func(args mock.Arguments) {
			if bFirstTime {
				// first lookup: pretend B has not been executed yet
				bStateCommitment.ReturnArguments = mock.Arguments{flow.StateCommitment{}, storageerr.ErrNotFound}
				bFirstTime = false
				return
			}
			// every later lookup: B has a state commitment
			bStateCommitment.ReturnArguments = mock.Arguments{*blockB.StartState, nil}
		}

		ctx.executionState.On("StateCommitmentByBlockID", mock.Anything, blockA.ID()).Return(*blockB.StartState, nil)
		ctx.executionState.On("StateCommitmentByBlockID", mock.Anything, mock.Anything).Return(nil, storageerr.ErrNotFound)

		ctx.state.On("Sealed").Return(ctx.snapshot)
		ctx.snapshot.On("Head").Return(&blockA, nil)

		bDone := false
		cDone := false

		// expect B and C to be loaded by loading unexecuted blocks in engine Ready
		wg.Add(2)

		blockBSnapshot := ctx.assertSuccessfulBlockComputation(commits, func(blockID flow.Identifier, commit flow.StateCommitment) {
			require.False(t, bDone)
			require.False(t, cDone)
			wg.Done()

			// make sure block B execution takes enough time so C can start executing to showcase an error
			time.Sleep(10 * time.Millisecond)

			bDone = true
		}, blockB, unittest.IdentifierFixture(), true, *blockB.StartState, nil)

		blockCSnapshot := ctx.assertSuccessfulBlockComputation(commits, func(blockID flow.Identifier, commit flow.StateCommitment) {
			// C must only run once B has completed
			require.True(t, bDone)
			require.False(t, cDone)

			wg.Done()
			cDone = true

		}, blockC, unittest.IdentifierFixture(), true, *blockC.StartState, nil)

		ctx.assertSuccessfulBlockComputation(commits, func(blockID flow.Identifier, commit flow.StateCommitment) {
			// D must only run once both B and C have completed
			require.True(t, bDone)
			require.True(t, cDone)

			wg.Done()
		}, blockD, unittest.IdentifierFixture(), true, *blockC.StartState, nil)

		// mock loading unexecuted blocks at startup
		ctx.executionState.On("GetHighestExecutedBlockID", mock.Anything).Return(blockB.Height(), blockB.ID(), nil)
		blockASnapshot := new(protocol.Snapshot)

		ctx.state.On("AtHeight", blockB.Height()).Return(blockBSnapshot)
		blockBSnapshot.On("Head").Return(blockB.Block.Header, nil)

		params := new(protocol.Params)
		ctx.state.On("Final").Return(blockCSnapshot)

		// for reloading
		ctx.blocks.EXPECT().ByID(blockB.ID()).Return(blockB.Block, nil)
		ctx.blocks.EXPECT().ByID(blockC.ID()).Return(blockC.Block, nil)

		blockASnapshot.On("Head").Return(&blockA, nil)
		blockCSnapshot.On("Head").Return(blockC.Block.Header, nil)
		blockCSnapshot.On("Descendants").Return(nil, nil)

		ctx.state.On("AtHeight", blockC.Height()).Return(blockCSnapshot)

		ctx.state.On("Params").Return(params)
		params.On("Root").Return(&blockA, nil)

		<-ctx.engine.Ready()

		wg.Add(1) // for block D to be executed - it should wait for C to finish
		err := ctx.engine.handleBlock(context.Background(), blockD.Block)
		require.NoError(t, err)

		unittest.AssertReturnsBefore(t, wg.Wait, 5*time.Second)

		_, more := <-ctx.engine.Done() // wait for all the blocks to be processed
		require.False(t, more)

		_, ok := commits[blockB.ID()]
		require.True(t, ok)

		_, ok = commits[blockC.ID()]
		require.True(t, ok)

		_, ok = commits[blockD.ID()]
		require.True(t, ok)
	})
}

func TestBlocksArentExecutedMultipleTimes_multipleBlockEnqueue(t *testing.T) {
t.Skip("flakey")
runWithEngine(t, func(ctx testingContext) {
Expand Down Expand Up @@ -506,7 +659,6 @@ func TestBlocksArentExecutedMultipleTimes_multipleBlockEnqueue(t *testing.T) {

_, ok = commits[blockC.ID()]
require.True(t, ok)

})
}

Expand Down
27 changes: 0 additions & 27 deletions module/mempool/queue/queue.go
@@ -1,8 +1,6 @@
package queue

import (
"fmt"

"github.com/onflow/flow-go/model/flow"
)

Expand Down Expand Up @@ -155,31 +153,6 @@ func (q *Queue) TryAdd(element Blockify) (stored bool, new bool) {
return true, true
}

// Attach joins the other queue to this one, modifying this queue in-place.
// The other queue is attached if and only if the following conditions hold:
// * The head of other has a parent in this queue.
// * Both queues have no nodes in common. Specifically, this means that
//   the head of _other_ cannot be a member of this queue.
// Fails otherwise with an error. Both conditions are verified before anything
// is modified, so a failed Attach leaves this queue unchanged.
func (q *Queue) Attach(other *Queue) error {
	parent, ok := q.Nodes[other.Head.Item.ParentID()]
	if !ok {
		return fmt.Errorf("other queue head does not reference known parent")
	}

	// Check disjointness up front: mutating first and detecting a common node
	// half-way through would leave this queue partially joined.
	for identifier := range other.Nodes {
		if _, ok := q.Nodes[identifier]; ok {
			return fmt.Errorf("queues have common nodes")
		}
	}

	// All preconditions hold; from here on the join cannot fail.
	parent.Children = append(parent.Children, other.Head)
	for identifier, node := range other.Nodes {
		q.Nodes[identifier] = node
	}
	if other.Highest.Item.Height() > q.Highest.Item.Height() {
		q.Highest = other.Highest
	}
	return nil
}

// Dismount removes the head element, returns it and it's children as new queues
func (q *Queue) Dismount() (Blockify, []*Queue) {

Expand Down

0 comments on commit d457915

Please sign in to comment.