diff --git a/packages/core/src/drivers/DatabaseDriver.ts b/packages/core/src/drivers/DatabaseDriver.ts index e952356be256..6266153c4648 100644 --- a/packages/core/src/drivers/DatabaseDriver.ts +++ b/packages/core/src/drivers/DatabaseDriver.ts @@ -33,6 +33,10 @@ export abstract class DatabaseDriver implements IDatabaseD abstract async nativeUpdate>(entityName: string, where: FilterQuery, data: EntityData, ctx?: Transaction): Promise; + async nativeUpdateMany>(entityName: string, where: FilterQuery[], data: EntityData[], ctx?: Transaction): Promise { + throw new Error(`Batch updates are not supported by ${this.constructor.name} driver`); + } + abstract async nativeDelete>(entityName: string, where: FilterQuery, ctx?: Transaction): Promise; abstract async count>(entityName: string, where: FilterQuery, options?: CountOptions, ctx?: Transaction): Promise; diff --git a/packages/core/src/drivers/IDatabaseDriver.ts b/packages/core/src/drivers/IDatabaseDriver.ts index dc23bedbf137..d2094b6ae24f 100644 --- a/packages/core/src/drivers/IDatabaseDriver.ts +++ b/packages/core/src/drivers/IDatabaseDriver.ts @@ -39,6 +39,8 @@ export interface IDatabaseDriver { nativeUpdate>(entityName: string, where: FilterQuery, data: EntityData, ctx?: Transaction): Promise; + nativeUpdateMany>(entityName: string, where: FilterQuery[], data: EntityData[], ctx?: Transaction): Promise; + nativeDelete>(entityName: string, where: FilterQuery, ctx?: Transaction): Promise; syncCollection(collection: Collection, ctx?: Transaction): Promise; diff --git a/packages/core/src/entity/WrappedEntity.ts b/packages/core/src/entity/WrappedEntity.ts index a9b80e648388..013b3eef1ec0 100644 --- a/packages/core/src/entity/WrappedEntity.ts +++ b/packages/core/src/entity/WrappedEntity.ts @@ -109,10 +109,10 @@ export class WrappedEntity, PK extends keyof T> { const value = this.entity[this.__meta.serializedPrimaryKey]; if (Utils.isEntity(value)) { - return value.__helper!.__serializedPrimaryKey as string; + return value.__helper!.__serializedPrimaryKey; } - return value as unknown as string; + return '' + value; } } diff --git a/packages/core/src/platforms/Platform.ts b/packages/core/src/platforms/Platform.ts index ac172d7e7d8b..db6400661cf0 100644 --- a/packages/core/src/platforms/Platform.ts +++ b/packages/core/src/platforms/Platform.ts @@ -47,7 +47,14 @@ export abstract class Platform { /** * Whether or not the driver supports retuning list of created PKs back when multi-inserting */ - returningMultiInsert(): boolean { + usesBatchInserts(): boolean { + return true; + } + + /** + * Whether or not the driver supports updating many records at once + */ + usesBatchUpdates(): boolean { return true; } diff --git a/packages/core/src/unit-of-work/ChangeSetPersister.ts b/packages/core/src/unit-of-work/ChangeSetPersister.ts index 73cea7175f57..dd396c295a7f 100644 --- a/packages/core/src/unit-of-work/ChangeSetPersister.ts +++ b/packages/core/src/unit-of-work/ChangeSetPersister.ts @@ -21,7 +21,7 @@ export class ChangeSetPersister { const meta = this.metadata.find(changeSets[0].name)!; changeSets.forEach(changeSet => this.processProperties(changeSet)); - if (changeSets.length > 1 && this.platform.returningMultiInsert() && this.config.get('useBatchInserts')) { + if (changeSets.length > 1 && this.config.get('useBatchInserts', this.platform.usesBatchInserts())) { return this.persistNewEntities(meta, changeSets, ctx); } @@ -31,8 +31,13 @@ export class ChangeSetPersister { } async executeUpdates>(changeSets: ChangeSet[], ctx?: Transaction): Promise { + const meta = this.metadata.find(changeSets[0].name)!; changeSets.forEach(changeSet => this.processProperties(changeSet)); + if (changeSets.length > 1 && this.config.get('useBatchUpdates', this.platform.usesBatchUpdates())) { + return this.persistManagedEntities(meta, changeSets, ctx); + } + for (const changeSet of changeSets) { await this.persistManagedEntity(changeSet, ctx); } @@ -87,6 +92,10 @@ export class ChangeSetPersister { private async persistNewEntitiesBatch>(meta: EntityMetadata, changeSets: ChangeSet[], ctx?: Transaction): Promise { const res = await this.driver.nativeInsertMany(meta.className, changeSets.map(cs => cs.payload), ctx); + if (!this.platform.usesReturningStatement()) { + await this.reloadVersionValues(meta, changeSets, ctx); + } + for (let i = 0; i < changeSets.length; i++) { const changeSet = changeSets[i]; const wrapped = changeSet.entity.__helper!; @@ -96,6 +105,7 @@ export class ChangeSetPersister { } this.mapReturnedValues(changeSet, res, meta); + this.markAsPopulated(changeSet, meta); wrapped.__initialized = true; wrapped.__managed = true; @@ -111,6 +121,22 @@ export class ChangeSetPersister { changeSet.persisted = true; } + private async persistManagedEntities>(meta: EntityMetadata, changeSets: ChangeSet[], ctx?: Transaction): Promise { + const size = this.config.get('batchSize'); + + for (let i = 0; i < changeSets.length; i += size) { + const chunk = changeSets.slice(i, i + size); + await this.persistManagedEntitiesBatch(meta, chunk, ctx); + await this.reloadVersionValues(meta, chunk, ctx); + } + } + + private async persistManagedEntitiesBatch>(meta: EntityMetadata, changeSets: ChangeSet[], ctx?: Transaction): Promise { + await this.checkOptimisticLocks(meta, changeSets, ctx); + await this.driver.nativeUpdateMany(meta.className, changeSets.map(cs => cs.entity.__helper!.__primaryKey as Dictionary), changeSets.map(cs => cs.payload), ctx); + changeSets.forEach(cs => cs.persisted = true); + } + private mapPrimaryKey>(meta: EntityMetadata, value: IPrimaryKey, changeSet: ChangeSet): void { const prop = meta.properties[meta.primaryKeys[0]]; const insertId = prop.customType ? prop.customType.convertToJSValue(value, this.driver.getPlatform()) : value; @@ -157,6 +183,25 @@ export class ChangeSetPersister { return this.driver.nativeUpdate(changeSet.name, cond, changeSet.payload, ctx); } + private async checkOptimisticLocks>(meta: EntityMetadata, changeSets: ChangeSet[], ctx?: Transaction): Promise { + if (!meta.versionProperty || changeSets.every(cs => !cs.entity[meta.versionProperty])) { + return; + } + + const $or = changeSets.map(cs => ({ + ...Utils.getPrimaryKeyCond(cs.entity, meta.primaryKeys), + [meta.versionProperty]: cs.entity[meta.versionProperty], + })); + + const res = await this.driver.find(meta.className, { $or }, { fields: meta.primaryKeys }, ctx); + + if (res.length !== changeSets.length) { + const compare = (a: Dictionary, b: Dictionary, keys: string[]) => keys.every(k => a[k] === b[k]); + const entity = changeSets.find(cs => !res.some(row => compare(Utils.getPrimaryKeyCond(cs.entity, meta.primaryKeys)!, row, meta.primaryKeys)))!.entity; + throw OptimisticLockError.lockFailed(entity); + } + } + private checkOptimisticLock>(meta: EntityMetadata, changeSet: ChangeSet, res?: QueryResult) { if (meta.versionProperty && res && !res.affectedRows) { throw OptimisticLockError.lockFailed(changeSet.entity); @@ -174,7 +219,7 @@ export class ChangeSetPersister { fields: [meta.versionProperty], }, ctx); const map = new Map(); - data.forEach(e => map.set(e[pk], e[meta.versionProperty])); + data.forEach(e => map.set(Utils.getCompositeKeyHash(e as T, meta), e[meta.versionProperty])); for (const changeSet of changeSets) { const version = map.get(changeSet.entity.__helper!.__serializedPrimaryKey); diff --git a/packages/core/src/utils/Configuration.ts b/packages/core/src/utils/Configuration.ts index d8bfb81b41fc..14f647ebd7df 100644 --- a/packages/core/src/utils/Configuration.ts +++ b/packages/core/src/utils/Configuration.ts @@ -49,7 +49,6 @@ export class Configuration { populateAfterFlush: false, forceUtcTimezone: false, ensureIndexes: false, - useBatchInserts: true, batchSize: 1000, debug: false, verbose: false, @@ -330,7 +329,8 @@ export interface MikroORMOptions ex forceUtcTimezone: boolean; timezone?: string; ensureIndexes: boolean; - useBatchInserts: boolean; + useBatchInserts?: boolean; + useBatchUpdates?: boolean; batchSize: number; hydrator: { new (factory: EntityFactory, em: EntityManager): Hydrator }; loadStrategy: LoadStrategy; diff --git a/packages/core/src/utils/Utils.ts b/packages/core/src/utils/Utils.ts index 24067a53031e..9eb68076de98 100644 --- a/packages/core/src/utils/Utils.ts +++ b/packages/core/src/utils/Utils.ts @@ -33,8 +33,8 @@ export class Utils { /** * Checks if the argument is instance of `Object`, but not one of the blacklisted types. Returns false for arrays. */ - static isNotObject(o: any, not: any[] = []): o is T { - return !!o && typeof o === 'object' && !Array.isArray(o) && !not.some(cls => o instanceof cls); + static isNotObject(o: any, not: any[]): o is T { + return this.isObject(o) && !not.some(cls => o instanceof cls); } /** @@ -45,7 +45,7 @@ export class Utils { let size = 0; for (const key in object) { - // eslint-disable-next-line no-prototype-builtins + /* istanbul ignore else */ // eslint-disable-next-line no-prototype-builtins if (object.hasOwnProperty(key)) { size++; } @@ -60,7 +60,7 @@ export class Utils { */ static hasObjectKeys(object: Dictionary): boolean { for (const key in object) { - // eslint-disable-next-line no-prototype-builtins + /* istanbul ignore else */ // eslint-disable-next-line no-prototype-builtins if (object.hasOwnProperty(key)) { return true; } @@ -351,12 +351,12 @@ export class Utils { }, {} as any); } - static getOrderedPrimaryKeys>(id: Primary | Record>, meta: EntityMetadata, platform: Platform, convertCustomTypes?: boolean): Primary[] { + static getOrderedPrimaryKeys>(id: Primary | Record>, meta: EntityMetadata, platform?: Platform, convertCustomTypes?: boolean): Primary[] { const data = (Utils.isPrimaryKey(id) ? { [meta.primaryKeys[0]]: id } : id) as Record>; return meta.primaryKeys.map(pk => { const prop = meta.properties[pk]; - if (prop.customType && convertCustomTypes) { + if (prop.customType && platform && convertCustomTypes) { return prop.customType.convertToJSValue(data[pk], platform); } diff --git a/packages/knex/src/AbstractSqlDriver.ts b/packages/knex/src/AbstractSqlDriver.ts index 0c696bd44dbc..354deee9456b 100644 --- a/packages/knex/src/AbstractSqlDriver.ts +++ b/packages/knex/src/AbstractSqlDriver.ts @@ -246,6 +246,50 @@ export abstract class AbstractSqlDriver>(entityName: string, where: FilterQuery[], data: EntityData[], ctx?: Transaction): Promise { + const meta = this.metadata.get(entityName); + const collections = data.map(d => this.extractManyToMany(entityName, d)); + const values: Dictionary = {}; + const knex = this.connection.getKnex(); + const keys = new Set(); + data.forEach(row => Object.keys(row).forEach(k => keys.add(k))); + const pkCond = Utils.flatten(meta.primaryKeys.map(pk => meta.properties[pk].fieldNames)).map(pk => `${knex.ref(pk)} = ?`).join(' and '); + let res: QueryResult = { affectedRows: 0, insertId: 0, row: {} }; + + keys.forEach(key => { + meta.properties[key].fieldNames.forEach((fieldName: string, fieldNameIdx: number) => { + const params: any[] = []; + let sql = `case`; + where.forEach((cond, idx) => { + if (key in data[idx]) { + const pks = Utils.getOrderedPrimaryKeys(cond as Dictionary, meta); + sql += ` when (${pkCond}) then ?`; + params.push(...pks, meta.properties[key].fieldNames.length > 1 ? data[idx][key][fieldNameIdx] : data[idx][key]); + } + }); + sql += ` else ${(knex.ref(fieldName))} end`; + values[fieldName] = knex.raw(sql, params); + }); + }); + + if (keys.size > 0) { + const qb = this.createQueryBuilder(entityName, ctx, true) + .unsetFlag(QueryFlag.CONVERT_CUSTOM_TYPES) + .update(values) + .where({ [Utils.getPrimaryKeyHash(meta.primaryKeys)]: where }); + + res = await this.rethrow(qb.execute('run', false)); + } + + const pkConds = data.map((_, idx) => Utils.extractPK(where[idx], meta)!); + + for (let i = 0; i < collections.length; i++) { + await this.processManyToMany(meta, pkConds[i] as Primary[], collections[i], false, ctx); + } + + return res; + } + async nativeDelete>(entityName: string, where: FilterQuery | string | any, ctx?: Transaction): Promise { const pks = this.getPrimaryKeyFields(entityName); diff --git a/packages/mongodb/src/MongoPlatform.ts b/packages/mongodb/src/MongoPlatform.ts index 36f9f045fdcb..5d40337b26ad 100644 --- a/packages/mongodb/src/MongoPlatform.ts +++ b/packages/mongodb/src/MongoPlatform.ts @@ -39,6 +39,10 @@ export class MongoPlatform extends Platform { return true; } + usesBatchUpdates(): boolean { + return false; + } + marshallArray(values: string[]): string { return values as unknown as string; } diff --git a/tests/EntityManager.mysql.test.ts b/tests/EntityManager.mysql.test.ts index f5bc4ac87b14..daa17502ae0a 100644 --- a/tests/EntityManager.mysql.test.ts +++ b/tests/EntityManager.mysql.test.ts @@ -107,6 +107,20 @@ describe('EntityManagerMySql', () => { { id: 2, name: 'test 2', type: PublisherType.LOCAL }, { id: 3, name: 'test 3', type: PublisherType.GLOBAL }, ]); + + // multi updates + const res3 = await driver.nativeUpdateMany(Publisher2.name, [1, 2, 3], [ + { name: 'test 11', type: PublisherType.LOCAL }, + { type: PublisherType.GLOBAL }, + { name: 'test 33', type: PublisherType.LOCAL }, + ]); + + const res4 = await driver.find(Publisher2.name, {}); + expect(res4).toMatchObject([ + { id: 1, name: 'test 11', type: PublisherType.LOCAL }, + { id: 2, name: 'test 2', type: PublisherType.GLOBAL }, + { id: 3, name: 'test 33', type: PublisherType.LOCAL }, + ]); }); test('driver appends errored query', async () => { @@ -2414,8 +2428,8 @@ describe('EntityManagerMySql', () => { 'order by `e0`.`title` asc'); }); - // this should run in ~600ms (when running single test locally) - test('perf: one to many', async () => { + // this should run in ~800ms (when running single test locally) + test('perf: batch insert and update', async () => { const authors = new Set(); for (let i = 1; i <= 1000; i++) { @@ -2426,6 +2440,9 @@ describe('EntityManagerMySql', () => { await orm.em.flush(); authors.forEach(author => expect(author.id).toBeGreaterThan(0)); + + authors.forEach(a => a.termsAccepted = true); + await orm.em.flush(); }); afterAll(async () => orm.close(true)); diff --git a/tests/EntityManager.postgre.test.ts b/tests/EntityManager.postgre.test.ts index 321538b485b5..2abe3f3f0908 100644 --- a/tests/EntityManager.postgre.test.ts +++ b/tests/EntityManager.postgre.test.ts @@ -1506,6 +1506,23 @@ describe('EntityManagerPostgre', () => { expect(author.books.getItems().every(b => b.uuid)).toBe(true); }); + // this should run in ~800ms (when running single test locally) + test('perf: batch insert and update', async () => { + const authors = new Set(); + + for (let i = 1; i <= 1000; i++) { + const author = new Author2(`Jon Snow ${i}`, `snow-${i}@wall.st`); + orm.em.persist(author); + authors.add(author); + } + + await orm.em.flush(); + authors.forEach(author => expect(author.id).toBeGreaterThan(0)); + + authors.forEach(a => a.termsAccepted = true); + await orm.em.flush(); + }); + afterAll(async () => orm.close(true)); }); diff --git a/tests/EntityManager.sqlite2.test.ts b/tests/EntityManager.sqlite2.test.ts index be3da24b045d..c81485e35f8a 100644 --- a/tests/EntityManager.sqlite2.test.ts +++ b/tests/EntityManager.sqlite2.test.ts @@ -2,7 +2,7 @@ import { unlinkSync } from 'fs'; import { Collection, EntityManager, LockMode, MikroORM, QueryOrder, Logger, ValidationError, wrap } from '@mikro-orm/core'; import { SqliteDriver } from '@mikro-orm/sqlite'; import { initORMSqlite2, wipeDatabaseSqlite2 } from './bootstrap'; -import { Author4, Book4, BookTag4, FooBar4, IPublisher4, Publisher4, PublisherType, Test4 } from './entities-schema'; +import { Author4, Book4, BookTag4, FooBar4, IAuthor4, IPublisher4, Publisher4, PublisherType, Test4 } from './entities-schema'; describe('EntityManagerSqlite2', () => { @@ -846,6 +846,23 @@ describe('EntityManagerSqlite2', () => { expect(author.books.getItems().every(b => b.id)).toBe(true); }); + // this should run in ~400ms (when running single test locally) + test('perf: batch insert and update', async () => { + const authors = new Set(); + + for (let i = 1; i <= 1000; i++) { + const author = orm.em.create(Author4, { name: `Jon Snow ${i}`, email: `snow-${i}@wall.st` }); + orm.em.persist(author); + authors.add(author); + } + + await orm.em.flush(); + authors.forEach(author => expect(author.id).toBeGreaterThan(0)); + + authors.forEach(a => a.termsAccepted = true); + await orm.em.flush(); + }); + afterAll(async () => { await orm.close(true); unlinkSync(orm.config.get('dbName')!); diff --git a/tests/bootstrap.ts b/tests/bootstrap.ts index b0e87567df3f..4a7e6ff1fffa 100644 --- a/tests/bootstrap.ts +++ b/tests/bootstrap.ts @@ -144,7 +144,7 @@ export async function initORMSqlite2() { propagateToOneOwner: false, logger: i => i, cache: { pretty: true }, - batchSize: 200, + batchSize: 100, }); const schemaGenerator = new SchemaGenerator(orm.em); await schemaGenerator.dropSchema(); diff --git a/tests/composite-keys.mysql.test.ts b/tests/composite-keys.mysql.test.ts index 6646136f2816..c353dd014f01 100644 --- a/tests/composite-keys.mysql.test.ts +++ b/tests/composite-keys.mysql.test.ts @@ -1,4 +1,4 @@ -import { LoadStrategy, Logger, MikroORM, wrap } from '@mikro-orm/core'; +import { LoadStrategy, Logger, MikroORM, ValidationError, wrap } from '@mikro-orm/core'; import { AbstractSqlConnection, MySqlDriver } from '@mikro-orm/mysql'; import { Author2, Configuration2, FooBar2, FooBaz2, FooParam2, Test2, Address2, Car2, CarOwner2, User2, Sandwich } from './entities-sql'; import { initORMMySql, wipeDatabaseMySql } from './bootstrap'; @@ -286,6 +286,41 @@ describe('composite keys in mysql', () => { expect(mock.mock.calls[4][0]).toMatch('commit'); }); + test('batch updates with optimistic locking', async () => { + const bar1 = FooBar2.create('bar 1'); + bar1.id = 17; + const baz1 = new FooBaz2('baz 1'); + baz1.id = 13; + const param1 = new FooParam2(bar1, baz1, 'val 1'); + const bar2 = FooBar2.create('bar 2'); + bar2.id = 27; + const baz2 = new FooBaz2('baz 2'); + baz2.id = 23; + const param2 = new FooParam2(bar2, baz2, 'val 1'); + const bar3 = FooBar2.create('bar 3'); + bar3.id = 37; + const baz3 = new FooBaz2('baz 3'); + baz3.id = 33; + const param3 = new FooParam2(bar3, baz3, 'val 1'); + await orm.em.persistAndFlush([param1, param2, param3]); + + param1.value += ' changed!'; + param2.value += ' changed!'; + param3.value += ' changed!'; + await orm.em.flush(); + + try { + await orm.em.nativeUpdate(FooParam2, param2, { version: new Date('2020-01-01T00:00:00Z') }); // simulate concurrent update + param1.value += ' changed!!'; + param2.value += ' changed!!'; + param3.value += ' changed!!'; + await orm.em.flush(); + expect(1).toBe('should be unreachable'); + } catch (e) { + expect((e as ValidationError).getEntity()).toBe(param2); + } + }); + afterAll(async () => orm.close(true)); });