analyzer/block: add support for slow-sync
ptrus committed May 13, 2023
1 parent 429d3ea commit e2edd74
Showing 7 changed files with 206 additions and 49 deletions.
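
The change is easiest to read as two small behavioral switches on the new slowSync flag: block selection skips the advisory-lock expiry, and a failed block aborts the rest of the batch so heights are always applied in order. Below is a self-contained sketch of both behaviors; the helper names (pickLockExpiry, processBatch) are illustrative and not identifiers from the commit.

```go
package main

import "fmt"

// pickLockExpiry mirrors the batch-selection branch added in block.go: in
// slow-sync mode the analyzer is assumed to be the only running instance, so
// blocks are picked with a zero lock expiry (advisory locks are ignored);
// otherwise the usual expiry applies. Helper name is illustrative only.
func pickLockExpiry(slowSync bool, lockExpiryMinutes int) int {
	if slowSync {
		return 0 // single instance: no need to lock blocks against other analyzers
	}
	return lockExpiryMinutes
}

// processBatch sketches the error handling the commit introduces: in slow-sync
// mode the first failure aborts the batch so blocks are applied strictly in
// order; in parallel mode the failed block is skipped and left for a retry.
func processBatch(heights []uint64, slowSync bool, process func(uint64) error) (processed []uint64) {
	for _, h := range heights {
		if err := process(h); err != nil {
			if slowSync {
				break // preserve ordering: resume from this height on the next batch
			}
			continue // parallel mode: the block stays unprocessed and can be retried
		}
		processed = append(processed, h)
	}
	return processed
}

func main() {
	fail7 := func(h uint64) error {
		if h == 7 {
			return fmt.Errorf("boom at %d", h)
		}
		return nil
	}
	fmt.Println(pickLockExpiry(true, 5), pickLockExpiry(false, 5)) // 0 5
	fmt.Println(processBatch([]uint64{6, 7, 8}, true, fail7))      // [6]   (stops at the failure)
	fmt.Println(processBatch([]uint64{6, 7, 8}, false, fail7))     // [6 8] (skips the failure)
}
```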
78 changes: 52 additions & 26 deletions analyzer/block/block.go
@@ -54,9 +54,12 @@ type blockBasedAnalyzer struct {

target storage.TargetStorage
logger *log.Logger

slowSync bool
}

// firstUnprocessedBlock returns the first block before which all blocks have been processed.
// If no blocks have been processed, it returns error pgx.ErrNoRows.
func (b *blockBasedAnalyzer) firstUnprocessedBlock(ctx context.Context) (first uint64, err error) {
err = b.target.QueryRow(
ctx,
@@ -80,7 +83,7 @@ func (b *blockBasedAnalyzer) unlockBlock(ctx context.Context, height uint64) {
}

// fetchBatchForProcessing fetches (and locks) a batch of blocks for processing.
func (b *blockBasedAnalyzer) fetchBatchForProcessing(ctx context.Context, from uint64, to *uint64) ([]uint64, error) {
func (b *blockBasedAnalyzer) fetchBatchForProcessing(ctx context.Context, from uint64, to uint64) ([]uint64, error) {
// XXX: In future, use a system for picking lock IDs in case other parts of the code start using advisory locks.
const lockID = 1001
var (
@@ -107,19 +110,35 @@ func (b *blockBasedAnalyzer) fetchBatchForProcessing(ctx context.Context, from u
}
rows.Close()

// Fetch and lock blocks for processing.
rows, err = tx.Query(
ctx,
queries.PickBlocksForProcessing,
b.analyzerName,
from,
to,
lockExpiryMinutes,
blocksBatchSize,
)
switch b.slowSync {
case true:
// If running in slow-sync mode, ignore locks as this should be the only instance
// of the analyzer running.
rows, err = tx.Query(
ctx,
queries.PickBlocksForProcessing,
b.analyzerName,
from,
to,
0,
blocksBatchSize,
)
case false:
// Fetch and lock blocks for processing.
rows, err = tx.Query(
ctx,
queries.PickBlocksForProcessing,
b.analyzerName,
from,
to,
lockExpiryMinutes,
blocksBatchSize,
)
}
if err != nil {
return nil, fmt.Errorf("querying blocks for processing: %w", err)
}

defer rows.Close()
for rows.Next() {
var height uint64
@@ -173,7 +192,7 @@ func (b *blockBasedAnalyzer) Start(ctx context.Context) {
}
batchCtx, batchCtxCancel = context.WithTimeout(ctx, lockExpiryMinutes*time.Minute)

var to *uint64
var to uint64
// Get the latest available block on the source.
latestBlockHeight, err := b.processor.SourceLatestBlockHeight(ctx)
if err != nil {
@@ -183,11 +202,11 @@ func (b *blockBasedAnalyzer) Start(ctx context.Context) {
backoff.Failure()
continue
}
to = &latestBlockHeight
to = latestBlockHeight

// Clamp the latest block height to the configured range.
if b.config.To < latestBlockHeight {
to = &b.config.To
if b.config.To != 0 && b.config.To < latestBlockHeight {
to = b.config.To
}

// Pick a batch of blocks to process.
@@ -208,27 +227,29 @@ func (b *blockBasedAnalyzer) Start(ctx context.Context) {

bCtx, cancel := context.WithTimeout(batchCtx, processBlockTimeout)
if err := b.processor.ProcessBlock(bCtx, height); err != nil {
b.logger.Error("error processing block, unlocking",
"height", height,
"err", err,
)
cancel()
backoff.Failure()
b.logger.Error("error processing block", "height", height, "err", err)

// If running in slow-sync, stop processing the batch on error so that
// the blocks are always processed in order.
if b.slowSync {
break
}

// Unlock a failed block, so it can be retried sooner.
// TODO: Could add a hook to unlock all remaining blocks in the batch on graceful shutdown.
b.unlockBlock(ctx, height)
cancel()
backoff.Failure()
continue
}
cancel()
backoff.Success()
b.logger.Info("processed block", "height", height)
}

switch len(heights) {
case 0:
if len(heights) == 0 {
b.logger.Info("no blocks to process")
backoff.Failure() // No blocks processed, increase the backoff timeout a bit.
default:
backoff.Success()
}

// Stop processing if end height is set and was reached.
@@ -252,18 +273,23 @@ func (b *blockBasedAnalyzer) Name() string {
}

// NewAnalyzer returns a new block based analyzer for the provided block processor.
//
// slowSync indicates that the analyzer runs in slow-sync mode: it should process blocks
// in order and ignore locks, as it is assumed to be the only analyzer instance running.
func NewAnalyzer(
config *config.BlockBasedAnalyzerConfig,
name string,
processor BlockProcessor,
target storage.TargetStorage,
logger *log.Logger,
slowSync bool,
) (analyzer.Analyzer, error) {
return &blockBasedAnalyzer{
config: config,
analyzerName: name,
processor: processor,
target: target,
logger: logger.With("analyzer", name),
logger: logger.With("analyzer", name, "slow_sync", slowSync),
slowSync: slowSync,
}, nil
}
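
For reference, a hedged sketch of how a caller might pass the new constructor argument. Apart from NewAnalyzer's parameter list, which the diff above defines, the surrounding names (cfg, consensusProcessor, targetDB, logger, ctx) are assumptions, not part of this commit.

```go
// Hypothetical caller; only the trailing slowSync argument is introduced by this commit.
consensusAnalyzer, err := block.NewAnalyzer(
	cfg,                // *config.BlockBasedAnalyzerConfig
	"consensus",        // analyzer name, used for logging and processed-block bookkeeping
	consensusProcessor, // a BlockProcessor implementation
	targetDB,           // storage.TargetStorage
	logger,
	true, // slowSync: single instance, in-order, lock-free block processing
)
if err != nil {
	return err
}
go consensusAnalyzer.Start(ctx)
```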
