Permalink
Browse files

connection pooling

  • Loading branch information...
1 parent fcad293 commit 1f91317d30caaf5bdd6eab94c87db4bed77ecdbc @joeferner committed Nov 27, 2012
View
@@ -48,6 +48,7 @@ You can install using Node Package Manager (npm):
* [define](#persistDefine)
* [defineAuto](#persistDefineAuto)
* [setDefaultConnectOptions](#persistSetDefaultConnectOptions)
+ * [shutdown](#persistShutdown)
## Connection
@@ -100,6 +101,9 @@ You can install using Node Package Manager (npm):
## Results Set
* [getById](#resultSetGetById)
+## Connection Pooling
+ * [using](#connectionPoolingUsing)
+
<a name="databaseJson"/>
# database.json
@@ -122,7 +126,13 @@ The file should follow a format like this:
"prod": {
"driver": "sqlite3",
"filename": "prod.db"
- "sqlDir": "./prodSql"
+ "sqlDir": "./prodSql",
+ "pooling": {
+ "name": "testPool",
+ "max": 2,
+ "min": 1,
+ "idleTimeoutMillis": 30000
+ }
}
}
@@ -240,6 +250,20 @@ __Example__
filename: 'test.db',
trace: true});
+<a name="persistShutdown"/>
+### persist.shutdown([callback])
+
+Shutdown persist. This is currently only required if you are using connection pooling. see [generic-pool](https://github.com/coopernurse/node-pool).
+
+__Arguments__
+ * [callback] - Optional callback on successful shutdown.
+
+__Example__
+
+ persist.shutdown(function() {
+ console.log('persist shutdown');
+ });
+
<a name="connection"/>
## Connection
@@ -953,3 +977,10 @@ __Example__
Person.all(connection, function(err, people) {
var person2 = people.getById(2);
});
+
+<a name="connectionPoolingUsing"/>
+### Connection Pooling
+
+Persist uses [generic-pool](https://github.com/coopernurse/node-pool) to manage the connection pool. If you specify
+"pooling" in your configuration you must specify a pool name. See [generic-pool](https://github.com/coopernurse/node-pool)
+for other options. To cleanly shutdown the connection pool you must also call persist.[shutdown](#persistShutdown).
View
@@ -6,6 +6,7 @@ var path = require('path');
var Model = require('./model');
var dbInfo = require('db-info');
var async = require('async');
+var genericPool = require('generic-pool');
var inflection = require('./inflection');
exports.type = require('./type');
@@ -18,53 +19,55 @@ exports.env = null;
var databaseJsonLoaded = false;
var defaultConnectOptions = null;
-exports.define = function (name, columnDefs) {
+exports.define = function(name, columnDefs) {
return Model.define(name, columnDefs);
};
-exports.asyncQueue = function(){
+exports.asyncQueue = function() {
return q;
-}
+};
exports.defineAuto = function(name, options, callback) {
var pluralName = inflection.pluralize(name);
- this.asyncQueue().push(options, function(err, result){
- if(result.tables.hasOwnProperty(pluralName)){
+ this.asyncQueue().push(options, function(err, result) {
+ if (result.tables.hasOwnProperty(pluralName)) {
var columnDefs = result.tables[pluralName].columns;
var model = Model.define(name, columnDefs);
callback(null, model);
}
});
-}
+};
-exports.waitForDefinitionsToFinish = function(callback){
- if(this.asyncQueue().length() > 0){
+exports.waitForDefinitionsToFinish = function(callback) {
+ if (this.asyncQueue().length() > 0) {
this.asyncQueue().drain = callback;
- }else{
+ } else {
callback();
}
-}
+};
-var q = async.queue(function (task, callback) {
- dbInfo.getInfo(task, function(err, result){
- if(err){
+var q = async.queue(function(task, callback) {
+ dbInfo.getInfo(task, function(err, result) {
+ if (err) {
console.log(err);
}
- callback(err,result);
+ callback(err, result);
});
}, 2);
-var setDefaultConnectOptions = exports.setDefaultConnectOptions = function (options) {
+var setDefaultConnectOptions = exports.setDefaultConnectOptions = function(options) {
defaultConnectOptions = options;
};
-var getDefaultConnectOptions = exports.getDefaultConnectOptions = function () {
+var getDefaultConnectOptions = exports.getDefaultConnectOptions = function() {
return defaultConnectOptions;
};
+var pools = {};
+
// opts, callback
// callback - use database.json for connection information
-exports.connect = function () {
+exports.connect = function() {
if (!databaseJsonLoaded) {
tryLoadDatabaseJson(path.join(process.cwd(), "database.json"));
}
@@ -83,21 +86,69 @@ exports.connect = function () {
var driverName = opts.driver;
var Driver = require('./drivers/' + driverName + '.js');
var driver = new Driver();
- driver.connect(opts, connectAfterAutoDefinesComplete.bind(this, callback));
+
+ if (opts.pooling && opts.pooling.name) {
+ var pool;
+ opts.pooling.create = function(callback) {
+ if (opts.trace) {
+ console.log('pooling create');
+ }
+ return driver.connect(opts, function(err, conn) {
+ if (err) {
+ return callback(err);
+ }
+ conn.getPool = function() {
+ return pools[opts.pooling.name];
+ };
+ conn.oldClose = conn.close;
+ conn.close = function() {
+ if (opts.trace) {
+ console.log('pooling release');
+ }
+ return pool.release(conn);
+ };
+ return callback(null, conn);
+ });
+ };
+ opts.pooling.destroy = function(conn) {
+ if (opts.trace) {
+ console.log('pooling destroy');
+ }
+ conn.oldClose();
+ };
+ if (pools[opts.pooling.name]) {
+ pool = pools[opts.pooling.name];
+ } else {
+ pool = pools[opts.pooling.name] = genericPool.Pool(opts.pooling);
+ }
+ return pool.acquire(connectAfterAutoDefinesComplete.bind(this, callback));
+ } else {
+ return driver.connect(opts, connectAfterAutoDefinesComplete.bind(this, callback));
+ }
+};
+
+exports.shutdown = function(callback) {
+ callback = callback || function() {};
+ async.forEach(Object.keys(pools), function(poolName, callback) {
+ var pool = pools[poolName];
+ pool.drain(function() {
+ pool.destroyAllNow(callback);
+ });
+ }, callback);
};
-function connectAfterAutoDefinesComplete(callback, err , connection){
+function connectAfterAutoDefinesComplete(callback, err, connection) {
this.waitForDefinitionsToFinish(callback.bind(this, err, connection));
}
function createConnectionDelegate(fnName) {
- return function () {
+ return function() {
var conn = arguments[0];
if (conn && conn.driver && conn.db) {
return conn[fnName].apply(conn, Array.prototype.slice.call(arguments, 1));
} else {
var args = Array.prototype.slice.call(arguments);
- var fn = function (connection, callback) {
+ var fn = function(connection, callback) {
var newArgs = args.concat(callback);
return connection[fnName].apply(connection, newArgs);
};
@@ -114,7 +165,7 @@ exports.runSqlFromFile = createConnectionDelegate('runSqlFromFile');
exports.runSqlAllFromFile = createConnectionDelegate('runSqlAllFromFile');
exports.runSqlEachFromFile = createConnectionDelegate('runSqlEachFromFile');
-var loadDatabaseJson = exports.loadDatabaseJson = function (pathStr) {
+var loadDatabaseJson = exports.loadDatabaseJson = function(pathStr) {
var databaseJson = JSON.parse(fs.readFileSync(pathStr).toString());
var env = exports.env || databaseJson['default'];
var opts = databaseJson[env];
@@ -123,7 +174,7 @@ var loadDatabaseJson = exports.loadDatabaseJson = function (pathStr) {
databaseJsonLoaded = true;
};
-var tryLoadDatabaseJson = exports.tryLoadDatabaseJson = function (path) {
+var tryLoadDatabaseJson = exports.tryLoadDatabaseJson = function(path) {
try {
var stats = fs.statSync(path);
if (stats) {
View
@@ -37,7 +37,8 @@
"dependencies": {
"async": "0.1.15",
"over": ">=0.0.5",
- "db-info": "0.0.3"
+ "db-info": "0.0.3",
+ "generic-pool": "~2.0.2"
},
"devDependencies": {
"nodeunit": "~>0.6.4",
View
@@ -2,7 +2,7 @@ var persist = require("../lib/persist");
var type = persist.type;
var nodeunit = require("nodeunit");
var assert = require("../test_helpers/assert");
-var testUtils = require("../test_helpers/auto_define_test_utils");
+var testUtils = require("../test_helpers/autoDefineTestUtils");
var Phone;
var PrimaryKeyTest;
var Person;
@@ -12,7 +12,7 @@ exports['Insert'] = nodeunit.testCase({
setUp: function (callback) {
var self = this;
- testUtils.connect(persist, function (err, connection) {
+ testUtils.connect(persist, {}, function (err, connection) {
self.connection = connection;
var dbDriver = connection.opts.driver;
persist.defineAuto("Phone", {driver:dbDriver, db:self.connection.db},function(err,model){
View
@@ -18,7 +18,7 @@ exports['Chain'] = nodeunit.testCase({
"age": type.INTEGER
}).hasMany(this.Phone);
- testUtils.connect(persist, function(err, connection) {
+ testUtils.connect(persist, {}, function(err, connection) {
if(err) { console.log(err); return; }
self.connection = connection;
self.person1 = new self.Person({ name: "Bob O'Neill", age: 21 });
View
@@ -7,7 +7,7 @@ exports['Delete'] = nodeunit.testCase({
setUp: function (callback) {
var self = this;
- testUtils.connect(persist, function (err, connection) {
+ testUtils.connect(persist, {}, function (err, connection) {
self.connection = connection;
self.Person = persist.define("Person", {
"name": type.STRING
View
@@ -44,7 +44,7 @@ exports['Insert'] = nodeunit.testCase({
return callback(true);
};
- testUtils.connect(persist, function (err, connection) {
+ testUtils.connect(persist, {}, function (err, connection) {
self.connection = connection;
callback();
});
View
@@ -17,7 +17,7 @@ exports['ManyToMany'] = nodeunit.testCase({
"name": type.STRING
}).hasMany(this.Company, { through: 'CompanyPerson' });
- testUtils.connect(persist, function(err, connection) {
+ testUtils.connect(persist, {}, function(err, connection) {
if(err) { console.log(err); return; }
self.connection = connection;
self.person1 = new self.Person({ name: "bob" });
View
@@ -0,0 +1,85 @@
+var persist = require("../lib/persist");
+var type = persist.type;
+var nodeunit = require("nodeunit");
+var assert = require("../test_helpers/assert");
+var testUtils = require("../test_helpers/test_utils");
+
+exports['Pool'] = nodeunit.testCase({
+ setUp: function(callback) {
+ var self = this;
+ self.poolingLogs = [];
+
+ this.Person = persist.define("Person", {
+ "name": type.STRING,
+ "age": type.INTEGER
+ });
+
+ self.connectOpts = {
+ trace: false,
+ pooling: {
+ name: 'testPool',
+ max: 2,
+ min: 1,
+ idleTimeoutMillis: 1000,
+ log: function(msg, level) {
+ self.poolingLogs.push(msg);
+ }
+ }
+ };
+ testUtils.connect(persist, self.connectOpts, function(err, connection) {
+ self.pool = connection.getPool();
+ connection.close();
+ callback();
+ });
+ },
+
+ "max connections": function(test) {
+ var self = this;
+
+ // 1st connection
+ persist.connect(self.connectOpts, function(err, conn1) {
+ if (err) {
+ console.error(err);
+ return test.done(err);
+ }
+
+ // 2nd connection
+ return persist.connect(self.connectOpts, function(err, conn2) {
+ if (err) {
+ console.error(err);
+ return test.done(err);
+ }
+
+ // 3rd connection
+ setTimeout(function() {
+ conn1.close();
+ }, 100);
+ return persist.connect(self.connectOpts, function(err, conn3) {
+ if (err) {
+ console.error(err);
+ return test.done(err);
+ }
+
+ conn2.close();
+ conn3.close();
+ persist.shutdown();
+ return test.done();
+ });
+ });
+ });
+
+// this.Person.using(this.connection).all(function(err, people) {
+// if (err) {
+// console.error(err);
+// return;
+// }
+// test.equals(people.length, 2);
+// test.equals(people[0].name, "Bob O'Neill");
+// console.log(people[0].nameAndAge, "Bob O\'Neill: 21");
+// test.equals(JSON.stringify(people[0]), '{"phones":{},"companies":{},"modifiedBy":{},"name":"Bob O\'Neill","age":21,"id":' + people[0].id + ',"nameAndAge":"Bob O\'Neill: 21"}');
+// test.equals(people[1].name, 'john');
+//
+// test.done();
+// });
+ }
+});
View
@@ -14,7 +14,7 @@ exports['Run SQL'] = nodeunit.testCase({
"age": type.INTEGER
});
- testUtils.connect(persist, function(err, connection) {
+ testUtils.connect(persist, {}, function(err, connection) {
if(err) { console.log(err); return; }
self.connection = connection;
Oops, something went wrong.

0 comments on commit 1f91317

Please sign in to comment.