Permalink
Browse files

Initial commit

  • Loading branch information...
0 parents commit 9b368680fa7ff45390499d6320fd641ebe27902d @jdarling committed Jul 13, 2012
Showing with 426 additions and 0 deletions.
  1. +22 −0 .gitattributes
  2. +15 −0 .gitignore
  3. +34 −0 bin/test.js
  4. +95 −0 lib/MongoConnection.js
  5. +127 −0 lib/MongoMQ.js
  6. +24 −0 package.json
  7. +109 −0 readme.md
22 .gitattributes
@@ -0,0 +1,22 @@
+# Auto detect text files and perform LF normalization
+* text=auto
+
+# Custom for Visual Studio
+*.cs diff=csharp
+*.sln merge=union
+*.csproj merge=union
+*.vbproj merge=union
+*.fsproj merge=union
+*.dbproj merge=union
+
+# Standard to msysgit
+*.doc diff=astextplain
+*.DOC diff=astextplain
+*.docx diff=astextplain
+*.DOCX diff=astextplain
+*.dot diff=astextplain
+*.DOT diff=astextplain
+*.pdf diff=astextplain
+*.PDF diff=astextplain
+*.rtf diff=astextplain
+*.RTF diff=astextplain
15 .gitignore
@@ -0,0 +1,15 @@
+#################
+## npm/Node.js
+#################
+
+node_modules
+
+############
+## Windows
+############
+
+# Windows image file caches
+Thumbs.db
+
+# Folder config file
+Desktop.ini
34 bin/test.js
@@ -0,0 +1,34 @@
+var MongoMQ = require('../lib/MongoMQ').MongoMQ;
+var repl = require('repl');
+
+var queue = new MongoMQ();
+
+var r = repl.start({
+ prompt: ">"
+ });
+
+r.on('exit', function(){
+ queue.close();
+});
+
+r.context.listen = function(){
+ queue.on('test', function(err, data, next){
+ console.log('got: ', data);
+ next();
+ });
+};
+
+var msgidx = 0;
+r.context.send = function(){
+ queue.emit('test', msgidx);
+ msgidx++;
+};
+
+r.context.load = function(){
+ for(var i = 0; i<100; i++){
+ queue.emit('test', msgidx);
+ msgidx++;
+ }
+};
+
+r.context.queue = queue;
95 lib/MongoConnection.js
@@ -0,0 +1,95 @@
+var mongodb = require('mongodb');
+var key;
+
+for(key in mongodb){
+ if(hasOwnProperty.call(mongodb, key)){
+ //console.log('Importing: ', key);
+ global[key] = exports[key] = mongodb[key];
+ }
+}
+
+exports.ObjectId = exports.ObjectID;
+
+var MongoConnection = exports.MongoConnection = function(options, callback){
+ options = options||{};
+ var self = this;
+ if(options.servers instanceof Array){
+ var servers = [], host, port, serverOptions, l = options.servers.length;
+ for(var i = 0; i<l; i++){
+ if(typeof(options.servers[i])=='string'){
+ host = options.servers[i];
+ port = Connection.DEFAULT_PORT;
+ serverOptions = options.serverOptions||{auto_reconnect: true};
+ }else{
+ host = options.servers[i].host||options.host||'localhost';
+ port = options.servers[i].port||options.port||Connection.DEFAULT_PORT;
+ serverOptions = options.servers[i].serverOptions||options.serverOptions||{auto_reconnect: true};
+ }
+ servers.push(new Server(host, port, options));
+ }
+ self.server = new ReplSetServers(servers);
+ }else self.server = new Server(options.host||'localhost', options.port||Connection.DEFAULT_PORT, options.serverOptions||{auto_reconnect: true});
+
+ var server = self.server;
+ var db = self.dbConnection = new Db(options.db, server, {native_parser:(options.nativeParser==null?false:options.nativeParser)});
+ db.open(function(err, _db){
+ self._db = _db;
+ if(options.username&&options.password){
+ db.admin(function(err, adminDb){
+ adminDb.authenticate(options.username, options.password, function(err, result){
+ if(typeof(callback)=='function') callback(null, self);
+ });
+ });
+ }else{
+ if(typeof(callback)=='function') callback(null, self);
+ }
+ });
+};
+
+MongoConnection.prototype.Close = function(){
+ var self = this;
+ self._db.close();
+};
+
+MongoConnection.prototype.Collection = function(collectionName, pkFactory, options){
+ var self = this;
+ return self.dbConnection.collection(collectionName, pkFactory, options);
+};
+
+MongoConnection.prototype.find = function(inCollection, selector, fields, skip, limit, timeout, callback){
+ var self = this;
+ var args = Array.prototype.slice.apply(arguments);
+ var collection = self.Collection(args.shift());
+ var cb = args.pop();
+ if(typeof(cb)!=='function'){
+ args.push(cb);
+ cb = function(){};
+ }
+ collection.find.apply(collection, args).toArray(cb);
+};
+
+MongoConnection.prototype.insert = function(inCollection, selector, fields, skip, limit, timeout, callback){
+ var self = this;
+ var args = Array.prototype.slice.apply(arguments);
+ var collection = self.Collection(args.shift());
+ var cb = args.pop();
+ if(typeof(cb)!=='function'){
+ args.push(cb);
+ cb = function(){};
+ }
+ args.push(cb);
+ collection.insert.apply(collection, args);
+};
+
+MongoConnection.prototype.update = function(inCollection, selector, fields, skip, limit, timeout, callback){
+ var self = this;
+ var args = Array.prototype.slice.apply(arguments);
+ var collection = self.Collection(args.shift());
+ var cb = args.pop();
+ if(typeof(cb)!=='function'){
+ args.push(cb);
+ cb = function(){};
+ }
+ args.push(cb);
+ collection.update.apply(collection, args);
+};
127 lib/MongoMQ.js
@@ -0,0 +1,127 @@
+var MC = require("./MongoConnection").MongoConnection;
+
+var MongoMQ = exports.MongoMQ = function(options){
+ var self = this;
+ options = options || {};
+ self.mqCollectionName = options.mqCollectionName||'queue';
+ self.mqDB = options.mqDB||options.db||'MongoMQ';
+ options.db = self.mqDB;
+ self.handlers = [];
+ var server = self.server = new MC(options, function(err, conn){
+ if(err){
+ console.log(err);
+ }else{
+ self.db = conn._db;
+ self.start();
+ }
+ });
+};
+
+MongoMQ.prototype.start = function(){
+ var self = this;
+ self.db.collection(self.mqCollectionName, {safe: true}, function(err, collection){
+ var okToStart = true;
+ if(err){
+ if(collection){
+ okToStart = false;
+ }else{
+ self.createCollection();
+ }
+ }else{
+ self.startListening(collection);
+ }
+ });
+};
+
+MongoMQ.prototype.close = function(){
+ var self = this;
+ self.db.close();
+};
+
+MongoMQ.prototype.createCollection = function(){
+ var self = this;
+ self.db.dropCollection(self.mqCollectionName, function(){
+ self.db.createCollection(self.mqCollectionName, {
+ capped: true,
+ size: 100000000
+ }, function(err, collection){
+ self.startListening(collection);
+ });
+ });
+};
+
+MongoMQ.prototype.startListening = function(collection){
+ var self = this;
+ collection = collection||self.db.collection(self.mqCollectionName, {safe: true});
+ if(!collection) self.createCollection();
+ else{
+ collection.isCapped(function(err, isCapped){
+ if(!isCapped) self.createCollection();
+ else{
+ self.collection = collection;
+ if(self.handlers.length>0){
+ var l = self.handlers.length;
+ for(var i = 0; i<l; i++){
+ self.nextMessage(self.handlers[i].msgType, self.handlers[i].passive, self.handlers[i].callback);
+ }
+ }
+ self.listening = true;
+ }
+ });
+ }
+};
+
+MongoMQ.prototype.getCursor = function(forMsg, passive, callback){
+ var self = this;
+ var doCallback = function(err, msg){
+ var next = function(){
+ self.getCursor(forMsg, passive, callback);
+ };
+ if(typeof(callback)=='function') callback(err, msg?msg.pkt:false, next);
+ };
+ if(typeof(passive)=='function'){
+ callback = passive;
+ passive = false;
+ }
+ self.collection = self.collection||self.db.collection(self.mqCollectionName, {safe: true});
+ if(passive){
+ self.collection.find({$and: [{msg: forMsg}, {$or: [{handled: false}, {handled: {$exists: false}}]}]}, {tailable: true}, function(err, msg){
+ if(typeof(callback)=='function') doCallback(err, (msg&&msg.length)>0?msg[0]:msg);
+ });
+ }else{
+ self.collection.find({$and: [{msg: forMsg}, {$or: [{handled: false}, {handled: {$exists: false}}]}]}, {tailable: true}).nextObject(function(err, msg){
+ self.collection.findAndModify((msg&&msg.length)>0?msg[0]:msg, {emitted: -1}, {$set: {handled: true}}, {tailable: true},
+ function(err, data){
+ if(err||(!data)) self.getCursor(forMsg, passive, callback); // someone else picked it up
+ else doCallback(err, data);
+ });
+ });
+ }
+};
+
+MongoMQ.prototype.nextMessage = function(msgType, passive, callback){
+ var self = this;
+ self.getCursor(msgType, passive, callback);
+};
+
+MongoMQ.prototype.emit = function(msg, pkt){
+ var self = this;
+ self.collection = self.collection||self.db.collection(self.mqCollectionName, {safe: true});
+ msg = {
+ msg: msg,
+ pkt: pkt,
+ handled: false,
+ emitted: new Date()
+ };
+ self.collection.insert(msg, function(){});
+};
+
+MongoMQ.prototype.on = function(msg, passive, callback){
+ var self = this;
+ if(typeof(passive)=='function'){
+ callback = passive;
+ passive = false;
+ }
+ self.handlers.push({msgType: msg, passive: !!passive, handler: callback});
+ if(self.listening) self.nextMessage(msg, passive, callback);
+};
24 package.json
@@ -0,0 +1,24 @@
+{
+ "author": {
+ "name": "Jeremy Darling",
+ "email": "jeremy.darling@gmail.com"
+ },
+ "name": "MongoMQ",
+ "version": "0.1.0",
+ "repository": {
+ "type": "git",
+ "url": "git://github.com/jdarling/MongoMQ.git"
+ },
+ "contributors": [
+ ],
+ "main": "./lib/MongoMQ",
+ "bin": {
+ "MongoMQ": "./bin/test.js"
+ },
+ "engines": {
+ "node": ">= v0.6.0"
+ },
+ "dependencies": {
+ "mongodb": ">= 1.0.2"
+ },
+}
109 readme.md
@@ -0,0 +1,109 @@
+MongoMQ - Node.js MongoMQ
+=========================
+Installation
+============
+
+From GitHub
+-----------
+ * Download from GitHub and extract.
+ * npm install mongodb
+
+Using NPM
+---------
+ * npm install MongoMQ
+
+What is MongoMQ?
+================
+
+MongoMQ is a messaging queue built on top of Node.js and MongoDB's tailable cursors. It allows for distributed of messages across workers in both a single reciever and broadcast method.
+
+What MongoMQ is NOT
+===================
+
+MongoMQ does NOT (currently) support callback's once a message is processed. Instead it is recommended that you use a one time listener to pickup responses if this is required.
+
+Supported Methods
+=================
+
+new MongoMQ(options)
+--------------------
+options
+ * mqCollectionName - Collection to store queue messages in, defaults to 'queue'
+ * mqDB - Database to store queue in, defaults to 'MongoMQ'
+ * server - If not running against a ReplicaSet this is the server to connect to
+ * port - If not running against a ReplicaSet this is the server port to connect with
+ * servers[] - If connecting to a ReplicaSet then this is a collection of {host: 'hostname', port: 1234} objects defining the root entry for the ReplicaSet
+
+MongoMQ.on(msgType, passive, callback);
+---------------------------------------
+msgType
+ * The message type to listen for
+
+passive
+ * If true will not mark the message as handled and will
+
+callback(err, messageContents, next)
+ * Use next() to look for another message in the queue, don't call next() if you only want a one time listener
+
+MongoMQ.emit(msgType, messageContents);
+---------------------------------------
+msgType
+ * The message type to send
+
+messageContents
+ * What to send
+
+How does MongoMQ work?
+======================
+
+MongoMQ sets up a tailable collection and then starts listeners using find in conjunction with findAndModify to pickup messages out of this collection.
+
+Since MongoMQ is basically a wrapper around MongoDB's built in support for tailable cursors it is possible to place listeners built in other langauges on the "queue".
+
+Sample Usage
+============
+
+ * Ensure MongoDB is up and running locally (or modify the config options to collect to your Mongo instance)
+ * Start 3 copies of the bin/test.js script.
+ * In two copies type listen() to setup a "test" message listener
+ * In the 3rd copy type load() to send 100 test messages to the queue
+
+You should see the two listeners pickup messages one at a time with whoever has resources to process picking up the message first.
+
+bin/test.js
+===========
+
+ var MongoMQ = require('../lib/MongoMQ').MongoMQ;
+ var repl = require('repl');
+
+ var queue = new MongoMQ();
+
+ var r = repl.start({
+ prompt: ">"
+ });
+
+ r.on('exit', function(){
+ queue.close();
+ });
+
+ r.context.listen = function(){
+ queue.on('test', function(err, data, next){
+ console.log('got: ', data);
+ next();
+ });
+ };
+
+ var msgidx = 0;
+ r.context.send = function(){
+ queue.emit('test', msgidx);
+ msgidx++;
+ };
+
+ r.context.load = function(){
+ for(var i = 0; i<100; i++){
+ queue.emit('test', msgidx);
+ msgidx++;
+ }
+ };
+
+ r.context.queue = queue;

0 comments on commit 9b36868

Please sign in to comment.