Skip to content

Commit

Permalink
feat(core): allow fine-grained control over em.upsert() (#4669)
Browse files Browse the repository at this point in the history
Adds 4 new options to `em.upsert()`:

- `onConflictFields?: (keyof T)[]` to control the conflict clause
- `onConflictAction?: 'ignore' | 'merge'` used ignore and merge as that
is how the QB methods are called
- `onConflictMergeFields?: (keyof T)[]` to control the merge clause
- `onConflictExcludeFields?: (keyof T)[]` to omit fields from the merge
clause

Closes #4325
Closes #4602
  • Loading branch information
B4nan committed Sep 8, 2023
1 parent 54f4d02 commit ab0ddee
Show file tree
Hide file tree
Showing 15 changed files with 1,352 additions and 228 deletions.
31 changes: 31 additions & 0 deletions docs/docs/entity-manager.md
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,37 @@ const [author1, author2, author3] = await em.upsertMany(Author, [
]);
```
By default, the EntityManager will prefer using the primary key, and fallback to the first unique property with a value. Sometimes this might not be the wanted behaviour, one example is when you generate the primary key via property initializer, e.g. with `uuid.v4()`. For those advanced cases, you can control how the underlying upserting logic works via the following options:
- `onConflictFields?: (keyof T)[]` to control the conflict clause
- `onConflictAction?: 'ignore' | 'merge'` used ignore and merge as that is how the QB methods are called
- `onConflictMergeFields?: (keyof T)[]` to control the merge clause
- `onConflictExcludeFields?: (keyof T)[]` to omit fields from the merge clause
```ts
const [author1, author2, author3] = await em.upsertMany(Author, [{ ... }, { ... }, { ... }], {
onConflictFields: ['email'], // specify a manual set of fields pass to the on conflict clause
onConflictAction: 'merge',
onConflictExcludeFields: ['id'],
});
```
This will generate query similar to the following:
```sql
insert into "author"
("id", "current_age", "email", "foo")
values
(1, 41, 'a1', true),
(2, 42, 'a2', true),
(5, 43, 'a3', true)
on conflict ("email")
do update set
"current_age" = excluded."current_age",
"foo" = excluded."foo"
returning "_id", "current_age", "foo", "bar"
```
## Refreshing entity state
We can use `em.refresh(entity)` to synchronize the entity state with database. This is a shortcut for calling `em.findOne()` with `refresh: true` and disabled auto-flush.
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@
"lerna": "6.6.2",
"lint-staged": "14.0.1",
"mongodb": "5.8.1",
"mongodb-memory-server": "^8.12.2",
"mongodb-memory-server": "8.15.1",
"node-gyp": "^9.3.1",
"rimraf": "5.0.1",
"ts-jest": "29.1.1",
Expand Down
84 changes: 61 additions & 23 deletions packages/core/src/EntityManager.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { inspect } from 'util';
import { QueryHelper, TransactionContext, Utils, type Configuration } from './utils';
import { EntityAssigner, EntityFactory, EntityLoader, EntityValidator, helper, Reference, type AssignOptions, type EntityLoaderOptions, type EntityRepository, type IdentifiedReference } from './entity';
import { QueryHelper, TransactionContext, Utils, type Configuration, getOnConflictReturningFields } from './utils';
import { EntityAssigner, EntityFactory, EntityLoader, EntityValidator, helper, Reference, type AssignOptions, type EntityLoaderOptions, type EntityRepository } from './entity';
import { ChangeSet, ChangeSetType, UnitOfWork } from './unit-of-work';
import type {
CountOptions,
Expand All @@ -15,6 +15,8 @@ import type {
LockOptions,
NativeInsertUpdateOptions,
UpdateOptions,
UpsertOptions,
UpsertManyOptions,
} from './drivers';
import type {
AnyEntity,
Expand All @@ -36,6 +38,7 @@ import type {
PopulateOptions,
Primary,
RequiredEntityData,
Ref,
} from './typings';
import { EventType, FlushMode, LoadStrategy, LockMode, PopulateHint, ReferenceType, SCALAR_TYPES, type TransactionOptions } from './enums';
import type { MetadataStorage } from './metadata';
Expand All @@ -44,10 +47,6 @@ import { EventManager, TransactionEventBroadcaster, type FlushEventArgs } from '
import type { EntityComparator } from './utils/EntityComparator';
import { OptimisticLockError, ValidationError } from './errors';

export interface UpsertManyOptions<Entity> extends Omit<NativeInsertUpdateOptions<Entity>, 'upsert'> {
batchSize?: number;
}

/**
* The EntityManager is the central access point to ORM functionality. It is a facade to all different ORM subsystems
* such as UnitOfWork, Query Language and Repository API.
Expand Down Expand Up @@ -544,7 +543,7 @@ export class EntityManager<D extends IDatabaseDriver = IDatabaseDriver> {
*
* If the entity is already present in current context, there won't be any queries - instead, the entity data will be assigned and an explicit `flush` will be required for those changes to be persisted.
*/
async upsert<Entity extends object>(entityNameOrEntity: EntityName<Entity> | Entity, data?: EntityData<Entity> | Entity, options: NativeInsertUpdateOptions<Entity> = {}): Promise<Entity> {
async upsert<Entity extends object>(entityNameOrEntity: EntityName<Entity> | Entity, data?: EntityData<Entity> | Entity, options: UpsertOptions<Entity> = {}): Promise<Entity> {
const em = this.getContext(false);

let entityName: EntityName<Entity>;
Expand Down Expand Up @@ -583,10 +582,10 @@ export class EntityManager<D extends IDatabaseDriver = IDatabaseDriver> {
}
}

const unique = meta.props.filter(p => p.unique).map(p => p.name);
const unique = options.onConflictFields as string[] ?? meta.props.filter(p => p.unique).map(p => p.name);
const propIndex = unique.findIndex(p => data![p] != null);

if (where == null) {
if (options.onConflictFields || where == null) {
if (propIndex >= 0) {
where = { [unique[propIndex]]: data[unique[propIndex]] } as FilterQuery<Entity>;
} else if (meta.uniques.length > 0) {
Expand Down Expand Up @@ -629,15 +628,19 @@ export class EntityManager<D extends IDatabaseDriver = IDatabaseDriver> {
convertCustomTypes: true,
});

em.unitOfWork.getChangeSetPersister().mapReturnedValues(entity, data, ret.row, meta);
em.unitOfWork.getChangeSetPersister().mapReturnedValues(entity, data, ret.row, meta, true);
const uniqueFields = options.onConflictFields ?? (Utils.isPlainObject(where) ? Object.keys(where) : meta!.primaryKeys) as (keyof Entity)[];
const returning = getOnConflictReturningFields(meta, data, uniqueFields, options) as string[];

if (!helper(entity).hasPrimaryKey()) {
const pk = await this.driver.findOne(meta.className, where, {
fields: meta.comparableProps.filter(p => !p.lazy && !(p.name in data!)).map(p => p.name) as EntityField<Entity>[],
if (options.onConflictAction === 'ignore' || !helper(entity).hasPrimaryKey() || (returning.length > 0 && !this.getPlatform().usesReturningStatement())) {
const where = {} as FilterQuery<Entity>;
uniqueFields.forEach(prop => where[prop as string] = data![prop as string]);
const data2 = await this.driver.findOne(meta.className, where, {
fields: returning as EntityField<Entity>[],
ctx: em.transactionContext,
convertCustomTypes: true,
});
em.entityFactory.mergeData(meta, entity, pk!, { initialized: true });
em.getHydrator().hydrate(entity, meta, data2!, em.entityFactory, 'full');
}

// recompute the data as there might be some values missing (e.g. those with db column defaults)
Expand Down Expand Up @@ -742,7 +745,7 @@ export class EntityManager<D extends IDatabaseDriver = IDatabaseDriver> {
const unique = meta.props.filter(p => p.unique).map(p => p.name);
propIndex = unique.findIndex(p => row[p] != null);

if (where == null) {
if (options.onConflictFields || where == null) {
if (propIndex >= 0) {
where = { [unique[propIndex]]: row[unique[propIndex]] } as FilterQuery<Entity>;
} else if (meta.uniques.length > 0) {
Expand Down Expand Up @@ -800,7 +803,7 @@ export class EntityManager<D extends IDatabaseDriver = IDatabaseDriver> {
convertCustomTypes: true,
});

em.unitOfWork.getChangeSetPersister().mapReturnedValues(entity, data![i], res.rows?.[i], meta);
em.unitOfWork.getChangeSetPersister().mapReturnedValues(entity, Utils.isEntity(data![i]) ? {} : data![i], res.rows?.[i], meta, true);

if (!helper(entity).hasPrimaryKey()) {
loadPK.set(entity, allWhere[i]);
Expand All @@ -811,22 +814,30 @@ export class EntityManager<D extends IDatabaseDriver = IDatabaseDriver> {
});

// skip if we got the PKs via returning statement (`rows`)
if (!res.rows?.length && loadPK.size > 0) {
const uniqueFields = options.onConflictFields ?? (Utils.isPlainObject(allWhere![0]) ? Object.keys(allWhere![0]) : meta!.primaryKeys) as (keyof Entity)[];
const returning = getOnConflictReturningFields(meta, data[0], uniqueFields, options) as string[];
const reloadFields = returning.length > 0 && !this.getPlatform().usesReturningStatement();

if (options.onConflictAction === 'ignore' || (!res.rows?.length && loadPK.size > 0) || reloadFields) {
const unique = meta.hydrateProps.filter(p => !p.lazy).map(p => p.name);
const add = new Set(propIndex! >= 0 ? [unique[propIndex!]] : []);

for (const cond of loadPK.values()) {
Object.keys(cond).forEach(key => add.add(key));
}

const pks = await this.driver.find(meta.className, { $or: [...loadPK.values()] as Dictionary[] }, {
fields: meta.comparableProps.filter(p => !p.lazy && !(p.name in data![0] && !add.has(p.name))).map(p => p.name),
const where = { $or: [] as Dictionary[] };
data.forEach((item, idx) => {
uniqueFields.forEach(prop => where.$or[idx] = { [prop]: item[prop as string] });
});
const data2 = await this.driver.find(meta.className, where, {
fields: returning.concat(...add).concat(...uniqueFields as string[]),
ctx: em.transactionContext,
convertCustomTypes: true,
});

for (const [entity, cond] of loadPK.entries()) {
const pk = pks.find(pk => {
const row = data2.find(pk => {
const tmp = {};
add.forEach(k => {
if (!meta.properties[k]?.primary) {
Expand All @@ -837,11 +848,38 @@ export class EntityManager<D extends IDatabaseDriver = IDatabaseDriver> {
});

/* istanbul ignore next */
if (!pk) {
if (!row) {
throw new Error(`Cannot find matching entity for condition ${JSON.stringify(cond)}`);
}

em.entityFactory.mergeData(meta, entity, pk, { initialized: true });
em.getHydrator().hydrate(entity, meta, row, em.entityFactory, 'full');
}

if (loadPK.size !== data2.length) {
for (let i = 0; i < allData.length; i++) {
const data = allData[i];
const cond = uniqueFields.reduce((a, b) => {
// @ts-ignore
a[b] = data[b];
return a;
}, {});
const entity = entitiesByData.get(data);
const row = data2.find(item => {
const pk = uniqueFields.reduce((a, b) => {
// @ts-ignore
a[b] = item[b];
return a;
}, {});
return this.comparator.matching(entityName, cond, pk);
});

/* istanbul ignore next */
if (!row) {
throw new Error(`Cannot find matching entity for condition ${JSON.stringify(cond)}`);
}

em.getHydrator().hydrate(entity, meta, row, em.entityFactory, 'full');
}
}
}

Expand Down Expand Up @@ -1161,7 +1199,7 @@ export class EntityManager<D extends IDatabaseDriver = IDatabaseDriver> {
/**
* Gets a reference to the entity identified by the given type and identifier without actually loading it, if the entity is not yet loaded
*/
getReference<Entity extends object, PK extends keyof Entity>(entityName: EntityName<Entity>, id: Primary<Entity>, options: Omit<GetReferenceOptions, 'wrapped'> & { wrapped: true }): IdentifiedReference<Entity, PK>;
getReference<Entity extends object, PK extends keyof Entity>(entityName: EntityName<Entity>, id: Primary<Entity>, options: Omit<GetReferenceOptions, 'wrapped'> & { wrapped: true }): Ref<Entity, PK>;

/**
* Gets a reference to the entity identified by the given type and identifier without actually loading it, if the entity is not yet loaded
Expand Down
11 changes: 11 additions & 0 deletions packages/core/src/drivers/IDatabaseDriver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,17 @@ export interface NativeInsertUpdateManyOptions<T> extends NativeInsertUpdateOpti
processCollections?: boolean;
}

export interface UpsertOptions<Entity> extends Omit<NativeInsertUpdateOptions<Entity>, 'upsert'> {
onConflictFields?: (keyof Entity)[];
onConflictAction?: 'ignore' | 'merge';
onConflictMergeFields?: (keyof Entity)[];
onConflictExcludeFields?: (keyof Entity)[];
}

export interface UpsertManyOptions<Entity> extends UpsertOptions<Entity> {
batchSize?: number;
}

export interface CountOptions<T extends object, P extends string = never> {
filters?: Dictionary<boolean | Dictionary> | string[] | boolean;
schema?: string;
Expand Down
4 changes: 2 additions & 2 deletions packages/core/src/unit-of-work/ChangeSetPersister.ts
Original file line number Diff line number Diff line change
Expand Up @@ -378,10 +378,10 @@ export class ChangeSetPersister {
* No need to handle composite keys here as they need to be set upfront.
* We do need to map to the change set payload too, as it will be used in the originalEntityData for new entities.
*/
mapReturnedValues<T extends object>(entity: T, payload: EntityDictionary<T>, row: Dictionary | undefined, meta: EntityMetadata<T>): void {
mapReturnedValues<T extends object>(entity: T, payload: EntityDictionary<T>, row: Dictionary | undefined, meta: EntityMetadata<T>, override = false): void {
if (this.platform.usesReturningStatement() && row && Utils.hasObjectKeys(row)) {
const data = meta.props.reduce((ret, prop) => {
if (prop.fieldNames && row[prop.fieldNames[0]] != null && entity[prop.name] == null) {
if (prop.fieldNames && row[prop.fieldNames[0]] != null && (override || entity[prop.name] == null)) {
ret[prop.name] = row[prop.fieldNames[0]];
}

Expand Down
1 change: 1 addition & 0 deletions packages/core/src/utils/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ export * from './QueryHelper';
export * from './NullHighlighter';
export * from './EntityComparator';
export * from './AbstractSchemaGenerator';
export * from './upsert-utils';
40 changes: 40 additions & 0 deletions packages/core/src/utils/upsert-utils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import type { EntityData, EntityMetadata } from '../typings';
import type { UpsertOptions } from '../drivers/IDatabaseDriver';

/** @internal */
export function getOnConflictFields<T>(data: EntityData<T>, uniqueFields: (keyof T)[], options: UpsertOptions<T>): (keyof T)[] {
if (options.onConflictMergeFields) {
return options.onConflictMergeFields;
}

const keys = Object.keys(data).filter(f => !uniqueFields.includes(f as keyof T)) as (keyof T)[];

if (options.onConflictExcludeFields) {
return keys.filter(f => !options.onConflictExcludeFields!.includes(f));
}

return keys;
}

/** @internal */
export function getOnConflictReturningFields<T>(meta: EntityMetadata<T> | undefined, data: EntityData<T>, uniqueFields: (keyof T)[], options: UpsertOptions<T>): (keyof T)[] | '*' {
if (!meta) {
return '*';
}

const keys = meta.comparableProps.filter(p => !p.lazy && !p.embeddable && !uniqueFields.includes(p.name)).map(p => p.name) as (keyof T)[];

if (options.onConflictAction === 'ignore') {
return keys;
}

if (options.onConflictMergeFields) {
return keys.filter(key => !options.onConflictMergeFields!.includes(key));
}

if (options.onConflictExcludeFields) {
return [...new Set(keys.concat(...options.onConflictExcludeFields))];
}

return keys.filter(key => !(key in data));
}
Loading

0 comments on commit ab0ddee

Please sign in to comment.