Skip to content

Commit

Permalink
feat(core): add driver.nativeInsertMany() method (#688)
Browse files Browse the repository at this point in the history
This method can be used for bulk inserts. It works in all existing drivers,
but has some limitations in SQLite and MySQL/MariaDB. Those drivers do not
return all the just created PKs.

MySQL/MariaDB will return PK of the first entity in the batch. If we have
`innodb_autoinc_lock_mode` set to 0/1 it is possible to compute the following
PKs simply by incrementing the first PK value.
(see https://stackoverflow.com/a/16592867/3665878)

SQLite on the other hand will return only the last PK.

Postgres and MongoDB both return all the PKs in the `res.rows` array.

Related: #442
  • Loading branch information
B4nan committed Aug 9, 2020
1 parent 668363f commit 78b2341
Show file tree
Hide file tree
Showing 15 changed files with 168 additions and 6 deletions.
1 change: 1 addition & 0 deletions packages/core/src/connections/Connection.ts
Expand Up @@ -112,6 +112,7 @@ export interface QueryResult {
affectedRows: number;
insertId: number;
row?: Dictionary;
rows?: Dictionary[];
}

export interface ConnectionConfig {
Expand Down
2 changes: 2 additions & 0 deletions packages/core/src/drivers/DatabaseDriver.ts
Expand Up @@ -27,6 +27,8 @@ export abstract class DatabaseDriver<C extends Connection> implements IDatabaseD

abstract async nativeInsert<T extends AnyEntity<T>>(entityName: string, data: EntityData<T>, ctx?: Transaction): Promise<QueryResult>;

abstract async nativeInsertMany<T extends AnyEntity<T>>(entityName: string, data: EntityData<T>[], ctx?: Transaction): Promise<QueryResult>;

abstract async nativeUpdate<T extends AnyEntity<T>>(entityName: string, where: FilterQuery<T>, data: EntityData<T>, ctx?: Transaction): Promise<QueryResult>;

abstract async nativeDelete<T extends AnyEntity<T>>(entityName: string, where: FilterQuery<T>, ctx?: Transaction): Promise<QueryResult>;
Expand Down
2 changes: 2 additions & 0 deletions packages/core/src/drivers/IDatabaseDriver.ts
Expand Up @@ -36,6 +36,8 @@ export interface IDatabaseDriver<C extends Connection = Connection> {

nativeInsert<T>(entityName: string, data: EntityData<T>, ctx?: Transaction): Promise<QueryResult>;

nativeInsertMany<T>(entityName: string, data: EntityData<T>[], ctx?: Transaction): Promise<QueryResult>;

nativeUpdate<T>(entityName: string, where: FilterQuery<T>, data: EntityData<T>, ctx?: Transaction): Promise<QueryResult>;

nativeDelete<T>(entityName: string, where: FilterQuery<T>, ctx?: Transaction): Promise<QueryResult>;
Expand Down
2 changes: 1 addition & 1 deletion packages/knex/src/AbstractSqlConnection.ts
Expand Up @@ -113,7 +113,7 @@ export abstract class AbstractSqlConnection extends Connection {
const affectedRows = typeof res === 'number' ? res : 0;
const insertId = typeof res[0] === 'number' ? res[0] : 0;

return { insertId, affectedRows, row: res[0] };
return { insertId, affectedRows, row: res[0], rows: res };
}

protected abstract transformRawResult<T>(res: any, method: 'all' | 'get' | 'run'): T;
Expand Down
23 changes: 23 additions & 0 deletions packages/knex/src/AbstractSqlDriver.ts
Expand Up @@ -191,6 +191,29 @@ export abstract class AbstractSqlDriver<C extends AbstractSqlConnection = Abstra
return res;
}

async nativeInsertMany<T extends AnyEntity<T>>(entityName: string, data: EntityData<T>[], ctx?: Transaction<KnexTransaction>): Promise<QueryResult> {
const meta = this.metadata.get<T>(entityName, false, false);
const collections = data.map(d => this.extractManyToMany(entityName, d));
const pks = this.getPrimaryKeyFields(entityName);
const qb = this.createQueryBuilder(entityName, ctx, true);
const res = await this.rethrow(qb.insert(data).execute('run', false));
let pk: any[];

/* istanbul ignore next */
if (pks.length > 1) { // owner has composite pk
pk = data.map(d => Utils.getPrimaryKeyCond(d as T, pks));
} else {
pk = data.map((d, i) => d[pks[0]] ?? res.rows[i]?.[pks[0]]).map(d => [d]);
res.insertId = res.insertId || res.row[pks[0]];
}

for (let i = 0; i < collections.length; i++) {
await this.processManyToMany<T>(meta, pk[i], collections[i], false, ctx);
}

return res;
}

async nativeUpdate<T extends AnyEntity<T>>(entityName: string, where: FilterQuery<T>, data: EntityData<T>, ctx?: Transaction<KnexTransaction>): Promise<QueryResult> {
const meta = this.metadata.get<T>(entityName, false, false);
const pks = this.getPrimaryKeyFields(entityName);
Expand Down
4 changes: 4 additions & 0 deletions packages/knex/src/query/QueryBuilderHelper.ts
Expand Up @@ -52,6 +52,10 @@ export class QueryBuilderHelper {
}

processData(data: Dictionary): any {
if (Array.isArray(data)) {
return data.map(d => this.processData(d));
}

data = Object.assign({}, data); // copy first
const meta = this.metadata.find(this.entityName);

Expand Down
16 changes: 13 additions & 3 deletions packages/mongodb/src/MongoConnection.ts
Expand Up @@ -109,6 +109,10 @@ export class MongoConnection extends Connection {
return this.runQuery<T>('insertOne', collection, data, undefined, ctx);
}

async insertMany<T extends { _id: any }>(collection: string, data: Partial<T>[], ctx?: Transaction<ClientSession>): Promise<QueryResult> {
return this.runQuery<T>('insertMany', collection, data, undefined, ctx);
}

async updateMany<T extends { _id: any }>(collection: string, where: FilterQuery<T>, data: Partial<T>, ctx?: Transaction<ClientSession>): Promise<QueryResult> {
return this.runQuery<T>('updateMany', collection, data, where, ctx);
}
Expand Down Expand Up @@ -153,7 +157,7 @@ export class MongoConnection extends Connection {
super.logQuery(query, took, 'javascript');
}

private async runQuery<T extends { _id: any }, U extends QueryResult | number = QueryResult>(method: 'insertOne' | 'updateMany' | 'deleteMany' | 'countDocuments', collection: string, data?: Partial<T>, where?: FilterQuery<T>, ctx?: Transaction<ClientSession>): Promise<U> {
private async runQuery<T extends { _id: any }, U extends QueryResult | number = QueryResult>(method: 'insertOne' | 'insertMany' | 'updateMany' | 'deleteMany' | 'countDocuments', collection: string, data?: Partial<T> | Partial<T>[], where?: FilterQuery<T>, ctx?: Transaction<ClientSession>): Promise<U> {
collection = this.getCollectionName(collection);
const options: Dictionary = { session: ctx };
const now = Date.now();
Expand All @@ -165,6 +169,10 @@ export class MongoConnection extends Connection {
query = `db.getCollection('${collection}').insertOne(${this.logObject(data)}, ${this.logObject(options)});`;
res = await this.getCollection(collection).insertOne(data, options);
break;
case 'insertMany':
query = `db.getCollection('${collection}').insertMany(${this.logObject(data)}, ${this.logObject(options)});`;
res = await this.getCollection(collection).insertMany(data as Partial<T>[], options);
break;
case 'updateMany': {
const payload = Object.keys(data!).some(k => k.startsWith('$')) ? data : { $set: data };
query = `db.getCollection('${collection}').updateMany(${this.logObject(where)}, ${this.logObject(payload)}, ${this.logObject(options)});`;
Expand All @@ -189,8 +197,10 @@ export class MongoConnection extends Connection {

private transformResult(res: any): QueryResult {
return {
affectedRows: res.modifiedCount || res.deletedCount || 0,
insertId: res.insertedId,
affectedRows: res.modifiedCount || res.deletedCount || res.insertedCount || 0,
insertId: res.insertedId ?? res.insertedIds?.[0],
row: res.ops?.[0],
rows: res.ops,
};
}

Expand Down
7 changes: 6 additions & 1 deletion packages/mongodb/src/MongoDriver.ts
Expand Up @@ -22,7 +22,7 @@ export class MongoDriver extends DatabaseDriver<MongoConnection> {
return new MongoEntityManager(this.config, this, this.metadata, useContext) as unknown as EntityManager<D>;
}

async find<T extends AnyEntity<T>>(entityName: string, where: FilterQuery<T>, options: FindOptions<T>, ctx?: Transaction<ClientSession>): Promise<T[]> {
async find<T extends AnyEntity<T>>(entityName: string, where: FilterQuery<T>, options: FindOptions<T> = {}, ctx?: Transaction<ClientSession>): Promise<T[]> {
const fields = this.buildFields(entityName, options.populate as PopulateOptions<T>[] || [], options.fields);
where = this.renameFields(entityName, where);
const res = await this.rethrow(this.getConnection('read').find<T>(entityName, where, options.orderBy, options.limit, options.offset, fields, ctx));
Expand Down Expand Up @@ -52,6 +52,11 @@ export class MongoDriver extends DatabaseDriver<MongoConnection> {
return this.rethrow(this.getConnection('write').insertOne(entityName, data as { _id: any }, ctx));
}

async nativeInsertMany<T extends AnyEntity<T>>(entityName: string, data: EntityData<T>[], ctx?: Transaction<ClientSession>): Promise<QueryResult> {
data = data.map(d => this.renameFields(entityName, d));
return this.rethrow(this.getConnection('write').insertMany(entityName, data as any[], ctx));
}

async nativeUpdate<T extends AnyEntity<T>>(entityName: string, where: FilterQuery<T>, data: EntityData<T>, ctx?: Transaction<ClientSession>): Promise<QueryResult> {
if (Utils.isPrimaryKey(where)) {
where = this.buildFilterById(entityName, where as string);
Expand Down
4 changes: 4 additions & 0 deletions tests/DatabaseDriver.test.ts
Expand Up @@ -27,6 +27,10 @@ class Driver extends DatabaseDriver<Connection> {
return { affectedRows: 0, insertId: 0 };
}

async nativeInsertMany<T>(entityName: string, data: EntityData<T>[], ctx: Transaction | undefined): Promise<QueryResult> {
return { affectedRows: 0, insertId: 0 };
}

async nativeUpdate<T>(entityName: string, where: FilterQuery<T>, data: EntityData<T>, ctx: Transaction | undefined): Promise<QueryResult> {
return { affectedRows: 0, insertId: 0 };
}
Expand Down
16 changes: 16 additions & 0 deletions tests/EntityManager.mariadb.test.ts
Expand Up @@ -70,6 +70,22 @@ describe('EntityManagerMariaDb', () => {
expect(driver.getPlatform().denormalizePrimaryKey(1)).toBe(1);
expect(driver.getPlatform().denormalizePrimaryKey('1')).toBe('1');
await expect(driver.find(BookTag2.name, { books: { $in: [1] } })).resolves.not.toBeNull();

// multi inserts
const res = await driver.nativeInsertMany(Publisher2.name, [
{ name: 'test 1', type: PublisherType.GLOBAL },
{ name: 'test 2', type: PublisherType.LOCAL },
{ name: 'test 3', type: PublisherType.GLOBAL },
]);

// mysql returns the first inserted id
expect(res).toMatchObject({ insertId: 1, affectedRows: 0, row: 1, rows: [ 1 ] });
const res2 = await driver.find(Publisher2.name, {});
expect(res2).toMatchObject([
{ id: 1, name: 'test 1', type: PublisherType.GLOBAL },
{ id: 2, name: 'test 2', type: PublisherType.LOCAL },
{ id: 3, name: 'test 3', type: PublisherType.GLOBAL },
]);
});

test('driver appends errored query', async () => {
Expand Down
21 changes: 21 additions & 0 deletions tests/EntityManager.mongo.test.ts
Expand Up @@ -489,6 +489,27 @@ describe('EntityManagerMongo', () => {
await expect(driver.getConnection().execute('')).rejects.toThrowError('MongoConnection does not support generic execute method');
expect(driver.getConnection().getCollection(BookTag).collectionName).toBe('book-tag');
expect(orm.em.getCollection(BookTag).collectionName).toBe('book-tag');

// multi inserts
const res = await driver.nativeInsertMany(Publisher.name, [
{ name: 'test 1', type: 'GLOBAL' },
{ name: 'test 2', type: 'LOCAL' },
{ name: 'test 3', type: 'GLOBAL' },
]);

// mongo returns the persisted objects
expect(res).toMatchObject({ affectedRows: 3 });
expect(res.insertId).toBeInstanceOf(ObjectId);
expect(res.row?._id).toBeInstanceOf(ObjectId);
expect(res.rows?.[0]._id).toBeInstanceOf(ObjectId);
expect(res.rows?.[1]._id).toBeInstanceOf(ObjectId);
expect(res.rows?.[2]._id).toBeInstanceOf(ObjectId);
const res2 = await driver.find(Publisher.name, {});
expect(res2).toEqual([
{ _id: res.rows?.[0]._id, name: 'test 1', type: 'GLOBAL' },
{ _id: res.rows?.[1]._id, name: 'test 2', type: 'LOCAL' },
{ _id: res.rows?.[2]._id, name: 'test 3', type: 'GLOBAL' },
]);
});

test('ensure indexes', async () => {
Expand Down
16 changes: 16 additions & 0 deletions tests/EntityManager.mysql.test.ts
Expand Up @@ -87,6 +87,22 @@ describe('EntityManagerMySql', () => {
await conn.execute(orm.em.getKnex().raw('select 1'), [], 'all', tx);
await conn.execute(orm.em.getRepository(Author2).getKnex().raw('select 1'), [], 'all', tx);
});

// multi inserts
const res = await driver.nativeInsertMany(Publisher2.name, [
{ name: 'test 1', type: PublisherType.GLOBAL },
{ name: 'test 2', type: PublisherType.LOCAL },
{ name: 'test 3', type: PublisherType.GLOBAL },
]);

// mysql returns the first inserted id
expect(res).toMatchObject({ insertId: 1, affectedRows: 0, row: 1, rows: [ 1 ] });
const res2 = await driver.find(Publisher2.name, {});
expect(res2).toMatchObject([
{ id: 1, name: 'test 1', type: PublisherType.GLOBAL },
{ id: 2, name: 'test 2', type: PublisherType.LOCAL },
{ id: 3, name: 'test 3', type: PublisherType.GLOBAL },
]);
});

test('driver appends errored query', async () => {
Expand Down
33 changes: 32 additions & 1 deletion tests/EntityManager.postgre.test.ts
Expand Up @@ -4,7 +4,7 @@ import {
TableNotFoundException, NotNullConstraintViolationException, TableExistsException, SyntaxErrorException, NonUniqueFieldNameException, InvalidFieldNameException,
} from '@mikro-orm/core';
import { PostgreSqlDriver, PostgreSqlConnection } from '@mikro-orm/postgresql';
import { Address2, Author2, Book2, BookTag2, FooBar2, FooBaz2, Publisher2, PublisherType, Test2 } from './entities-sql';
import { Address2, Author2, Book2, BookTag2, FooBar2, FooBaz2, Publisher2, PublisherType, PublisherType2, Test2 } from './entities-sql';
import { initORMPostgreSql, wipeDatabasePostgreSql } from './bootstrap';

describe('EntityManagerPostgre', () => {
Expand Down Expand Up @@ -75,6 +75,37 @@ describe('EntityManagerPostgre', () => {
expect(driver.getPlatform().denormalizePrimaryKey(1)).toBe(1);
expect(driver.getPlatform().denormalizePrimaryKey('1')).toBe('1');
await expect(driver.find(BookTag2.name, { books: { $in: [1] } })).resolves.not.toBeNull();

// multi inserts
await driver.nativeInsert(Test2.name, { id: 1, name: 't1' });
await driver.nativeInsert(Test2.name, { id: 2, name: 't2' });
await driver.nativeInsert(Test2.name, { id: 3, name: 't3' });
await driver.nativeInsert(Test2.name, { id: 4, name: 't4' });
await driver.nativeInsert(Test2.name, { id: 5, name: 't5' });

const mock = jest.fn();
const logger = new Logger(mock, true);
Object.assign(orm.config, { logger });

const res = await driver.nativeInsertMany(Publisher2.name, [
{ name: 'test 1', tests: [1, 3, 4], type: PublisherType.GLOBAL, type2: PublisherType2.LOCAL },
{ name: 'test 2', tests: [4, 2], type: PublisherType.LOCAL, type2: PublisherType2.LOCAL },
{ name: 'test 3', tests: [1, 5, 2], type: PublisherType.GLOBAL, type2: PublisherType2.LOCAL },
]);

expect(mock.mock.calls[0][0]).toMatch('insert into "publisher2" ("name", "type", "type2") values ($1, $2, $3), ($4, $5, $6), ($7, $8, $9) returning "id"');
expect(mock.mock.calls[1][0]).toMatch('insert into "publisher2_tests" ("publisher2_id", "test2_id") values ($1, $2), ($3, $4), ($5, $6)');
expect(mock.mock.calls[2][0]).toMatch('insert into "publisher2_tests" ("publisher2_id", "test2_id") values ($1, $2), ($3, $4)');
expect(mock.mock.calls[3][0]).toMatch('insert into "publisher2_tests" ("publisher2_id", "test2_id") values ($1, $2), ($3, $4), ($5, $6)');

// postgres returns all the ids based on returning clause
expect(res).toMatchObject({ insertId: 1, affectedRows: 0, row: { id: 1 }, rows: [ { id: 1 }, { id: 2 }, { id: 3 } ] });
const res2 = await driver.find(Publisher2.name, {});
expect(res2).toMatchObject([
{ id: 1, name: 'test 1', type: PublisherType.GLOBAL, type2: PublisherType2.LOCAL },
{ id: 2, name: 'test 2', type: PublisherType.LOCAL, type2: PublisherType2.LOCAL },
{ id: 3, name: 'test 3', type: PublisherType.GLOBAL, type2: PublisherType2.LOCAL },
]);
});

test('driver appends errored query', async () => {
Expand Down
16 changes: 16 additions & 0 deletions tests/EntityManager.sqlite.test.ts
Expand Up @@ -59,6 +59,22 @@ describe('EntityManagerSqlite', () => {
insertId: 1,
});
expect(await driver.find(BookTag3.name, { books: [1] })).not.toBeNull();

// multi inserts
const res = await driver.nativeInsertMany(Publisher3.name, [
{ name: 'test 1', type: 'GLOBAL' },
{ name: 'test 2', type: 'LOCAL' },
{ name: 'test 3', type: 'GLOBAL' },
]);

// sqlite returns the last inserted id
expect(res).toMatchObject({ insertId: 3, affectedRows: 0, row: 3, rows: [ 3 ] });
const res2 = await driver.find(Publisher3.name, {});
expect(res2).toEqual([
{ id: 1, name: 'test 1', type: 'GLOBAL' },
{ id: 2, name: 'test 2', type: 'LOCAL' },
{ id: 3, name: 'test 3', type: 'GLOBAL' },
]);
});

test('driver appends errored query', async () => {
Expand Down
11 changes: 11 additions & 0 deletions tests/QueryBuilder.test.ts
Expand Up @@ -1069,6 +1069,17 @@ describe('QueryBuilder', () => {
expect(qb3.getParams()).toEqual([123]);
});

test('insert many query', async () => {
const qb1 = orm.em.createQueryBuilder(Publisher2);
qb1.insert([
{ name: 'test 1', type: PublisherType.GLOBAL },
{ name: 'test 2', type: PublisherType.LOCAL },
{ name: 'test 3', type: PublisherType.GLOBAL },
]);
expect(qb1.getQuery()).toEqual('insert into `publisher2` (`name`, `type`) values (?, ?), (?, ?), (?, ?)');
expect(qb1.getParams()).toEqual(['test 1', PublisherType.GLOBAL, 'test 2', PublisherType.LOCAL, 'test 3', PublisherType.GLOBAL]);
});

test('update query', async () => {
const qb = orm.em.createQueryBuilder(Publisher2);
qb.update({ name: 'test 123', type: PublisherType.GLOBAL }).where({ id: 123, type: PublisherType.LOCAL });
Expand Down

0 comments on commit 78b2341

Please sign in to comment.