From 6379be487cfa91097f3591dfd05b8e87e09c2399 Mon Sep 17 00:00:00 2001
From: Kevin Mader
Date: Wed, 22 Oct 2014 00:15:16 +0200
Subject: [PATCH] reorganizing code

---
 .../spark/input/PortableDataStream.scala      | 138 +++++++++---------
 1 file changed, 69 insertions(+), 69 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala b/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
index dcb74f3556979..9c8121e2a6d14 100644
--- a/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
+++ b/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
@@ -51,6 +51,75 @@ private[spark] abstract class StreamFileInputFormat[T]
 
 }
 
+/**
+ * An abstract class of [[org.apache.hadoop.mapreduce.RecordReader RecordReader]]
+ * to read files out as streams
+ */
+private[spark] abstract class StreamBasedRecordReader[T](
+    split: CombineFileSplit,
+    context: TaskAttemptContext,
+    index: Integer)
+  extends RecordReader[String, T] {
+
+  // True means the current file has been processed, so skip it.
+  private var processed = false
+
+  private var key = ""
+  private var value: T = null.asInstanceOf[T]
+
+  override def initialize(split: InputSplit, context: TaskAttemptContext) = {}
+  override def close() = {}
+
+  override def getProgress = if (processed) 1.0f else 0.0f
+
+  override def getCurrentKey = key
+
+  override def getCurrentValue = value
+
+  override def nextKeyValue = {
+    if (!processed) {
+      val fileIn = new PortableDataStream(split, context, index)
+      value = parseStream(fileIn)
+      fileIn.close() // if it has not been opened yet, close does nothing
+      key = fileIn.getPath
+      processed = true
+      true
+    } else {
+      false
+    }
+  }
+
+  /**
+   * Parse the stream (and close it afterwards) and return the value as type T
+   * @param inStream the stream to be read in
+   * @return the data formatted as type T
+   */
+  def parseStream(inStream: PortableDataStream): T
+}
+
+/**
+ * Reads the record in directly as a stream for other objects to manipulate and handle
+ */
+private[spark] class StreamRecordReader(
+    split: CombineFileSplit,
+    context: TaskAttemptContext,
+    index: Integer)
+  extends StreamBasedRecordReader[PortableDataStream](split, context, index) {
+
+  def parseStream(inStream: PortableDataStream): PortableDataStream = inStream
+}
+
+/**
+ * The format for the PortableDataStream files
+ */
+private[spark] class StreamInputFormat extends StreamFileInputFormat[PortableDataStream] {
+  override def createRecordReader(split: InputSplit, taContext: TaskAttemptContext) =
+  {
+    new CombineFileRecordReader[String, PortableDataStream](
+      split.asInstanceOf[CombineFileSplit], taContext, classOf[StreamRecordReader])
+  }
+}
+
 /**
  * A class that allows DataStreams to be serialized and moved around by not creating them
  * until they need to be read
@@ -143,72 +212,3 @@ class PortableDataStream(@transient isplit: CombineFileSplit,
 
   def getPath(): String = path
 }
-/**
- * An abstract class of [[org.apache.hadoop.mapreduce.RecordReader RecordReader]]
- * to reading files out as streams
- */
-private[spark] abstract class StreamBasedRecordReader[T](
-    split: CombineFileSplit,
-    context: TaskAttemptContext,
-    index: Integer)
-  extends RecordReader[String, T] {
-
-  // True means the current file has been processed, then skip it.
-  private var processed = false
-
-  private var key = ""
-  private var value: T = null.asInstanceOf[T]
-
-  override def initialize(split: InputSplit, context: TaskAttemptContext) = {}
-  override def close() = {}
-
-  override def getProgress = if (processed) 1.0f else 0.0f
-
-  override def getCurrentKey = key
-
-  override def getCurrentValue = value
-
-  override def nextKeyValue = {
-    if (!processed) {
-      val fileIn = new PortableDataStream(split, context, index)
-      value = parseStream(fileIn)
-      fileIn.close() // if it has not been open yet, close does nothing
-      key = fileIn.getPath
-      processed = true
-      true
-    } else {
-      false
-    }
-  }
-
-  /**
-   * Parse the stream (and close it afterwards) and return the value as in type T
-   * @param inStream the stream to be read in
-   * @return the data formatted as
-   */
-  def parseStream(inStream: PortableDataStream): T
-}
-
-/**
- * Reads the record in directly as a stream for other objects to manipulate and handle
- */
-private[spark] class StreamRecordReader(
-    split: CombineFileSplit,
-    context: TaskAttemptContext,
-    index: Integer)
-  extends StreamBasedRecordReader[PortableDataStream](split, context, index) {
-
-  def parseStream(inStream: PortableDataStream): PortableDataStream = inStream
-}
-
-/**
- * The format for the PortableDataStream files
- */
-private[spark] class StreamInputFormat extends StreamFileInputFormat[PortableDataStream] {
-  override def createRecordReader(split: InputSplit, taContext: TaskAttemptContext) =
-  {
-    new CombineFileRecordReader[String, PortableDataStream](
-      split.asInstanceOf[CombineFileSplit], taContext, classOf[StreamRecordReader])
-  }
-}
-
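
Reviewer sketch: a concrete whole-file format plugs in by subclassing StreamBasedRecordReader and overriding parseStream. The example below is illustrative only and not part of this patch: BytesRecordReader is a hypothetical name, and it assumes PortableDataStream exposes an open() method returning a java.io.DataInputStream (the "if it has not been opened yet" comment in nextKeyValue implies one); the Hadoop imports match those PortableDataStream.scala already needs.

import java.io.ByteArrayOutputStream
import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit

// Hypothetical reader that materializes each file as a byte array.
private[spark] class BytesRecordReader(
    split: CombineFileSplit,
    context: TaskAttemptContext,
    index: Integer)
  extends StreamBasedRecordReader[Array[Byte]](split, context, index) {

  def parseStream(inStream: PortableDataStream): Array[Byte] = {
    val in = inStream.open() // open() is assumed; only getPath() and close() appear in this diff
    try {
      // Drain the stream into a buffer; nextKeyValue closes the
      // PortableDataStream itself afterwards.
      val out = new ByteArrayOutputStream()
      val buf = new Array[Byte](65536)
      var n = in.read(buf)
      while (n != -1) {
        out.write(buf, 0, n)
        n = in.read(buf)
      }
      out.toByteArray
    } finally {
      in.close()
    }
  }
}

Wiring this up mirrors StreamInputFormat above: a StreamFileInputFormat[Array[Byte]] whose createRecordReader returns a CombineFileRecordReader parameterized with classOf[BytesRecordReader].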
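
End to end, the intended consumption path for StreamInputFormat and PortableDataStream looks roughly like the following. This is a sketch under two assumptions not shown in this diff: an entry point such as SparkContext.binaryFiles(path) returning an RDD[(String, PortableDataStream)], and a toArray() helper on PortableDataStream that reads the whole file.

import org.apache.spark.{SparkConf, SparkContext}

object BinaryFilesSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("binary-files-sketch"))
    // Each record is (file path, lazy stream): the key comes from
    // PortableDataStream.getPath via nextKeyValue, and the stream is not
    // opened until something actually reads it.
    val files = sc.binaryFiles("hdfs:///data/blobs") // binaryFiles is assumed here
    val sizes = files.map { case (path, stream) =>
      (path, stream.toArray().length) // toArray() is assumed here
    }
    sizes.collect().foreach { case (path, n) => println(s"$path: $n bytes") }
    sc.stop()
  }
}

The lazy design is what makes this work: because PortableDataStream does not create the underlying Hadoop stream until it is read, the (path, stream) pairs stay cheap to serialize and ship to executors, which is why nextKeyValue can set the value before the file is ever opened.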