Skip to content

Commit

Permalink
[misc] compress option work only of server accept compression
Browse files Browse the repository at this point in the history
geometry in bulk working with MariaDB 10.3
improve testing geometry with batch
  • Loading branch information
rusher committed Nov 14, 2018
1 parent 5d2b62f commit 37af03c
Show file tree
Hide file tree
Showing 15 changed files with 494 additions and 235 deletions.
2 changes: 1 addition & 1 deletion lib/cmd/batch-bulk.js
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ class BatchBulk extends CommonBinary {
out.startPacket(this);

this.valueIdx = 0;
while (this.valueIdx < this.values.length) {
while (this.valueIdx < this.values.length && !this.packet.haveErrorResponse) {
this.valueRow = this.values[this.valueIdx++];

//********************************************
Expand Down
8 changes: 7 additions & 1 deletion lib/cmd/batch-rewrite.js
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class BatchRewrite extends CommonText {

this.onPacketReceive = this.readResponsePacket;
this.valueIdx = 0;
while (this.valueIdx < this.values.length) {
while (this.valueIdx < this.values.length && !this.packet.haveErrorResponse) {
this.valueRow = this.values[this.valueIdx++];

//********************************************
Expand Down Expand Up @@ -144,6 +144,7 @@ class BatchRewrite extends CommonText {

success(val) {
this.packet.waitingResponseNo--;

if (this.packet.haveErrorResponse) {
if (this.sendEnded && this.packet.waitingResponseNo === 0) {
this.packet = null;
Expand Down Expand Up @@ -285,6 +286,11 @@ class BatchRewrite extends CommonText {
registerStreamSendEvent(packet, info) {
this.paramWritten = function() {
while (true) {
if (this.packet.haveErrorResponse) {
this.sendEnded = true;
this.emit("send_end");
return;
}
if (this.currentParam === this.valueRow.length) {
// all parameters from row are written.
packet.writeString(this.parseResults.partList[this.parseResults.partList.length - 2]);
Expand Down
2 changes: 1 addition & 1 deletion lib/cmd/common-binary-cmd.js
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class CommonBinary extends ResultSet {
) {
const geoBuff = this.getBufferFromGeometryValue(value);
if (geoBuff) {
out.writeInt8(0x00);
out.writeInt8(0x00); //Value follow
out.writeLengthEncodedBuffer(Buffer.concat([Buffer.from([0, 0, 0, 0]), geoBuff]));
} else {
out.writeInt8(0x01); //NULL
Expand Down
15 changes: 11 additions & 4 deletions lib/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ const ConnectionInformation = require("./misc/connection-information");
const tls = require("tls");
const Errors = require("./misc/errors");
const Utils = require("./misc/utils");
const Capabilities = require("./const/capabilities");

/*commands*/
const Handshake = require("./cmd/handshake/handshake");
Expand Down Expand Up @@ -754,10 +755,16 @@ function Connection(options) {
const _authSucceedHandler = (resolve, rejected) => {
//enable packet compression according to option
if (opts.compress) {
_out.setStream(new CompressionOutputStream(_socket, opts, info));
_in = new CompressionInputStream(_in, _receiveQueue, opts, info);
_socket.removeAllListeners("data");
_socket.on("data", _in.onData.bind(_in));
if (info.serverCapabilities & Capabilities.COMPRESS) {
_out.setStream(new CompressionOutputStream(_socket, opts, info));
_in = new CompressionInputStream(_in, _receiveQueue, opts, info);
_socket.removeAllListeners("data");
_socket.on("data", _in.onData.bind(_in));
} else {
console.error(
"connection is configured to use packet compression, but the server doesn't have this capability"
);
}
}

if (opts.pipelining) _addCommand = _addCommandEnablePipeline;
Expand Down
6 changes: 3 additions & 3 deletions lib/io/bulk-packet.js
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ class BulkPacket {
"GeometryCollection"
].includes(row[r].type)
) {
if (this.datatypes[r] !== 0xff) return true;
if (this.datatypes[r] !== 0xfb) return true;
} else {
if (this.datatypes[r] !== 0x0f) return true;
}
Expand Down Expand Up @@ -122,7 +122,7 @@ class BulkPacket {
"GeometryCollection"
].includes(row[r].type)
) {
this.buf[this.pos++] = 0xff;
this.buf[this.pos++] = 0xfb;
} else {
this.buf[this.pos++] = 0x0f;
}
Expand Down Expand Up @@ -532,7 +532,7 @@ class BulkPacket {

this.pos = this.markPos;

if (!this.haveErrorResponse || this.out.cmd.sequenceNo !== -1) {
if (!this.haveErrorResponse) {
this.copyAndFlush(true);
this.waitingResponseNo++;
}
Expand Down
2 changes: 1 addition & 1 deletion lib/io/rewrite-packet.js
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ class ReWritePacket {
this.pos = this.markPos;
this.writeString(this.endStr);

if (!this.haveErrorResponse || this.out.cmd.sequenceNo !== -1) {
if (!this.haveErrorResponse) {
this.copyAndFlush(true);
this.waitingResponseNo++;
}
Expand Down
3 changes: 2 additions & 1 deletion lib/misc/connection-information.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@ class ConnectionInformation {
this.status = null;
this.serverVersion = null;
this.lastPackets = new Queue();
this.serverCapabilities = -1;
}

addPacket(msg) {
this.lastPackets.push(msg);
while (this.lastPackets.size() > 16) this.lastPackets.shift();
while (this.lastPackets.size() > 32) this.lastPackets.shift();
}

getLastPackets() {
Expand Down
2 changes: 1 addition & 1 deletion test/integration/datatype/test-datetime.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ describe("datetime", () => {
}
});

it("standard date", done => {
it("standard date", function(done) {
//using distant server, time might be different
if (Conf.baseConfig.host !== "localhost" && Conf.baseConfig.host !== "mariadb.example.com")
this.skip();
Expand Down
Loading

0 comments on commit 37af03c

Please sign in to comment.