Skip to content

Commit

Permalink
Merge pull request #456 from oasisprotocol/mitjat/parallel-analyzers
Browse files Browse the repository at this point in the history
Ability to run analyzers in parallel
  • Loading branch information
mitjat committed Aug 24, 2023
2 parents e65efa9 + 33ee593 commit cf56c92
Show file tree
Hide file tree
Showing 16 changed files with 581 additions and 160 deletions.
7 changes: 7 additions & 0 deletions analyzer/api.go
Expand Up @@ -23,3 +23,10 @@ type Analyzer interface {
// Name returns the name of the analyzer.
Name() string
}

type BlockAnalysisMode string

const (
FastSyncMode BlockAnalysisMode = "fast-sync"
SlowSyncMode BlockAnalysisMode = "slow-sync"
)
147 changes: 130 additions & 17 deletions analyzer/block/block.go
Expand Up @@ -41,12 +41,21 @@ type BlockProcessor interface {
//
// The implementation must commit processed blocks (update the analysis.processed_blocks record with processed_time timestamp).
ProcessBlock(ctx context.Context, height uint64) error
// FinalizeFastSync updates any data that was neglected during the indexing of blocks
// up to and including `lastFastSyncHeight`, e.g. dead-reckoned balances.
// It is intended to be used when an analyzer wishes to transition from fast-sync mode
// into slow-sync mode. If the analyzer never used fast-sync and is starting from
// height 0 in slow-sync mode, `lastFastSyncHeight` will be -1.
// The method should be a no-op if all the blocks up to `lastFastSyncHeight` have been analyzed
// in slow-sync mode, assuming no bugs in dead reckoning code.
FinalizeFastSync(ctx context.Context, lastFastSyncHeight int64) error
}

var _ analyzer.Analyzer = (*blockBasedAnalyzer)(nil)

