Skip to content

Commit

Permalink
avoiding allocating output buffer if not needed
Browse files Browse the repository at this point in the history
  • Loading branch information
rusher committed Apr 20, 2018
1 parent 7f24c4a commit cca6c89
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 26 deletions.
2 changes: 1 addition & 1 deletion benchmarks/common_benchmarks.js
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ function Bench(callback) {
// called between running benchmarks
onCycle: function(event) {
//to avoid mysql2 taking all the server memory
mysql2.clearParserCache();
if (mysql2) mysql2.clearParserCache();
console.log(event.target.toString());
const drvType = event.target.options.drvType;
const benchTitle =
Expand Down
10 changes: 7 additions & 3 deletions src/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ class Connection {
);

this._socket.writeBuf = (buf, cmd) => {
this._socket.write(buf);
return this._socket.write(buf);
};
this._socket.flush = (cmdEnd, cmd) => {};
this._out.setStreamer(this._socket);
Expand Down Expand Up @@ -414,7 +414,9 @@ class Connection {
secureSocket.on("data", chunk => this._in.onData(chunk));
secureSocket.on("error", this._socketError.bind(this));
secureSocket.on("end", this._socketError.bind(this));
secureSocket.writeBuf = (buf, cmd) => secureSocket.write(buf);
secureSocket.writeBuf = (buf, cmd) => {
return secureSocket.write(buf);
};
secureSocket.flush = (cmdEnd, cmd) => {};

this._socket.removeAllListeners("data");
Expand All @@ -431,7 +433,9 @@ class Connection {
if (this._closing) return;

//avoid sending new data in closed socket
this._socket.writeBuf = () => {};
this._socket.writeBuf = () => {
return true;
};
this._socket.flush = () => {};

//socket has been ended without error
Expand Down
47 changes: 30 additions & 17 deletions src/io/compression-output-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@ class CompressionOutputStream {
this.opts = opts;
this.pos = 7;
this.header = Buffer.allocUnsafe(7);
this.smallBuffer = Buffer.allocUnsafe(SMALL_BUFFER_SIZE);
this.buf = this.smallBuffer;
this.writer = buffer => socket.write(buffer);
this.buf = Buffer.allocUnsafe(SMALL_BUFFER_SIZE);
this.writer = buffer => {
return socket.write(buffer);
};
}

growBuffer(len) {
Expand All @@ -48,7 +49,8 @@ class CompressionOutputStream {

writeBuf(arr, cmd) {
let off = 0,
len = arr.length;
len = arr.length,
flushed = true;
if (len > this.buf.length - this.pos) {
if (this.buf.length !== MAX_BUFFER_SIZE) {
this.growBuffer(len);
Expand All @@ -69,18 +71,20 @@ class CompressionOutputStream {
this.pos += lenToFillBuffer;

if (remainingLen === 0) return;
this.flush(false, cmd);
flushed = this.flush(false, cmd, remainingLen);
}
}
}
arr.copy(this.buf, this.pos, off, off + len);
this.pos += len;
return flushed;
}

/**
* Flush the internal buffer.
*/
flush(cmdEnd, cmd) {
flush(cmdEnd, cmd, remainingLen) {
let flushed;
if (this.pos < 1536) {
//*******************************************************************************
// small packet, no compression
Expand Down Expand Up @@ -111,12 +115,12 @@ class CompressionOutputStream {
);
}

this.writer(this.buf.slice(0, this.pos));
flushed = this.writer(this.buf.slice(0, this.pos));

if (this.pos === MAX_BUFFER_SIZE) this.writeEmptyPacket();
if (this.pos === MAX_BUFFER_SIZE) flushed = this.writeEmptyPacket();

//reset buffer
this.buf = this.smallBuffer;
if (!flushed) this.buf = this.allocateBuffer(remainingLen);
this.pos = 7;
} else {
//*******************************************************************************
Expand Down Expand Up @@ -152,15 +156,24 @@ class CompressionOutputStream {
}

this.writer(this.header);
this.writer(compressChunk);
if (cmdEnd) {
if (this.pos === MAX_BUFFER_SIZE) this.writeEmptyPacket(cmd);

//reset buffer
this.buf = this.smallBuffer;
}
flushed = this.writer(compressChunk);
if (cmdEnd && this.pos === MAX_BUFFER_SIZE) flushed = this.writeEmptyPacket(cmd);
//if not flushed, ensure not reusing a buffer than is not send
if (!flushed) this.header = Buffer.allocUnsafe(7);
this.pos = 7;
}
return flushed;
}

allocateBuffer(len) {
if (len + 4 < SMALL_BUFFER_SIZE) {
return Buffer.allocUnsafe(SMALL_BUFFER_SIZE);
} else if (len + 4 < MEDIUM_BUFFER_SIZE) {
return Buffer.allocUnsafe(MEDIUM_BUFFER_SIZE);
} else if (len + 4 < LARGE_BUFFER_SIZE) {
return Buffer.allocUnsafe(LARGE_BUFFER_SIZE);
}
return Buffer.allocUnsafe(MAX_BUFFER_SIZE);
}

writeEmptyPacket(cmd) {
Expand All @@ -183,7 +196,7 @@ class CompressionOutputStream {
);
}

this.writer(emptyBuf);
return this.writer(emptyBuf);
}
}

Expand Down
9 changes: 5 additions & 4 deletions src/io/packet-output-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ const ZERO_BYTE = 0x00;
const SLASH = 0x5c;

//increase by level to avoid buffer copy.
const SMALL_BUFFER_SIZE = 2042;
const SMALL_BUFFER_SIZE = 2048;
const MEDIUM_BUFFER_SIZE = 131072; //128k
const LARGE_BUFFER_SIZE = 1048576; //1M
const MAX_BUFFER_SIZE = 16777219; //16M + 4
Expand All @@ -27,7 +27,7 @@ class PacketOutputStream {
this.opts = opts;
this.info = info;
this.pos = 4;
this.buf = null;
this.buf = Buffer.allocUnsafe(SMALL_BUFFER_SIZE);
this.writeDate = opts.timezone === "local" ? this.writeLocalDate : this.writeTimezoneDate;
this.encoding = this.opts.collation.encoding;
if (this.encoding === "utf8") {
Expand Down Expand Up @@ -62,7 +62,6 @@ class PacketOutputStream {
startPacket(cmd) {
this.cmd = cmd;
this.pos = 4;
this.buf = Buffer.allocUnsafe(SMALL_BUFFER_SIZE);
}

writeInt8(value) {
Expand Down Expand Up @@ -454,7 +453,7 @@ class PacketOutputStream {
this.buf[3] = this.cmd.sequenceNo;
this.cmd.incrementSequenceNo(1);

this.stream.writeBuf(this.buf.slice(0, this.pos), this.cmd);
const flushed = this.stream.writeBuf(this.buf.slice(0, this.pos), this.cmd);

if (this.opts.debug && !this.opts.debugCompress) {
console.log(
Expand All @@ -476,6 +475,8 @@ class PacketOutputStream {
this.writeEmptyPacket();
} else {
this.stream.flush(true, this.cmd);
//if not flushed, ensure not reusing a buffer than is not send
if (!flushed) this.buf = Buffer.allocUnsafe(SMALL_BUFFER_SIZE);
}
} else {
this.buf = this.allocateBuffer(remainingLen);
Expand Down
2 changes: 1 addition & 1 deletion test/integration/test-local-infile.js
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ describe("local-infile", () => {
if (err) {
done(err);
} else {
conn = base.createConnection({ permitLocalInfile: true});
conn = base.createConnection({ permitLocalInfile: true });
conn.connect(() => {
conn.query("CREATE TEMPORARY TABLE smallLocalInfile(id int, test varchar(100))");
conn.query(
Expand Down

0 comments on commit cca6c89

Please sign in to comment.