Skip to content

Commit

Permalink
perf(core): implement bulk updates in mongo driver
Browse files Browse the repository at this point in the history
Related: #732
  • Loading branch information
B4nan committed Sep 28, 2020
1 parent b005353 commit 5f347c1
Show file tree
Hide file tree
Showing 7 changed files with 76 additions and 19 deletions.
13 changes: 5 additions & 8 deletions packages/knex/src/AbstractSqlDriver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,6 @@ export abstract class AbstractSqlDriver<C extends AbstractSqlConnection = Abstra
const keys = new Set<string>();
data.forEach(row => Object.keys(row).forEach(k => keys.add(k)));
const pkCond = Utils.flatten(meta.primaryKeys.map(pk => meta.properties[pk].fieldNames)).map(pk => `${knex.ref(pk)} = ?`).join(' and ');
let res: QueryResult = { affectedRows: 0, insertId: 0, row: {} };

keys.forEach(key => {
meta.properties[key].fieldNames.forEach((fieldName: string, fieldNameIdx: number) => {
Expand All @@ -272,14 +271,12 @@ export abstract class AbstractSqlDriver<C extends AbstractSqlConnection = Abstra
});
});

if (keys.size > 0) {
const qb = this.createQueryBuilder<T>(entityName, ctx, true)
.unsetFlag(QueryFlag.CONVERT_CUSTOM_TYPES)
.update(values)
.where({ [Utils.getPrimaryKeyHash(meta.primaryKeys)]: where });
const qb = this.createQueryBuilder<T>(entityName, ctx, true)
.unsetFlag(QueryFlag.CONVERT_CUSTOM_TYPES)
.update(values)
.where({ [Utils.getPrimaryKeyHash(meta.primaryKeys)]: where });

res = await this.rethrow(qb.execute('run', false));
}
const res = await this.rethrow(qb.execute('run', false));

const pkConds = data.map((_, idx) => Utils.extractPK<T>(where[idx], meta)!);

