diff --git a/core/src/main/scala/org/apache/spark/input/RawFileInput.scala b/core/src/main/scala/org/apache/spark/input/RawFileInput.scala
index 9c97f41097e52..e1caf9c44ec16 100644
--- a/core/src/main/scala/org/apache/spark/input/RawFileInput.scala
+++ b/core/src/main/scala/org/apache/spark/input/RawFileInput.scala
@@ -25,6 +25,7 @@ import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit
 import org.apache.hadoop.mapreduce.RecordReader
 import org.apache.hadoop.mapreduce.TaskAttemptContext
 import org.apache.hadoop.fs.{ FSDataInputStream, Path }
+import org.apache.spark.annotation.DeveloperApi
 import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat
 import org.apache.hadoop.mapreduce.JobContext
 import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader
@@ -60,6 +61,7 @@ abstract class StreamFileInputFormat[T]
  * @note TaskAttemptContext is not serializable resulting in the confBytes construct
  * @note CombineFileSplit is not serializable resulting in the splitBytes construct
  */
+@DeveloperApi
 class PortableDataStream(@transient isplit: CombineFileSplit,
     @transient context: TaskAttemptContext, index: Integer)
   extends Serializable {
@@ -205,8 +207,7 @@ private[spark] class StreamRecordReader(
 }
 
 /**
- * A class for extracting the information from the file using the
- * BinaryRecordReader (as Byte array)
+ * The format for the PortableDataStream files
  */
 private[spark] class StreamInputFormat extends StreamFileInputFormat[PortableDataStream] {
   override def createRecordReader(split: InputSplit, taContext: TaskAttemptContext) =
@@ -216,22 +217,3 @@ private[spark] class StreamInputFormat extends StreamFileInputFormat[PortableDat
   }
 }
 
-/**
- * A [[org.apache.hadoop.mapreduce.RecordReader RecordReader]] for reading a single binary file
- * out in a key-value pair, where the key is the file path and the value is the entire content of
- * the file as a byte array
- */
-abstract class BinaryRecordReader[T](
-    split: CombineFileSplit,
-    context: TaskAttemptContext,
-    index: Integer)
-  extends StreamBasedRecordReader[T](split, context, index) {
-
-  def parseStream(inpStream: PortableDataStream): T = {
-    val inStream = inpStream.open()
-    val innerBuffer = ByteStreams.toByteArray(inStream)
-    Closeables.close(inStream, false)
-    parseByteArray(innerBuffer)
-  }
-  def parseByteArray(inArray: Array[Byte]): T
-}
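
A minimal usage sketch (not part of the diff itself): with BinaryRecordReader removed, callers materialize file contents themselves through the now-@DeveloperApi PortableDataStream. The sketch assumes the SparkContext.binaryFiles entry point that accompanies this API, and the input path is hypothetical.

import org.apache.spark.{SparkConf, SparkContext}

object PortableDataStreamSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("pds-sketch").setMaster("local[*]"))

    // binaryFiles yields (path, PortableDataStream) pairs. The stream is a lazy,
    // serializable handle: nothing is opened until open()/toArray() runs on the
    // executor, which is why PortableDataStream must be Serializable.
    val sizes = sc.binaryFiles("hdfs:///data/blobs") // hypothetical input path
      .map { case (path, stream) =>
        // with BinaryRecordReader gone, callers read the bytes themselves;
        // toArray() opens the stream, reads it fully, and closes it
        (path, stream.toArray().length)
      }

    sizes.collect().foreach { case (path, n) => println(s"$path: $n bytes") }
    sc.stop()
  }
}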