fixing line length and output from FSDataInputStream to DataInputStream to minimize sensitivity to Hadoop API changes
kmader committed Oct 1, 2014
1 parent 19812a8 commit 4163e38
Showing 1 changed file with 26 additions and 33 deletions.
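
The motivation for the return-type change: org.apache.hadoop.fs.FSDataInputStream extends java.io.DataInputStream, so a stream obtained from fs.open() can be exposed through the narrower JDK type with no conversion, and downstream code never compiles against the Hadoop class. A minimal sketch of the idea (the helper name is illustrative, not from this commit):

import java.io.DataInputStream
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// FSDataInputStream IS-A DataInputStream, so widening the declared return type
// keeps the Hadoop-specific class a private detail of the method body.
def openAsJdkStream(path: Path, conf: Configuration): DataInputStream = {
  val fs = path.getFileSystem(conf) // Hadoop FileSystem lookup stays internal
  fs.open(path)                     // returns FSDataInputStream, widened here
}
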
59 changes: 26 additions & 33 deletions core/src/main/scala/org/apache/spark/input/RawFileInput.scala
@@ -18,25 +18,24 @@
 package org.apache.spark.input
 
 import scala.collection.JavaConversions._
-import com.google.common.io.{ByteStreams, Closeables}
+import com.google.common.io.{ ByteStreams, Closeables }
 import org.apache.hadoop.mapreduce.InputSplit
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit
 import org.apache.hadoop.mapreduce.RecordReader
 import org.apache.hadoop.mapreduce.TaskAttemptContext
-import org.apache.hadoop.fs.{FSDataInputStream, Path}
+import org.apache.hadoop.fs.{ FSDataInputStream, Path }
 import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat
 import org.apache.hadoop.mapreduce.JobContext
 import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader
-import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataOutputStream, DataInputStream}
-
+import java.io.{ ByteArrayInputStream, ByteArrayOutputStream, DataOutputStream, DataInputStream }
 
 /**
  * A general format for reading whole files in as streams, byte arrays,
  * or other functions to be added
  */
 abstract class StreamFileInputFormat[T]
-  extends CombineFileInputFormat[String,T] {
+  extends CombineFileInputFormat[String, T] {
   override protected def isSplitable(context: JobContext, file: Path): Boolean = false
   /**
    * Allow minPartitions set by end-user in order to keep compatibility with old Hadoop API.
@@ -51,8 +50,7 @@ abstract class StreamFileInputFormat[T]
     super.setMaxSplitSize(maxSplitSize)
   }
 
-  def createRecordReader(split: InputSplit, taContext: TaskAttemptContext):
-  RecordReader[String,T]
+  def createRecordReader(split: InputSplit, taContext: TaskAttemptContext): RecordReader[String, T]
 
 }
 
@@ -62,13 +60,14 @@ abstract class StreamFileInputFormat[T]
  * @note TaskAttemptContext is not serializable resulting in the confBytes construct
  * @note CombineFileSplit is not serializable resulting in the splitBytes construct
  */
-class PortableDataStream(@transient isplit: CombineFileSplit, @transient context: TaskAttemptContext, index: Integer)
+class PortableDataStream(@transient isplit: CombineFileSplit,
+  @transient context: TaskAttemptContext, index: Integer)
   extends Serializable {
   // transient forces file to be reopened after being serialization
   // it is also used for non-serializable classes
 
   @transient
-  private var fileIn: FSDataInputStream = null.asInstanceOf[FSDataInputStream]
+  private var fileIn: DataInputStream = null.asInstanceOf[DataInputStream]
   @transient
   private var isOpen = false
 
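The two @note lines above refer to a standard Hadoop workaround: CombineFileSplit is a Writable rather than Serializable, so it is round-tripped through a plain byte array that Java serialization can carry. A sketch of that construct under assumed names (splitToBytes/bytesToSplit are illustrative, not the commit's exact members):

import java.io.{ ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream }
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit

// Drain the Writable into a byte array, which is Java-serializable...
def splitToBytes(split: CombineFileSplit): Array[Byte] = {
  val buffer = new ByteArrayOutputStream()
  split.write(new DataOutputStream(buffer))
  buffer.toByteArray
}

// ...and rebuild it after the task has been shipped and deserialized.
def bytesToSplit(bytes: Array[Byte]): CombineFileSplit = {
  val split = new CombineFileSplit()
  split.readFields(new DataInputStream(new ByteArrayInputStream(bytes)))
  split
}
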
@@ -111,12 +110,12 @@ class PortableDataStream(@transient isplit: CombineFileSplit, @transient context: TaskAttemptContext, index: Integer)
   /**
    * create a new DataInputStream from the split and context
    */
-  def open(): FSDataInputStream = {
+  def open(): DataInputStream = {
     if (!isOpen) {
       val pathp = split.getPath(index)
       val fs = pathp.getFileSystem(conf)
       fileIn = fs.open(pathp)
-      isOpen=true
+      isOpen = true
     }
     fileIn
   }
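
Because open() now hands back a plain DataInputStream, the Guava helpers already imported at the top of the file apply directly to whatever it returns. A hypothetical caller (not part of this diff) using the read-fully-then-close pattern:

import java.io.DataInputStream
import com.google.common.io.{ ByteStreams, Closeables }

// Read the remainder of the stream into memory, then close it unconditionally.
def readWholeStream(in: DataInputStream): Array[Byte] = {
  try {
    ByteStreams.toByteArray(in)
  } finally {
    Closeables.close(in, true) // true: swallow any IOException raised on close
  }
}
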
@@ -138,7 +137,7 @@ class PortableDataStream(@transient isplit: CombineFileSplit, @transient context: TaskAttemptContext, index: Integer)
     if (isOpen) {
       try {
         fileIn.close()
-        isOpen=false
+        isOpen = false
       } catch {
         case ioe: java.io.IOException => // do nothing
       }
@@ -152,20 +151,17 @@ class PortableDataStream(@transient isplit: CombineFileSplit, @transient context: TaskAttemptContext, index: Integer)
  * to reading files out as streams
  */
 abstract class StreamBasedRecordReader[T](
-  split: CombineFileSplit,
-  context: TaskAttemptContext,
-  index: Integer)
+    split: CombineFileSplit,
+    context: TaskAttemptContext,
+    index: Integer)
   extends RecordReader[String, T] {
 
-
-
   // True means the current file has been processed, then skip it.
   private var processed = false
 
   private var key = ""
   private var value: T = null.asInstanceOf[T]
 
-
   override def initialize(split: InputSplit, context: TaskAttemptContext) = {}
   override def close() = {}
 
@@ -175,8 +171,6 @@ abstract class StreamBasedRecordReader[T](
 
   override def getCurrentValue = value
 
-
-
   override def nextKeyValue = {
     if (!processed) {
       val fileIn = new PortableDataStream(split, context, index)
@@ -202,9 +196,9 @@
  * Reads the record in directly as a stream for other objects to manipulate and handle
  */
 private[spark] class StreamRecordReader(
-  split: CombineFileSplit,
-  context: TaskAttemptContext,
-  index: Integer)
+    split: CombineFileSplit,
+    context: TaskAttemptContext,
+    index: Integer)
   extends StreamBasedRecordReader[PortableDataStream](split, context, index) {
 
   def parseStream(inStream: PortableDataStream): PortableDataStream = inStream
@@ -215,12 +209,11 @@ private[spark] class StreamRecordReader(
  * BinaryRecordReader (as Byte array)
  */
 private[spark] class StreamInputFormat extends StreamFileInputFormat[PortableDataStream] {
-  override def createRecordReader(split: InputSplit, taContext: TaskAttemptContext)=
-  {
-    new CombineFileRecordReader[String,PortableDataStream](
-      split.asInstanceOf[CombineFileSplit], taContext, classOf[StreamRecordReader]
-    )
-  }
+  override def createRecordReader(split: InputSplit, taContext: TaskAttemptContext) =
+  {
+    new CombineFileRecordReader[String, PortableDataStream](
+      split.asInstanceOf[CombineFileSplit], taContext, classOf[StreamRecordReader])
+  }
 }
 
 /**
@@ -229,10 +222,10 @@ private[spark] class StreamInputFormat extends StreamFileInputFormat[PortableDataStream] {
  * the file as a byte array
  */
 abstract class BinaryRecordReader[T](
-  split: CombineFileSplit,
-  context: TaskAttemptContext,
-  index: Integer)
-  extends StreamBasedRecordReader[T](split,context,index) {
+    split: CombineFileSplit,
+    context: TaskAttemptContext,
+    index: Integer)
+  extends StreamBasedRecordReader[T](split, context, index) {
 
   def parseStream(inpStream: PortableDataStream): T = {
     val inStream = inpStream.open()

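For context, a CombineFileInputFormat subclass like StreamInputFormat is typically consumed through SparkContext.newAPIHadoopFile, keyed by file path. A hypothetical wiring sketch (not in this diff; StreamInputFormat is private[spark], so a real call site must live inside the org.apache.spark package tree, and the public entry point built on these classes may differ):

package org.apache.spark.input

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

object StreamUsageSketch {
  // Yields one (path, reopenable stream) pair per input file.
  def wholeFileStreams(sc: SparkContext, path: String): RDD[(String, PortableDataStream)] =
    sc.newAPIHadoopFile[String, PortableDataStream, StreamInputFormat](path)
}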