Skip to content

Commit

Permalink
fix(core): lock entities in flush() to get around race conditions w…
Browse files Browse the repository at this point in the history
…ith `Promise.all`

This should fix a flaky test for #2934. It should also help with random
`Transaction query already completed` issues.

Related: #3383
  • Loading branch information
B4nan committed Aug 22, 2022
1 parent cd7c42f commit b62799a
Show file tree
Hide file tree
Showing 7 changed files with 53 additions and 29 deletions.
2 changes: 1 addition & 1 deletion packages/core/src/EntityManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,7 @@ export class EntityManager<D extends IDatabaseDriver = IDatabaseDriver> {
return fork.getConnection().transactional(async trx => {
fork.transactionContext = trx;
fork.eventManager.registerSubscriber({
afterFlush: async (args: FlushEventArgs) => {
afterFlush(args: FlushEventArgs) {
args.uow.getChangeSets()
.filter(cs => [ChangeSetType.DELETE, ChangeSetType.DELETE_EARLY].includes(cs.type))
.forEach(cs => em.unitOfWork.unsetIdentity(cs.entity));
Expand Down
6 changes: 6 additions & 0 deletions packages/core/src/entity/EntityFactory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,17 @@ export class EntityFactory {
const exists = this.findEntity<T>(data, meta2, options);

if (exists && exists.__helper!.__initialized && !options.refresh) {
exists.__helper!.__processing = true;
exists.__helper!.__initialized = options.initialized;
this.mergeData(meta2, exists, data, options);
exists.__helper!.__processing = false;

return exists as New<T, P>;
}

data = { ...data };
const entity = exists ?? this.createEntity<T>(data, meta2, options);
entity.__helper!.__processing = true;
entity.__helper!.__initialized = options.initialized;
this.hydrate(entity, meta2, data, options);
entity.__helper!.__touched = false;
Expand All @@ -80,6 +83,8 @@ export class EntityFactory {
this.eventManager.dispatchEvent(EventType.onInit, { entity, em: this.em });
}

entity.__helper!.__processing = false;

return entity as New<T, P>;
}

Expand Down Expand Up @@ -204,6 +209,7 @@ export class EntityFactory {
// creates new entity instance, bypassing constructor call as its already persisted entity
const entity = Object.create(meta.class.prototype) as T;
entity.__helper!.__managed = true;
entity.__helper!.__processing = !meta.embeddable && !meta.virtual;
entity.__helper!.__schema = this.driver.getSchemaName(meta, options);

if (options.merge && !options.newEntity) {
Expand Down
1 change: 1 addition & 0 deletions packages/core/src/entity/WrappedEntity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ export class WrappedEntity<T extends AnyEntity<T>, PK extends keyof T> {
__serializationContext: { root?: SerializationContext<T>; populate?: PopulateOptions<T>[] } = {};
__loadedProperties = new Set<string>();
__data: Dictionary = {};
__processing = false;

/** holds last entity data snapshot so we can compute changes when persisting managed entities */
__originalEntityData?: EntityData<T>;
Expand Down
32 changes: 16 additions & 16 deletions packages/core/src/events/EventSubscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,21 @@ export interface TransactionEventArgs extends Omit<EventArgs<unknown>, 'entity'
export interface EventSubscriber<T = any> {
getSubscribedEntities?(): EntityName<T>[];
onInit?(args: EventArgs<T>): void;
onLoad?(args: EventArgs<T>): Promise<void>;
beforeCreate?(args: EventArgs<T>): Promise<void>;
afterCreate?(args: EventArgs<T>): Promise<void>;
beforeUpdate?(args: EventArgs<T>): Promise<void>;
afterUpdate?(args: EventArgs<T>): Promise<void>;
beforeDelete?(args: EventArgs<T>): Promise<void>;
afterDelete?(args: EventArgs<T>): Promise<void>;
beforeFlush?(args: FlushEventArgs): Promise<void>;
onFlush?(args: FlushEventArgs): Promise<void>;
afterFlush?(args: FlushEventArgs): Promise<void>;
onLoad?(args: EventArgs<T>): void | Promise<void>;
beforeCreate?(args: EventArgs<T>): void | Promise<void>;
afterCreate?(args: EventArgs<T>): void | Promise<void>;
beforeUpdate?(args: EventArgs<T>): void | Promise<void>;
afterUpdate?(args: EventArgs<T>): void | Promise<void>;
beforeDelete?(args: EventArgs<T>): void | Promise<void>;
afterDelete?(args: EventArgs<T>): void | Promise<void>;
beforeFlush?(args: FlushEventArgs): void | Promise<void>;
onFlush?(args: FlushEventArgs): void | Promise<void>;
afterFlush?(args: FlushEventArgs): void | Promise<void>;

beforeTransactionStart?(args: TransactionEventArgs): Promise<void>;
afterTransactionStart?(args: TransactionEventArgs): Promise<void>;
beforeTransactionCommit?(args: TransactionEventArgs): Promise<void>;
afterTransactionCommit?(args: TransactionEventArgs): Promise<void>;
beforeTransactionRollback?(args: TransactionEventArgs): Promise<void>;
afterTransactionRollback?(args: TransactionEventArgs): Promise<void>;
beforeTransactionStart?(args: TransactionEventArgs): void | Promise<void>;
afterTransactionStart?(args: TransactionEventArgs): void | Promise<void>;
beforeTransactionCommit?(args: TransactionEventArgs): void | Promise<void>;
afterTransactionCommit?(args: TransactionEventArgs): void | Promise<void>;
beforeTransactionRollback?(args: TransactionEventArgs): void | Promise<void>;
afterTransactionRollback?(args: TransactionEventArgs): void | Promise<void>;
}
1 change: 1 addition & 0 deletions packages/core/src/typings.ts
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ export interface IWrappedEntityInternal<T, PK extends keyof T | unknown = Primar
__loadedProperties: Set<string>;
__identifier?: EntityIdentifier;
__managed: boolean;
__processing: boolean;
__schema?: string;
__populated: boolean;
__onLoadFired: boolean;
Expand Down
19 changes: 15 additions & 4 deletions packages/core/src/unit-of-work/UnitOfWork.ts
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,9 @@ export class UnitOfWork {
try {
await this.eventManager.dispatchEvent(EventType.beforeFlush, { em: this.em, uow: this });
this.computeChangeSets();
this.changeSets.forEach(cs => {
cs.entity.__helper.__processing = true;
});
await this.eventManager.dispatchEvent(EventType.onFlush, { em: this.em, uow: this });

// nothing to do, do not start transaction
Expand All @@ -335,8 +338,9 @@ export class UnitOfWork {
}
this.resetTransaction(oldTx);

// To allow working with the UoW in after flush handlers we need to unset the `working` flag early.
this.working = false;
this.changeSets.forEach(cs => {
cs.entity.__helper.__processing = false;
});

// To allow flushing via `Promise.all()` while still supporting queries inside after flush handler,
// we need to run the flush hooks in a separate async context, as we need to skip flush hooks if they
Expand Down Expand Up @@ -417,7 +421,9 @@ export class UnitOfWork {
}

for (const entity of this.orphanRemoveStack) {
this.removeStack.add(entity);
if (!entity.__helper!.__processing) {
this.removeStack.add(entity);
}
}

// Check insert stack if there are any entities matching something from delete stack. This can happen when recreating entities.
Expand All @@ -430,6 +436,10 @@ export class UnitOfWork {
}

for (const entity of this.removeStack) {
if (entity.__helper!.__processing) {
continue;
}

const deletePkHash = [entity.__helper!.getSerializedPrimaryKey(), ...this.expandUniqueProps(entity)];
let type = ChangeSetType.DELETE;

Expand Down Expand Up @@ -478,7 +488,7 @@ export class UnitOfWork {
visited.add(entity);
const wrapped = entity.__helper!;

if (!wrapped.__initialized || this.removeStack.has(entity) || this.orphanRemoveStack.has(entity)) {
if (!wrapped.__initialized || entity.__helper!.__processing || this.removeStack.has(entity) || this.orphanRemoveStack.has(entity)) {
return;
}

Expand Down Expand Up @@ -625,6 +635,7 @@ export class UnitOfWork {
}

private postCommitCleanup(): void {
this.changeSets.forEach(cs => cs.entity.__helper!.__processing = false);
this.persistStack.clear();
this.removeStack.clear();
this.orphanRemoveStack.clear();
Expand Down
21 changes: 13 additions & 8 deletions tests/EntityManager.postgre.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2072,12 +2072,13 @@ describe('EntityManagerPostgre', () => {
});

test('GH #2934', async () => {
const users = [
{ name: 'A', email: 'A' },
{ name: 'B', email: 'B' },
{ name: 'C', email: 'C' },
{ name: 'D', email: 'D' },
];
// This test used to be flaky in CI where it runs with fewer resources. To mimic this behaviour, we can run it with
// larger payload and many times in a row via turning `heavy` to `true`.
const heavy = false; // heavy mode takes around 10 minutes to complete (half a million entities, each doing select + insert)
const length = heavy ? 50 : 4;
const runs = heavy ? 10000 : 3;

const users = Array.from({ length }).map((_, i) => ({ name: `name ${i}`, email: `email ${i}` }));

async function saveUser(options: FilterQuery<Author2>): Promise<Author2> {
let user = await orm.em.findOne(Author2, options);
Expand All @@ -2092,8 +2093,12 @@ describe('EntityManagerPostgre', () => {
return user;
}

const res = await Promise.all(users.map(userData => saveUser(userData)));
res.forEach(user => expect(user.id).toBeDefined());
for (let i = 0; i < runs; i++) {
await orm.em.nativeDelete(Author2, {});
orm.em.clear();
const res = await Promise.all(users.map(userData => saveUser(userData)));
res.forEach(user => expect(user.id).toBeDefined());
}
});

test('required fields validation', async () => {
Expand Down

0 comments on commit b62799a

Please sign in to comment.