From 80bb0daabd4e7434c609dd362e047b888f0c049d Mon Sep 17 00:00:00 2001 From: Siegfried Puchbauer Date: Mon, 10 Feb 2020 14:08:26 -0800 Subject: [PATCH] =?UTF-8?q?fix:=20=F0=9F=90=9B=20possible=20duplicate=20ou?= =?UTF-8?q?tput=20of=20blocks?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fixed duplicate output of block information if processing a block range partially failed. Added additional check while processing a block range to see if a block has already been marked as complete in checkpoints. --- src/blockrange.ts | 3 +++ src/blockwatcher.ts | 18 ++++++++++++++---- src/checkpoint.ts | 5 +++++ 3 files changed, 22 insertions(+), 4 deletions(-) diff --git a/src/blockrange.ts b/src/blockrange.ts index e61e4e2..aff71cf 100644 --- a/src/blockrange.ts +++ b/src/blockrange.ts @@ -104,3 +104,6 @@ export function getInverseBlockRanges( } return compactRanges(result); } + +export const blockRangeIncludes = (range: BlockRange, block: number): boolean => + block >= range.from && block <= range.to; diff --git a/src/blockwatcher.ts b/src/blockwatcher.ts index 73e230b..4c6336c 100644 --- a/src/blockwatcher.ts +++ b/src/blockwatcher.ts @@ -212,15 +212,21 @@ export class BlockWatcher implements ManagedResource { info('Block watcher stopped'); } - private async processChunk(chunk: BlockRange) { + async processChunk(chunk: BlockRange) { const startTime = Date.now(); info('Processing chunk %s', serializeBlockRange(chunk)); debug('Requesting block range', chunk); const blockRequestStart = Date.now(); - const blocks = await this.ethClient.requestBatch( - blockRangeToArray(chunk).map(blockNumber => getBlock(blockNumber)) - ); + const blocks = await this.ethClient + .requestBatch( + blockRangeToArray(chunk) + .filter(blockNumber => this.checkpoints.isIncomplete(blockNumber)) + .map(blockNumber => getBlock(blockNumber)) + ) + .catch(e => + Promise.reject(new Error(`Failed to request batch of blocks ${serializeBlockRange(chunk)}: ${e}`)) + ); debug('Received %d blocks in %d ms', blocks.length, Date.now() - blockRequestStart); for (const block of blocks) { await this.processBlock(block); @@ -234,6 +240,10 @@ export class BlockWatcher implements ManagedResource { } private async processBlock(block: RawBlockResponse) { + if (block.number != null && !this.checkpoints.isIncomplete(bigIntToNumber(block.number))) { + warn('Skipping processing of block %d since it is marked complete in our checkpoint'); + return; + } const startTime = Date.now(); const outputMessages: OutputMessage[] = []; const formattedBlock = formatBlock(block); diff --git a/src/checkpoint.ts b/src/checkpoint.ts index 1921416..4f235c6 100644 --- a/src/checkpoint.ts +++ b/src/checkpoint.ts @@ -6,6 +6,7 @@ import { parseBlockRange, serializeBlockRange, blockRangeSize, + blockRangeIncludes, } from './blockrange'; import { createModuleDebug } from './utils/debug'; import { ManagedResource } from './utils/resource'; @@ -78,6 +79,10 @@ export class Checkpoint implements ManagedResource { this.scheduleSave(); } + public isIncomplete(block: number): boolean { + return !this.completed.some(range => blockRangeIncludes(range, block)); + } + public getIncompleteRanges(latestBlock?: number): BlockRange[] { if (this.initialBlockNumber == null) { throw new Error('Initial block number not set');