Skip to content

Commit

Permalink
[misc] perf improvement : immediately send command when pipelining
Browse files Browse the repository at this point in the history
  • Loading branch information
rusher committed May 16, 2018
1 parent b0b1498 commit 7101b9b
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 42 deletions.
2 changes: 0 additions & 2 deletions lib/cmd/change-user.js
Expand Up @@ -94,8 +94,6 @@ class ChangeUser extends Handshake {
}

out.flushBuffer(true);
this.emit("send_end");

return this.handshakeResult;
}

Expand Down
9 changes: 3 additions & 6 deletions lib/cmd/handshake/handshake.js
Expand Up @@ -20,10 +20,7 @@ class Handshake extends Command {
this._addCommand = _addCommand;
this.onResult = callback;
this.getSocket = getSocket;
}

start(out, opts, info) {
return this.parseHandshakeInit;
this.onPacketReceive = this.parseHandshakeInit;
}

parseHandshakeInit(packet, out, opts, info) {
Expand Down Expand Up @@ -85,7 +82,7 @@ class Handshake extends Command {
//*********************************************************************************************************
case 0xfe:
this.dispatchAuthSwitchRequest(packet, out, opts, info);
this.emit("cmd_end");
this.emit("send_end");
this.emit("end");
return null;

Expand All @@ -98,7 +95,7 @@ class Handshake extends Command {
packet.skipLengthCodedNumber(); //skip last insert id
info.status = packet.readUInt16();
this.authEnded();
this.emit("cmd_end");
this.emit("send_end");
this.emit("end");
return null;

Expand Down
87 changes: 64 additions & 23 deletions lib/connection.js
Expand Up @@ -109,7 +109,7 @@ function Connection(options) {
}

const cmd = new ChangeUser(_options, _cb);
return _addCommand(cmd, false);
return _addCommand(cmd);
};

/**
Expand Down Expand Up @@ -178,11 +178,9 @@ function Connection(options) {
*/
this.query = (sql, values, cb) => {
let _options, _sql, _values, _cb;
let _pipelining = opts.pipelining;
if (typeof sql === "object") {
_options = sql;
_sql = _options.sql;
if (_options.pipelining !== undefined) _pipelining = _options.pipelining;
} else {
_sql = sql;
}
Expand All @@ -196,7 +194,7 @@ function Connection(options) {

const cmd = new Query(_options, _sql, _values, _cb);
if (opts.trace) Error.captureStackTrace(cmd);
return _addCommand(cmd, _pipelining);
return _addCommand(cmd);
};

/**
Expand All @@ -207,7 +205,7 @@ function Connection(options) {
*/
this.ping = (options, callback) => {
const _cb = typeof options === "function" ? options : callback;
return _addCommand(new Ping(_cb), false);
return _addCommand(new Ping(_cb));
};

/**
Expand Down Expand Up @@ -412,7 +410,12 @@ function Connection(options) {
_getSocket
);
Error.captureStackTrace(handshake);
_addCommand(handshake, false);

handshake.once("end", () => {
setImmediate(_nextSendCmd);
});

_receiveQueue.push(handshake);
};

const _getSocket = () => {
Expand Down Expand Up @@ -565,15 +568,25 @@ function Connection(options) {
_cb = callback;
}

//TODO ensure that due to node.js threading system, there can't be race condition on status value
//TODO i.e. if possible race condition, just emit command every time.
//if command in progress, driver cannot rely on status and must execute query
let cmdReceive;
while ((cmdReceive = _receiveQueue.peek())) {
if (cmdReceive.onPacketReceive) {
const cmd = new Query(_options, sql, null, _cb);
if (opts.trace) Error.captureStackTrace(cmd);
return _addCommand(cmd);
}
_receiveQueue.shift();
}

//no command in progress, rely on status to know if query is needed
if (
!(info.status & ServerStatus.STATUS_AUTOCOMMIT) &&
info.status & ServerStatus.STATUS_IN_TRANS
) {
const cmd = new Query(_options, sql, null, _cb);
if (opts.trace) Error.captureStackTrace(cmd);
return _addCommand(cmd, false);
return _addCommand(cmd);
}

if (_cb) _cb();
Expand Down Expand Up @@ -618,23 +631,52 @@ function Connection(options) {
* Add command to command sending and receiving queue.
*
* @param cmd command
* @param pipelining can use pipeline
* @returns {*} current command
* @private
*/
const _addCommandEnable = (cmd, pipelining) => {
if (pipelining) {
cmd.once("send_end", () => setImmediate(_nextSendCmd));
const _addCommandEnable = cmd => {
cmd.once("end", () => {
setImmediate(_nextSendCmd);
});

//send immediately only if no current active receiver
if (_sendQueue.isEmpty() && _connected) {
let cmdReceive;
while ((cmdReceive = _receiveQueue.peek())) {
if (cmdReceive.onPacketReceive) {
_receiveQueue.push(cmd);
_sendQueue.push(cmd);
return cmd;
}
_receiveQueue.shift();
}

_receiveQueue.push(cmd);
cmd.init(_out, opts, info);
} else {
cmd.once("end", () => {
setImmediate(_nextSendCmd);
});
_receiveQueue.push(cmd);
_sendQueue.push(cmd);
}
return cmd;
};

/**
* Add command to command sending and receiving queue using pipelining
*
* @param cmd command
* @returns {*} current command
* @private
*/
const _addCommandEnablePipeline = cmd => {
cmd.once("send_end", () => {
setImmediate(_nextSendCmd);
});

_sendQueue.push(cmd);
_receiveQueue.push(cmd);
if (_sendQueue.length === 1) {
process.nextTick(_nextSendCmd);
if (_sendQueue.isEmpty() && _connected) {
cmd.init(_out, opts, info);
} else {
_sendQueue.push(cmd);
}
return cmd;
};
Expand All @@ -643,10 +685,9 @@ function Connection(options) {
* Replacing command when connection is closing or closed to send a proper error message.
*
* @param cmd command
* @param pipelining can use pipeline
* @private
*/
const _addCommandDisabled = (cmd, pipelining) => {
const _addCommandDisabled = cmd => {
const err = Errors.createError(
"Cannot execute new commands: connection closed\n" + cmd.displaySql(),
true,
Expand Down Expand Up @@ -791,9 +832,9 @@ function Connection(options) {
let _connected = null;
let _socketConnected = false;
let _socket = null;
let _addCommand = _addCommandEnable;
let _addCommand = opts.pipelining ? _addCommandEnablePipeline : _addCommandEnable;
let _out = new PacketOutputStream(opts, info);
let _in = new PacketInputStream(_unexpectedPacket, _receiveQueue, _out, opts, info);
let _in = new PacketInputStream(_unexpectedPacket.bind(this), _receiveQueue, _out, opts, info);
this.once("connect", err => {
process.nextTick(_onConnect, err);
});
Expand Down
19 changes: 8 additions & 11 deletions test/integration/test-change-user.js
Expand Up @@ -54,19 +54,16 @@ describe("change user", () => {
conn.changeUser(
{ user: "ChangeUser", password: "mypassword", charset: "UTF8_PERSIAN_CI" },
err => {
if (err) {
done(err);
} else {
conn.query("SELECT CURRENT_USER", (err, res) => {
const user = res[0]["CURRENT_USER"];
assert.equal(user, "ChangeUser@%");
assert.equal(conn.__tests.getCollation().name, "UTF8_PERSIAN_CI");
conn.end();
done();
});
}
if (err) done(err);
}
);
conn.query("SELECT CURRENT_USER", (err, res) => {
const user = res[0]["CURRENT_USER"];
assert.equal(user, "ChangeUser@%");
assert.equal(conn.__tests.getCollation().name, "UTF8_PERSIAN_CI");
conn.end();
done();
});
});
});

Expand Down

0 comments on commit 7101b9b

Please sign in to comment.