Skip to content

Commit

Permalink
Sync 20221117 (#14)
Browse files Browse the repository at this point in the history
* fix: incomplete ds data in same block (subquery#1370)

Co-authored-by: Orion <orion@starfish.finance>

* [SKIP CI] Prerelease

* Best Block (subquery#1308)

* Draft

* update changes

* rebase changes

* fix

* fix (subquery#1329)

* disable best blocks for workers until we support it

* Rename bestBlock to unfinalizedBlocks, other clean up

* Clean up

* Further clean up

* Dedupe reindex function

* Fix tests

* Clean up

* Update checking finalization to use parent hash

* Rename logger

* Refactor unfinalized blocks

* Use header rather than full block, improve detecting forks

* Verify unfinalized blocks when disabled.

Use sorted array for storing unfinalized blocks

* Clean up logs

* Fix not indexing unfinalized blocks right away, exit if historical not enabled

Co-authored-by: Scott Twiname <skott.twiname@gmail.com>

* [SKIP CI] Prerelease

* Fix bugs with unfinalized blocks (subquery#1374)

* Fix bugs with unfinalized blocks

- Fix using wrong hash for unfinalized blocks
- Fix not removing unfinalized blocks when the latest unfinalized block < finalized

* Fix tests getting stuck

* Fix issue finding where fork occurred

* [SKIP CI] Prerelease

* [release] 20221028 (subquery#1372)

* hot fix tests (subquery#1360)

* update tests: api.service.spec, init-controller.test, publish-controller.spec

* update tests following comments

* [SKIP CI] Prerelease

* add ethereum to CLI and Validator (subquery#1378)

* Imporve dictionary query (subquery#1371)

* wip

* wip

* wip

* broken wip

* wip refactoring

* seperate dictionaryQueryEntries

* fix logic for setDicitonaryQueryEntries

* cleaning up

* move dictionaryEntry.ts to node-core

* remove comments

* create class for dictionaryqueryentries

* update tests

* move dictionaryQueryService into dicitonaryService

* refactor

* update logic for useDicitonary

* relocated dictionaryQuery funcs

* add test for dictquerymap

* tests failing on windows?

* test getDictionaryQueryEntries

* test fixed for getDictioanryQueryEntries

* added test for sorting

* conflict fixed ?

* fix conflict_2

* fix conflict_3

* add delete temp ds records back

* clean up with new logic

* clean up, add comments for test

* fix

* update logic

* add generic type

* [SKIP CI] Prerelease

* Enable for better inheritance of generated entity modeld (subquery#1377)

* refactor: enable for entity inheritance

* refactor: include also filed getters

* [SKIP CI] Prerelease

* fix comments issue with new package (subquery#1380)

* fix comments issue with new package

* moved yaml package

* [SKIP CI] Prerelease

* fix logic with reindex and unfinalized height and dynamic ds (subquery#1382)

* fix logic with reindex and unfinalized height

* fix

* include fix for subquery#1379

* update polkadot to 9.7.1 (subquery#1384)

* [release] 20221107

* Fix remove alter table (subquery#1387)

* remove migrate alter table

* remove

* [SKIP CI] Prerelease

* [release] 20221108 (subquery#1388)

* fix missing sequelize sync (subquery#1389)

* [SKIP CI] Prerelease

* [release] 20221108 patch (subquery#1390)

* reindex bind (subquery#1391)

* [SKIP CI] Prerelease

* [release] 20221109 (subquery#1393)

* Handle fetch errors, then retry (subquery#1386)

* add retryOnFail function

* add retryOnFail

* add test, fix logic

* [SKIP CI] Prerelease

* fix (subquery#1395)

* [SKIP CI] Prerelease

* [release] 20221109 node-core (subquery#1394)

* Fix tests hanging (subquery#1396)

* Fix tests hanging

* Update base docker image with newer git version

* [SKIP CI] Prerelease

* Add distinct query plugin (subquery#1274)

* Add distinct query plugin

* Clean up log

* Fix distinct not being provided to query

* Uppercase enum to be consistent with other enums

* Update dictionary queries to try distinct argument

* [SKIP CI] Prerelease

* Add query distinct dependencies (subquery#1398)

* fix missing update forked graphile dependencies

* tidy up

* tidy up

* [SKIP CI] Prerelease

* Break block dispatcher file up and move common code to base class (subquery#1397)

* [SKIP CI] Prerelease

* Hot schema trigger (subquery#1401)

* implement trigger with notification

* working prior clean up

* refactor and clean up on async and await

* clean up

* clear comments

* add filter

* fix

* fix err

* [SKIP CI] Prerelease

* [release] 20221115 (subquery#1402)

* [release] 20221115

* [release] 20221115

* [release] 20221115

* fix hot schema (subquery#1404)

* fix and refactor

* refactor getTriggers

* [SKIP CI] Prerelease

* [release] 20221115 (subquery#1408)

* [release] 20221115

* [release] 20221115

* fix fetchblock for works (subquery#1410)

* [SKIP CI] Prerelease

Co-authored-by: hariu-starfish <103621490+hariu-starfish@users.noreply.github.com>
Co-authored-by: Orion <orion@starfish.finance>
Co-authored-by: Jay Ji <jiqiang90@hotmail.com>
Co-authored-by: Ben <89335033+bz888@users.noreply.github.com>
Co-authored-by: Naveen V <velnaveen99@gmail.com>
Co-authored-by: Filippo <filippo@embriotech.ch>
  • Loading branch information
7 people authored Nov 17, 2022
1 parent ef932a8 commit 8c90c8b
Show file tree
Hide file tree
Showing 20 changed files with 965 additions and 729 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
"typescript": "^4.4.4"
},
"resolutions": {
"@polkadot/api": "9.5.2",
"@polkadot/api": "9.7.1",
"@polkadot/util": "10.1.11",
"@terra-money/terra.js": "^3.0.11",
"node-fetch": "2.6.7"
Expand Down
2 changes: 1 addition & 1 deletion packages/node/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
"@nestjs/schedule": "^1.0.2",
"@subql/common": "latest",
"@subql/common-ethereum": "workspace:*",
"@subql/node-core": "1.3.3",
"@subql/node-core": "1.4.2-0",
"@subql/types-ethereum": "workspace:*",
"@subql/utils": "latest",
"@subql/x-merkle-mountain-range": "2.0.0-0.1.2",
Expand Down
146 changes: 146 additions & 0 deletions packages/node/src/indexer/blockDispatcher/base-block-dispatcher.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
// Copyright 2020-2021 OnFinality Limited authors & contributors
// SPDX-License-Identifier: Apache-2.0

import assert from 'assert';
import { EventEmitter2 } from '@nestjs/event-emitter';
import { hexToU8a, u8aEq } from '@polkadot/util';
import { getLogger, IndexerEvent, IQueue, NodeConfig } from '@subql/node-core';
import { ProjectService } from '../project.service';

const logger = getLogger('BaseBlockDispatcherService');

export type ProcessBlockResponse = {
dynamicDsCreated: boolean;
operationHash: Uint8Array;
reindexBlockHeight: number;
};

export interface IBlockDispatcher {
init(onDynamicDsCreated: (height: number) => Promise<void>): Promise<void>;

enqueueBlocks(heights: number[]): void;

queueSize: number;
freeSize: number;
latestBufferedHeight: number | undefined;

// Remove all enqueued blocks, used when a dynamic ds is created
flushQueue(height: number): void;
rewind(height: number): Promise<void>;
}

const NULL_MERKEL_ROOT = hexToU8a('0x00');

function isNullMerkelRoot(operationHash: Uint8Array): boolean {
return u8aEq(operationHash, NULL_MERKEL_ROOT);
}

export abstract class BaseBlockDispatcher<Q extends IQueue>
implements IBlockDispatcher
{
protected _latestBufferedHeight: number;
protected _processedBlockCount: number;
protected latestProcessedHeight: number;
protected currentProcessingHeight: number;
protected onDynamicDsCreated: (height: number) => Promise<void>;

constructor(
protected nodeConfig: NodeConfig,
protected eventEmitter: EventEmitter2,
protected projectService: ProjectService,
protected queue: Q,
) {}

abstract enqueueBlocks(heights: number[]): void;
abstract init(
onDynamicDsCreated: (height: number) => Promise<void>,
): Promise<void>;

get queueSize(): number {
return this.queue.size;
}

get freeSize(): number {
return this.queue.freeSpace;
}

get latestBufferedHeight(): number {
return this._latestBufferedHeight;
}

set latestBufferedHeight(height: number) {
this.eventEmitter.emit(IndexerEvent.BlocknumberQueueSize, {
value: this.queueSize,
});
this._latestBufferedHeight = height;
}

protected setProcessedBlockCount(processedBlockCount: number): void {
this._processedBlockCount = processedBlockCount;
this.eventEmitter.emit(IndexerEvent.BlockProcessedCount, {
processedBlockCount,
timestamp: Date.now(),
});
}

// Compare it with current indexing number, if last corrected is already indexed
// rewind, also flush queued blocks, drop current indexing transaction, set last processed to correct block too
// if rollback is greater than current index flush queue only
async rewind(lastCorrectHeight: number): Promise<void> {
if (lastCorrectHeight <= this.currentProcessingHeight) {
logger.info(
`Found last verified block at height ${lastCorrectHeight}, rewinding...`,
);
await this.projectService.reindex(lastCorrectHeight);
this.latestProcessedHeight = lastCorrectHeight;
logger.info(`Successful rewind to block ${lastCorrectHeight}!`);
}
this.flushQueue(lastCorrectHeight);
logger.info(`Queued blocks flushed!`); //Also last buffered height reset, next fetching should start after lastCorrectHeight
}

flushQueue(height: number): void {
this.latestBufferedHeight = height;
this.queue.flush();
}

// Is called directly before a block is processed
protected preProcessBlock(height: number): void {
this.currentProcessingHeight = height;
this.eventEmitter.emit(IndexerEvent.BlockProcessing, {
height,
timestamp: Date.now(),
});
}

// Is called directly after a block is processed
protected async postProcessBlock(
height: number,
processBlockResponse: ProcessBlockResponse,
): Promise<void> {
const { dynamicDsCreated, operationHash, reindexBlockHeight } =
processBlockResponse;
if (reindexBlockHeight !== null && reindexBlockHeight !== undefined) {
await this.rewind(reindexBlockHeight);
this.latestProcessedHeight = reindexBlockHeight;
} else {
if (this.nodeConfig.proofOfIndex && !isNullMerkelRoot(operationHash)) {
if (!this.projectService.blockOffset) {
// Which means during project init, it has not found offset and set value
await this.projectService.upsertMetadataBlockOffset(height - 1);
}
void this.projectService.setBlockOffset(height - 1);
}
if (dynamicDsCreated) {
await this.onDynamicDsCreated(height);
}
assert(
!this.latestProcessedHeight || height > this.latestProcessedHeight,
`Block processed out of order. Height: ${height}. Latest: ${this.latestProcessedHeight}`,
);
// In memory _processedBlockCount increase, db metadata increase BlockCount in indexer.manager
this.setProcessedBlockCount(this._processedBlockCount + 1);
this.latestProcessedHeight = height;
}
}
}
179 changes: 179 additions & 0 deletions packages/node/src/indexer/blockDispatcher/block-dispatcher.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
// Copyright 2020-2021 OnFinality Limited authors & contributors
// SPDX-License-Identifier: Apache-2.0

import { Injectable, OnApplicationShutdown } from '@nestjs/common';
import { EventEmitter2 } from '@nestjs/event-emitter';
import {
ApiService,
getLogger,
NodeConfig,
IndexerEvent,
delay,
profilerWrap,
AutoQueue,
Queue,
} from '@subql/node-core';
import { last } from 'lodash';
import { IndexerManager } from '../indexer.manager';
import { ProjectService } from '../project.service';
import { BaseBlockDispatcher } from './base-block-dispatcher';

const logger = getLogger('BlockDispatcherService');

/**
* @description Intended to behave the same as WorkerBlockDispatcherService but doesn't use worker threads or any parallel processing
*/
@Injectable()
export class BlockDispatcherService
extends BaseBlockDispatcher<Queue<number>>
implements OnApplicationShutdown
{
private processQueue: AutoQueue<void>;

private fetching = false;
private isShutdown = false;
private fetchBlocksBatches: ApiService['api']['fetchBlocks'];

constructor(
private apiService: ApiService,
nodeConfig: NodeConfig,
private indexerManager: IndexerManager,
eventEmitter: EventEmitter2,
projectService: ProjectService,
) {
super(
nodeConfig,
eventEmitter,
projectService,
new Queue(nodeConfig.batchSize * 3),
);
this.processQueue = new AutoQueue(nodeConfig.batchSize * 3);

const fetchBlocks = this.apiService.api.fetchBlocks.bind(
this.apiService.api,
);
if (this.nodeConfig.profiler) {
this.fetchBlocksBatches = profilerWrap(
fetchBlocks,
'EthereumUtil',
'fetchBlocksBatches',
);
} else {
this.fetchBlocksBatches = fetchBlocks;
}
}

// eslint-disable-next-line @typescript-eslint/require-await
async init(
onDynamicDsCreated: (height: number) => Promise<void>,
): Promise<void> {
this.onDynamicDsCreated = onDynamicDsCreated;
const blockAmount = await this.projectService.getProcessedBlockCount();
this.setProcessedBlockCount(blockAmount ?? 0);
}

onApplicationShutdown(): void {
this.isShutdown = true;
this.processQueue.abort();
}

enqueueBlocks(heights: number[]): void {
if (!heights.length) return;

logger.info(
`Enqueing blocks ${heights[0]}...${last(heights)}, total ${
heights.length
} blocks`,
);

this.queue.putMany(heights);
this.latestBufferedHeight = last(heights);

void this.fetchBlocksFromQueue().catch((e) => {
logger.error(e, 'Failed to fetch blocks from queue');
if (!this.isShutdown) {
process.exit(1);
}
});
}

flushQueue(height: number): void {
super.flushQueue(height);
this.processQueue.flush();
}

private async fetchBlocksFromQueue(): Promise<void> {
if (this.fetching || this.isShutdown) return;
// Process queue is full, no point in fetching more blocks
// if (this.processQueue.freeSpace < this.nodeConfig.batchSize) return;

this.fetching = true;

try {
while (!this.isShutdown) {
const blockNums = this.queue.takeMany(
Math.min(this.nodeConfig.batchSize, this.processQueue.freeSpace),
);
// Used to compare before and after as a way to check if queue was flushed
const bufferedHeight = this._latestBufferedHeight;

// Queue is empty
if (!blockNums.length) {
// The process queue might be full so no block nums were taken, wait and try again
if (this.queue.size) {
await delay(1);
continue;
}
break;
}

logger.info(
`fetch block [${blockNums[0]},${
blockNums[blockNums.length - 1]
}], total ${blockNums.length} blocks`,
);

const blocks = await this.fetchBlocksBatches(blockNums);

if (bufferedHeight > this._latestBufferedHeight) {
logger.debug(`Queue was reset for new DS, discarding fetched blocks`);
continue;
}
const blockTasks = blocks.map((block) => async () => {
const height = block.block.block.header.number.toNumber();
try {
this.preProcessBlock(height);

const processBlockResponse = await this.indexerManager.indexBlock(
block,
);

await this.postProcessBlock(height, processBlockResponse);
} catch (e) {
if (this.isShutdown) {
return;
}
logger.error(
e,
`failed to index block at height ${height} ${
e.handler ? `${e.handler}(${e.stack ?? ''})` : ''
}`,
);
throw e;
}
});

// There can be enough of a delay after fetching blocks that shutdown could now be true
if (this.isShutdown) break;

this.processQueue.putMany(blockTasks);

this.eventEmitter.emit(IndexerEvent.BlockQueueSize, {
value: this.processQueue.size,
});
}
} finally {
this.fetching = false;
}
}
}
12 changes: 12 additions & 0 deletions packages/node/src/indexer/blockDispatcher/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
// Copyright 2020-2021 OnFinality Limited authors & contributors
// SPDX-License-Identifier: Apache-2.0

import { IBlockDispatcher } from './base-block-dispatcher';
import { BlockDispatcherService } from './block-dispatcher.service';
import { WorkerBlockDispatcherService } from './worker-block-dispatcher.service';

export {
IBlockDispatcher,
BlockDispatcherService,
WorkerBlockDispatcherService,
};
Loading

0 comments on commit 8c90c8b

Please sign in to comment.