diff --git a/packages/node/src/indexer/dictionary.service.test.ts b/packages/node/src/indexer/dictionary.service.test.ts index 427a5a4d23..dbb6aaa577 100644 --- a/packages/node/src/indexer/dictionary.service.test.ts +++ b/packages/node/src/indexer/dictionary.service.test.ts @@ -177,12 +177,4 @@ describe('DictionaryService', () => { ); expect(dic.batchBlocks[dic.batchBlocks.length - 1]).toBe(333524); }, 500000); - - it('should return all specVersion', async () => { - const project = testSubqueryProject(); - const dictionaryService = new DictionaryService(project); - - const specVersions = await dictionaryService.getSpecVersion(); - console.log(specVersions); - }, 500000); }); diff --git a/packages/node/src/indexer/dictionary.service.ts b/packages/node/src/indexer/dictionary.service.ts index 1a829c4e5c..a9c7521a80 100644 --- a/packages/node/src/indexer/dictionary.service.ts +++ b/packages/node/src/indexer/dictionary.service.ts @@ -20,12 +20,6 @@ import { getLogger } from '../utils/logger'; import { profiler } from '../utils/profiler'; import { getYargsOption } from '../yargs'; -export type SpecVersion = { - id: string; - start: number; //start with this block - end: number; -}; - export type Dictionary = { _metadata: MetaData; batchBlocks: number[]; @@ -163,7 +157,6 @@ export class DictionaryService implements OnApplicationShutdown { variables, }); const blockHeightSet = new Set(); - const specVersionBlockHeightSet = new Set(); const entityEndBlock: { [entity: string]: number } = {}; for (const entity of Object.keys(resp.data)) { if (entity !== '_metadata' && resp.data[entity].nodes.length >= 0) { @@ -173,11 +166,6 @@ export class DictionaryService implements OnApplicationShutdown { } } } - if (resp.data.specVersions && resp.data.specVersions.nodes.length >= 0) { - for (const node of resp.data.specVersions.nodes) { - specVersionBlockHeightSet.add(Number(node.blockHeight)); - } - } const _metadata = resp.data._metadata; const endBlock = Math.min( ...Object.values(entityEndBlock).map((height) => @@ -187,8 +175,7 @@ export class DictionaryService implements OnApplicationShutdown { const batchBlocks = Array.from(blockHeightSet) .filter((block) => block <= endBlock) .sort((n1, n2) => n1 - n2); - //TODO - // const specVersions = Array.from(specVersionBlockHeightSet); + return { _metadata, batchBlocks, @@ -234,61 +221,4 @@ export class DictionaryService implements OnApplicationShutdown { } return buildQuery(vars, nodes); } - - async getSpecVersion(): Promise { - const { query } = this.specVersionQuery(); - try { - const resp = await this.client.query({ - query: gql(query), - }); - const specVersionBlockHeightSet = new Set(); - const _metadata = resp.data._metadata; - const specVersions = resp.data.specVersions.nodes; - - if (specVersions && specVersions.length >= 0) { - // Add range for the last specVersion - if (_metadata.lastProcessedHeight) { - specVersionBlockHeightSet.add({ - id: specVersions[specVersions.length - 1].id, - start: Number(specVersions[specVersions.length - 1].blockHeight), - end: Number(_metadata.lastProcessedHeight), - }); - } - // Add range for -1 specVersions - for (let i = 0; i < resp.data.specVersions.nodes.length - 1; i++) { - specVersionBlockHeightSet.add({ - id: specVersions[i].id, - start: Number(specVersions[i].blockHeight), - end: Number(specVersions[i + 1].blockHeight) - 1, - }); - } - } - return Array.from(specVersionBlockHeightSet); - } catch (err) { - logger.warn(err, `failed to fetch specVersion result`); - return undefined; - } - } - - private specVersionQuery(): GqlQuery { - const nodes: GqlNode[] = [ - { - entity: '_metadata', - project: ['lastProcessedHeight', 'genesisHash'], - }, - { - entity: 'specVersions', - project: [ - { - entity: 'nodes', - project: ['id', 'blockHeight'], - }, - ], - args: { - orderBy: 'BLOCK_HEIGHT_ASC', - }, - }, - ]; - return buildQuery([], nodes); - } } diff --git a/packages/node/src/indexer/fetch.service.ts b/packages/node/src/indexer/fetch.service.ts index 5470d58fea..4502e1d53d 100644 --- a/packages/node/src/indexer/fetch.service.ts +++ b/packages/node/src/indexer/fetch.service.ts @@ -19,7 +19,6 @@ import { } from '@subql/common-cosmos'; import { DictionaryQueryEntry, - SubqlCosmosCustomHandler, DictionaryQueryCondition, SubqlCosmosEventHandler, SubqlCosmosMessageHandler, @@ -37,11 +36,7 @@ import { delay } from '../utils/promise'; import { getYargsOption } from '../yargs'; import { ApiService, CosmosClient } from './api.service'; import { BlockedQueue } from './BlockedQueue'; -import { - Dictionary, - DictionaryService, - SpecVersion, -} from './dictionary.service'; +import { Dictionary, DictionaryService } from './dictionary.service'; import { DsProcessorService } from './ds-processor.service'; import { DynamicDsService } from './dynamic-ds.service'; import { IndexerEvent } from './events'; @@ -54,7 +49,6 @@ const CHECK_MEMORY_INTERVAL = 60000; const HIGH_THRESHOLD = 0.85; const LOW_THRESHOLD = 0.6; const MINIMUM_BATCH_SIZE = 5; -const SPEC_VERSION_BLOCK_GAP = 100; const { argv } = getYargsOption(); @@ -159,12 +153,9 @@ export class FetchService implements OnApplicationShutdown { private blockBuffer: BlockedQueue; private blockNumberBuffer: BlockedQueue; private isShutdown = false; - private parentSpecVersion: number; private useDictionary: boolean; private dictionaryQueryEntries?: DictionaryQueryEntry[]; private batchSizeScale: number; - private specVersionMap: SpecVersion[]; - private currentRuntimeVersion: RuntimeVersion; private templateDynamicDatasouces: SubqlProjectDs[]; constructor( @@ -355,10 +346,14 @@ export class FetchService implements OnApplicationShutdown { let startBlockHeight: number; let scaledBatchSize: number; - while (!this.isShutdown) { - startBlockHeight = this.latestBufferedHeight + const getStartBlockHeight = (): number => { + return this.latestBufferedHeight ? this.latestBufferedHeight + 1 : initBlockHeight; + }; + + while (!this.isShutdown) { + startBlockHeight = getStartBlockHeight(); scaledBatchSize = Math.max( Math.round(this.batchSizeScale * this.nodeConfig.batchSize), @@ -381,6 +376,14 @@ export class FetchService implements OnApplicationShutdown { scaledBatchSize, this.dictionaryQueryEntries, ); + + if (startBlockHeight !== getStartBlockHeight()) { + logger.debug( + `Queue was reset for new DS, discarding dictionary query result`, + ); + continue; + } + if ( dictionary && (await this.dictionaryValidation(dictionary, startBlockHeight)) @@ -430,6 +433,9 @@ export class FetchService implements OnApplicationShutdown { continue; } + // Used to compare before and after as a way to check if new DS created + const bufferedHeight = this.latestBufferedHeight; + const bufferBlocks = await this.blockNumberBuffer.takeAll(takeCount); const blocks = await fetchBlocksBatches(this.api, bufferBlocks); logger.info( @@ -437,6 +443,11 @@ export class FetchService implements OnApplicationShutdown { bufferBlocks[bufferBlocks.length - 1] }], total ${bufferBlocks.length} blocks`, ); + + if (bufferedHeight > this.latestBufferedHeight) { + logger.debug(`Queue was reset for new DS, discarding fetched blocks`); + continue; + } this.blockBuffer.putAll(blocks); this.eventEmitter.emit(IndexerEvent.BlockQueueSize, { value: this.blockBuffer.size, diff --git a/packages/node/src/indexer/indexer.manager.ts b/packages/node/src/indexer/indexer.manager.ts index 7ddbdf05a1..2a9a24050c 100644 --- a/packages/node/src/indexer/indexer.manager.ts +++ b/packages/node/src/indexer/indexer.manager.ts @@ -57,7 +57,6 @@ const { argv } = getYargsOption(); export class IndexerManager { private api: CosmosClient; private filteredDataSources: SubqlProjectDs[]; - private blockOffset: number; constructor( private storeService: StoreService, @@ -137,7 +136,7 @@ export class IndexerManager { const operationHash = this.storeService.getOperationMerkleRoot(); if ( !u8aEq(operationHash, NULL_MERKEL_ROOT) && - this.blockOffset === undefined + this.projectService.blockOffset === undefined ) { await this.projectService.upsertMetadataBlockOffset( blockHeight - 1, diff --git a/packages/node/src/indexer/project.service.ts b/packages/node/src/indexer/project.service.ts index b1470a1a7a..ce4f76abdd 100644 --- a/packages/node/src/indexer/project.service.ts +++ b/packages/node/src/indexer/project.service.ts @@ -33,6 +33,7 @@ export class ProjectService { private _schema: string; private metadataRepo: MetadataRepo; private _startHeight: number; + private _blockOffset: number; constructor( private readonly dsProcessorService: DsProcessorService, @@ -52,6 +53,10 @@ export class ProjectService { return this._schema; } + get blockOffset(): number { + return this._blockOffset; + } + get startHeight(): number { return this._startHeight; } @@ -247,7 +252,7 @@ export class ProjectService { await this.metadataRepo.upsert( { key: 'blockOffset', - value: height - 1, + value: height, }, { transaction: tx }, ); @@ -288,9 +293,10 @@ export class ProjectService { return startHeight; } - // FIXME Dedupe with indexermanager + // FIXME Dedupe with indexer manager setBlockOffset(offset: number): void { - logger.info(`set blockoffset to ${offset}`); + logger.info(`set blockOffset to ${offset}`); + this._blockOffset = offset; void this.mmrService .syncFileBaseFromPoi(this.schema, offset) .catch((err) => {