Skip to content

Commit

Permalink
Port over error handling changes
Browse files Browse the repository at this point in the history
  • Loading branch information
stwiname committed Jul 10, 2022
1 parent d803650 commit a147ab7
Showing 1 changed file with 75 additions and 53 deletions.
128 changes: 75 additions & 53 deletions packages/node/src/indexer/worker/block-dispatcher.service.ts
Expand Up @@ -188,20 +188,30 @@ export class BlockDispatcherService

const blockTasks = blocks.map((block) => async () => {
const height = block.block.block.header.number.toNumber();
this.eventEmitter.emit(IndexerEvent.BlockProcessing, {
height,
timestamp: Date.now(),
});

const runtimeVersion = await this.getRuntimeVersion(block.block);

const { dynamicDsCreated } = await this.indexerManager.indexBlock(
block,
runtimeVersion,
);

if (dynamicDsCreated) {
await this.onDynamicDsCreated(height);
try {
this.eventEmitter.emit(IndexerEvent.BlockProcessing, {
height,
timestamp: Date.now(),
});

const runtimeVersion = await this.getRuntimeVersion(block.block);

const { dynamicDsCreated } = await this.indexerManager.indexBlock(
block,
runtimeVersion,
);

if (dynamicDsCreated) {
await this.onDynamicDsCreated(height);
}
} catch (e) {
logger.error(
e,
`failed to index block at height ${height} ${
e.handler ? `${e.handler}(${e.stack ?? ''})` : ''
}`,
);
throw e;
}
});

Expand Down Expand Up @@ -321,60 +331,72 @@ export class WorkerBlockDispatcherService
const pendingBlock = worker.fetchBlock(height);

const processBlock = async () => {
const start = new Date();
const result = await pendingBlock;
const end = new Date();

if (bufferedHeight > this._latestBufferedHeight) {
logger.debug(`Queue was reset for new DS, discarding fetched blocks`);
return;
}
try {
const start = new Date();
const result = await pendingBlock;
const end = new Date();

if (bufferedHeight > this._latestBufferedHeight) {
logger.debug(`Queue was reset for new DS, discarding fetched blocks`);
return;
}

const waitTime = end.getTime() - start.getTime();
if (waitTime > 1000) {
logger.info(
`Waiting to fetch block ${height}: ${chalk.red(`${waitTime}ms`)}`,
);
} else if (waitTime > 200) {
logger.info(
`Waiting to fetch block ${height}: ${chalk.yellow(`${waitTime}ms`)}`,
);
}
const waitTime = end.getTime() - start.getTime();
if (waitTime > 1000) {
logger.info(
`Waiting to fetch block ${height}: ${chalk.red(`${waitTime}ms`)}`,
);
} else if (waitTime > 200) {
logger.info(
`Waiting to fetch block ${height}: ${chalk.yellow(
`${waitTime}ms`,
)}`,
);
}

if (result) {
const runtimeVersion = await this.getRuntimeVersion({
specVersion: result.specVersion,
block: {
header: {
parentHash: result.parentHash,
if (result) {
const runtimeVersion = await this.getRuntimeVersion({
specVersion: result.specVersion,
block: {
header: {
parentHash: result.parentHash,
},
},
},
} as any);
} as any);

await worker.setCurrentRuntimeVersion(runtimeVersion.toHex());
}
await worker.setCurrentRuntimeVersion(runtimeVersion.toHex());
}

// logger.info(
// `worker ${workerIdx} processing block ${height}, fetched blocks: ${await worker.numFetchedBlocks()}, fetching blocks: ${await worker.numFetchingBlocks()}`,
// );
// logger.info(
// `worker ${workerIdx} processing block ${height}, fetched blocks: ${await worker.numFetchedBlocks()}, fetching blocks: ${await worker.numFetchingBlocks()}`,
// );

this.eventEmitter.emit(IndexerEvent.BlockProcessing, {
height,
timestamp: Date.now(),
});
this.eventEmitter.emit(IndexerEvent.BlockProcessing, {
height,
timestamp: Date.now(),
});

const { dynamicDsCreated } = await worker.processBlock(height);
const { dynamicDsCreated } = await worker.processBlock(height);

if (dynamicDsCreated) {
await this.onDynamicDsCreated(height);
if (dynamicDsCreated) {
await this.onDynamicDsCreated(height);
}
} catch (e) {
logger.error(
e,
`failed to index block at height ${height} ${
e.handler ? `${e.handler}(${e.stack ?? ''})` : ''
}`,
);
throw e;
}
};

void this.queue.put(processBlock);
}

@Interval(15000)
async sampleWorkerStatus() {
async sampleWorkerStatus(): Promise<void> {
for (const worker of this.workers) {
const status = await worker.getStatus();
logger.info(JSON.stringify(status));
Expand Down

0 comments on commit a147ab7

Please sign in to comment.