Skip to content
Browse files

Initial commit

  • Loading branch information...
0 parents commit cc5d999382bd09f2300ec3200b598d5b7eec4f8e @scttnlsn committed
Showing with 347 additions and 0 deletions.
  1. +1 −0 .gitignore
  2. +4 −0 Makefile
  3. +70 −0 README.md
  4. +69 −0 lib/channel.js
  5. +73 −0 lib/connection.js
  6. +15 −0 lib/index.js
  7. +26 −0 lib/promise.js
  8. +21 −0 package.json
  9. +68 −0 test/test_channel.js
1 .gitignore
@@ -0,0 +1 @@
+node_modules
4 Makefile
@@ -0,0 +1,4 @@
+test:
+ ./node_modules/.bin/mocha --reporter list
+
+.PHONY: test
70 README.md
@@ -0,0 +1,70 @@
+mubsub
+======
+
+Mubsub is a pub/sub implementation for Node.js and MongoDB. It utilizes Mongo's capped collections and tailable cursors to notify subscribers of inserted documents that match a given query.
+
+Example
+-------
+
+```javascript
+var mubsub = require('mubsub');
+var channel = mubsub.channel('test');
+
+mubsub.connect('mongodb://localhost:27017/mubsub');
+
+channel.subscribe({ foo: 'bar' }, function(err, doc) {
+ console.log('received bar');
+});
+
+channel.subscribe({ foo: 'baz' }, function(err, doc) {
+ console.log('received baz');
+});
+
+channel.publish({ foo: 'bar' });
+channel.publish({ foo: 'baz' });
+```
+
+Usage
+-----
+
+### Channels ###
+
+A channel maps one-to-one with a capped collection (Mubsub will create these if they do not already exist in the database). Optionally specify the byte size of the collection when creating a channel:
+
+```javascript
+var channel = mubsub.channel('foo', { size: 100000 });
+```
+
+### Subscribe ###
+
+```javascript
+var subscription = channel.subscribe(query, callback);
+```
+
+Subscriptions register a callback to be called whenever a document matching the specified query is inserted (published) into the collection (channel). You can omit the query to match all inserted documents. To later unsubscribe a particular callback, call `unsubscribe` on the returned subscription object:
+
+```javascript
+subscription.unsubscribe();
+```
+
+### Publish ###
+
+```javascript
+channel.publish(doc, callback);
+```
+
+Publishing a document simply inserts the document into the channel's capped collection. Note that Mubsub will remove any specified document `_id` as the natural ordering of `ObjectId`s is used to ensure subscribers do not receive notifications of documents inserted in the past. Callback is optional.
+
+Install
+-------
+
+ npm install mubsub
+
+Tests
+-----
+
+ make test
+
+You can optionally specify the MongoDB URI to be used for tests:
+
+ MONGODB_URI=mongodb://localhost:27017/mubsub_tests make test
69 lib/channel.js
@@ -0,0 +1,69 @@
+var Promise = require('./promise');
+
+module.exports = Channel;
+
+function Channel(name, db, options) {
+ this.collection = new Promise();
+
+ options || (options = {});
+ options.size || (options.size = 100000);
+
+ var self = this;
+
+ db.then(function(err, db) {
+ if (err) return self.collection.resolve(err);
+
+ db.createCollection(name, { capped: true, size: options.size }, function(err, collection) {
+ self.collection.resolve(err, collection);
+ });
+ });
+}
+
+Channel.prototype.publish = function(obj, callback) {
+ callback || (callback = function() {});
+
+ delete obj._id;

This isn't going to do what you want, if you want the server to generate the _id field.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
+
+ this.collection.then(function(err, collection) {
+ if (err) return callback(err);
+ collection.insert(obj, callback);
+ });
+};
+
+Channel.prototype.subscribe = function(query, callback) {
+ var self = this;
+ var subscribed = true;
+
+ if (typeof query === 'function' && callback === undefined) {
+ callback = query;
+ query = {};
+ }
+
+ this.collection.then(function(err, collection) {
+ if (err) return callback(err);
+
+ var latest = collection.find({}).sort({ '$natural': -1 }).limit(1);
+ latest.nextObject(function(err, doc) {
+ if (err) return callback(err);
+ if (doc) query._id = { '$gt': doc._id };
+
+ var cursor = collection.find(query, { tailable: true }).sort({ '$natural': 1 });

This is not going to do well with collections which aren't already in memory. This will cause the system to do a full collection scan from the start to doc used in the $gt criteria. Probably better to follow the oplog replication pattern using a "ts" field, a Timestamp(0,0), and the oplogreplay flag, or atleast mention this will only perform well if the whole collection is always in memory.

@scttnlsn Owner

@scotthernandez Can you explain more (or point me to somewhere explaining this in depth)? How do you avoid a $gt query when using a timestamp?

@scttnlsn Owner

@scotthernandez What if there's an index on _id?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
+
+ (function next() {
+ process.nextTick(function() {
+ cursor.nextObject(function(err, doc) {
+ if (err) return callback(err);
+ if (doc) callback(null, doc);
+ if (subscribed) next();
+ });
+ });
+ })();
+ });
+ });
+
+ return {
+ unsubscribe: function() {
+ subscribed = false;
+ }
+ };
+};
73 lib/connection.js
@@ -0,0 +1,73 @@
+var mongo = require('mongodb');
+var url = require('url');
+var Channel = require('./channel');
+var Promise = require('./promise');
+
+module.exports = Connection;
+
+function Connection(options) {
+ this.db = new Promise();
+ this.channels = {};
+}
+
+Connection.prototype.channel = function(name, options) {
+ this.channels[name] || (this.channels[name] = new Channel(name, this.db, options));
+ return this.channels[name];
+};
+
+Connection.prototype.open = function(options, callback) {
+ var self = this;
+ var options = parseOptions(options);
+ var db = options.db;
+
+ if (typeof db === 'string') {
+ var server = new mongo.Server(options.host, options.port, options.options);
+ db = new mongo.Db(db, server);
+ }
+
+ db.open(function(err, db) {
+ var opened = function() {
+ self.db.resolve(err, db);
+ callback && callback(err, db);
+ };
+
+ if (options.user && options.pass) {
+ db.authenticate(options.user, options.pass, opened)
+ } else {
+ opened();
+ }
+ });
+};
+
+// Helpers
+// ---------------
+
+function parseOptions(options) {
+ if (typeof options === 'string') {
+ options = { url: options };
+ } else {
+ options || (options = {});
+ }
+
+ if (options.url) {
+ var uri = url.parse(options.url);
+ options.host = uri.hostname;
+ options.port = parseInt(uri.port, 10);
+ options.db = uri.pathname && uri.pathname.replace(/\//g, '');
+
+ if (uri.auth) {
+ var auth = uri.auth.split(':');
+ options.user = auth[0];
+ options.pass = auth[1];
+ }
+ } else {
+ options.host || (options.host = 'localhost');
+ options.port || (options.port = 27017);
+ }
+
+ if (options.db === undefined) {
+ throw new Error('No `db` specified');
+ }
+
+ return options;
+};
15 lib/index.js
@@ -0,0 +1,15 @@
+var Connection = require('./connection');
+
+module.exports = new Mubsub();
+
+function Mubsub() {
+ this.connection = new Connection();
+}
+
+Mubsub.prototype.connect = function() {
+ this.connection.open.apply(this.connection, arguments);
+};
+
+Mubsub.prototype.channel = function(name, options) {
+ return this.connection.channel(name, options);
+};
26 lib/promise.js
@@ -0,0 +1,26 @@
+module.exports = Promise;
+
+function Promise(context) {
+ this.context = context || this;
+ this.callbacks = [];
+ this.resolved = undefined;
+};
+
+Promise.prototype.then = function(callback) {
+ if (this.resolved) {
+ callback.apply(this.context, this.resolved);
+ } else {
+ this.callbacks.push(callback);
+ }
+};
+
+Promise.prototype.resolve = function() {
+ if (this.resolved) throw new Error('Promise already resolved');
+
+ var callback;
+ this.resolved = arguments;
+
+ while (callback = this.callbacks.shift()) {
+ callback.apply(this.context, this.resolved);
+ }
+};
21 package.json
@@ -0,0 +1,21 @@
+{
+ "name": "mubsub",
+ "version": "0.0.1",
+ "description": "Pub/sub for Node.js and MongoDB",
+ "homepage": "http://github.com/scttnlsn/mubsub",
+ "author": "Scott Nelson <scottbnel@gmail.com>",
+ "main": "./lib/index",
+
+ "repository": {
+ "type": "git",
+ "url": "git://github.com/scttnlsn/mubsub.git"
+ },
+
+ "dependencies": {
+ "mongodb": ">= 0.9.9"
+ },
+
+ "devDependencies": {
+ "mocha": ">= 1.0.0"
+ }
+}
68 test/test_channel.js
@@ -0,0 +1,68 @@
+var assert = require('assert');
+var mubsub = require('../lib/index');
+
+mubsub.connect(process.env.MONGODB_URI || 'mongodb://localhost:27017/mubsub_tests')
+
+var random = function() {
+ return Math.floor(Math.random() * 100 + 1);
+};
+
+describe('Channel', function() {
+ var channel;
+
+ beforeEach(function() {
+ channel = mubsub.channel('tests');
+ });
+
+ it('calls subscribed callbacks that match given query', function(next) {
+ var counts = { all: 0, bar: 0, baz: 0, both: 0 };
+
+ var bar = random();
+ var baz = random();
+ var qux = random();
+
+ var end = function() {
+ counts.all === bar + baz + qux &&
+ counts.bar === bar &&
+ counts.baz === baz &&
+ counts.both === bar + baz &&
+ next();
+ };
+
+ // Match all
+ channel.subscribe({}, function(err, doc) {
+ if (err) throw err;
+ assert.ok(doc.foo);
+ counts.all++;
+ end();
+ });
+
+ // Match `bar`
+ channel.subscribe({ foo: 'bar' }, function(err, doc) {
+ if (err) throw err;
+ assert.equal(doc.foo, 'bar');
+ counts.bar++;
+ end();
+ });
+
+ // Match `baz`
+ channel.subscribe({ foo: 'baz' }, function(err, doc) {
+ if (err) throw err;
+ assert.equal(doc.foo, 'baz');
+ counts.baz++;
+ end();
+ });
+
+ // Match `bar` or `baz`
+ channel.subscribe({ '$or': [{ foo: 'bar'}, { foo: 'baz' }]}, function(err, doc) {
+ if (err) throw err;
+ assert.ok(doc.foo === 'bar' || doc.foo === 'baz');
+ counts.both++;
+ end();
+ });
+
+ for (var i = 0; i < bar; i++) channel.publish({ foo: 'bar' });
+ for (var i = 0; i < baz; i++) channel.publish({ foo: 'baz' });
+ for (var i = 0; i < qux; i++) channel.publish({ foo: 'qux' });
+ });
+});

0 comments on commit cc5d999

Please sign in to comment.
Something went wrong with that request. Please try again.