Skip to content

Commit

Permalink
Doc comments + clean up
Browse files Browse the repository at this point in the history
  • Loading branch information
manuelstofer committed Mar 27, 2013
1 parent 78bc318 commit 8d0472d
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 56 deletions.
33 changes: 22 additions & 11 deletions src/backends/memory.js
Original file line number Diff line number Diff line change
@@ -1,18 +1,38 @@
'use strict';

var qry = require('qry');
var qry = require('qry'),
_ = require('underscore');

/**
* In-memory implementation of the storage backend interface
* - for development and mocks
*
* @param options
* @returns {{put: Function, get: Function, del: Function, query: Function}}
*/
module.exports = function (options) {
options = options || {};

var objs = options.data || {},
autoIncId = 0;

/**
* Returns an incrementing number
*
* @returns {number}
*/
function autoInc () {
while(typeof objs[++autoIncId] !== 'undefined');
return autoIncId;
}

/**
* Creates a deep cloned object, ensures only JSON compatible
* data is stored
*
* @param obj
* @returns {*}
*/
function getObjData (obj) {
return JSON.parse(JSON.stringify(obj));
}
Expand Down Expand Up @@ -44,17 +64,8 @@ module.exports = function (options) {

query: function (query, fn) {
var match = qry(query);
fn(null, values(objs).filter(match));
fn(null, _.filter(_.values(objs), match));
}
};
};

function values (obj) {
var vals = [];
for (var key in obj) {
if (obj.hasOwnProperty(key)) {
vals.push(obj[key]);
}
}
return vals;
}
62 changes: 44 additions & 18 deletions src/backends/mongo.js
Original file line number Diff line number Diff line change
@@ -1,42 +1,68 @@
var mongodb = require("mongodb"),
ObjectID = require('mongodb').ObjectID,
map = require('mapr').map;
'use strict';

var mongodb = require("mongodb"),
ObjectID = mongodb.ObjectID,
map = require('mapr').map;

