Permalink
Browse files

Finished initial working version of IMR

  • Loading branch information...
1 parent ceaa37e commit 1a07fad3ce0cb135d7a9bdbff7a12746e217c8b6 @jacwright jacwright committed Oct 27, 2011
Showing with 293 additions and 83 deletions.
  1. +79 −0 README.md
  2. +124 −31 index.js
  3. +1 −4 package.json
  4. +89 −48 test.js
View
@@ -0,0 +1,79 @@
+mongodb-incremental-mapreduce
+=============================
+
+[MongoDB](http://www.mongodb.org/) allows [incremental map/reduce](http://www.mongodb.org/display/DOCS/MapReduce#MapReduce-IncrementalMapreduce), but you must build a solution to do it. This library provides that solution for you, out of the box.
+
+Usage
+-----
+
+### Installation
+
+Install node.js (http://nodejs.org) first, then npm (http://npmjs.org). mongodb-incremental-mapreduce relies on the [mongodb](https://github.com/christkv/node-mongodb-native) npm module.
+
+```bash
+npm install mongodb
+npm install mongodb-incremental-mapreduce
+```
+
+### Usage
+
+MongoDB Incremental Map/Reduce (IMR for short) extends the existing functionality of the [mongodb](https://github.com/christkv/node-mongodb-native) node.js library by adding an additional "out" option.
+
+```javascript
+var mongodb = require('mongodb');
+var incremental = require('mongodb-incremental-mapreduce'); // must be required, even if not using the package object
+
+// ... get access to a 'comments' collection in the regular way ...
+// Here we want to reduce comment counts and likes per username
+
+var map = function() {
+ emit(this.username, { count: 1, likes: this.likes });
+}
+
+var reduce = function() {
+ var totals = { count: 0, likes: 0 };
+ values.forEach(function(current) {
+ totals.count += current.count;
+ totals.likes += current.likes;
+ });
+ return totals;
+}
+
+collection.mapReduce(map, reduce, { out: { incremental: 'commentCounts' } }, function(err, results) {
+ console.log('IMR done!');
+});
+```
+
+IMR will only map/reduce new docs that have been inserted since the last time you called collection.mapReduce for a given collection-destination combo (in the example above that would be comments and commentCounts).
+
+You can specify an interval in seconds at which the map/reduce should run. If you do this you will get the intervalId back from the mapReduce call which you may then use to clear the interval with later.
+
+```javascript
+// run every minute
+var intervalId = collection.mapReduce(map, reduce, { out: { incremental: 'commentCounts', interval: 60 } }, function(err, results) {
+ if (err) clearInterval(intervalId);
+ else console.log('IMR done!', 'Again!');
+});
+```
+
+Sometimes it is desirable to map/reduce across many collections, such as rotating log collections. This can be done by using a slightly modified mapReduce method in the mongodb-incremental-mapreduce package.
+
+```javascript
+var incremental = require('mongodb-incremental-mapreduce');
+
+var collections = { collections: 'log*', db: db };
+var options = { out: { incremental: 'logs' } };
+incremental.mapReduce(collections, map, reduce, options, callback);
+```
+
+You can use a either a string with an asterisk (e.g. "logs*" will match "logs-10" and "logster") or a regular expression object. Note that even if your string or expression matches the "out" collection IMR will not include it, so you are safe taking "logs*" and reducing into "logs". You can even use this method for other types of mapReduce methods on multiple collections, even if it isn't incremental. And you may add interval to any type this way.
+
+### How it works
+
+Under the hood IMR stores the _id of the last doc it reduced in a metadata table called "incmapreduce". The next time mapReduce is called it will load the last _id from the metadata collection and go from there.
+
+### Additional work
+
+There is a possibility of docs being double counted in a map/reduce if you run it again before the former map/reduce is finished. This is likely to occur when your map/reduce is taking longer than the interval at which you want it to be updated. Currently it is on the user of the library to ensure this doesn't happen, but it would be nice to have it built in so that it will skip subsequent map/reduces if one is still running.
+
+There are probably optimizations that can be done also.
View
155 index.js
@@ -1,64 +1,157 @@
-var Collection = require('mongodb').Collection;
+var mongodb = require('mongodb');
+var Collection = mongodb.Collection;
var origMapReduce = Collection.prototype.mapReduce;
-Collection.prototype.mapReduce = function mapReduce(map, reduce, options, callback) {
+exports.mapReduce = function incMapReduce(collection, map, reduce, options, callback) {
if ('function' === typeof options) callback = options, options = {};
+ var db = collection.db;
+ var isMatch = collection.collections;
+ if (!options) options = {};
+ else options = clone(options);
+ if ( !(isMatch instanceof RegExp))
+ isMatch = new RegExp('^' + RegExp.escape(isMatch).replace(/\\\*/g, '.*') + '$');
+
+ if (options.out) {
+ var interval = options.out.interval;
+ delete options.out.interval;
+ var outCollection = options.out.interval || options.out.reduce || options.out.replace || options.out.merge;
+ }
+
+ db.collectionNames(function(err, collectionNames) {
+ if (err) return callback(err);
+
+ collectionNames = collectionNames.map(getName).filter(function(name) {
+ return (name != outCollection && isMatch.test(name));
+ });
+
+ var count = 0;
+ collectionNames.forEach(function(collectionName) {
+ count++;
+ db.collection(collectionName, function(err, collection) {
+// console.log('handling collection:', collectionName);
+ collection.mapReduce(map, reduce, options, function(err, results) {
+// console.log('done with:', collectionName);
+ if (!--count) {
+ if (callback) callback(null, results);
+ }
+ });
+ });
+ });
+ });
+
+ if (interval) {
+ return setInterval(function() {
+ exports.mapReduce(collection, map, reduce, options, callback);
+ }, interval*1000);
+ }
+};
+
+Collection.prototype.mapReduce = function incMapReduce(map, reduce, options, callback) {
+ if ('function' === typeof options) callback = options, options = {};
+ else options = clone(options);
+
+ var collection = this;
var out = options.out;
if (!out || !out.incremental) {
- return origMapReduce(map, reduce, options, callback);
+ return origMapReduce.call(collection, map, reduce, options, callback);
}
// fix out
out.reduce = out.incremental;
- var interval = out.interval || 10000;
- options.$orderBy = { _id: 1 };
-
+ var interval = out.interval;
delete out.incremental;
delete out.interval;
- runMapReduce.call(this, options);
+ runMapReduce(collection, map, reduce, options, callback);
// run the mapreduce at a regular interval
- setInterval(function() {
-
- }, interval);
+ if (interval) {
+ return setInterval(function() {
+ runMapReduce(collection, map, reduce, options, callback);
+ }, interval*1000);
+ }
};
-function runMapReduce(options) {
- // pull the metadata if it exists for this collection
- var collection = this.db.collection('__incmapreduce__');
- var metaId = this.collectionName + ':' + out.reduce;
+function runMapReduce(collection, map, reduce, options, callback) {
- collection.find({ _id: metaId }, function(err, results) {
- if (results.length) {
- options.$query = { _id: { $gt: results[0].lastId }};
- }
+ // pull the metadata if it exists for this collection
+ collection.db.collection('incmapreduce', function(err, metaCollection) {
- // get the max id at this point in time so we can reliably store the last id of the batch
- var find = { $orderBy: { _id: -1 }, $limit: 1 };
+ var metaId = collection.collectionName + ':' + options.out.reduce;
+// console.log('metaId:', metaId);
- this.find(find, { _id: 1 }, function(err, results) {
- if (!results.length) return;
+ metaCollection.findOne({ _id: metaId }, function(err, meta) {
- var lastId = results[0]._id;
- var query = options.$query || (options.$query = {});
- query.$lte = lastId;
+ var query = {};
- origMapReduce(map, reduce, options, function(err, results) {
- if (callback) callback(err, results);
-
- if (err) {
+ if (meta) {
+// console.log('Found META:', meta);
+ options.$query = query = { _id: { $gt: meta.lastId }};
+ }
+
+ // get the max id at this point in time so we can reliably store the last id of the batch
+ var cursor = collection.find(query, {_id: 1});
+ cursor.sort({ _id: -1 });
+
+ cursor.nextObject(function(err, doc) {
+ if (!doc) {
+// console.log('NO MORE TO ADD AT THIS TIME');
+ if (callback) callback(null, null);
return;
}
- collection.save({ _id: metaId, lastId: lastId });
+ var lastId = doc._id;
+ var query = options.$query || (options.$query = {});
+ query.$lte = lastId;
+
+ origMapReduce.call(collection, map, reduce, options, function(err, results) {
+
+ if (err) {
+ if (callback) callback(err);
+ return;
+ }
+
+// console.log('success!!');
+
+ if (meta) {
+ metaCollection.update({ _id: metaId }, { $set: { lastId: lastId } }, function(err) {
+ if (err) console.warn(err.message);
+// else console.log('successfully updated', lastId);
+ if (callback) callback(err, results);
+ });
+ } else {
+ metaCollection.insert({ _id: metaId, lastId: lastId }, function(err) {
+ if (err) console.warn(err.message);
+// else console.log('successfully inserted');
+ if (callback) callback(err, results);
+ });
+ }
+ });
});
});
});
-}
+}
+
+function getName(collection) {
+ return collection.name.split('.').slice(1).join('.');
+}
+
+function clone(obj) {
+ var result = {};
+ for (var i in obj) {
+ var value = obj[i];
+ if (value && typeof value === 'object') result[i] = clone(value);
+ else result[i] = value;
+ }
+ return result;
+}
+
+RegExp.escape = function(text) {
+ return text.replace(/[-[\]{}()*+?.,\\^$|#\s]/g, "\\$&");
+};
View
@@ -8,10 +8,7 @@
"type" : "git",
"url" : "https://github.com/touchads/node-mongodb-incremental-map-reduce"
},
- "engines": {
- "node": ">= 0.4.0"
- },
"dependencies": {
- "mongodb": "0.9.2"
+ "mongodb": "0.9.4"
}
}
Oops, something went wrong.

0 comments on commit 1a07fad

Please sign in to comment.