Skip to content
This repository has been archived by the owner on Jan 29, 2022. It is now read-only.

Commit

Permalink
complete bson recordwriter + output format code
Browse files Browse the repository at this point in the history
  • Loading branch information
mpobrien committed May 7, 2013
1 parent e644710 commit cc0b4f5
Show file tree
Hide file tree
Showing 2 changed files with 167 additions and 0 deletions.
49 changes: 49 additions & 0 deletions core/src/main/java/com/mongodb/hadoop/BSONFileOutputFormat.java
Original file line number Original file line Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright 2010 10gen Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.mongodb.hadoop;

// Mongo

import com.mongodb.hadoop.output.*;
import com.mongodb.hadoop.util.*;
import org.apache.commons.logging.*;
import org.apache.hadoop.mapreduce.*;

// Commons
// Hadoop

public class BSONFileOutputFormat<K, V> extends OutputFormat<K, V> {

public void checkOutputSpecs( final JobContext context ){ }

public OutputCommitter getOutputCommitter( final TaskAttemptContext context ){
return new MongoOutputCommiter();
}

/**
* Get the record writer that points to the output collection.
*/
public RecordWriter<K, V> getRecordWriter( final TaskAttemptContext context ){
return new BSONFileRecordWriter( context );
}

public BSONFileOutputFormat(){
}

private static final Log LOG = LogFactory.getLog( BSONFileOutputFormat.class );
}

118 changes: 118 additions & 0 deletions core/src/main/java/com/mongodb/hadoop/output/BSONFileRecordWriter.java
Original file line number Original file line Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* Copyright 2011 10gen Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.mongodb.hadoop.output;

import com.mongodb.*;
import com.mongodb.hadoop.*;
import com.mongodb.hadoop.io.BSONWritable;
import com.mongodb.hadoop.io.MongoUpdateWritable;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.bson.*;

import java.io.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.commons.logging.*;


public class BSONFileRecordWriter<K, V> extends RecordWriter<K, V> {

private static final Log log = LogFactory.getLog( BSONFileRecordWriter.class );
private BSONEncoder bsonEnc = new BasicBSONEncoder();
private final Path outputPath;
private boolean outFileOpened = false;
private FSDataOutputStream outFile = null;
private final TaskAttemptContext _context;

public BSONFileRecordWriter( TaskAttemptContext ctx ){
this.outputPath = new Path(ctx.getConfiguration().get("mapred.output.file"));
this._context = ctx;
}

public void close( TaskAttemptContext context ) throws IOException{
if( this.outFile != null){
this.outFile.close();
}
}

private FSDataOutputStream getOutputStream() throws IOException{
if(this.outFile != null){
return this.outFile;
}else if(this.outFileOpened && this.outFile == null){
throw new IllegalStateException("Opening of output file failed.");
}else{
FileSystem fs = this.outputPath.getFileSystem(this._context.getConfiguration());
this.outFile = fs.create(this.outputPath);
return this.outFile;
}
}

public void write( K key, V value ) throws IOException{
final FSDataOutputStream destination = getOutputStream();

if( value instanceof MongoUpdateWritable ){
throw new IllegalArgumentException("MongoUpdateWriteable can only be used to output to a mongo collection, not a static BSON file.");
}

Object keyBSON = null;
BSONObject toEncode = null;
byte[] outputByteBuf;
if(key != null){
keyBSON = BSONWritable.toBSON(key);
if(keyBSON != null){
toEncode = new BasicDBObject();
}
}

if (value instanceof BSONWritable ){
if(toEncode != null){
((BasicDBObject)toEncode).putAll(((BSONWritable)value).getDoc());
}else{
toEncode = ((BSONWritable)value).getDoc();
}
}else if ( value instanceof BSONObject ){
log.info("in here 2");
if(toEncode != null){
((BasicDBObject)toEncode).putAll((BSONObject)value);
}else{
toEncode = (BSONObject)value;
}
}else{
if(toEncode != null){
((BasicDBObject)toEncode).put("value", BSONWritable.toBSON( value ));
}else{
final DBObject o = new BasicDBObject();
o.put( "value", BSONWritable.toBSON( value ) );
toEncode = o;
}
}

if( keyBSON != null ){
((BasicDBObject)toEncode).put("_id", keyBSON);
}

outputByteBuf = bsonEnc.encode(toEncode);
destination.write(outputByteBuf, 0, outputByteBuf.length);
}

}

0 comments on commit cc0b4f5

Please sign in to comment.