Skip to content

Commit

Permalink
perf(core): implement bulk deletes (#757)
Browse files Browse the repository at this point in the history
UoW will now group change sets based on the type and entity. Instead of ordering
all the change sets based on the type and entity, we can now travers those groups
based on the commit order.

Related: #732
  • Loading branch information
B4nan committed Aug 19, 2020
1 parent 5eca87f commit d83f648
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 40 deletions.
4 changes: 1 addition & 3 deletions packages/core/src/unit-of-work/ChangeSetPersister.ts
Expand Up @@ -30,9 +30,7 @@ export class ChangeSetPersister {
let res: QueryResult | undefined;
const wrapped = changeSet.entity.__helper!;

if (changeSet.type === ChangeSetType.DELETE) {
await this.driver.nativeDelete(changeSet.name, wrapped.__primaryKey as Dictionary, ctx);
} else if (changeSet.type === ChangeSetType.UPDATE) {
if (changeSet.type === ChangeSetType.UPDATE) {
res = await this.updateEntity(meta, changeSet, ctx);
this.mapReturnedValues(changeSet.entity, res, meta);
} else if (Utils.isDefined(wrapped.__primaryKey, true)) { // ChangeSetType.CREATE with primary key
Expand Down
96 changes: 66 additions & 30 deletions packages/core/src/unit-of-work/UnitOfWork.ts
@@ -1,4 +1,4 @@
import { AnyEntity, EntityData, EntityMetadata, EntityProperty, FilterQuery, Primary } from '../typings';
import { AnyEntity, Dictionary, EntityData, EntityMetadata, EntityProperty, FilterQuery, Primary } from '../typings';
import { Cascade, Collection, EntityIdentifier, Reference, ReferenceType } from '../entity';
import { ChangeSet, ChangeSetType } from './ChangeSet';
import { ChangeSetComputer, ChangeSetPersister, CommitOrderCalculator } from './index';
Expand Down Expand Up @@ -169,14 +169,14 @@ export class UnitOfWork {
return;
}

this.reorderChangeSets();
const groups = this.getChangeSetGroups();
const platform = this.em.getDriver().getPlatform();
const runInTransaction = !this.em.isInTransaction() && platform.supportsTransactions() && this.em.config.get('implicitTransactions');

if (runInTransaction) {
await this.em.getConnection('write').transactional(trx => this.persistToDatabase(trx));
await this.em.getConnection('write').transactional(trx => this.persistToDatabase(groups, trx));
} else {
await this.persistToDatabase(this.em.getTransactionContext());
await this.persistToDatabase(groups, this.em.getTransactionContext());
}

await this.em.getEventManager().dispatchEvent(EventType.afterFlush, { em: this.em, uow: this });
Expand Down Expand Up @@ -337,7 +337,6 @@ export class UnitOfWork {
switch (changeSet.type) {
case ChangeSetType.CREATE: this.em.merge(changeSet.entity as T, true); break;
case ChangeSetType.UPDATE: this.merge(changeSet.entity as T); break;
case ChangeSetType.DELETE: this.unsetIdentity(changeSet.entity as T); break;
}

await this.runHooks(`after${type}` as EventType, changeSet);
Expand Down Expand Up @@ -471,11 +470,30 @@ export class UnitOfWork {
return reference;
}

private async persistToDatabase(tx?: Transaction): Promise<void> {
for (const changeSet of this.changeSets) {
await this.commitChangeSet(changeSet, tx);
private async persistToDatabase(groups: { [K in ChangeSetType]: Map<string, ChangeSet<any>[]> }, tx?: Transaction): Promise<void> {
const commitOrder = this.getCommitOrder();
const commitOrderReversed = [...commitOrder].reverse();

// 1. create
for (const name of commitOrder) {
for (const changeSet of (groups[ChangeSetType.CREATE][name] ?? [])) {
await this.commitChangeSet(changeSet, tx);
}
}

// 2. update
for (const name of commitOrder) {
for (const changeSet of (groups[ChangeSetType.UPDATE][name] ?? [])) {
await this.commitChangeSet(changeSet, tx);
}
}

// 3. delete - entity deletions need to be in reverse commit order
for (const name of commitOrderReversed) {
await this.commitDeleteChangeSets(groups[ChangeSetType.DELETE][name] ?? [], tx);
}

// 4. extra updates
for (const extraUpdate of this.extraUpdates) {
extraUpdate[0][extraUpdate[1]] = extraUpdate[2];
const changeSet = this.changeSetComputer.computeChangeSet(extraUpdate[0])!;
Expand All @@ -485,40 +503,58 @@ export class UnitOfWork {
}
}

// 5. collection updates
for (const coll of this.collectionUpdates) {
await this.em.getDriver().syncCollection(coll, tx);
coll.takeSnapshot();
}
}

private async commitDeleteChangeSets<T extends AnyEntity<T>>(changeSets: ChangeSet<T>[], ctx?: Transaction): Promise<void> {
if (changeSets.length === 0) {
return;
}

for (const changeSet of changeSets) {
const copy = Utils.prepareEntity(changeSet.entity, this.metadata, this.platform) as T;
await this.runHooks(EventType.beforeDelete, changeSet);
Object.assign(changeSet.payload, Utils.diffEntities<T>(copy, changeSet.entity, this.metadata, this.platform));
}

const meta = changeSets[0].entity.__helper!.__meta;
const pk = Utils.getPrimaryKeyHash(meta.primaryKeys);

if (meta.compositePK) {
const pks = changeSets.map(cs => cs.entity.__helper!.__primaryKeys);
await this.em.getDriver().nativeDelete(changeSets[0].name, { [pk]: { $in: pks } }, ctx);
} else {
const pks = changeSets.map(cs => cs.entity.__helper!.__primaryKey as Dictionary);
await this.em.getDriver().nativeDelete(changeSets[0].name, { [pk]: { $in: pks } }, ctx);
}

for (const changeSet of changeSets) {
this.unsetIdentity(changeSet.entity);
await this.runHooks(EventType.afterDelete, changeSet);
}
}

/**
* Orders change sets so FK constrains are maintained, ensures stable order (needed for node < 11)
*/
private reorderChangeSets() {
const commitOrder = this.getCommitOrder();
const commitOrderReversed = [...commitOrder].reverse();
const typeOrder = [ChangeSetType.CREATE, ChangeSetType.UPDATE, ChangeSetType.DELETE];
const compare = <T, K extends keyof T>(base: T[], arr: T[K][], a: T, b: T, key: K) => {
if (arr.indexOf(a[key]) === arr.indexOf(b[key])) {
return base.indexOf(a) - base.indexOf(b); // ensure stable order
}

return arr.indexOf(a[key]) - arr.indexOf(b[key]);
private getChangeSetGroups(): { [K in ChangeSetType]: Map<string, ChangeSet<any>[]> } {
const groups = {
[ChangeSetType.CREATE]: new Map<string, ChangeSet<any>[]>(),
[ChangeSetType.UPDATE]: new Map<string, ChangeSet<any>[]>(),
[ChangeSetType.DELETE]: new Map<string, ChangeSet<any>[]>(),
};

const copy = this.changeSets.slice(); // make copy to maintain commit order
this.changeSets.sort((a, b) => {
if (a.type !== b.type) {
return compare(copy, typeOrder, a, b, 'type');
}

// Entity deletions come last and need to be in reverse commit order
if (a.type === ChangeSetType.DELETE) {
return compare(copy, commitOrderReversed, a, b, 'name');
}

return compare(copy, commitOrder, a, b, 'name');
this.changeSets.forEach(cs => {
const group = groups[cs.type];
group[cs.name] = group[cs.name] ?? [];
group[cs.name].push(cs);
});

return groups;
}

private getCommitOrder(): string[] {
Expand Down
4 changes: 2 additions & 2 deletions tests/EntityManager.mysql.test.ts
Expand Up @@ -1282,9 +1282,9 @@ describe('EntityManagerMySql', () => {
['beforeUpdate', 'Book2'],
['afterUpdate', 'Book2'],
['beforeDelete', 'Book2'],
['afterDelete', 'Book2'],
['beforeDelete', 'Book2'],
['afterDelete', 'Book2'],
['afterDelete', 'Book2'],
['beforeDelete', 'Author2'],
['afterDelete', 'Author2'],
['beforeDelete', 'Publisher2'],
Expand Down Expand Up @@ -2119,7 +2119,7 @@ describe('EntityManagerMySql', () => {
Object.assign(orm.config, { logger });
await orm.em.flush();
expect(mock.mock.calls[0][0]).toMatch('begin');
expect(mock.mock.calls[1][0]).toMatch('delete from `foo_baz2` where `id` = ?');
expect(mock.mock.calls[1][0]).toMatch('delete from `foo_baz2` where `id` in (?)');
expect(mock.mock.calls[2][0]).toMatch('commit');
});

Expand Down
2 changes: 1 addition & 1 deletion tests/UnitOfWork.test.ts
Expand Up @@ -172,8 +172,8 @@ describe('UnitOfWork', () => {
expect(mock.mock.calls[3][0]).toMatch('db.commit()');

expect(changeSets.map(cs => [cs.type, cs.name])).toEqual([
[ChangeSetType.CREATE, 'FooBaz'],
[ChangeSetType.CREATE, 'FooBar'],
[ChangeSetType.CREATE, 'FooBaz'],
]);
});

Expand Down
2 changes: 1 addition & 1 deletion tests/issues/GH369.test.ts
Expand Up @@ -59,7 +59,7 @@ describe('GH issue 369', () => {
expect(mock.mock.calls[2][0]).toMatch('insert into `b` (`a_id`, `foo`) values (?, ?)');
expect(mock.mock.calls[3][0]).toMatch('commit');
expect(mock.mock.calls[4][0]).toMatch('begin');
expect(mock.mock.calls[5][0]).toMatch('delete from `b` where `id` = ?');
expect(mock.mock.calls[5][0]).toMatch('delete from `b` where `id` in (?)');
expect(mock.mock.calls[6][0]).toMatch('commit');
expect(mock.mock.calls).toHaveLength(7);
});
Expand Down
5 changes: 2 additions & 3 deletions tests/issues/GH482.test.ts
Expand Up @@ -66,9 +66,8 @@ describe('GH issue 482', () => {
orm.config.set('debug', ['query', 'query-params']);
await orm.em.flush();
expect(mock.mock.calls[0][0]).toMatch('begin');
expect(mock.mock.calls[1][0]).toMatch(`delete from "level" where "type" = 'A' and "job_id" = '1'`);
expect(mock.mock.calls[2][0]).toMatch(`delete from "level" where "type" = 'B' and "job_id" = '1'`);
expect(mock.mock.calls[3][0]).toMatch('commit');
expect(mock.mock.calls[1][0]).toMatch(`delete from "level" where ("type", "job_id") in (('A', '1'), ('B', '1'))`);
expect(mock.mock.calls[2][0]).toMatch('commit');
mock.mock.calls.length = 0;
});

Expand Down

0 comments on commit d83f648

Please sign in to comment.