Skip to content

Commit

Permalink
analyzer: grab blocks in parallelism-friendly way
Browse files Browse the repository at this point in the history
  • Loading branch information
ptrus committed May 4, 2023
1 parent 766164f commit 2efd412
Show file tree
Hide file tree
Showing 14 changed files with 454 additions and 343 deletions.
25 changes: 0 additions & 25 deletions analyzer/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,31 +25,6 @@ type Analyzer interface {
Name() string
}

// ConsensusConfig specifies configuration parameters for
// processing the consensus layer.
type ConsensusConfig struct {
	// GenesisChainContext is the chain context that specifies which genesis
	// file to analyze.
	GenesisChainContext string

	// Range is the range of blocks to process.
	// If this is set, the analyzer analyzes blocks in the provided range.
	Range BlockRange

	// Source is the storage source from which to fetch block data
	// when processing blocks in this range.
	Source storage.ConsensusSourceStorage
}

// BlockRange is a range of blocks; both endpoints are inclusive.
type BlockRange struct {
	// From is the first block to process in this range, inclusive.
	From int64

	// To is the last block to process in this range, inclusive.
	To int64
}

// RuntimeConfig specifies configuration parameters for
// processing the runtime layer.
type RuntimeConfig struct {
Expand Down
241 changes: 241 additions & 0 deletions analyzer/block/block.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,241 @@
// Package block implements the generic block based analyzer.
//
// Block based analyzer uses a BlockProcessor to process blocks and handles the
// common logic for queueing blocks and support for parallel processing.
package block

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/jackc/pgx/v5"

	"github.com/oasisprotocol/oasis-indexer/analyzer"
	"github.com/oasisprotocol/oasis-indexer/analyzer/queries"
	"github.com/oasisprotocol/oasis-indexer/analyzer/util"
	"github.com/oasisprotocol/oasis-indexer/config"
	"github.com/oasisprotocol/oasis-indexer/log"
	"github.com/oasisprotocol/oasis-indexer/storage"
)

const (
	// processBlockTimeout is the maximum time allotted to processing a single block;
	// the block's context is canceled once it elapses.
	processBlockTimeout = 61 * time.Second
	// blocksBatchSize is the maximum number of blocks picked (and locked) in one batch.
	blocksBatchSize = 100
	// lockExpiryMinutes is the lock expiry timeout for blocks, in minutes
	// (passed as a parameter to the block-picking query). Locked blocks not
	// processed within this time can be picked again by any analyzer instance.
	lockExpiryMinutes = 5
)

// BlockProcessor is the interface that block-based processors should implement to use them with the
// block based analyzer.
type BlockProcessor interface {
	// PreWork performs tasks that need to be done before the main processing loop starts.
	PreWork(ctx context.Context) error
	// ProcessBlock processes the provided block, retrieving all required information
	// from source storage and committing an atomically-executed batch of queries
	// to target storage.
	//
	// The implementation must commit processed blocks (update the chain.processed_blocks
	// record with processed_time timestamp); the analyzer relies on that record to track
	// progress and to release the block's lock.
	ProcessBlock(ctx context.Context, height uint64) error
	// SourceHasBlock checks if the block exists in the source storage.
	SourceHasBlock(ctx context.Context, height uint64) error
}

// Compile-time check that blockBasedAnalyzer satisfies the Analyzer interface.
var _ analyzer.Analyzer = (*blockBasedAnalyzer)(nil)

// blockBasedAnalyzer drives a BlockProcessor over a (possibly unbounded) range of
// blocks, picking batches of unprocessed blocks from target storage in a way that
// is safe for multiple concurrent analyzer instances (via row locking).
type blockBasedAnalyzer struct {
	// config holds the block range ([From, To]; To == 0 means unbounded).
	config *config.BlockBasedAnalyzerConfig
	// analyzerName identifies this analyzer in shared progress-tracking tables.
	analyzerName string

	// processor implements the per-block work.
	processor BlockProcessor

	target storage.TargetStorage
	logger *log.Logger
}

// latestProcessedBlock returns the latest processed block by the analyzer.
//
// Because blocks are processed in parallel and may complete out of order,
// the processed set can contain gaps. The height reported here is the one
// just below the first gap, i.e. the highest H such that every block up to
// and including H has been processed.
func (b *blockBasedAnalyzer) latestProcessedBlock(ctx context.Context) (uint64, error) {
	// Find the first unprocessed block and return the block before it.
	var firstUnprocessed uint64
	switch err := b.target.QueryRow(
		ctx,
		queries.FirstUnprocessedBlock,
		b.analyzerName,
	).Scan(&firstUnprocessed); {
	case err == nil:
		if firstUnprocessed > 0 {
			return firstUnprocessed - 1, nil
		}
		// firstUnprocessed == 0: no block precedes it; fall through and
		// report the latest processed block instead.
	case errors.Is(err, pgx.ErrNoRows):
		// No unprocessed blocks; fall through to the latest processed block.
	default:
		// A real DB error; do not mask it as "no unprocessed blocks".
		return 0, fmt.Errorf("querying first unprocessed block: %w", err)
	}

	// If there is no unprocessed block, try returning the latest processed block.
	var latest uint64
	if err := b.target.QueryRow(
		ctx,
		queries.LatestProcessedBlock,
		b.analyzerName,
	).Scan(&latest); err != nil {
		return 0, err
	}
	return latest, nil
}

