Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(core): implement bulk deletes #757

Merged
merged 1 commit into from
Aug 19, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 1 addition & 3 deletions packages/core/src/unit-of-work/ChangeSetPersister.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,7 @@ export class ChangeSetPersister {
let res: QueryResult | undefined;
const wrapped = changeSet.entity.__helper!;

if (changeSet.type === ChangeSetType.DELETE) {
await this.driver.nativeDelete(changeSet.name, wrapped.__primaryKey as Dictionary, ctx);
} else if (changeSet.type === ChangeSetType.UPDATE) {
if (changeSet.type === ChangeSetType.UPDATE) {
res = await this.updateEntity(meta, changeSet, ctx);
this.mapReturnedValues(changeSet.entity, res, meta);
} else if (Utils.isDefined(wrapped.__primaryKey, true)) { // ChangeSetType.CREATE with primary key
Expand Down
96 changes: 66 additions & 30 deletions packages/core/src/unit-of-work/UnitOfWork.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { AnyEntity, EntityData, EntityMetadata, EntityProperty, FilterQuery, Primary } from '../typings';
import { AnyEntity, Dictionary, EntityData, EntityMetadata, EntityProperty, FilterQuery, Primary } from '../typings';
import { Cascade, Collection, EntityIdentifier, Reference, ReferenceType } from '../entity';
import { ChangeSet, ChangeSetType } from './ChangeSet';
import { ChangeSetComputer, ChangeSetPersister, CommitOrderCalculator } from './index';
Expand Down Expand Up @@ -169,14 +169,14 @@ export class UnitOfWork {
return;
}

this.reorderChangeSets();
const groups = this.getChangeSetGroups();
const platform = this.em.getDriver().getPlatform();
const runInTransaction = !this.em.isInTransaction() && platform.supportsTransactions() && this.em.config.get('implicitTransactions');

if (runInTransaction) {
await this.em.getConnection('write').transactional(trx => this.persistToDatabase(trx));
await this.em.getConnection('write').transactional(trx => this.persistToDatabase(groups, trx));
} else {
await this.persistToDatabase(this.em.getTransactionContext());
await this.persistToDatabase(groups, this.em.getTransactionContext());
}

await this.em.getEventManager().dispatchEvent(EventType.afterFlush, { em: this.em, uow: this });
Expand Down Expand Up @@ -337,7 +337,6 @@ export class UnitOfWork {
switch (changeSet.type) {
case ChangeSetType.CREATE: this.em.merge(changeSet.entity as T, true); break;
case ChangeSetType.UPDATE: this.merge(changeSet.entity as T); break;
case ChangeSetType.DELETE: this.unsetIdentity(changeSet.entity as T); break;
}

await this.runHooks(`after${type}` as EventType, changeSet);
Expand Down Expand Up @@ -471,11 +470,30 @@ export class UnitOfWork {
return reference;
}

private async persistToDatabase(tx?: Transaction): Promise<void> {
for (const changeSet of this.changeSets) {
await this.commitChangeSet(changeSet, tx);
private async persistToDatabase(groups: { [K in ChangeSetType]: Map<string, ChangeSet<any>[]> }, tx?: Transaction): Promise<void> {
const commitOrder = this.getCommitOrder();
const commitOrderReversed = [...commitOrder].reverse();

// 1. create
for (const name of commitOrder) {
for (const changeSet of (groups[ChangeSetType.CREATE][name] ?? [])) {
await this.commitChangeSet(changeSet, tx);
}
}

// 2. update
for (const name of commitOrder) {
for (const changeSet of (groups[ChangeSetType.UPDATE][name] ?? [])) {
await this.commitChangeSet(changeSet, tx);
}
}

// 3. delete - entity deletions need to be in reverse commit order
for (const name of commitOrderReversed) {
await this.commitDeleteChangeSets(groups[ChangeSetType.DELETE][name] ?? [], tx);
}

// 4. extra updates
for (const extraUpdate of this.extraUpdates) {
extraUpdate[0][extraUpdate[1]] = extraUpdate[2];
const changeSet = this.changeSetComputer.computeChangeSet(extraUpdate[0])!;
Expand All @@ -485,40 +503,58 @@ export class UnitOfWork {
}
}

// 5. collection updates
for (const coll of this.collectionUpdates) {
await this.em.getDriver().syncCollection(coll, tx);
coll.takeSnapshot();
}
}

private async commitDeleteChangeSets<T extends AnyEntity<T>>(changeSets: ChangeSet<T>[], ctx?: Transaction): Promise<void> {
if (changeSets.length === 0) {
return;
}

for (const changeSet of changeSets) {
const copy = Utils.prepareEntity(changeSet.entity, this.metadata, this.platform) as T;
await this.runHooks(EventType.beforeDelete, changeSet);
Object.assign(changeSet.payload, Utils.diffEntities<T>(copy, changeSet.entity, this.metadata, this.platform));
}

const meta = changeSets[0].entity.__helper!.__meta;
const pk = Utils.getPrimaryKeyHash(meta.primaryKeys);

if (meta.compositePK) {
const pks = changeSets.map(cs => cs.entity.__helper!.__primaryKeys);
await this.em.getDriver().nativeDelete(changeSets[0].name, { [pk]: { $in: pks } }, ctx);
} else {
const pks = changeSets.map(cs => cs.entity.__helper!.__primaryKey as Dictionary);
await this.em.getDriver().nativeDelete(changeSets[0].name, { [pk]: { $in: pks } }, ctx);
}

for (const changeSet of changeSets) {
this.unsetIdentity(changeSet.entity);
await this.runHooks(EventType.afterDelete, changeSet);
}
}

/**
* Orders change sets so FK constrains are maintained, ensures stable order (needed for node < 11)
*/
private reorderChangeSets() {
const commitOrder = this.getCommitOrder();
const commitOrderReversed = [...commitOrder].reverse();
const typeOrder = [ChangeSetType.CREATE, ChangeSetType.UPDATE, ChangeSetType.DELETE];
const compare = <T, K extends keyof T>(base: T[], arr: T[K][], a: T, b: T, key: K) => {
if (arr.indexOf(a[key]) === arr.indexOf(b[key])) {
return base.indexOf(a) - base.indexOf(b); // ensure stable order
}

return arr.indexOf(a[key]) - arr.indexOf(b[key]);
private getChangeSetGroups(): { [K in ChangeSetType]: Map<string, ChangeSet<any>[]> } {
const groups = {
[ChangeSetType.CREATE]: new Map<string, ChangeSet<any>[]>(),
[ChangeSetType.UPDATE]: new Map<string, ChangeSet<any>[]>(),
[ChangeSetType.DELETE]: new Map<string, ChangeSet<any>[]>(),
};

const copy = this.changeSets.slice(); // make copy to maintain commit order
this.changeSets.sort((a, b) => {
if (a.type !== b.type) {
return compare(copy, typeOrder, a, b, 'type');
}

// Entity deletions come last and need to be in reverse commit order
if (a.type === ChangeSetType.DELETE) {
return compare(copy, commitOrderReversed, a, b, 'name');
}

return compare(copy, commitOrder, a, b, 'name');
this.changeSets.forEach(cs => {
const group = groups[cs.type];
group[cs.name] = group[cs.name] ?? [];
group[cs.name].push(cs);
});

return groups;
}

private getCommitOrder(): string[] {
Expand Down
4 changes: 2 additions & 2 deletions tests/EntityManager.mysql.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1282,9 +1282,9 @@ describe('EntityManagerMySql', () => {
['beforeUpdate', 'Book2'],
['afterUpdate', 'Book2'],
['beforeDelete', 'Book2'],
['afterDelete', 'Book2'],
['beforeDelete', 'Book2'],
['afterDelete', 'Book2'],
['afterDelete', 'Book2'],
['beforeDelete', 'Author2'],
['afterDelete', 'Author2'],
['beforeDelete', 'Publisher2'],
Expand Down Expand Up @@ -2119,7 +2119,7 @@ describe('EntityManagerMySql', () => {
Object.assign(orm.config, { logger });
await orm.em.flush();
expect(mock.mock.calls[0][0]).toMatch('begin');
expect(mock.mock.calls[1][0]).toMatch('delete from `foo_baz2` where `id` = ?');
expect(mock.mock.calls[1][0]).toMatch('delete from `foo_baz2` where `id` in (?)');
expect(mock.mock.calls[2][0]).toMatch('commit');
});

Expand Down
2 changes: 1 addition & 1 deletion tests/UnitOfWork.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,8 @@ describe('UnitOfWork', () => {
expect(mock.mock.calls[3][0]).toMatch('db.commit()');

expect(changeSets.map(cs => [cs.type, cs.name])).toEqual([
[ChangeSetType.CREATE, 'FooBaz'],
[ChangeSetType.CREATE, 'FooBar'],
[ChangeSetType.CREATE, 'FooBaz'],
]);
});

Expand Down
2 changes: 1 addition & 1 deletion tests/issues/GH369.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ describe('GH issue 369', () => {
expect(mock.mock.calls[2][0]).toMatch('insert into `b` (`a_id`, `foo`) values (?, ?)');
expect(mock.mock.calls[3][0]).toMatch('commit');
expect(mock.mock.calls[4][0]).toMatch('begin');
expect(mock.mock.calls[5][0]).toMatch('delete from `b` where `id` = ?');
expect(mock.mock.calls[5][0]).toMatch('delete from `b` where `id` in (?)');
expect(mock.mock.calls[6][0]).toMatch('commit');
expect(mock.mock.calls).toHaveLength(7);
});
Expand Down
5 changes: 2 additions & 3 deletions tests/issues/GH482.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,8 @@ describe('GH issue 482', () => {
orm.config.set('debug', ['query', 'query-params']);
await orm.em.flush();
expect(mock.mock.calls[0][0]).toMatch('begin');
expect(mock.mock.calls[1][0]).toMatch(`delete from "level" where "type" = 'A' and "job_id" = '1'`);
expect(mock.mock.calls[2][0]).toMatch(`delete from "level" where "type" = 'B' and "job_id" = '1'`);
expect(mock.mock.calls[3][0]).toMatch('commit');
expect(mock.mock.calls[1][0]).toMatch(`delete from "level" where ("type", "job_id") in (('A', '1'), ('B', '1'))`);
expect(mock.mock.calls[2][0]).toMatch('commit');
mock.mock.calls.length = 0;
});

Expand Down