Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
[misc] changing query / batching implementation, separating batch pac…
…ket with a setImmediate.

 This permit having some CPU time for socket for example, avoiding resulting having server AND client TCP socket full and socket hanging.
  • Loading branch information
rusher committed Nov 14, 2018
1 parent 37af03c commit 58ad838
Show file tree
Hide file tree
Showing 8 changed files with 233 additions and 120 deletions.
45 changes: 30 additions & 15 deletions lib/cmd/batch-bulk.js
Expand Up @@ -13,7 +13,7 @@ const QUOTE = 0x27;
class BatchBulk extends CommonBinary {
constructor(resolve, reject, options, connOpts, sql, values) {
super(resolve, reject, options, connOpts, sql, values);
this.sendEnded = false;
this.sending = true;
this.onPacketReceive = this.readPrepareResultPacket;
}

Expand All @@ -25,9 +25,10 @@ class BatchBulk extends CommonBinary {
* @param info connection information
*/
start(out, opts, info) {
this.info = info;
if (!this.initialValues) {
this.emit("send_end");
return this.throwError(
this.throwError(
Errors.createError(
"Batch must have values set\n" + this.displaySql(),
false,
Expand All @@ -37,6 +38,8 @@ class BatchBulk extends CommonBinary {
),
info
);
this.sending = false;
return;
}
this.initialValues = Array.isArray(this.initialValues)
? this.initialValues
Expand All @@ -56,7 +59,10 @@ class BatchBulk extends CommonBinary {
this.values = res.values;
}

if (!this.validateParameters(info)) return;
if (!this.validateParameters(info)) {
this.sending = false;
return;
}

//send COM_STMT_PREPARE command
this.out = out;
Expand All @@ -66,11 +72,14 @@ class BatchBulk extends CommonBinary {
out.writeInt8(0x16);
out.writeString(questionMarkSql);
out.flushBuffer(true);

out.startPacket(this);

this.valueIdx = 0;
while (this.valueIdx < this.values.length && !this.packet.haveErrorResponse) {
this.sendQueries();
}

sendQueries() {
let flushed = false;
while (!flushed && this.sending && this.valueIdx < this.values.length) {
this.valueRow = this.values[this.valueIdx++];

//********************************************
Expand All @@ -80,19 +89,26 @@ class BatchBulk extends CommonBinary {
for (let i = 0; i < len; i++) {
const value = this.valueRow[i];
if (value === null) {
this.packet.writeInt8(0x01);
flushed = this.packet.writeInt8(0x01) || flushed;
continue;
}

//********************************************
// param has no stream. directly write in buffer
//********************************************
this.writeParam(this.packet, value, this.opts, info);
flushed = this.writeParam(this.packet, value, this.opts, this.info) || flushed;
}
const last = this.valueIdx === this.values.length;
this.packet.mark(last, last ? null : this.values[this.valueIdx]);
flushed = this.packet.mark(last, last ? null : this.values[this.valueIdx]) || flushed;
}

if (this.valueIdx < this.values.length && !this.packet.haveErrorResponse) {
//there is still data to send
setImmediate(this.sendQueries.bind(this));
} else {
if (this.sending && this.valueIdx === this.values.length) this.emit("send_end");
this.sending = false;
}
this.sendEnded = true;
}

displaySql() {
Expand Down Expand Up @@ -121,14 +137,15 @@ class BatchBulk extends CommonBinary {
success(val) {
this.packet.waitingResponseNo--;

if (this.sendEnded && this.packet.waitingResponseNo === 0) {
if (!this.sending && this.packet.waitingResponseNo === 0) {
//send COM_STMT_CLOSE packet
this.sequenceNo = -1;
this.compressSequenceNo = -1;
this.out.startPacket(this);
this.out.writeInt8(0x19);
this.out.writeInt32(this.statementId);
this.out.flushBuffer(true);
this.sending = false;
this.emit("send_end");

if (this.packet.haveErrorResponse) {
Expand Down Expand Up @@ -167,10 +184,9 @@ class BatchBulk extends CommonBinary {

throwError(err, info) {
this.packet.waitingResponseNo--;

this.sending = false;
if (this.packet && !this.packet.haveErrorResponse) {
if (err.fatal) {
this.sendEnded = true;
this.packet.waitingResponseNo = 0;
}
if (this.stack) {
Expand All @@ -188,8 +204,7 @@ class BatchBulk extends CommonBinary {
this.packet.endedWithError();
}

if (this.sendEnded && this.packet.waitingResponseNo === 0) {
this.packet = null;
if (!this.sending && this.packet.waitingResponseNo === 0) {
this.resolve = null;

//send COM_STMT_CLOSE packet
Expand Down
71 changes: 48 additions & 23 deletions lib/cmd/batch-rewrite.js
Expand Up @@ -13,7 +13,7 @@ const QUOTE = 0x27;
class BatchRewrite extends CommonText {
constructor(resolve, reject, options, connOpts, sql, values) {
super(resolve, reject, options, connOpts, sql, values);
this.sendEnded = false;
this.sending = true;
}

/**
Expand All @@ -24,9 +24,10 @@ class BatchRewrite extends CommonText {
* @param info connection information
*/
start(out, opts, info) {
this.info = info;
if (!this.initialValues) {
this.emit("send_end");
return this.throwError(
this.throwError(
Errors.createError(
"Batch must have values set\n" + this.displaySql(),
false,
Expand All @@ -36,6 +37,8 @@ class BatchRewrite extends CommonText {
),
info
);
this.sending = false;
return;
}
this.initialValues = Array.isArray(this.initialValues)
? this.initialValues
Expand All @@ -47,7 +50,10 @@ class BatchRewrite extends CommonText {
} else {
this.parseResults = Parse.splitRewritableQuery(this.sql);
this.values = this.initialValues;
if (!this.validateParameters(info)) return;
if (!this.validateParameters(info)) {
this.sending = false;
return;
}
}

out.startPacket(this);
Expand All @@ -60,7 +66,12 @@ class BatchRewrite extends CommonText {

this.onPacketReceive = this.readResponsePacket;
this.valueIdx = 0;
while (this.valueIdx < this.values.length && !this.packet.haveErrorResponse) {
this.sendQueries();
}

sendQueries() {
let flushed = false;
while (!flushed && this.sending && this.valueIdx < this.values.length) {
this.valueRow = this.values[this.valueIdx++];

//********************************************
Expand All @@ -69,9 +80,9 @@ class BatchRewrite extends CommonText {
const len = this.valueRow.length;
for (let i = 0; i < len; i++) {
const value = this.valueRow[i];
this.packet.writeString(this.parseResults.partList[i + 1]);
flushed = this.packet.writeString(this.parseResults.partList[i + 1]) || flushed;
if (value === null) {
this.packet.writeStringAscii("NULL");
flushed = this.packet.writeStringAscii("NULL") || flushed;
continue;
}

Expand All @@ -84,7 +95,7 @@ class BatchRewrite extends CommonText {
// param is stream,
// now all params will be written by event
//********************************************
this.registerStreamSendEvent(this.packet, info);
this.registerStreamSendEvent(this.packet, this.info);
this.currentParam = i;
this.packet.writeInt8(QUOTE); //'

Expand All @@ -109,14 +120,21 @@ class BatchRewrite extends CommonText {
//********************************************
// param isn't stream. directly write in buffer
//********************************************
this.writeParam(this.packet, value, this.opts, info);
flushed = this.writeParam(this.packet, value, this.opts, this.info) || flushed;
}
}
this.packet.writeString(this.parseResults.partList[this.parseResults.partList.length - 2]);
this.packet.mark(!this.parseResults.reWritable || this.valueIdx === this.values.length);
}
this.sendEnded = true;
this.emit("send_end");

if (this.valueIdx < this.values.length && !this.packet.haveErrorResponse) {
//there is still data to send
setImmediate(this.sendQueries.bind(this));
// process.nextTick();
} else {
if (this.sending && this.valueIdx === this.values.length) this.emit("send_end");
this.sending = false;
}
}

displaySql() {
Expand Down Expand Up @@ -146,7 +164,7 @@ class BatchRewrite extends CommonText {
this.packet.waitingResponseNo--;

if (this.packet.haveErrorResponse) {
if (this.sendEnded && this.packet.waitingResponseNo === 0) {
if (!this.sending && this.packet.waitingResponseNo === 0) {
this.packet = null;
this.onPacketReceive = null;
this.resolve = null;
Expand All @@ -158,7 +176,7 @@ class BatchRewrite extends CommonText {
return;
}
} else {
if (this.sendEnded && this.packet.waitingResponseNo === 0) {
if (!this.sending && this.packet.waitingResponseNo === 0) {
if (this.parseResults.reWritable) {
this.packet = null;
let totalAffectedRows = 0;
Expand Down Expand Up @@ -187,10 +205,10 @@ class BatchRewrite extends CommonText {

throwError(err, info) {
this.packet.waitingResponseNo--;
this.sending = false;

if (this.packet && !this.packet.haveErrorResponse) {
if (err.fatal) {
this.sendEnded = true;
this.packet.waitingResponseNo = 0;
}
if (this.stack) {
Expand All @@ -208,7 +226,7 @@ class BatchRewrite extends CommonText {
this.packet.endedWithError();
}

if (this.sendEnded && this.packet.waitingResponseNo === 0) {
if (!this.sending && this.packet.waitingResponseNo === 0) {
this.packet = null;
this.onPacketReceive = null;
this.resolve = null;
Expand Down Expand Up @@ -285,33 +303,38 @@ class BatchRewrite extends CommonText {
*/
registerStreamSendEvent(packet, info) {
this.paramWritten = function() {
while (true) {
let flushed = false;
while (!flushed) {
if (this.packet.haveErrorResponse) {
this.sendEnded = true;
this.sending = false;
this.emit("send_end");
return;
}
if (this.currentParam === this.valueRow.length) {
// all parameters from row are written.
packet.writeString(this.parseResults.partList[this.parseResults.partList.length - 2]);
packet.mark(!this.parseResults.reWritable || this.valueIdx === this.values.length);
flushed =
packet.writeString(this.parseResults.partList[this.parseResults.partList.length - 2]) ||
flushed;
flushed =
packet.mark(!this.parseResults.reWritable || this.valueIdx === this.values.length) ||
flushed;
if (this.valueIdx < this.values.length) {
// still remaining rows
this.valueRow = this.values[this.valueIdx++];
this.currentParam = 0;
} else {
// all rows are written
this.sendEnded = true;
this.sending = false;
this.emit("send_end");
return;
}
}

packet.writeString(this.parseResults.partList[this.currentParam + 1]);
flushed = packet.writeString(this.parseResults.partList[this.currentParam + 1]) || flushed;
const value = this.valueRow[this.currentParam];

if (value === null) {
packet.writeStringAscii("NULL");
flushed = packet.writeStringAscii("NULL") || flushed;
this.currentParam++;
continue;
}
Expand All @@ -324,7 +347,7 @@ class BatchRewrite extends CommonText {
//********************************************
// param is stream,
//********************************************
packet.writeInt8(QUOTE);
flushed = packet.writeInt8(QUOTE) || flushed;
value.once(
"end",
function() {
Expand All @@ -343,9 +366,11 @@ class BatchRewrite extends CommonText {
//********************************************
// param isn't stream. directly write in buffer
//********************************************
this.writeParam(packet, value, this.opts, info);
flushed = this.writeParam(packet, value, this.opts, info) || flushed;
this.currentParam++;
}

if (this.sending) setImmediate(this.paramWritten.bind(this));
}.bind(this);
}
}
Expand Down
1 change: 1 addition & 0 deletions lib/cmd/command.js
Expand Up @@ -13,6 +13,7 @@ class Command extends EventEmitter {
this.compressSequenceNo = -1;
this.resolve = resolve;
this.reject = reject;
this.sending = false;
}

displaySql() {}
Expand Down

0 comments on commit 58ad838

Please sign in to comment.