Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add delay and don't break loop when process queue is full #1261

Merged
merged 1 commit into from
Aug 25, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
163 changes: 85 additions & 78 deletions packages/node/src/indexer/worker/block-dispatcher.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { EventEmitter2 } from '@nestjs/event-emitter';
import { Interval } from '@nestjs/schedule';
import { RuntimeVersion } from '@polkadot/types/interfaces';
import { hexToU8a, u8aEq } from '@polkadot/util';
import { getYargsOption, profilerWrap } from '@subql/node-core';
import { delay, getYargsOption, profilerWrap } from '@subql/node-core';
import { NodeConfig } from '@subql/node-core/configure';
import { IndexerEvent } from '@subql/node-core/events';
import { Worker } from '@subql/node-core/indexer';
Expand Down Expand Up @@ -192,94 +192,101 @@ export class BlockDispatcherService

this.fetching = true;

while (!this.isShutdown) {
const blockNums = this.fetchQueue.takeMany(
Math.min(this.nodeConfig.batchSize, this.processQueue.freeSpace),
);

// Used to compare before and after as a way to check if queue was flushed
const bufferedHeight = this._latestBufferedHeight;

// Queue is empty
if (!blockNums.length) {
break;
}
try {
while (!this.isShutdown) {
const blockNums = this.fetchQueue.takeMany(
Math.min(this.nodeConfig.batchSize, this.processQueue.freeSpace),
);
// Used to compare before and after as a way to check if queue was flushed
const bufferedHeight = this._latestBufferedHeight;

// Queue is empty
if (!blockNums.length) {
// The process queue might be full so no block nums were taken, wait and try again
if (this.fetchQueue.size) {
await delay(1);
continue;
}
break;
}

logger.info(
`fetch block [${blockNums[0]},${
blockNums[blockNums.length - 1]
}], total ${blockNums.length} blocks`,
);
logger.info(
`fetch block [${blockNums[0]},${
blockNums[blockNums.length - 1]
}], total ${blockNums.length} blocks`,
);

const blocks = await this.fetchBlocksBatches(
this.apiService.getApi(),
blockNums,
);
const blocks = await this.fetchBlocksBatches(
this.apiService.getApi(),
blockNums,
);

if (bufferedHeight > this._latestBufferedHeight) {
logger.debug(`Queue was reset for new DS, discarding fetched blocks`);
continue;
}
const blockTasks = blocks.map((block) => async () => {
const height = block.block.block.header.number.toNumber();
try {
this.eventEmitter.emit(IndexerEvent.BlockProcessing, {
height,
timestamp: Date.now(),
});

const runtimeVersion = await this.getRuntimeVersion(block.block);

const { dynamicDsCreated, operationHash } =
await this.indexerManager.indexBlock(block, runtimeVersion);

// In memory _processedBlockCount increase, db metadata increase BlockCount in indexer.manager
this.setProcessedBlockCount(this._processedBlockCount + 1);
if (
this.nodeConfig.proofOfIndex &&
!isNullMerkelRoot(operationHash)
) {
if (!this.projectService.blockOffset) {
// Which means during project init, it has not found offset and set value
await this.projectService.upsertMetadataBlockOffset(height - 1);
if (bufferedHeight > this._latestBufferedHeight) {
logger.debug(`Queue was reset for new DS, discarding fetched blocks`);
continue;
}
const blockTasks = blocks.map((block) => async () => {
const height = block.block.block.header.number.toNumber();
try {
this.eventEmitter.emit(IndexerEvent.BlockProcessing, {
height,
timestamp: Date.now(),
});

const runtimeVersion = await this.getRuntimeVersion(block.block);

const { dynamicDsCreated, operationHash } =
await this.indexerManager.indexBlock(block, runtimeVersion);

// In memory _processedBlockCount increase, db metadata increase BlockCount in indexer.manager
this.setProcessedBlockCount(this._processedBlockCount + 1);
if (
this.nodeConfig.proofOfIndex &&
!isNullMerkelRoot(operationHash)
) {
if (!this.projectService.blockOffset) {
// Which means during project init, it has not found offset and set value
await this.projectService.upsertMetadataBlockOffset(height - 1);
}
void this.projectService.setBlockOffset(height - 1);
}
void this.projectService.setBlockOffset(height - 1);
}

if (dynamicDsCreated) {
await this.onDynamicDsCreated(height);
}
if (dynamicDsCreated) {
await this.onDynamicDsCreated(height);
}

assert(
!this.latestProcessedHeight || height > this.latestProcessedHeight,
`Block processed out of order. Height: ${height}. Latest: ${this.latestProcessedHeight}`,
);
this.latestProcessedHeight = height;
} catch (e) {
if (this.isShutdown) {
return;
assert(
!this.latestProcessedHeight ||
height > this.latestProcessedHeight,
`Block processed out of order. Height: ${height}. Latest: ${this.latestProcessedHeight}`,
);
this.latestProcessedHeight = height;
} catch (e) {
if (this.isShutdown) {
return;
}
logger.error(
e,
`failed to index block at height ${height} ${
e.handler ? `${e.handler}(${e.stack ?? ''})` : ''
}`,
);
throw e;
}
logger.error(
e,
`failed to index block at height ${height} ${
e.handler ? `${e.handler}(${e.stack ?? ''})` : ''
}`,
);
throw e;
}
});
});

// There can be enough of a delay after fetching blocks that shutdown could now be true
if (this.isShutdown) break;
// There can be enough of a delay after fetching blocks that shutdown could now be true
if (this.isShutdown) break;

this.processQueue.putMany(blockTasks);
this.processQueue.putMany(blockTasks);

this.eventEmitter.emit(IndexerEvent.BlockQueueSize, {
value: this.processQueue.size,
});
this.eventEmitter.emit(IndexerEvent.BlockQueueSize, {
value: this.processQueue.size,
});
}
} finally {
this.fetching = false;
}

this.fetching = false;
}

get queueSize(): number {
Expand Down