Skip to content

Commit

Permalink
Discard pending data when dynamic ds is created
Browse files Browse the repository at this point in the history
  • Loading branch information
stwiname committed Jun 29, 2022
1 parent 79a3436 commit 668de33
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 69 deletions.
63 changes: 9 additions & 54 deletions packages/node/src/indexer/fetch.service.ts
@@ -1,7 +1,6 @@
// Copyright 2020-2022 OnFinality Limited authors & contributors
// SPDX-License-Identifier: Apache-2.0

import { getHeapStatistics } from 'v8';
import { Inject, Injectable, OnApplicationShutdown } from '@nestjs/common';
import { EventEmitter2 } from '@nestjs/event-emitter';
import { Interval } from '@nestjs/schedule';
Expand Down Expand Up @@ -29,6 +28,7 @@ import {
import { range, sortBy, uniqBy } from 'lodash';
import { NodeConfig } from '../configure/NodeConfig';
import { SubqlProjectDs, SubqueryProject } from '../configure/SubqueryProject';
import { checkMemoryUsage } from '../utils/batch-size';
import { getLogger } from '../utils/logger';
import { profiler } from '../utils/profiler';
import { isBaseHandler, isCustomHandler } from '../utils/project';
Expand All @@ -50,8 +50,6 @@ const logger = getLogger('fetch');
const BLOCK_TIME_VARIANCE = 5;
const DICTIONARY_MAX_QUERY_SIZE = 10000;
const CHECK_MEMORY_INTERVAL = 60000;
const HIGH_THRESHOLD = 0.85;
const LOW_THRESHOLD = 0.6;
const MINIMUM_BATCH_SIZE = 5;
const SPEC_VERSION_BLOCK_GAP = 100;

Expand Down Expand Up @@ -87,36 +85,10 @@ function callFilterToQueryEntry(
};
}

function checkMemoryUsage(batchSize: number, batchSizeScale: number): number {
const memoryData = getHeapStatistics();
const ratio = memoryData.used_heap_size / memoryData.heap_size_limit;
if (argv.profiler) {
logger.info(`Heap Statistics: ${JSON.stringify(memoryData)}`);
logger.info(`Heap Usage: ${ratio}`);
}
let scale = batchSizeScale;

if (ratio > HIGH_THRESHOLD) {
if (scale > 0) {
scale = Math.max(scale - 0.1, 0);
logger.debug(`Heap usage: ${ratio}, decreasing batch size by 10%`);
}
}

if (ratio < LOW_THRESHOLD) {
if (scale < 1) {
scale = Math.min(scale + 0.1, 1);
logger.debug(`Heap usage: ${ratio} increasing batch size by 10%`);
}
}
return scale;
}

