Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Indexer workers #1103

Merged
merged 36 commits into from Jul 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
daa095b
POC fetching blocks in workers
stwiname May 18, 2022
2da2091
Use transferList
stwiname May 26, 2022
dedd0e3
WIP worker implementation
stwiname May 30, 2022
70f2a7e
Worker threads processing
stwiname Jun 10, 2022
139bd24
Clean up
stwiname Jun 10, 2022
f149ace
Get threaded indexing working
stwiname Jun 12, 2022
32aa255
Improve worker performance, get tests building
stwiname Jun 20, 2022
f4907c3
Fix build issues and apply dynamic ds dictionary changes
stwiname Jun 20, 2022
2168e9b
Discard pending data when dynamic ds is created
stwiname Jun 29, 2022
3a286b4
Fix rebase build issues
stwiname Jun 29, 2022
ae4de89
Fix lint error
stwiname Jun 29, 2022
25090e6
Fix test errors
stwiname Jun 29, 2022
fb79b38
Improve tests, fix bug with queue capacity
stwiname Jun 30, 2022
5cfcd97
Remove flush on shutdown
stwiname Jul 1, 2022
26c1ced
Rework queue recursion into while loops, improve error logging
stwiname Jul 1, 2022
021c706
Change aftereach order for fetch service tests
stwiname Jul 1, 2022
ed17f73
Add some batching/concurrency to queue and use for fetching blocks in…
stwiname Jul 1, 2022
a4c1f81
Attempt to fix test cleanup, code clean up
stwiname Jul 4, 2022
1897ccb
Bring back MetaModule
stwiname Jul 4, 2022
7c91204
Revert test docker compose
stwiname Jul 4, 2022
a5b5bf8
Fix error where _metadata entity not found
stwiname Jul 5, 2022
9603288
Update packages/node/src/indexer/worker/block-dispatcher.service.ts
stwiname Jul 5, 2022
a3a4eb1
Fix query tests
stwiname Jul 5, 2022
8b88a0b
Fix queue capacity bug, improve logging
stwiname Jul 6, 2022
1ff2d30
assign continuous blocks to worker (#1173)
ianhe8x Jul 8, 2022
a3a0c76
Add different worker dispatch strategies
stwiname Jul 8, 2022
18cae4d
Port over error handling changes
stwiname Jul 10, 2022
ace4fa2
Logging improvements, fix init poi service with threads
stwiname Jul 11, 2022
cd60ec3
Run MMR service loop on main thread
stwiname Jul 11, 2022
1a1e68a
Various improvements to fetch service tests
stwiname Jul 13, 2022
ddec31c
fix fetch test
jiqiang90 Jul 14, 2022
6da5590
Fix error with poi disabled and worker threads enabled
stwiname Jul 17, 2022
f56db62
Fix MMR not updating
stwiname Jul 18, 2022
610dc9b
Update changelog
stwiname Jul 20, 2022
068889c
Update workers flag description
stwiname Jul 20, 2022
61a1212
fix mmr mismatch due to fail to set blockOffset in metadata
jiqiang90 Jul 22, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion jest.config.js
Expand Up @@ -170,7 +170,7 @@ module.exports = {
// ],

// The regexp pattern or array of patterns that Jest uses to detect test files
testRegex: '.*\\.(spec|test)\\.ts$',
testRegex: '.*\\.spec\\.ts$',

// This option allows the use of a custom results processor
// testResultsProcessor: undefined,
Expand Down
4 changes: 4 additions & 0 deletions packages/node/CHANGELOG.md
Expand Up @@ -8,6 +8,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Changed
- Make handler data types generic (#1194)

### Added

- Support for worker threads. This will move block fetching and processing into a worker, this can increase performance by up to 4 times. By default this feature is disabled, you can enable it with the `--workers=<number>` flag. The number of workers will be capped to the number of CPU cores. (#1103)

## [1.5.1] - 2022-07-15
### Fixed
- Unable to create ds processor from template when project hosted on IPFS (#1190)
Expand Down
6 changes: 2 additions & 4 deletions packages/node/src/app.module.ts
Expand Up @@ -6,11 +6,9 @@ import { EventEmitterModule } from '@nestjs/event-emitter';
import { ScheduleModule } from '@nestjs/schedule';
import { ConfigureModule } from './configure/configure.module';
import { DbModule } from './db/db.module';
import { IndexerModule } from './indexer/indexer.module';
import { FetchModule } from './indexer/fetch.module';
import { MetaModule } from './meta/meta.module';

export class NodeOption {}

@Module({
imports: [
DbModule.forRoot({
Expand All @@ -23,7 +21,7 @@ export class NodeOption {}
EventEmitterModule.forRoot(),
ConfigureModule.register(),
ScheduleModule.forRoot(),
IndexerModule,
FetchModule,
MetaModule,
],
controllers: [],
Expand Down
6 changes: 6 additions & 0 deletions packages/node/src/configure/NodeConfig.ts
Expand Up @@ -32,6 +32,7 @@ export interface IConfig {
readonly mmrPath?: string;
readonly ipfs?: string;
readonly dictionaryTimeout: number;
readonly workers?: number;
}

export type MinConfig = Partial<Omit<IConfig, 'subquery'>> &
Expand Down Expand Up @@ -109,6 +110,7 @@ export class NodeConfig implements IConfig {
get debug(): boolean {
return this._config.debug;
}

get preferRange(): boolean {
return this._config.preferRange;
}
Expand Down Expand Up @@ -152,6 +154,10 @@ export class NodeConfig implements IConfig {
return this._config.dbSchema ?? this.subqueryName;
}

get workers(): number {
return this._config.workers;
}

merge(config: Partial<IConfig>): this {
assign(this._config, config);
return this;
Expand Down
2 changes: 1 addition & 1 deletion packages/node/src/indexer/PoiBlock.ts
@@ -1,6 +1,6 @@
// Copyright 2020-2022 OnFinality Limited authors & contributors
// SPDX-License-Identifier: Apache-2.0
// eslint-disable-next-line header/header

import { u8aConcat, numberToU8a, hexToU8a, isHex, isU8a } from '@polkadot/util';
import { blake2AsU8a } from '@polkadot/util-crypto';
import { ProofOfIndex } from './entities/Poi.entity';
Expand Down
4 changes: 2 additions & 2 deletions packages/node/src/indexer/benchmark.service.ts
Expand Up @@ -66,13 +66,13 @@ export class BenchmarkService {
}

@OnEvent(IndexerEvent.BlockProcessing)
handleProcessingBlock(blockPayload: ProcessBlockPayload) {
handleProcessingBlock(blockPayload: ProcessBlockPayload): void {
this.currentProcessingHeight = blockPayload.height;
this.currentProcessingTimestamp = blockPayload.timestamp;
}

@OnEvent(IndexerEvent.BlockTarget)
handleTargetBlock(blockPayload: TargetBlockPayload) {
handleTargetBlock(blockPayload: TargetBlockPayload): void {
this.targetHeight = blockPayload.height;
}
}
4 changes: 2 additions & 2 deletions packages/node/src/indexer/dynamic-ds.service.ts
Expand Up @@ -53,7 +53,7 @@ export class DynamicDsService {

return ds;
} catch (e) {
logger.error(e.message);
logger.error(e, 'Failed to create dynamic ds');
process.exit(1);
}
}
Expand All @@ -67,7 +67,7 @@ export class DynamicDsService {
params.map((params) => this.getDatasource(params)),
);
} catch (e) {
logger.error(`Unable to get dynamic datasources:\n${e.message}`);
logger.error(e, `Unable to get dynamic datasources`);
process.exit(1);
}
}
Expand Down
109 changes: 109 additions & 0 deletions packages/node/src/indexer/fetch.module.ts
@@ -0,0 +1,109 @@
// Copyright 2020-2022 OnFinality Limited authors & contributors
// SPDX-License-Identifier: Apache-2.0

import { Module } from '@nestjs/common';
import { EventEmitter2 } from '@nestjs/event-emitter';
import { SchedulerRegistry } from '@nestjs/schedule';
import { NodeConfig } from '../configure/NodeConfig';
import { SubqueryProject } from '../configure/SubqueryProject';
import { DbModule } from '../db/db.module';
import { getYargsOption } from '../yargs';
import { ApiService } from './api.service';
import { BenchmarkService } from './benchmark.service';
import { DictionaryService } from './dictionary.service';
import { DsProcessorService } from './ds-processor.service';
import { DynamicDsService } from './dynamic-ds.service';
import { FetchService } from './fetch.service';
import { IndexerManager } from './indexer.manager';
import { MmrService } from './mmr.service';
import { PoiService } from './poi.service';
import { ProjectService } from './project.service';
import { SandboxService } from './sandbox.service';
import { StoreService } from './store.service';
import {
BlockDispatcherService,
WorkerBlockDispatcherService,
IBlockDispatcher,
} from './worker/block-dispatcher.service';

const { argv } = getYargsOption();

@Module({
imports: [DbModule.forFeature(['Subquery'])],
providers: [
StoreService,
{
provide: ApiService,
useFactory: async (
project: SubqueryProject,
eventEmitter: EventEmitter2,
) => {
const apiService = new ApiService(project, eventEmitter);
await apiService.init();
return apiService;
},
inject: [SubqueryProject, EventEmitter2],
},
IndexerManager,
{
provide: 'IBlockDispatcher',
useClass: argv.workers
? WorkerBlockDispatcherService
: BlockDispatcherService,
},
{
provide: FetchService,
useFactory: async (
apiService: ApiService,
nodeConfig: NodeConfig,
project: SubqueryProject,
blockDispatcher: IBlockDispatcher,
dictionaryService: DictionaryService,
dsProcessorService: DsProcessorService,
eventEmitter: EventEmitter2,
projectService: ProjectService,
dynamicDsService: DynamicDsService,
schedulerRegistry: SchedulerRegistry,
) => {
await projectService.init();

const fetchService = new FetchService(
apiService,
nodeConfig,
project,
blockDispatcher,
dictionaryService,
dsProcessorService,
dynamicDsService,
eventEmitter,
schedulerRegistry,
);

await fetchService.init(projectService.startHeight);
return fetchService;
},
inject: [
ApiService,
NodeConfig,
SubqueryProject,
'IBlockDispatcher',
DictionaryService,
DsProcessorService,
EventEmitter2,
ProjectService,
DynamicDsService,
SchedulerRegistry,
],
},
BenchmarkService,
DictionaryService,
SandboxService,
DsProcessorService,
DynamicDsService,
PoiService,
MmrService,
ProjectService,
],
exports: [StoreService, MmrService],
})
export class FetchModule {}