diff --git a/packages/node/src/indexer/worker/block-dispatcher.service.ts b/packages/node/src/indexer/worker/block-dispatcher.service.ts index 71cc312ee9..4de8dbe819 100644 --- a/packages/node/src/indexer/worker/block-dispatcher.service.ts +++ b/packages/node/src/indexer/worker/block-dispatcher.service.ts @@ -8,7 +8,7 @@ import { EventEmitter2 } from '@nestjs/event-emitter'; import { Interval } from '@nestjs/schedule'; import { RuntimeVersion } from '@polkadot/types/interfaces'; import { hexToU8a, u8aEq } from '@polkadot/util'; -import { getYargsOption, profilerWrap } from '@subql/node-core'; +import { delay, getYargsOption, profilerWrap } from '@subql/node-core'; import { NodeConfig } from '@subql/node-core/configure'; import { IndexerEvent } from '@subql/node-core/events'; import { Worker } from '@subql/node-core/indexer'; @@ -192,94 +192,101 @@ export class BlockDispatcherService this.fetching = true; - while (!this.isShutdown) { - const blockNums = this.fetchQueue.takeMany( - Math.min(this.nodeConfig.batchSize, this.processQueue.freeSpace), - ); - - // Used to compare before and after as a way to check if queue was flushed - const bufferedHeight = this._latestBufferedHeight; - - // Queue is empty - if (!blockNums.length) { - break; - } + try { + while (!this.isShutdown) { + const blockNums = this.fetchQueue.takeMany( + Math.min(this.nodeConfig.batchSize, this.processQueue.freeSpace), + ); + // Used to compare before and after as a way to check if queue was flushed + const bufferedHeight = this._latestBufferedHeight; + + // Queue is empty + if (!blockNums.length) { + // The process queue might be full so no block nums were taken, wait and try again + if (this.fetchQueue.size) { + await delay(1); + continue; + } + break; + } - logger.info( - `fetch block [${blockNums[0]},${ - blockNums[blockNums.length - 1] - }], total ${blockNums.length} blocks`, - ); + logger.info( + `fetch block [${blockNums[0]},${ + blockNums[blockNums.length - 1] + }], total ${blockNums.length} blocks`, + ); - const blocks = await this.fetchBlocksBatches( - this.apiService.getApi(), - blockNums, - ); + const blocks = await this.fetchBlocksBatches( + this.apiService.getApi(), + blockNums, + ); - if (bufferedHeight > this._latestBufferedHeight) { - logger.debug(`Queue was reset for new DS, discarding fetched blocks`); - continue; - } - const blockTasks = blocks.map((block) => async () => { - const height = block.block.block.header.number.toNumber(); - try { - this.eventEmitter.emit(IndexerEvent.BlockProcessing, { - height, - timestamp: Date.now(), - }); - - const runtimeVersion = await this.getRuntimeVersion(block.block); - - const { dynamicDsCreated, operationHash } = - await this.indexerManager.indexBlock(block, runtimeVersion); - - // In memory _processedBlockCount increase, db metadata increase BlockCount in indexer.manager - this.setProcessedBlockCount(this._processedBlockCount + 1); - if ( - this.nodeConfig.proofOfIndex && - !isNullMerkelRoot(operationHash) - ) { - if (!this.projectService.blockOffset) { - // Which means during project init, it has not found offset and set value - await this.projectService.upsertMetadataBlockOffset(height - 1); + if (bufferedHeight > this._latestBufferedHeight) { + logger.debug(`Queue was reset for new DS, discarding fetched blocks`); + continue; + } + const blockTasks = blocks.map((block) => async () => { + const height = block.block.block.header.number.toNumber(); + try { + this.eventEmitter.emit(IndexerEvent.BlockProcessing, { + height, + timestamp: Date.now(), + }); + + const runtimeVersion = await this.getRuntimeVersion(block.block); + + const { dynamicDsCreated, operationHash } = + await this.indexerManager.indexBlock(block, runtimeVersion); + + // In memory _processedBlockCount increase, db metadata increase BlockCount in indexer.manager + this.setProcessedBlockCount(this._processedBlockCount + 1); + if ( + this.nodeConfig.proofOfIndex && + !isNullMerkelRoot(operationHash) + ) { + if (!this.projectService.blockOffset) { + // Which means during project init, it has not found offset and set value + await this.projectService.upsertMetadataBlockOffset(height - 1); + } + void this.projectService.setBlockOffset(height - 1); } - void this.projectService.setBlockOffset(height - 1); - } - if (dynamicDsCreated) { - await this.onDynamicDsCreated(height); - } + if (dynamicDsCreated) { + await this.onDynamicDsCreated(height); + } - assert( - !this.latestProcessedHeight || height > this.latestProcessedHeight, - `Block processed out of order. Height: ${height}. Latest: ${this.latestProcessedHeight}`, - ); - this.latestProcessedHeight = height; - } catch (e) { - if (this.isShutdown) { - return; + assert( + !this.latestProcessedHeight || + height > this.latestProcessedHeight, + `Block processed out of order. Height: ${height}. Latest: ${this.latestProcessedHeight}`, + ); + this.latestProcessedHeight = height; + } catch (e) { + if (this.isShutdown) { + return; + } + logger.error( + e, + `failed to index block at height ${height} ${ + e.handler ? `${e.handler}(${e.stack ?? ''})` : '' + }`, + ); + throw e; } - logger.error( - e, - `failed to index block at height ${height} ${ - e.handler ? `${e.handler}(${e.stack ?? ''})` : '' - }`, - ); - throw e; - } - }); + }); - // There can be enough of a delay after fetching blocks that shutdown could now be true - if (this.isShutdown) break; + // There can be enough of a delay after fetching blocks that shutdown could now be true + if (this.isShutdown) break; - this.processQueue.putMany(blockTasks); + this.processQueue.putMany(blockTasks); - this.eventEmitter.emit(IndexerEvent.BlockQueueSize, { - value: this.processQueue.size, - }); + this.eventEmitter.emit(IndexerEvent.BlockQueueSize, { + value: this.processQueue.size, + }); + } + } finally { + this.fetching = false; } - - this.fetching = false; } get queueSize(): number {