[Verification] Logging block height on chunk processing pipeline (#856)
* adds block height to assigner engine logs

* adds block height to fetcher engine

* adds block height to chunk status

* refactors chunk status block type

* complements block height logging in fetcher

* adds block height for sealed chunks logging
yhassanzadeh13 committed Jun 17, 2021
1 parent 8bb7d21 commit f45a9bf
Showing 3 changed files with 41 additions and 26 deletions.
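The fluent logging calls throughout the diff match the zerolog API (github.com/rs/zerolog): each engine derives a child logger whose context now also carries a block_height field, so every subsequent log entry on the chunk's path includes it. A minimal standalone sketch of that idiom, with made-up field values:

package main

import (
	"os"

	"github.com/rs/zerolog"
)

func main() {
	// Base logger, analogous to e.log in the engines below.
	logger := zerolog.New(os.Stderr).With().Timestamp().Logger()

	// Derive a child logger carrying the chunk identifiers plus the newly
	// added block_height; every event logged through it gets these fields.
	blockHeight := uint64(1337) // hypothetical value
	lg := logger.With().
		Uint64("chunk_index", 0).
		Uint64("block_height", blockHeight).
		Logger()

	lg.Info().Msg("chunk locator successfully pushed to chunks queue")
	// prints roughly: {"level":"info","time":"...","chunk_index":0,"block_height":1337,"message":"..."}
}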
18 changes: 10 additions & 8 deletions engine/verification/assigner/engine.go
@@ -117,11 +117,13 @@ func (e *Engine) resultChunkAssignment(ctx context.Context,
 // (through the chunk assigner), and belong to the execution result.
 //
 // Deduplication of chunk locators is delegated to the chunks queue.
-func (e *Engine) processChunk(chunk *flow.Chunk, resultID flow.Identifier) (bool, error) {
-	log := e.log.With().
+func (e *Engine) processChunk(chunk *flow.Chunk, resultID flow.Identifier, blockHeight uint64) (bool, error) {
+	lg := e.log.With().
 		Hex("result_id", logging.ID(resultID)).
 		Hex("chunk_id", logging.ID(chunk.ID())).
-		Uint64("chunk_index", chunk.Index).Logger()
+		Uint64("chunk_index", chunk.Index).
+		Uint64("block_height", blockHeight).
+		Logger()
 
 	locator := &chunks.Locator{
 		ResultID: resultID,
@@ -134,15 +136,15 @@ func (e *Engine) processChunk(chunk *flow.Chunk, resultID flow.Identifier) (bool
 		return false, fmt.Errorf("could not push chunk locator to chunks queue: %w", err)
 	}
 	if !ok {
-		log.Debug().Msg("could not push duplicate chunk locator to chunks queue")
+		lg.Debug().Msg("could not push duplicate chunk locator to chunks queue")
 		return false, nil
 	}
 
 	e.metrics.OnAssignedChunkProcessedAtAssigner()
 
 	// notifies chunk queue consumer of a new chunk
 	e.newChunkListener.Check()
-	log.Info().Msg("chunk locator successfully pushed to chunks queue")
+	lg.Info().Msg("chunk locator successfully pushed to chunks queue")
 
 	return true, nil
 }
@@ -204,7 +206,7 @@ func (e *Engine) processFinalizedBlock(ctx context.Context, block *flow.Block) {
 
 	assignedChunksCount += uint64(len(chunkList))
 	for _, chunk := range chunkList {
-		processed, err := e.processChunkWithTracing(ctx, chunk, resultID)
+		processed, err := e.processChunkWithTracing(ctx, chunk, resultID, block.Header.Height)
 		if err != nil {
 			resultLog.Fatal().
 				Err(err).
@@ -301,11 +303,11 @@ func (e *Engine) resultChunkAssignmentWithTracing(
 //
 // Note that the chunk in the input should be legitimately assigned to this verification node
 // (through the chunk assigner), and belong to the same execution result.
-func (e *Engine) processChunkWithTracing(ctx context.Context, chunk *flow.Chunk, resultID flow.Identifier) (bool, error) {
+func (e *Engine) processChunkWithTracing(ctx context.Context, chunk *flow.Chunk, resultID flow.Identifier, blockHeight uint64) (bool, error) {
	var err error
	var processed bool
	e.tracer.WithSpanFromContext(ctx, trace.VERAssignerProcessChunk, func() {
-		processed, err = e.processChunk(chunk, resultID)
+		processed, err = e.processChunk(chunk, resultID, blockHeight)
	})
	return processed, err
 }
48 changes: 30 additions & 18 deletions engine/verification/fetcher/engine.go
@@ -147,7 +147,9 @@ func (e *Engine) ProcessAssignedChunk(locator *chunks.Locator) {
 		Logger()
 	lg.Debug().Msg("result and chunk for locator retrieved")
 
-	requested, err := e.processAssignedChunkWithTracing(chunk, result, locatorID)
+	requested, blockHeight, err := e.processAssignedChunkWithTracing(chunk, result, locatorID)
+	lg = lg.With().Uint64("block_height", blockHeight).Logger()
+
 	if err != nil {
 		lg.Fatal().Err(err).Msg("could not process assigned chunk")
 	}
@@ -161,7 +163,7 @@ func (e *Engine) ProcessAssignedChunk(locator *chunks.Locator) {
 }
 
 // processAssignedChunkWithTracing encapsulates the logic of processing assigned chunk with tracing enabled.
-func (e *Engine) processAssignedChunkWithTracing(chunk *flow.Chunk, result *flow.ExecutionResult, chunkLocatorID flow.Identifier) (bool, error) {
+func (e *Engine) processAssignedChunkWithTracing(chunk *flow.Chunk, result *flow.ExecutionResult, chunkLocatorID flow.Identifier) (bool, uint64, error) {
 	chunkID := chunk.ID()
 
 	span, ok := e.tracer.GetSpan(chunkID, trace.VERProcessAssignedChunk)
@@ -174,41 +176,43 @@ func (e *Engine) processAssignedChunkWithTracing(chunk *flow.Chunk, result *flow
 	ctx := opentracing.ContextWithSpan(e.unit.Ctx(), span)
 	var err error
 	var requested bool
+	var blockHeight uint64
 	e.tracer.WithSpanFromContext(ctx, trace.VERFetcherHandleAssignedChunk, func() {
-		requested, err = e.processAssignedChunk(chunk, result, chunkLocatorID)
+		requested, blockHeight, err = e.processAssignedChunk(chunk, result, chunkLocatorID)
 	})
 
-	return requested, err
+	return requested, blockHeight, err
 }
 
 // processAssignedChunk receives an assigned chunk and its result and requests its chunk data pack from requester.
 // Boolean return value determines whether chunk data pack was requested or not.
-func (e *Engine) processAssignedChunk(chunk *flow.Chunk, result *flow.ExecutionResult, chunkLocatorID flow.Identifier) (bool, error) {
+func (e *Engine) processAssignedChunk(chunk *flow.Chunk, result *flow.ExecutionResult, chunkLocatorID flow.Identifier) (bool, uint64, error) {
 	// skips processing a chunk if it belongs to a sealed block.
 	chunkID := chunk.ID()
-	sealed, err := e.blockIsSealed(chunk.ChunkBody.BlockID)
+	sealed, blockHeight, err := e.blockIsSealed(chunk.ChunkBody.BlockID)
 	if err != nil {
-		return false, fmt.Errorf("could not determine whether block has been sealed: %w", err)
+		return false, 0, fmt.Errorf("could not determine whether block has been sealed: %w", err)
 	}
 	if sealed {
 		e.chunkConsumerNotifier.Notify(chunkLocatorID) // tells consumer that we are done with this chunk.
-		return false, nil
+		return false, blockHeight, nil
 	}
 
 	// adds chunk status as a pending chunk to mempool.
 	status := &verification.ChunkStatus{
 		ChunkIndex:      chunk.Index,
 		ExecutionResult: result,
+		BlockHeight:     blockHeight,
 	}
 	added := e.pendingChunks.Add(status)
 	if !added {
 		// chunk locators are deduplicated by consumer, reaching this point hints failing deduplication on consumer.
-		return false, fmt.Errorf("data race detected, received a duplicate chunk locator")
+		return false, blockHeight, fmt.Errorf("data race detected, received a duplicate chunk locator")
 	}
 
 	err = e.requestChunkDataPack(chunkID, result.ID(), chunk.BlockID)
 	if err != nil {
-		return false, fmt.Errorf("could not request chunk data pack: %w", err)
+		return false, blockHeight, fmt.Errorf("could not request chunk data pack: %w", err)
 	}
 
 	// requesting a chunk data pack is async, i.e., once engine reaches this point
@@ -218,7 +222,7 @@ func (e *Engine) processAssignedChunk(chunk *flow.Chunk, result *flow.ExecutionR
 	//
 	// both these events happen through requester module calling fetchers callbacks.
 	// it is during those callbacks that we notify the consumer that we are done with this job.
-	return true, nil
+	return true, blockHeight, nil
 }
 
 // HandleChunkDataPack is called by the chunk requester module everytime a new requested chunk data pack arrives.
@@ -244,6 +248,7 @@ func (e *Engine) HandleChunkDataPack(originID flow.Identifier, chunkDataPack *fl
 	resultID := status.ExecutionResult.ID()
 	lg = lg.With().
 		Hex("block_id", logging.ID(status.ExecutionResult.BlockID)).
+		Uint64("block_height", status.BlockHeight).
 		Hex("result_id", logging.ID(resultID)).
 		Uint64("chunk_index", status.ChunkIndex).
 		Logger()
@@ -481,19 +486,26 @@ func (e Engine) validateStakedExecutionNodeAtBlockID(senderID flow.Identifier, b
 // When the requester calls this callback method, it will never returns a chunk data pack for this chunk ID to the handler (i.e.,
 // through HandleChunkDataPack).
 func (e *Engine) NotifyChunkDataPackSealed(chunkID flow.Identifier) {
+	lg := e.log.With().
+		Hex("chunk_id", logging.ID(chunkID)).
+		Logger()
 	// we need to report that the job has been finished eventually
 	status, exists := e.pendingChunks.ByID(chunkID)
 	if !exists {
-		e.log.Debug().
-			Hex("chunk_id", logging.ID(chunkID)).
+		lg.Debug().
 			Msg("could not fetch pending status for sealed chunk from mempool, dropping chunk data")
 		return
 	}
 
+	lg = lg.With().
+		Uint64("block_height", status.BlockHeight).
+		Hex("result_id", logging.ID(status.ExecutionResult.ID())).Logger()
 	removed := e.pendingChunks.Rem(chunkID)
 
 	e.chunkConsumerNotifier.Notify(status.ChunkLocatorID())
-	e.log.Info().Bool("removed", removed).Msg("discards fetching chunk of an already sealed block and notified consumer")
+	lg.Info().
+		Bool("removed", removed).
+		Msg("discards fetching chunk of an already sealed block and notified consumer")
 }
 
 // pushToVerifierWithTracing encapsulates the logic of pushing a verifiable chunk to verifier engine with tracing enabled.
@@ -613,20 +625,20 @@ func (e *Engine) getAgreeAndDisagreeExecutors(blockID flow.Identifier, resultID
 }
 
 // blockIsSealed returns true if the block at specified height by block ID is sealed.
-func (e Engine) blockIsSealed(blockID flow.Identifier) (bool, error) {
+func (e Engine) blockIsSealed(blockID flow.Identifier) (bool, uint64, error) {
 	// TODO: as an optimization, we can keep record of last sealed height on a local variable.
 	header, err := e.headers.ByBlockID(blockID)
 	if err != nil {
-		return false, fmt.Errorf("could not get block: %w", err)
+		return false, 0, fmt.Errorf("could not get block: %w", err)
 	}
 
 	lastSealed, err := e.state.Sealed().Head()
 	if err != nil {
-		return false, fmt.Errorf("could not get last sealed: %w", err)
+		return false, 0, fmt.Errorf("could not get last sealed: %w", err)
 	}
 
 	sealed := header.Height <= lastSealed.Height
-	return sealed, nil
+	return sealed, header.Height, nil
 }
 
 // executorsOf segregates the executors of the given receipts based on the given execution result id.
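As a reference for the change above, blockIsSealed's new contract reduces to the following standalone sketch (storage lookups replaced by plain values, names hypothetical): a block is sealed iff its height is at or below the last sealed height, and the height is returned alongside the flag so callers can log it without a second header fetch.

package main

import "fmt"

// isSealed mirrors the shape of blockIsSealed: report whether a block at
// the given height is sealed, and hand the height back for logging.
func isSealed(blockHeight, lastSealedHeight uint64) (bool, uint64) {
	return blockHeight <= lastSealedHeight, blockHeight
}

func main() {
	sealed, height := isSealed(120, 150)
	fmt.Printf("sealed=%t block_height=%d\n", sealed, height) // sealed=true block_height=120
}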
1 change: 1 addition & 0 deletions model/verification/chunkStatus.go
@@ -9,6 +9,7 @@ import (
 type ChunkStatus struct {
 	ChunkIndex      uint64
 	ExecutionResult *flow.ExecutionResult
+	BlockHeight     uint64
 }
 
 func (s ChunkStatus) ID() flow.Identifier {
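The new BlockHeight field caches the height when the pending chunk enters the mempool, so callbacks that are handed only a chunk ID (HandleChunkDataPack, NotifyChunkDataPackSealed) can log it straight from the mempool entry instead of re-reading the block header. A self-contained sketch with stand-in types (the real flow.Identifier and flow.ExecutionResult are elided):

package main

import "fmt"

// ExecutionResult is a stand-in for flow.ExecutionResult; illustrative only.
type ExecutionResult struct{ BlockID [32]byte }

// ChunkStatus as extended by this commit: BlockHeight is recorded once,
// at insertion time, and read by later callbacks.
type ChunkStatus struct {
	ChunkIndex      uint64
	ExecutionResult *ExecutionResult
	BlockHeight     uint64
}

func main() {
	status := ChunkStatus{
		ChunkIndex:      3,
		ExecutionResult: &ExecutionResult{},
		BlockHeight:     1337, // cached here; no header lookup needed later
	}
	fmt.Printf("chunk_index=%d block_height=%d\n", status.ChunkIndex, status.BlockHeight)
}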
