Skip to content

Commit

Permalink
Logging improvements, fix init poi service with threads
Browse files Browse the repository at this point in the history
  • Loading branch information
stwiname committed Jul 11, 2022
1 parent a147ab7 commit d88a16b
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 45 deletions.
4 changes: 2 additions & 2 deletions packages/node/src/indexer/mmr.service.ts
Expand Up @@ -19,7 +19,7 @@ const logger = getLogger('mmr');

const DEFAULT_FETCH_RANGE = 100;

const keccak256Hash = (...nodeValues) =>
const keccak256Hash = (...nodeValues: Uint8Array[]) =>
Buffer.from(keccak256(Buffer.concat(nodeValues)), 'hex');

@Injectable()
Expand Down Expand Up @@ -125,7 +125,7 @@ export class MmrService implements OnApplicationShutdown {
this.nextMmrBlockHeight = poiBlock.id + 1;
}

validatePoiMmr(poiWithMmr: ProofOfIndex, mmrValue: Uint8Array) {
validatePoiMmr(poiWithMmr: ProofOfIndex, mmrValue: Uint8Array): void {
if (!u8aEq(poiWithMmr.mmrRoot, mmrValue)) {
throw new Error(
`Poi block height ${poiWithMmr.id}, Poi mmr ${u8aToHex(
Expand Down
5 changes: 3 additions & 2 deletions packages/node/src/indexer/poi.service.ts
@@ -1,12 +1,13 @@
// Copyright 2020-2022 OnFinality Limited authors & contributors
// SPDX-License-Identifier: Apache-2.0

import { isMainThread } from 'node:worker_threads';
import { Injectable, OnApplicationShutdown } from '@nestjs/common';
import { hexToU8a } from '@polkadot/util';
import { Sequelize } from 'sequelize';
import { NodeConfig } from '../configure/NodeConfig';
import { SubqueryProject } from '../configure/SubqueryProject';
import { PoiFactory, PoiRepo, ProofOfIndex } from './entities/Poi.entity';
import { PoiFactory, PoiRepo } from './entities/Poi.entity';

const DEFAULT_PARENT_HASH = hexToU8a('0x00');

Expand Down Expand Up @@ -47,7 +48,7 @@ export class PoiService implements OnApplicationShutdown {
}

async getLatestPoiBlockHash(): Promise<Uint8Array | null> {
if (!this.latestPoiBlockHash) {
if (!this.latestPoiBlockHash || !isMainThread) {
const poiBlockHash = await this.fetchPoiBlockHashFromDb();
if (poiBlockHash === null || poiBlockHash === undefined) {
this.latestPoiBlockHash = DEFAULT_PARENT_HASH;
Expand Down
5 changes: 5 additions & 0 deletions packages/node/src/indexer/project.service.ts
Expand Up @@ -88,9 +88,14 @@ export class ProjectService {
this.dynamicDsService.init(this.metadataRepo);

await this.sequelize.sync();

this._schema = await this.getExistingProjectSchema();
assert(this._schema, 'Schema should be created in main thread');
await this.initDbSchema();

if (this.nodeConfig.proofOfIndex) {
await this.poiService.init(this.schema);
}
}
}

Expand Down
89 changes: 51 additions & 38 deletions packages/node/src/indexer/worker/worker.service.ts
@@ -1,10 +1,12 @@
// Copyright 2020-2022 OnFinality Limited authors & contributors
// SPDX-License-Identifier: Apache-2.0

import { threadId } from 'node:worker_threads';
import { Injectable } from '@nestjs/common';
import { RuntimeVersion } from '@polkadot/types/interfaces';
import { NodeConfig } from '../../configure/NodeConfig';
import { AutoQueue } from '../../utils/autoQueue';
import { getLogger } from '../../utils/logger';
import { fetchBlocksBatches } from '../../utils/substrate';
import { ApiService } from '../api.service';
import { IndexerManager } from '../indexer.manager';
Expand All @@ -25,6 +27,8 @@ export type WorkerStatusResponse = {
toFetchBlocks: number;
};

const logger = getLogger(`Worker Service #${threadId}`);

@Injectable()
export class WorkerService {
private fetchedBlocks: Record<string, BlockContent> = {};
Expand All @@ -36,37 +40,41 @@ export class WorkerService {
constructor(
private apiService: ApiService,
private indexerManager: IndexerManager,
private nodeConfig: NodeConfig,
nodeConfig: NodeConfig,
) {
this.queue = new AutoQueue(undefined, nodeConfig.batchSize);
}

async fetchBlock(height: number): Promise<FetchBlockResponse> {
return this.queue.put(async () => {
// If a dynamic ds is created we might be asked to fetch blocks again, use existing result
if (!this.fetchedBlocks[height]) {
const [block] = await fetchBlocksBatches(this.apiService.getApi(), [
height,
]);
this.fetchedBlocks[height] = block;
}

const block = this.fetchedBlocks[height];

// We have the current version, don't need a new one when processing
if (
this.currentRuntimeVersion?.specVersion.toNumber() ===
block.block.specVersion
) {
return;
}

// Return info to get the runtime version, this lets the worker thread know
return {
specVersion: block.block.specVersion,
parentHash: block.block.block.header.parentHash.toHex(),
};
});
try {
return await this.queue.put(async () => {
// If a dynamic ds is created we might be asked to fetch blocks again, use existing result
if (!this.fetchedBlocks[height]) {
const [block] = await fetchBlocksBatches(this.apiService.getApi(), [
height,
]);
this.fetchedBlocks[height] = block;
}

const block = this.fetchedBlocks[height];

// We have the current version, don't need a new one when processing
if (
this.currentRuntimeVersion?.specVersion.toNumber() ===
block.block.specVersion
) {
return;
}

// Return info to get the runtime version, this lets the worker thread know
return {
specVersion: block.block.specVersion,
parentHash: block.block.block.header.parentHash.toHex(),
};
});
} catch (e) {
logger.error(e, `Failed to fetch block ${height}`);
}
}

setCurrentRuntimeVersion(runtimeHex: string): void {
Expand All @@ -78,22 +86,27 @@ export class WorkerService {
}

async processBlock(height: number): Promise<ProcessBlockResponse> {
this._isIndexing = true;
const block = this.fetchedBlocks[height];
try {
this._isIndexing = true;
const block = this.fetchedBlocks[height];

if (!block) {
throw new Error(`Block ${height} has not been fetched`);
}
if (!block) {
throw new Error(`Block ${height} has not been fetched`);
}

delete this.fetchedBlocks[height];
delete this.fetchedBlocks[height];

const response = await this.indexerManager.indexBlock(
block,
this.currentRuntimeVersion,
);
const response = await this.indexerManager.indexBlock(
block,
this.currentRuntimeVersion,
);

this._isIndexing = false;
return response;
this._isIndexing = false;
return response;
} catch (e) {
logger.error(e, `Failed to index block ${height}: ${e.stack}`);
throw e;
}
}

get numFetchedBlocks(): number {
Expand Down
6 changes: 3 additions & 3 deletions packages/node/src/utils/logger.ts
Expand Up @@ -30,19 +30,19 @@ export class NestLogger implements LoggerService {
`nestjs${isMainThread ? '-0' : `-#${threadId}`}`,
);

error(message: any, trace?: string) {
error(message: any, trace?: string): void {
if (trace) {
this.logger.error({ trace }, message);
} else {
this.logger.error(message);
}
}

log(message: any): any {
log(message: any): void {
this.logger.info(message);
}

warn(message: any): any {
warn(message: any): void {
this.logger.warn(message);
}
}

0 comments on commit d88a16b

Please sign in to comment.