Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
[CONJS-138] pool.getConnection() might not timeout even with acquireT…
…imeout set #116

Pool option `acquireTimeout` ensure that pool.getConnection() throw an error if there is no available connection after this timeout is reached.

When connection stays in pool unused for some time (`minDelayValidation` option), pool will return those connection after a validation (PING to server). The issue is that when using proxy between client and server, socket to proxy might still be open, but proxy might be busy reconnecting server for an unknown amount of time. In those case, `acquireTimeout` is not respected, and pool.getConnection() might wait indefinitely.

New option `pingTimeout`, defaulting to 250ms is added so validation is limited in time.

Connection.ping() now has an optional Timeout parameter. If timeout is reached, promise is rejected and connection is force closed.
  • Loading branch information
rusher committed Jun 30, 2020
1 parent f73a54c commit 07fb091
Show file tree
Hide file tree
Showing 9 changed files with 217 additions and 17 deletions.
3 changes: 2 additions & 1 deletion lib/config/pool-options.js
Expand Up @@ -19,6 +19,7 @@ class PoolOptions {
if (opts.minimumIdle) opts.minimumIdle = parseInt(opts.minimumIdle);
if (opts.noControlAfterUse) opts.noControlAfterUse = opts.noControlAfterUse == 'true';
if (opts.resetAfterUse) opts.resetAfterUse = opts.resetAfterUse == 'true';
if (opts.pingTimeout) opts.pingTimeout = parseInt(opts.pingTimeout);
}

