Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Refactor/Fix pubsub

Complete pubsub tests
  • Loading branch information...
commit 67e3250ff988d23caa12ae767f212374212d0c33 1 parent 90955b4
@maritz authored
View
93 lib/pubsub.js
@@ -92,9 +92,10 @@ var initializePubSub = function initializePubSub (callback) {
var initialize = function () {
/**
- * Set the pubSub client.
+ * Set the pubSub client and initialize the subscriptions and event emitters.
*
* @param {Object} client Redis client to use. This client will be set to pubSub and cannot be used for normal commands after that.
+ * @param {Function} callback Called after the provided redis client is subscribed to the necessary channels.
*/
Nohm.setPubSubClient = function (client, callback) {
pub_sub_client = client;
@@ -110,6 +111,7 @@ var initialize = function () {
/**
* Unsubscribes from the nohm redis pubsub channel.
+ *
* @param {Function} callback Called after the unsubscibe. Parameters: redisClient
*/
Nohm.closePubSub = function closePubSub (callback) {
@@ -139,8 +141,8 @@ var initialize = function () {
};
- // This populates the diff property for `change` events.
- messageComposers.change = function changeComposer (action, diff) {
+ // This populates the diff property for `save` and `update` events.
+ messageComposers.save = messageComposers.update = function changeComposer (action, diff) {
var result = messageComposers.defaultComposer.apply(this, arguments);
result.target.diff = diff;
return result;
@@ -164,10 +166,12 @@ var initialize = function () {
var supportedActions = [ 'create', 'update', 'save', 'remove', 'unlink', 'link' ];
- // Actually only the `action` argument must be declared here, it's the composers
- // duty to define its signature. I put model here to clarify that `action`
- // iself tells nothing about the event.
- Nohm.prototype.fireEvent = function (action) {
+ /**
+ * Fires an event to be published to the redis db by the internal publisher.
+ *
+ * @param {String} event Name of the event to be published. Allowed are: [ 'create', 'update', 'save', 'remove', 'unlink', 'link' ]
+ */
+ Nohm.prototype.fireEvent = function (event) {
var channel;
var composer;
var payload;
@@ -178,57 +182,84 @@ var initialize = function () {
return false;
}
- if (supportedActions.indexOf(action) < 0) {
+ if (supportedActions.indexOf(event) < 0) {
supported = supportedActions.join(', ');
Nohm.logError(
- 'Cannot fire an unsupported action. Was "' + action + '" ' +
+ 'Cannot fire an unsupported action. Was "' + event + '" ' +
'and must be one of ' + supported
);
return false;
}
- channel = this.modelName + ':' + action;
- composer = messageComposers[action] || messageComposers.defaultComposer;
+ channel = this.modelName + ':' + event;
+ composer = messageComposers[event] || messageComposers.defaultComposer;
payload = composer.apply(this, arguments);
publish(channel, payload);
};
- Nohm.setPublish = function (bool) {
- do_publish = bool;
+ /**
+ * Set global boolean to publish events or not.
+ * By default publishing is disabled globally.
+ * The model-specific setting overwrites the global setting.
+ *
+ * @param {Boolean} publish Whether nohm should publish its events.
+ */
+ Nohm.setPublish = function (publish) {
+ do_publish = !!publish;
}
+ /**
+ * Get the model-specific status of whether event should be published or not.
+ * If no model-specific setting is found, the global setting is returned.
+ *
+ * @returns {Boolean} True if this model will publish its events, False if not.
+ */
Nohm.prototype.getPublish = function () {
if (this.hasOwnProperty('publish')) {
- return this.publish;
+ return !!this.publish;
}
return do_publish;
};
- Nohm.prototype.subscribe = function (action, callback) {
- var self = this;
- initializePubSub(function () {
- pub_sub_event_emitter.on(self.modelName+':'+action, callback);
- });
+ /**
+ * Subscribe to events of nohm models.
+ *
+ * @param {String} event_name Name of the event to be listened to. Allowed are: [ 'create', 'update', 'save', 'remove', 'unlink', 'link' ]
+ * @param {Function} callback Called every time an event of the provided name is published on this model.
+ */
+ Nohm.prototype.subscribe = function (event_name, callback) {
+ initializePubSub();
+ pub_sub_event_emitter.on(this.modelName+':'+event_name, callback);
};
- Nohm.prototype.subscribeOnce = function (action, callback) {
- var self = this;
- initializePubSub(function () {
- pub_sub_event_emitter.once(self.modelName+':'+action, callback);
- });
+
+ /**
+ * Subscribe to an event of nohm models only once.
+ *
+ * @param {String} event_name Name of the event to be listened to. Allowed are: [ 'create', 'update', 'save', 'remove', 'unlink', 'link' ]
+ * @param {Function} callback Called once when an event of the provided name is published on this model and then never again.
+ */
+ Nohm.prototype.subscribeOnce = function (event_name, callback) {
+ initializePubSub();
+ pub_sub_event_emitter.once(this.modelName+':'+event_name, callback);
};
- Nohm.prototype.unsubscribe = function (action, callback) {
- var self = this;
- initializePubSub(function () {
- if (! callback) {
- pub_sub_event_emitter.removeAllListeners(self.modelName+':'+action);
+ /**
+ * Unsubscribe from a nohm model event.
+ *
+ * @param {String} event_name Name of the event to be unsubscribed from. Allowed are: [ 'create', 'update', 'save', 'remove', 'unlink', 'link' ]
+ * @param {Function} fn Function to unsubscribe. If none is provided all subscriptions of the given event are unsubscribed!
+ */
+ Nohm.prototype.unsubscribe = function (event_name, fn) {
+ if (pub_sub_event_emitter !== false) {
+ if (! fn) {
+ pub_sub_event_emitter.removeAllListeners(self.modelName+':'+event_name);
} else {
- pub_sub_event_emitter.removeListener(self.modelName+':'+action, callback);
+ pub_sub_event_emitter.removeListener(self.modelName+':'+event_name, fn);
}
- });
+ }
};
};
View
25 lib/relations.js
@@ -64,29 +64,38 @@ exports.getAll = function getAll(objName, name) {
});
};
-exports.numLinks = function numLinks(objName, name) {
- var callback = h.getCallback(arguments),
+/**
+ * Returns the number of links of a specified relation (or the default) an instance has to models of a given modelName.
+ *
+ * @param {String} obj_name Name of the model on the other end of the relation.
+ * @param {String} [relation_name="child"] Name of the relation
+ * @param {Function} Callback Callback called with (err, num_relations
+ */
+exports.numLinks = function numLinks(obj_name, relation_name, callback) {
+ callback = h.getCallback(arguments),
self = this;
- name = name && typeof name !== 'function' ? name : 'child';
+ relation_name = relation_name && typeof relation_name !== 'function' ? relation_name : 'child';
if (!this.id) {
Nohm.logError('Calling numLinks() even though either the object itself or the relation does not have an id.');
}
- this.getClient().scard(this.relationKey(objName, name),
- function (err, value) {
+ this.getClient().scard(this.relationKey(obj_name, relation_name),
+ function (err, num_relations) {
if (err) {
self.logError(err);
}
- callback.call(self, err, value);
+ callback.call(self, err, num_relations);
});
};
var allowedLinkTypes = ['sadd', 'srem'];
exports.__linkProxied = function __linkProxied(type, obj, name, options, callback) {
+ options = typeof(options) === 'object' && Object.keys(options).length > 0 ? options : {};
+ callback = h.getCallback(arguments);
var self = this,
- silent = !!options.silent,
parentName = name === 'child' ? 'parent' : name + 'Parent',
+ silent = !!options.silent,
client = self.getClient(),
redisExec = function (err, childFail, childName) {
if (!err || typeof err === 'function') {
@@ -107,7 +116,7 @@ exports.__linkProxied = function __linkProxied(type, obj, name, options, callbac
},
function (err) {
- if (!silent) {
+ if (!silent && !err) {
self.fireEvent( type === 'sadd' ? 'link' : 'unlink', obj, name );
}
View
12 lib/store.js
@@ -228,10 +228,10 @@ var __update = function __update(all, silent, callback) {
callback.call(self, err.err, true, err.modelName);
} else {
-
+ var diff;
if (!silent && self.getPublish()) {
// we only need the diff if we'll fire the change to pubsub
- var diff = self.propertyDiff();
+ diff = self.propertyDiff();
}
self.__inDB = true;
@@ -242,7 +242,11 @@ var __update = function __update(all, silent, callback) {
}
if (!silent) {
- self.fireEvent(isCreation ? 'create' : 'update');
+ if (isCreation) {
+ self.fireEvent('create');
+ } else {
+ self.fireEvent('update', diff);
+ }
self.fireEvent('save', diff);
}
@@ -313,7 +317,7 @@ var __realDelete = function __realDelete(silent, callback) {
multi.exec(function (err, values) {
self.id = 0;
- if (!silent) {
+ if (!silent && !err) {
self.fireEvent('remove', id);
}
View
28 test/pubsub/child.js
@@ -7,6 +7,8 @@ nohm.setClient(args.setClient);
require(__dirname+'/Model.js');
process.on('message', function (msg) {
+ var event, modelName, fn;
+
switch (msg.question) {
case 'does nohm have pubsub?':
process.send({
@@ -26,8 +28,8 @@ process.on('message', function (msg) {
break;
case 'subscribe':
- var event = msg.args.event;
- var modelName = msg.args.modelName;
+ event = msg.args.event;
+ modelName = msg.args.modelName;
nohm.factory(modelName).subscribe(event, function (change) {
process.send({
question: 'subscribe',
@@ -35,5 +37,27 @@ process.on('message', function (msg) {
})
});
break;
+
+ case 'subscribeOnce':
+ event = msg.args.event;
+ modelName = msg.args.modelName;
+ nohm.factory(modelName).subscribeOnce(event, function (change) {
+ process.send({
+ question: 'subscribeOnce',
+ answer: change
+ })
+ });
+ break;
+
+ case 'unsubscribe':
+ event = msg.args.event;
+ modelName = msg.args.modelName;
+ fn = msg.args.fn;
+ nohm.factory(modelName).unsubscribe(event, fn);
+ process.send({
+ question: 'unsubscribe',
+ answer: true
+ })
+ break;
}
});
View
239 test/pubsubTests.js
@@ -4,26 +4,22 @@ var child_process = require('child_process');
require(__dirname+'/pubsub/Model.js');
-var child_path = __dirname+'/pubsub/child.js'
-
-nohm.logError = function (err) {
- if (err) {
- throw new rror(err);
- }
-};
-
-// TODO base pub/sub tests.
+var child_path = __dirname+'/pubsub/child.js';
var after = function (times, fn) {
return function () {
- if ((times--)==1) {
+ if ((--times) <= 0) {
fn.apply(this, arguments);
}
};
};
-var tearDown = function (next) {
- nohm.closePubSub(next);
+var error_callback = function (t) {
+ return function (err) {
+ if (err)
+ console.log(arguments);
+ t.ok(!err, 'Callback received an error');
+ };
};
var secondaryClient = redis.createClient();
@@ -38,7 +34,7 @@ module.exports = {
counter += 1;
});
- _test();_test();_test();_test();
+ _test();_test();_test();
t.equal(counter, 1, 'Function has been called a wrong number of times');
t.done();
@@ -88,7 +84,7 @@ module.exports = {
var child = child_process.fork(child_path);
var checkNohmPubSubNotInitialized = function (msg) {
if (msg.question === question) {
- t.same(msg.answer, false, 'PubSub in the child process was already initialized.')
+ t.same(msg.answer, false, 'PubSub in the child process was already initialized.');
child.kill();
t.done();
}
@@ -99,8 +95,8 @@ module.exports = {
'initialized': {
setUp: function (next) {
- this.child = child_process.fork(child_path, process.argv);
- this.child.on('message', function (msg) {
+ var child = this.child = child_process.fork(child_path, process.argv);
+ child.on('message', function (msg) {
if (msg.question === 'initialize' && msg.answer === true) {
next();
}
@@ -108,7 +104,16 @@ module.exports = {
throw new Error(msg.error);
}
});
- this.child.send({question: 'initialize'});
+
+ child.ask = function (request, callback) {
+ child.send(request);
+ child.on('message', function (msg) {
+ if (msg.question === request.question) {
+ callback(msg.answer);
+ }
+ });
+ };
+ child.send({question: 'initialize'});
},
tearDown: function (next) {
@@ -122,29 +127,211 @@ module.exports = {
'create': function (t) {
t.expect(5);
var instance = nohm.factory('Tester');
- instance.p('dummy', 'asdasd');
+ instance.p('dummy', 'create');
- this.child.send({
+ this.child.ask({
question: 'subscribe',
args: {
event: 'create',
modelName: 'Tester'
}
+ }, function (answer) {
+ t.ok(instance.id.length > 0, 'ID was not set properly before the child returned the event.');
+ t.same(instance.id, answer.target.id, 'Id from create event wrong');
+ t.same(instance.modelName, answer.target.modelName, 'Modelname from create event wrong');
+ t.same(instance.allProperties(), answer.target.properties, 'Properties from create event wrong');
+ t.done();
+ });
+
+ instance.save(error_callback(t));
+ },
+
+ 'update': function (t) {
+ t.expect(7);
+ var instance = nohm.factory('Tester');
+ instance.p('dummy', 'update');
+ var diff;
+
+ this.child.ask({
+ question: 'subscribe',
+ args: {
+ event: 'update',
+ modelName: 'Tester'
+ }
+ }, function (answer) {
+ t.ok(instance.id.length > 0, 'ID was not set properly before the child returned the event.');
+ t.same(instance.id, answer.target.id, 'Id from update event wrong');
+ t.same(instance.modelName, answer.target.modelName, 'Modelname from update event wrong');
+ t.same(instance.allProperties(), answer.target.properties, 'Properties from update event wrong');
+ t.same(diff, answer.target.diff, 'Properties from update event wrong');
+ t.done();
+ });
+
+ instance.save(function (err) {
+ error_callback(t)(err);
+ instance.p('dummy', 'updatededed');
+ diff = instance.propertyDiff();
+ instance.save(error_callback(t));
});
+ },
+
+ 'save': function (t) {
+ t.expect(10);
+ var instance = nohm.factory('Tester');
+ instance.p('dummy', 'save');
- this.child.on('message', function (msg) {
- if (msg.question === 'subscribe') {
- t.ok(instance.id.length > 0, 'ID was not set properly before the child returned the event?! oO');
- t.same(instance.id, msg.answer.target.id, 'Id from save event wrong');
- t.same(instance.modelName, msg.answer.target.modelName, 'Modelname from save event wrong');
- t.same(instance.allProperties(), msg.answer.target.properties, 'Properties from save event wrong');
+
+ var counter = 0;
+ var props = [];
+
+ this.child.ask({
+ question: 'subscribe',
+ args: {
+ event: 'save',
+ modelName: 'Tester'
+ }
+ }, function (answer) {
+ t.ok(instance.id.length > 0, 'ID was not set properly before the child returned the event.');
+ t.same(instance.id, answer.target.id, 'Id from save event wrong');
+ t.same(instance.modelName, answer.target.modelName, 'Modelname from save event wrong');
+ t.same(props[counter], answer.target.properties, 'Properties from save event wrong');
+ counter++;
+ if (counter >= 2) {
t.done();
}
});
instance.save(function (err) {
- t.ok(!err, 'Saving produced an error');
+ error_callback(t)(err);
+ props.push(instance.allProperties());
+ instance.p('dummy', 'save_the_second');
+ props.push(instance.allProperties());
+ instance.save(error_callback(t));
});
+ },
+
+ 'remove': function (t) {
+ t.expect(6);
+ var instance = nohm.factory('Tester');
+ instance.p('dummy', 'remove');
+ var old_id;
+
+ this.child.ask({
+ question: 'subscribe',
+ args: {
+ event: 'remove',
+ modelName: 'Tester'
+ }
+ }, function (answer) {
+ t.same(instance.id, 0, 'ID was not reset properly before the child returned the event.');
+ t.same(old_id, answer.target.id, 'Id from remove event wrong');
+ t.same(instance.modelName, answer.target.modelName, 'Modelname from remove event wrong');
+ t.same(instance.allProperties(), answer.target.properties, 'Properties from remove event wrong');
+ t.done();
+ });
+
+ instance.save(function (err) {
+ error_callback(t)(err);
+ old_id = instance.id;
+ instance.remove(error_callback(t));
+ });
+ },
+
+ 'link': function (t) {
+ t.expect(9);
+ var instance_child = nohm.factory('Tester');
+ var instance_parent = nohm.factory('Tester');
+ instance_child.p('dummy', 'link_child');
+ instance_parent.p('dummy', 'link_parent');
+ instance_child.link(instance_parent);
+
+ this.child.ask({
+ question: 'subscribe',
+ args: {
+ event: 'link',
+ modelName: 'Tester'
+ }
+ }, function (answer) {
+ t.ok(instance_child.id.length > 0, 'ID was not set properly before the child returned the event.');
+ t.same(instance_child.id, answer.child.id, 'Id from link event wrong');
+ t.same(instance_child.modelName, answer.child.modelName, 'Modelname from link event wrong');
+ t.same(instance_child.allProperties(), answer.child.properties, 'Properties from link event wrong');
+
+ t.ok(instance_parent.id.length > 0, 'ID was not set properly before the child returned the event.');
+ t.same(instance_parent.id, answer.parent.id, 'Id from link event wrong');
+ t.same(instance_parent.modelName, answer.parent.modelName, 'Modelname from link event wrong');
+ t.same(instance_parent.allProperties(), answer.parent.properties, 'Properties from link event wrong');
+ t.done();
+ });
+
+ instance_child.save(error_callback(t));
+ },
+
+ 'unlink': function (t) {
+ t.expect(10);
+ var instance_child = nohm.factory('Tester');
+ var instance_parent = nohm.factory('Tester');
+ instance_child.p('dummy', 'unlink_child');
+ instance_parent.p('dummy', 'unlink_parent');
+ instance_child.link(instance_parent);
+
+ this.child.ask({
+ question: 'subscribe',
+ args: {
+ event: 'unlink',
+ modelName: 'Tester'
+ }
+ }, function (answer) {
+ t.ok(instance_child.id.length > 0, 'ID was not set properly before the child returned the event.');
+ t.same(instance_child.id, answer.child.id, 'Id from unlink event wrong');
+ t.same(instance_child.modelName, answer.child.modelName, 'Modelname from unlink event wrong');
+ t.same(instance_child.allProperties(), answer.child.properties, 'Properties from unlink event wrong');
+
+ t.ok(instance_parent.id.length > 0, 'ID was not set properly before the child returned the event.');
+ t.same(instance_parent.id, answer.parent.id, 'Id from unlink event wrong');
+ t.same(instance_parent.modelName, answer.parent.modelName, 'Modelname from unlink event wrong');
+ t.same(instance_parent.allProperties(), answer.parent.properties, 'Properties from unlink event wrong');
+ t.done();
+ });
+
+ instance_child.save(function (err) {
+ error_callback(t)(err);
+ instance_child.unlink(instance_parent);
+ instance_child.save(error_callback(t))
+ });
+ },
+
+ 'createOnce': function (t) {
+ // because testing a once event is a pain in the ass and really doesn't have many ways it can fail if the on method on the same event works, we only do on once test.
+ t.expect(7);
+ var instance = nohm.factory('Tester');
+ instance.p('dummy', 'create_once');
+ var once_done = 0;
+
+ this.child.ask({
+ question: 'subscribeOnce',
+ args: {
+ event: 'create',
+ modelName: 'Tester'
+ }
+ }, function (answer) {
+ once_done++;
+ t.ok(instance.id.length > 0, 'ID was not set properly before the child returned the event.');
+ t.same(instance.id, answer.target.id, 'Id from createOnce event wrong');
+ t.same(instance.modelName, answer.target.modelName, 'Modelname from createOnce event wrong');
+ t.same(instance.allProperties(), answer.target.properties, 'Properties from createOnce event wrong');
+
+ var instance_inner = nohm.factory('Tester');
+ instance_inner.p('dummy', 'create_once_again');
+ instance_inner.save(error_callback(t));
+
+ setTimeout(function () {
+ t.same(once_done, 1, 'subscribeOnce called the callback more than once.');
+ t.done();
+ }, 150); // this is fucked up :(
+ });
+
+ instance.save(error_callback(t));
}
}
};
Please sign in to comment.
Something went wrong with that request. Please try again.