Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
[CONJS-125] permit using batch with returning clause
  • Loading branch information
diego Dupin committed Sep 14, 2021
1 parent 5c9bb63 commit d372512
Show file tree
Hide file tree
Showing 7 changed files with 88 additions and 19 deletions.
39 changes: 28 additions & 11 deletions lib/cmd/batch-bulk.js
Expand Up @@ -4,6 +4,7 @@ const Parser = require('./parser');
const Errors = require('../misc/errors');
const BinaryEncoder = require('./encoder/binary-encoder');
const FieldType = require('../const/field-type');
const OkPacket = require('./class/ok-packet');

/**
* Protocol COM_STMT_BULK_EXECUTE
Expand Down Expand Up @@ -393,17 +394,33 @@ class BatchBulk extends Parser {
this.reject = null;
this.emit('end', this.firstError);
} else {
let totalAffectedRows = 0;
this._rows.forEach((row) => {
totalAffectedRows += row.affectedRows;
});

const rs = {
affectedRows: totalAffectedRows,
insertId: this._rows[0].insertId,
warningStatus: this._rows[this._rows.length - 1].warningStatus
};
this.successEnd(rs);
if (this._rows[0].affectedRows != undefined) {
// ok packets, reassemble them if needed
let totalAffectedRows = 0;
this._rows.forEach((row) => {
totalAffectedRows += row.affectedRows;
});

const rs = new OkPacket(
totalAffectedRows,
this._rows[0].insertId,
this._rows[this._rows.length - 1].warningStatus
);
this.successEnd(rs);
} else {
// insert with returning
if (this._rows.length == 1) {
this.successEnd(this._rows[0]);
} else {
const rs = [];
rs.meta = this._rows[0].meta;
this._rows.forEach((row) => {
Array.prototype.push.apply(rs, row);
});
rs.meta = this._rows[0].meta;
this.successEnd(rs);
}
}
this._columns = null;
this._rows = null;
}
Expand Down
4 changes: 2 additions & 2 deletions lib/cmd/class/prepare-result-packet.js
Expand Up @@ -16,8 +16,8 @@ class PrepareResultPacket {
this._placeHolderIndex = placeHolderIndex;
}

execute(values) {
return this._emitter._executePromise(this, values);
execute(values, _opts) {
return this._emitter._executePromise(this, values, _opts);
}

_executeCallback(values, cb) {
Expand Down
20 changes: 15 additions & 5 deletions lib/connection.js
Expand Up @@ -196,15 +196,15 @@ function Connection(options) {
});
};

this._executePromise = (prepare, values) => {
this._executePromise = (prepare, values, cmdOpts) => {
return new Promise(function (resolve, reject) {
const cmd = new Execute(
resolve,
(err) => {
if (opts.logger.error) opts.logger.error(err);
reject(err);
},
null,
cmdOpts,
opts,
prepare,
values
Expand Down Expand Up @@ -356,14 +356,15 @@ function Connection(options) {
} else {
const executes = [];
for (let i = 0; i < vals.length; i++) {
executes.push(self._executePromise(prepare, vals[i]));
executes.push(self._executePromise(prepare, vals[i], _options));
}
Promise.all(executes)
.then((res) => {
prepare.close();
if (_options && _options.fullResult) {
resolve(res);
} else {
// aggregate results
const firstResult = res[0];
if (firstResult instanceof OkPacket) {
let affectedRows = 0;
Expand All @@ -373,9 +374,18 @@ function Connection(options) {
affectedRows += res[i].affectedRows;
}
resolve(new OkPacket(affectedRows, insertId, warningStatus));
return;
} else {
// results have result-set. example :'INSERT ... RETURNING'
// aggregate results
const rs = [];
rs.meta = res.meta;
res.forEach((row) => {
Array.prototype.push.apply(rs, row);
});
rs.meta = res.meta;
resolve(rs);
}
resolve(res);
return;
}
})
.catch(reject);
Expand Down
1 change: 1 addition & 0 deletions lib/const/collations.js
Expand Up @@ -10,6 +10,7 @@ class Collation {
this.index = index;
this.name = name;
this.charset = charset;
this.maxLength = maxLength;
}

static fromCharset(charset) {
Expand Down
40 changes: 40 additions & 0 deletions test/integration/test-batch.js
Expand Up @@ -169,6 +169,34 @@ describe('batch', function () {
conn.end();
};

const batchWithReturning = async (useBulk) => {
const conn = await base.createConnection({ bulk: useBulk });
await conn.query('drop table if exists bar');
await conn.query('create table bar (id bigint not null primary key)');
let res = await conn.batch('insert into bar (id) values (?) returning id', [
[1],
[2],
[3],
[4]
]);
assert.deepEqual(res, [{ id: 1n }, { id: 2n }, { id: 3n }, { id: 4n }]);

res = await conn.batch(
{ sql: 'insert into bar (id) values (?) returning id', rowsAsArray: true },
[[5], [6], [7], [8]]
);
assert.deepEqual(res, [[5n], [6n], [7n], [8n]]);

res = await conn.batch('insert into bar (id) values (?) returning id', [
[9],
['10'],
[11],
[12]
]);
assert.deepEqual(res, [{ id: 9n }, { id: 10n }, { id: 11n }, { id: 12n }]);
conn.end();
};

const simpleBatchWithOptions = async (useCompression, useBulk) => {
const conn = await base.createConnection({ compress: useCompression, bulk: useBulk });
conn.query('DROP TABLE IF EXISTS simpleBatchWithOptions');
Expand Down Expand Up @@ -736,6 +764,12 @@ describe('batch', function () {
await simpleBatchWithOptions(useCompression, true);
});

it('batch with returning', async function () {
this.timeout(30000);
if (!shareConn.info.isMariaDB() || !shareConn.info.hasMinVersion(10, 5, 12)) this.skip();
await batchWithReturning(true);
});

it('batch without value', async function () {
this.timeout(30000);
if (!shareConn.info.isMariaDB() && !shareConn.info.hasMinVersion(5, 6, 0)) this.skip();
Expand Down Expand Up @@ -1089,6 +1123,12 @@ describe('batch', function () {
}
};

it('batch with returning', async function () {
this.timeout(30000);
if (!shareConn.info.isMariaDB() || !shareConn.info.hasMinVersion(10, 5, 12)) this.skip();
await batchWithReturning(false);
});

it('non rewritable batch', async function () {
this.timeout(30000);
await nonRewritableBatch(useCompression, false);
Expand Down
2 changes: 1 addition & 1 deletion test/integration/test-typecast.js
Expand Up @@ -65,7 +65,7 @@ describe('TypeCast', () => {
it('cast fields', async function () {
const checkCaseType = (field, next) => {
assert.equal(field.type, 'VAR_STRING');
assert.equal(field.columnLength, base.utf8Collation() ? 24 : 6);
assert.equal(field.columnLength, shareConn.info.collation.maxLength * 6);
return next();
};
const rows = await shareConn.query({
Expand Down
1 change: 1 addition & 0 deletions types/index.d.ts
Expand Up @@ -885,6 +885,7 @@ export interface Collation {
index: number;
name: string;
encoding: string;
maxLength: number;
fromEncoding(encoding: string): Collation;
fromIndex(index: number): Collation;
fromName(name: string): Collation;
Expand Down

0 comments on commit d372512

Please sign in to comment.