Simpler implementation for hadoop-5 using the same BSONWritable class.

1 parent dea0669 commit 20e14eb0a3b7599290b0d79fffa673f0c6944c6a mpobrien committed Aug 28, 2012
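
The whole change rests on one property of BSON: every document begins with a 4-byte little-endian length, so a record can be moved through Hadoop's Writable streams as an opaque block of bytes instead of being decoded into a map and re-encoded. A minimal sketch of that idea, using only org.bson and java.io (an illustration of the technique, not code from this commit; the class name is made up):

```java
import org.bson.BasicBSONEncoder;
import org.bson.BasicBSONObject;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.IOException;

public class RawBsonPassthrough {
    public static void main(String[] args) throws IOException {
        // Encode a document once; after this point it is never re-encoded.
        byte[] raw = new BasicBSONEncoder().encode(new BasicBSONObject("x", 1));

        // "write": emit the raw bytes unchanged.
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(raw);

        // "readFields": the first 4 bytes are the little-endian document length,
        // so the full record can be sliced straight back out of the stream.
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(out.toByteArray()));
        byte[] header = new byte[4];
        in.readFully(header);
        int len = (header[0] & 0xff)
                | ((header[1] & 0xff) << 8)
                | ((header[2] & 0xff) << 16)
                | ((header[3] & 0xff) << 24);
        byte[] doc = new byte[len];
        System.arraycopy(header, 0, doc, 0, 4);
        in.readFully(doc, 4, len - 4);
        System.out.println("round-tripped " + doc.length + " bytes without re-encoding");
    }
}
```

BSONWritable.readFields below does essentially this slicing, and write() becomes a plain copy of the stored buffer.
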
2 build.sbt
@@ -4,4 +4,4 @@ name := "mongo-hadoop"
organization := "org.mongodb"
-hadoopRelease in ThisBuild := "default"
+hadoopRelease in ThisBuild := "cdh3"
5 core/src/main/java/com/mongodb/hadoop/input/MongoInputSplit.java
@@ -18,6 +18,7 @@
package com.mongodb.hadoop.input;
import com.mongodb.*;
+import com.mongodb.hadoop.io.*;
import com.mongodb.hadoop.util.*;
import org.apache.commons.logging.*;
import org.apache.hadoop.io.*;
@@ -138,7 +139,9 @@ DBCursor getCursor(){
// them.
// todo - support limit/skip
if ( _cursor == null ){
- _cursor = MongoConfigUtil.getCollection( _mongoURI ).find( _querySpec, _fieldSpec ).sort( _sortSpec );
+ DBCollection collection = MongoConfigUtil.getCollection( _mongoURI );
+ collection.setDBDecoderFactory(LazyDBReadableBytesDecoder.FACTORY);
+ _cursor = collection.find( _querySpec, _fieldSpec ).sort( _sortSpec );
if (_notimeout) _cursor.setOptions( Bytes.QUERYOPTION_NOTIMEOUT );
_cursor.slaveOk();
}
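
For context on the hunk above: the 2.x Java driver lets a DBDecoderFactory be installed per collection, and every document the cursor then returns is built by that decoder, which is how the raw buffers survive all the way to the record reader. A hedged, standalone sketch (host, database, and collection names are placeholders; untested):

```java
import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
import com.mongodb.DBObject;
import com.mongodb.Mongo;
import com.mongodb.hadoop.io.LazyDBReadableBytesDecoder;

public class LazyCursorSketch {
    public static void main(String[] args) throws Exception {
        Mongo mongo = new Mongo("127.0.0.1");                         // placeholder host
        DBCollection coll = mongo.getDB("test").getCollection("live"); // placeholder namespace
        coll.setDBDecoderFactory(LazyDBReadableBytesDecoder.FACTORY);
        DBCursor cursor = coll.find().limit(1);
        while (cursor.hasNext()) {
            DBObject doc = cursor.next();                             // expected: LazyDBReadableBytes
            System.out.println(doc.getClass().getName());
        }
        mongo.close();
    }
}
```

With the factory installed, the record readers downstream can assume each document carries its raw BSON buffer.
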
51 core/src/main/java/com/mongodb/hadoop/io/BSONWritable.java
@@ -34,11 +34,15 @@
@SuppressWarnings( "deprecation" )
public class BSONWritable implements BSONObject, WritableComparable {
+ private static final byte[] emptyBSONbytes = new BasicBSONEncoder().encode(new BasicBSONObject());
+
/**
* Constructs a new instance.
*/
public BSONWritable(){
- _doc = new BasicBSONObject();
+ //log.info(emptyBSONbytes);
+ _doc = new LazyDBReadableBytes(new BasicBSONEncoder().encode(new BasicBSONObject()) );
+ //_doc = new BasicBSONObject();
}
/**
@@ -51,6 +55,11 @@ public BSONWritable( BSONWritable other ){
copy( other );
}
+ public BSONWritable( BSONByteBuffer rawBytes){
+ this();
+ initialize(rawBytes);
+ }
+
/**
* Constructs a new instance around an existing BSONObject
*
@@ -61,6 +70,10 @@ public BSONWritable( BSONObject doc ){
putAll( doc );
}
+ public void initialize(BSONByteBuffer rawBytes){
+ _doc = (LazyDBReadableBytes)(dec.readObject(rawBytes.array()));
+ }
+
/**
* {@inheritDoc}
*
@@ -97,12 +110,17 @@ public Object get( String key ){
return _doc.get( key );
}
+ public BSONByteBuffer getRaw(){
+ return this._doc.getRawData();
+ }
+
/**
* {@inheritDoc}
*
* @see BSONObject#toMap()
*/
public Map toMap(){
+ log.warn("iterating entire object");
return _doc.toMap();
}
@@ -148,13 +166,13 @@ public boolean containsField( String fieldName ){
* @see Writable#write(DataOutput)
*/
public void write( DataOutput out ) throws IOException{
-
- BSONEncoder enc = new BasicBSONEncoder();
- BasicOutputBuffer buf = new BasicOutputBuffer();
- enc.set( buf );
- enc.putObject( _doc );
- enc.done();
- buf.pipe( out );
+ out.write(this._doc.getRawData().array());
+// BSONEncoder enc = new BasicBSONEncoder();
+// BasicOutputBuffer buf = new BasicOutputBuffer();
+// enc.set( buf );
+// enc.putObject( _doc );
+// enc.done();
+// buf.pipe( out );
}
@@ -164,8 +182,7 @@ public void write( DataOutput out ) throws IOException{
* @see Writable#readFields(DataInput)
*/
public void readFields( DataInput in ) throws IOException{
- BSONDecoder dec = new BasicBSONDecoder();
- BSONCallback cb = new BasicBSONCallback();
+ BSONCallback cb = new LazyDBReadableBytesCallback(null);
// Read the BSON length from the start of the record
byte[] l = new byte[4];
try {
@@ -175,24 +192,23 @@ public void readFields( DataInput in ) throws IOException{
byte[] data = new byte[dataLen + 4];
System.arraycopy( l, 0, data, 0, 4 );
in.readFully( data, 4, dataLen - 4 );
- dec.decode( data, cb );
- _doc = (BSONObject) cb.get();
+ _doc = (LazyDBReadableBytes)(dec.readObject(data));
if ( log.isTraceEnabled() ) log.trace( "Decoded a BSON Object: " + _doc );
}
catch ( Exception e ) {
/* If we can't read another length it's not an error, just return quietly. */
// TODO - Figure out how to gracefully mark this as an empty
log.info( "No Length Header available." + e );
- _doc = new BasicDBObject();
+ _doc = new LazyDBReadableBytes(emptyBSONbytes);
}
-
- }
+ }
/**
* {@inheritDoc}
*/
@Override
public String toString(){
+ log.warn("iterating entire object - tostring");
BSONEncoder enc = new BasicBSONEncoder();
BasicOutputBuffer buf = new BasicOutputBuffer();
enc.set( buf );
@@ -210,6 +226,7 @@ public String toString(){
*/
protected synchronized void copy( Writable other ){
if ( other != null ){
+ log.warn("copying raw obj");
try {
DataOutputBuffer out = new DataOutputBuffer();
other.write( out );
@@ -271,7 +288,9 @@ public int hashCode(){
return ( this._doc != null ? this._doc.hashCode() : 0 );
}
- protected BSONObject _doc;
+ //protected BSONObject _doc;
+ protected LazyDBReadableBytes _doc;
+ protected LazyDBReadableBytesDecoder dec = new LazyDBReadableBytesDecoder();
private static final Log log = LogFactory.getLog( BSONWritable.class );
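
Taken together, the write()/readFields() changes turn BSONWritable serialization into a byte copy, with decoding deferred until a field is actually touched (hence the new warn-level logging in toMap() and toString()). A sketch of the intended round trip, built from classes in this commit plus the driver's BSONByteBuffer (untested, and it assumes the writable is seeded from raw bytes rather than the field-by-field constructor):

```java
import com.mongodb.hadoop.io.BSONWritable;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.bson.BasicBSONEncoder;
import org.bson.BasicBSONObject;
import org.bson.io.BSONByteBuffer;

public class BSONWritableRoundTripSketch {
    public static void main(String[] args) throws Exception {
        byte[] raw = new BasicBSONEncoder().encode(new BasicBSONObject("x", 1));
        BSONWritable w = new BSONWritable(BSONByteBuffer.wrap(raw));

        DataOutputBuffer out = new DataOutputBuffer();
        w.write(out);                         // now a straight copy of the stored bytes

        DataInputBuffer in = new DataInputBuffer();
        in.reset(out.getData(), out.getLength());
        BSONWritable r = new BSONWritable();
        r.readFields(in);                     // reads the length prefix, wraps the bytes lazily
        System.out.println(r.get("x"));       // decoded on demand
    }
}
```
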
29 core/src/main/java/com/mongodb/hadoop/io/LazyDBReadableBytes.java
@@ -0,0 +1,29 @@
+package com.mongodb.hadoop.io;
+import org.bson.io.BSONByteBuffer;
+import org.bson.LazyBSONCallback;
+import com.mongodb.LazyWriteableDBObject;
+
+public class LazyDBReadableBytes extends LazyWriteableDBObject{
+
+ public LazyDBReadableBytes(BSONByteBuffer buff, int offset, LazyDBReadableBytesCallback cbk){
+ super(buff, offset, cbk);
+ }
+ public LazyDBReadableBytes(BSONByteBuffer buff, LazyDBReadableBytesCallback cbk){
+ super(buff, cbk);
+ }
+ public LazyDBReadableBytes(byte[] data, int offset, LazyDBReadableBytesCallback cbk){
+ super(data, offset, cbk);
+ }
+ public LazyDBReadableBytes(byte[] data, LazyDBReadableBytesCallback cbk){
+ super(data, cbk);
+ }
+
+ public LazyDBReadableBytes(byte[] data){
+ super(data, new LazyDBReadableBytesCallback(null));
+ }
+
+ public BSONByteBuffer getRawData(){
+ return this._input;
+ }
+
+}
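
The only behavior this subclass adds over the driver's LazyWriteableDBObject is getRawData(), which exposes the inherited _input buffer so a caller can hand the whole document around without iterating it. A small hedged sketch (untested; the example class name is made up):

```java
import com.mongodb.hadoop.io.LazyDBReadableBytes;
import org.bson.BasicBSONEncoder;
import org.bson.BasicBSONObject;

public class RawAccessSketch {
    public static void main(String[] args) {
        byte[] raw = new BasicBSONEncoder().encode(new BasicBSONObject("x", 1));
        LazyDBReadableBytes doc = new LazyDBReadableBytes(raw);
        System.out.println(doc.getRawData().array().length);  // the same bytes that were passed in
        System.out.println(doc.get("x"));                      // parsed lazily, only when asked for
    }
}
```
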
29 core/src/main/java/com/mongodb/hadoop/io/LazyDBReadableBytesCallback.java
@@ -0,0 +1,29 @@
+package com.mongodb.hadoop.io;
+
+import com.mongodb.*;
+import java.util.Iterator;
+import java.util.logging.Logger;
+
+/**
+ *
+ */
+public class LazyDBReadableBytesCallback extends LazyDBCallback {
+
+ public LazyDBReadableBytesCallback( DBCollection coll ){
+ super(coll);
+ }
+
+ @Override
+ public Object createObject( byte[] data, int offset ){
+ LazyDBReadableBytes o = new LazyDBReadableBytes( data, offset, this );
+ Iterator it = o.keySet().iterator();
+ if ( it.hasNext() && it.next().equals( "$ref" ) &&
+ o.containsField( "$id" ) ){
+ return new DBRef( null, o );
+ }
+ return o;
+ }
+
+ private static final Logger log = Logger.getLogger( LazyDBReadableBytesCallback.class.getName() );
+}
+
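
Overriding createObject() here mirrors the driver's own LazyDBCallback: embedded documents are materialized through the callback, so they should come back as LazyDBReadableBytes as well, and the $ref/$id check preserves DBRef handling. A hedged sketch of that expectation (untested):

```java
import com.mongodb.hadoop.io.LazyDBReadableBytes;
import com.mongodb.hadoop.io.LazyDBReadableBytesDecoder;
import org.bson.BasicBSONEncoder;
import org.bson.BasicBSONObject;

public class NestedLazySketch {
    public static void main(String[] args) {
        byte[] raw = new BasicBSONEncoder().encode(
                new BasicBSONObject("outer", new BasicBSONObject("inner", 1)));
        LazyDBReadableBytesDecoder dec = new LazyDBReadableBytesDecoder();
        LazyDBReadableBytes top = (LazyDBReadableBytes) dec.readObject(raw);
        Object nested = top.get("outer");
        System.out.println(nested.getClass().getName());  // expected: LazyDBReadableBytes
    }
}
```
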
31 core/src/main/java/com/mongodb/hadoop/io/LazyDBReadableBytesDecoder.java
@@ -0,0 +1,31 @@
+package com.mongodb.hadoop.io;
+import com.mongodb.*;
+import org.bson.*;
+import java.io.InputStream;
+import java.io.IOException;
+
+public class LazyDBReadableBytesDecoder extends LazyWriteableDBDecoder implements DBDecoder{
+
+ static class LazyDBReadableBytesDecoderFactory implements DBDecoderFactory {
+ @Override
+ public DBDecoder create( ){
+ return new LazyDBReadableBytesDecoder();
+ }
+ }
+
+ public static DBDecoderFactory FACTORY = new LazyDBReadableBytesDecoderFactory();
+
+ public DBCallback getDBCallback(DBCollection collection) {
+ return new LazyDBReadableBytesCallback(collection);
+ }
+
+ @Override
+ public BSONObject readObject(InputStream in) throws IOException {
+ BSONCallback c = new LazyDBReadableBytesCallback(null);
+ decode( in , c );
+ return (BSONObject)c.get();
+ }
+
+
+}
+
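
FACTORY is what MongoInputSplit installs on the collection; the decoder can also be used directly, which is how BSONWritable.readFields consumes it. A minimal hedged sketch exercising the overridden readObject (untested):

```java
import com.mongodb.DBDecoder;
import com.mongodb.hadoop.io.LazyDBReadableBytes;
import com.mongodb.hadoop.io.LazyDBReadableBytesDecoder;
import org.bson.BSONObject;
import org.bson.BasicBSONEncoder;
import org.bson.BasicBSONObject;

import java.io.ByteArrayInputStream;
import java.io.IOException;

public class DecoderFactorySketch {
    public static void main(String[] args) throws IOException {
        byte[] raw = new BasicBSONEncoder().encode(new BasicBSONObject("x", 1));
        // The factory is what DBCollection.setDBDecoderFactory expects; create()
        // hands back a decoder whose readObject builds LazyDBReadableBytes instances.
        DBDecoder dec = LazyDBReadableBytesDecoder.FACTORY.create();
        BSONObject doc = dec.readObject(new ByteArrayInputStream(raw));
        System.out.println(doc instanceof LazyDBReadableBytes);  // expected: true
    }
}
```
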
10 core/src/main/java/com/mongodb/hadoop/mapred/input/MongoRecordReader.java
@@ -92,13 +92,17 @@ public boolean nextKeyValue(){
public boolean next( BSONWritable key, BSONWritable value ){
if ( nextKeyValue() ){
- log.debug( "Had another k/v" );
key.put( "_id", getCurrentKey().get( "_id" ) );
- value.putAll( getCurrentValue() );
+ if(getCurrentValue() instanceof LazyDBReadableBytes){
+ LazyDBReadableBytes lazyCurrent = (LazyDBReadableBytes)getCurrentValue();
+ value.initialize( lazyCurrent.getRawData() );
+ }else{
+ value.putAll(getCurrentValue());
+ }
return true;
}
else{
- log.info( "Cursor exhausted." );
+ log.info( "Cursor exhausted!" );
return false;
}
}
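
In the old (mapred) API the caller owns the key and value Writables and reuses them across records, so the reader must copy into them; the branch above copies raw bytes when the cursor produced a lazy object and falls back to a field-by-field putAll otherwise. The same branch, pulled out as a standalone helper to show the shape of the pattern (a hedged sketch, not code from the commit):

```java
import com.mongodb.hadoop.io.BSONWritable;
import com.mongodb.hadoop.io.LazyDBReadableBytes;
import org.bson.BSONObject;

public class ValueFillSketch {
    /** Copy the current cursor document into a reused output Writable. */
    static void fill(BSONWritable value, BSONObject current) {
        if (current instanceof LazyDBReadableBytes) {
            // Cheap path: hand over the raw buffer, no per-field iteration.
            value.initialize(((LazyDBReadableBytes) current).getRawData());
        } else {
            // Fallback for ordinary BSONObjects.
            value.putAll(current);
        }
    }
}
```
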
2 streaming/examples/twitter/run_twit_py.sh
@@ -1 +1 @@
-hadoop jar target/mongo-hadoop-streaming-assembly*.jar -mapper examples/twitter/twit_map.py -reducer examples/twitter/twit_reduce.py -inputURI mongodb://127.0.0.1/test.live -outputURI mongodb://127.0.0.1/test.twit_reduction -file examples/twitter/twit_map.py -file examples/twitter/twit_reduce.py
+hadoop jar target/mongo-hadoop-streaming-assembly*.jar -mapper examples/twitter/twit_map.py -reducer examples/twitter/twit_reduce.py -inputURI mongodb://127.0.0.1/demo.live -outputURI mongodb://127.0.0.1/demo.twit_reduction -file examples/twitter/twit_map.py -file examples/twitter/twit_reduce.py
3 streaming/examples/twitter/twit_map.py
@@ -7,7 +7,8 @@
def mapper(documents):
for doc in documents:
- yield {'_id': doc['user']['time_zone'], 'count': 1}
+ if 'user' in doc:
+ yield {'_id': doc['user']['time_zone'], 'count': 1}
BSONMapper(mapper)
print >> sys.stderr, "Done Mapping."
