Skip to content

Commit

Permalink
Fixes #529. Also provides backward compatibility for the new mongo dr…
Browse files Browse the repository at this point in the history
…iver. The documents are again the second parameter of the callback after a save operation
  • Loading branch information
brianhyder committed Apr 19, 2015
1 parent 0fcd77b commit f4886c7
Show file tree
Hide file tree
Showing 9 changed files with 488 additions and 43 deletions.
13 changes: 13 additions & 0 deletions include/config.js
Expand Up @@ -640,6 +640,19 @@ var BASE_CONFIG = {
defaultLocale: 'en_US'
},

//The locking service provides a common mechanism for processes to reserve
//access to resources during critical operations. When the locks exist,
//other PB instances will not hinder the other process from completing its
//task.
locks: {

//By default, the cache will be used as the store for the locks
provider: 'redis',

//The default amount of time that a lock will be persisted in seconds.
timeout: 30
},

//Pulls in the package.json file for PB and extracts the version so it is
//available in the configuration.
version: require(path.join(Configuration.DOCUMENT_ROOT, 'package.json')).version
Expand Down
54 changes: 51 additions & 3 deletions include/dao/dao.js
Expand Up @@ -360,12 +360,14 @@ module.exports = function DAOModule(pb) {
};

/**
* Replaces an existing document with the specified DB Object
* Inserts or replaces an existing document with the specified DB Object.
* An insert is distinguished from an update based the presence of the _id
* field.
* @method save
* @param {Object} dbObj The system object to persist
* @param {Object} [options] See http://mongodb.github.io/node-mongodb-native/api-generated/collection.html#save
* @param {Function} cb A callback that takes two parameters. The first, an
* error, if occurred. The second is the result of the persistence operation.
* error, if occurred. The second is the object that was persisted
*/
DAO.prototype.save = function(dbObj, options, cb) {
if (util.isFunction(options)) {
Expand Down Expand Up @@ -402,7 +404,9 @@ module.exports = function DAOModule(pb) {
}

//execute persistence operation
db.collection(dbObj.object_type).save(dbObj, options, cb);
db.collection(dbObj.object_type).save(dbObj, options, function(err/*, writeOpResult*/) {
cb(err, dbObj);
});
});
};

