Permalink
Browse files

DBObjectWritable provides a DBObject so you can casbah operate instead

of getting a BSONObject back.
  • Loading branch information...
1 parent 25fd310 commit 237c97a97b3d93fd583a9e28aea5fd277661ffe9 Brendan W. McAdams committed Jun 28, 2012
@@ -1,5 +1,7 @@
package com.mongodb.hadoop.io;
+import com.mongodb.DBObject;
+
/**
* Copyright (c) 2008 - 2012 10gen, Inc. <http://10gen.com>
* <p/>
@@ -16,5 +18,13 @@
* limitations under the License.
*/
-public class DBObjectWritable {
+public class DBObjectWritable extends BSONWritable implements DBObject {
+ @Override
+ public void markAsPartialObject() {
+ }
+
+ @Override
+ public boolean isPartialObject() {
+ return false;
+ }
}
@@ -19,15 +19,16 @@ package scoobi
import com.nicta.scoobi.{DList, WireFormat}
import org.apache.hadoop.io.ObjectWritable
-import com.mongodb.hadoop.io.BSONWritable
+import com.mongodb.hadoop.io.{DBObjectWritable, BSONWritable}
import com.nicta.scoobi.io.{InputConverter, DataSource}
import com.nicta.scoobi.impl.Configured
import org.apache.hadoop.mapreduce.Job
import com.mongodb.casbah.{MongoCollection, MongoCursor}
import org.bson.BSONObject
-import com.mongodb.casbah.commons.MongoDBObject
+import com.mongodb.casbah.Imports._
import java.io.{DataInput, DataOutput}
import org.apache.commons.logging.LogFactory
+import com.mongodb.hadoop.util.MongoConfigUtil
object MongoInput {
@@ -46,8 +47,20 @@ object MongoInput {
DList.fromSource(source)
}
+ class MongoWireFormat extends WireFormat[DBObject] {
+ def fromWire(in: DataInput): DBObject = {
+ log.info("[from] Input: " + in)
+ val w = new DBObjectWritable()
+ w.readFields(in)
+ w
+ }
+
+ def toWire(x: DBObject, out: DataOutput) {
+ log.info("[to] X: " + x + " out: " + out)
+ }
+ }
- class MongoWireFormat extends WireFormat[BSONObject] {
+ class BSONWireFormat extends WireFormat[BSONObject] {
def fromWire(in: DataInput): BSONObject = {
log.info("[from] Input: " + in)
val w = new BSONWritable()
@@ -69,6 +82,21 @@ object MongoInput {
def inputCheck() {}
def inputConfigure(job: Job) {
+ // Extract information to setup our Hadoop job
+ // TODO - We currently cannot support authentication!
+ val query = cursor.query
+ val coll = cursor.underlying.getCollection
+ val db = coll.getDB
+ val conn = db.getMongo
+ val addr = conn.getAddress
+
+ val inputURI = "mongodb://%s:%s/%s.%s".format(addr.getHost, addr.getPort,
+ db.getName, coll.getName)
+
+ log.info("*** Input URI: %s".format(inputURI))
+ log.info("*** Input Query: %s".format(query))
+ MongoConfigUtil.setInputURI( job.getConfiguration, inputURI )
+ MongoConfigUtil.setQuery( job.getConfiguration, query )
configure(job)
}
@@ -20,19 +20,18 @@ package test
*/
import com.mongodb.casbah.Imports._
+import com.mongodb.hadoop.scoobi.MongoInput.{MongoWireFormat}
import com.nicta.scoobi.Scoobi._
-import org.bson.BSONObject
-import com.mongodb.hadoop.scoobi.MongoInput.MongoWireFormat
object TestJob extends ScoobiApp {
def run() {
implicit val wf = new MongoWireFormat
- val data = MongoInput.fromCollection[BSONObject](MongoConnection()("playbookstore")("books"))
- val x = data.map(doc => (doc.get("title").asInstanceOf[String], 1)).groupByKey
- System.err.println(x)
+ val data = MongoInput.fromCollection[DBObject](MongoConnection()("playbookstore")("books"))
+ val authors: DList[String] = data.flatMap( doc => doc.as[Seq[String]]("author") )
+ val grouped: DList[(String, Iterable[Int])] = authors.map(_ -> 1).groupByKey
+ val x: DList[(String, Int)] = grouped.combine(_+_)
persist(toTextFile(x, "./test"))
-
}
}

0 comments on commit 237c97a

Please sign in to comment.