@Injectable()
export class FetchService implements OnApplicationShutdown {
private latestBestHeight: number;
private latestFinalizedHeight: number;
private latestBufferedHeight: number;
private isShutdown = false;
private parentSpecVersion: number;
private useDictionary: boolean;
Expand Down Expand Up @@ -266,10 +238,7 @@ export class FetchService implements OnApplicationShutdown {
@Interval(CHECK_MEMORY_INTERVAL)
checkBatchScale(): void {
if (argv['scale-batch-size']) {
const scale = checkMemoryUsage(
this.nodeConfig.batchSize,
this.batchSizeScale,
);
const scale = checkMemoryUsage(this.batchSizeScale);

if (this.batchSizeScale !== scale) {
this.batchSizeScale = scale;
Expand Down Expand Up @@ -330,8 +299,8 @@ export class FetchService implements OnApplicationShutdown {
let scaledBatchSize: number;

const getStartBlockHeight = (): number => {
return this.latestBufferedHeight
? this.latestBufferedHeight + 1
return this.blockDispatcher.latestBufferedHeight
? this.blockDispatcher.latestBufferedHeight + 1
: initBlockHeight;
};

Expand Down Expand Up @@ -373,19 +342,14 @@ export class FetchService implements OnApplicationShutdown {
) {
const { batchBlocks } = dictionary;
if (batchBlocks.length === 0) {
this.setLatestBufferedHeight(
Math.min(
queryEndBlock - 1,
dictionary._metadata.lastProcessedHeight,
),
// There we're no blocks in this query range, we can set a new height we're up to
this.blockDispatcher.latestBufferedHeight = Math.min(
queryEndBlock - 1,
dictionary._metadata.lastProcessedHeight,
);
} else {
this.blockDispatcher.enqueueBlocks(batchBlocks);
this.setLatestBufferedHeight(batchBlocks[batchBlocks.length - 1]);
}
this.eventEmitter.emit(IndexerEvent.BlocknumberQueueSize, {
value: this.blockDispatcher.queueSize,
});
continue; // skip nextBlockRange() way
}
// else use this.nextBlockRange()
Expand All @@ -402,7 +366,6 @@ export class FetchService implements OnApplicationShutdown {
this.blockDispatcher.enqueueBlocks(
range(startBlockHeight, endHeight + 1),
);
this.setLatestBufferedHeight(endHeight);
}
}

Expand Down Expand Up @@ -518,8 +481,7 @@ export class FetchService implements OnApplicationShutdown {
async resetForNewDs(blockHeight: number): Promise<void> {
await this.syncDynamicDatascourcesFromMeta();
this.updateDictionary();
this.blockDispatcher.flushQueue();
this.setLatestBufferedHeight(blockHeight);
this.blockDispatcher.flushQueue(blockHeight);
}

private dictionaryValidation(
Expand All @@ -545,13 +507,6 @@ export class FetchService implements OnApplicationShutdown {
return true;
}

private setLatestBufferedHeight(height: number): void {
this.latestBufferedHeight = height;
this.eventEmitter.emit(IndexerEvent.BlocknumberQueueSize, {
value: this.blockDispatcher.queueSize,
});
}

private getBaseHandlerKind(
ds: SubstrateDataSource,
handler: SubstrateHandler,
Expand Down
72 changes: 57 additions & 15 deletions packages/node/src/indexer/worker/block-dispatcher.service.ts
Expand Up @@ -8,10 +8,10 @@ import { Injectable, OnApplicationShutdown } from '@nestjs/common';
import { RuntimeVersion } from '@polkadot/types/interfaces';
import { SubstrateBlock } from '@subql/types';
import { EventEmitter2 } from 'eventemitter2';
import { last } from 'lodash';
import { NodeConfig } from '../../configure/NodeConfig';
import { AutoQueue, Queue } from '../../utils/autoQueue';
import { getLogger } from '../../utils/logger';
import { delay } from '../../utils/promise';
import { fetchBlocksBatches } from '../../utils/substrate';
import { ApiService } from '../api.service';
import { IndexerEvent } from '../events';
Expand Down Expand Up @@ -77,9 +77,10 @@ export interface IBlockDispatcher {

queueSize: number;
freeSize: number;
latestBufferedHeight: number | undefined;

// Remove all enqueued blocks, used when a dynamic ds is created
flushQueue(): void;
flushQueue(height: number): void;
}

const logger = getLogger('BlockDispatcherService');
Expand All @@ -99,6 +100,7 @@ export class BlockDispatcherService
private isShutdown = false;
private getRuntimeVersion: GetRuntimeVersion;
private onDynamicDsCreated: (height: number) => Promise<void>;
private _latestBufferedHeight: number;

constructor(
private apiService: ApiService,
Expand All @@ -125,18 +127,21 @@ export class BlockDispatcherService
}

enqueueBlocks(heights: number[]): void {
logger.info(
`Enqueing blocks ${heights[0]}...${heights[heights.length - 1]}`,
);
if (!heights.length) return;

logger.info(`Enqueing blocks ${heights[0]}...${last(heights)}`);

this.fetchQueue.putMany(heights);
this.latestBufferedHeight = last(heights);

void this.fetchBlocksFromQueue().catch((e) => {
logger.error(e, 'Failed to fetch blocks from queue');
throw e;
});
}

flushQueue(): void {
flushQueue(height: number): void {
this.latestBufferedHeight = height;
this.fetchQueue.flush(); // Empty
this.processQueue.flush();
}
Expand All @@ -150,6 +155,9 @@ export class BlockDispatcherService

const blockNums = this.fetchQueue.takeMany(this.nodeConfig.batchSize);

// Used to compare before and after as a way to check if queue was flushed
const bufferedHeight = this._latestBufferedHeight;

logger.info(
`fetch block [${blockNums[0]},${
blockNums[blockNums.length - 1]
Expand All @@ -167,6 +175,11 @@ export class BlockDispatcherService
blockNums,
);

if (bufferedHeight > this._latestBufferedHeight) {
logger.debug(`Queue was reset for new DS, discarding fetched blocks`);
return;
}

const blockTasks = blocks.map((block) => async () => {
const height = block.block.block.header.number.toNumber();
logger.info(`INDEXING BLOCK ${height}`);
Expand Down Expand Up @@ -204,6 +217,17 @@ export class BlockDispatcherService
get freeSize(): number {
return this.fetchQueue.freeSpace;
}

get latestBufferedHeight(): number {
return this._latestBufferedHeight;
}

set latestBufferedHeight(height: number) {
this.eventEmitter.emit(IndexerEvent.BlocknumberQueueSize, {
value: this.queueSize,
});
this._latestBufferedHeight = height;
}
}

@Injectable()
Expand All @@ -218,14 +242,15 @@ export class WorkerBlockDispatcherService
private taskCounter = 0;
private isShutdown = false;
private queue: AutoQueue<void>;
private _latestBufferedHeight: number;

/**
* @param numWorkers. The number of worker threads to run, this is capped at number of cpus
* @param workerQueueSize. The number of fetched blocks queued to be processed
*/
constructor(
numWorkers: number,
private workerQueueSize: number,
workerQueueSize: number,
private eventEmitter: EventEmitter2,
) {
this.numWorkers = getMaxWorkers(numWorkers);
Expand Down Expand Up @@ -254,14 +279,16 @@ export class WorkerBlockDispatcherService
}

enqueueBlocks(heights: number[]): void {
logger.info(
`Enqueing blocks ${heights[0]}...${heights[heights.length - 1]}`,
);
if (!heights.length) return;
logger.info(`Enqueing blocks ${heights[0]}...${last(heights)}`);

heights.map((height) => this.enqueueBlock(height));

this.latestBufferedHeight = last(heights);
}

flushQueue(): void {
flushQueue(height: number): void {
this.latestBufferedHeight = height;
this.queue.flush();
}

Expand All @@ -271,13 +298,21 @@ export class WorkerBlockDispatcherService
const worker = this.workers[workerIdx];

assert(worker, `Worker ${workerIdx} not found`);

// Used to compare before and after as a way to check if queue was flushed
const bufferedHeight = this._latestBufferedHeight;
const pendingBlock = worker.fetchBlock(height);

const processBlock = async () => {
const start = new Date();
const result = await pendingBlock;
const end = new Date();

if (bufferedHeight > this._latestBufferedHeight) {
logger.debug(`Queue was reset for new DS, discarding fetched blocks`);
return;
}

if (start.getTime() < end.getTime() - 100) {
console.log(
'Waiting for pending block',
Expand Down Expand Up @@ -320,10 +355,6 @@ export class WorkerBlockDispatcherService
void this.queue.put(processBlock);
}

setRuntimeVersionGetter(fn: GetRuntimeVersion): void {
this.getRuntimeVersion = fn;
}

get queueSize(): number {
return this.queue.size;
}
Expand All @@ -332,6 +363,17 @@ export class WorkerBlockDispatcherService
return this.queue.freeSpace;
}

get latestBufferedHeight(): number {
return this._latestBufferedHeight;
}

set latestBufferedHeight(height: number) {
this.eventEmitter.emit(IndexerEvent.BlocknumberQueueSize, {
value: this.queueSize,
});
this._latestBufferedHeight = height;
}

private getNextWorkerIndex(): number {
const index = this.taskCounter % this.numWorkers;

Expand Down
37 changes: 37 additions & 0 deletions packages/node/src/utils/batch-size.ts
@@ -0,0 +1,37 @@
// Copyright 2020-2022 OnFinality Limited authors & contributors
// SPDX-License-Identifier: Apache-2.0

import { getHeapStatistics } from 'v8';
import { getYargsOption } from '../yargs';
import { getLogger } from './logger';

const HIGH_THRESHOLD = 0.85;
const LOW_THRESHOLD = 0.6;

const { argv } = getYargsOption();
const logger = getLogger('memory');

export function checkMemoryUsage(batchSizeScale: number): number {
const memoryData = getHeapStatistics();
const ratio = memoryData.used_heap_size / memoryData.heap_size_limit;
if (argv.profiler) {
logger.info(`Heap Statistics: ${JSON.stringify(memoryData)}`);
logger.info(`Heap Usage: ${ratio}`);
}
let scale = batchSizeScale;

if (ratio > HIGH_THRESHOLD) {
if (scale > 0) {
scale = Math.max(scale - 0.1, 0);
logger.debug(`Heap usage: ${ratio}, decreasing batch size by 10%`);
}
}

if (ratio < LOW_THRESHOLD) {
if (scale < 1) {
scale = Math.min(scale + 0.1, 1);
logger.debug(`Heap usage: ${ratio} increasing batch size by 10%`);
}
}
return scale;
}

0 comments on commit 668de33

Please sign in to comment.