Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
[CONJS-60] handling pipe error for stream
  • Loading branch information
rusher committed Jan 30, 2019
1 parent a601224 commit ddb2080
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 19 deletions.
12 changes: 6 additions & 6 deletions lib/cmd/stream.js
Expand Up @@ -13,25 +13,25 @@ class Stream extends Query {
this.socket = socket;
this.inStream = new Readable({
objectMode: true,
read: () => {
socket.resume();
}
read: () => {}
});

this.on("fields", function(meta) {
this.inStream.emit("fields", meta);
});

this.on("error", function(err) {
this.inStream.emit("error", err);
});

this.on("end", function(err) {
if (err) this.inStream.emit("error", err);
this.inStream.push(null);
});
}

handleNewRows(row) {
if (!this.inStream.push(row)) {
this.socket.pause();
}
this.inStream.push(row);
}
}

Expand Down
10 changes: 2 additions & 8 deletions lib/connection.js
Expand Up @@ -726,10 +726,7 @@ function Connection(options) {
if (opts.socketPath) {
_socket = Net.connect(opts.socketPath);
} else {
_socket = Net.connect(
opts.port,
opts.host
);
_socket = Net.connect(opts.port, opts.host);
}

if (opts.connectTimeout) {
Expand Down Expand Up @@ -837,10 +834,7 @@ function Connection(options) {
});

try {
const secureSocket = tls.connect(
sslOption,
callback
);
const secureSocket = tls.connect(sslOption, callback);

secureSocket.on("data", _in.onData.bind(_in));
secureSocket.on("error", _socketError);
Expand Down
54 changes: 54 additions & 0 deletions test/integration/test-pool.js
Expand Up @@ -3,8 +3,20 @@
const base = require("../base.js");
const { assert } = require("chai");
const Conf = require("../conf");
const stream = require("stream");
const fs = require("fs");
const path = require("path");
const os = require("os");

describe("Pool", () => {
const fileName = path.join(os.tmpdir(), Math.random() + "tempStream.txt");

after(function() {
fs.unlink(fileName, err => {
//eat
});
});

it("pool with wrong authentication", function(done) {
this.timeout(5000);
const pool = base.createPool({ connectionLimit: 3, user: "wrongAuthentication" });
Expand Down Expand Up @@ -679,4 +691,46 @@ describe("Pool", () => {
})
.catch(done);
});

it("ensure pipe ending doesn't stall connection", function(done) {
//sequence engine only exist in MariaDB
if (!shareConn.info.isMariaDB()) this.skip();
const ver = process.version.substring(1).split(".");
//stream.pipeline doesn't exist before node.js 8
if (parseInt(ver[0]) < 10) this.skip();

this.timeout(10000);
const pool = base.createPool({ connectionLimit: 1 });

pool
.getConnection()
.then(conn => {
const someWriterStream = fs.createWriteStream(fileName);

let received = 0;
const transformStream = new stream.Transform({
objectMode: true,
transform: function transformer(chunk, encoding, callback) {
callback(null, JSON.stringify(chunk));
received++;
}
});

const queryStream = conn.queryStream(
"SELECT seq ,REPEAT('a', 100) as val FROM seq_1_to_10000"
);

stream.pipeline(queryStream, transformStream, someWriterStream, () => {
assert.isTrue(received > 0 && received < 10000, "received " + received + " results");
conn.query("SELECT 1").then(res => {
conn.end();
pool.end();
done();
});
});

setTimeout(someWriterStream.destroy.bind(someWriterStream), 2);
})
.catch(done);
});
});
6 changes: 1 addition & 5 deletions test/tools/proxy.js
Expand Up @@ -32,11 +32,7 @@ function Proxy(args) {
} else {
if (log) console.log(" ** START **");
const remoteSocket = new net.Socket();
remoteSocket.connect(
REMOTE_PORT,
REMOTE_ADDR,
function() {}
);
remoteSocket.connect(REMOTE_PORT, REMOTE_ADDR, function() {});

remoteSocket.on("data", function(data) {
if (log) console.log("<< ", data.toString());
Expand Down

0 comments on commit ddb2080

Please sign in to comment.