Skip to content

Commit

Permalink
[CONJS-21] bulk insert method
Browse files Browse the repository at this point in the history
- add batch benchmark
- handle named parameter
- correct null handling when streaming
- correct strange node.js behaviour about not reusing buffer send to socket
- handle error
  - with multi-packets
  - query array parameter listing
  • Loading branch information
rusher committed Oct 17, 2018
1 parent b70283a commit 06ff44b
Show file tree
Hide file tree
Showing 13 changed files with 1,593 additions and 383 deletions.
86 changes: 86 additions & 0 deletions benchmarks/benchs/bench_promise_insert_batch.js
@@ -0,0 +1,86 @@
const assert = require("assert");

const basechars = "123456789abcdefghijklmnop\\Z";
const chars = basechars.split("");
chars.push("😎");
chars.push("🌶");
chars.push("🎤");
chars.push("🥂");

function randomString(length) {
let result = "";
for (let i = length; i > 0; --i) result += chars[Math.round(Math.random() * (chars.length - 1))];
return result;
}

let sqlTable =
"CREATE TABLE testn.perfTestTextPipe (id MEDIUMINT NOT NULL AUTO_INCREMENT,t0 text" +
", PRIMARY KEY (id))";
sqlInsert = "INSERT INTO testn.perfTestTextPipe(t0) VALUES (?)";

module.exports.title =
"100 * insert 100 characters using promise and batch method (for mariadb only, since doesn't exist for others)";
module.exports.displaySql = "INSERT INTO testn.perfTestTextPipe VALUES (?) (into BLACKHOLE ENGINE)";
const iterations = 100;
module.exports.promise = true;
module.exports.benchFct = function(conn, deferred, connType) {
const params = [randomString(100)];
// console.log(connType.desc);
if (!connType.desc.includes("mariadb")) {
//other driver doesn't have bulk method
let ended = 0;
for (let i = 0; i < iterations; i++) {
conn
.query(sqlInsert, params)
.then(rows => {
// let val = Array.isArray(rows) ? rows[0] : rows;
// assert.equal(1, val.info ? val.info.affectedRows : val.affectedRows);
if (++ended === iterations) {
deferred.resolve();
}
})
.catch(err => {
throw err;
});
}
} else {
//use batch capability
const totalParams = new Array(iterations);
for (let i = 0; i < iterations; i++) {
totalParams[i] = params;
}
conn
.batch(sqlInsert, totalParams)
.then(rows => {
deferred.resolve();
})
.catch(err => {
throw err;
});
}
};

module.exports.initFct = function(conn) {
return Promise.all([
conn.query("DROP TABLE IF EXISTS testn.perfTestTextPipe"),
conn.query("INSTALL SONAME 'ha_blackhole'"),
conn.query(sqlTable + " ENGINE = BLACKHOLE COLLATE='utf8mb4_unicode_ci'")
])
.catch(err => {
return Promise.all([
conn.query("DROP TABLE IF EXISTS testn.perfTestTextPipe"),
conn.query(sqlTable + " COLLATE='utf8mb4_unicode_ci'")
]);
})
.catch(e => {
console.log(e);
throw e;
});
};

module.exports.onComplete = function(conn) {
conn.query("TRUNCATE TABLE testn.perfTestTextPipe").catch(e => {
console.log(e);
throw e;
});
};
2 changes: 1 addition & 1 deletion benchmarks/benchs/bench_promise_insert_pipelining.js
Expand Up @@ -20,7 +20,7 @@ sqlInsert = "INSERT INTO testn.perfTestTextPipe(t0) VALUES (?)";