/**
* Backend implementation for MongoDB
*
* @param options
* @param callback
*/
module.exports = function (options, callback) {

var mongoserver = new mongodb.Server(
var server = new mongodb.Server(
options.server.host || 'localhost',
options.server.port || mongodb.Connection.DEFAULT_PORT,
options.server.options
),
db_connector = new mongodb.Db(

connector = new mongodb.Db(
options.db.name,
mongoserver,
server,
options.db.options
),

collection,

objectId = function (obj) {
/**
* Converts to object id to MongoDB ObjectID
*
* @param obj {}
*/
mongoId = function (obj) {
if (obj && obj._id) {
obj._id = new ObjectID(obj._id);
}
return obj;
},

strObjectId = function (obj) {
/**
* Converts the object id back to string
*
* @param obj {}
*/
strId = function (obj) {
if (obj && obj._id) {
obj._id = obj._id.toString();
}
return obj;
};


/**
* Implementation of the storage backend interface
*
* @type {{put: Function, get: Function, del: Function, query: Function}}
*/
var api = {

put: function (obj, fn) {
var oldObj = {};
mongoId(obj);

if (obj._id) {
objectId(obj);
collection.find({_id: obj._id}).limit(1).toArray(function (err, docs) {
oldObj = docs[0];
var findErr = err || (docs.length != 1 ? 'error': null);
Expand All @@ -45,50 +71,50 @@ module.exports = function (options, callback) {
fn(findErr, {}, {});
} else {
collection.save(obj, {safe: true}, function (err) {
fn(err, strObjectId(obj), strObjectId(oldObj));
fn(err, strId(obj), strId(oldObj));
})
}
});
} else {
collection.insert(obj, {safe: true}, function (err, docs) {
fn(err, strObjectId(docs[0]), strObjectId(oldObj));
fn(err, strId(docs[0]), strId(oldObj));
});
}
},

get: function (_id, fn) {
collection.find(objectId({_id: _id})).limit(1).toArray(function (err, docs) {
collection.find(mongoId({_id: _id})).limit(1).toArray(function (err, docs) {
fn(
err || (docs.length != 1? 'error' : null),
strObjectId(docs[0])
strId(docs[0])
);
});
},

del: function (_id, fn) {
collection.find(objectId({_id: _id})).limit(1).toArray(function (err, docs) {
collection.find(mongoId({_id: _id})).limit(1).toArray(function (err, docs) {

var findErr = err || (docs.length != 1 ? 'error': null),
oldObj = docs[0];

if (findErr) {
fn(findErr, {});
} else {
collection.remove(objectId({_id: _id}), true, function (err) {
fn(err || findErr, strObjectId(oldObj));
collection.remove(mongoId({_id: _id}), true, function (err) {
fn(err || findErr, strId(oldObj));
});
}
});
},

query: function (query, fn) {
collection.find(query).toArray(function (err, docs) {
fn(err, map(docs, strObjectId));
fn(err, map(docs, strId));
});
}
};

db_connector.open(function (error, client) {
connector.open(function (error, client) {
collection = new mongodb.Collection(client, options.collection);
collection.remove({}, function () {
callback(api);
Expand Down
1 change: 1 addition & 0 deletions src/query-id.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
'use strict';
var md5 = require('md5').digest_s;

module.exports = function (query) {
Expand Down
46 changes: 24 additions & 22 deletions src/storage.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ module.exports = function storage (options) {
*/
queries = {},

qemitter = emitter({});
/**
* Event emitter for internal communication between clients
*/
bus = emitter({});


return {
Expand Down Expand Up @@ -108,11 +111,9 @@ module.exports = function storage (options) {
unsubscribe = function (_id) {
subscriptionCount[_id] = Math.max(--subscriptionCount[_id], 0);
if (subscriptionCount[_id] === 0) {

subscriptions[_id] = _.without(subscriptions[_id], client);

delete subscriptionCount[_id];

subscriptions[_id] = _.without(subscriptions[_id], client);
if (subscriptions[_id].length === 0) {
delete subscriptions[_id];
delete queries[_id];
Expand Down Expand Up @@ -153,24 +154,18 @@ module.exports = function storage (options) {
oldObj = oldObj || {};
newObj = newObj || {};

var _id = newObj._id || oldObj._id;

each(queries, function (query, queryId) {
var oldMatch = query(oldObj),
newMatch = query(newObj);

if (oldMatch != newMatch) {
qemitter.emit('notify-query', queryId, {
event: newMatch ? 'match' : 'unmatch',
data: newObj,
_id: _id
});

} else if (newMatch) {
qemitter.emit('notify-query', queryId, {
event: 'change',
var didMatch = query(oldObj),
matches = query(newObj),
event = didMatch ?
(matches ? 'change' : 'unmatch') :
(matches ? 'match': null);

if (event) {
bus.emit('notify-query', queryId, {
event: event,
data: newObj,
_id: _id
_id: newObj._id || oldObj._id
});
}
});
Expand Down Expand Up @@ -277,15 +272,22 @@ module.exports = function storage (options) {
}
subscriptionCount = null;

qemitter.off('notify-query', notifyQuery);
bus.off('notify-query', notifyQuery);
});

/**
* Sends a notification to the client if the query is subscribed
*
* @param queryId
* @param notification
*/
function notifyQuery (queryId, notification) {
if (subscriptionCount[queryId]) {
client.emit('notify', queryId, notification);
}
}
qemitter.on('notify-query', notifyQuery)

bus.on('notify-query', notifyQuery)
}
};
};
10 changes: 5 additions & 5 deletions test/test.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
/*global describe, it, io*/

'use strict';
'use strict';

var storage = require('repo'),
mock = storage.mock(),
client = storage.client({socket: io.connect('http://localhost:2014')});
mock = storage.mock(),
client = storage.client({
socket: io.connect('http://localhost:2014')
});

describeInterface('mock', mock);
describeInterface('client', client);
Expand Down

0 comments on commit 8d0472d

Please sign in to comment.