From 1cfa38ae8bb1dcec0416dc9df1ba7a61aef82f9e Mon Sep 17 00:00:00 2001 From: Kevin Mader Date: Thu, 31 Jul 2014 02:14:57 +0200 Subject: [PATCH] added apache headers, added datainputstream directly as an output option for more complicated readers (HDF5 perhaps), and renamed several of the functions and files to be more consistent. Also added parallel functions to the java api --- .../scala/org/apache/spark/SparkContext.scala | 28 ++- .../spark/api/java/JavaSparkContext.scala | 64 +++++++ .../apache/spark/input/BinaryFileInput.scala | 102 ----------- .../org/apache/spark/input/RawFileInput.scala | 162 ++++++++++++++++++ .../org/apache/spark/rdd/BinaryFileRDD.scala | 23 ++- 5 files changed, 271 insertions(+), 108 deletions(-) delete mode 100644 core/src/main/scala/org/apache/spark/input/BinaryFileInput.scala create mode 100644 core/src/main/scala/org/apache/spark/input/RawFileInput.scala diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index af9a1ce935821..aa0f8e4ce0101 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -40,7 +40,7 @@ import org.apache.mesos.MesosNativeLibrary import org.apache.spark.annotation.{DeveloperApi, Experimental} import org.apache.spark.broadcast.Broadcast import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil} -import org.apache.spark.input.{WholeTextFileInputFormat,ByteInputFormat} +import org.apache.spark.input.{StreamInputFormat, StreamFileInputFormat, WholeTextFileInputFormat, ByteInputFormat} import org.apache.spark.partial.{ApproximateEvaluator, PartialResult} import org.apache.spark.rdd._ import org.apache.spark.scheduler._ @@ -517,11 +517,11 @@ class SparkContext(config: SparkConf) extends Logging { * * @note Small files are preferred, large file is also allowable, but may cause bad performance. */ - def byteFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, Array[Byte])] = { + def binaryFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, Array[Byte])] = { val job = new NewHadoopJob(hadoopConfiguration) NewFileInputFormat.addInputPath(job, new Path(path)) val updateConf = job.getConfiguration - new BinaryFileRDD( + new RawFileRDD( this, classOf[ByteInputFormat], classOf[String], @@ -530,6 +530,28 @@ class SparkContext(config: SparkConf) extends Logging { minPartitions).setName(path) } + /** + * Get an RDD for a Hadoop-readable dataset as DataInputStreams for each file (useful for binary data) + * Care must be taken to close the files afterwards + * + * @param minPartitions A suggestion value of the minimal splitting number for input data. + * + * @note Small files are preferred, large file is also allowable, but may cause bad performance. + */ + @DeveloperApi + def dataStreamFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, DataInputStream)] = { + val job = new NewHadoopJob(hadoopConfiguration) + NewFileInputFormat.addInputPath(job, new Path(path)) + val updateConf = job.getConfiguration + new RawFileRDD( + this, + classOf[StreamInputFormat], + classOf[String], + classOf[DataInputStream], + updateConf, + minPartitions).setName(path) + } + /** * Get an RDD for a Hadoop-readable dataset from a Hadoop JobConf given its InputFormat and other * necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable), diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala index 8a5f8088a05ca..7690d009fa2b7 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala @@ -20,6 +20,8 @@ package org.apache.spark.api.java import java.util import java.util.{Map => JMap} +import java.io.DataInputStream + import scala.collection.JavaConversions import scala.collection.JavaConversions._ import scala.language.implicitConversions @@ -180,6 +182,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork def textFile(path: String, minPartitions: Int): JavaRDD[String] = sc.textFile(path, minPartitions) + + /** * Read a directory of text files from HDFS, a local file system (available on all nodes), or any * Hadoop-supported file system URI. Each file is read as a single record and returned in a @@ -210,6 +214,66 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork def wholeTextFiles(path: String, minPartitions: Int): JavaPairRDD[String, String] = new JavaPairRDD(sc.wholeTextFiles(path, minPartitions)) + /** + * Read a directory of binary files from HDFS, a local file system (available on all nodes), or any + * Hadoop-supported file system URI as a byte array. Each file is read as a single record and returned in a + * key-value pair, where the key is the path of each file, the value is the content of each file. + * + *

For example, if you have the following files: + * {{{ + * hdfs://a-hdfs-path/part-00000 + * hdfs://a-hdfs-path/part-00001 + * ... + * hdfs://a-hdfs-path/part-nnnnn + * }}} + * + * Do `JavaPairRDD rdd = sparkContext.dataStreamFiles("hdfs://a-hdfs-path")`, + * + *

then `rdd` contains + * {{{ + * (a-hdfs-path/part-00000, its content) + * (a-hdfs-path/part-00001, its content) + * ... + * (a-hdfs-path/part-nnnnn, its content) + * }}} + * + * @note Small files are preferred, large file is also allowable, but may cause bad performance. + * + * @param minPartitions A suggestion value of the minimal splitting number for input data. + */ + def dataStreamFiles(path: String, minPartitions: Int = defaultMinPartitions): JavaPairRDD[String,DataInputStream] = + new JavaPairRDD(sc.dataStreamFiles(path,minPartitions)) + + /** + * Read a directory of files as DataInputStreams from HDFS, a local file system (available on all nodes), or any + * Hadoop-supported file system URI as a byte array. Each file is read as a single record and returned in a + * key-value pair, where the key is the path of each file, the value is the content of each file. + * + *

For example, if you have the following files: + * {{{ + * hdfs://a-hdfs-path/part-00000 + * hdfs://a-hdfs-path/part-00001 + * ... + * hdfs://a-hdfs-path/part-nnnnn + * }}} + * + * Do `JavaPairRDD rdd = sparkContext.binaryFiles("hdfs://a-hdfs-path")`, + * + *

then `rdd` contains + * {{{ + * (a-hdfs-path/part-00000, its content) + * (a-hdfs-path/part-00001, its content) + * ... + * (a-hdfs-path/part-nnnnn, its content) + * }}} + * + * @note Small files are preferred, large file is also allowable, but may cause bad performance. + * + * @param minPartitions A suggestion value of the minimal splitting number for input data. + */ + def binaryFiles(path: String, minPartitions: Int = defaultMinPartitions): JavaPairRDD[String,Array[Byte]] = + new JavaPairRDD(sc.binaryFiles(path,minPartitions)) + /** * Read a directory of text files from HDFS, a local file system (available on all nodes), or any * Hadoop-supported file system URI. Each file is read as a single record and returned in a diff --git a/core/src/main/scala/org/apache/spark/input/BinaryFileInput.scala b/core/src/main/scala/org/apache/spark/input/BinaryFileInput.scala deleted file mode 100644 index 683784c8873b2..0000000000000 --- a/core/src/main/scala/org/apache/spark/input/BinaryFileInput.scala +++ /dev/null @@ -1,102 +0,0 @@ -package org.apache.spark.input - -import scala.collection.JavaConversions._ -import com.google.common.io.{ByteStreams, Closeables} -import org.apache.hadoop.mapreduce.InputSplit -import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit -import org.apache.hadoop.mapreduce.RecordReader -import org.apache.hadoop.mapreduce.TaskAttemptContext -import org.apache.hadoop.fs.Path -import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat -import org.apache.hadoop.mapreduce.JobContext -import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader - - -/** - * The new (Hadoop 2.0) InputFormat for while binary files (not be to be confused with the recordreader itself) - */ -@serializable abstract class BinaryFileInputFormat[T] - extends CombineFileInputFormat[String,T] { - override protected def isSplitable(context: JobContext, file: Path): Boolean = false - /** - * Allow minPartitions set by end-user in order to keep compatibility with old Hadoop API. - */ - def setMaxSplitSize(context: JobContext, minPartitions: Int) { - val files = listStatus(context) - val totalLen = files.map { file => - if (file.isDir) 0L else file.getLen - }.sum - - /** val maxSplitSize = Math.ceil(totalLen * 1.0 / - (if (minPartitions == 0) 1 else minPartitions)).toLong **/ - val maxSplitSize = Math.ceil(totalLen*1.0/files.length).toLong - super.setMaxSplitSize(maxSplitSize) - } - - def createRecordReader(split: InputSplit, taContext: TaskAttemptContext): RecordReader[String,T] - -} - -/** - * A [[org.apache.hadoop.mapreduce.RecordReader RecordReader]] for reading a single whole tiff file - * out in a key-value pair, where the key is the file path and the value is the entire content of - * the file as a TSliceReader (to keep the size information - */ -@serializable abstract class BinaryRecordReader[T]( - split: CombineFileSplit, - context: TaskAttemptContext, - index: Integer) - extends RecordReader[String, T] { - - private val path = split.getPath(index) - private val fs = path.getFileSystem(context.getConfiguration) - - // True means the current file has been processed, then skip it. - private var processed = false - - private val key = path.toString - private var value: T = null.asInstanceOf[T] - override def initialize(split: InputSplit, context: TaskAttemptContext) = {} - override def close() = {} - - override def getProgress = if (processed) 1.0f else 0.0f - - override def getCurrentKey = key - - override def getCurrentValue = value - - override def nextKeyValue = { - if (!processed) { - val fileIn = fs.open(path) - val innerBuffer = ByteStreams.toByteArray(fileIn) - value = parseByteArray(innerBuffer) - Closeables.close(fileIn, false) - - processed = true - true - } else { - false - } - } - def parseByteArray(inArray: Array[Byte]): T -} - -/** - * A demo class for extracting just the byte array itself - */ - -@serializable class ByteInputFormat extends BinaryFileInputFormat[Array[Byte]] { - override def createRecordReader(split: InputSplit, taContext: TaskAttemptContext)= - { - new CombineFileRecordReader[String,Array[Byte]](split.asInstanceOf[CombineFileSplit],taContext,classOf[ByteRecordReader]) - } -} - -@serializable class ByteRecordReader( - split: CombineFileSplit, - context: TaskAttemptContext, - index: Integer) - extends BinaryRecordReader[Array[Byte]](split,context,index) { - - def parseByteArray(inArray: Array[Byte]) = inArray -} diff --git a/core/src/main/scala/org/apache/spark/input/RawFileInput.scala b/core/src/main/scala/org/apache/spark/input/RawFileInput.scala new file mode 100644 index 0000000000000..3f299508fe790 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/input/RawFileInput.scala @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.input + +import scala.collection.JavaConversions._ +import com.google.common.io.{ByteStreams, Closeables} +import org.apache.hadoop.mapreduce.InputSplit +import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit +import org.apache.hadoop.mapreduce.RecordReader +import org.apache.hadoop.mapreduce.TaskAttemptContext +import org.apache.hadoop.fs.{FSDataInputStream, Path} +import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat +import org.apache.hadoop.mapreduce.JobContext +import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader +import java.io.DataInputStream + + +/** + * A general format for reading whole files in as streams, byte arrays, or other functions to be added + */ +abstract class StreamFileInputFormat[T] + extends CombineFileInputFormat[String,T] { + override protected def isSplitable(context: JobContext, file: Path): Boolean = false + /** + * Allow minPartitions set by end-user in order to keep compatibility with old Hadoop API. + */ + def setMaxSplitSize(context: JobContext, minPartitions: Int) { + val files = listStatus(context) + val totalLen = files.map { file => + if (file.isDir) 0L else file.getLen + }.sum + + val maxSplitSize = Math.ceil(totalLen*1.0/files.length).toLong + super.setMaxSplitSize(maxSplitSize) + } + + def createRecordReader(split: InputSplit, taContext: TaskAttemptContext): RecordReader[String,T] + +} + +/** + * An abstract class of [[org.apache.hadoop.mapreduce.RecordReader RecordReader]] to reading files out as streams + */ +abstract class StreamBasedRecordReader[T]( + split: CombineFileSplit, + context: TaskAttemptContext, + index: Integer) + extends RecordReader[String, T] { + + private val path = split.getPath(index) + private val fs = path.getFileSystem(context.getConfiguration) + + // True means the current file has been processed, then skip it. + private var processed = false + + private val key = path.toString + private var value: T = null.asInstanceOf[T] + override def initialize(split: InputSplit, context: TaskAttemptContext) = {} + override def close() = {} + + override def getProgress = if (processed) 1.0f else 0.0f + + override def getCurrentKey = key + + override def getCurrentValue = value + + override def nextKeyValue = { + if (!processed) { + val fileIn: FSDataInputStream = fs.open(path) + value = parseStream(fileIn) + processed = true + true + } else { + false + } + } + + /** + * Parse the stream (and close it afterwards) and return the value as in type T + * @param inStream the stream to be read in + * @return the data formatted as + */ + def parseStream(inStream: DataInputStream): T +} + +/** + * Reads the record in directly as a stream for other objects to manipulate and handle + */ +class StreamRecordReader( + split: CombineFileSplit, + context: TaskAttemptContext, + index: Integer) + extends StreamBasedRecordReader[DataInputStream](split,context,index) { + + def parseStream(inStream: DataInputStream): DataInputStream = inStream +} + +/** + * A class for extracting the information from the file using the BinaryRecordReader (as Byte array) + */ +class StreamInputFormat extends StreamFileInputFormat[DataInputStream] { + override def createRecordReader(split: InputSplit, taContext: TaskAttemptContext)= + { + new CombineFileRecordReader[String,DataInputStream](split.asInstanceOf[CombineFileSplit],taContext,classOf[StreamRecordReader]) + } +} + +/** + * A [[org.apache.hadoop.mapreduce.RecordReader RecordReader]] for reading a single whole binary file + * out in a key-value pair, where the key is the file path and the value is the entire content of + * the file as a byte array + */ +abstract class BinaryRecordReader[T]( + split: CombineFileSplit, + context: TaskAttemptContext, + index: Integer) + extends StreamBasedRecordReader[T](split,context,index) { + + def parseStream(inStream: DataInputStream): T = { + val innerBuffer = ByteStreams.toByteArray(inStream) + Closeables.close(inStream, false) + parseByteArray(innerBuffer) + } + def parseByteArray(inArray: Array[Byte]): T +} + + +class ByteRecordReader( + split: CombineFileSplit, + context: TaskAttemptContext, + index: Integer) + extends BinaryRecordReader[Array[Byte]](split,context,index) { + + def parseByteArray(inArray: Array[Byte]) = inArray +} + +/** + * A class for extracting the information from the file using the BinaryRecordReader (as Byte array) + */ +class ByteInputFormat extends StreamFileInputFormat[Array[Byte]] { + override def createRecordReader(split: InputSplit, taContext: TaskAttemptContext)= + { + new CombineFileRecordReader[String,Array[Byte]](split.asInstanceOf[CombineFileSplit],taContext,classOf[ByteRecordReader]) + } +} + + diff --git a/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala b/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala index 7fdc553659f67..e0bacb6dc8db1 100644 --- a/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.spark.rdd /** Allows better control of the partitioning @@ -18,11 +35,11 @@ import org.apache.spark.Partition import org.apache.spark.SerializableWritable import org.apache.spark.{SparkContext, TaskContext} -import org.apache.spark.input.BinaryFileInputFormat +import org.apache.spark.input.StreamFileInputFormat -private[spark] class BinaryFileRDD[T]( +private[spark] class RawFileRDD[T]( sc : SparkContext, - inputFormatClass: Class[_ <: BinaryFileInputFormat[T]], + inputFormatClass: Class[_ <: StreamFileInputFormat[T]], keyClass: Class[String], valueClass: Class[T], @transient conf: Configuration,