Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Verification] Logging block height on chunk processing pipeline #856

Merged
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
18 changes: 10 additions & 8 deletions engine/verification/assigner/engine.go
Expand Up @@ -117,11 +117,13 @@ func (e *Engine) resultChunkAssignment(ctx context.Context,
// (through the chunk assigner), and belong to the execution result.
//
// Deduplication of chunk locators is delegated to the chunks queue.
func (e *Engine) processChunk(chunk *flow.Chunk, resultID flow.Identifier) (bool, error) {
log := e.log.With().
func (e *Engine) processChunk(chunk *flow.Chunk, resultID flow.Identifier, blockHeight uint64) (bool, error) {
lg := e.log.With().
Hex("result_id", logging.ID(resultID)).
Hex("chunk_id", logging.ID(chunk.ID())).
Uint64("chunk_index", chunk.Index).Logger()
Uint64("chunk_index", chunk.Index).
Uint64("block_height", blockHeight).
Logger()

locator := &chunks.Locator{
ResultID: resultID,
Expand All @@ -134,15 +136,15 @@ func (e *Engine) processChunk(chunk *flow.Chunk, resultID flow.Identifier) (bool
return false, fmt.Errorf("could not push chunk locator to chunks queue: %w", err)
}
if !ok {
log.Debug().Msg("could not push duplicate chunk locator to chunks queue")
lg.Debug().Msg("could not push duplicate chunk locator to chunks queue")
return false, nil
}

e.metrics.OnAssignedChunkProcessedAtAssigner()

// notifies chunk queue consumer of a new chunk
e.newChunkListener.Check()
log.Info().Msg("chunk locator successfully pushed to chunks queue")
lg.Info().Msg("chunk locator successfully pushed to chunks queue")

return true, nil
}
Expand Down Expand Up @@ -204,7 +206,7 @@ func (e *Engine) processFinalizedBlock(ctx context.Context, block *flow.Block) {

assignedChunksCount += uint64(len(chunkList))
for _, chunk := range chunkList {
processed, err := e.processChunkWithTracing(ctx, chunk, resultID)
processed, err := e.processChunkWithTracing(ctx, chunk, resultID, block.Header.Height)
if err != nil {
resultLog.Fatal().
Err(err).
Expand Down Expand Up @@ -301,11 +303,11 @@ func (e *Engine) resultChunkAssignmentWithTracing(
//
// Note that the chunk in the input should be legitimately assigned to this verification node
// (through the chunk assigner), and belong to the same execution result.
func (e *Engine) processChunkWithTracing(ctx context.Context, chunk *flow.Chunk, resultID flow.Identifier) (bool, error) {
func (e *Engine) processChunkWithTracing(ctx context.Context, chunk *flow.Chunk, resultID flow.Identifier, blockHeight uint64) (bool, error) {
var err error
var processed bool
e.tracer.WithSpanFromContext(ctx, trace.VERAssignerProcessChunk, func() {
processed, err = e.processChunk(chunk, resultID)
processed, err = e.processChunk(chunk, resultID, blockHeight)
})
return processed, err
}
Expand Down
35 changes: 20 additions & 15 deletions engine/verification/fetcher/engine.go
Expand Up @@ -147,7 +147,9 @@ func (e *Engine) ProcessAssignedChunk(locator *chunks.Locator) {
Logger()
lg.Debug().Msg("result and chunk for locator retrieved")

requested, err := e.processAssignedChunkWithTracing(chunk, result, locatorID)
requested, blockHeight, err := e.processAssignedChunkWithTracing(chunk, result, locatorID)
lg = lg.With().Uint64("block_height", blockHeight).Logger()

if err != nil {
lg.Fatal().Err(err).Msg("could not process assigned chunk")
}
Expand All @@ -161,7 +163,7 @@ func (e *Engine) ProcessAssignedChunk(locator *chunks.Locator) {
}

// processAssignedChunkWithTracing encapsulates the logic of processing assigned chunk with tracing enabled.
func (e *Engine) processAssignedChunkWithTracing(chunk *flow.Chunk, result *flow.ExecutionResult, chunkLocatorID flow.Identifier) (bool, error) {
func (e *Engine) processAssignedChunkWithTracing(chunk *flow.Chunk, result *flow.ExecutionResult, chunkLocatorID flow.Identifier) (bool, uint64, error) {
chunkID := chunk.ID()

span, ok := e.tracer.GetSpan(chunkID, trace.VERProcessAssignedChunk)
Expand All @@ -174,41 +176,43 @@ func (e *Engine) processAssignedChunkWithTracing(chunk *flow.Chunk, result *flow
ctx := opentracing.ContextWithSpan(e.unit.Ctx(), span)
var err error
var requested bool
var blockHeight uint64
e.tracer.WithSpanFromContext(ctx, trace.VERFetcherHandleAssignedChunk, func() {
requested, err = e.processAssignedChunk(chunk, result, chunkLocatorID)
requested, blockHeight, err = e.processAssignedChunk(chunk, result, chunkLocatorID)
})

return requested, err
return requested, blockHeight, err
}

// processAssignedChunk receives an assigned chunk and its result and requests its chunk data pack from requester.
// Boolean return value determines whether chunk data pack was requested or not.
func (e *Engine) processAssignedChunk(chunk *flow.Chunk, result *flow.ExecutionResult, chunkLocatorID flow.Identifier) (bool, error) {
func (e *Engine) processAssignedChunk(chunk *flow.Chunk, result *flow.ExecutionResult, chunkLocatorID flow.Identifier) (bool, uint64, error) {
// skips processing a chunk if it belongs to a sealed block.
chunkID := chunk.ID()
sealed, err := e.blockIsSealed(chunk.ChunkBody.BlockID)
sealed, blockHeight, err := e.blockIsSealed(chunk.ChunkBody.BlockID)
if err != nil {
return false, fmt.Errorf("could not determine whether block has been sealed: %w", err)
return false, 0, fmt.Errorf("could not determine whether block has been sealed: %w", err)
}
if sealed {
e.chunkConsumerNotifier.Notify(chunkLocatorID) // tells consumer that we are done with this chunk.
return false, nil
return false, blockHeight, nil
}

// adds chunk status as a pending chunk to mempool.
status := &verification.ChunkStatus{
ChunkIndex: chunk.Index,
ExecutionResult: result,
BlockHeight: blockHeight,
}
added := e.pendingChunks.Add(status)
if !added {
// chunk locators are deduplicated by consumer, reaching this point hints failing deduplication on consumer.
return false, fmt.Errorf("data race detected, received a duplicate chunk locator")
return false, blockHeight, fmt.Errorf("data race detected, received a duplicate chunk locator")
}

err = e.requestChunkDataPack(chunkID, result.ID(), chunk.BlockID)
if err != nil {
return false, fmt.Errorf("could not request chunk data pack: %w", err)
return false, blockHeight, fmt.Errorf("could not request chunk data pack: %w", err)
}

// requesting a chunk data pack is async, i.e., once engine reaches this point
Expand All @@ -218,7 +222,7 @@ func (e *Engine) processAssignedChunk(chunk *flow.Chunk, result *flow.ExecutionR
//
// both these events happen through requester module calling fetchers callbacks.
// it is during those callbacks that we notify the consumer that we are done with this job.
return true, nil
return true, blockHeight, nil
}

// HandleChunkDataPack is called by the chunk requester module everytime a new requested chunk data pack arrives.
Expand All @@ -244,6 +248,7 @@ func (e *Engine) HandleChunkDataPack(originID flow.Identifier, chunkDataPack *fl
resultID := status.ExecutionResult.ID()
lg = lg.With().
Hex("block_id", logging.ID(status.ExecutionResult.BlockID)).
Uint64("block_height", status.BlockHeight).
Hex("result_id", logging.ID(resultID)).
Uint64("chunk_index", status.ChunkIndex).
Logger()
Expand Down Expand Up @@ -613,20 +618,20 @@ func (e *Engine) getAgreeAndDisagreeExecutors(blockID flow.Identifier, resultID
}

// blockIsSealed returns true if the block at specified height by block ID is sealed.
func (e Engine) blockIsSealed(blockID flow.Identifier) (bool, error) {
func (e Engine) blockIsSealed(blockID flow.Identifier) (bool, uint64, error) {
// TODO: as an optimization, we can keep record of last sealed height on a local variable.
header, err := e.headers.ByBlockID(blockID)
if err != nil {
return false, fmt.Errorf("could not get block: %w", err)
return false, 0, fmt.Errorf("could not get block: %w", err)
}

lastSealed, err := e.state.Sealed().Head()
if err != nil {
return false, fmt.Errorf("could not get last sealed: %w", err)
return false, 0, fmt.Errorf("could not get last sealed: %w", err)
}

sealed := header.Height <= lastSealed.Height
return sealed, nil
return sealed, header.Height, nil
}

// executorsOf segregates the executors of the given receipts based on the given execution result id.
Expand Down
1 change: 1 addition & 0 deletions model/verification/chunkStatus.go
Expand Up @@ -9,6 +9,7 @@ import (
type ChunkStatus struct {
ChunkIndex uint64
ExecutionResult *flow.ExecutionResult
BlockHeight uint64
}

func (s ChunkStatus) ID() flow.Identifier {
Expand Down