module.exports.title = "100 * insert 100 characters using promise";
module.exports.displaySql = "INSERT INTO testn.perfTestTextPipe VALUES (?) (into BLACKHOLE ENGINE)";
const iterations = 10;
const iterations = 100;
module.exports.promise = true;
module.exports.benchFct = function(conn, deferred) {
const params = [randomString(100)];
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/common_benchmarks.js
Expand Up @@ -394,7 +394,7 @@ const getAddTest = function(self, suite, fct, minSamples, title, displaySql, onC
suite.add({
name: title + " - " + name,
fn: function(deferred) {
fct.call(self, usePool ? conn.pool : conn.drv, deferred);
fct.call(self, usePool ? conn.pool : conn.drv, deferred, conn);
},
onComplete: () => {
if (onComplete) onComplete.call(self, usePool ? conn.pool : conn.drv);
Expand Down
182 changes: 122 additions & 60 deletions lib/cmd/batch.js
Expand Up @@ -39,24 +39,16 @@ class Batch extends CommonText {
info
);
}
this.initialValues = Array.isArray(this.initialValues)
? this.initialValues
: [this.initialValues];

if (this.opts.namedPlaceholders) {
try {
const parsed = Parse.splitQueryPlaceholder(
this.sql,
info,
this.initialValues,
this.displaySql.bind(this)
);
this.queryParts = parsed.parts;
this.values = parsed.values;
} catch (err) {
this.emit("send_end");
return this.throwError(err, info);
}
this.parseResults = Parse.splitRewritableNamedParameterQuery(this.sql, this.initialValues);
this.values = this.parseResults.values;
} else {
this.parseResults = Parse.splitRewritableQuery(this.sql);
this.values = Array.isArray(this.initialValues) ? this.initialValues : [this.initialValues];
this.values = this.initialValues;
if (!this.validateParameters(info)) return;
}

Expand Down Expand Up @@ -109,7 +101,7 @@ class Batch extends CommonText {
function() {
this.packet.writeInt8(QUOTE); //'
this.currentParam++;
process.nextTick(this.paramWritten.bind(this));
this.paramWritten();
}.bind(this)
);

Expand All @@ -126,39 +118,105 @@ class Batch extends CommonText {
}
this.sendEnded = true;
this.expectedResponseNo = this.packet.packetSend;
this.packet = null;

this.emit("send_end");
}

displaySql() {
if (this.opts && this.initialValues) {
if (this.sql.length > this.opts.debugLen) {
return "sql: " + this.sql.substring(0, this.opts.debugLen) + "...";
}

let sqlMsg = "sql: " + this.sql + " - parameters:";
sqlMsg += "[";
for (let i = 0; i < this.initialValues.length; i++) {
if (i !== 0) sqlMsg += ",";
let param = this.initialValues[i];
sqlMsg = this.logParameters(sqlMsg, param);
if (sqlMsg.length > this.opts.debugLen) {
sqlMsg = sqlMsg.substr(0, this.opts.debugLen) + "...";
break;
}
}
sqlMsg += "]";
return sqlMsg;
}
return "sql: " + this.sql + " - parameters:[]";
}

success(val) {
this.expectedResponseNo--;
if (this.parseResults.reWritable) {
if (this.packet.haveErrorResponse) {
if (this.sendEnded && this.expectedResponseNo === 0) {
let totalAffectedRows = 0;
this._rows.forEach(row => {
totalAffectedRows += row.affectedRows;
});
this.packet = null;
this.onPacketReceive = null;
this.resolve = null;
this.reject = null;
this._columns = null;
this._rows = null;
this.emit("end", err);
return;
}
} else {
if (this.parseResults.reWritable) {
if (this.sendEnded && this.expectedResponseNo === 0) {
this.packet = null;
let totalAffectedRows = 0;
this._rows.forEach(row => {
totalAffectedRows += row.affectedRows;
});

const rs = {
affectedRows: totalAffectedRows,
insertId: this._rows[0].insertId,
warningStatus: this._rows[this._rows.length - 1].warningStatus
};
this.successEnd(rs);
const rs = {
affectedRows: totalAffectedRows,
insertId: this._rows[0].insertId,
warningStatus: this._rows[this._rows.length - 1].warningStatus
};
this.successEnd(rs);
this._columns = null;
this._rows = null;
return;
}
} else if (this.sendEnded && this.expectedResponseNo === 0) {
this.successEnd(this._rows);
this._columns = null;
this._rows = null;
return;
}
} else if (this.sendEnded && this.expectedResponseNo === 0) {
this.successEnd(this._rows);
this._columns = null;
this._rows = null;
return;
this._responseIndex++;
this.onPacketReceive = this.readResponsePacket;
}
}

throwError(err, info) {
this.expectedResponseNo--;
if (!this.packet.haveErrorResponse) {
if (this.stack) {
err = Errors.createError(
err.message,
err.fatal,
info,
err.sqlState,
err.errno,
this.stack,
false
);
}
this.packet.endedWithError();
process.nextTick(this.reject, err);
}

this._responseIndex++;
this.onPacketReceive = this.readResponsePacket;
if (this.sendEnded && this.expectedResponseNo === 0) {
this.packet = null;
this.onPacketReceive = null;
this.resolve = null;
this.reject = null;
this.emit("end", err);
return;
} else {
this._responseIndex++;
this.onPacketReceive = this.readResponsePacket;
}
}

/**
Expand Down Expand Up @@ -223,32 +281,32 @@ class Batch extends CommonText {
* emitting event so next parameter can be written.
*/
registerStreamSendEvent(packet, info) {
const self = this;
this.paramWritten = function() {
while (true) {
if (self.currentParam === self.valueRow.length) {
// all parameters are written.
packet.writeString(self.parseResults.partList[self.parseResults.partList.length - 2]);
packet.mark(!this.parseResults.reWritable || self.valueIdx === self.values.length);
if (self.valueIdx < self.values.length) {
self.valueRow = self.values[self.valueIdx++];
self.currentParam = 0;
if (this.currentParam === this.valueRow.length) {
// all parameters from row are written.
packet.writeString(this.parseResults.partList[this.parseResults.partList.length - 2]);
packet.mark(!this.parseResults.reWritable || this.valueIdx === this.values.length);
if (this.valueIdx < this.values.length) {
// still remaining rows
this.valueRow = this.values[this.valueIdx++];
this.currentParam = 0;
} else {
// all rows are written
this.sendEnded = true;
this.expectedResponseNo = packet.packetSend;
self.packet = null;
self.emit("send_end");
this.emit("send_end");
return;
}
}

packet.writeString(self.parseResults.partList[self.currentParam + 1]);
const value = self.valueRow[self.currentParam];
packet.writeString(this.parseResults.partList[this.currentParam + 1]);
const value = this.valueRow[this.currentParam];

if (value === null) {
packet.writeStringAscii("NULL");
process.nextTick(self.paramWritten.bind(self));
return;
this.currentParam++;
continue;
}

if (
Expand All @@ -260,24 +318,28 @@ class Batch extends CommonText {
// param is stream,
//********************************************
packet.writeInt8(QUOTE);
value.once("end", function() {
packet.writeInt8(QUOTE);
self.currentParam++;
process.nextTick(self.paramWritten.bind(self));
});
value.once(
"end",
function() {
packet.writeInt8(QUOTE);
this.currentParam++;
this.paramWritten();
}.bind(this)
);

value.on("data", function(chunk) {
packet.writeBufferEscape(chunk);
});
return;
} else {
//********************************************
// param isn't stream. directly write in buffer
//********************************************
self.writeParam(packet, value, self.opts, info);
self.currentParam++;
}

//********************************************
// param isn't stream. directly write in buffer
//********************************************
this.writeParam(packet, value, this.opts, info);
this.currentParam++;
}
};
}.bind(this);
}
}

Expand Down

0 comments on commit 06ff44b

Please sign in to comment.