Skip to content

Commit

Permalink
Passive bug fix and added hostname to event
Browse files Browse the repository at this point in the history
v0.2.3
* Minor bug fix related to passive listeners where a fromDT was not
passed in the options
* Added hostName to messages for better tracking/logging
* Modified passive callback to pass the actual message as the "this"
argument, you can now use this.event to get the actual event that was
responded to
* Updated the on() method to accept strings or regular expressions to
filter events on
  • Loading branch information
jdarling committed Aug 1, 2012
1 parent 086931e commit 25bf760
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 20 deletions.
21 changes: 16 additions & 5 deletions lib/MongoMQ.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,14 @@
var MC = require('./MongoConnection').MongoConnection;
var UUID = require('node-uuid');
var QueueListener = require('./QueueListener').QueueListener;
var hostName = require('os').hostname();

exports.MongoConnection = MC;

var defaults = {
collectionName: 'queue',
db: 'mongomq',
collectionSize: 100000000,
collectionSize: 104857600,
onOptions: {
passive: false
}
Expand All @@ -47,7 +50,7 @@ var errors = {
};

var validateListenerArguments = function(msgType, options, handler){
if(typeof(msgType)!=='string'){
if(typeof(msgType)!=='string'&&(!(msgType instanceof RegExp))){
throw errors.E_INVALIDEVENTTYPE;
}
if(typeof(options)==='function'){
Expand Down Expand Up @@ -80,7 +83,7 @@ var validateListenerArguments = function(msgType, options, handler){
selector.partial = false;
}
if(options.passive){
selector.emitted = {$gte: options.after};
selector.emitted = {$gte: options.after||new Date()};
}else if(msgType!=='*'){
selector.handled = false;
}
Expand Down Expand Up @@ -127,6 +130,9 @@ MongoMQ.prototype.start = function(callback){
for(var i = 0; i<l; i++){
self.listeners[i].start();
}
if(typeof(callback)==='function'){
callback(err, self);
}
};
var ensureIndexes = function(err, collection){
collection.ensureIndex({event: 1, handled: -1, emitted: 1, partial: -1}, {safe: true}, function(err, indexName){
Expand Down Expand Up @@ -210,7 +216,8 @@ MongoMQ.prototype.emit = function(msgType, data, partialCallback, completeCallba
data: data,
handled: false,
emitted: new Date(),
partial: false
partial: false,
host: hostName
};
if(typeof(partialCallback)==='function'){//||typeof(completeCallback)==='function'){
(function(partialCallback, completeCallback){
Expand Down Expand Up @@ -242,7 +249,11 @@ MongoMQ.prototype.emit = function(msgType, data, partialCallback, completeCallba
};
doComplete = function(err, data, next){
if(err) errors.push(err);
results.push(data);
if(results.length>0){
results.push(data);
}else{
results = data;
}
completeTest(errors.length>0?errors:null, results, next);
};
}
Expand Down
19 changes: 13 additions & 6 deletions lib/QueueListener.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/

var UUID = require('node-uuid');
var hostName = require('os').hostname();

var errors = {
E_CONNCLOSED: 'No active server connection!',
Expand All @@ -35,8 +36,11 @@ var passiveNext = function(err, msg){
self.next.apply(self, arguments);
};
var data = (msg&&msg.length>0)?msg[0]:msg;
if(data) data = data.data;
self.handler(err, data, next);
if(data){
data = data.data;
}
msg.listener = self;
self.handler.call(msg, err, data, next);
};

var activeNext = function(err, msg){
Expand All @@ -46,7 +50,6 @@ var activeNext = function(err, msg){
var next = function(){
if(conversationId){
var args = Array.prototype.slice.apply(arguments), l = args.length, done = false, data;
//console.log('next: ', conversationId, args);
/*
next(data, done);
next(data);
Expand Down Expand Up @@ -77,18 +80,22 @@ var activeNext = function(err, msg){
data: data,
handled: false,
emitted: new Date(),
host: hostName
};

self.mongoConnection.insert(self.options.collectionName, msgPkt, function(){});
self.next.call(self, done);
self.next.call(self, false);
}else{
self.next.apply(self, arguments);
}
};
self.mongoConnection.findAndModify(self.options.collectionName, record, {emitted: -1}, {$set: {handled: true}}, {},
function(err, data){
if(!data) next();
else self.handler(err, data.data, next);
if(!data){
next();
}else{
self.handler(err, data.data, next);
}
});
};

Expand Down
9 changes: 5 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,12 @@
"email": "jeremy.darling@gmail.com"
},
"name": "mongomq",
"version": "0.2.2",
"version": "0.2.3",
"repository": {
"type": "git",
"url": "git://github.com/jdarling/MongoMQ.git"
},
"contributors": [
],
"contributors": [],
"main": "./lib/MongoMQ",
"bin": {
"MongoMQ": "./bin/MongoMQ.js"
Expand All @@ -21,5 +20,7 @@
"dependencies": {
"mongodb": ">= 1.0.2",
"node-uuid": ">= 1.3.3"
}
},
"_id": "mongomq@0.2.2",
"_from": "mongomq"
}
29 changes: 24 additions & 5 deletions readme.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
MongoMQ - Node.js MongoMQ
=========================

Version 0.2.2 presents what should be the final API and should be feature complete, need to add a lot more test cases, documentation, samples, and etc to the project now.
Version 0.2.3 fixed a minor bug in passive listeners.
Version 0.2.2 presented what should be the final API and should be feature complete, need to add a lot more test cases, documentation, samples, and etc to the project now.

Installation
============
Expand Down Expand Up @@ -81,7 +82,7 @@ MongoMQ.on(msgType, [passive||options], handler)
Sets up a listener for a specific message type.

Params:
* msgType - The message type to listen for
* msgType - The message type to listen for can be a string or a regular expression
* passive - If true will not mark the message as handled when a message is consumed from the queue
* options - additional options that can be passed
* handler(err, messageContents, next) - Use next() to look for another message in the queue, don't call next() if you only want a one time listener
Expand Down Expand Up @@ -238,14 +239,32 @@ r.context.queue = queue;

r.context.help();
```
Planned Improvements
====================

* Update the emit method and handler callbacks to allow placing an event on to the queue collection and receiving a response or responses from it.
How Events are stored
=====================

```javascript
{
_id: ObjectId(), // for internal use only
event: 'event name', // string that represents what type of event this is
data: JSON Data, // Contains the actual message contents
handled: boolean, // states if the message has been handled or not
emitted: Date(), // Contains the date/time when the event was emitted
partial: boolean, // for responses states if this is a partial response or the complete/end response
host: string, // Contains the host name of the machine that initiated the event
conversationId: string // if the event expects response(s) this will be the conversation identifier used to track those responses
}
```

Update History
==============

v0.2.3
* Minor bug fix related to passive listeners where a fromDT was not passed in the options
* Added hostName to messages for better tracking/logging
* Modified passive callback to pass the actual message as the "this" argument, you can now use this.event to get the actual event that was responded to
* Updated the on() method to accept strings or regular expressions to filter events on

v0.2.2
* Completed code to allow for callbacks and partial callbacks to be issued back to emit statements
* Complteed refactoring of code to properly seperate functionality into objects
Expand Down

0 comments on commit 25bf760

Please sign in to comment.