Skip to content

Commit

Permalink
[CONJS-21] batch correction
Browse files Browse the repository at this point in the history
  • Loading branch information
rusher committed Nov 12, 2018
1 parent 997580e commit 167202b
Show file tree
Hide file tree
Showing 7 changed files with 66 additions and 27 deletions.
21 changes: 11 additions & 10 deletions lib/cmd/batch-rewrite.js
Original file line number Diff line number Diff line change
Expand Up @@ -151,15 +151,16 @@ class BatchRewrite extends CommonText {
this.packet = null;
this.onPacketReceive = null;
this.resolve = null;
this.reject = null;
this._columns = null;
this._rows = null;
this.emit("end", err);
process.nextTick(this.reject, this.firstError);
this.reject = null;
this.emit("end", this.firstError);
return;
}
} else {
if (this.parseResults.reWritable) {
if (this.sendEnded && this.expectedResponseNo === 0) {
if (this.sendEnded && this.expectedResponseNo === 0) {
if (this.parseResults.reWritable) {
this.packet = null;
let totalAffectedRows = 0;
this._rows.forEach(row => {
Expand All @@ -172,12 +173,10 @@ class BatchRewrite extends CommonText {
warningStatus: this._rows[this._rows.length - 1].warningStatus
};
this.successEnd(rs);
this._columns = null;
this._rows = null;
return;
} else {
this.successEnd(this._rows);
}
} else if (this.sendEnded && this.expectedResponseNo === 0) {
this.successEnd(this._rows);
this._columns = null;
this._rows = null;
return;
Expand All @@ -189,6 +188,7 @@ class BatchRewrite extends CommonText {

throwError(err, info) {
this.expectedResponseNo--;

if (this.packet && !this.packet.haveErrorResponse) {
if (err.fatal) this.expectedResponseNo = 0;
if (this.stack) {
Expand All @@ -202,16 +202,17 @@ class BatchRewrite extends CommonText {
false
);
}
this.firstError = err;
this.packet.endedWithError();
process.nextTick(this.reject, err);
}

if (this.sendEnded && this.expectedResponseNo === 0) {
this.packet = null;
this.onPacketReceive = null;
this.resolve = null;
process.nextTick(this.reject, this.firstError);
this.reject = null;
this.emit("end", err);
this.emit("end", this.firstError);
return;
} else {
this._responseIndex++;
Expand Down
2 changes: 2 additions & 0 deletions lib/config/connection-options.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class ConnectionOptions {
this.database = opts.database;
this.dateStrings = opts.dateStrings || false;
this.debug = opts.debug || false;
this.debugCompress = opts.debugCompress || false;
this.debugLen = opts.debugLen || 256;
this.foundRows = opts.foundRows === undefined || opts.foundRows;
this.host = opts.host || "localhost";
Expand Down Expand Up @@ -115,6 +116,7 @@ class ConnectionOptions {
if (opts.socketTimeout) opts.socketTimeout = parseInt(opts.socketTimeout);
if (opts.dateStrings) opts.dateStrings = opts.dateStrings == "true";
if (opts.debug) opts.debug = opts.debug == "true";
if (opts.debugCompress) opts.debugCompress = opts.debugCompress == "true";
if (opts.debugLen) opts.debugLen = parseInt(opts.debugLen);
if (opts.foundRows) opts.foundRows = opts.foundRows == "true";
if (opts.maxAllowedPacket && !isNaN(Number.parseInt(opts.maxAllowedPacket)))
Expand Down
1 change: 1 addition & 0 deletions lib/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ function Connection(options) {
break;
}
}
if (!useBulk) break;
}
} else {
for (let r = 0; r < values.length; r++) {
Expand Down
4 changes: 2 additions & 2 deletions lib/io/packet-input-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class PacketInputStream {
receivePacket(packet) {
let cmd = this.currentCmd();

if (packet && (this.opts.logPackets || (this.opts.debug && !this.opts.debugCompress))) {
if (packet && (this.opts.logPackets || this.opts.debug )) {
const packetStr = Utils.log(this.opts, packet.buf, packet.pos, packet.end, this.header);
if (this.opts.logPackets) {
this.info.addPacket(
Expand All @@ -48,7 +48,7 @@ class PacketInputStream {
packetStr
);
}
if (this.opts.debug && !this.opts.debugCompress) {
if (this.opts.debug) {
console.log(
"<== conn:%d %s (%d,%d)\n%s",
this.info.threadId ? this.info.threadId : -1,
Expand Down
4 changes: 2 additions & 2 deletions lib/io/packet-output-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ class PacketOutputStream {

this.stream.writeBuf(this.buf.slice(0, this.pos), this.cmd);

if (this.opts.logPackets || (this.opts.debug && !this.opts.debugCompress)) {
if (this.opts.logPackets || this.opts.debug) {
const packet = Utils.log(this.opts, this.buf, 0, this.pos);
if (this.opts.logPackets) {
this.info.addPacket(
Expand All @@ -403,7 +403,7 @@ class PacketOutputStream {
);
}

if (this.opts.debug && !this.opts.debugCompress) {
if (this.opts.debug) {
console.log(
"==> conn:%d %s\n%s",
this.info.threadId ? this.info.threadId : -1,
Expand Down
57 changes: 46 additions & 11 deletions test/integration/test-batch.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,19 @@ describe("batch", () => {
.catch(done);
});

beforeEach(function() {
//just to ensure shared connection is not closed by server due to inactivity
shareConn.ping();
});

after(function() {
fs.unlink(fileName, err => {});
fs.unlink(bigFileName, err => {});
});

const simpleBatch = (useCompression, useBulk, done) => {
base
.createConnection({ compress: useCompression, bulk: useBulk, debug:true })
.createConnection({ compress: useCompression, bulk: useBulk })
.then(conn => {
conn.query(
"CREATE TABLE simpleBatch(id int, id2 int, id3 int, t varchar(128), d datetime, id4 int) CHARSET utf8mb4"
Expand Down Expand Up @@ -121,7 +126,7 @@ describe("batch", () => {

const nonRewritableBatch = (useCompression, useBulk, done) => {
base
.createConnection({ compress: useCompression, bulk: useBulk, debug: useBulk })
.createConnection({ compress: useCompression, bulk: useBulk })
.then(conn => {
conn
.batch("SELECT ? as id, ? as t", [[1, "john"], [2, "jack"]])
Expand Down Expand Up @@ -166,13 +171,18 @@ describe("batch", () => {
};

const bigBatchWith16mMaxAllowedPacket = (useCompression, useBulk, done) => {
let finished = false;
base
.createConnection({
compress: useCompression,
maxAllowedPacket: 16 * 1024 * 1024,
bulk: useBulk
bulk: useBulk,
logPackets: true
})
.then(conn => {
setTimeout(() => {
if (!finished) console.log(conn.info.getLastPackets());
}, 200000);
conn.query("DROP TABLE IF EXISTS bigBatchWith16mMaxAllowedPacket");
conn.query(
"CREATE TABLE bigBatchWith16mMaxAllowedPacket(id int, id2 int, id3 int, t varchar(128), id4 int) CHARSET utf8mb4"
Expand Down Expand Up @@ -206,6 +216,7 @@ describe("batch", () => {
.on("end", () => {
assert.equal(1000000, currRow);
conn.query("DROP TABLE bigBatchWith16mMaxAllowedPacket");
finished = true;
conn.end();
done();
});
Expand All @@ -214,9 +225,13 @@ describe("batch", () => {
};

const bigBatchWith4mMaxAllowedPacket = (useCompression, useBulk, done) => {
let finished = false;
base
.createConnection({ compress: useCompression, bulk: useBulk })
.createConnection({ compress: useCompression, bulk: useBulk, logPackets: true })
.then(conn => {
setTimeout(() => {
if (!finished) console.log(conn.info.getLastPackets());
}, 200000);
conn.query(
"CREATE TABLE bigBatchWith4mMaxAllowedPacket(id int, id2 int, id3 int, t varchar(128), id4 int) CHARSET utf8mb4"
);
Expand Down Expand Up @@ -249,6 +264,7 @@ describe("batch", () => {
.on("end", () => {
assert.equal(1000000, currRow);
conn.query("DROP TABLE bigBatchWith4mMaxAllowedPacket");
finished = true;
conn.end();
done();
});
Expand All @@ -257,9 +273,13 @@ describe("batch", () => {
};

const bigBatchError = (useCompression, useBulk, done) => {
let finished = false;
base
.createConnection({ compress: useCompression, bulk: useBulk })
.createConnection({ compress: useCompression, bulk: useBulk, logPackets: true })
.then(conn => {
setTimeout(() => {
if (!finished) console.log(conn.info.getLastPackets());
}, 200000);
const values = [];
for (let i = 0; i < 1000000; i++) {
values.push([i, "abcdefghijkflmnopqrtuvwxyz🤘💪"]);
Expand All @@ -274,6 +294,7 @@ describe("batch", () => {
.query("select 1")
.then(rows => {
assert.deepEqual(rows, [{ "1": 1 }]);
finished = true;
conn.end();
done();
})
Expand Down Expand Up @@ -328,11 +349,16 @@ describe("batch", () => {
};

const batchWithStream = (useCompression, useBulk, done) => {
let finished = false;
const stream1 = fs.createReadStream(fileName);
const stream2 = fs.createReadStream(fileName);
base
.createConnection({ compress: useCompression, bulk: useBulk })
.createConnection({ compress: useCompression, bulk: useBulk, logPackets: true })
.then(conn => {
setTimeout(() => {
if (!finished) console.log(conn.info.getLastPackets());
}, 1500);

conn.query(
"CREATE TABLE batchWithStream(id int, id2 int, id3 int, t varchar(128), id4 int, id5 int) CHARSET utf8mb4"
);
Expand Down Expand Up @@ -363,6 +389,7 @@ describe("batch", () => {
}
]);
conn.query("DROP TABLE batchWithStream");
finished = true;
conn.end();
done();
});
Expand Down Expand Up @@ -410,10 +437,13 @@ describe("batch", () => {
if (i % 100000 === 0) values.push([i, fs.createReadStream(fileName), i * 2]);
else values.push([i, "abcdefghijkflmnopqrtuvwxyz🤘💪", i * 2]);
}

let finished = false;
base
.createConnection({ compress: useCompression, bulk: useBulk })
.createConnection({ compress: useCompression, bulk: useBulk, logPackets: true })
.then(conn => {
setTimeout(() => {
if (!finished) console.log(conn.info.getLastPackets());
}, 200000);
conn.query(
"CREATE TABLE bigBatchWithStreams(id int, id2 int, id3 int, t varchar(128), id4 int, id5 int) CHARSET utf8mb4"
);
Expand Down Expand Up @@ -441,6 +471,7 @@ describe("batch", () => {
.on("end", () => {
assert.equal(1000000, currRow);
conn.query("DROP TABLE bigBatchWithStreams");
finished = true;
conn.end();
done();
});
Expand All @@ -458,7 +489,7 @@ describe("batch", () => {
}

base
.createConnection({ compress: useCompression, bulk: useBulk })
.createConnection({ compress: useCompression, bulk: useBulk, logPackets: true})
.then(conn => {
conn
.batch("INSERT INTO `blabla` values (1, ?, 2, ?, ?, 3)", values)
Expand Down Expand Up @@ -763,6 +794,7 @@ describe("batch", () => {
};

const stream16MNamedPlaceHolders = function(useBulk, done) {
let finished = false;
const values = [];
for (let i = 0; i < 1000000; i++) {
if (i % 100000 === 0) values.push({ id1: i, id2: fs.createReadStream(fileName), id3: i * 2 });
Expand All @@ -772,6 +804,9 @@ describe("batch", () => {
base
.createConnection({ namedPlaceholders: true, bulk: useBulk })
.then(conn => {
setTimeout(() => {
if (!finished) console.log(conn.info.getLastPackets());
}, 200000);
conn.query(
"CREATE TABLE stream16MNamedPlaceHolders(id int, id2 int, id3 int, t varchar(128), id4 int, id5 int) CHARSET utf8mb4"
);
Expand All @@ -789,7 +824,6 @@ describe("batch", () => {
done(new Error("must not have thrown any error !"));
})
.on("data", row => {
if (currRow % 10000 === 0) console.log(currRow);
assert.deepEqual(row, {
id: 1,
id2: currRow,
Expand All @@ -803,6 +837,7 @@ describe("batch", () => {
.on("end", () => {
assert.equal(1000000, currRow);
conn.query("DROP TABLE stream16MNamedPlaceHolders");
finished = true;
conn.end();
done();
});
Expand Down Expand Up @@ -990,7 +1025,7 @@ describe("batch", () => {
});
});

describe("standard question mark and compress with bulk", () => {
describe("standard question mark and compress with rewrite", () => {
const useCompression = true;

it("simple batch", done => {
Expand Down
4 changes: 2 additions & 2 deletions test/integration/test-error.js
Original file line number Diff line number Diff line change
Expand Up @@ -350,10 +350,10 @@ describe("Error", () => {
})
.catch(err => {
if (process.env.MAXSCALE_VERSION) {
assert.isTrue(err.message.includes("Lost connection to backend server"));
assert.isTrue(err.message.includes("Lost connection to backend server"), err.message);
assert.equal(err.sqlState, "HY000");
} else {
assert.isTrue(err.message.includes("socket has unexpectedly been closed"));
assert.isTrue(err.message.includes("socket has unexpectedly been closed"), err.message);
assert.equal(err.sqlState, "08S01");
assert.equal(err.code, "ER_SOCKET_UNEXPECTED_CLOSE");
}
Expand Down

0 comments on commit 167202b

Please sign in to comment.