Skip to content

Commit

Permalink
refactor: prepare UoW.commit() for possible bulk inserts
Browse files Browse the repository at this point in the history
Related: #732
  • Loading branch information
B4nan committed Aug 24, 2020
1 parent 5c1f60a commit 5a54965
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 70 deletions.
60 changes: 43 additions & 17 deletions packages/core/src/unit-of-work/ChangeSetPersister.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,46 @@ export class ChangeSetPersister {
private readonly metadata: MetadataStorage,
private readonly hydrator: Hydrator) { }

async persistToDatabase<T extends AnyEntity<T>>(changeSet: ChangeSet<T>, ctx?: Transaction): Promise<void> {
async executeInserts<T extends AnyEntity<T>>(changeSets: ChangeSet<T>[], ctx?: Transaction): Promise<void> {
changeSets.forEach(changeSet => this.processProperties(changeSet));

for (const changeSet of changeSets) {
await this.persistEntity(changeSet, ctx);
}
}

async executeUpdates<T extends AnyEntity<T>>(changeSets: ChangeSet<T>[], ctx?: Transaction): Promise<void> {
changeSets.forEach(changeSet => this.processProperties(changeSet));

for (const changeSet of changeSets) {
await this.persistEntity(changeSet, ctx);
}
}

async executeDeletes<T extends AnyEntity<T>>(changeSets: ChangeSet<T>[], ctx?: Transaction): Promise<void> {
const meta = changeSets[0].entity.__helper!.__meta;
const pk = Utils.getPrimaryKeyHash(meta.primaryKeys);

if (meta.compositePK) {
const pks = changeSets.map(cs => cs.entity.__helper!.__primaryKeys);
await this.driver.nativeDelete(changeSets[0].name, { [pk]: { $in: pks } }, ctx);
} else {
const pks = changeSets.map(cs => cs.entity.__helper!.__primaryKey as Dictionary);
await this.driver.nativeDelete(changeSets[0].name, { [pk]: { $in: pks } }, ctx);
}
}

private processProperties<T extends AnyEntity<T>>(changeSet: ChangeSet<T>): void {
const meta = this.metadata.find(changeSet.name)!;

// process references first
for (const prop of Object.values(meta.properties)) {
this.processReference(changeSet, prop);
this.processProperty(changeSet, prop);
}

// persist the entity itself
await this.persistEntity(changeSet, meta, ctx);
}

private async persistEntity<T extends AnyEntity<T>>(changeSet: ChangeSet<T>, meta: EntityMetadata<T>, ctx?: Transaction): Promise<void> {
private async persistEntity<T extends AnyEntity<T>>(changeSet: ChangeSet<T>, ctx?: Transaction): Promise<void> {
let res: QueryResult | undefined;
const meta = this.metadata.find(changeSet.name)!;
const wrapped = changeSet.entity.__helper!;

if (changeSet.type === ChangeSetType.UPDATE) {
Expand Down Expand Up @@ -70,21 +96,21 @@ export class ChangeSetPersister {
}

private async processOptimisticLock<T extends AnyEntity<T>>(meta: EntityMetadata<T>, changeSet: ChangeSet<T>, res: QueryResult | undefined, ctx?: Transaction) {
if (meta.versionProperty && changeSet.type === ChangeSetType.UPDATE && res && !res.affectedRows) {
throw OptimisticLockError.lockFailed(changeSet.entity);
if (!meta.versionProperty) {
return;
}

if (meta.versionProperty && [ChangeSetType.CREATE, ChangeSetType.UPDATE].includes(changeSet.type)) {
const e = await this.driver.findOne<T>(meta.name!, changeSet.entity.__helper!.__primaryKey, {
populate: [{
field: meta.versionProperty,
}] as unknown as boolean,
}, ctx);
(changeSet.entity as T)[meta.versionProperty] = e![meta.versionProperty];
if (changeSet.type === ChangeSetType.UPDATE && res && !res.affectedRows) {
throw OptimisticLockError.lockFailed(changeSet.entity);
}

const e = await this.driver.findOne<T>(meta.name!, changeSet.entity.__helper!.__primaryKey, {
fields: [meta.versionProperty],
}, ctx);
changeSet.entity[meta.versionProperty] = e![meta.versionProperty];
}

private processReference<T extends AnyEntity<T>>(changeSet: ChangeSet<T>, prop: EntityProperty<T>): void {
private processProperty<T extends AnyEntity<T>>(changeSet: ChangeSet<T>, prop: EntityProperty<T>): void {
const value = changeSet.payload[prop.name];

if (value as unknown instanceof EntityIdentifier) {
Expand Down
110 changes: 61 additions & 49 deletions packages/core/src/unit-of-work/UnitOfWork.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { AnyEntity, Dictionary, EntityData, EntityMetadata, EntityProperty, FilterQuery, Primary } from '../typings';
import { AnyEntity, EntityData, EntityMetadata, EntityProperty, FilterQuery, Primary } from '../typings';
import { Cascade, Collection, EntityIdentifier, Reference, ReferenceType } from '../entity';
import { ChangeSet, ChangeSetType } from './ChangeSet';
import { ChangeSetComputer, ChangeSetPersister, CommitOrderCalculator } from './index';
Expand Down Expand Up @@ -311,37 +311,6 @@ export class UnitOfWork {
.forEach(item => this.findNewEntities(item, visited));
}

private async commitChangeSet<T extends AnyEntity<T>>(changeSet: ChangeSet<T>, ctx?: Transaction): Promise<void> {
if (changeSet.type === ChangeSetType.CREATE) {
Object.values<EntityProperty>(changeSet.entity.__helper!.__meta.properties)
.filter(prop => (prop.reference === ReferenceType.ONE_TO_ONE && prop.owner) || prop.reference === ReferenceType.MANY_TO_ONE)
.filter(prop => changeSet.entity[prop.name])
.forEach(prop => {
const cs = this.changeSets.find(cs => cs.entity === Reference.unwrapReference(changeSet.entity[prop.name]));
const isScheduledForInsert = cs && cs.type === ChangeSetType.CREATE && !cs.persisted;

if (isScheduledForInsert) {
this.extraUpdates.add([changeSet.entity, prop.name, changeSet.entity[prop.name]]);
delete changeSet.entity[prop.name];
delete changeSet.payload[prop.name];
}
});
}

const type = changeSet.type.charAt(0).toUpperCase() + changeSet.type.slice(1);
const copy = Utils.prepareEntity(changeSet.entity, this.metadata, this.platform) as T;
await this.runHooks(`before${type}` as EventType, changeSet);
Object.assign(changeSet.payload, Utils.diffEntities<T>(copy, changeSet.entity, this.metadata, this.platform));
await this.changeSetPersister.persistToDatabase(changeSet, ctx);

switch (changeSet.type) {
case ChangeSetType.CREATE: this.em.merge(changeSet.entity as T, true); break;
case ChangeSetType.UPDATE: this.merge(changeSet.entity as T); break;
}

await this.runHooks(`after${type}` as EventType, changeSet);
}

private async runHooks<T extends AnyEntity<T>>(type: EventType, changeSet: ChangeSet<T>) {
await this.em.getEventManager().dispatchEvent(type, { entity: changeSet.entity, em: this.em, changeSet });
}
Expand Down Expand Up @@ -476,16 +445,12 @@ export class UnitOfWork {

// 1. create
for (const name of commitOrder) {
for (const changeSet of (groups[ChangeSetType.CREATE][name] ?? [])) {
await this.commitChangeSet(changeSet, tx);
}
await this.commitCreateChangeSets(groups[ChangeSetType.CREATE][name] ?? [], tx);
}

// 2. update
for (const name of commitOrder) {
for (const changeSet of (groups[ChangeSetType.UPDATE][name] ?? [])) {
await this.commitChangeSet(changeSet, tx);
}
await this.commitUpdateChangeSets(groups[ChangeSetType.UPDATE][name] ?? [], tx);
}

// 3. delete - entity deletions need to be in reverse commit order
Expand All @@ -494,44 +459,91 @@ export class UnitOfWork {
}

// 4. extra updates
const extraUpdates: ChangeSet<any>[] = [];

for (const extraUpdate of this.extraUpdates) {
extraUpdate[0][extraUpdate[1]] = extraUpdate[2];
const changeSet = this.changeSetComputer.computeChangeSet(extraUpdate[0])!;

if (changeSet) {
await this.commitChangeSet(changeSet, tx);
extraUpdates.push(changeSet);
}
}

await this.commitUpdateChangeSets(extraUpdates, tx);

// 5. collection updates
for (const coll of this.collectionUpdates) {
await this.em.getDriver().syncCollection(coll, tx);
coll.takeSnapshot();
}
}

private async commitDeleteChangeSets<T extends AnyEntity<T>>(changeSets: ChangeSet<T>[], ctx?: Transaction): Promise<void> {
private async commitCreateChangeSets<T extends AnyEntity<T>>(changeSets: ChangeSet<T>[], ctx?: Transaction): Promise<void> {
if (changeSets.length === 0) {
return;
}

for (const changeSet of changeSets) {
Object.values<EntityProperty>(changeSet.entity.__helper!.__meta.properties)
.filter(prop => (prop.reference === ReferenceType.ONE_TO_ONE && prop.owner) || prop.reference === ReferenceType.MANY_TO_ONE)
.filter(prop => changeSet.entity[prop.name])
.forEach(prop => {
const cs = this.changeSets.find(cs => cs.entity === Reference.unwrapReference(changeSet.entity[prop.name]));
const isScheduledForInsert = cs && cs.type === ChangeSetType.CREATE && !cs.persisted;

if (isScheduledForInsert) {
this.extraUpdates.add([changeSet.entity, prop.name, changeSet.entity[prop.name]]);
delete changeSet.entity[prop.name];
delete changeSet.payload[prop.name];
}
});

const copy = Utils.prepareEntity(changeSet.entity, this.metadata, this.platform) as T;
await this.runHooks(EventType.beforeDelete, changeSet);
await this.runHooks(EventType.beforeCreate, changeSet);
Object.assign(changeSet.payload, Utils.diffEntities<T>(copy, changeSet.entity, this.metadata, this.platform));
}

const meta = changeSets[0].entity.__helper!.__meta;
const pk = Utils.getPrimaryKeyHash(meta.primaryKeys);
await this.changeSetPersister.executeInserts(changeSets, ctx);

if (meta.compositePK) {
const pks = changeSets.map(cs => cs.entity.__helper!.__primaryKeys);
await this.em.getDriver().nativeDelete(changeSets[0].name, { [pk]: { $in: pks } }, ctx);
} else {
const pks = changeSets.map(cs => cs.entity.__helper!.__primaryKey as Dictionary);
await this.em.getDriver().nativeDelete(changeSets[0].name, { [pk]: { $in: pks } }, ctx);
for (const changeSet of changeSets) {
this.em.merge(changeSet.entity as T, true);
await this.runHooks(EventType.afterCreate, changeSet);
}
}

private async commitUpdateChangeSets<T extends AnyEntity<T>>(changeSets: ChangeSet<T>[], ctx?: Transaction): Promise<void> {
if (changeSets.length === 0) {
return;
}

for (const changeSet of changeSets) {
const copy = Utils.prepareEntity(changeSet.entity, this.metadata, this.platform) as T;
await this.runHooks(EventType.beforeUpdate, changeSet);
Object.assign(changeSet.payload, Utils.diffEntities<T>(copy, changeSet.entity, this.metadata, this.platform));
}

await this.changeSetPersister.executeUpdates(changeSets, ctx);

for (const changeSet of changeSets) {
this.merge(changeSet.entity as T);
await this.runHooks(EventType.afterUpdate, changeSet);
}
}

private async commitDeleteChangeSets<T extends AnyEntity<T>>(changeSets: ChangeSet<T>[], ctx?: Transaction): Promise<void> {
if (changeSets.length === 0) {
return;
}

for (const changeSet of changeSets) {
const copy = Utils.prepareEntity(changeSet.entity, this.metadata, this.platform) as T;
await this.runHooks(EventType.beforeDelete, changeSet);
Object.assign(changeSet.payload, Utils.diffEntities<T>(copy, changeSet.entity, this.metadata, this.platform));
}

await this.changeSetPersister.executeDeletes(changeSets, ctx);

for (const changeSet of changeSets) {
this.unsetIdentity(changeSet.entity);
await this.runHooks(EventType.afterDelete, changeSet);
Expand Down
8 changes: 4 additions & 4 deletions tests/EntityManager.mysql.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1255,9 +1255,9 @@ describe('EntityManagerMySql', () => {

expect(Author2Subscriber.log.map(l => [l[0], l[1].entity.constructor.name])).toEqual([
['beforeCreate', 'Author2'],
['afterCreate', 'Author2'],
['beforeCreate', 'Author2'],
['afterCreate', 'Author2'],
['afterCreate', 'Author2'],
['beforeUpdate', 'Author2'],
['afterUpdate', 'Author2'],
['beforeDelete', 'Author2'],
Expand All @@ -1268,15 +1268,15 @@ describe('EntityManagerMySql', () => {
['beforeCreate', 'Publisher2'],
['afterCreate', 'Publisher2'],
['beforeCreate', 'Author2'],
['afterCreate', 'Author2'],
['beforeCreate', 'Author2'],
['afterCreate', 'Author2'],
['afterCreate', 'Author2'],
['beforeCreate', 'Book2'],
['afterCreate', 'Book2'],
['beforeCreate', 'Book2'],
['afterCreate', 'Book2'],
['beforeCreate', 'Book2'],
['afterCreate', 'Book2'],
['afterCreate', 'Book2'],
['afterCreate', 'Book2'],
['beforeUpdate', 'Author2'],
['afterUpdate', 'Author2'],
['beforeUpdate', 'Book2'],
Expand Down

0 comments on commit 5a54965

Please sign in to comment.