Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add limit and offset options to store.getByField #1259

Merged
merged 2 commits into from
Aug 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 41 additions & 50 deletions packages/node-core/src/indexer/store.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

import assert from 'assert';
import { isMainThread } from 'worker_threads';
import {isMainThread} from 'worker_threads';
import {Injectable} from '@nestjs/common';
import {hexToU8a, u8aToBuffer} from '@polkadot/util';
import {blake2AsHex} from '@polkadot/util-crypto';
Expand Down Expand Up @@ -102,7 +102,7 @@ export class StoreService {
async incrementBlockCount(tx: Transaction): Promise<void> {
await this.sequelize.query(
`UPDATE "${this.schema}"._metadata SET value = (COALESCE(value->0):: int + 1)::text::jsonb WHERE key ='processedBlockCount'`,
{ transaction: tx },
{transaction: tx}
);
}

Expand Down Expand Up @@ -260,7 +260,7 @@ export class StoreService {
// this will allow alter current entity, including fields
// TODO, add rules for changes, eg only allow add nullable field
// Only allow altering the tables on the main thread
await this.sequelize.sync({ alter: { drop: isMainThread } });
await this.sequelize.sync({alter: {drop: isMainThread}});
await this.setMetadata('historicalStateEnabled', this.historical);
for (const query of extraQueries) {
await this.sequelize.query(query);
Expand Down Expand Up @@ -364,10 +364,7 @@ export class StoreService {
});
}

validateNotifyTriggers(
triggerName: string,
triggers: NotifyTriggerPayload[],
): void {
validateNotifyTriggers(triggerName: string, triggers: NotifyTriggerPayload[]): void {
if (triggers.length !== NotifyTriggerManipulationType.length) {
throw new Error(
`Found ${triggers.length} ${triggerName} triggers, expected ${NotifyTriggerManipulationType.length} triggers `
Expand Down Expand Up @@ -490,10 +487,7 @@ group by
);
}

async rewind(
targetBlockHeight: number,
transaction: Transaction,
): Promise<void> {
async rewind(targetBlockHeight: number, transaction: Transaction): Promise<void> {
for (const model of Object.values(this.sequelize.models)) {
if ('__block_range' in model.getAttributes()) {
await model.destroy({
Expand All @@ -502,15 +496,15 @@ group by
where: this.sequelize.where(
this.sequelize.fn('lower', this.sequelize.col('_block_range')),
Op.gt,
targetBlockHeight,
targetBlockHeight
),
});
await model.update(
{
__block_range: this.sequelize.fn(
'int8range',
this.sequelize.fn('lower', this.sequelize.col('_block_range')),
null,
null
),
},
{
Expand All @@ -521,7 +515,7 @@ group by
[Op.contains]: targetBlockHeight,
},
},
},
}
);
}
}
Expand All @@ -546,20 +540,28 @@ group by

getStore(): Store {
return {
get: async (entity: string, id: string): Promise<Entity | undefined> => {
get: async <T extends Entity>(entity: string, id: string): Promise<T | undefined> => {
try {
const model = this.sequelize.model(entity);
assert(model, `model ${entity} not exists`);
const record = await model.findOne({
where: {id},
transaction: this.tx,
});
return record?.toJSON() as Entity;
return record?.toJSON() as T;
} catch (e) {
throw new Error(`Failed to get Entity ${entity} with id ${id}: ${e}`);
}
},
getByField: async (entity: string, field: string, value): Promise<Entity[] | undefined> => {
getByField: async <T extends Entity>(
entity: string,
field: keyof T,
value: T[keyof T] | T[keyof T][],
options?: {
offset?: number;
limit?: number;
}
): Promise<T[] | undefined> => {
try {
const model = this.sequelize.model(entity);
assert(model, `model ${entity} not exists`);
Expand All @@ -572,14 +574,19 @@ group by
const records = await model.findAll({
where: {[field]: value},
transaction: this.tx,
limit: this.config.queryLimit,
limit: options?.limit ?? this.config.queryLimit,
offset: options?.offset,
});
return records.map((record) => record.toJSON() as Entity);
return records.map((record) => record.toJSON() as T);
} catch (e) {
throw new Error(`Failed to getByField Entity ${entity} with field ${field}: ${e}`);
}
},
getOneByField: async (entity: string, field: string, value): Promise<Entity | undefined> => {
getOneByField: async <T extends Entity>(
entity: string,
field: keyof T,
value: T[keyof T]
): Promise<T | undefined> => {
try {
const model = this.sequelize.model(entity);
assert(model, `model ${entity} not exists`);
Expand All @@ -595,7 +602,7 @@ group by
where: {[field]: value},
transaction: this.tx,
});
return record?.toJSON() as Entity;
return record?.toJSON() as T;
} catch (e) {
throw new Error(`Failed to getOneByField Entity ${entity} with field ${field}: ${e}`);
}
Expand Down Expand Up @@ -649,44 +656,34 @@ group by
}
},

bulkUpdate: async (
entity: string,
data: Entity[],
fields?: string[],
): Promise<void> => {
bulkUpdate: async (entity: string, data: Entity[], fields?: string[]): Promise<void> => {
try {
const model = this.sequelize.model(entity);
assert(model, `model ${entity} not exists`);
if (this.historical) {
if (fields.length !== 0) {
logger.warn(
`Update specified fields with historical feature is not supported`,
);
logger.warn(`Update specified fields with historical feature is not supported`);
}
const newRecordAttributes: CreationAttributes<Model>[] = [];
await Promise.all(
data.map(async (record) => {
const attributes =
record as unknown as CreationAttributes<Model>;
const attributes = record as unknown as CreationAttributes<Model>;
const [updatedRows] = await model.update(attributes, {
hooks: false,
transaction: this.tx,
where: this.sequelize.and(
{ id: record.id },
{id: record.id},
this.sequelize.where(
this.sequelize.fn(
'lower',
this.sequelize.col('_block_range'),
),
this.blockHeight,
),
this.sequelize.fn('lower', this.sequelize.col('_block_range')),
this.blockHeight
)
),
});
if (updatedRows < 1) {
await this.markAsDeleted(model, record.id);
newRecordAttributes.push(attributes);
}
}),
})
);
if (newRecordAttributes.length !== 0) {
await model.bulkCreate(newRecordAttributes, {
Expand All @@ -695,17 +692,11 @@ group by
}
} else {
const modelFields =
fields ??
Object.keys(model.getAttributes()).filter(
(item) => !KEY_FIELDS.includes(item),
);
await model.bulkCreate(
data as unknown as CreationAttributes<Model>[],
{
transaction: this.tx,
updateOnDuplicate: modelFields,
},
);
fields ?? Object.keys(model.getAttributes()).filter((item) => !KEY_FIELDS.includes(item));
await model.bulkCreate(data as unknown as CreationAttributes<Model>[], {
transaction: this.tx,
updateOnDuplicate: modelFields,
});
}
if (this.config.proofOfIndex) {
for (const item of data) {
Expand Down
6 changes: 6 additions & 0 deletions packages/node-core/src/yargs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,12 @@ export function getYargsOption() {
describe: 'Number of worker threads to use for fetching and processing blocks. Disabled by default.',
type: 'number',
},
'query-limit': {
demandOption: false,
describe: 'The limit of items a project can query with store.getByField at once',
type: 'number',
default: 100,
},
});
}

Expand Down
4 changes: 2 additions & 2 deletions packages/types/src/interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

import {AnyTuple, Codec} from '@polkadot/types-codec/types';
import {GenericExtrinsic} from '@polkadot/types/extrinsic';
import {EventRecord, SignedBlock, Extrinsic} from '@polkadot/types/interfaces';
import {EventRecord, SignedBlock} from '@polkadot/types/interfaces';
import {IEvent} from '@polkadot/types/types';

export interface Entity {
Expand All @@ -16,7 +16,7 @@ export type FunctionPropertyNames<T> = {

export interface Store {
get(entity: string, id: string): Promise<Entity | null>;
getByField(entity: string, field: string, value: any): Promise<Entity[]>;
getByField(entity: string, field: string, value: any, options?: {offset?: number; limit?: number}): Promise<Entity[]>;
getOneByField(entity: string, field: string, value: any): Promise<Entity | null>;
set(entity: string, id: string, data: Entity): Promise<void>;
bulkCreate(entity: string, data: Entity[]): Promise<void>;
Expand Down