Skip to content

Commit

Permalink
removed unneeded classes added DeveloperApi note to portabledatastrea…
Browse files Browse the repository at this point in the history
…ms since the implementation might change
  • Loading branch information
kmader committed Oct 2, 2014
1 parent c27a8f1 commit 49174d9
Showing 1 changed file with 3 additions and 21 deletions.
24 changes: 3 additions & 21 deletions core/src/main/scala/org/apache/spark/input/RawFileInput.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit
import org.apache.hadoop.mapreduce.RecordReader
import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.hadoop.fs.{ FSDataInputStream, Path }
import org.apache.spark.annotation.DeveloperApi
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat
import org.apache.hadoop.mapreduce.JobContext
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader
Expand Down Expand Up @@ -60,6 +61,7 @@ abstract class StreamFileInputFormat[T]
* @note TaskAttemptContext is not serializable resulting in the confBytes construct
* @note CombineFileSplit is not serializable resulting in the splitBytes construct
*/
@DeveloperApi
class PortableDataStream(@transient isplit: CombineFileSplit,
@transient context: TaskAttemptContext, index: Integer)
extends Serializable {
Expand Down Expand Up @@ -205,8 +207,7 @@ private[spark] class StreamRecordReader(
}

/**
* A class for extracting the information from the file using the
* BinaryRecordReader (as Byte array)
* The format for the PortableDataStream files
*/
private[spark] class StreamInputFormat extends StreamFileInputFormat[PortableDataStream] {
override def createRecordReader(split: InputSplit, taContext: TaskAttemptContext) =
Expand All @@ -216,22 +217,3 @@ private[spark] class StreamInputFormat extends StreamFileInputFormat[PortableDat
}
}

/**
* A [[org.apache.hadoop.mapreduce.RecordReader RecordReader]] for reading a single binary file
* out in a key-value pair, where the key is the file path and the value is the entire content of
* the file as a byte array
*/
abstract class BinaryRecordReader[T](
split: CombineFileSplit,
context: TaskAttemptContext,
index: Integer)
extends StreamBasedRecordReader[T](split, context, index) {

def parseStream(inpStream: PortableDataStream): T = {
val inStream = inpStream.open()
val innerBuffer = ByteStreams.toByteArray(inStream)
Closeables.close(inStream, false)
parseByteArray(innerBuffer)
}
def parseByteArray(inArray: Array[Byte]): T
}

0 comments on commit 49174d9

Please sign in to comment.