Merge pull request #45 from mpobrien/master

added nodejs support to hadoop streaming
commit 294323d9aa2f7fc3223a83720159f0217239be97 (parents: 2186ef3 + f128913)
@bwmcadams authored
21 streaming/examples/enron/enron_map.js
@@ -0,0 +1,21 @@
+#!/usr/bin/env node
+
+var node_mongo_hadoop = require('node_mongo_hadoop');
+
+// Strip leading/trailing whitespace from a string.
+var trimString = function(str){
+    return String(str).replace(/^\s+|\s+$/g, '');
+};
+
+// Emit a {from, to} pair with a count of 1 for each recipient of a message.
+function mapFunc(doc, callback){
+    if(doc.headers && doc.headers.From && doc.headers.To){
+        var from_field = doc.headers.From;
+        var to_field = doc.headers.To;
+        to_field.split(',').forEach(function(to){
+            callback( {'_id': {'f': from_field, 't': trimString(to)}, 'count': 1} );
+        });
+    }
+}
+
+node_mongo_hadoop.MapBSONStream(mapFunc);
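
For a quick sanity check outside Hadoop, the map function can be exercised by hand. A minimal sketch (the sample message below is invented for illustration and is not part of this commit):

    var sampleDoc = {headers: {From: 'alice@enron.com', To: 'bob@enron.com, carol@enron.com'}};
    mapFunc(sampleDoc, function(result){
        console.log(JSON.stringify(result));
    });
    // Emits one document per recipient:
    // {"_id":{"f":"alice@enron.com","t":"bob@enron.com"},"count":1}
    // {"_id":{"f":"alice@enron.com","t":"carol@enron.com"},"count":1}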
14 streaming/examples/enron/enron_reduce.js
@@ -0,0 +1,14 @@
+#!/usr/bin/env node
+
+var node_mongo_hadoop = require('node_mongo_hadoop');
+
+// Sum the counts of all values that share the same key.
+function reduceFunc(key, values, callback){
+    var count = 0;
+    values.forEach(function(v){
+        count += v.count;
+    });
+    callback( {'_id': key, 'count': count} );
+}
+
+node_mongo_hadoop.ReduceBSONStream(reduceFunc);
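
Again purely as an illustration, feeding reduceFunc a key and a few hand-built mapper outputs shows the expected fold (the values here are made up):

    reduceFunc({f: 'alice@enron.com', t: 'bob@enron.com'},
               [{count: 1}, {count: 1}, {count: 1}],
               function(result){
                   console.log(JSON.stringify(result));
               });
    // {"_id":{"f":"alice@enron.com","t":"bob@enron.com"},"count":3}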
7 streaming/examples/enron/run_enron_js.sh
@@ -0,0 +1,7 @@
+hadoop jar target/mongo-hadoop-streaming-assembly*.jar \
+    -mapper examples/enron/enron_map.js \
+    -reducer examples/enron/enron_reduce.js \
+    -inputURI mongodb://127.0.0.1/enron_mail.messages \
+    -outputURI mongodb://127.0.0.1/enron_mail.output \
+    -file examples/enron/enron_map.js \
+    -file examples/enron/enron_reduce.js
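
For reference: -mapper and -reducer name the scripts Hadoop Streaming runs for each phase, -inputURI and -outputURI point at the source and destination MongoDB collections, and the -file flags ship both scripts out to the task nodes along with the job.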
128 streaming/language_support/js/node_mongo_hadoop.js
@@ -0,0 +1,128 @@
+var EventEmitter = require('events').EventEmitter;
+var inherits = require('util').inherits;
+var BSON = require('mongodb').BSON;
+var Buffers = require('buffers');
+var _ = require('underscore');
+
+// Log to stderr so debug output doesn't corrupt the BSON stream on stdout.
+var debug = function(x){
+    process.stderr.write(JSON.stringify(x));
+};
+
+// Reads a stream of raw BSON (stdin by default) and emits a "document"
+// event for each complete document parsed out of it.
+var BSONInputStream = exports.BSONInputStream = function(bytestream){
+    EventEmitter.call(this);
+    if(bytestream == null){
+        this.bytestream = process.stdin;
+    }else{
+        this.bytestream = bytestream;
+    }
+    this.bufs = new Buffers();
+
+    var self = this;
+
+    this.bytestream.pause();
+    this.processChunk = function(chunk){
+        var parseDocuments = function(){
+            // Keep parsing + emitting docs; break out of the loop when we
+            // no longer have enough buffered data for a whole document.
+            while(true){
+                if(self.bufs.length >= 4){
+                    // The first four bytes of a BSON document are its total
+                    // length as a little-endian int32.
+                    var sizeBuf = self.bufs.slice(0, 4);
+                    var bsonObjectSize = sizeBuf.readInt32LE(0);
+                    if(self.bufs.length >= bsonObjectSize){
+                        var docBuf = self.bufs.splice(0, bsonObjectSize);
+                        self.emit("document", BSON.deserialize(docBuf.toBuffer()));
+                    }else{
+                        break;
+                    }
+                }else{
+                    break;
+                }
+            }
+        };
+
+        this.bufs.push(chunk);
+        parseDocuments();
+    };
+
+    this.bytestream.on("data", function(chunk){
+        self.processChunk(chunk);
+    });
+
+    this.bytestream.on("end", function(){
+        self.emit("end");
+    });
+};
+inherits(BSONInputStream, EventEmitter);
+
+BSONInputStream.prototype.start = function(){
+    this.bytestream.resume();
+};
+
+// Serializes documents as BSON onto an output stream (stdout by default).
+var BSONOutput = exports.BSONOutput = function(outputstream){
+    if(outputstream == null){
+        this.outputstream = process.stdout;
+    }else{
+        this.outputstream = outputstream;
+    }
+};
+
+BSONOutput.prototype.write = function(doc){
+    var outputBuffer = BSON.serialize(doc);
+    this.outputstream.write(outputBuffer);
+};
+
+// Runs mapFunc over each incoming document; whatever the map function
+// passes to its callback is written back out as BSON.
+var MapBSONStream = exports.MapBSONStream = function(mapFunc, inputStream){
+    if(inputStream == null){
+        inputStream = new BSONInputStream();
+    }
+    var outputStream = new BSONOutput();
+    inputStream.on("document", function(doc){
+        mapFunc(doc, function(mapResult){
+            outputStream.write(mapResult);
+        });
+    });
+    inputStream.start();
+};
+
+// Groups consecutive documents that share an _id (Hadoop streaming hands
+// the reducer its input sorted by key) and runs reduceFunc on each group.
+var ReduceBSONStream = exports.ReduceBSONStream = function(reduceFunc, inputStream){
+    var outputStream = new BSONOutput();
+    if(inputStream == null){
+        inputStream = new BSONInputStream();
+    }
+    var currentGroup = [];
+    var currentKey;
+    inputStream.on("document", function(doc){
+        var docKey = doc._id;
+        if(!_.isEqual(docKey, currentKey)){
+            if(currentKey != undefined){
+                reduceFunc(currentKey, currentGroup, function(reduceResult){
+                    outputStream.write(reduceResult);
+                });
+            }
+            currentKey = docKey;
+            currentGroup = [doc];
+        }else{
+            currentGroup.push(doc);
+        }
+    });
+
+    inputStream.on("end", function(){
+        // Flush the final group once the input is exhausted.
+        if(currentKey != undefined){
+            reduceFunc(currentKey, currentGroup, function(reduceResult){
+                outputStream.write(reduceResult);
+            });
+        }
+    });
+    inputStream.start();
+};
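
Putting the exports together, a new streaming job only needs to define a map or reduce function and hand it to the module. A minimal sketch of a custom mapper (the 'user' field name is an assumption for illustration):

    #!/usr/bin/env node
    var node_mongo_hadoop = require('node_mongo_hadoop');

    // Count one occurrence per document for a hypothetical 'user' field.
    node_mongo_hadoop.MapBSONStream(function(doc, callback){
        if(doc.user){
            callback({'_id': doc.user, 'count': 1});
        }
    });

BSONInputStream and BSONOutput can also be used directly, for example to read BSON from a stream other than stdin by passing that stream to the BSONInputStream constructor.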
22 streaming/language_support/js/package.json
@@ -0,0 +1,22 @@
+{
+  "author": "Mike O'Brien <mikeo@10gen.com> (http://mpobrien.net)",
+  "name": "node_mongo_hadoop",
+  "description": "Bindings for the MongoDB Hadoop adapter, for writing Map/Reduce jobs in JavaScript with Hadoop Streaming.",
+  "version": "0.0.2",
+  "homepage": "api.mongodb.org/hadoop",
+  "repository": {
+    "type": "git",
+    "url": "git@github.com:mpobrien/node_mongo_hadoop.git"
+  },
+  "main": "./node_mongo_hadoop",
+  "dependencies": {
+    "mongodb": "*",
+    "buffers": "*",
+    "underscore": "*"
+  },
+  "devDependencies": {},
+  "optionalDependencies": {},
+  "engines": {
+    "node": "*"
+  }
+}