
Commit

reorganizing code
kmader committed Oct 21, 2014
1 parent 7b9d181 commit 6379be4
Showing 1 changed file with 69 additions and 69 deletions.
138 changes: 69 additions & 69 deletions core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
@@ -51,6 +51,75 @@ private[spark] abstract class StreamFileInputFormat[T]

}

+/**
+ * An abstract class of [[org.apache.hadoop.mapreduce.RecordReader RecordReader]]
+ * for reading files out as streams
+ */
+private[spark] abstract class StreamBasedRecordReader[T](
+    split: CombineFileSplit,
+    context: TaskAttemptContext,
+    index: Integer)
+  extends RecordReader[String, T] {
+
+  // True means the current file has been processed and should be skipped.
+  private var processed = false
+
+  private var key = ""
+  private var value: T = null.asInstanceOf[T]
+
+  override def initialize(split: InputSplit, context: TaskAttemptContext) = {}
+  override def close() = {}
+
+  override def getProgress = if (processed) 1.0f else 0.0f
+
+  override def getCurrentKey = key
+
+  override def getCurrentValue = value
+
+  override def nextKeyValue = {
+    if (!processed) {
+      val fileIn = new PortableDataStream(split, context, index)
+      value = parseStream(fileIn)
+      fileIn.close() // if it has not been opened yet, close does nothing
+      key = fileIn.getPath
+      processed = true
+      true
+    } else {
+      false
+    }
+  }
+
+  /**
+   * Parse the stream (and close it afterwards) and return the value as type T
+   * @param inStream the stream to be read in
+   * @return the data parsed from the stream, as type T
+   */
+  def parseStream(inStream: PortableDataStream): T
+}
+
+/**
+ * Reads the record in directly as a stream for other objects to manipulate and handle
+ */
+private[spark] class StreamRecordReader(
+    split: CombineFileSplit,
+    context: TaskAttemptContext,
+    index: Integer)
+  extends StreamBasedRecordReader[PortableDataStream](split, context, index) {
+
+  def parseStream(inStream: PortableDataStream): PortableDataStream = inStream
+}
+
+/**
+ * The input format for PortableDataStream files
+ */
+private[spark] class StreamInputFormat extends StreamFileInputFormat[PortableDataStream] {
+  override def createRecordReader(split: InputSplit, taContext: TaskAttemptContext) =
+  {
+    new CombineFileRecordReader[String, PortableDataStream](
+      split.asInstanceOf[CombineFileSplit], taContext, classOf[StreamRecordReader])
+  }
+}

/**
 * A class that allows DataStreams to be serialized and moved around by not creating them
 * until they need to be read
@@ -143,72 +212,3 @@ class PortableDataStream(@transient isplit: CombineFileSplit,
  def getPath(): String = path
}

-/**
- * An abstract class of [[org.apache.hadoop.mapreduce.RecordReader RecordReader]]
- * for reading files out as streams
- */
-private[spark] abstract class StreamBasedRecordReader[T](
-    split: CombineFileSplit,
-    context: TaskAttemptContext,
-    index: Integer)
-  extends RecordReader[String, T] {
-
-  // True means the current file has been processed and should be skipped.
-  private var processed = false
-
-  private var key = ""
-  private var value: T = null.asInstanceOf[T]
-
-  override def initialize(split: InputSplit, context: TaskAttemptContext) = {}
-  override def close() = {}
-
-  override def getProgress = if (processed) 1.0f else 0.0f
-
-  override def getCurrentKey = key
-
-  override def getCurrentValue = value
-
-  override def nextKeyValue = {
-    if (!processed) {
-      val fileIn = new PortableDataStream(split, context, index)
-      value = parseStream(fileIn)
-      fileIn.close() // if it has not been opened yet, close does nothing
-      key = fileIn.getPath
-      processed = true
-      true
-    } else {
-      false
-    }
-  }
-
-  /**
-   * Parse the stream (and close it afterwards) and return the value as type T
-   * @param inStream the stream to be read in
-   * @return the data parsed from the stream, as type T
-   */
-  def parseStream(inStream: PortableDataStream): T
-}
-
-/**
- * Reads the record in directly as a stream for other objects to manipulate and handle
- */
-private[spark] class StreamRecordReader(
-    split: CombineFileSplit,
-    context: TaskAttemptContext,
-    index: Integer)
-  extends StreamBasedRecordReader[PortableDataStream](split, context, index) {
-
-  def parseStream(inStream: PortableDataStream): PortableDataStream = inStream
-}
-
-/**
- * The input format for PortableDataStream files
- */
-private[spark] class StreamInputFormat extends StreamFileInputFormat[PortableDataStream] {
-  override def createRecordReader(split: InputSplit, taContext: TaskAttemptContext) =
-  {
-    new CombineFileRecordReader[String, PortableDataStream](
-      split.asInstanceOf[CombineFileSplit], taContext, classOf[StreamRecordReader])
-  }
-}
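For context, parseStream is the extension point of this design: each subclass decides how a file's stream becomes a value of type T, while StreamBasedRecordReader handles the one-record-per-file bookkeeping. A minimal sketch of such a subclass follows; it is not part of this commit. ByteArrayRecordReader is a hypothetical name, and it assumes PortableDataStream exposes an open() method returning a java.io.DataInputStream, which does not appear in the hunks above.

// Hypothetical subclass (not in this commit): materializes each file as a byte array.
private[spark] class ByteArrayRecordReader(
    split: CombineFileSplit,
    context: TaskAttemptContext,
    index: Integer)
  extends StreamBasedRecordReader[Array[Byte]](split, context, index) {

  def parseStream(inStream: PortableDataStream): Array[Byte] = {
    // Assumption: PortableDataStream.open() returns a java.io.DataInputStream
    val in = inStream.open()
    try {
      // commons-io is on Spark's classpath; read the stream fully
      org.apache.commons.io.IOUtils.toByteArray(in)
    } finally {
      in.close()
    }
  }
}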

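As a hedged driver-side sketch, the format could be bound to an RDD through SparkContext's standard new-Hadoop-API entry point. Since StreamInputFormat is private[spark], end users would normally reach it through a SparkContext helper rather than directly, so this is illustrative only.

// Illustrative only: yields an RDD of (file path, PortableDataStream) pairs.
import org.apache.spark.SparkContext
import org.apache.spark.input.{PortableDataStream, StreamInputFormat}

def streamRdd(sc: SparkContext, path: String) =
  sc.newAPIHadoopFile(
    path,
    classOf[StreamInputFormat],
    classOf[String],
    classOf[PortableDataStream])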