Skip to content

Commit

Permalink
fix: 🐛 possible duplicate output of blocks
Browse files Browse the repository at this point in the history
Fixed duplicate output of block information if processing a block range
partially failed. Added additional check while processing a block range
to see if a block has already been marked as complete in checkpoints.
  • Loading branch information
ziegfried committed Feb 10, 2020
1 parent 3f8785e commit 80bb0da
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 4 deletions.
3 changes: 3 additions & 0 deletions src/blockrange.ts
Expand Up @@ -104,3 +104,6 @@ export function getInverseBlockRanges(
}
return compactRanges(result);
}

export const blockRangeIncludes = (range: BlockRange, block: number): boolean =>
block >= range.from && block <= range.to;
18 changes: 14 additions & 4 deletions src/blockwatcher.ts
Expand Up @@ -212,15 +212,21 @@ export class BlockWatcher implements ManagedResource {
info('Block watcher stopped');
}

private async processChunk(chunk: BlockRange) {
async processChunk(chunk: BlockRange) {
const startTime = Date.now();
info('Processing chunk %s', serializeBlockRange(chunk));

debug('Requesting block range', chunk);
const blockRequestStart = Date.now();
const blocks = await this.ethClient.requestBatch(
blockRangeToArray(chunk).map(blockNumber => getBlock(blockNumber))
);
const blocks = await this.ethClient
.requestBatch(
blockRangeToArray(chunk)
.filter(blockNumber => this.checkpoints.isIncomplete(blockNumber))
.map(blockNumber => getBlock(blockNumber))
)
.catch(e =>
Promise.reject(new Error(`Failed to request batch of blocks ${serializeBlockRange(chunk)}: ${e}`))
);
debug('Received %d blocks in %d ms', blocks.length, Date.now() - blockRequestStart);
for (const block of blocks) {
await this.processBlock(block);
Expand All @@ -234,6 +240,10 @@ export class BlockWatcher implements ManagedResource {
}

private async processBlock(block: RawBlockResponse) {
if (block.number != null && !this.checkpoints.isIncomplete(bigIntToNumber(block.number))) {
warn('Skipping processing of block %d since it is marked complete in our checkpoint');
return;
}
const startTime = Date.now();
const outputMessages: OutputMessage[] = [];
const formattedBlock = formatBlock(block);
Expand Down
5 changes: 5 additions & 0 deletions src/checkpoint.ts
Expand Up @@ -6,6 +6,7 @@ import {
parseBlockRange,
serializeBlockRange,
blockRangeSize,
blockRangeIncludes,
} from './blockrange';
import { createModuleDebug } from './utils/debug';
import { ManagedResource } from './utils/resource';
Expand Down Expand Up @@ -78,6 +79,10 @@ export class Checkpoint implements ManagedResource {
this.scheduleSave();
}

public isIncomplete(block: number): boolean {
return !this.completed.some(range => blockRangeIncludes(range, block));
}

public getIncompleteRanges(latestBlock?: number): BlockRange[] {
if (this.initialBlockNumber == null) {
throw new Error('Initial block number not set');
Expand Down

0 comments on commit 80bb0da

Please sign in to comment.