Skip to content

Commit

Permalink
Sync 20230303 (#15)
Browse files Browse the repository at this point in the history
* Add support for near projects to the CLI (#1498)

* Add support for near projects to the CLI

* Update common near version

* Update near packages

* [SKIP CI] Prerelease

* Arranges commands in alphabetical order (#1495)

* [SKIP CI] Prerelease

* user process.env.USERPROFILE for windows platform (#1502)

* [SKIP CI] Prerelease

* add blockTime to NodeConfig (#1501)

* [SKIP CI] Prerelease

* [release] 20230131 (#1503)

* Worker use runtime service (#1491)

* draft

* update

* tidy up

* update

* working worker runtimes

* into base class

* clean up

* improve base on review

* [SKIP CI] Prerelease

* Check queue for lower blocks before processing fetched blocks (#1509)

* [SKIP CI] Prerelease

* Support array type in dictionary queries (#1510)

* [SKIP CI] Prerelease

* add flag query-limit (#1514)

* [SKIP CI] Prerelease

* Fix error with tempDsRecords being undefined (#1516)

* Fix error with tempDsRecords being undefined

* Fix schema not being used for metadata, sync dynamic-ds across workers

* Be smarter about reloading dynamic ds in workers

* [SKIP CI] Prerelease

* Update postgraphile enable table partitions (#1520)

* [SKIP CI] Prerelease

* `dictionary-optimisation` flag  (#1519)

* add flag, to orderby blockheight instead of pk

* add comment

* [SKIP CI] Prerelease

* [WIP] Feature/adding postgres ssl connection option (#1513)

* enable ssl option postgres

* Check DB_SSL Env Value , default is false

* adding ENV to pass in the Path to certificates

* adding SSL option for query pacakge.

* use args to pass the postgres ssl informations.

* load file content moved to comman package.

* Update packages/node-core/src/configure/NodeConfig.ts

---------

Co-authored-by: Scott Twiname <skott.twiname@gmail.com>

* [SKIP CI] Prerelease

* Support Cockroach db (#1521)

* support cockroach db

* Update comment

* [SKIP CI] Prerelease

* ensure pgpool connection work with ssl (#1525)

* [SKIP CI] Prerelease

* Fix issue when use cockroach query service not support pgPartition (#1526)

* [SKIP CI] Prerelease

* [release] 20230221 (#1528)

* fix issue store getByField limit could potentially excess config limit (#1529)

* fix issue store getByField limit could potentially excess config limit

* Update packages/node-core/src/indexer/store.service.ts

Co-authored-by: Scott Twiname <skott.twiname@gmail.com>

* typo

---------

Co-authored-by: Scott Twiname <skott.twiname@gmail.com>

* [SKIP CI] Prerelease

* Abi codegen (#1532)

* draft

* fix abi name, add support for transaction

* Update packages/cli/src/template/abi-interface.ts.ejs

Co-authored-by: Scott Twiname <skott.twiname@gmail.com>

* fix

* fix dependencies

---------

Co-authored-by: Scott Twiname <skott.twiname@gmail.com>

* [SKIP CI] Prerelease

* Move enum under schema (#1527)

* need fix query get enum

* fix enum query, clean all enums

* add logic to handle original enum type

* update sequelize version

Co-authored-by: Scott Twiname <skott.twiname@gmail.com>

* lock @subql/utils version

---------

Co-authored-by: Scott Twiname <skott.twiname@gmail.com>
Co-authored-by: seandotau <seandotau@gmail.com>
Co-authored-by: Jay Ji <jiqiang90@hotmail.com>
Co-authored-by: Ben <89335033+bz888@users.noreply.github.com>
Co-authored-by: Darwin Subramaniam <darwinsubramaniam@googlemail.com>
  • Loading branch information
6 people authored Mar 3, 2023
1 parent 050e99f commit 39f5752
Show file tree
Hide file tree
Showing 12 changed files with 972 additions and 889 deletions.
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@
},
"resolutions": {
"@polkadot/util": "10.2.3",
"node-fetch": "2.6.7"
"node-fetch": "2.6.7",
"sequelize": "6.28.0"
},
"scripts": {
"build": "yarn workspaces foreach -ptA run build",
Expand Down
11 changes: 11 additions & 0 deletions packages/node/docker/cockroach-db/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
version: '3.5'

services:
crdb:
image: cockroachdb/cockroach:latest-v22.1
ports:
- '26257:26257'
- '8080:8080'
command: start-single-node --insecure
volumes:
- '${PWD}/cockroach-data/crdb:/cockroach/cockroach-data'
6 changes: 3 additions & 3 deletions packages/node/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@
"@nestjs/schedule": "^1.0.2",
"@subql/common": "latest",
"@subql/common-near": "workspace:*",
"@subql/node-core": "^1.8.0",
"@subql/node-core": "^1.9.0",
"@subql/types-near": "workspace:*",
"@subql/utils": "latest",
"@subql/utils": "^1.4.1",
"@subql/x-merkle-mountain-range": "2.0.0-0.1.2",
"@willsoto/nestjs-prometheus": "^4.4.0",
"app-module-path": "^2.2.0",
Expand All @@ -45,7 +45,7 @@
"reflect-metadata": "^0.1.13",
"rimraf": "^3.0.2",
"rxjs": "^7.5.2",
"sequelize": "6.23.0",
"sequelize": "6.28.0",
"tar": "^6.1.11",
"typescript": "^4.4.4",
"vm2": "^3.9.9",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,10 +141,16 @@ export class BlockDispatcherService
blockNums,
);

if (bufferedHeight > this._latestBufferedHeight) {
logger.debug(`Queue was reset for new DS, discarding fetched blocks`);
// Check if the queues have been flushed between queue.takeMany and fetchBlocksBatches resolving
// Peeking the queue is because the latestBufferedHeight could have regrown since fetching block
if (
bufferedHeight > this._latestBufferedHeight ||
this.queue.peek() < Math.min(...blockNums)
) {
logger.info(`Queue was reset for new DS, discarding fetched blocks`);
continue;
}

const blockTasks = blocks.map((block) => async () => {
const height = block.block.header.height;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import {
NumFetchedBlocks,
NumFetchingBlocks,
GetWorkerStatus,
ReloadDynamicDs,
} from '../worker/worker';
import { BaseBlockDispatcher } from './base-block-dispatcher';

Expand All @@ -34,6 +35,7 @@ type IIndexerWorker = {
numFetchedBlocks: NumFetchedBlocks;
numFetchingBlocks: NumFetchingBlocks;
getStatus: GetWorkerStatus;
reloadDynamicDs: ReloadDynamicDs;
};

type IInitIndexerWorker = IIndexerWorker & {
Expand All @@ -54,6 +56,7 @@ async function createIndexerWorker(): Promise<IndexerWorker> {
'numFetchedBlocks',
'numFetchingBlocks',
'getStatus',
'reloadDynamicDs',
],
);

Expand Down Expand Up @@ -161,12 +164,11 @@ export class WorkerBlockDispatcherService

// Used to compare before and after as a way to check if queue was flushed
const bufferedHeight = this.latestBufferedHeight;
const pendingBlock = worker.fetchBlock(height);

const processBlock = async () => {
try {
const start = new Date();
const result = await pendingBlock;
await worker.fetchBlock(height);
const end = new Date();

if (bufferedHeight > this.latestBufferedHeight) {
Expand All @@ -187,10 +189,6 @@ export class WorkerBlockDispatcherService
);
}

// logger.info(
// `worker ${workerIdx} processing block ${height}, fetched blocks: ${await worker.numFetchedBlocks()}, fetching blocks: ${await worker.numFetchingBlocks()}`,
// );

this.preProcessBlock(height);

const { dynamicDsCreated, operationHash, reindexBlockHeight } =
Expand All @@ -201,6 +199,11 @@ export class WorkerBlockDispatcherService
operationHash: Buffer.from(operationHash, 'base64'),
reindexBlockHeight,
});

if (dynamicDsCreated) {
// Ensure all workers are aware of all dynamic ds
await Promise.all(this.workers.map((w) => w.reloadDynamicDs()));
}
} catch (e) {
logger.error(
e,
Expand Down
46 changes: 30 additions & 16 deletions packages/node/src/indexer/dynamic-ds.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

import assert from 'assert';
import { isMainThread } from 'worker_threads';
import { Inject, Injectable } from '@nestjs/common';
import { isCustomDs, isRuntimeDs } from '@subql/common-near';
import { getLogger, MetadataRepo } from '@subql/node-core';
Expand Down Expand Up @@ -75,24 +76,41 @@ export class DynamicDsService {
}

async getDynamicDatasources(): Promise<SubqlProjectDs[]> {
// Workers should not cache this result in order to keep in sync
if (!this._datasources) {
try {
const params = await this.getDynamicDatasourceParams();

this._datasources = await Promise.all(
params.map((params) => this.getDatasource(params)),
);
} catch (e) {
logger.error(e, `Unable to get dynamic datasources`);
process.exit(1);
}
this._datasources = await this.loadDynamicDatasources();
}

return this._datasources;
}

deleteTempDsRecords(blockHeight: number) {
delete this.tempDsRecords[TEMP_DS_PREFIX + blockHeight];
// This is used to sync between worker threads
async reloadDynamicDatasources(): Promise<void> {
this._datasources = await this.loadDynamicDatasources();
}

private async loadDynamicDatasources(): Promise<SubqlProjectDs[]> {
try {
const params = await this.getDynamicDatasourceParams();

const dataSources = await Promise.all(
params.map((params) => this.getDatasource(params)),
);

logger.info(`Loaded ${dataSources.length} dynamic datasources`);

return dataSources;
} catch (e) {
logger.error(`Unable to get dynamic datasources:\n${e.message}`);
process.exit(1);
}
}

deleteTempDsRecords(blockHeight: number): void {
// Main thread will not have tempDsRecords with workers
if (this.tempDsRecords) {
delete this.tempDsRecords[TEMP_DS_PREFIX + blockHeight];
}
}

private async getDynamicDatasourceParams(
Expand Down Expand Up @@ -153,10 +171,6 @@ export class DynamicDsService {
);
}

logger.info(
`Initialised dynamic datasource from template: "${params.templateName}"`,
);

const dsObj = {
...template,
startBlock: params.startBlock,
Expand Down
4 changes: 2 additions & 2 deletions packages/node/src/indexer/project.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ export class ProjectService {

this._startHeight = await this.getStartHeight();
} else {
this._schema = await this.getExistingProjectSchema();
this.metadataRepo = await MetadataFactory(
this.sequelize,
this.schema,
Expand All @@ -127,7 +128,6 @@ export class ProjectService {

await this.sequelize.sync();

this._schema = await this.getExistingProjectSchema();
assert(this._schema, 'Schema should be created in main thread');
await this.initDbSchema();

Expand Down Expand Up @@ -225,7 +225,7 @@ export class ProjectService {
const keyValue = entries.reduce((arr, curr) => {
arr[curr.key] = curr.value;
return arr;
}, {} as { [key in typeof keys[number]]: string | boolean | number });
}, {} as { [key in (typeof keys)[number]]: string | boolean | number });

const { chain, genesisHash, specName } = this.apiService.networkMeta;

Expand Down
3 changes: 1 addition & 2 deletions packages/node/src/indexer/worker/worker.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@

import { threadId } from 'node:worker_threads';
import { Injectable } from '@nestjs/common';
import { RuntimeVersion } from '@polkadot/types/interfaces';
import { NodeConfig, getLogger, AutoQueue } from '@subql/node-core';
import { fetchBlocksBatches } from '../../utils/near';
import { ApiService } from '../api.service';
import { SpecVersion } from '../dictionary.service';
import { IndexerManager } from '../indexer.manager';
import { BlockContent } from '../types';

Expand Down Expand Up @@ -56,7 +56,6 @@ export class WorkerService {
}

const block = this.fetchedBlocks[height];

// Return info to get the runtime version, this lets the worker thread know
return undefined;
});
Expand Down
22 changes: 19 additions & 3 deletions packages/node/src/indexer/worker/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import { threadId } from 'node:worker_threads';
import { INestApplication } from '@nestjs/common';
import { NestFactory } from '@nestjs/core';
import { registerWorker, getLogger, NestLogger } from '@subql/node-core';
import { SpecVersion } from '../dictionary.service';
import { DynamicDsService } from '../dynamic-ds.service';
import { IndexerManager } from '../indexer.manager';
import { WorkerModule } from './worker.module';
import {
Expand All @@ -29,9 +31,9 @@ import {
WorkerService,
WorkerStatusResponse,
} from './worker.service';

let app: INestApplication;
let workerService: WorkerService;
let dynamicDsService: DynamicDsService;

const logger = getLogger(`worker #${threadId}`);

Expand All @@ -53,6 +55,7 @@ async function initWorker(): Promise<void> {
await indexerManager.start();

workerService = app.get(WorkerService);
dynamicDsService = app.get(DynamicDsService);
} catch (e) {
console.log('Failed to start worker', e);
logger.error(e, 'Failed to start worker');
Expand All @@ -62,14 +65,21 @@ async function initWorker(): Promise<void> {

async function fetchBlock(height: number): Promise<FetchBlockResponse> {
assert(workerService, 'Not initialised');

return workerService.fetchBlock(height);
}

async function processBlock(height: number): Promise<ProcessBlockResponse> {
assert(workerService, 'Not initialised');

return workerService.processBlock(height);
const res = await workerService.processBlock(height);

// Clean up the temp ds records for worker thread instance
if (res.dynamicDsCreated) {
const dynamicDsService = app.get(DynamicDsService);
dynamicDsService.deleteTempDsRecords(height);
}

return res;
}

// eslint-disable-next-line @typescript-eslint/require-await
Expand All @@ -92,6 +102,10 @@ async function getStatus(): Promise<WorkerStatusResponse> {
};
}

async function reloadDynamicDs(): Promise<void> {
return dynamicDsService.reloadDynamicDatasources();
}

// Register these functions to be exposed to worker host
registerWorker({
initWorker,
Expand All @@ -100,6 +114,7 @@ registerWorker({
numFetchedBlocks,
numFetchingBlocks,
getStatus,
reloadDynamicDs,
});

// Export types to be used on the parent
Expand All @@ -109,6 +124,7 @@ export type ProcessBlock = typeof processBlock;
export type NumFetchedBlocks = typeof numFetchedBlocks;
export type NumFetchingBlocks = typeof numFetchingBlocks;
export type GetWorkerStatus = typeof getStatus;
export type ReloadDynamicDs = typeof reloadDynamicDs;

process.on('uncaughtException', (e) => {
logger.error(e, 'Uncaught Exception');
Expand Down
3 changes: 2 additions & 1 deletion packages/node/src/subcommands/forceClean.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ export class ForceCleanService {
benchmark: false,
});

// drop all related enums
// TODO, remove this soon, once original enum are cleaned
// Deprecate, now enums are moved under schema, drop schema will remove project enums
await Promise.all(
modelsRelation.enums.map(async (e) => {
const enumTypeName = `${schema}_enum_${enumNameToHash(e.name)}`;
Expand Down
Loading

0 comments on commit 39f5752

Please sign in to comment.