diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index af9a1ce935821..aa0f8e4ce0101 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -40,7 +40,7 @@ import org.apache.mesos.MesosNativeLibrary import org.apache.spark.annotation.{DeveloperApi, Experimental} import org.apache.spark.broadcast.Broadcast import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil} -import org.apache.spark.input.{WholeTextFileInputFormat,ByteInputFormat} +import org.apache.spark.input.{StreamInputFormat, StreamFileInputFormat, WholeTextFileInputFormat, ByteInputFormat} import org.apache.spark.partial.{ApproximateEvaluator, PartialResult} import org.apache.spark.rdd._ import org.apache.spark.scheduler._ @@ -517,11 +517,11 @@ class SparkContext(config: SparkConf) extends Logging { * * @note Small files are preferred, large file is also allowable, but may cause bad performance. */ - def byteFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, Array[Byte])] = { + def binaryFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, Array[Byte])] = { val job = new NewHadoopJob(hadoopConfiguration) NewFileInputFormat.addInputPath(job, new Path(path)) val updateConf = job.getConfiguration - new BinaryFileRDD( + new RawFileRDD( this, classOf[ByteInputFormat], classOf[String], @@ -530,6 +530,28 @@ class SparkContext(config: SparkConf) extends Logging { minPartitions).setName(path) } + /** + * Get an RDD for a Hadoop-readable dataset as DataInputStreams for each file (useful for binary data) + * Care must be taken to close the files afterwards + * + * @param minPartitions A suggestion value of the minimal splitting number for input data. + * + * @note Small files are preferred, large file is also allowable, but may cause bad performance. + */ + @DeveloperApi + def dataStreamFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, DataInputStream)] = { + val job = new NewHadoopJob(hadoopConfiguration) + NewFileInputFormat.addInputPath(job, new Path(path)) + val updateConf = job.getConfiguration + new RawFileRDD( + this, + classOf[StreamInputFormat], + classOf[String], + classOf[DataInputStream], + updateConf, + minPartitions).setName(path) + } + /** * Get an RDD for a Hadoop-readable dataset from a Hadoop JobConf given its InputFormat and other * necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable), diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala index 8a5f8088a05ca..7690d009fa2b7 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala @@ -20,6 +20,8 @@ package org.apache.spark.api.java import java.util import java.util.{Map => JMap} +import java.io.DataInputStream + import scala.collection.JavaConversions import scala.collection.JavaConversions._ import scala.language.implicitConversions @@ -180,6 +182,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork def textFile(path: String, minPartitions: Int): JavaRDD[String] = sc.textFile(path, minPartitions) + + /** * Read a directory of text files from HDFS, a local file system (available on all nodes), or any * Hadoop-supported file system URI. 
Each file is read as a single record and returned in a @@ -210,6 +214,66 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork def wholeTextFiles(path: String, minPartitions: Int): JavaPairRDD[String, String] = new JavaPairRDD(sc.wholeTextFiles(path, minPartitions)) + /** + * Read a directory of files from HDFS, a local file system (available on all nodes), or any + * Hadoop-supported file system URI as DataInputStreams. Each file is read as a single record and returned in a + * key-value pair, where the key is the path of each file and the value is a stream over the content of each file; care must be taken to close the streams afterwards. + * + *
For example, if you have the following files: + * {{{ + * hdfs://a-hdfs-path/part-00000 + * hdfs://a-hdfs-path/part-00001 + * ... + * hdfs://a-hdfs-path/part-nnnnn + * }}} + * + * Do `JavaPairRDD rdd = sparkContext.dataStreamFiles("hdfs://a-hdfs-path")`, + * + *
then `rdd` contains + * {{{ + * (a-hdfs-path/part-00000, its content) + * (a-hdfs-path/part-00001, its content) + * ... + * (a-hdfs-path/part-nnnnn, its content) + * }}} + * + * @note Small files are preferred, large file is also allowable, but may cause bad performance. + * + * @param minPartitions A suggestion value of the minimal splitting number for input data. + */ + def dataStreamFiles(path: String, minPartitions: Int = defaultMinPartitions): JavaPairRDD[String,DataInputStream] = + new JavaPairRDD(sc.dataStreamFiles(path,minPartitions)) + + /** + * Read a directory of binary files from HDFS, a local file system (available on all nodes), or any + * Hadoop-supported file system URI as a byte array. Each file is read as a single record and returned in a + * key-value pair, where the key is the path of each file, the value is the content of each file. + * + *
For example, if you have the following files: + * {{{ + * hdfs://a-hdfs-path/part-00000 + * hdfs://a-hdfs-path/part-00001 + * ... + * hdfs://a-hdfs-path/part-nnnnn + * }}} + * + * Do `JavaPairRDD rdd = sparkContext.binaryFiles("hdfs://a-hdfs-path")`, + * + *
then `rdd` contains + * {{{ + * (a-hdfs-path/part-00000, its content) + * (a-hdfs-path/part-00001, its content) + * ... + * (a-hdfs-path/part-nnnnn, its content) + * }}} + * + * @note Small files are preferred, large file is also allowable, but may cause bad performance. + * + * @param minPartitions A suggestion value of the minimal splitting number for input data. + */ + def binaryFiles(path: String, minPartitions: Int = defaultMinPartitions): JavaPairRDD[String,Array[Byte]] = + new JavaPairRDD(sc.binaryFiles(path,minPartitions)) + /** * Read a directory of text files from HDFS, a local file system (available on all nodes), or any * Hadoop-supported file system URI. Each file is read as a single record and returned in a diff --git a/core/src/main/scala/org/apache/spark/input/BinaryFileInput.scala b/core/src/main/scala/org/apache/spark/input/BinaryFileInput.scala deleted file mode 100644 index 683784c8873b2..0000000000000 --- a/core/src/main/scala/org/apache/spark/input/BinaryFileInput.scala +++ /dev/null @@ -1,102 +0,0 @@ -package org.apache.spark.input - -import scala.collection.JavaConversions._ -import com.google.common.io.{ByteStreams, Closeables} -import org.apache.hadoop.mapreduce.InputSplit -import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit -import org.apache.hadoop.mapreduce.RecordReader -import org.apache.hadoop.mapreduce.TaskAttemptContext -import org.apache.hadoop.fs.Path -import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat -import org.apache.hadoop.mapreduce.JobContext -import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader - - -/** - * The new (Hadoop 2.0) InputFormat for while binary files (not be to be confused with the recordreader itself) - */ -@serializable abstract class BinaryFileInputFormat[T] - extends CombineFileInputFormat[String,T] { - override protected def isSplitable(context: JobContext, file: Path): Boolean = false - /** - * Allow minPartitions set by end-user in order to keep compatibility with old Hadoop API. - */ - def setMaxSplitSize(context: JobContext, minPartitions: Int) { - val files = listStatus(context) - val totalLen = files.map { file => - if (file.isDir) 0L else file.getLen - }.sum - - /** val maxSplitSize = Math.ceil(totalLen * 1.0 / - (if (minPartitions == 0) 1 else minPartitions)).toLong **/ - val maxSplitSize = Math.ceil(totalLen*1.0/files.length).toLong - super.setMaxSplitSize(maxSplitSize) - } - - def createRecordReader(split: InputSplit, taContext: TaskAttemptContext): RecordReader[String,T] - -} - -/** - * A [[org.apache.hadoop.mapreduce.RecordReader RecordReader]] for reading a single whole tiff file - * out in a key-value pair, where the key is the file path and the value is the entire content of - * the file as a TSliceReader (to keep the size information - */ -@serializable abstract class BinaryRecordReader[T]( - split: CombineFileSplit, - context: TaskAttemptContext, - index: Integer) - extends RecordReader[String, T] { - - private val path = split.getPath(index) - private val fs = path.getFileSystem(context.getConfiguration) - - // True means the current file has been processed, then skip it. 
- private var processed = false - - private val key = path.toString - private var value: T = null.asInstanceOf[T] - override def initialize(split: InputSplit, context: TaskAttemptContext) = {} - override def close() = {} - - override def getProgress = if (processed) 1.0f else 0.0f - - override def getCurrentKey = key - - override def getCurrentValue = value - - override def nextKeyValue = { - if (!processed) { - val fileIn = fs.open(path) - val innerBuffer = ByteStreams.toByteArray(fileIn) - value = parseByteArray(innerBuffer) - Closeables.close(fileIn, false) - - processed = true - true - } else { - false - } - } - def parseByteArray(inArray: Array[Byte]): T -} - -/** - * A demo class for extracting just the byte array itself - */ - -@serializable class ByteInputFormat extends BinaryFileInputFormat[Array[Byte]] { - override def createRecordReader(split: InputSplit, taContext: TaskAttemptContext)= - { - new CombineFileRecordReader[String,Array[Byte]](split.asInstanceOf[CombineFileSplit],taContext,classOf[ByteRecordReader]) - } -} - -@serializable class ByteRecordReader( - split: CombineFileSplit, - context: TaskAttemptContext, - index: Integer) - extends BinaryRecordReader[Array[Byte]](split,context,index) { - - def parseByteArray(inArray: Array[Byte]) = inArray -} diff --git a/core/src/main/scala/org/apache/spark/input/RawFileInput.scala b/core/src/main/scala/org/apache/spark/input/RawFileInput.scala new file mode 100644 index 0000000000000..3f299508fe790 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/input/RawFileInput.scala @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.input + +import scala.collection.JavaConversions._ +import com.google.common.io.{ByteStreams, Closeables} +import org.apache.hadoop.mapreduce.InputSplit +import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit +import org.apache.hadoop.mapreduce.RecordReader +import org.apache.hadoop.mapreduce.TaskAttemptContext +import org.apache.hadoop.fs.{FSDataInputStream, Path} +import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat +import org.apache.hadoop.mapreduce.JobContext +import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader +import java.io.DataInputStream + + +/** + * A general format for reading whole files in as streams, byte arrays, or other functions to be added + */ +abstract class StreamFileInputFormat[T] + extends CombineFileInputFormat[String,T] { + override protected def isSplitable(context: JobContext, file: Path): Boolean = false + /** + * Allow minPartitions set by end-user in order to keep compatibility with old Hadoop API. 
+ */ + def setMaxSplitSize(context: JobContext, minPartitions: Int) { + val files = listStatus(context) + val totalLen = files.map { file => + if (file.isDir) 0L else file.getLen + }.sum + + val maxSplitSize = Math.ceil(totalLen*1.0/files.length).toLong + super.setMaxSplitSize(maxSplitSize) + } + + def createRecordReader(split: InputSplit, taContext: TaskAttemptContext): RecordReader[String,T] + +} + +/** + * An abstract [[org.apache.hadoop.mapreduce.RecordReader RecordReader]] for reading files out as streams + */ +abstract class StreamBasedRecordReader[T]( + split: CombineFileSplit, + context: TaskAttemptContext, + index: Integer) + extends RecordReader[String, T] { + + private val path = split.getPath(index) + private val fs = path.getFileSystem(context.getConfiguration) + + // True means the current file has been processed, then skip it. + private var processed = false + + private val key = path.toString + private var value: T = null.asInstanceOf[T] + override def initialize(split: InputSplit, context: TaskAttemptContext) = {} + override def close() = {} + + override def getProgress = if (processed) 1.0f else 0.0f + + override def getCurrentKey = key + + override def getCurrentValue = value + + override def nextKeyValue = { + if (!processed) { + val fileIn: FSDataInputStream = fs.open(path) + value = parseStream(fileIn) + processed = true + true + } else { + false + } + } + + /** + * Parse the stream (and close it afterwards) and return the value as type T + * @param inStream the stream to be read in + * @return the data parsed from the stream as type T + */ + def parseStream(inStream: DataInputStream): T +} + +/** + * Reads the record in directly as a stream for other objects to manipulate and handle + */ +class StreamRecordReader( + split: CombineFileSplit, + context: TaskAttemptContext, + index: Integer) + extends StreamBasedRecordReader[DataInputStream](split,context,index) { + + def parseStream(inStream: DataInputStream): DataInputStream = inStream +} + +/** + * A class for reading each whole file in as a raw DataInputStream using the StreamRecordReader + */ +class StreamInputFormat extends StreamFileInputFormat[DataInputStream] { + override def createRecordReader(split: InputSplit, taContext: TaskAttemptContext)= + { + new CombineFileRecordReader[String,DataInputStream](split.asInstanceOf[CombineFileSplit],taContext,classOf[StreamRecordReader]) + } +} + +/** + * A [[org.apache.hadoop.mapreduce.RecordReader RecordReader]] for reading a single whole binary file + * out in a key-value pair, where the key is the file path and the value is the entire content of + * the file as a byte array + */ +abstract class BinaryRecordReader[T]( + split: CombineFileSplit, + context: TaskAttemptContext, + index: Integer) + extends StreamBasedRecordReader[T](split,context,index) { + + def parseStream(inStream: DataInputStream): T = { + val innerBuffer = ByteStreams.toByteArray(inStream) + Closeables.close(inStream, false) + parseByteArray(innerBuffer) + } + def parseByteArray(inArray: Array[Byte]): T +} + + +class ByteRecordReader( + split: CombineFileSplit, + context: TaskAttemptContext, + index: Integer) + extends BinaryRecordReader[Array[Byte]](split,context,index) { + + def parseByteArray(inArray: Array[Byte]) = inArray +} + +/** + * A class for extracting the information from the file using the BinaryRecordReader (as Byte array) + */ +class ByteInputFormat extends StreamFileInputFormat[Array[Byte]] { + override def createRecordReader(split: InputSplit, taContext: TaskAttemptContext)= + { + new 
CombineFileRecordReader[String,Array[Byte]](split.asInstanceOf[CombineFileSplit],taContext,classOf[ByteRecordReader]) + } +} + + diff --git a/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala b/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala index 7fdc553659f67..e0bacb6dc8db1 100644 --- a/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.spark.rdd /** Allows better control of the partitioning @@ -18,11 +35,11 @@ import org.apache.spark.Partition import org.apache.spark.SerializableWritable import org.apache.spark.{SparkContext, TaskContext} -import org.apache.spark.input.BinaryFileInputFormat +import org.apache.spark.input.StreamFileInputFormat -private[spark] class BinaryFileRDD[T]( +private[spark] class RawFileRDD[T]( sc : SparkContext, - inputFormatClass: Class[_ <: BinaryFileInputFormat[T]], + inputFormatClass: Class[_ <: StreamFileInputFormat[T]], keyClass: Class[String], valueClass: Class[T], @transient conf: Configuration,
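
A short usage sketch of the two new entry points added by this patch, for reviewers; it is illustrative only and not part of the diff. The object name, the `local[2]` master and the input path are made up for the example, and `dataStreamFiles` hands each task an open stream that the task itself must read and close.

{{{
import java.io.DataInputStream

import org.apache.spark.{SparkConf, SparkContext}

object BinaryFilesSketch {
  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setAppName("binary-files-sketch").setMaster("local[2]"))

    // binaryFiles: each whole file under the path becomes one (path, Array[Byte]) record.
    val sizes = sc.binaryFiles("hdfs://a-hdfs-path")
      .map { case (file, bytes) => (file, bytes.length) }
      .collect()

    // dataStreamFiles: each file arrives as an open DataInputStream; read and close it
    // inside the task that receives it, since the stream cannot be sent to the driver.
    // (Assumes every file is non-empty, otherwise readByte() throws EOFException.)
    val firstBytes = sc.dataStreamFiles("hdfs://a-hdfs-path")
      .map { case (file, stream) =>
        try { (file, stream.readByte()) } finally { stream.close() }
      }
      .collect()

    sizes.foreach(println)
    firstBytes.foreach(println)
    sc.stop()
  }
}
}}}

The byte-array variant is the simpler default; the stream variant avoids buffering an entire file in memory when the consumer can process it incrementally.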