Skip to content

Commit

Permalink
[CONJS-21] batch race condition correction about number of response p…
Browse files Browse the repository at this point in the history
…acket
  • Loading branch information
rusher committed Nov 12, 2018
1 parent 167202b commit da4763e
Show file tree
Hide file tree
Showing 8 changed files with 84 additions and 48 deletions.
4 changes: 2 additions & 2 deletions benchmarks/benchs/bench_promise_insert_batch.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ let sqlTable =
sqlInsert = "INSERT INTO testn.perfTestTextPipe(t0) VALUES (?)";

module.exports.title =
"1000 * insert 100 characters using promise and batch method (for mariadb only, since doesn't exist for others)";
"100 * insert 100 characters using promise and batch method (for mariadb only, since doesn't exist for others)";
module.exports.displaySql = "INSERT INTO testn.perfTestTextPipe VALUES (?) (into BLACKHOLE ENGINE)";
const iterations = 1000;
const iterations = 100;
module.exports.promise = true;
module.exports.benchFct = function(conn, deferred, connType) {
const params = [randomString(100)];
Expand Down
66 changes: 42 additions & 24 deletions lib/cmd/batch-bulk.js
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ class BatchBulk extends CommonBinary {
this.packet.mark(last, last ? null : this.values[this.valueIdx]);
}
this.sendEnded = true;
this.expectedResponseNo = this.packet.packetSend + 1;
}

