From 7b30fb047148da82bb0e3a564d35e88532255b55 Mon Sep 17 00:00:00 2001 From: Xu Cao Date: Fri, 16 Dec 2016 13:43:00 -0800 Subject: [PATCH] Add TCP KeepAlive connection config option --- README.md | 3 +- lib/connection.js | 875 +++++++++++++++++++++--------------------- src/connection.coffee | 10 +- 3 files changed, 451 insertions(+), 437 deletions(-) diff --git a/README.md b/README.md index 9924127..e830fd1 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # node-vertica [![Build Status](https://travis-ci.org/wvanbergen/node-vertica.png?branch=travis)](https://travis-ci.org/wvanbergen/node-vertica) -> WARNING: this library is not maintained. +> WARNING: this library is not maintained. A pure javascript library to connect to a Vertica database. Except that it is written in CoffeeScript. @@ -33,6 +33,7 @@ options are supported. - `initializer`: a callback function that gets called after connection but before any query gets executed. - `decoders`: an object containing custom buffer decoders for query result field deserialization, see usage in custom decoders test. +- `keepAlive`: Enable/disable tcp keep-alive functionality (default: false). ```coffeescript diff --git a/lib/connection.js b/lib/connection.js index 1c29802..fb749a7 100644 --- a/lib/connection.js +++ b/lib/connection.js @@ -1,472 +1,481 @@ // Generated by CoffeeScript 1.11.1 -var Authentication, BackendMessage, Connection, EventEmitter, FrontendMessage, Query, errors, net, util, - extend = function(child, parent) { for (var key in parent) { if (hasProp.call(parent, key)) child[key] = parent[key]; } function ctor() { this.constructor = child; } ctor.prototype = parent.prototype; child.prototype = new ctor(); child.__super__ = parent.prototype; return child; }, - hasProp = {}.hasOwnProperty; +(function() { + var Authentication, BackendMessage, Connection, EventEmitter, FrontendMessage, Query, errors, net, util, + extend = function(child, parent) { for (var key in parent) { if (hasProp.call(parent, key)) child[key] = parent[key]; } function ctor() { this.constructor = child; } ctor.prototype = parent.prototype; child.prototype = new ctor(); child.__super__ = parent.prototype; return child; }, + hasProp = {}.hasOwnProperty; -util = require('util'); + util = require('util'); -net = require('net'); + net = require('net'); -EventEmitter = require('events').EventEmitter; + EventEmitter = require('events').EventEmitter; -FrontendMessage = require('./frontend_message'); + FrontendMessage = require('./frontend_message'); -BackendMessage = require('./backend_message'); + BackendMessage = require('./backend_message'); -Authentication = require('./authentication'); + Authentication = require('./authentication'); -Query = require('./query'); + Query = require('./query'); -errors = require('./errors'); + errors = require('./errors'); -Connection = (function(superClass) { - extend(Connection, superClass); + Connection = (function(superClass) { + extend(Connection, superClass); - function Connection(connectionOptions) { - var base, base1, base2; - this.connectionOptions = connectionOptions; - if ((base = this.connectionOptions).host == null) { - base.host = 'localhost'; - } - if ((base1 = this.connectionOptions).port == null) { - base1.port = 5433; - } - if ((base2 = this.connectionOptions).ssl == null) { - base2.ssl = 'optional'; + function Connection(connectionOptions) { + var base, base1, base2, base3; + this.connectionOptions = connectionOptions; + if ((base = this.connectionOptions).host == null) { + base.host = 'localhost'; + } + if ((base1 = this.connectionOptions).port == null) { + base1.port = 5433; + } + if ((base2 = this.connectionOptions).ssl == null) { + base2.ssl = 'optional'; + } + if ((base3 = this.connectionOptions).keepAlive == null) { + base3.keepAlive = false; + } + this.connected = false; + this.busy = true; + this.queue = []; + this.parameters = {}; + this.key = null; + this.pid = null; + this.transactionStatus = null; + this.incomingData = new Buffer(0); } - this.connected = false; - this.busy = true; - this.queue = []; - this.parameters = {}; - this.key = null; - this.pid = null; - this.transactionStatus = null; - this.incomingData = new Buffer(0); - } - - Connection.prototype.connect = function(callback) { - var initialErrorHandler; - this.connectedCallback = callback; - this.connection = net.createConnection(this.connectionOptions.port, this.connectionOptions.host); - initialErrorHandler = (function(_this) { - return function(err) { - if (_this.connectedCallback) { - return _this.connectedCallback(err.message); - } else { - return _this.emit('error', err); - } - }; - })(this); - this.connection.on('error', initialErrorHandler); - return this.connection.on('connect', (function(_this) { - return function() { - _this.connection.removeListener('error', initialErrorHandler); - _this.connected = true; - _this._bindEventListeners(); - if (_this.connectionOptions.ssl) { - _this._writeMessage(new FrontendMessage.SSLRequest); - return _this.connection.once('data', function(buffer) { - var conn, err, sslOptions; - if ('S' === buffer.toString('utf-8')) { - sslOptions = { - key: _this.connectionOptions.sslKey, - cert: _this.connectionOptions.sslCert, - ca: _this.connectionOptions.sslCA - }; - return conn = require('./starttls')(_this.connection, sslOptions, function() { - var err; - if (!conn.authorized && _this.connectionOptions.ssl === 'verified') { - conn.end(); - _this.disconnect(); - err = new errors.SSLError(conn.authorizationError); - if (_this.connectedCallback) { - return _this.connectedCallback(err); + + Connection.prototype.connect = function(callback) { + var initialErrorHandler; + this.connectedCallback = callback; + this.connection = net.createConnection(this.connectionOptions.port, this.connectionOptions.host); + initialErrorHandler = (function(_this) { + return function(err) { + if (_this.connectedCallback) { + return _this.connectedCallback(err.message); + } else { + return _this.emit('error', err); + } + }; + })(this); + this.connection.on('error', initialErrorHandler); + return this.connection.on('connect', (function(_this) { + return function() { + _this.connection.removeListener('error', initialErrorHandler); + _this.connected = true; + _this._bindEventListeners(); + if (_this.connectionOptions.keepAlive) { + _this.connection.setKeepAlive(true); + } + if (_this.connectionOptions.ssl) { + _this._writeMessage(new FrontendMessage.SSLRequest); + return _this.connection.once('data', function(buffer) { + var conn, err, sslOptions; + if ('S' === buffer.toString('utf-8')) { + sslOptions = { + key: _this.connectionOptions.sslKey, + cert: _this.connectionOptions.sslCert, + ca: _this.connectionOptions.sslCA + }; + return conn = require('./starttls')(_this.connection, sslOptions, function() { + var err; + if (!conn.authorized && _this.connectionOptions.ssl === 'verified') { + conn.end(); + _this.disconnect(); + err = new errors.SSLError(conn.authorizationError); + if (_this.connectedCallback) { + return _this.connectedCallback(err); + } else { + return _this.emit('error', err); + } } else { - return _this.emit('error', err); + if (!conn.authorized) { + _this.emit('warn', conn.authorizationError); + } + _this.connection = conn; + _this._bindEventListeners(); + return _this._handshake(); } + }); + } else if (_this.connectionOptions.ssl === "optional") { + return _this._handshake(); + } else { + err = new errors.SSLError("The server does not support SSL connection"); + if (_this.connectedCallback) { + return _this.connectedCallback(err); } else { - if (!conn.authorized) { - _this.emit('warn', conn.authorizationError); - } - _this.connection = conn; - _this._bindEventListeners(); - return _this._handshake(); + return _this.emit('error', err); } - }); - } else if (_this.connectionOptions.ssl === "optional") { - return _this._handshake(); - } else { - err = new errors.SSLError("The server does not support SSL connection"); - if (_this.connectedCallback) { - return _this.connectedCallback(err); - } else { - return _this.emit('error', err); } - } - }); - } else { - return _this._handshake(); - } - }; - })(this)); - }; - - Connection.prototype._bindEventListeners = function() { - this.connection.once('close', this._onClose.bind(this)); - this.connection.once('error', this._onError.bind(this)); - return this.connection.once('timeout', this._onTimeout.bind(this)); - }; - - Connection.prototype.disconnect = function(error) { - if (error) { - this._onError(error); - } - if (this.connection.connected) { - this._writeMessage(new FrontendMessage.Terminate()); - } - return this.connection.end(); - }; - - Connection.prototype.isSSL = function() { - return (this.connection.pair != null) && (this.connection.encrypted != null); - }; - - Connection.prototype._scheduleJob = function(job) { - if (this.busy) { - this.queue.push(job); - this.emit('queuejob', job); - } else { - this._runJob(job); - } - return job; - }; - - Connection.prototype._runJob = function(job) { - if (!this.connected) { - throw new errors.ClientStateError("Connection is closed"); - } - if (this.busy) { - throw new errors.ClientStateError("Connection is busy"); - } - this.busy = true; - this.currentJob = job; - job.run(); - return job; - }; - - Connection.prototype._processJobQueue = function() { - if (this.queue.length > 0) { - return this._runJob(this.queue.shift()); - } else { - return this.emit('ready', this); - } - }; - - Connection.prototype.query = function(sql, callback) { - return this._scheduleJob(new Query(this, sql, callback)); - }; - - Connection.prototype._queryDirect = function(sql, callback) { - return this._runJob(new Query(this, sql, callback)); - }; - - Connection.prototype.copy = function(sql, source, callback) { - var q; - q = new Query(this, sql, callback); - q.copyInSource = source; - return this._scheduleJob(q); - }; - - Connection.prototype._handshake = function() { - var authenticationFailureHandler, authenticationHandler; - authenticationFailureHandler = (function(_this) { - return function(err) { - err = new errors.AuthenticationError(err); - if (_this.connectedCallback) { - return _this.connectedCallback(err); - } else { - return _this.emit('error', err); - } - }; - })(this); - authenticationHandler = (function(_this) { - return function(msg) { - switch (msg.method) { - case Authentication.methods.OK: - return _this.once('ReadyForQuery', function(msg) { - _this.removeListener('ErrorResponse', authenticationFailureHandler); - return _this._initializeConnection(); }); - case Authentication.methods.CLEARTEXT_PASSWORD: - case Authentication.methods.MD5_PASSWORD: - _this._writeMessage(new FrontendMessage.Password(_this.connectionOptions.password, msg.method, { - salt: msg.salt, - user: _this.connectionOptions.user - })); - return _this.once('Authentication', authenticationHandler); - default: - throw new errors.ClientStateError("Authentication method " + msg.method + " not supported."); - } - }; - })(this); - this.connection.on('data', this._onData.bind(this)); - this._writeMessage(new FrontendMessage.Startup(this.connectionOptions.user, this.connectionOptions.database)); - this.once('ErrorResponse', authenticationFailureHandler); - this.once('Authentication', authenticationHandler); - this.on('ParameterStatus', (function(_this) { - return function(msg) { - return _this.parameters[msg.name] = msg.value; - }; - })(this)); - this.on('BackendKeyData', (function(_this) { - return function(msg) { - var ref; - return ref = [msg.pid, msg.key], _this.pid = ref[0], _this.key = ref[1], ref; - }; - })(this)); - return this.on('ReadyForQuery', (function(_this) { - return function(msg) { - _this.busy = false; - _this.currentJob = false; - return _this.transactionStatus = msg.transactionStatus; - }; - })(this)); - }; - - Connection.prototype._initializeConnection = function() { - var chain, i, initializer, initializers, len; - initializers = []; - if (!this.connectionOptions.skipInitialization) { - if (this.connectionOptions.interruptible) { - initializers.push(this._initializeInterrupt); + } else { + return _this._handshake(); + } + }; + })(this)); + }; + + Connection.prototype._bindEventListeners = function() { + this.connection.once('close', this._onClose.bind(this)); + this.connection.once('error', this._onError.bind(this)); + return this.connection.once('timeout', this._onTimeout.bind(this)); + }; + + Connection.prototype.disconnect = function(error) { + if (error) { + this._onError(error); } - if (this.connectionOptions.role != null) { - initializers.push(this._initializeRoles); + if (this.connection.connected) { + this._writeMessage(new FrontendMessage.Terminate()); } - if (this.connectionOptions.searchPath != null) { - initializers.push(this._initializeSearchPath); + return this.connection.end(); + }; + + Connection.prototype.isSSL = function() { + return (this.connection.pair != null) && (this.connection.encrypted != null); + }; + + Connection.prototype._scheduleJob = function(job) { + if (this.busy) { + this.queue.push(job); + this.emit('queuejob', job); + } else { + this._runJob(job); } - if (this.connectionOptions.timezone != null) { - initializers.push(this._initializeTimezone); + return job; + }; + + Connection.prototype._runJob = function(job) { + if (!this.connected) { + throw new errors.ClientStateError("Connection is closed"); } - if (this.connectionOptions.initializer != null) { - initializers.push(this.connectionOptions.initializer); + if (this.busy) { + throw new errors.ClientStateError("Connection is busy"); } - } - chain = this._initializationSuccess.bind(this); - for (i = 0, len = initializers.length; i < len; i++) { - initializer = initializers[i]; - chain = initializer.bind(this, chain, this._initializationFailure.bind(this)); - } - return chain(); - }; - - Connection.prototype._initializeRoles = function(next, fail) { - var roles; - roles = this.connectionOptions.role instanceof Array ? this.connectionOptions.role : [this.connectionOptions.role]; - return this._queryDirect("SET ROLE " + (roles.join(', ')), (function(_this) { - return function(err, result) { - if (err != null) { - return fail(err); - } else { - return next(); + this.busy = true; + this.currentJob = job; + job.run(); + return job; + }; + + Connection.prototype._processJobQueue = function() { + if (this.queue.length > 0) { + return this._runJob(this.queue.shift()); + } else { + return this.emit('ready', this); + } + }; + + Connection.prototype.query = function(sql, callback) { + return this._scheduleJob(new Query(this, sql, callback)); + }; + + Connection.prototype._queryDirect = function(sql, callback) { + return this._runJob(new Query(this, sql, callback)); + }; + + Connection.prototype.copy = function(sql, source, callback) { + var q; + q = new Query(this, sql, callback); + q.copyInSource = source; + return this._scheduleJob(q); + }; + + Connection.prototype._handshake = function() { + var authenticationFailureHandler, authenticationHandler; + authenticationFailureHandler = (function(_this) { + return function(err) { + err = new errors.AuthenticationError(err); + if (_this.connectedCallback) { + return _this.connectedCallback(err); + } else { + return _this.emit('error', err); + } + }; + })(this); + authenticationHandler = (function(_this) { + return function(msg) { + switch (msg.method) { + case Authentication.methods.OK: + return _this.once('ReadyForQuery', function(msg) { + _this.removeListener('ErrorResponse', authenticationFailureHandler); + return _this._initializeConnection(); + }); + case Authentication.methods.CLEARTEXT_PASSWORD: + case Authentication.methods.MD5_PASSWORD: + _this._writeMessage(new FrontendMessage.Password(_this.connectionOptions.password, msg.method, { + salt: msg.salt, + user: _this.connectionOptions.user + })); + return _this.once('Authentication', authenticationHandler); + default: + throw new errors.ClientStateError("Authentication method " + msg.method + " not supported."); + } + }; + })(this); + this.connection.on('data', this._onData.bind(this)); + this._writeMessage(new FrontendMessage.Startup(this.connectionOptions.user, this.connectionOptions.database)); + this.once('ErrorResponse', authenticationFailureHandler); + this.once('Authentication', authenticationHandler); + this.on('ParameterStatus', (function(_this) { + return function(msg) { + return _this.parameters[msg.name] = msg.value; + }; + })(this)); + this.on('BackendKeyData', (function(_this) { + return function(msg) { + var ref; + return ref = [msg.pid, msg.key], _this.pid = ref[0], _this.key = ref[1], ref; + }; + })(this)); + return this.on('ReadyForQuery', (function(_this) { + return function(msg) { + _this.busy = false; + _this.currentJob = false; + return _this.transactionStatus = msg.transactionStatus; + }; + })(this)); + }; + + Connection.prototype._initializeConnection = function() { + var chain, i, initializer, initializers, len; + initializers = []; + if (!this.connectionOptions.skipInitialization) { + if (this.connectionOptions.interruptible) { + initializers.push(this._initializeInterrupt); } - }; - })(this)); - }; - - Connection.prototype._initializeSearchPath = function(next, fail) { - var searchPath; - searchPath = this.connectionOptions.searchPath instanceof Array ? this.connectionOptions.searchPath : [this.connectionOptions.searchPath]; - return this._queryDirect("SET SEARCH_PATH TO " + (searchPath.join(', ')), (function(_this) { - return function(err, result) { - if (err != null) { - return fail(err); - } else { - return next(); + if (this.connectionOptions.role != null) { + initializers.push(this._initializeRoles); } - }; - })(this)); - }; - - Connection.prototype._initializeTimezone = function(next, fail) { - return this._queryDirect("SET TIMEZONE TO '" + this.connectionOptions.timezone + "'", (function(_this) { - return function(err, result) { - if (err != null) { - return fail(err); - } else { - return next(); + if (this.connectionOptions.searchPath != null) { + initializers.push(this._initializeSearchPath); } - }; - })(this)); - }; - - Connection.prototype._initializeInterrupt = function(next, fail) { - return this._queryDirect("SELECT session_id FROM v_monitor.current_session", (function(_this) { - return function(err, result) { - if (err != null) { - fail(err); + if (this.connectionOptions.timezone != null) { + initializers.push(this._initializeTimezone); } - _this.sessionID = result.theValue(); - return next(); - }; - })(this)); - }; - - Connection.prototype._initializationSuccess = function() { - this.on('ReadyForQuery', this._processJobQueue.bind(this)); - this._processJobQueue(); - if (this.connectedCallback) { - return this.connectedCallback(null, this); - } - }; - - Connection.prototype._initializationFailure = function(err) { - if (this.connectedCallback) { - return this.connectedCallback(err); - } else { - return this.emit('error', err); - } - }; - - Connection.prototype._onData = function(buffer) { - var bufferedData, message, size; - if (this.incomingData.length === 0) { - this.incomingData = buffer; - } else { - bufferedData = new Buffer(this.incomingData.length + buffer.length); - this.incomingData.copy(bufferedData); - buffer.copy(bufferedData, this.incomingData.length); - this.incomingData = bufferedData; - } - while (this.incomingData.length >= 5) { - size = this.incomingData.readUInt32BE(1); - if (size + 1 <= this.incomingData.length) { - message = BackendMessage.fromBuffer(this.incomingData.slice(0, size + 1)); - if (this.debug) { - console.log('<=', message.event, message); + if (this.connectionOptions.initializer != null) { + initializers.push(this.connectionOptions.initializer); } - this.emit('message', message); - this.emit(message.event, message); - this.incomingData = this.incomingData.slice(size + 1); + } + chain = this._initializationSuccess.bind(this); + for (i = 0, len = initializers.length; i < len; i++) { + initializer = initializers[i]; + chain = initializer.bind(this, chain, this._initializationFailure.bind(this)); + } + return chain(); + }; + + Connection.prototype._initializeRoles = function(next, fail) { + var roles; + roles = this.connectionOptions.role instanceof Array ? this.connectionOptions.role : [this.connectionOptions.role]; + return this._queryDirect("SET ROLE " + (roles.join(', ')), (function(_this) { + return function(err, result) { + if (err != null) { + return fail(err); + } else { + return next(); + } + }; + })(this)); + }; + + Connection.prototype._initializeSearchPath = function(next, fail) { + var searchPath; + searchPath = this.connectionOptions.searchPath instanceof Array ? this.connectionOptions.searchPath : [this.connectionOptions.searchPath]; + return this._queryDirect("SET SEARCH_PATH TO " + (searchPath.join(', ')), (function(_this) { + return function(err, result) { + if (err != null) { + return fail(err); + } else { + return next(); + } + }; + })(this)); + }; + + Connection.prototype._initializeTimezone = function(next, fail) { + return this._queryDirect("SET TIMEZONE TO '" + this.connectionOptions.timezone + "'", (function(_this) { + return function(err, result) { + if (err != null) { + return fail(err); + } else { + return next(); + } + }; + })(this)); + }; + + Connection.prototype._initializeInterrupt = function(next, fail) { + return this._queryDirect("SELECT session_id FROM v_monitor.current_session", (function(_this) { + return function(err, result) { + if (err != null) { + fail(err); + } + _this.sessionID = result.theValue(); + return next(); + }; + })(this)); + }; + + Connection.prototype._initializationSuccess = function() { + this.on('ReadyForQuery', this._processJobQueue.bind(this)); + this._processJobQueue(); + if (this.connectedCallback) { + return this.connectedCallback(null, this); + } + }; + + Connection.prototype._initializationFailure = function(err) { + if (this.connectedCallback) { + return this.connectedCallback(err); } else { - break; + return this.emit('error', err); } - } - return void 0; - }; - - Connection.prototype._onClose = function() { - var error; - this.connected = false; - error = new errors.ConnectionError("The connection was closed."); - if (this.currentJob) { - this.currentJob.onConnectionError(error); - } - this.currentJob = false; - return this.emit('close'); - }; - - Connection.prototype._onTimeout = function() { - var error; - error = new errors.ConnectionError("The connection timed out."); - if (this.currentJob) { - this.currentJob.onConnectionError(error); - } - this.currentJob = false; - return this.emit('timeout'); - }; - - Connection.prototype._onError = function(err) { - var error, ref; - error = new errors.ConnectionError((ref = err.message) != null ? ref : err.toString()); - if (this.currentJob) { - this.currentJob.onConnectionError(error); - } - this.currentJob = false; - return this.emit('error', error); - }; + }; - Connection.prototype._writeMessage = function(msg, callback) { - if (this.debug) { - console.log('=>', msg.__proto__.constructor.name, msg); - } - return this.connection.write(msg.toBuffer(), callback); - }; - - Connection.prototype.isInterruptible = function() { - return this.sessionID != null; - }; - - Connection.prototype._interruptConnection = function(cb) { - var bareClient, bareConnectionOptions; - if (this.sessionID != null) { - bareConnectionOptions = { - skipInitialization: true - }; - bareConnectionOptions.__proto__ = this.connectionOptions; - bareClient = new Connection(bareConnectionOptions); - return bareClient.connect(cb); - } else { - return cb(new errors.ClientStateError("Cannot interrupt connection! It's not initialized as interruptible."), null); - } - }; + Connection.prototype._onData = function(buffer) { + var bufferedData, message, size; + if (this.incomingData.length === 0) { + this.incomingData = buffer; + } else { + bufferedData = new Buffer(this.incomingData.length + buffer.length); + this.incomingData.copy(bufferedData); + buffer.copy(bufferedData, this.incomingData.length); + this.incomingData = bufferedData; + } + while (this.incomingData.length >= 5) { + size = this.incomingData.readUInt32BE(1); + if (size + 1 <= this.incomingData.length) { + message = BackendMessage.fromBuffer(this.incomingData.slice(0, size + 1)); + if (this.debug) { + console.log('<=', message.event, message); + } + this.emit('message', message); + this.emit(message.event, message); + this.incomingData = this.incomingData.slice(size + 1); + } else { + break; + } + } + return void 0; + }; + + Connection.prototype._onClose = function() { + var error; + this.connected = false; + error = new errors.ConnectionError("The connection was closed."); + if (this.currentJob) { + this.currentJob.onConnectionError(error); + } + this.currentJob = false; + return this.emit('close'); + }; + + Connection.prototype._onTimeout = function() { + var error; + error = new errors.ConnectionError("The connection timed out."); + if (this.currentJob) { + this.currentJob.onConnectionError(error); + } + this.currentJob = false; + return this.emit('timeout'); + }; + + Connection.prototype._onError = function(err) { + var error, ref; + error = new errors.ConnectionError((ref = err.message) != null ? ref : err.toString()); + if (this.currentJob) { + this.currentJob.onConnectionError(error); + } + this.currentJob = false; + return this.emit('error', error); + }; - Connection.prototype._success = function(err, cb) { - if (err != null) { - if (cb != null) { - cb(err); + Connection.prototype._writeMessage = function(msg, callback) { + if (this.debug) { + console.log('=>', msg.__proto__.constructor.name, msg); + } + return this.connection.write(msg.toBuffer(), callback); + }; + + Connection.prototype.isInterruptible = function() { + return this.sessionID != null; + }; + + Connection.prototype._interruptConnection = function(cb) { + var bareClient, bareConnectionOptions; + if (this.sessionID != null) { + bareConnectionOptions = { + skipInitialization: true + }; + bareConnectionOptions.__proto__ = this.connectionOptions; + bareClient = new Connection(bareConnectionOptions); + return bareClient.connect(cb); } else { - this.emit('error', err); + return cb(new errors.ClientStateError("Cannot interrupt connection! It's not initialized as interruptible."), null); } - return false; - } else { - return true; - } - }; - - Connection.prototype.interruptSession = function(cb) { - return this._interruptConnection((function(_this) { - return function(err, conn) { - if (_this._success(err, cb)) { - return conn.query("SELECT CLOSE_SESSION('" + _this.sessionID + "')", function(err, rs) { - conn.disconnect(); - if (_this._success(err, cb) && (cb != null)) { - return cb(null, rs.theValue()); - } - }); + }; + + Connection.prototype._success = function(err, cb) { + if (err != null) { + if (cb != null) { + cb(err); + } else { + this.emit('error', err); } - }; - })(this)); - }; - - Connection.prototype.interruptStatement = function(cb) { - return this._interruptConnection((function(_this) { - return function(err, conn) { - if (_this._success(err, cb)) { - return conn.query("SELECT statement_id FROM v_monitor.sessions WHERE session_id = '" + _this.sessionID + "'", function(err, rs) { - var statementID; - if (!_this._success(err, cb)) { - return conn.disconnect(); - } else if (rs.getLength() === 1 && (statementID = rs.theValue())) { - return conn.query("SELECT INTERRUPT_STATEMENT('" + _this.sessionID + "', " + statementID + ")", function(err, rs) { - conn.disconnect(); - if (_this._success(err, cb) && (cb != null)) { - return cb(null, rs.theValue()); - } - }); - } else { + return false; + } else { + return true; + } + }; + + Connection.prototype.interruptSession = function(cb) { + return this._interruptConnection((function(_this) { + return function(err, conn) { + if (_this._success(err, cb)) { + return conn.query("SELECT CLOSE_SESSION('" + _this.sessionID + "')", function(err, rs) { conn.disconnect(); - return _this._success("Session " + _this.sessionID + " is not running a statement at the moment.", cb); - } - }); - } - }; - })(this)); - }; + if (_this._success(err, cb) && (cb != null)) { + return cb(null, rs.theValue()); + } + }); + } + }; + })(this)); + }; + + Connection.prototype.interruptStatement = function(cb) { + return this._interruptConnection((function(_this) { + return function(err, conn) { + if (_this._success(err, cb)) { + return conn.query("SELECT statement_id FROM v_monitor.sessions WHERE session_id = '" + _this.sessionID + "'", function(err, rs) { + var statementID; + if (!_this._success(err, cb)) { + return conn.disconnect(); + } else if (rs.getLength() === 1 && (statementID = rs.theValue())) { + return conn.query("SELECT INTERRUPT_STATEMENT('" + _this.sessionID + "', " + statementID + ")", function(err, rs) { + conn.disconnect(); + if (_this._success(err, cb) && (cb != null)) { + return cb(null, rs.theValue()); + } + }); + } else { + conn.disconnect(); + return _this._success("Session " + _this.sessionID + " is not running a statement at the moment.", cb); + } + }); + } + }; + })(this)); + }; + + return Connection; - return Connection; + })(EventEmitter); -})(EventEmitter); + module.exports = Connection; -module.exports = Connection; +}).call(this); diff --git a/src/connection.coffee b/src/connection.coffee index 99498d3..f95832d 100644 --- a/src/connection.coffee +++ b/src/connection.coffee @@ -11,9 +11,10 @@ errors = require('./errors') class Connection extends EventEmitter constructor: (@connectionOptions) -> - @connectionOptions.host ?= 'localhost' - @connectionOptions.port ?= 5433 - @connectionOptions.ssl ?= 'optional' + @connectionOptions.host ?= 'localhost' + @connectionOptions.port ?= 5433 + @connectionOptions.ssl ?= 'optional' + @connectionOptions.keepAlive ?= false @connected = false @busy = true @@ -40,6 +41,9 @@ class Connection extends EventEmitter @connected = true @_bindEventListeners() + if @connectionOptions.keepAlive + @connection.setKeepAlive true + if @connectionOptions.ssl @_writeMessage(new FrontendMessage.SSLRequest) @connection.once 'data', (buffer) =>