Skip to content

Commit

Permalink
Sync 20220629 (#31)
Browse files Browse the repository at this point in the history
* fix set block offset (#1133)

* [SKIP CI] Prerelease

* while hilarious, fix SupQuery » SubQuery output (#1135)

* [SKIP CI] Prerelease

* fix warning for fetch specVersion when dictionary undefined, and tidy up logs (#1127)

* [SKIP CI] Prerelease

* fix subql-query help (#1137)

* [SKIP CI] Prerelease

* Fix pending data being used when it should be reset with new DS (#1140)

* [SKIP CI] Prerelease

* Remove specVersions related code

Co-authored-by: Jay Ji <jiqiang90@hotmail.com>
Co-authored-by: Mike Purvis <1042667+mikedotexe@users.noreply.github.com>
Co-authored-by: Ben <89335033+bz888@users.noreply.github.com>
  • Loading branch information
4 people committed Jun 29, 2022
1 parent bcd2e00 commit ed24ff2
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 96 deletions.
8 changes: 0 additions & 8 deletions packages/node/src/indexer/dictionary.service.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -177,12 +177,4 @@ describe('DictionaryService', () => {
);
expect(dic.batchBlocks[dic.batchBlocks.length - 1]).toBe(333524);
}, 500000);

it('should return all specVersion', async () => {
const project = testSubqueryProject();
const dictionaryService = new DictionaryService(project);

const specVersions = await dictionaryService.getSpecVersion();
console.log(specVersions);
}, 500000);
});
72 changes: 1 addition & 71 deletions packages/node/src/indexer/dictionary.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,6 @@ import { getLogger } from '../utils/logger';
import { profiler } from '../utils/profiler';
import { getYargsOption } from '../yargs';

export type SpecVersion = {
id: string;
start: number; //start with this block
end: number;
};

export type Dictionary = {
_metadata: MetaData;
batchBlocks: number[];
Expand Down Expand Up @@ -163,7 +157,6 @@ export class DictionaryService implements OnApplicationShutdown {
variables,
});
const blockHeightSet = new Set<number>();
const specVersionBlockHeightSet = new Set<number>();
const entityEndBlock: { [entity: string]: number } = {};
for (const entity of Object.keys(resp.data)) {
if (entity !== '_metadata' && resp.data[entity].nodes.length >= 0) {
Expand All @@ -173,11 +166,6 @@ export class DictionaryService implements OnApplicationShutdown {
}
}
}
if (resp.data.specVersions && resp.data.specVersions.nodes.length >= 0) {
for (const node of resp.data.specVersions.nodes) {
specVersionBlockHeightSet.add(Number(node.blockHeight));
}
}
const _metadata = resp.data._metadata;
const endBlock = Math.min(
...Object.values(entityEndBlock).map((height) =>
Expand All @@ -187,8 +175,7 @@ export class DictionaryService implements OnApplicationShutdown {
const batchBlocks = Array.from(blockHeightSet)
.filter((block) => block <= endBlock)
.sort((n1, n2) => n1 - n2);
//TODO
// const specVersions = Array.from(specVersionBlockHeightSet);

return {
_metadata,
batchBlocks,
Expand Down Expand Up @@ -234,61 +221,4 @@ export class DictionaryService implements OnApplicationShutdown {
}
return buildQuery(vars, nodes);
}

async getSpecVersion(): Promise<SpecVersion[]> {
const { query } = this.specVersionQuery();
try {
const resp = await this.client.query({
query: gql(query),
});
const specVersionBlockHeightSet = new Set<SpecVersion>();
const _metadata = resp.data._metadata;
const specVersions = resp.data.specVersions.nodes;

if (specVersions && specVersions.length >= 0) {
// Add range for the last specVersion
if (_metadata.lastProcessedHeight) {
specVersionBlockHeightSet.add({
id: specVersions[specVersions.length - 1].id,
start: Number(specVersions[specVersions.length - 1].blockHeight),
end: Number(_metadata.lastProcessedHeight),
});
}
// Add range for -1 specVersions
for (let i = 0; i < resp.data.specVersions.nodes.length - 1; i++) {
specVersionBlockHeightSet.add({
id: specVersions[i].id,
start: Number(specVersions[i].blockHeight),
end: Number(specVersions[i + 1].blockHeight) - 1,
});
}
}
return Array.from(specVersionBlockHeightSet);
} catch (err) {
logger.warn(err, `failed to fetch specVersion result`);
return undefined;
}
}

private specVersionQuery(): GqlQuery {
const nodes: GqlNode[] = [
{
entity: '_metadata',
project: ['lastProcessedHeight', 'genesisHash'],
},
{
entity: 'specVersions',
project: [
{
entity: 'nodes',
project: ['id', 'blockHeight'],
},
],
args: {
orderBy: 'BLOCK_HEIGHT_ASC',
},
},
];
return buildQuery([], nodes);
}
}
35 changes: 23 additions & 12 deletions packages/node/src/indexer/fetch.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import {
} from '@subql/common-cosmos';
import {
DictionaryQueryEntry,
SubqlCosmosCustomHandler,
DictionaryQueryCondition,
SubqlCosmosEventHandler,
SubqlCosmosMessageHandler,
Expand All @@ -37,11 +36,7 @@ import { delay } from '../utils/promise';
import { getYargsOption } from '../yargs';
import { ApiService, CosmosClient } from './api.service';
import { BlockedQueue } from './BlockedQueue';
import {
Dictionary,
DictionaryService,
SpecVersion,
} from './dictionary.service';
import { Dictionary, DictionaryService } from './dictionary.service';
import { DsProcessorService } from './ds-processor.service';
import { DynamicDsService } from './dynamic-ds.service';
import { IndexerEvent } from './events';
Expand All @@ -54,7 +49,6 @@ const CHECK_MEMORY_INTERVAL = 60000;
const HIGH_THRESHOLD = 0.85;
const LOW_THRESHOLD = 0.6;
const MINIMUM_BATCH_SIZE = 5;
const SPEC_VERSION_BLOCK_GAP = 100;

const { argv } = getYargsOption();

Expand Down Expand Up @@ -159,12 +153,9 @@ export class FetchService implements OnApplicationShutdown {
private blockBuffer: BlockedQueue<BlockContent>;
private blockNumberBuffer: BlockedQueue<number>;
private isShutdown = false;
private parentSpecVersion: number;
private useDictionary: boolean;
private dictionaryQueryEntries?: DictionaryQueryEntry[];
private batchSizeScale: number;
private specVersionMap: SpecVersion[];
private currentRuntimeVersion: RuntimeVersion;
private templateDynamicDatasouces: SubqlProjectDs[];

constructor(
Expand Down Expand Up @@ -355,10 +346,14 @@ export class FetchService implements OnApplicationShutdown {
let startBlockHeight: number;
let scaledBatchSize: number;

while (!this.isShutdown) {
startBlockHeight = this.latestBufferedHeight
const getStartBlockHeight = (): number => {
return this.latestBufferedHeight
? this.latestBufferedHeight + 1
: initBlockHeight;
};

while (!this.isShutdown) {
startBlockHeight = getStartBlockHeight();

scaledBatchSize = Math.max(
Math.round(this.batchSizeScale * this.nodeConfig.batchSize),
Expand All @@ -381,6 +376,14 @@ export class FetchService implements OnApplicationShutdown {
scaledBatchSize,
this.dictionaryQueryEntries,
);

if (startBlockHeight !== getStartBlockHeight()) {
logger.debug(
`Queue was reset for new DS, discarding dictionary query result`,
);
continue;
}

if (
dictionary &&
(await this.dictionaryValidation(dictionary, startBlockHeight))
Expand Down Expand Up @@ -430,13 +433,21 @@ export class FetchService implements OnApplicationShutdown {
continue;
}

// Used to compare before and after as a way to check if new DS created
const bufferedHeight = this.latestBufferedHeight;

const bufferBlocks = await this.blockNumberBuffer.takeAll(takeCount);
const blocks = await fetchBlocksBatches(this.api, bufferBlocks);
logger.info(
`fetch block [${bufferBlocks[0]},${
bufferBlocks[bufferBlocks.length - 1]
}], total ${bufferBlocks.length} blocks`,
);

if (bufferedHeight > this.latestBufferedHeight) {
logger.debug(`Queue was reset for new DS, discarding fetched blocks`);
continue;
}
this.blockBuffer.putAll(blocks);
this.eventEmitter.emit(IndexerEvent.BlockQueueSize, {
value: this.blockBuffer.size,
Expand Down
3 changes: 1 addition & 2 deletions packages/node/src/indexer/indexer.manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ const { argv } = getYargsOption();
export class IndexerManager {
private api: CosmosClient;
private filteredDataSources: SubqlProjectDs[];
private blockOffset: number;

constructor(
private storeService: StoreService,
Expand Down Expand Up @@ -137,7 +136,7 @@ export class IndexerManager {
const operationHash = this.storeService.getOperationMerkleRoot();
if (
!u8aEq(operationHash, NULL_MERKEL_ROOT) &&
this.blockOffset === undefined
this.projectService.blockOffset === undefined
) {
await this.projectService.upsertMetadataBlockOffset(
blockHeight - 1,
Expand Down
12 changes: 9 additions & 3 deletions packages/node/src/indexer/project.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ export class ProjectService {
private _schema: string;
private metadataRepo: MetadataRepo;
private _startHeight: number;
private _blockOffset: number;

constructor(
private readonly dsProcessorService: DsProcessorService,
Expand All @@ -52,6 +53,10 @@ export class ProjectService {
return this._schema;
}

get blockOffset(): number {
return this._blockOffset;
}

get startHeight(): number {
return this._startHeight;
}
Expand Down Expand Up @@ -247,7 +252,7 @@ export class ProjectService {
await this.metadataRepo.upsert(
{
key: 'blockOffset',
value: height - 1,
value: height,
},
{ transaction: tx },
);
Expand Down Expand Up @@ -288,9 +293,10 @@ export class ProjectService {
return startHeight;
}

// FIXME Dedupe with indexermanager
// FIXME Dedupe with indexer manager
setBlockOffset(offset: number): void {
logger.info(`set blockoffset to ${offset}`);
logger.info(`set blockOffset to ${offset}`);
this._blockOffset = offset;
void this.mmrService
.syncFileBaseFromPoi(this.schema, offset)
.catch((err) => {
Expand Down

0 comments on commit ed24ff2

Please sign in to comment.