Skip to content

Commit

Permalink
fix(core): make ChangeSet.getPrimaryKey() response stable
Browse files Browse the repository at this point in the history
This also fixes concurrency checks in mongo multi update queries.
  • Loading branch information
B4nan committed Nov 11, 2022
1 parent d68b9bd commit d32c956
Show file tree
Hide file tree
Showing 7 changed files with 52 additions and 17 deletions.
17 changes: 12 additions & 5 deletions packages/core/src/unit-of-work/ChangeSet.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { inspect } from 'util';
import type { EntityData, EntityMetadata, EntityDictionary, Primary } from '../typings';
import { helper } from '../entity/wrap';
import { Utils } from '../utils/Utils';

export class ChangeSet<T> {

Expand All @@ -20,15 +21,21 @@ export class ChangeSet<T> {
getPrimaryKey(object = false): Primary<T> | null {
if (!this.originalEntity) {
this.primaryKey ??= helper(this.entity).getPrimaryKey(true);
} else if (this.meta.compositePK || object) {
this.primaryKey = this.meta.primaryKeys.reduce((o, pk) => {
o[pk] = (this.originalEntity as T)[pk];
return o;
}, {} as T) as any;
} else if (this.meta.compositePK) {
this.primaryKey = this.meta.primaryKeys.map(pk => (this.originalEntity as T)[pk]) as Primary<T>;
} else {
this.primaryKey = (this.originalEntity as T)[this.meta.primaryKeys[0]] as Primary<T>;
}

if (object && this.primaryKey != null) {
const pks = Utils.asArray(this.primaryKey);
const ret = this.meta.primaryKeys.reduce((o, pk, idx) => {
o[pk] = pks[idx] as any;
return o;
}, {} as T);
return ret as any;
}

return this.primaryKey ?? null;
}

Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/unit-of-work/ChangeSetPersister.ts
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ export class ChangeSetPersister {
convertCustomTypes: false,
processCollections: false,
});
const cond = changeSets.map(cs => cs.getPrimaryKey() as Dictionary);
const cond = changeSets.map(cs => cs.getPrimaryKey(true) as Dictionary);

changeSets.forEach((changeSet, idx) => {
this.checkConcurrencyKeys(meta, changeSet, cond[idx]);
Expand Down
4 changes: 4 additions & 0 deletions packages/knex/src/AbstractSqlDriver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,10 @@ export abstract class AbstractSqlDriver<C extends AbstractSqlConnection = Abstra
sql += pks.length > 1 ? `(${pks.map(pk => `${this.platform.quoteIdentifier(pk)}`).join(', ')})` : this.platform.quoteIdentifier(pks[0]);

const conds = where.map(cond => {
if (Utils.isPlainObject(cond) && Utils.getObjectKeysSize(cond) === 1) {
cond = Object.values(cond)[0];
}

if (pks.length > 1) {
pkProps.forEach(pk => {
if (Array.isArray(cond![pk as string])) {
Expand Down
36 changes: 30 additions & 6 deletions packages/mongodb/src/MongoConnection.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,38 @@
import type {
Collection, Db, MongoClientOptions, ClientSession, BulkWriteResult, Filter, UpdateFilter, OptionalUnlessRequiredId, UpdateResult,
DeleteResult, InsertManyResult, InsertOneResult, TransactionOptions,
BulkWriteResult,
ClientSession,
Collection,
Db,
DeleteResult,
Filter,
InsertManyResult,
InsertOneResult,
MongoClientOptions,
OptionalUnlessRequiredId,
TransactionOptions,
UpdateFilter,
UpdateResult,
} from 'mongodb';
import { MongoClient } from 'mongodb';
import { ObjectId } from 'bson';
import { inspect } from 'util';
import type {
ConnectionType, ConnectionConfig, QueryResult, Transaction, QueryOrderMap, FilterQuery, AnyEntity, EntityName, Dictionary,
EntityData, TransactionEventBroadcaster, IsolationLevel, Configuration, ConnectionOptions,
AnyEntity,
Configuration,
ConnectionConfig,
ConnectionOptions,
ConnectionType,
Dictionary,
EntityData,
EntityName,
FilterQuery,
IsolationLevel,
QueryOrderMap,
QueryResult,
Transaction,
TransactionEventBroadcaster,
} from '@mikro-orm/core';
import { Connection, Utils, QueryOrder, EventType, ValidationError } from '@mikro-orm/core';
import { Connection, EventType, QueryOrder, Utils, ValidationError } from '@mikro-orm/core';

export class MongoConnection extends Connection {

Expand Down Expand Up @@ -279,7 +302,8 @@ export class MongoConnection extends Connection {
const bulk = this.getCollection<T>(collection).initializeUnorderedBulkOp(options);

(data as T[]).forEach((row, idx) => {
const cond = { _id: (where as Dictionary[])[idx] };
const id = (where as Dictionary[])[idx];
const cond = Utils.isPlainObject(id) ? id : { _id: id };
const doc = this.createUpdatePayload(row);
query += log(() => `bulk.find(${this.logObject(cond)}).update(${this.logObject(doc)});\n`);
bulk.find(cond).update(doc);
Expand Down
2 changes: 1 addition & 1 deletion tests/features/composite-keys/composite-keys.mysql.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ describe('composite keys in mysql', () => {
const mock = mockLogger(orm);
await orm.em.flush();
expect(mock).toBeCalledTimes(3);
expect(mock.mock.calls[1][0]).toMatch("update `car2` set `year` = 2015 where `name` = 'Audi A8' and `year` = 2010");
expect(mock.mock.calls[1][0]).toMatch("update `car2` set `year` = 2015 where (`name`, `year`) in (('Audi A8', 2010))");

const c = await orm.em.fork().findOne(Car2, car);
expect(c).toBeDefined();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,15 +116,15 @@ describe('optimistic locking - concurrency check (mongo)', () => {
test2.age = 35;
await orm.em.flush();
expect(mock.mock.calls[0][0]).toMatch(`db.getCollection('concurrency-check-user').find({ '$or': [ { _id: '1', firstName: 'Jakub', lastName: 'Smith', age: 20 }, { _id: '2', firstName: 'John', lastName: 'Smith', age: 25 } ] }, { projection: { _id: 1, firstName: 1, lastName: 1, age: 1 } }).toArray();`);
expect(mock.mock.calls[1][0]).toMatch(`bulk = db.getCollection('concurrency-check-user').initializeUnorderedBulkOp({});bulk.find({ _id: '1' }).update({ '$set': { age: 30 } });bulk.find({ _id: '2' }).update({ '$set': { age: 35 } });bulk.execute()`);
expect(mock.mock.calls[1][0]).toMatch(`bulk = db.getCollection('concurrency-check-user').initializeUnorderedBulkOp({});bulk.find({ _id: '1', firstName: 'Jakub', lastName: 'Smith', age: 20 }).update({ '$set': { age: 30 } });bulk.find({ _id: '2', firstName: 'John', lastName: 'Smith', age: 25 }).update({ '$set': { age: 35 } });bulk.execute()`);

mock.mockReset();

test1.age = 40;
test2.age = 45;
await orm.em.flush();
expect(mock.mock.calls[0][0]).toMatch(`db.getCollection('concurrency-check-user').find({ '$or': [ { _id: '1', firstName: 'Jakub', lastName: 'Smith', age: 30 }, { _id: '2', firstName: 'John', lastName: 'Smith', age: 35 } ] }, { projection: { _id: 1, firstName: 1, lastName: 1, age: 1 } }).toArray();`);
expect(mock.mock.calls[1][0]).toMatch(`bulk = db.getCollection('concurrency-check-user').initializeUnorderedBulkOp({});bulk.find({ _id: '1' }).update({ '$set': { age: 40 } });bulk.find({ _id: '2' }).update({ '$set': { age: 45 } });bulk.execute()`);
expect(mock.mock.calls[1][0]).toMatch(`bulk = db.getCollection('concurrency-check-user').initializeUnorderedBulkOp({});bulk.find({ _id: '1', firstName: 'Jakub', lastName: 'Smith', age: 30 }).update({ '$set': { age: 40 } });bulk.find({ _id: '2', firstName: 'John', lastName: 'Smith', age: 35 }).update({ '$set': { age: 45 } });bulk.execute()`);

mock.mockReset();

Expand All @@ -138,7 +138,7 @@ describe('optimistic locking - concurrency check (mongo)', () => {
test2.age = 46;
await orm.em.flush();
expect(mock.mock.calls[0][0]).toMatch(`db.getCollection('concurrency-check-user').find({ '$or': [ { _id: '1', firstName: 'Jakub', lastName: 'Smith', age: 40 }, { _id: '2', firstName: 'John', lastName: 'Smith', age: 45 } ] }, { projection: { _id: 1, firstName: 1, lastName: 1, age: 1 } }).toArray();`);
expect(mock.mock.calls[1][0]).toMatch(`bulk = db.getCollection('concurrency-check-user').initializeUnorderedBulkOp({});bulk.find({ _id: '1' }).update({ '$set': { age: 41, other: 'asd' } });bulk.find({ _id: '2' }).update({ '$set': { age: 46, other: 'lololol' } });bulk.execute()`);
expect(mock.mock.calls[1][0]).toMatch(`bulk = db.getCollection('concurrency-check-user').initializeUnorderedBulkOp({});bulk.find({ _id: '1', firstName: 'Jakub', lastName: 'Smith', age: 40 }).update({ '$set': { age: 41, other: 'asd' } });bulk.find({ _id: '2', firstName: 'John', lastName: 'Smith', age: 45 }).update({ '$set': { age: 46, other: 'lololol' } });bulk.execute()`);
});

test('throws when someone changed the state in the meantime (batch update)', async () => {
Expand Down
2 changes: 1 addition & 1 deletion tests/issues/GH910.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ describe('GH issue 910', () => {
expect(mock.mock.calls[2][0]).toMatch('insert into `cart_item` (`cart_id`, `sku`, `quantity`) values (\'123\', \'sku1\', 10), (\'123\', \'sku2\', 10)');
expect(mock.mock.calls[3][0]).toMatch('commit');
expect(mock.mock.calls[4][0]).toMatch('begin');
expect(mock.mock.calls[5][0]).toMatch('update `cart_item` set `quantity` = 33 where `cart_id` = \'123\' and `sku` = \'sku2\'');
expect(mock.mock.calls[5][0]).toMatch('update `cart_item` set `quantity` = 33 where (`cart_id`, `sku`) in ((\'123\', \'sku2\'))');
expect(mock.mock.calls[6][0]).toMatch('commit');
await orm.close();
});
Expand Down

0 comments on commit d32c956

Please sign in to comment.