displaySql() {
Expand All @@ -119,28 +118,28 @@ class BatchBulk extends CommonBinary {
}

success(val) {
this.expectedResponseNo--;
if (this.packet.haveErrorResponse) {
if (this.sendEnded && this.expectedResponseNo === 0) {
this.packet.waitingResponseNo--;

if (this.sendEnded && this.packet.waitingResponseNo === 0) {
//send COM_STMT_CLOSE packet
this.sequenceNo = -1;
this.compressSequenceNo = -1;
this.out.startPacket(this);
this.out.writeInt8(0x19);
this.out.writeInt32(this.statementId);
this.out.flushBuffer(true);
this.emit("send_end");

if (this.packet.haveErrorResponse) {
this.packet = null;
this.resolve = null;
this.onPacketReceive = null;
this.reject = null;
this._columns = null;
this._rows = null;

//send COM_STMT_CLOSE packet
this.out.startPacket(this);
this.out.writeInt8(0x19);
this.out.writeInt32(this.statementId);
this.out.flushBuffer(true);

this.emit("send_end");
this.emit("end", err);
return;
}
} else {
if (this.sendEnded && this.expectedResponseNo === 0) {
process.nextTick(this.reject, this.firstError);
this.reject = null;
this.emit("end", this.firstError);
} else {
this.packet = null;
let totalAffectedRows = 0;
this._rows.forEach(row => {
Expand All @@ -155,17 +154,24 @@ class BatchBulk extends CommonBinary {
this.successEnd(rs);
this._columns = null;
this._rows = null;
return;
}
return;
}

if (!this.packet.haveErrorResponse) {
this._responseIndex++;
this.onPacketReceive = this.readResponsePacket;
}
}

throwError(err, info) {
this.expectedResponseNo--;
this.packet.waitingResponseNo--;

if (this.packet && !this.packet.haveErrorResponse) {
if (err.fatal) this.expectedResponseNo = 0;
if (err.fatal) {
this.sendEnded = true;
this.packet.waitingResponseNo = 0;
}
if (this.stack) {
err = Errors.createError(
err.message,
Expand All @@ -177,16 +183,28 @@ class BatchBulk extends CommonBinary {
false
);
}
this.firstError = err;
this.packet.endedWithError();
process.nextTick(this.reject, err);
}

if (this.sendEnded && this.expectedResponseNo === 0) {
if (this.sendEnded && this.packet.waitingResponseNo === 0) {
this.packet = null;
this.onPacketReceive = null;
this.resolve = null;

//send COM_STMT_CLOSE packet
if (this.statementId) {
this.sequenceNo = -1;
this.compressSequenceNo = -1;
this.out.startPacket(this);
this.out.writeInt8(0x19);
this.out.writeInt32(this.statementId);
this.out.flushBuffer(true);
}
this.emit("send_end");
process.nextTick(this.reject, this.firstError);
this.reject = null;
this.emit("end", err);
this.emit("end", this.firstError);
return;
} else {
this._responseIndex++;
Expand Down
18 changes: 9 additions & 9 deletions lib/cmd/batch-rewrite.js
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,6 @@ class BatchRewrite extends CommonText {
this.packet.mark(!this.parseResults.reWritable || this.valueIdx === this.values.length);
}
this.sendEnded = true;
this.expectedResponseNo = this.packet.packetSend;

this.emit("send_end");
}

Expand Down Expand Up @@ -145,9 +143,9 @@ class BatchRewrite extends CommonText {
}

success(val) {
this.expectedResponseNo--;
this.packet.waitingResponseNo--;
if (this.packet.haveErrorResponse) {
if (this.sendEnded && this.expectedResponseNo === 0) {
if (this.sendEnded && this.packet.waitingResponseNo === 0) {
this.packet = null;
this.onPacketReceive = null;
this.resolve = null;
Expand All @@ -159,7 +157,7 @@ class BatchRewrite extends CommonText {
return;
}
} else {
if (this.sendEnded && this.expectedResponseNo === 0) {
if (this.sendEnded && this.packet.waitingResponseNo === 0) {
if (this.parseResults.reWritable) {
this.packet = null;
let totalAffectedRows = 0;
Expand Down Expand Up @@ -187,10 +185,13 @@ class BatchRewrite extends CommonText {
}

throwError(err, info) {
this.expectedResponseNo--;
this.packet.waitingResponseNo--;

if (this.packet && !this.packet.haveErrorResponse) {
if (err.fatal) this.expectedResponseNo = 0;
if (err.fatal) {
this.sendEnded = true;
this.packet.waitingResponseNo = 0;
}
if (this.stack) {
err = Errors.createError(
err.message,
Expand All @@ -206,7 +207,7 @@ class BatchRewrite extends CommonText {
this.packet.endedWithError();
}

if (this.sendEnded && this.expectedResponseNo === 0) {
if (this.sendEnded && this.packet.waitingResponseNo === 0) {
this.packet = null;
this.onPacketReceive = null;
this.resolve = null;
Expand Down Expand Up @@ -295,7 +296,6 @@ class BatchRewrite extends CommonText {
} else {
// all rows are written
this.sendEnded = true;
this.expectedResponseNo = packet.packetSend;
this.emit("send_end");
return;
}
Expand Down
6 changes: 3 additions & 3 deletions lib/io/bulk-packet.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class BulkPacket {
this.pos = 4;
this.datatypes = [];
this.encoding = out.encoding;
this.packetSend = 0;
this.waitingResponseNo = 1;
this.singleQuery = false;
this.haveErrorResponse = false;
this.writeBinaryDate =
Expand Down Expand Up @@ -488,7 +488,7 @@ class BulkPacket {
this.copyAndFlush(false);

this.markPos = undefined;
if (!this.singleQuery) this.packetSend++;
if (!this.singleQuery) this.waitingResponseNo++;
this.singleQuery = true;
this.singleQuerySequenceNo = this.out.cmd.sequenceNo;
this.singleQueryCompressSequenceNo = this.out.cmd.compressSequenceNo;
Expand All @@ -508,7 +508,7 @@ class BulkPacket {

if (!this.haveErrorResponse || this.out.cmd.sequenceNo !== -1) {
this.copyAndFlush(true);
this.packetSend++;
this.waitingResponseNo++;
}

this.pos = 4;
Expand Down
2 changes: 1 addition & 1 deletion lib/io/packet-input-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class PacketInputStream {
receivePacket(packet) {
let cmd = this.currentCmd();

if (packet && (this.opts.logPackets || this.opts.debug )) {
if (packet && (this.opts.logPackets || this.opts.debug)) {
const packetStr = Utils.log(this.opts, packet.buf, packet.pos, packet.end, this.header);
if (this.opts.logPackets) {
this.info.addPacket(
Expand Down
6 changes: 3 additions & 3 deletions lib/io/rewrite-packet.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class ReWritePacket {
this.endStr = endString;
this.encoding = out.encoding;
this.endStrLength = Buffer.byteLength(this.endStr, this.encoding);
this.packetSend = 0;
this.waitingResponseNo = 0;
this.singleQuery = false;
this.haveErrorResponse = false;

Expand Down Expand Up @@ -395,7 +395,7 @@ class ReWritePacket {
this.copyAndFlush(false);

this.markPos = undefined;
if (!this.singleQuery) this.packetSend++;
if (!this.singleQuery) this.waitingResponseNo++;
this.singleQuery = true;
this.singleQuerySequenceNo = this.out.cmd.sequenceNo;
this.singleQueryCompressSequenceNo = this.out.cmd.compressSequenceNo;
Expand All @@ -417,7 +417,7 @@ class ReWritePacket {

if (!this.haveErrorResponse || this.out.cmd.sequenceNo !== -1) {
this.copyAndFlush(true);
this.packetSend++;
this.waitingResponseNo++;
}

this.pos = 4;
Expand Down
25 changes: 20 additions & 5 deletions test/integration/test-batch.js
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ describe("batch", () => {
base
.createConnection({ compress: useCompression, bulk: useBulk })
.then(conn => {
conn.query("DROP TABLE IF EXISTS simpleBatch");
conn.query(
"CREATE TABLE simpleBatch(id int, id2 int, id3 int, t varchar(128), d datetime, id4 int) CHARSET utf8mb4"
);
Expand Down Expand Up @@ -82,14 +83,20 @@ describe("batch", () => {
id4: 3
}
]);
conn.query("DROP TABLE simpleBatch");
conn.end();
done();
})
.catch(err => {
done(err);
});
});
conn
.query("select 1")
.then(rows => {
assert.deepEqual(rows, [{ "1": 1 }]);
conn.query("DROP TABLE simpleBatch");
conn.end();
done();
})
.catch(done);
})
.catch(done);
};
Expand Down Expand Up @@ -232,6 +239,7 @@ describe("batch", () => {
setTimeout(() => {
if (!finished) console.log(conn.info.getLastPackets());
}, 200000);
conn.query("DROP TABLE IF EXISTS bigBatchWith4mMaxAllowedPacket");
conn.query(
"CREATE TABLE bigBatchWith4mMaxAllowedPacket(id int, id2 int, id3 int, t varchar(128), id4 int) CHARSET utf8mb4"
);
Expand Down Expand Up @@ -308,6 +316,7 @@ describe("batch", () => {
base
.createConnection({ compress: useCompression, bulk: useBulk })
.then(conn => {
conn.query("DROP TABLE IF EXISTS singleBigInsertWithoutMaxAllowedPacket");
conn.query(
"CREATE TABLE singleBigInsertWithoutMaxAllowedPacket(id int, id2 int, id3 int, t longtext, id4 int) CHARSET utf8mb4"
);
Expand Down Expand Up @@ -358,7 +367,7 @@ describe("batch", () => {
setTimeout(() => {
if (!finished) console.log(conn.info.getLastPackets());
}, 1500);

conn.query("DROP TABLE IF EXISTS batchWithStream");
conn.query(
"CREATE TABLE batchWithStream(id int, id2 int, id3 int, t varchar(128), id4 int, id5 int) CHARSET utf8mb4"
);
Expand Down Expand Up @@ -444,6 +453,7 @@ describe("batch", () => {
setTimeout(() => {
if (!finished) console.log(conn.info.getLastPackets());
}, 200000);
conn.query("DROP TABLE IF EXISTS bigBatchWithStreams");
conn.query(
"CREATE TABLE bigBatchWithStreams(id int, id2 int, id3 int, t varchar(128), id4 int, id5 int) CHARSET utf8mb4"
);
Expand Down Expand Up @@ -489,7 +499,7 @@ describe("batch", () => {
}

base
.createConnection({ compress: useCompression, bulk: useBulk, logPackets: true})
.createConnection({ compress: useCompression, bulk: useBulk, logPackets: true })
.then(conn => {
conn
.batch("INSERT INTO `blabla` values (1, ?, 2, ?, ?, 3)", values)
Expand All @@ -514,6 +524,7 @@ describe("batch", () => {
base
.createConnection({ namedPlaceholders: true, bulk: useBulk })
.then(conn => {
conn.query("DROP TABLE IF EXISTS simpleNamedPlaceHolders");
conn.query(
"CREATE TABLE simpleNamedPlaceHolders(id int, id2 int, id3 int, t varchar(128), id4 int) CHARSET utf8mb4"
);
Expand Down Expand Up @@ -632,6 +643,7 @@ describe("batch", () => {
base
.createConnection({ namedPlaceholders: true, bulk: useBulk })
.then(conn => {
conn.query("DROP TABLE IF EXISTS more16MNamedPlaceHolders");
conn.query(
"CREATE TABLE more16MNamedPlaceHolders(id int, id2 int, id3 int, t varchar(128), id4 int) CHARSET utf8mb4"
);
Expand Down Expand Up @@ -676,6 +688,7 @@ describe("batch", () => {
base
.createConnection({ namedPlaceholders: true, bulk: useBulk })
.then(conn => {
conn.query("DROP TABLE IF EXISTS more16MSingleNamedPlaceHolders");
conn.query(
"CREATE TABLE more16MSingleNamedPlaceHolders(id int, id2 int, id3 int, t longtext, id4 int) CHARSET utf8mb4"
);
Expand Down Expand Up @@ -722,6 +735,7 @@ describe("batch", () => {
base
.createConnection({ namedPlaceholders: true, bulk: useBulk })
.then(conn => {
conn.query("DROP TABLE IF EXISTS streamNamedPlaceHolders");
conn.query(
"CREATE TABLE streamNamedPlaceHolders(id int, id2 int, id3 int, t varchar(128), id4 int, id5 int) CHARSET utf8mb4"
);
Expand Down Expand Up @@ -807,6 +821,7 @@ describe("batch", () => {
setTimeout(() => {
if (!finished) console.log(conn.info.getLastPackets());
}, 200000);
conn.query("DROP TABLE IF EXISTS stream16MNamedPlaceHolders");
conn.query(
"CREATE TABLE stream16MNamedPlaceHolders(id int, id2 int, id3 int, t varchar(128), id4 int, id5 int) CHARSET utf8mb4"
);
Expand Down
5 changes: 4 additions & 1 deletion test/integration/test-error.js
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,10 @@ describe("Error", () => {
assert.isTrue(err.message.includes("Lost connection to backend server"), err.message);
assert.equal(err.sqlState, "HY000");
} else {
assert.isTrue(err.message.includes("socket has unexpectedly been closed"), err.message);
assert.isTrue(
err.message.includes("socket has unexpectedly been closed"),
err.message
);
assert.equal(err.sqlState, "08S01");
assert.equal(err.code, "ER_SOCKET_UNEXPECTED_CLOSE");
}
Expand Down

0 comments on commit da4763e

Please sign in to comment.