Expand Down Expand Up @@ -471,6 +475,7 @@ module.exports = function DAOModule(pb) {
* @param {Boolean} [options.upsert=false] Inserts the object is not found
* @param {Boolean} [options.multi=false] Updates multiple records if the query
* finds more than 1
* @param {Function} cb
*/
DAO.prototype.updateFields = function(collection, query, updates, options, cb) {
if (util.isFunction(options)) {
Expand Down Expand Up @@ -608,6 +613,49 @@ module.exports = function DAOModule(pb) {
db.collection(collection).ensureIndex(spec, options, cb);
});
};

/**
* Retrieves indexes for the specified collection
* @method indexInfo
* @param {String} collection
* @param {Object} [options={}]
* @param {Function} cb
*/
DAO.prototype.indexInfo = function(collection, options, cb) {
if (util.isFunction(options)) {
cb = options;
options = {};
}
pb.dbm.getDb(this.dbName, function(err, db) {
if (util.isError(err)) {
return cb(err);
}

db.indexInformation(collection, options, cb);
});
};

/**
* Drops the specified index from the given collection
* @method dropIndex
* @param {String} collection
* @param {String} indexName
* @param {Object} [options={}]
* @param {Function} cb
*/
DAO.prototype.dropIndex = function(collection, indexName, options, cb) {
if (util.isFunction(options)) {
cb = options;
options = {};
}

pb.dbm.getDb(this.dbName, function(err, db) {
if (util.isError(err)) {
return cb(err);
}
db.collection(collection).dropIndex(indexName, options, cb);
});
};

/**
* Determines if a collection exists in the DB
Expand Down
167 changes: 167 additions & 0 deletions include/dao/mongo/ttl_index_helper.js
@@ -0,0 +1,167 @@
/*
Copyright (C) 2015 PencilBlue, LLC
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

//dependencies
var util = require('../../util.js');
var async = require('async');

module.exports = function(pb) {

/**
*
* Used to change the TTL index on a collection without elevated permissions
* @class TTLIndexHelper
* @constructor
*/
function TTLIndexHelper() {

/**
*
* @property lockService
* @type {LockService}
*/
this.lockService = new pb.LockService();
}

/**
*
* @private
* @static
* @readonly
* @property INDEX_MOD_KEY_PREFIX
* @type {String}
*/
var INDEX_MOD_KEY_PREFIX = 'TTL_INDEX_MOD_LOCK_';

/**
*
* @method ensureIndex
* @param {Object} procedure
* @param {Function} cb
*/
TTLIndexHelper.prototype.ensureIndex = function(procedure, cb) {
var self = this;

var collection = procedure.collection;
var expiry = procedure.options.expireAfterSeconds;

//ensure an index exists. According to the MongoDB documentation ensure
//index cannot modify a TTL value once it is created. Therefore, we have
//to ensure that the index exists and then send the collection modification
//command to change the TTL value.
var indexName = null;
var acquiredLock = false;
var key = INDEX_MOD_KEY_PREFIX + collection;
var dao = new pb.DAO();
var tasks = [

//ensure the index is there
function(callback) {
dao.ensureIndex(procedure, callback);
},

//check to see if the index has the correct expiry
function(index, callback) {
indexName = index;
self.hasCorrectTTLIndex(collection, index, expiry, callback);
},

//acquire lock when index does not match otherwise pass null to indicate we are done
function(hasCorrectExpiry, callback) {
if (hasCorrectExpiry) {
return callback(null, null);
}

pb.log.silly('TTLIndexHelper:[%s:%s] An incorrect TTL index was detected. Attempting to acquire lock to modify it.', collection, indexName);
self.lockService.acquire(key, callback);
},

//drop index when lock is acquired. If NULL, the expiry is fine.
//If false assume that another process is handling it.
function(acquiredLock, callback) {
if (acquiredLock === null) {
return callback(null, null);
}
else if (acquiredLock === false) {
pb.log.silly('TTLIndexHelper:[%s:%s] Failed to acquire index mod lock. Assuming another PB instance is handling it', collection, indexName);
return callback(null, null);
}

pb.log.silly('TTLIndexHelper:[%s:%s] Lock acquired, dropping the index before recreating it', collection, indexName);
acquiredLock = true;
dao.dropIndex(collection, indexName, callback);
},

//re-create the index when the result is not null
function(dropResult, callback) {
if (dropResult === null) {
return callback(null, true);
}

pb.log.silly('TTLIndexHelper:[%s:%s] Rebuilding TTL index', collection, indexName);
dao.ensureIndex(procedure, callback);
},

//drop the lock
function(indexName, callback) {
if (!acquiredLock) {
return callback(null, indexName);
}

pb.log.silly('TTLIndexHelper:[%s:%s] Dropping index modification lock', collection, indexName);
self.lockService.release(key, function(err, result) {
callback(err, indexName);
});
},
];
async.waterfall(tasks, function(err, result) {
pb.log.silly('TTLIndexHelper: Attempted to ensure the TTL index for collection %s. RESULT=[%s] ERROR=[%s]', collection, result, err ? err.message : 'NONE');
cb(err, !util.isNullOrUndefined(result));
});
};

/**
* Retrieves and compares the expiry of a TTL index on the specified
* collection to the expiry provided. Calls back with TRUE if and only if
* the index is found the expiries match.
* @method hasCorrectTTLIndex
* @param {String} collection
* @param {String} indexName
* @param {Integer} expectedExpiry
* @param {Function} cb
*/
TTLIndexHelper.prototype.hasCorrectTTLIndex = function(collection, indexName, expectedExpiry, cb) {

var dao = new pb.DAO();
dao.indexInfo(collection, {full: true}, function(err, indexes) {
if (util.isError(err)) {
return cb(err);
}

for (var i = 0; i < indexes.length; i++) {

var index = indexes[i];
if (index.name === indexName) {
return cb(null, expectedExpiry === index.expireAfterSeconds);
}
}
cb(new Error('The index '+indexName+' on collection '+collection+' could not be found'), false);
});
};

return TTLIndexHelper;
};
8 changes: 8 additions & 0 deletions include/requirements.js
Expand Up @@ -63,6 +63,14 @@ module.exports = function PB(config) {
var ValidationModule = require(path.join(config.docRoot, '/include/validation/validation_service.js'));
pb.ValidationService = ValidationModule(pb);
pb.validation = pb.ValidationService;

//lock services
pb.locks = {
providers: {
CacheLockProvider: require(path.join(config.docRoot, '/include/service/locks/providers/cache_lock_provider.js'))(pb)
}
};
pb.LockService = require(path.join(config.docRoot, '/include/service/locks/lock_service.js'))(pb);

//setup the session handler
var SessionModule = require(path.join(config.docRoot, '/include/session/session.js'));
Expand Down

0 comments on commit f4886c7

Please sign in to comment.