
Initial basics for Hive support, as well as an InputFormat for BSON files.

It turns out the BSON input needs to migrate to the "old" (org.apache.hadoop.mapred) APIs.
commit af2dabf9403e58492be9986bc159441e547bd404 (1 parent: c7627b9), committed by @bwmcadams
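
The "old" APIs the commit message refers to are the org.apache.hadoop.mapred interfaces, which Hive's table integration is built against; the reader added below uses the newer org.apache.hadoop.mapreduce API instead. As a rough sketch of what the migrated reader could look like, under stated assumptions: the class name, constructor, and the BSONWritable.setDoc() setter are hypothetical illustration, not code from this commit; only the RecordReader interface itself is fixed by Hadoop.

// Hypothetical sketch of the same reader against the "old" mapred API.
package com.mongodb.hadoop.mapred.input;

import com.mongodb.hadoop.io.BSONWritable;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.RecordReader;
import org.bson.BSONObject;
import org.bson.BasicBSONCallback;
import org.bson.BasicBSONDecoder;

import java.io.EOFException;
import java.io.IOException;

public class BSONFileRecordReaderOld implements RecordReader<NullWritable, BSONWritable> {
    private final FSDataInputStream in;
    private final long start;
    private final long length;
    private final BasicBSONDecoder decoder = new BasicBSONDecoder();
    private final BasicBSONCallback callback = new BasicBSONCallback();

    public BSONFileRecordReaderOld(FSDataInputStream in, long start, long length) {
        this.in = in;
        this.start = start;
        this.length = length;
    }

    // The old API fills caller-supplied key/value objects instead of exposing getters.
    public boolean next(NullWritable key, BSONWritable value) throws IOException {
        byte[] header = new byte[4];
        try {
            in.readFully(header);
        } catch (EOFException e) {
            return false; // clean end of file: no more documents
        }
        int docLen = org.bson.io.Bits.readInt(header); // little-endian, counts itself
        byte[] doc = new byte[docLen];
        System.arraycopy(header, 0, doc, 0, 4);
        in.readFully(doc, 4, docLen - 4);
        callback.reset();
        decoder.decode(doc, callback);
        value.setDoc((BSONObject) callback.get()); // assumes a setter on BSONWritable
        return true;
    }

    public NullWritable createKey() { return NullWritable.get(); }
    public BSONWritable createValue() { return new BSONWritable(); }
    public long getPos() throws IOException { return in.getPos(); }
    public float getProgress() throws IOException {
        return length == 0 ? 1.0f : Math.min(1.0f, (in.getPos() - start) / (float) length);
    }
    public void close() throws IOException { in.close(); }
}
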
3 build.sbt
@@ -3,7 +3,6 @@ name := "mongo-hadoop"
organization := "org.mongodb"
-seq(net.virtualvoid.sbt.graph.Plugin.graphSettings: _*)
-hadoopRelease in ThisBuild := "0.22"
+hadoopRelease in ThisBuild := "default"
155 core/src/main/java/com/mongodb/hadoop/input/BSONFileRecordReader.java
@@ -0,0 +1,155 @@
+package com.mongodb.hadoop.input;
+
+import com.mongodb.hadoop.io.BSONWritable;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.bson.*;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Copyright (c) 2008 - 2012 10gen, Inc. <http://10gen.com>
+ * <p/>
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+public class BSONFileRecordReader extends RecordReader<NullWritable, BSONWritable> {
+ private FileSplit fileSplit;
+ private Configuration conf;
+ private BSONReader rdr;
+ private static final Log log = LogFactory.getLog(BSONFileRecordReader.class);
+ private Object key;
+ private BSONWritable value;
+
+ @Override
+ public void initialize(InputSplit inputSplit, TaskAttemptContext context) throws IOException, InterruptedException {
+ this.fileSplit = (FileSplit) inputSplit;
+ this.conf = context.getConfiguration();
+ Path file = fileSplit.getPath();
+ FileSystem fs = file.getFileSystem(conf);
+ FSDataInputStream in = fs.open(file);
+ rdr = new BSONReader(in);
+ }
+
+ @Override
+ public boolean nextKeyValue() throws IOException, InterruptedException {
+ if (rdr.hasNext()) {
+ value = new BSONWritable( rdr.next() );
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public NullWritable getCurrentKey() throws IOException, InterruptedException {
+ return NullWritable.get();
+ }
+
+ @Override
+ public BSONWritable getCurrentValue() throws IOException, InterruptedException {
+ return value;
+ }
+
+ @Override
+ public float getProgress() throws IOException, InterruptedException {
+ // Coarse progress: 0 while documents remain, 1 once the stream is exhausted.
+ return rdr.hasNext() ? 0.0f : 1.0f;
+ }
+
+
+ @Override
+ public void close() throws IOException {
+ // Nothing to do here: BSONReader closes the underlying stream at EOF.
+ }
+
+ private class BSONReader implements Iterable<BSONObject>, Iterator<BSONObject> {
+
+ public BSONReader(final InputStream input) {
+ _input = new DataInputStream( input );
+ }
+
+ public Iterator<BSONObject> iterator(){
+ return this;
+ }
+
+ public boolean hasNext(){
+ // Only read a header if one isn't already buffered; otherwise repeated
+ // calls (e.g. from getProgress()) would each consume 4 bytes of the stream.
+ if ( !headerPending )
+ checkHeader();
+ return hasMore.get();
+ }
+
+ private synchronized void checkHeader(){
+ // Read the BSON length from the start of the record
+ byte[] l = new byte[4];
+ try {
+ _input.readFully( l );
+ nextLen = org.bson.io.Bits.readInt( l );
+ nextHdr = l;
+ hasMore.set( true );
+ headerPending = true;
+ } catch (Exception e) {
+ log.debug( "Failed to get next header: " + e, e );
+ hasMore.set( false );
+ try {
+ _input.close();
+ }
+ catch ( IOException e1 ) { }
+ }
+ }
+
+ public BSONObject next(){
+ try {
+ // nextLen is the full document length (it counts its own 4 bytes).
+ byte[] data = new byte[nextLen];
+ System.arraycopy( nextHdr, 0, data, 0, 4 );
+ _input.readFully( data, 4, nextLen - 4 );
+ headerPending = false;
+ decoder.decode( data, callback );
+ return (BSONObject) callback.get();
+ }
+ catch ( IOException e ) {
+ /* If we can't read another length it's not an error, just return quietly. */
+ log.info( "No length header available: " + e );
+ hasMore.set( false );
+ try {
+ _input.close();
+ }
+ catch ( IOException e1 ) { }
+ throw new NoSuchElementException("Iteration completed.");
+ }
+ }
+
+ public void remove(){
+ throw new UnsupportedOperationException();
+ }
+
+ private final BSONDecoder decoder = new BasicBSONDecoder();
+ private final BSONCallback callback = new BasicBSONCallback();
+
+ private volatile byte[] nextHdr;
+ private volatile int nextLen;
+ private volatile boolean headerPending = false;
+ private final AtomicBoolean hasMore = new AtomicBoolean( true );
+ private final DataInputStream _input;
+ }
+}
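
The commit title also mentions an InputFormat for BSON files, which is not shown in this view. A minimal new-API version would wire the reader in roughly as below; the class name and the non-splittable policy are assumptions for illustration (a raw .bson dump is a bare concatenation of length-prefixed documents, so an arbitrary byte offset is not a safe split point).

// Hypothetical sketch: new-API glue for the reader above, in the same package.
package com.mongodb.hadoop.input;

import com.mongodb.hadoop.io.BSONWritable;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class BSONFileInputFormat extends FileInputFormat<NullWritable, BSONWritable> {

    @Override
    public RecordReader<NullWritable, BSONWritable> createRecordReader(InputSplit split, TaskAttemptContext context) {
        return new BSONFileRecordReader(); // the framework calls initialize() on it
    }

    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        // No sync markers in raw BSON, so read each file as a single split.
        return false;
    }
}
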
28 project/MongoHadoopBuild.scala
@@ -8,7 +8,7 @@ import AssemblyKeys._
object MongoHadoopBuild extends Build {
lazy val buildSettings = Seq(
- version := "1.0.0",
+ version := "1.1.0-SNAPSHOT",
crossScalaVersions := Nil,
crossPaths := false,
organization := "org.mongodb"
@@ -38,9 +38,9 @@ object MongoHadoopBuild extends Build {
"0.23.x" -> hadoopDependencies("0.23.1", true, stockPig, nextGen=true),
"cdh4" -> hadoopDependencies(cdh4CoreHadoop, true, cdh4Pig, Some(cdh4YarnHadoop), nextGen=true),
"cdh3" -> hadoopDependencies(cdh3Hadoop, true, cdh3Pig),
- "1.0" -> hadoopDependencies("1.0.2", false, stockPig),
- "1.0.x" -> hadoopDependencies("1.0.2", false, stockPig),
- "default" -> hadoopDependencies("1.0.2", false, stockPig)
+ "1.0" -> hadoopDependencies("1.0.3", false, stockPig),
+ "1.0.x" -> hadoopDependencies("1.0.3", false, stockPig),
+ "default" -> hadoopDependencies("1.0.3", false, stockPig)
)
lazy val root = Project( id = "mongo-hadoop",
@@ -51,6 +51,9 @@ object MongoHadoopBuild extends Build {
base = file("core"),
settings = coreSettings )
+ lazy val hive = Project( id = "mongo-hadoop-hive",
+ base = file("hive"),
+ settings = hiveSettings ) dependsOn( core )
lazy val pig = Project( id = "mongo-hadoop-pig",
base = file("pig"),
@@ -70,6 +73,7 @@ object MongoHadoopBuild extends Build {
settings = exampleSettings ) dependsOn( core )
+
lazy val baseSettings = Defaults.defaultSettings ++ buildSettings ++ Seq(
resolvers ++= Seq(Resolvers.mitSimileRepo, Resolvers.clouderaRepo, Resolvers.mavenOrgRepo, Resolvers.sonatypeRels),
@@ -160,6 +164,7 @@ object MongoHadoopBuild extends Build {
)
val exampleSettings = dependentSettings
+
val pigSettings = dependentSettings ++ Seq(
resolvers ++= Seq(Resolvers.rawsonApache), /** Seems to have thrift deps I need*/
libraryDependencies <++= (scalaVersion, libraryDependencies, hadoopRelease) { (sv, deps, hr: String) =>
@@ -169,6 +174,11 @@ object MongoHadoopBuild extends Build {
}
)
+ val hiveSettings = dependentSettings ++ Seq(
+ resolvers ++= Seq(Resolvers.rawsonApache), /** Seems to have thrift deps I need*/
+ libraryDependencies ++= Seq(Dependencies.hiveSerDe)
+ )
+
val coreSettings = dependentSettings ++ Seq(
libraryDependencies ++= Seq(Dependencies.mongoJavaDriver, Dependencies.junit),
libraryDependencies <++= (scalaVersion, libraryDependencies, hadoopRelease) { (sv, deps, hr: String) =>
@@ -177,12 +187,12 @@ object MongoHadoopBuild extends Build {
hadoopDeps._2()
},
libraryDependencies <<= (scalaVersion, libraryDependencies) { (sv, deps) =>
- val versionMap = Map("2.8.0" -> ("specs2_2.8.0", "1.5"),
- "2.8.1" -> ("specs2_2.8.1", "1.5"),
+ val versionMap = Map("2.8.1" -> ("specs2_2.8.1", "1.5"),
"2.9.0" -> ("specs2_2.9.0", "1.7.1"),
"2.9.0-1" -> ("specs2_2.9.0", "1.7.1"),
- "2.9.1" -> ("specs2_2.9.1", "1.7.1"))
- val tuple = versionMap.getOrElse(sv, sys.error("Unsupported Scala version for Specs2"))
+ "2.9.1" -> ("specs2_2.9.1", "1.7.1"),
+ "2.9.2" -> ("specs2_2.9.2", "1.10"))
+ val tuple = versionMap.getOrElse(sv, sys.error("Unsupported Scala version '%s' for Specs2".format(sv)))
deps :+ ("org.specs2" % tuple._1 % tuple._2 % "test")
},
autoCompilerPlugins := true,
@@ -269,7 +279,7 @@ object Dependencies {
val mongoJavaDriver = "org.mongodb" % "mongo-java-driver" % "2.7.3"
val junit = "junit" % "junit" % "4.10" % "test"
val flume = "com.cloudera" % "flume-core" % "0.9.4-cdh3u3"
-
+ val hiveSerDe = "org.apache.hive" % "hive-serde" % "0.9.0"
}
// vim: set ts=2 sw=2 sts=2 et:
7 project/plugins.sbt
@@ -1,7 +1,4 @@
-addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.7.3")
+addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.8.3")
-resolvers += Resolver.url("sbt-plugin-releases",
- new URL("http://scalasbt.artifactoryonline.com/scalasbt/sbt-plugin-releases/"))(Resolver.ivyStylePatterns)
-
-addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.5.2")
+resolvers += Resolver.url("artifactory", url("http://scalasbt.artifactoryonline.com/scalasbt/sbt-plugin-releases"))(Resolver.ivyStylePatterns)
1 streaming/src/main/java/com/mongodb/hadoop/mapred/input/MongoRecordReader.java
@@ -31,7 +31,6 @@ public MongoRecordReader( MongoInputSplit split ){
_cursor = split.getCursor();
}
- @Override
public void close(){
if ( _cursor != null )
_cursor.close();
