Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Initial commit

  • Loading branch information...
commit a3da7185175225729ce16e8c627899a0cd2610cc 0 parents
@jacwright jacwright authored
Showing with 207 additions and 0 deletions.
  1. +62 −0 index.js
  2. +17 −0 package.json
  3. +128 −0 test.js
62 index.js
@@ -0,0 +1,62 @@
+var Collection = require('mongodb').Collection;
+
+
+var origMapReduce = Collection.prototype.mapReduce;
+
+
+Collection.prototype.mapReduce = function mapReduce(map, reduce, options, callback) {
+ if ('function' === typeof options) callback = options, options = {};
+
+ var out = options.out;
+ if (!out || !out.incremental) {
+ return origMapReduce(map, reduce, options, callback);
+ }
+
+ // fix out
+ out.reduce = out.incremental;
+
+ var interval = out.interval || 10000;
+ options.$orderBy = out.sort || {_id: 1};
+
+ delete out.incremental;
+ delete out.interval;
+ delete out.sort;
+
+ // pull the metadata if it exists for this collection
+ var collection = this.db.collection('__incmapreduce__');
+ var metaId = this.collectionName + ':' + out.reduce;
+
+ collection.find({ _id: metaId }, function(err, result) {
+ if (result) {
+ options.$query = {};
+ for (var i in options.$orderBy) {
+ // TODO determine this result structure and fix the line accordingly
+ options.$query[i] = { $gt: result[i] || 0 };
+ }
+ }
+
+
+ // run the mapreduce at a regular interval
+ setInterval(function() {
+ // get the max id at this point in time so we can reliably mapreduce
+ // TODO fix this orderby and select fields
+ var find = { $query:options.$query, $orderBy: reverse.$orderBy, $limit: 1 }, { /* only orderBy fields */};
+ this.find(find, function(err, results) {
+ // TODO results needs to be what we want
+ // TODO add the $lte to the $query
+ origMapReduce(map, reduce, options, function(err, results) {
+ var lastProcessed = results;
+
+ if (callback) callback(err, results);
+
+ if (err) {
+ return;
+ }
+
+ // TODO find the last reduced items and store their sort field's value here
+ collection.save({ _id: metaId, lastProcessed: lastProcessed });
+ });
+ });
+ }, interval);
+ });
+};
17 package.json
@@ -0,0 +1,17 @@
+{
+ "name": "mongodb-incremental-mapreduce",
+ "description": "Run incremental map/reduce queries for mongodb more easily",
+ "version": "0.0.1",
+ "keywords" : ["mongodb", "mongo", "mapreduce", "incremental"],
+ "author": "Jacob Wright <jacwright@gmail.com>",
+ "repository" : {
+ "type" : "git",
+ "url" : "https://github.com/touchads/node-mongodb-incremental-map-reduce"
+ },
+ "engines": {
+ "node": ">= 0.4.0"
+ },
+ "dependencies": {
+ "mongodb": "0.9.2"
+ }
+}
128 test.js
@@ -0,0 +1,128 @@
+var mongodb = require('mongodb');
+require('./index');
+
+var Db = mongodb.Db,
+ Connection = mongodb.Connection,
+ Server = mongodb.Server;
+
+
+var host = process.env['MONGO_NODE_DRIVER_HOST'] != null ? process.env['MONGO_NODE_DRIVER_HOST'] : 'localhost';
+var port = process.env['MONGO_NODE_DRIVER_PORT'] != null ? process.env['MONGO_NODE_DRIVER_PORT'] : Connection.DEFAULT_PORT;
+
+
+var db = new Db('node-mongo-examples', new Server(host, port, {}), {native_parser:false});
+
+console.log('opening', host, port);
+db.open(function(err, db) {
+ if (err) return console.error('open:', err.trace.stack);
+
+ db.collection('test', function(err, collection) {
+ if (err) return console.error('collection test:', err.stack);
+
+ collection.insert([
+ {
+ username: "jones",
+ likes: 20,
+ text: "Hello world!"
+ },
+ {
+ username: "jimmy",
+ likes: 13,
+ text: "The James"
+ },
+ {
+ username: "juju",
+ likes: 0,
+ text: "beans"
+ }
+ ]);
+
+
+ var map = function map() {
+ emit(this.username, { count: 1, likes: this.likes });
+ };
+
+ var reduce = function reduce() {
+
+ };
+
+ var options = {
+ out: {
+ incremental: 'testResults'
+ }
+ };
+
+
+
+ collection.mapReduce(map, reduce, options, function(err, results) {
+ if (err) return console.error('mapreduce:', err.stack);
+
+ console.log('done!');
+ console.log(results);
+ });
+ });
+
+ db.collection('log101011', function(err, collection) {
+ if (err) return console.error('collection log101011:', err.stack);
+
+ collection.insert([
+ { tags: ['one', 'two', 'three'] },
+ { tags: ['one', 'four', 'five'] },
+ { tags: ['two', 'six', 'seven'] }
+ ]);
+
+ db.collection('log101111', function(err, collection) {
+ if (err) return console.error('collection log101111:', err.stack);
+
+ collection.insert([
+ { tags: ['one', 'four', 'eight'] },
+ { tags: ['three', 'four', 'five'] },
+ { tags: ['one', 'two', 'four'] }
+ ]);
+
+ db.collection('log101211', function(err, collection) {
+ if (err) return console.error('collection log 101211:', err.stack);
+
+
+ collection.insert([
+ { tags: ['one', 'two', 'three'] },
+ { tags: ['one', 'four', 'five'] },
+ { tags: ['two', 'six', 'seven'] }
+ ]);
+
+
+
+
+ var map = function map() {
+ this.tags.forEach(function(tag) {
+ emit(tag, 1);
+ });
+ };
+
+ var reduce = function reduce(values) {
+ return values.reduce(function(a, b) {
+ return a + b;
+ });
+ };
+
+ var options = {
+ out: {
+ incremental: 'logs',
+ interval: 10000
+ }
+ };
+
+
+
+ collection.mapReduce(map, reduce, options, function(err, results) {
+ if (err) return console.error('mapreduce-last:', err.stack);
+
+ console.log('done!');
+ console.log(results);
+ });
+
+
+ });
+ });
+ });
+});
Please sign in to comment.
Something went wrong with that request. Please try again.