this.acquireTimeout = opts.acquireTimeout === undefined ? 10000 : opts.acquireTimeout;
Expand All @@ -34,7 +35,7 @@ class PoolOptions {
: Math.min(opts.minimumIdle, this.connectionLimit);
this.noControlAfterUse = opts.noControlAfterUse || false;
this.resetAfterUse = opts.resetAfterUse === undefined ? true : opts.resetAfterUse;

this.pingTimeout = opts.pingTimeout || 250;
this.connOptions = new ConnOptions(opts);

if (this.acquireTimeout > 0 && this.connOptions.connectTimeout > this.acquireTimeout) {
Expand Down
16 changes: 12 additions & 4 deletions lib/connection-callback.js
Expand Up @@ -41,10 +41,18 @@ function ConnectionCallback(options) {
.catch(callback || emptyError);
};

const _pingCallback = (callback) => {
pingPromise()
.then(callback || emptySuccess)
.catch(callback || emptyError);
const _pingCallback = (timeout, callback) => {
let _timeout, _cb;
if (typeof timeout === 'function') {
_cb = timeout;
_timeout = undefined;
} else {
_timeout = timeout;
_cb = callback;
}
pingPromise(_timeout)
.then(_cb || emptySuccess)
.catch(_cb || emptyError);
};

const _resetCallback = (callback) => {
Expand Down
42 changes: 40 additions & 2 deletions lib/connection.js
Expand Up @@ -249,11 +249,49 @@ function Connection(options) {

/**
* Send an empty MySQL packet to ensure connection is active, and reset @@wait_timeout
*
* @param timeout (optional) timeout value in ms. If reached, throw error and close connection
* @returns {Promise} promise
*/
this.ping = () => {
this.ping = (timeout) => {
return new Promise(function (resolve, reject) {
if (timeout) {
if (timeout < 0) {
reject(
Errors.createError(
'Ping cannot have negative timeout value',
false,
info,
'0A000',
Errors.ER_BAD_PARAMETER_VALUE
)
);
return;
}
const tOut = setTimeout(() => {
reject(Errors.createError('Ping timeout', true, info, '0A000', Errors.ER_PING_TIMEOUT));
// close connection
_addCommand = _addCommandDisabled;
clearTimeout(_timeout);
if (_status !== Status.CLOSING && _status !== Status.CLOSED) {
_sendQueue.clear();
_status = Status.CLOSED;
_socket.destroy();
}
_clear();
}, timeout);
return _addCommand(
new Ping(
() => {
clearTimeout(tOut);
resolve();
},
(err) => {
clearTimeout(tOut);
reject(err);
}
)
);
}
return _addCommand(new Ping(resolve, reject));
});
};
Expand Down
2 changes: 2 additions & 0 deletions lib/misc/errors.js
Expand Up @@ -99,6 +99,8 @@ module.exports.ER_TIMEOUT_NOT_SUPPORTED = 45038;
module.exports.ER_INITIAL_TIMEOUT_ERROR = 45039;
module.exports.ER_DUPLICATE_FIELD = 45040;
module.exports.ER_CLIENT_OPTION_INCOMPATIBILITY = 45041;
module.exports.ER_PING_TIMEOUT = 45042;
module.exports.ER_BAD_PARAMETER_VALUE = 45043;

const keys = Object.keys(module.exports);
const errByNo = {};
Expand Down
2 changes: 1 addition & 1 deletion lib/pool-callback.js
Expand Up @@ -26,7 +26,7 @@ function PoolCallback(options) {

const pingPromise = function (conn) {
return new Promise((resolve, reject) => {
conn.ping((err) => {
conn.ping(options.pingTimeout, (err) => {
if (err) {
reject(err);
} else resolve();
Expand Down
2 changes: 1 addition & 1 deletion lib/pool-promise.js
Expand Up @@ -98,7 +98,7 @@ function PoolPromise(options) {
};

PoolBase.call(this, options, processTaskPromise, createConnectionPoolPromise, (conn) =>
conn.ping()
conn.ping(options.pingTimeout)
);
}

Expand Down
77 changes: 72 additions & 5 deletions test/integration/test-connection.js
Expand Up @@ -289,18 +289,85 @@ describe('connection', () => {
});

it('connection.ping()', function (done) {
shareConn.ping();
shareConn.ping().then(done).catch(done);
const conn = new Connection(new ConnOptions(Conf.baseConfig));
conn.connect().then(() => {
conn.ping();
conn
.ping()
.then(() => {
conn
.ping(-2)
.then(() => {
done(new Error('must have thrown error'));
})
.catch((err) => {
assert.isTrue(err.message.includes('Ping cannot have negative timeout value'));
conn
.ping(200)
.then(() => {
conn.query('SELECT SLEEP(1)');
const initTime = Date.now();
conn
.ping(200)
.then(() => {
done(new Error('must have thrown error after ' + (Date.now() - initTime)));
})
.catch((err) => {
assert.isTrue(
Date.now() - initTime > 195,
'expected > 195, without waiting for SLEEP to finish, but was ' +
(Date.now() - initTime)
);
assert.isTrue(err.message.includes('Ping timeout'));
assert.isFalse(conn.isValid());
done();
});
})
.catch(done);
});
})
.catch(done);
});
});

it('connection.ping() with callback', function (done) {
const conn = base.createCallbackConnection();
conn.connect((err) => {
conn.ping();
conn.ping((err) => {
conn.end();
if (err) done(err);
done();
if (err) {
done(err);
} else {
conn.ping(-2, (err) => {
if (!err) {
done(new Error('must have thrown error'));
} else {
assert.isTrue(err.message.includes('Ping cannot have negative timeout value'));
conn.ping(200, (err) => {
if (err) {
done(err);
} else {
conn.query('SELECT SLEEP(1)');
const initTime = Date.now();
conn.ping(200, (err) => {
if (!err) {
done(new Error('must have thrown error'));
} else {
assert.isTrue(
Date.now() - initTime > 195,
'expected > 195, without waiting for SLEEP to finish, but was ' +
(Date.now() - initTime)
);
assert.isTrue(err.message.includes('Ping timeout'));
assert.isFalse(conn.isValid());
done();
}
});
}
});
}
});
}
});
});
});
Expand Down
55 changes: 54 additions & 1 deletion test/integration/test-pool.js
Expand Up @@ -7,6 +7,7 @@ const stream = require('stream');
const fs = require('fs');
const path = require('path');
const os = require('os');
const Proxy = require('../tools/proxy');

describe('Pool', () => {
const fileName = path.join(os.tmpdir(), Math.random() + 'tempStream.txt');
Expand Down Expand Up @@ -551,7 +552,6 @@ describe('Pool', () => {
conn.release();
})
.catch((err) => {
console.log(err);
done(err);
});
}, 1100);
Expand Down Expand Up @@ -1084,4 +1084,57 @@ describe('Pool', () => {
});
pool.end();
});

it('pool server defect timeout', function (done) {
this.timeout(5000);
const proxy = new Proxy({
port: Conf.baseConfig.port,
proxyPort: 4000,
host: Conf.baseConfig.host
});

const initTime = Date.now();
const pool = base.createPool({
port: 4000,
acquireTimeout: 1000,
minDelayValidation: 0,
connectionLimit: 1,
noControlAfterUse: true
});

// test use proxy that stop answer for 1.5s,
// with pool.getConnection with 1s timeout.
// (minDelayValidation is set to 0, to ensure ping is done each time for existing connection)
pool
.getConnection()
.then((conn) => {
proxy.suspendRemote();
setTimeout(() => {
proxy.resumeRemote();
}, 1500);
conn.release();

pool
.getConnection()
.then(() => {
done(new Error('must have thrown error !' + (Date.now() - initTime)));
})
.catch((err) => {
assert.isTrue(
Date.now() - initTime > 995,
'expected > 1000, but was ' + (Date.now() - initTime)
);
pool
.getConnection()
.then((conn2) => {
conn2.release();
pool.end();
proxy.close();
done();
})
.catch(done);
});
})
.catch(done);
});
});
35 changes: 33 additions & 2 deletions test/tools/proxy.js
Expand Up @@ -4,8 +4,9 @@ function Proxy(args) {
const LOCAL_PORT = args.proxyPort || 6512;
const REMOTE_PORT = args.port;
const REMOTE_ADDR = args.host;
const log = false;
let log = args.log || false;
let server;
let remoteSocket;
let stop = false;

this.close = () => {
Expand All @@ -17,22 +18,35 @@ function Proxy(args) {
stop = true;
};

this.suspendRemote = () => {
server.emit('suspendRemote');
};

this.resumeRemote = () => {
server.emit('resumeRemote');
};

this.resume = () => {
stop = false;
server.listen(LOCAL_PORT);
};

this.start = () => {
const sockets = [];
const remoteSockets = [];
let stopRemote = false;

server = net.createServer((socket) => {
let ended = false;
sockets.push(socket);
if (stop) {
process.nextTick(socket.destroy.bind(socket));
} else {
if (log) console.log(' ** START **');
const remoteSocket = new net.Socket();
remoteSocket = new net.Socket();
remoteSocket.connect(REMOTE_PORT, REMOTE_ADDR, function () {});
remoteSockets.push(remoteSocket);
if (stopRemote) remoteSocket.pause();

remoteSocket.on('data', function (data) {
if (log) console.log('<< ', data.toString());
Expand Down Expand Up @@ -81,6 +95,23 @@ function Proxy(args) {
if (socket) socket.end();
});
});

server.on('suspendRemote', () => {
if (log) console.log('suspend proxy server');
remoteSockets.forEach((socket) => {
if (socket) socket.pause();
});
stopRemote = true;
});

server.on('resumeRemote', () => {
if (log) console.log('resume proxy server');
remoteSockets.forEach((socket) => {
if (socket) socket.resume();
});
stopRemote = false;
});

server.listen(LOCAL_PORT);
if (log) console.log('TCP server accepting connection on port: ' + LOCAL_PORT);
};
Expand Down

0 comments on commit 07fb091

Please sign in to comment.