Skip to content

Commit

Permalink
perf(core): implement bulk updates in sql drivers
Browse files Browse the repository at this point in the history
Related: #732
  • Loading branch information
B4nan committed Sep 28, 2020
1 parent 8cbb22a commit b005353
Show file tree
Hide file tree
Showing 14 changed files with 210 additions and 18 deletions.
4 changes: 4 additions & 0 deletions packages/core/src/drivers/DatabaseDriver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ export abstract class DatabaseDriver<C extends Connection> implements IDatabaseD

abstract async nativeUpdate<T extends AnyEntity<T>>(entityName: string, where: FilterQuery<T>, data: EntityData<T>, ctx?: Transaction): Promise<QueryResult>;

/**
 * Fallback implementation of the batch update: always rejects with an error
 * naming the concrete driver class. Drivers that can update many rows in one
 * call are expected to override this method.
 *
 * @throws Error always; the message includes `this.constructor.name`.
 */
async nativeUpdateMany<T extends AnyEntity<T>>(entityName: string, where: FilterQuery<T>[], data: EntityData<T>[], ctx?: Transaction): Promise<QueryResult> {
  const unsupported = new Error(`Batch updates are not supported by ${this.constructor.name} driver`);
  throw unsupported;
}

abstract async nativeDelete<T extends AnyEntity<T>>(entityName: string, where: FilterQuery<T>, ctx?: Transaction): Promise<QueryResult>;

