
Commit

Fixed the serialization issue with PortableDataStream: since neither CombineFileSplit nor TaskAttemptContext implements the Serializable interface, both are now stored as byte arrays and the objects are recreated from those byte arrays as needed.
kmader committed Oct 1, 2014
1 parent 238c83c commit 19812a8
Showing 4 changed files with 78 additions and 7 deletions.
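
The fix follows a standard pattern for carrying Hadoop Writable objects inside a Serializable class: capture each Writable as a byte array when the object is constructed, and rebuild it lazily after deserialization. The sketch below condenses that pattern outside of the diff; the class and field names are illustrative, not part of the commit.

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit

// Illustrative wrapper (not part of the commit): the non-serializable Hadoop
// objects are written to byte arrays up front and rebuilt lazily after the
// wrapper itself has been serialized and shipped to another JVM.
class WritableCarrier(@transient split: CombineFileSplit,
                      @transient conf: Configuration) extends Serializable {

  // Writable.write() produces a portable byte representation of each object.
  private val splitBytes: Array[Byte] = {
    val baos = new ByteArrayOutputStream()
    split.write(new DataOutputStream(baos))
    baos.toByteArray
  }

  private val confBytes: Array[Byte] = {
    val baos = new ByteArrayOutputStream()
    conf.write(new DataOutputStream(baos))
    baos.toByteArray
  }

  // @transient lazy vals are recomputed on first use after deserialization,
  // so only the byte arrays travel with the serialized object.
  @transient lazy val restoredSplit: CombineFileSplit = {
    val s = new CombineFileSplit()
    s.readFields(new DataInputStream(new ByteArrayInputStream(splitBytes)))
    s
  }

  @transient lazy val restoredConf: Configuration = {
    val c = new Configuration()
    c.readFields(new DataInputStream(new ByteArrayInputStream(confBytes)))
    c
  }
}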
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -289,7 +289,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
* @param minPartitions A suggestion value of the minimal splitting number for input data.
*/
def binaryArrays(path: String, minPartitions: Int = defaultMinPartitions):
JavaPairRDD[String, Array[Byte]] = new JavaPairRDD(sc.binaryFiles(path,minPartitions).mapValues(_.toArray()))
JavaPairRDD[String, Array[Byte]] =
new JavaPairRDD(sc.binaryFiles(path,minPartitions).mapValues(_.toArray()))

/**
* Load data from a flat binary file, assuming each record is a set of numbers
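Before the main change, a quick usage sketch of the reformatted binaryArrays helper, written spark-shell style; the SparkContext setup and the input path are illustrative assumptions, not taken from the commit.

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._   // pair RDD operations such as mapValues
import org.apache.spark.api.java.JavaSparkContext

val sc  = new SparkContext("local", "binary-arrays-example")
val jsc = new JavaSparkContext(sc)

// Each whole file under the (made-up) path becomes one (path, Array[Byte]) record.
val bytesByFile = jsc.binaryArrays("/tmp/binary-input", 2)

// Scala-side equivalent of what binaryArrays does internally.
val sameData = sc.binaryFiles("/tmp/binary-input", 2).mapValues(_.toArray())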
44 changes: 39 additions & 5 deletions core/src/main/scala/org/apache/spark/input/RawFileInput.scala
@@ -20,14 +20,15 @@ package org.apache.spark.input
import scala.collection.JavaConversions._
import com.google.common.io.{ByteStreams, Closeables}
import org.apache.hadoop.mapreduce.InputSplit
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit
import org.apache.hadoop.mapreduce.RecordReader
import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.hadoop.fs.{FSDataInputStream, Path}
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat
import org.apache.hadoop.mapreduce.JobContext
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader
import java.io.DataInputStream
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataOutputStream, DataInputStream}


/**
@@ -58,17 +59,50 @@ abstract class StreamFileInputFormat[T]
/**
* A class that allows DataStreams to be serialized and moved around by not creating them
* until they need to be read
* @note TaskAttemptContext is not serializable resulting in the confBytes construct
* @note CombineFileSplit is not serializable resulting in the splitBytes construct
*/
class PortableDataStream(split: CombineFileSplit, context: TaskAttemptContext, index: Integer)
class PortableDataStream(@transient isplit: CombineFileSplit, @transient context: TaskAttemptContext, index: Integer)
extends Serializable {
// transient forces file to be reopened after being moved (serialization)
// transient forces file to be reopened after being serialized
// it is also used for non-serializable classes

@transient
private var fileIn: FSDataInputStream = null.asInstanceOf[FSDataInputStream]
@transient
private var isOpen = false

private val confBytes = {
val baos = new ByteArrayOutputStream()
context.getConfiguration.write(new DataOutputStream(baos))
baos.toByteArray
}

private val splitBytes = {
val baos = new ByteArrayOutputStream()
isplit.write(new DataOutputStream(baos))
baos.toByteArray
}

@transient
private lazy val split = {
val bais = new ByteArrayInputStream(splitBytes)
val nsplit = new CombineFileSplit()
nsplit.readFields(new DataInputStream(bais))
nsplit
}

@transient
private lazy val conf = {
val bais = new ByteArrayInputStream(confBytes)
val nconf = new Configuration()
nconf.readFields(new DataInputStream(bais))
nconf
}
/**
* Calculate the path name independently of opening the file
*/
@transient
private lazy val path = {
val pathp = split.getPath(index)
pathp.toString
@@ -80,7 +114,7 @@ class PortableDataStream(split: CombineFileSplit, context: TaskAttemptContext, i
def open(): FSDataInputStream = {
if (!isOpen) {
val pathp = split.getPath(index)
val fs = pathp.getFileSystem(context.getConfiguration)
val fs = pathp.getFileSystem(conf)
fileIn = fs.open(pathp)
isOpen=true
}
@@ -207,4 +241,4 @@ abstract class BinaryRecordReader[T](
parseByteArray(innerBuffer)
}
def parseByteArray(inArray: Array[Byte]): T
}
}
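
With the byte-array indirection in place, a PortableDataStream can cross a serialization boundary and still open its file afterwards. A minimal round-trip sketch using plain Java serialization, purely for illustration (Spark would normally move the object with its own serializer):

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

import org.apache.spark.input.PortableDataStream

// Illustrative helper (not part of the commit): push a PortableDataStream
// through Java serialization and read the file contents from the copy.
def roundTrip(stream: PortableDataStream): Array[Byte] = {
  val baos = new ByteArrayOutputStream()
  val oos = new ObjectOutputStream(baos)
  oos.writeObject(stream) // only the split/conf byte arrays and the index are written
  oos.close()

  val ois = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray))
  val copy = ois.readObject().asInstanceOf[PortableDataStream]
  copy.toArray() // lazily rebuilds the split and configuration, then reopens the file
}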
1 change: 0 additions & 1 deletion core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -881,7 +881,6 @@ public void binaryFilesCaching() throws Exception {
readRDD.foreach(new VoidFunction<Tuple2<String,PortableDataStream>>() {
@Override
public void call(Tuple2<String, PortableDataStream> stringPortableDataStreamTuple2) throws Exception {
stringPortableDataStreamTuple2._2().getPath();
stringPortableDataStreamTuple2._2().toArray(); // force the file to read
}
});
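The Java test above forces each cached PortableDataStream to be read with toArray(). A rough Scala analogue is sketched below; the input path and the explicitly serializing storage level are illustrative choices, not taken from the test.

import org.apache.spark.SparkContext
import org.apache.spark.storage.StorageLevel

// Illustrative: keep the (path, stream) records cached in serialized form,
// then force every file to be read through the deserialized handles.
val sc = new SparkContext("local", "portable-stream-caching")
val streams = sc.binaryFiles("/tmp/binary-input").persist(StorageLevel.MEMORY_ONLY_SER)
streams.foreach { case (_, stream) => stream.toArray() }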
37 changes: 37 additions & 0 deletions core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -280,6 +280,43 @@ class FileSuite extends FunSuite with LocalSparkContext {
assert(indata.toArray === testOutput)
}

test("portabledatastream flatmap tests") {
sc = new SparkContext("local", "test")
val outFile = new File(tempDir, "record-bytestream-00000.bin")
val outFileName = outFile.getAbsolutePath()

// create file
val testOutput = Array[Byte](1,2,3,4,5,6)
val numOfCopies = 3
val bbuf = java.nio.ByteBuffer.wrap(testOutput)
// write data to file
val file = new java.io.FileOutputStream(outFile)
val channel = file.getChannel
channel.write(bbuf)
channel.close()
file.close()

val inRdd = sc.binaryFiles(outFileName)
val mappedRdd = inRdd.map{
curData: (String, PortableDataStream) =>
(curData._2.getPath(),curData._2)
}
val copyRdd = mappedRdd.flatMap{
curData: (String, PortableDataStream) =>
for(i <- 1 to numOfCopies) yield (i,curData._2)
}

val copyArr: Array[(Int, PortableDataStream)] = copyRdd.collect()

// Try reading the output back as an object file
assert(copyArr.length == numOfCopies)
copyArr.foreach{
cEntry: (Int, PortableDataStream) =>
assert(cEntry._2.toArray === testOutput)
}

}

test("fixed record length binary file as byte array") {
// a fixed length of 6 bytes

