diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index d12dd8a894bb2..1b6ad249bc5c9 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -40,7 +40,7 @@ import org.apache.mesos.MesosNativeLibrary
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
-import org.apache.spark.input.{StreamInputFormat, StreamFileInputFormat, WholeTextFileInputFormat, ByteInputFormat, FixedLengthBinaryInputFormat}
+import org.apache.spark.input.{StreamInputFormat, PortableDataStream, WholeTextFileInputFormat, ByteInputFormat, FixedLengthBinaryInputFormat}
 import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd._
 import org.apache.spark.scheduler._
@@ -533,7 +533,7 @@ class SparkContext(config: SparkConf) extends Logging {
   }
 
   /**
-   * Get an RDD for a Hadoop-readable dataset as DataInputStreams for each file
+   * Get an RDD for a Hadoop-readable dataset as PortableDataStream for each file
    * (useful for binary data)
    *
    *
@@ -544,7 +544,7 @@ class SparkContext(config: SparkConf) extends Logging {
    */
  @DeveloperApi
  def dataStreamFiles(path: String, minPartitions: Int = defaultMinPartitions):
-  RDD[(String, DataInputStream)] = {
+  RDD[(String, PortableDataStream)] = {
     val job = new NewHadoopJob(hadoopConfiguration)
     NewFileInputFormat.addInputPath(job, new Path(path))
     val updateConf = job.getConfiguration
@@ -552,7 +552,7 @@ class SparkContext(config: SparkConf) extends Logging {
       this,
       classOf[StreamInputFormat],
       classOf[String],
-      classOf[DataInputStream],
+      classOf[PortableDataStream],
       updateConf,
       minPartitions).setName(path)
   }
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index a407263aa8dd0..dcaaccb437168 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -23,7 +23,7 @@ import java.util.{Map => JMap}
 import java.io.DataInputStream
 
 import org.apache.hadoop.io.{BytesWritable, LongWritable}
-import org.apache.spark.input.FixedLengthBinaryInputFormat
+import org.apache.spark.input.{PortableDataStream, FixedLengthBinaryInputFormat}
 
 import scala.collection.JavaConversions
 import scala.collection.JavaConversions._
@@ -257,7 +257,7 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
    * @param minPartitions A suggestion value of the minimal splitting number for input data.
    */
   def dataStreamFiles(path: String, minPartitions: Int = defaultMinPartitions):
-    JavaPairRDD[String,DataInputStream] = new JavaPairRDD(sc.dataStreamFiles(path,minPartitions))
+    JavaPairRDD[String,PortableDataStream] = new JavaPairRDD(sc.dataStreamFiles(path,minPartitions))
 
   /**
    * Read a directory of files as DataInputStream from HDFS,
diff --git a/core/src/main/scala/org/apache/spark/input/RawFileInput.scala b/core/src/main/scala/org/apache/spark/input/RawFileInput.scala
index fa79ed7af3513..4a8e543adecfa 100644
--- a/core/src/main/scala/org/apache/spark/input/RawFileInput.scala
+++ b/core/src/main/scala/org/apache/spark/input/RawFileInput.scala
@@ -51,40 +51,66 @@ abstract class StreamFileInputFormat[T]
   }
 
   def createRecordReader(split: InputSplit, taContext: TaskAttemptContext):
-  RecordReader[String,T]
+    RecordReader[String,T]
 
 }
 
+/**
+ * A class that allows DataStreams to be serialized and moved around by not creating them
+ * until they need to be read
+ * @param split
+ * @param context
+ * @param index
+ */
+class PortableDataStream(split: CombineFileSplit, context: TaskAttemptContext, index: Integer)
+  extends Serializable {
+  private var path = ""
+  private var fileIn: FSDataInputStream = null.asInstanceOf[FSDataInputStream]
+  private var isOpen = false
+
+  def open(): FSDataInputStream= {
+    val pathp = split.getPath(index)
+    path = pathp.toString
+    val fs = pathp.getFileSystem(context.getConfiguration)
+    fileIn = fs.open(pathp)
+    isOpen=true
+    fileIn
+  }
+
+  def close() = {
+    if (isOpen) {
+      try {
+        fileIn.close()
+        isOpen=false
+      } catch {
+        case ioe: java.io.IOException => // do nothing
+      }
+    }
+  }
+  def getPath(): String = path
+}
+
 /**
  * An abstract class of [[org.apache.hadoop.mapreduce.RecordReader RecordReader]]
  * to reading files out as streams
  */
 abstract class StreamBasedRecordReader[T](
-  split: CombineFileSplit,
-  context: TaskAttemptContext,
-  index: Integer)
+    split: CombineFileSplit,
+    context: TaskAttemptContext,
+    index: Integer)
   extends RecordReader[String, T] {
 
-  private val path = split.getPath(index)
-  private val fs = path.getFileSystem(context.getConfiguration)
+  // True means the current file has been processed, then skip it.
   private var processed = false
 
-  private val key = path.toString
+  private var key = ""
   private var value: T = null.asInstanceOf[T]
-  // the file to be read when nextkeyvalue is called
-  private lazy val fileIn: FSDataInputStream = fs.open(path)
+
   override def initialize(split: InputSplit, context: TaskAttemptContext) = {}
-  override def close() = {
-    // make sure the file is closed
-    try {
-      fileIn.close()
-    } catch {
-      case ioe: java.io.IOException => // do nothing
-    }
-  }
+
+  override def close() = {}
 
   override def getProgress = if (processed) 1.0f else 0.0f
@@ -93,10 +119,13 @@ abstract class StreamBasedRecordReader[T](
 
   override def getCurrentValue = value
 
+
   override def nextKeyValue = {
     if (!processed) {
-
+      val fileIn = new PortableDataStream(split,context,index)
+      key = fileIn.getPath
       value = parseStream(fileIn)
+      fileIn.close() // if it has not been open yet, close does nothing
       processed = true
       true
     } else {
@@ -109,29 +138,29 @@ abstract class StreamBasedRecordReader[T](
   * @param inStream the stream to be read in
   * @return the data formatted as
   */
-  def parseStream(inStream: DataInputStream): T
+  def parseStream(inStream: PortableDataStream): T
 }
 
 /**
  * Reads the record in directly as a stream for other objects to manipulate and handle
  */
 private[spark] class StreamRecordReader(
-  split: CombineFileSplit,
-  context: TaskAttemptContext,
-  index: Integer)
-  extends StreamBasedRecordReader[DataInputStream](split,context,index) {
+    split: CombineFileSplit,
+    context: TaskAttemptContext,
+    index: Integer)
+  extends StreamBasedRecordReader[PortableDataStream](split,context,index) {
 
-  def parseStream(inStream: DataInputStream): DataInputStream = inStream
+  def parseStream(inStream: PortableDataStream): PortableDataStream = inStream
 }
 
 /**
  * A class for extracting the information from the file using the
  * BinaryRecordReader (as Byte array)
  */
-private[spark] class StreamInputFormat extends StreamFileInputFormat[DataInputStream] {
+private[spark] class StreamInputFormat extends StreamFileInputFormat[PortableDataStream] {
   override def createRecordReader(split: InputSplit, taContext: TaskAttemptContext)= {
-    new CombineFileRecordReader[String,DataInputStream](
+    new CombineFileRecordReader[String,PortableDataStream](
       split.asInstanceOf[CombineFileSplit],taContext,classOf[StreamRecordReader]
     )
   }
@@ -143,12 +172,13 @@ private[spark] class StreamInputFormat extends StreamFileInputFormat[DataInputSt
  * the file as a byte array
  */
 abstract class BinaryRecordReader[T](
-  split: CombineFileSplit,
-  context: TaskAttemptContext,
-  index: Integer)
+    split: CombineFileSplit,
+    context: TaskAttemptContext,
+    index: Integer)
   extends StreamBasedRecordReader[T](split,context,index) {
 
-  def parseStream(inStream: DataInputStream): T = {
+  def parseStream(inpStream: PortableDataStream): T = {
+    val inStream = inpStream.open()
     val innerBuffer = ByteStreams.toByteArray(inStream)
     Closeables.close(inStream, false)
     parseByteArray(innerBuffer)
@@ -157,13 +187,14 @@ abstract class BinaryRecordReader[T](
 }
 
+
 private[spark] class ByteRecordReader(
-  split: CombineFileSplit,
-  context: TaskAttemptContext,
-  index: Integer)
+    split: CombineFileSplit,
+    context: TaskAttemptContext,
+    index: Integer)
   extends BinaryRecordReader[Array[Byte]](split,context,index) {
 
-  def parseByteArray(inArray: Array[Byte]) = inArray
+  override def parseByteArray(inArray: Array[Byte]) = inArray
 }
 
 /**
diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala
index b723f79c7fb3f..2bb6d809cc174 100644
--- a/core/src/test/scala/org/apache/spark/FileSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -226,9 +226,8 @@ class FileSuite extends FunSuite with LocalSparkContext {
 
   test("binary file input as byte array") {
     sc = new SparkContext("local", "test")
-    val outputDir = new File(tempDir).getAbsolutePath
-    val outFile = new File(outputDir, "record-bytestream-00000.bin")
-    val outFileName = outFile.toPath().toString()
+    val outFile = new File(tempDir, "record-bytestream-00000.bin")
+    val outFileName = outFile.getAbsolutePath()
 
     // create file
     val testOutput = Array[Byte](1,2,3,4,5,6)
@@ -252,10 +251,8 @@ class FileSuite extends FunSuite with LocalSparkContext {
     // a fixed length of 6 bytes
 
     sc = new SparkContext("local", "test")
-
-    val outputDir = new File(tempDir).getAbsolutePath
-    val outFile = new File(outputDir, "record-bytestream-00000.bin")
-    val outFileName = outFile.toPath().toString()
+    val outFile = new File(tempDir, "record-bytestream-00000.bin")
+    val outFileName = outFile.getAbsolutePath()
 
     // create file
     val testOutput = Array[Byte](1,2,3,4,5,6)
diff --git a/pom.xml b/pom.xml
index ae97bf03c53a2..cfd59cf463131 100644
--- a/pom.xml
+++ b/pom.xml
@@ -90,6 +90,7 @@
     <module>bagel</module>
     <module>graphx</module>
     <module>mllib</module>
+    <module>imglib</module>
    <module>tools</module>
    <module>streaming</module>
    <module>sql/catalyst</module>
@@ -743,6 +744,10 @@
        <artifactId>jackson-mapper-asl</artifactId>
        <version>1.8.8</version>
      </dependency>
+      <dependency>
+        <groupId>io.scif</groupId>
+        <artifactId>pom-scifio</artifactId>
+
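
Not part of the patch itself: a minimal usage sketch of the dataStreamFiles API introduced above, assuming the PortableDataStream class exactly as added in this diff. The input directory hdfs:///data/binary-files and the 4-byte header read are hypothetical; the point is that each file arrives as a (path, PortableDataStream) pair and the underlying FSDataInputStream is only opened once open() is called inside the task.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.input.PortableDataStream

object DataStreamFilesSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("dataStreamFiles-sketch").setMaster("local"))

    // One record per file: (file path, lazily-opened stream wrapper). Hypothetical directory.
    val streams = sc.dataStreamFiles("hdfs:///data/binary-files")

    // Read the first four bytes of every file; the stream is opened on the executor, not the driver.
    val headers = streams.map { case (path, pds: PortableDataStream) =>
      val in = pds.open()            // opens the FSDataInputStream only now
      val buf = new Array[Byte](4)
      in.readFully(buf)
      pds.close()                    // safe even if open() was never called
      (path, buf)
    }

    headers.collect().foreach { case (path, bytes) =>
      println(s"$path -> ${bytes.mkString(",")}")
    }
    sc.stop()
  }
}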