type blockBasedAnalyzer struct {
config *config.BlockBasedAnalyzerConfig
blockRange config.BlockRange
batchSize uint64
analyzerName string

processor BlockProcessor
Expand Down Expand Up @@ -120,7 +129,7 @@ func (b *blockBasedAnalyzer) fetchBatchForProcessing(ctx context.Context, from u
from,
to,
0,
b.config.BatchSize,
b.batchSize,
)
case false:
// Fetch and lock blocks for processing.
Expand All @@ -131,7 +140,7 @@ func (b *blockBasedAnalyzer) fetchBatchForProcessing(ctx context.Context, from u
from,
to,
lockExpiryMinutes,
b.config.BatchSize,
b.batchSize,
)
}
if err != nil {
Expand All @@ -156,6 +165,98 @@ func (b *blockBasedAnalyzer) fetchBatchForProcessing(ctx context.Context, from u
return heights, nil
}

// processedSubrangeInfo reports on the blocks in the analyzer's configured
// [From, To] range that have already been processed. It returns:
//   - whether the already-processed blocks form a contiguous range starting at "from";
//   - the height of the highest already-processed block.
//
// Both values are read from the single row produced by the
// queries.ProcessedSubrangeInfo query. If the query or the scan fails, the
// zero values are returned together with the error.
func (b *blockBasedAnalyzer) processedSubrangeInfo(ctx context.Context) (bool, int64, error) {
	var (
		contiguous bool
		highest    int64
	)
	row := b.target.QueryRow(
		ctx,
		queries.ProcessedSubrangeInfo,
		b.analyzerName,
		b.blockRange.From,
		b.blockRange.To,
	)
	err := row.Scan(&contiguous, &highest)
	if err != nil {
		return false, 0, err
	}
	return contiguous, highest, nil
}

// isBlockProcessedBySlowSync reports whether the block at `height` has been
// processed in slow-sync mode by this analyzer. The answer is read from the
// single boolean column returned by the queries.IsBlockProcessedBySlowSync
// query; on any query/scan failure it returns false and the error.
func (b *blockBasedAnalyzer) isBlockProcessedBySlowSync(ctx context.Context, height int64) (bool, error) {
	var processed bool
	row := b.target.QueryRow(ctx, queries.IsBlockProcessedBySlowSync, b.analyzerName, height)
	err := row.Scan(&processed)
	if err != nil {
		return false, err
	}
	return processed, nil
}

// softEnqueueGapsInProcessedBlocks finds any gaps in the range of already-processed
// blocks, and soft-enqueues them (i.e. adds entries for them in
// `analysis.processed_blocks`) if they overlap with this analyzer's configured range.
//
// This is useful if we're recovering from misconfigured past runs that left gaps in the
// range of processed blocks. The main block-grabbing loop assumes (for efficiency) that
// we only ever need to process blocks that are larger than the largest entry in the db,
// or else explicitly present in the db and marked as not completed. This function ensures
// that that assumption is valid even in the face of misconfigured past runs, e.g. if we
// processed the range [1000, 2000] but now want to process [1, infinity).
//
// The work is done by a single queries.SoftEnqueueGapsInProcessedBlocks statement sent
// as a batch. On failure the error is logged and returned; on success an
// informational message is logged and nil is returned.
func (b *blockBasedAnalyzer) softEnqueueGapsInProcessedBlocks(ctx context.Context) error {
	batch := &storage.QueryBatch{}
	batch.Queue(
		queries.SoftEnqueueGapsInProcessedBlocks,
		b.analyzerName,
		b.blockRange.From,
		b.blockRange.To,
	)
	if err := b.target.SendBatch(ctx, batch); err != nil {
		b.logger.Error("failed to soft-enqueue gaps in already-processed blocks", "err", err, "from", b.blockRange.From, "to", b.blockRange.To)
		return err
	}
	// Success is not an error condition; log it at Info level so it does not
	// pollute error-level logs or trigger error-based alerting.
	b.logger.Info("ensured that any gaps in the already-processed block range can be picked up later", "from", b.blockRange.From, "to", b.blockRange.To)
	return nil
}

// ensureSlowSyncPrerequisites validates assumptions/prerequisites for starting a
// slow sync analyzer:
//   - No blocks in the configured [from, to] range have been processed, except possibly
//     a contiguous subrange [from, X] for some X.
//   - If the most recently processed block was not processed by slow-sync (i.e. by fast
//     sync, or not at all), triggers a finalization of the fast-sync process.
//
// It returns true when slow-sync may proceed. Every failure (query error,
// non-contiguous processed range, or failed fast-sync finalization) is logged
// here at Error level and reported to the caller as false; routine progress
// messages are logged at Info level.
func (b *blockBasedAnalyzer) ensureSlowSyncPrerequisites(ctx context.Context) (ok bool) {
	isContiguous, maxProcessedHeight, err := b.processedSubrangeInfo(ctx)
	if err != nil {
		b.logger.Error("Failed to obtain info about already-processed blocks", "err", err)
		return false
	}
	if !isContiguous {
		b.logger.Error(fmt.Sprintf("cannot run in slow-sync mode because a non-contiguous subset of blocks in range [%d, %d] has already been processed. Use fast-sync to process to at least height %d.", b.blockRange.From, b.blockRange.To, maxProcessedHeight))
		return false
	}

	// If the block before the one we'll attempt has been processed by fast-sync or not at all,
	// we first refetch the full current state (i.e. genesis or similar).
	precededBySlowSync, err := b.isBlockProcessedBySlowSync(ctx, maxProcessedHeight)
	if err != nil {
		b.logger.Error("Failed to obtain info about the last processed block", "err", err, "last_processed_block", maxProcessedHeight)
		return false
	}
	if precededBySlowSync {
		// The previous block was already handled by slow-sync; nothing to finalize.
		return true
	}

	// Starting and completing finalization are normal progress events, not errors.
	b.logger.Info("finalizing the work of previous fast-sync analyzer(s)", "last_fast_sync_height", maxProcessedHeight)
	if err := b.processor.FinalizeFastSync(ctx, maxProcessedHeight); err != nil {
		b.logger.Error("failed to finalize the fast-sync phase (i.e. download genesis or similar)", "err", err)
		return false
	}
	b.logger.Info("fast-sync finalization complete; proceeding with regular slow-sync analysis")
	return true
}

// Start starts the block analyzer.
func (b *blockBasedAnalyzer) Start(ctx context.Context) {
// Run prework.
Expand All @@ -169,8 +270,18 @@ func (b *blockBasedAnalyzer) Start(ctx context.Context) {
// is set to golang's maximum int64 value for convenience.
var to uint64 = math.MaxInt64
// Clamp the latest block height to the configured range.
if b.config.To != 0 {
to = b.config.To
if b.blockRange.To != 0 {
to = b.blockRange.To
}

if b.slowSync && !b.ensureSlowSyncPrerequisites(ctx) {
// We cannot continue or recover automatically. Logging happens inside the validate function.
return
}

if !b.slowSync && b.softEnqueueGapsInProcessedBlocks(ctx) != nil {
// We cannot continue or recover automatically. Logging happens inside softEnqueueGapsInProcessedBlocks.
return
}

// Start processing blocks.
Expand Down Expand Up @@ -201,8 +312,8 @@ func (b *blockBasedAnalyzer) Start(ctx context.Context) {
batchCtx, batchCtxCancel = context.WithTimeout(ctx, lockExpiryMinutes*time.Minute)

// Pick a batch of blocks to process.
b.logger.Info("picking a batch of blocks to process", "from", b.config.From, "to", to)
heights, err := b.fetchBatchForProcessing(ctx, b.config.From, to)
b.logger.Info("picking a batch of blocks to process", "from", b.blockRange.From, "to", to, "is_fast_sync", !b.slowSync)
heights, err := b.fetchBatchForProcessing(ctx, b.blockRange.From, to)
if err != nil {
b.logger.Error("failed to pick blocks for processing",
"err", err,
Expand Down Expand Up @@ -272,8 +383,8 @@ func (b *blockBasedAnalyzer) Start(ctx context.Context) {
}

// Stop processing if end height is set and was reached.
if len(heights) == 0 && b.config.To != 0 {
if height, err := b.firstUnprocessedBlock(ctx); err == nil && height > b.config.To {
if len(heights) == 0 && b.blockRange.To != 0 {
if height, err := b.firstUnprocessedBlock(ctx); err == nil && height > b.blockRange.To {
break
}
}
Expand All @@ -282,7 +393,7 @@ func (b *blockBasedAnalyzer) Start(ctx context.Context) {

b.logger.Info(
"finished processing all blocks in the configured range",
"from", b.config.From, "to", b.config.To,
"from", b.blockRange.From, "to", b.blockRange.To,
)
}

Expand All @@ -296,22 +407,24 @@ func (b *blockBasedAnalyzer) Name() string {
// mode selects the block analysis mode. In slow-sync mode (analyzer.SlowSyncMode) the analyzer should
// process blocks in order, ignoring locks as it is assumed it is the only analyzer running.
func NewAnalyzer(
config *config.BlockBasedAnalyzerConfig,
blockRange config.BlockRange,
batchSize uint64,
mode analyzer.BlockAnalysisMode,
name string,
processor BlockProcessor,
target storage.TargetStorage,
logger *log.Logger,
slowSync bool,
) (analyzer.Analyzer, error) {
if config.BatchSize == 0 {
config.BatchSize = defaultBatchSize
if batchSize == 0 {
batchSize = defaultBatchSize
}
return &blockBasedAnalyzer{
config: config,
blockRange: blockRange,
batchSize: batchSize,
analyzerName: name,
processor: processor,
target: target,
logger: logger.With("analyzer", name, "slow_sync", slowSync),
slowSync: slowSync,
logger: logger.With("analyzer", name, "mode", mode),
slowSync: mode == analyzer.SlowSyncMode,
}, nil
}

0 comments on commit cf56c92

Please sign in to comment.