abstract async count<T extends AnyEntity<T>>(entityName: string, where: FilterQuery<T>, options?: CountOptions<T>, ctx?: Transaction): Promise<number>;
Expand Down
2 changes: 2 additions & 0 deletions packages/core/src/drivers/IDatabaseDriver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ export interface IDatabaseDriver<C extends Connection = Connection> {

nativeUpdate<T extends AnyEntity<T>>(entityName: string, where: FilterQuery<T>, data: EntityData<T>, ctx?: Transaction): Promise<QueryResult>;

/**
 * Updates several records of `entityName` in a single call.
 * `where` and `data` are parallel arrays: the condition at index `i` selects the record
 * that receives the values from `data[i]`.
 */
nativeUpdateMany<T extends AnyEntity<T>>(entityName: string, where: FilterQuery<T>[], data: EntityData<T>[], ctx?: Transaction): Promise<QueryResult>;

nativeDelete<T extends AnyEntity<T>>(entityName: string, where: FilterQuery<T>, ctx?: Transaction): Promise<QueryResult>;

syncCollection<T, O>(collection: Collection<T, O>, ctx?: Transaction): Promise<void>;
Expand Down
4 changes: 2 additions & 2 deletions packages/core/src/entity/WrappedEntity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,10 @@ export class WrappedEntity<T extends AnyEntity<T>, PK extends keyof T> {
const value = this.entity[this.__meta.serializedPrimaryKey];

if (Utils.isEntity<T>(value)) {
return value.__helper!.__serializedPrimaryKey as string;
return value.__helper!.__serializedPrimaryKey;
}

return value as unknown as string;
return '' + value;
}

}
9 changes: 8 additions & 1 deletion packages/core/src/platforms/Platform.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,14 @@ export abstract class Platform {
/**
* Whether or not the driver supports returning the list of created PKs back when multi-inserting
*/
returningMultiInsert(): boolean {
usesBatchInserts(): boolean {
return true;
}

/**
 * Whether or not the driver supports updating many records at once.
 * The base implementation answers `true`; a platform without batch update
 * support can override this to return `false`.
 */
usesBatchUpdates(): boolean {
return true;
}

Expand Down
49 changes: 47 additions & 2 deletions packages/core/src/unit-of-work/ChangeSetPersister.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ export class ChangeSetPersister {
const meta = this.metadata.find(changeSets[0].name)!;
changeSets.forEach(changeSet => this.processProperties(changeSet));

if (changeSets.length > 1 && this.platform.returningMultiInsert() && this.config.get('useBatchInserts')) {
if (changeSets.length > 1 && this.config.get('useBatchInserts', this.platform.usesBatchInserts())) {
return this.persistNewEntities(meta, changeSets, ctx);
}

Expand All @@ -31,8 +31,13 @@ export class ChangeSetPersister {
}

async executeUpdates<T extends AnyEntity<T>>(changeSets: ChangeSet<T>[], ctx?: Transaction): Promise<void> {
const meta = this.metadata.find(changeSets[0].name)!;
changeSets.forEach(changeSet => this.processProperties(changeSet));

if (changeSets.length > 1 && this.config.get('useBatchUpdates', this.platform.usesBatchUpdates())) {
return this.persistManagedEntities(meta, changeSets, ctx);
}

for (const changeSet of changeSets) {
await this.persistManagedEntity(changeSet, ctx);
}
Expand Down Expand Up @@ -87,6 +92,10 @@ export class ChangeSetPersister {
private async persistNewEntitiesBatch<T extends AnyEntity<T>>(meta: EntityMetadata<T>, changeSets: ChangeSet<T>[], ctx?: Transaction): Promise<void> {
const res = await this.driver.nativeInsertMany(meta.className, changeSets.map(cs => cs.payload), ctx);

if (!this.platform.usesReturningStatement()) {
await this.reloadVersionValues(meta, changeSets, ctx);
}

for (let i = 0; i < changeSets.length; i++) {
const changeSet = changeSets[i];
const wrapped = changeSet.entity.__helper!;
Expand All @@ -96,6 +105,7 @@ export class ChangeSetPersister {
}

this.mapReturnedValues(changeSet, res, meta);

this.markAsPopulated(changeSet, meta);
wrapped.__initialized = true;
wrapped.__managed = true;
Expand All @@ -111,6 +121,22 @@ export class ChangeSetPersister {
changeSet.persisted = true;
}

/**
 * Persists the given update change sets in chunks of the configured `batchSize`.
 * Every chunk is first written via `persistManagedEntitiesBatch` and then has its
 * version values reloaded, before the next chunk is started.
 */
private async persistManagedEntities<T extends AnyEntity<T>>(meta: EntityMetadata<T>, changeSets: ChangeSet<T>[], ctx?: Transaction): Promise<void> {
  const batchSize = this.config.get('batchSize');
  let offset = 0;

  while (offset < changeSets.length) {
    const batch = changeSets.slice(offset, offset + batchSize);
    await this.persistManagedEntitiesBatch(meta, batch, ctx);
    await this.reloadVersionValues(meta, batch, ctx);
    offset += batchSize;
  }
}

/**
 * Writes one chunk of update change sets with a single `nativeUpdateMany` call.
 * Optimistic locks are checked up front; once the driver call resolves, every
 * change set in the chunk is flagged as persisted.
 */
private async persistManagedEntitiesBatch<T extends AnyEntity<T>>(meta: EntityMetadata<T>, changeSets: ChangeSet<T>[], ctx?: Transaction): Promise<void> {
  await this.checkOptimisticLocks(meta, changeSets, ctx);

  // conditions[i] (the entity's primary key) is paired with payloads[i]
  const conditions = changeSets.map(cs => cs.entity.__helper!.__primaryKey as Dictionary);
  const payloads = changeSets.map(cs => cs.payload);
  await this.driver.nativeUpdateMany(meta.className, conditions, payloads, ctx);

  for (const changeSet of changeSets) {
    changeSet.persisted = true;
  }
}

private mapPrimaryKey<T extends AnyEntity<T>>(meta: EntityMetadata<T>, value: IPrimaryKey, changeSet: ChangeSet<T>): void {
const prop = meta.properties[meta.primaryKeys[0]];
const insertId = prop.customType ? prop.customType.convertToJSValue(value, this.driver.getPlatform()) : value;
Expand Down Expand Up @@ -157,6 +183,25 @@ export class ChangeSetPersister {
return this.driver.nativeUpdate(changeSet.name, cond, changeSet.payload, ctx);
}

/**
 * Batch variant of the optimistic lock check.
 *
 * Does nothing when the entity has no version property or when none of the
 * entities carries a truthy version value. Otherwise it looks up every
 * (primary key, version) pair in one `find` call, selecting only the primary
 * keys. If fewer rows come back than change sets were given, the first change
 * set whose primary key is missing from the result is reported.
 *
 * @throws OptimisticLockError for the first entity without a matching row.
 */
private async checkOptimisticLocks<T extends AnyEntity<T>>(meta: EntityMetadata<T>, changeSets: ChangeSet<T>[], ctx?: Transaction): Promise<void> {
  const hasVersionedEntity = !!meta.versionProperty && changeSets.some(cs => !!cs.entity[meta.versionProperty]);

  if (!hasVersionedEntity) {
    return;
  }

  // one branch per change set: its primary key condition plus the version held in memory
  const $or = changeSets.map(cs => {
    const pkCond = Utils.getPrimaryKeyCond<T>(cs.entity, meta.primaryKeys);
    return { ...pkCond, [meta.versionProperty]: cs.entity[meta.versionProperty] };
  });

  const res = await this.driver.find(meta.className, { $or }, { fields: meta.primaryKeys }, ctx);

  if (res.length === changeSets.length) {
    return;
  }

  const samePrimaryKey = (left: Dictionary, right: Dictionary, keys: string[]) => keys.every(key => left[key] === right[key]);
  const failed = changeSets.find(cs => {
    const matched = res.some(row => samePrimaryKey(Utils.getPrimaryKeyCond(cs.entity, meta.primaryKeys)!, row, meta.primaryKeys));
    return !matched;
  });
  const entity = failed!.entity;

  throw OptimisticLockError.lockFailed(entity);
}

private checkOptimisticLock<T extends AnyEntity<T>>(meta: EntityMetadata<T>, changeSet: ChangeSet<T>, res?: QueryResult) {
if (meta.versionProperty && res && !res.affectedRows) {
throw OptimisticLockError.lockFailed(changeSet.entity);
Expand All @@ -174,7 +219,7 @@ export class ChangeSetPersister {
fields: [meta.versionProperty],
}, ctx);
const map = new Map<string, Date>();
data.forEach(e => map.set(e[pk], e[meta.versionProperty]));
data.forEach(e => map.set(Utils.getCompositeKeyHash<T>(e as T, meta), e[meta.versionProperty]));

for (const changeSet of changeSets) {
const version = map.get(changeSet.entity.__helper!.__serializedPrimaryKey);
Expand Down
4 changes: 2 additions & 2 deletions packages/core/src/utils/Configuration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ export class Configuration<D extends IDatabaseDriver = IDatabaseDriver> {
populateAfterFlush: false,
forceUtcTimezone: false,
ensureIndexes: false,
useBatchInserts: true,
batchSize: 1000,
debug: false,
verbose: false,
Expand Down Expand Up @@ -330,7 +329,8 @@ export interface MikroORMOptions<D extends IDatabaseDriver = IDatabaseDriver> ex
forceUtcTimezone: boolean;
timezone?: string;
ensureIndexes: boolean;
useBatchInserts: boolean;
useBatchInserts?: boolean;
useBatchUpdates?: boolean;
batchSize: number;
hydrator: { new (factory: EntityFactory, em: EntityManager): Hydrator };
loadStrategy: LoadStrategy;
Expand Down
12 changes: 6 additions & 6 deletions packages/core/src/utils/Utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ export class Utils {
/**
* Checks if the argument is instance of `Object`, but not one of the blacklisted types. Returns false for arrays.
*/
static isNotObject<T = Dictionary>(o: any, not: any[] = []): o is T {
return !!o && typeof o === 'object' && !Array.isArray(o) && !not.some(cls => o instanceof cls);
static isNotObject<T = Dictionary>(o: any, not: any[]): o is T {
return this.isObject(o) && !not.some(cls => o instanceof cls);
}

/**
Expand All @@ -45,7 +45,7 @@ export class Utils {
let size = 0;

for (const key in object) {
// eslint-disable-next-line no-prototype-builtins
/* istanbul ignore else */ // eslint-disable-next-line no-prototype-builtins
if (object.hasOwnProperty(key)) {
size++;
}
Expand All @@ -60,7 +60,7 @@ export class Utils {
*/
static hasObjectKeys(object: Dictionary): boolean {
for (const key in object) {
// eslint-disable-next-line no-prototype-builtins
/* istanbul ignore else */ // eslint-disable-next-line no-prototype-builtins
if (object.hasOwnProperty(key)) {
return true;
}
Expand Down Expand Up @@ -351,12 +351,12 @@ export class Utils {
}, {} as any);
}

static getOrderedPrimaryKeys<T extends AnyEntity<T>>(id: Primary<T> | Record<string, Primary<T>>, meta: EntityMetadata<T>, platform: Platform, convertCustomTypes?: boolean): Primary<T>[] {
static getOrderedPrimaryKeys<T extends AnyEntity<T>>(id: Primary<T> | Record<string, Primary<T>>, meta: EntityMetadata<T>, platform?: Platform, convertCustomTypes?: boolean): Primary<T>[] {
const data = (Utils.isPrimaryKey(id) ? { [meta.primaryKeys[0]]: id } : id) as Record<string, Primary<T>>;
return meta.primaryKeys.map(pk => {
const prop = meta.properties[pk];

if (prop.customType && convertCustomTypes) {
if (prop.customType && platform && convertCustomTypes) {
return prop.customType.convertToJSValue(data[pk], platform);
}

Expand Down
44 changes: 44 additions & 0 deletions packages/knex/src/AbstractSqlDriver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,50 @@ export abstract class AbstractSqlDriver<C extends AbstractSqlConnection = Abstra
return res;
}

/**
 * Updates several rows of one entity with a single UPDATE statement.
 *
 * `where[i]` identifies the row that receives `data[i]`. For every column touched by at least
 * one payload, the new value is expressed as
 * `case when (<pk condition>) then ? ... else <column> end`, so rows whose payload does not
 * contain that property keep their current value. After the update, `processManyToMany` is
 * invoked once per row with the collections extracted from its payload.
 *
 * @returns the result of the executed update, or a zeroed result when no payload had any key.
 */
async nativeUpdateMany<T extends AnyEntity<T>>(entityName: string, where: FilterQuery<T>[], data: EntityData<T>[], ctx?: Transaction<KnexTransaction>): Promise<QueryResult> {
const meta = this.metadata.get<T>(entityName);
// NOTE(review): presumably this also removes the M:N keys from each payload, so they are not treated as columns below - confirm in extractManyToMany
const collections = data.map(d => this.extractManyToMany(entityName, d));
const values: Dictionary<Raw> = {};
const knex = this.connection.getKnex();
// union of the property names present in any of the payloads
const keys = new Set<string>();
data.forEach(row => Object.keys(row).forEach(k => keys.add(k)));
// `pk1 = ? and pk2 = ? ...` over all primary key columns, reused for every `when` branch
const pkCond = Utils.flatten(meta.primaryKeys.map(pk => meta.properties[pk].fieldNames)).map(pk => `${knex.ref(pk)} = ?`).join(' and ');
// returned as-is when there is nothing to update
let res: QueryResult = { affectedRows: 0, insertId: 0, row: {} };

keys.forEach(key => {
// a property can map to several columns; each column gets its own case expression
meta.properties[key].fieldNames.forEach((fieldName: string, fieldNameIdx: number) => {
const params: any[] = [];
let sql = `case`;
where.forEach((cond, idx) => {
// only rows whose payload contains this property contribute a `when` branch
if (key in data[idx]) {
const pks = Utils.getOrderedPrimaryKeys(cond as Dictionary, meta);
sql += ` when (${pkCond}) then ?`;
// bindings follow placeholder order: primary key values first, then the new value
// (for multi-column properties the value at the matching column index is used)
params.push(...pks, meta.properties[key].fieldNames.length > 1 ? data[idx][key][fieldNameIdx] : data[idx][key]);
}
});
// rows not matched by any branch keep the current column value
sql += ` else ${(knex.ref(fieldName))} end`;
values[fieldName] = knex.raw(sql, params);
});
});

// skip the UPDATE entirely when no payload carried any key
if (keys.size > 0) {
const qb = this.createQueryBuilder<T>(entityName, ctx, true)
.unsetFlag(QueryFlag.CONVERT_CUSTOM_TYPES)
.update(values)
.where({ [Utils.getPrimaryKeyHash(meta.primaryKeys)]: where });

res = await this.rethrow(qb.execute('run', false));
}

const pkConds = data.map((_, idx) => Utils.extractPK<T>(where[idx], meta)!);

// collections are processed one row at a time, in order
for (let i = 0; i < collections.length; i++) {
await this.processManyToMany<T>(meta, pkConds[i] as Primary<T>[], collections[i], false, ctx);
}

return res;
}

async nativeDelete<T extends AnyEntity<T>>(entityName: string, where: FilterQuery<T> | string | any, ctx?: Transaction<KnexTransaction>): Promise<QueryResult> {
const pks = this.getPrimaryKeyFields(entityName);

Expand Down
4 changes: 4 additions & 0 deletions packages/mongodb/src/MongoPlatform.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ export class MongoPlatform extends Platform {
return true;
}

/**
 * Reports that this platform does not use batch updates.
 */
usesBatchUpdates(): boolean {
return false;
}

marshallArray(values: string[]): string {
return values as unknown as string;
}
Expand Down
21 changes: 19 additions & 2 deletions tests/EntityManager.mysql.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,20 @@ describe('EntityManagerMySql', () => {
{ id: 2, name: 'test 2', type: PublisherType.LOCAL },
{ id: 3, name: 'test 3', type: PublisherType.GLOBAL },
]);

// multi updates
const res3 = await driver.nativeUpdateMany<Publisher2>(Publisher2.name, [1, 2, 3], [
{ name: 'test 11', type: PublisherType.LOCAL },
{ type: PublisherType.GLOBAL },
{ name: 'test 33', type: PublisherType.LOCAL },
]);

const res4 = await driver.find(Publisher2.name, {});
expect(res4).toMatchObject([
{ id: 1, name: 'test 11', type: PublisherType.LOCAL },
{ id: 2, name: 'test 2', type: PublisherType.GLOBAL },
{ id: 3, name: 'test 33', type: PublisherType.LOCAL },
]);
});

test('driver appends errored query', async () => {
Expand Down Expand Up @@ -2414,8 +2428,8 @@ describe('EntityManagerMySql', () => {
'order by `e0`.`title` asc');
});

// this should run in ~600ms (when running single test locally)
test('perf: one to many', async () => {
// this should run in ~800ms (when running single test locally)
test('perf: batch insert and update', async () => {
const authors = new Set<Author2>();

for (let i = 1; i <= 1000; i++) {
Expand All @@ -2426,6 +2440,9 @@ describe('EntityManagerMySql', () => {

await orm.em.flush();
authors.forEach(author => expect(author.id).toBeGreaterThan(0));

authors.forEach(a => a.termsAccepted = true);
await orm.em.flush();
});

afterAll(async () => orm.close(true));
Expand Down
17 changes: 17 additions & 0 deletions tests/EntityManager.postgre.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1506,6 +1506,23 @@ describe('EntityManagerPostgre', () => {
expect(author.books.getItems().every(b => b.uuid)).toBe(true);
});

// expected to take ~800ms when this single test is run locally
test('perf: batch insert and update', async () => {
  const created: Author2[] = [];

  for (let i = 1; i <= 1000; i++) {
    const author = new Author2(`Jon Snow ${i}`, `snow-${i}@wall.st`);
    orm.em.persist(author);
    created.push(author);
  }

  // flush the newly persisted authors; each one must end up with a positive id
  await orm.em.flush();

  for (const author of created) {
    expect(author.id).toBeGreaterThan(0);
  }

  // modify every author and flush again
  for (const author of created) {
    author.termsAccepted = true;
  }

  await orm.em.flush();
});

afterAll(async () => orm.close(true));

});
19 changes: 18 additions & 1 deletion tests/EntityManager.sqlite2.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { unlinkSync } from 'fs';
import { Collection, EntityManager, LockMode, MikroORM, QueryOrder, Logger, ValidationError, wrap } from '@mikro-orm/core';
import { SqliteDriver } from '@mikro-orm/sqlite';
import { initORMSqlite2, wipeDatabaseSqlite2 } from './bootstrap';
import { Author4, Book4, BookTag4, FooBar4, IPublisher4, Publisher4, PublisherType, Test4 } from './entities-schema';
import { Author4, Book4, BookTag4, FooBar4, IAuthor4, IPublisher4, Publisher4, PublisherType, Test4 } from './entities-schema';

describe('EntityManagerSqlite2', () => {

Expand Down Expand Up @@ -846,6 +846,23 @@ describe('EntityManagerSqlite2', () => {
expect(author.books.getItems().every(b => b.id)).toBe(true);
});

// expected to take ~400ms when this single test is run locally
test('perf: batch insert and update', async () => {
  const created: IAuthor4[] = [];

  for (let i = 1; i <= 1000; i++) {
    const author = orm.em.create(Author4, { name: `Jon Snow ${i}`, email: `snow-${i}@wall.st` });
    orm.em.persist(author);
    created.push(author);
  }

  // flush the newly persisted authors; each one must end up with a positive id
  await orm.em.flush();

  for (const author of created) {
    expect(author.id).toBeGreaterThan(0);
  }

  // modify every author and flush again
  for (const author of created) {
    author.termsAccepted = true;
  }

  await orm.em.flush();
});

afterAll(async () => {
await orm.close(true);
unlinkSync(orm.config.get('dbName')!);
Expand Down
2 changes: 1 addition & 1 deletion tests/bootstrap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ export async function initORMSqlite2() {
propagateToOneOwner: false,
logger: i => i,
cache: { pretty: true },
batchSize: 200,
batchSize: 100,
});
const schemaGenerator = new SchemaGenerator(orm.em);
await schemaGenerator.dropSchema();
Expand Down
Loading

0 comments on commit b005353

Please sign in to comment.