Expand Down
35 changes: 28 additions & 7 deletions packages/mongodb/src/MongoConnection.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import {
Collection, Db, DeleteWriteOpResultObject, InsertOneWriteOpResult, MongoClient, MongoClientOptions,
ObjectId, UpdateWriteOpResult, FilterQuery as MongoFilterQuery, ClientSession, SortOptionObject,
ObjectId, UpdateWriteOpResult, FilterQuery as MongoFilterQuery, ClientSession, SortOptionObject, BulkWriteResult,
} from 'mongodb';
import { inspect } from 'util';
import {
Expand Down Expand Up @@ -125,6 +125,10 @@ export class MongoConnection extends Connection {
return this.runQuery<T>('updateMany', collection, data, where, ctx);
}

async bulkUpdateMany<T extends { _id: any }>(collection: string, where: FilterQuery<T>[], data: Partial<T>[], ctx?: Transaction<ClientSession>): Promise<QueryResult> {
return this.runQuery<T>('bulkUpdateMany', collection, data, where, ctx);
}

async deleteMany<T extends { _id: any }>(collection: string, where: FilterQuery<T>, ctx?: Transaction<ClientSession>): Promise<QueryResult> {
return this.runQuery<T>('deleteMany', collection, undefined, where, ctx);
}
Expand Down Expand Up @@ -183,31 +187,48 @@ export class MongoConnection extends Connection {
super.logQuery(query, took);
}

private async runQuery<T extends { _id: any }, U extends QueryResult | number = QueryResult>(method: 'insertOne' | 'insertMany' | 'updateMany' | 'deleteMany' | 'countDocuments', collection: string, data?: Partial<T> | Partial<T>[], where?: FilterQuery<T>, ctx?: Transaction<ClientSession>): Promise<U> {
private async runQuery<T extends { _id: any }, U extends QueryResult | number = QueryResult>(method: 'insertOne' | 'insertMany' | 'updateMany' | 'bulkUpdateMany' | 'deleteMany' | 'countDocuments', collection: string, data?: Partial<T> | Partial<T>[], where?: FilterQuery<T> | FilterQuery<T>[], ctx?: Transaction<ClientSession>): Promise<U> {
collection = this.getCollectionName(collection);
const logger = this.config.getLogger();
const options: Dictionary = { session: ctx };
const now = Date.now();
let res: InsertOneWriteOpResult<T> | UpdateWriteOpResult | DeleteWriteOpResultObject | number;
let res: InsertOneWriteOpResult<T> | UpdateWriteOpResult | DeleteWriteOpResultObject | BulkWriteResult | number;
let query: string;
const log = (msg: () => string) => logger.isEnabled('query') ? msg() : '';

switch (method) {
case 'insertOne':
query = `db.getCollection('${collection}').insertOne(${this.logObject(data)}, ${this.logObject(options)});`;
query = log(() => `db.getCollection('${collection}').insertOne(${this.logObject(data)}, ${this.logObject(options)});`);
res = await this.getCollection(collection).insertOne(data, options);
break;
case 'insertMany':
query = `db.getCollection('${collection}').insertMany(${this.logObject(data)}, ${this.logObject(options)});`;
query = log(() => `db.getCollection('${collection}').insertMany(${this.logObject(data)}, ${this.logObject(options)});`);
res = await this.getCollection(collection).insertMany(data as Partial<T>[], options);
break;
case 'updateMany': {
const payload = Object.keys(data!).some(k => k.startsWith('$')) ? data : { $set: data };
query = `db.getCollection('${collection}').updateMany(${this.logObject(where)}, ${this.logObject(payload)}, ${this.logObject(options)});`;
query = log(() => `db.getCollection('${collection}').updateMany(${this.logObject(where)}, ${this.logObject(payload)}, ${this.logObject(options)});`);
res = await this.getCollection(collection).updateMany(where as MongoFilterQuery<T>, payload!, options);
break;
}
case 'bulkUpdateMany': {
query = log(() => `bulk = db.getCollection('${collection}').initializeOrderedBulkOp(${this.logObject(options)});\n`);
const bulk = this.getCollection(collection).initializeOrderedBulkOp(options);

(data as T[]).forEach((row, idx) => {
const cond = { _id: (where as Dictionary[])[idx] };
const doc = { $set: row };
query += log(() => `bulk.find(${this.logObject(cond)}).update(${this.logObject(doc)});\n`);
bulk.find(cond).update(doc);
});

query += log(() => `bulk.execute()`);
res = await bulk.execute();
break;
}
case 'deleteMany':
case 'countDocuments':
query = `db.getCollection('${collection}').${method}(${this.logObject(where)}, ${this.logObject(options)});`;
query = log(() => `db.getCollection('${collection}').${method}(${this.logObject(where)}, ${this.logObject(options)});`);
res = await this.getCollection(collection)[method as 'deleteMany'](where as MongoFilterQuery<T>, options); // cast to deleteMany to fix some typing weirdness
break;
}
Expand Down
5 changes: 5 additions & 0 deletions packages/mongodb/src/MongoDriver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ export class MongoDriver extends DatabaseDriver<MongoConnection> {
return this.rethrow(this.getConnection('write').updateMany(entityName, where as FilterQuery<T>, data as { _id: any }, ctx));
}

async nativeUpdateMany<T extends AnyEntity<T>>(entityName: string, where: FilterQuery<T>[], data: EntityData<T>[], ctx?: Transaction<ClientSession>): Promise<QueryResult> {
data = data.map(row => this.renameFields(entityName, row));
return this.rethrow(this.getConnection('write').bulkUpdateMany(entityName, where, data as { _id: any }[], ctx));
}

async nativeDelete<T extends AnyEntity<T>>(entityName: string, where: FilterQuery<T>, ctx?: Transaction<ClientSession>): Promise<QueryResult> {
if (Utils.isPrimaryKey(where)) {
where = this.buildFilterById(entityName, where as string);
Expand Down
4 changes: 0 additions & 4 deletions packages/mongodb/src/MongoPlatform.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,6 @@ export class MongoPlatform extends Platform {
return true;
}

usesBatchUpdates(): boolean {
return false;
}

marshallArray(values: string[]): string {
return values as unknown as string;
}
Expand Down
1 change: 1 addition & 0 deletions tests/DatabaseDriver.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ describe('DatabaseDriver', () => {
expect(driver.createEntityManager()).toBeInstanceOf(EntityManager);
expect(driver.getPlatform().getRepositoryClass()).toBe(EntityRepository);
await expect(driver.aggregate('', [])).rejects.toThrowError('Aggregations are not supported by Driver driver');
await expect(driver.nativeUpdateMany('', [], [])).rejects.toThrowError('Batch updates are not supported by Driver driver');
await expect(driver.lockPessimistic({}, LockMode.NONE)).rejects.toThrowError('Pessimistic locks are not supported by Driver driver');
const e1 = driver.convertException(new Error('test'));
const e2 = driver.convertException(e1);
Expand Down
17 changes: 17 additions & 0 deletions tests/EntityManager.mongo.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2127,6 +2127,23 @@ describe('EntityManagerMongo', () => {
expect(author.books.getItems().every(b => b.id)).toBe(true);
});

// this should run in ~600ms (when running single test locally)
test('perf: batch insert and update', async () => {
const authors = new Set<Author>();

for (let i = 1; i <= 1000; i++) {
const author = new Author(`Jon Snow ${i}`, `snow-${i}@wall.st`);
orm.em.persist(author);
authors.add(author);
}

await orm.em.flush();
authors.forEach(author => expect(author.id).toBeDefined());

authors.forEach(a => a.termsAccepted = true);
await orm.em.flush();
});

test('exceptions', async () => {
const driver = orm.em.getDriver();
await driver.nativeInsert(Author.name, { name: 'author', email: 'email' });
Expand Down
20 changes: 20 additions & 0 deletions tests/composite-keys.mysql.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,26 @@ describe('composite keys in mysql', () => {
expect(connMock).toBeCalledTimes(1);
});

test('composite entity in m:1 relationship (multi update)', async () => {
const car1 = new Car2('Audi A8', 2011, 100_000);
const car2 = new Car2('Audi A8', 2012, 200_000);
const car3 = new Car2('Audi A8', 2013, 300_000);
const owner1 = new CarOwner2('John Doe 1');
const owner2 = new CarOwner2('John Doe 2');
owner1.car = car1;
owner2.car = car2;
await orm.em.persistAndFlush([owner1, owner2]);

owner1.car = car2;
owner2.car = car3;
await orm.em.flush();
orm.em.clear();

const owners = await orm.em.find(CarOwner2, {}, { orderBy: { name: 'asc' } });
expect(owners[0].car.year).toBe(2012);
expect(owners[1].car.year).toBe(2013);
});

test('composite entity in m:n relationship, both entities are composite', async () => {
const car1 = new Car2('Audi A8', 2011, 100_000);
const car2 = new Car2('Audi A8', 2012, 150_000);
Expand Down

0 comments on commit 5f347c1

Please sign in to comment.