
Commit

added new PortableDataStream to code so that it can be serialized correctly
kmader committed Sep 7, 2014
1 parent f032bc0 commit 5deb79e
Showing 5 changed files with 80 additions and 47 deletions.
8 changes: 4 additions & 4 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -40,7 +40,7 @@ import org.apache.mesos.MesosNativeLibrary
import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
-import org.apache.spark.input.{StreamInputFormat, StreamFileInputFormat, WholeTextFileInputFormat, ByteInputFormat, FixedLengthBinaryInputFormat}
+import org.apache.spark.input.{StreamInputFormat, PortableDataStream, WholeTextFileInputFormat, ByteInputFormat, FixedLengthBinaryInputFormat}
import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
import org.apache.spark.rdd._
import org.apache.spark.scheduler._
@@ -533,7 +533,7 @@ class SparkContext(config: SparkConf) extends Logging {
}

/**
-* Get an RDD for a Hadoop-readable dataset as DataInputStreams for each file
+* Get an RDD for a Hadoop-readable dataset as PortableDataStream for each file
* (useful for binary data)
*
*
@@ -544,15 +544,15 @@
*/
@DeveloperApi
def dataStreamFiles(path: String, minPartitions: Int = defaultMinPartitions):
-RDD[(String, DataInputStream)] = {
+RDD[(String, PortableDataStream)] = {
val job = new NewHadoopJob(hadoopConfiguration)
NewFileInputFormat.addInputPath(job, new Path(path))
val updateConf = job.getConfiguration
new BinaryFileRDD(
this,
classOf[StreamInputFormat],
classOf[String],
-classOf[DataInputStream],
+classOf[PortableDataStream],
updateConf,
minPartitions).setName(path)
}
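For orientation, here is a minimal usage sketch of the API changed above. It is not part of the commit: the input path and the 4-byte header read are invented for illustration, and only dataStreamFiles, PortableDataStream.open(), getPath() and close() come from the code in this diff. Each record is a (path, PortableDataStream) pair, and the underlying HDFS stream is opened only inside the task that consumes it.

import org.apache.spark.SparkContext

// Illustrative sketch: read a directory of binary files and keep the first 4 bytes of each.
val sc = new SparkContext("local", "portable-stream-example")
val streams = sc.dataStreamFiles("hdfs:///tmp/binary-input") // RDD[(String, PortableDataStream)]
val headers = streams.map { case (path, pds) =>
  val in = pds.open()          // the FSDataInputStream is created here, inside the task
  val buf = new Array[Byte](4) // assumes every file has at least 4 bytes
  in.readFully(buf)
  pds.close()
  (path, buf)
}.collect()                    // only (String, Array[Byte]) pairs travel back to the driver
sc.stop()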
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -23,7 +23,7 @@ import java.util.{Map => JMap}
import java.io.DataInputStream

import org.apache.hadoop.io.{BytesWritable, LongWritable}
-import org.apache.spark.input.FixedLengthBinaryInputFormat
+import org.apache.spark.input.{PortableDataStream, FixedLengthBinaryInputFormat}

import scala.collection.JavaConversions
import scala.collection.JavaConversions._
@@ -257,7 +257,7 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
* @param minPartitions A suggestion value of the minimal splitting number for input data.
*/
def dataStreamFiles(path: String, minPartitions: Int = defaultMinPartitions):
-JavaPairRDD[String,DataInputStream] = new JavaPairRDD(sc.dataStreamFiles(path,minPartitions))
+JavaPairRDD[String,PortableDataStream] = new JavaPairRDD(sc.dataStreamFiles(path,minPartitions))

/**
* Read a directory of files as DataInputStream from HDFS,
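The Java-facing wrapper above mirrors the Scala API and returns a JavaPairRDD. A short sketch, exercised from Scala for brevity; the path is again hypothetical:

import org.apache.spark.api.java.JavaSparkContext

// Illustrative sketch: the same call through the Java API wrapper.
val jsc = new JavaSparkContext("local", "portable-stream-java-example")
val pairs = jsc.dataStreamFiles("hdfs:///tmp/binary-input") // JavaPairRDD[String, PortableDataStream]
val paths = pairs.keys().collect()                          // just the file names; the streams stay in the executors
jsc.stop()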
99 changes: 65 additions & 34 deletions core/src/main/scala/org/apache/spark/input/RawFileInput.scala
@@ -51,40 +51,66 @@ abstract class StreamFileInputFormat[T]
}

def createRecordReader(split: InputSplit, taContext: TaskAttemptContext):
-RecordReader[String,T]
+RecordReader[String,T]

}

+/**
+ * A class that allows DataStreams to be serialized and moved around by not creating them
+ * until they need to be read
+ * @param split
+ * @param context
+ * @param index
+ */
+class PortableDataStream(split: CombineFileSplit, context: TaskAttemptContext, index: Integer)
+  extends Serializable {
+  private var path = ""
+  private var fileIn: FSDataInputStream = null.asInstanceOf[FSDataInputStream]
+  private var isOpen = false
+
+  def open(): FSDataInputStream= {
+    val pathp = split.getPath(index)
+    path = pathp.toString
+    val fs = pathp.getFileSystem(context.getConfiguration)
+    fileIn = fs.open(pathp)
+    isOpen=true
+    fileIn
+  }
+
+  def close() = {
+    if (isOpen) {
+      try {
+        fileIn.close()
+        isOpen=false
+      } catch {
+        case ioe: java.io.IOException => // do nothing
+      }
+    }
+  }
+  def getPath(): String = path
+}

/**
* An abstract class of [[org.apache.hadoop.mapreduce.RecordReader RecordReader]]
* to reading files out as streams
*/
abstract class StreamBasedRecordReader[T](
-split: CombineFileSplit,
-context: TaskAttemptContext,
-index: Integer)
+split: CombineFileSplit,
+context: TaskAttemptContext,
+index: Integer)
extends RecordReader[String, T] {

-private val path = split.getPath(index)
-private val fs = path.getFileSystem(context.getConfiguration)


// True means the current file has been processed, then skip it.
private var processed = false

-private val key = path.toString
+private var key = ""
private var value: T = null.asInstanceOf[T]
-// the file to be read when nextkeyvalue is called
-private lazy val fileIn: FSDataInputStream = fs.open(path)


override def initialize(split: InputSplit, context: TaskAttemptContext) = {}
-override def close() = {
-  // make sure the file is closed
-  try {
-    fileIn.close()
-  } catch {
-    case ioe: java.io.IOException => // do nothing
-  }
-}
+override def close() = {}

override def getProgress = if (processed) 1.0f else 0.0f

@@ -93,10 +119,13 @@ abstract class StreamBasedRecordReader[T](
override def getCurrentValue = value



override def nextKeyValue = {
if (!processed) {

+val fileIn = new PortableDataStream(split,context,index)
+key = fileIn.getPath
value = parseStream(fileIn)
+fileIn.close() // if it has not been open yet, close does nothing
processed = true
true
} else {
@@ -109,29 +138,29 @@
* @param inStream the stream to be read in
* @return the data formatted as
*/
-def parseStream(inStream: DataInputStream): T
+def parseStream(inStream: PortableDataStream): T
}

/**
* Reads the record in directly as a stream for other objects to manipulate and handle
*/
private[spark] class StreamRecordReader(
-split: CombineFileSplit,
-context: TaskAttemptContext,
-index: Integer)
-extends StreamBasedRecordReader[DataInputStream](split,context,index) {
+split: CombineFileSplit,
+context: TaskAttemptContext,
+index: Integer)
+extends StreamBasedRecordReader[PortableDataStream](split,context,index) {

-def parseStream(inStream: DataInputStream): DataInputStream = inStream
+def parseStream(inStream: PortableDataStream): PortableDataStream = inStream
}

/**
* A class for extracting the information from the file using the
* BinaryRecordReader (as Byte array)
*/
-private[spark] class StreamInputFormat extends StreamFileInputFormat[DataInputStream] {
+private[spark] class StreamInputFormat extends StreamFileInputFormat[PortableDataStream] {
override def createRecordReader(split: InputSplit, taContext: TaskAttemptContext)=
{
-new CombineFileRecordReader[String,DataInputStream](
+new CombineFileRecordReader[String,PortableDataStream](
split.asInstanceOf[CombineFileSplit],taContext,classOf[StreamRecordReader]
)
}
@@ -143,12 +172,13 @@ private[spark] class StreamInputFormat extends StreamFileInputFormat[DataInputSt
* the file as a byte array
*/
abstract class BinaryRecordReader[T](
-split: CombineFileSplit,
-context: TaskAttemptContext,
-index: Integer)
+split: CombineFileSplit,
+context: TaskAttemptContext,
+index: Integer)
extends StreamBasedRecordReader[T](split,context,index) {

-def parseStream(inStream: DataInputStream): T = {
+def parseStream(inpStream: PortableDataStream): T = {
+  val inStream = inpStream.open()
val innerBuffer = ByteStreams.toByteArray(inStream)
Closeables.close(inStream, false)
parseByteArray(innerBuffer)
@@ -157,13 +187,14 @@ abstract class BinaryRecordReader[T](
}



private[spark] class ByteRecordReader(
-split: CombineFileSplit,
-context: TaskAttemptContext,
-index: Integer)
+split: CombineFileSplit,
+context: TaskAttemptContext,
+index: Integer)
extends BinaryRecordReader[Array[Byte]](split,context,index) {

-def parseByteArray(inArray: Array[Byte]) = inArray
+override def parseByteArray(inArray: Array[Byte]) = inArray
}

/**
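The readers above are written as extension points: parseStream receives the lazily opened, serializable PortableDataStream, and BinaryRecordReader narrows that to a parseByteArray hook over the whole file's bytes. Below is a sketch of a custom subclass; the 4-byte big-endian Int "format" and the class name IntHeaderRecordReader are invented for illustration, and parseByteArray's exact signature is assumed from the ByteRecordReader override above.

import java.nio.ByteBuffer

import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit
import org.apache.spark.input.BinaryRecordReader

// Illustrative sketch: interpret the first 4 bytes of each file as a big-endian Int.
class IntHeaderRecordReader(
    split: CombineFileSplit,
    context: TaskAttemptContext,
    index: Integer)
  extends BinaryRecordReader[Int](split, context, index) {

  override def parseByteArray(inArray: Array[Byte]): Int =
    ByteBuffer.wrap(inArray, 0, 4).getInt // fails if a file is shorter than 4 bytes
}

A matching StreamFileInputFormat[Int] subclass whose createRecordReader returns a CombineFileRecordReader over this class, in the style of StreamInputFormat above, would wire it into BinaryFileRDD.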
11 changes: 4 additions & 7 deletions core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -226,9 +226,8 @@ class FileSuite extends FunSuite with LocalSparkContext {

test("binary file input as byte array") {
sc = new SparkContext("local", "test")
-val outputDir = new File(tempDir).getAbsolutePath
-val outFile = new File(outputDir, "record-bytestream-00000.bin")
-val outFileName = outFile.toPath().toString()
+val outFile = new File(tempDir, "record-bytestream-00000.bin")
+val outFileName = outFile.getAbsolutePath()

// create file
val testOutput = Array[Byte](1,2,3,4,5,6)
@@ -252,10 +251,8 @@
// a fixed length of 6 bytes

sc = new SparkContext("local", "test")

-val outputDir = new File(tempDir).getAbsolutePath
-val outFile = new File(outputDir, "record-bytestream-00000.bin")
-val outFileName = outFile.toPath().toString()
+val outFile = new File(tempDir, "record-bytestream-00000.bin")
+val outFileName = outFile.getAbsolutePath()

// create file
val testOutput = Array[Byte](1,2,3,4,5,6)
5 changes: 5 additions & 0 deletions pom.xml
@@ -90,6 +90,7 @@
<module>bagel</module>
<module>graphx</module>
<module>mllib</module>
+<module>imglib</module>
<module>tools</module>
<module>streaming</module>
<module>sql/catalyst</module>
@@ -743,6 +744,10 @@
<artifactId>jackson-mapper-asl</artifactId>
<version>1.8.8</version>
</dependency>
+<dependency>
+  <groupId>io.scif</groupId>
+  <artifactId>pom-scifio</artifactId>
+</dependency>
</dependencies>
</dependencyManagement>