// unlockBlock unlocks a block so it can be picked up again immediately,
// without waiting for its lock to expire (lockExpiryMinutes).
//
// This is best-effort: on failure the lock still expires on its own, so an
// error here only delays reprocessing; it is logged, not propagated.
func (b *blockBasedAnalyzer) unlockBlock(ctx context.Context, height uint64) {
	rows, err := b.target.Query(
		ctx,
		queries.UnlockBlockForProcessing,
		b.analyzerName,
		height,
	)
	if err != nil {
		b.logger.Error("failed to unlock block",
			"height", height,
			"err", err,
		)
		return
	}
	// Close the result set on the SUCCESS path so the pooled connection is
	// released. (The original closed it only on error, leaking the
	// connection on every successful unlock.)
	rows.Close()
}

// fetchBatchForProcessing fetches (and locks) a batch of up to blocksBatchSize
// unprocessed blocks in the [from, to] range for this analyzer. A `to` of 0
// means no upper bound. Locked blocks are not handed out again until the lock
// expires (lockExpiryMinutes) or the block is unlocked/processed.
func (b *blockBasedAnalyzer) fetchBatchForProcessing(ctx context.Context, from, to uint64) ([]uint64, error) {
	// Pass NULL as the upper bound when no end height is configured.
	var upper *uint64
	if to > 0 {
		upper = &to
	}

	rows, err := b.target.Query(
		ctx,
		queries.PickBlocksForProcessing,
		b.analyzerName,
		from,
		upper,
		lockExpiryMinutes,
		blocksBatchSize,
	)
	switch {
	case err == nil:
		// Continues below.
	case errors.Is(err, pgx.ErrNoRows):
		// No blocks to process.
		return nil, nil
	default:
		return nil, fmt.Errorf("querying blocks for processing: %w", err)
	}
	defer rows.Close()

	heights := make([]uint64, 0, blocksBatchSize)
	for rows.Next() {
		var height uint64
		if err = rows.Scan(&height); err != nil {
			return nil, fmt.Errorf("scanning returned height: %w", err)
		}
		heights = append(heights, height)
	}
	// rows.Next() returns false both at end-of-results and on error;
	// distinguish the two so an iteration error isn't silently dropped.
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("iterating picked blocks: %w", err)
	}
	return heights, nil
}

// Start starts the block analyzer: it runs the processor's prework, then
// repeatedly picks (and locks) batches of unprocessed blocks and processes
// them one by one. When an end height is configured (config.To != 0), the
// loop terminates once all blocks up to and including it are processed;
// otherwise it runs forever.
func (b *blockBasedAnalyzer) Start() {
	ctx := context.Background()

	// Run prework.
	if err := b.processor.PreWork(ctx); err != nil {
		b.logger.Error("prework failed", "err", err)
		return
	}

	// Start processing blocks.
	backoff, err := util.NewBackoff(
		100*time.Millisecond,
		6*time.Second, // cap the timeout at the expected consensus block time
	)
	if err != nil {
		b.logger.Error("error configuring indexer backoff policy",
			"err", err.Error(),
		)
		return
	}
	for {
		backoff.Wait()

		b.logger.Info("picking a batch of blocks to process")

		// Pick a batch of blocks to process.
		heights, err := b.fetchBatchForProcessing(ctx, b.config.From, b.config.To)
		if err != nil {
			b.logger.Error("failed to pick blocks for processing",
				"err", err,
			)
			backoff.Failure()
			continue
		}

		b.logger.Debug("picked blocks for processing", "heights", heights)

		// Process blocks.
		for _, height := range heights {
			b.logger.Info("processing block", "height", height)

			pCtx, cancel := context.WithTimeout(ctx, processBlockTimeout)
			if err := b.processor.ProcessBlock(pCtx, height); err != nil {
				cancel()
				b.logger.Error("error processing block, unlocking",
					"height", height,
					"err", err,
				)
				// Unlock a failed block, so it can be retried sooner.
				b.unlockBlock(ctx, height)
				continue
			}
			cancel()
			b.logger.Info("processed block", "height", height)
		}

		switch len(heights) {
		case 0:
			b.logger.Info("no blocks to process")
			backoff.Failure() // No blocks processed, wait a bit.
		default:
			backoff.Success()
		}

		// Stop processing if end height is set and was reached.
		if b.config.To != 0 {
			// Only break when the progress query SUCCEEDS and shows the end
			// height reached. (The original tested `err != nil && ...`, which
			// could never be true on success, so a bounded run never stopped.)
			if latestProcessed, err := b.latestProcessedBlock(ctx); err == nil && latestProcessed >= b.config.To {
				break
			}
		}
	}

	b.logger.Info(
		"finished processing all blocks in the configured range",
		"from", b.config.From, "to", b.config.To,
	)
}

// Name returns the name of the analyzer.
func (a *blockBasedAnalyzer) Name() string {
	return a.analyzerName
}

// NewAnalyzer returns a new block based analyzer for the provided block processor.
// The returned analyzer logs under a sub-logger tagged with the analyzer's name.
func NewAnalyzer(
	config *config.BlockBasedAnalyzerConfig,
	name string,
	processor BlockProcessor,
	target storage.TargetStorage,
	logger *log.Logger,
) (analyzer.Analyzer, error) {
	a := &blockBasedAnalyzer{
		config:       config,
		analyzerName: name,
		processor:    processor,
		target:       target,
		logger:       logger.With("analyzer", name),
	}
	return a, nil
}

0 comments on commit 2efd412

Please sign in to comment.