
Commit

added new tests, renamed files, fixed several of the javaapi functions, formatted code more nicely
kmader committed Oct 20, 2014
1 parent a32fef7 commit 92bda0d
Showing 7 changed files with 66 additions and 36 deletions.
@@ -256,9 +256,12 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
*
* @param minPartitions A suggested minimum number of partitions for the input data.
*/
def binaryFiles(path: String, minPartitions: Int = defaultMinPartitions):
def binaryFiles(path: String, minPartitions: Int):
JavaPairRDD[String,PortableDataStream] = new JavaPairRDD(sc.binaryFiles(path,minPartitions))

def binaryFiles(path: String):
JavaPairRDD[String,PortableDataStream] = new JavaPairRDD(sc.binaryFiles(path,defaultMinPartitions))

/**
* Read a directory of files as DataInputStream from HDFS,
* a local file system (available on all nodes), or any Hadoop-supported file system URI
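The hunk above removes the Scala default argument from binaryFiles and adds an explicit no-argument overload, so Java callers no longer have to supply minPartitions. A minimal usage sketch of the two overloads (the local master, app name, and input path below are assumptions, not part of the diff):

import org.apache.spark.SparkConf
import org.apache.spark.api.java.{JavaPairRDD, JavaSparkContext}
import org.apache.spark.input.PortableDataStream

// a minimal sketch, assuming a local master and an existing directory of small binary files
val jsc = new JavaSparkContext(new SparkConf().setMaster("local").setAppName("binaryFiles-demo"))

// new overload: falls back to defaultMinPartitions when no hint is given
val byDefault: JavaPairRDD[String, PortableDataStream] = jsc.binaryFiles("/tmp/binary-input")

// existing overload: pass an explicit partition hint
val byHint: JavaPairRDD[String, PortableDataStream] = jsc.binaryFiles("/tmp/binary-input", 3)

jsc.stop()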
@@ -29,7 +29,6 @@ import org.apache.hadoop.mapreduce.{InputSplit, JobContext, RecordReader, TaskAt
*/

private[spark] object FixedLengthBinaryInputFormat {

/**
* This function retrieves the recordLength by checking the configuration parameter
*
@@ -39,13 +38,10 @@ private[spark] object FixedLengthBinaryInputFormat {
// retrieve record length from configuration
context.getConfiguration.get("recordLength").toInt
}

}

private[spark] class FixedLengthBinaryInputFormat
extends FileInputFormat[LongWritable, BytesWritable] {


/**
* Override of isSplitable to ensure initial computation of the record length
*/
@@ -60,7 +56,6 @@ private[spark] class FixedLengthBinaryInputFormat
} else {
true
}

}

/**
@@ -69,14 +64,11 @@
* will start at the first byte of a record, and the last byte will be the last byte of a record.
*/
override def computeSplitSize(blockSize: Long, minSize: Long, maxSize: Long): Long = {

val defaultSize = super.computeSplitSize(blockSize, minSize, maxSize)

// If the default size is less than the length of a record, make it equal to it
// Otherwise, make sure the split size is as close as possible to the default size,
// but still contains a complete set of records, with the first record
// starting at the first byte in the split and the last record ending with the last byte

if (defaultSize < recordLength) {
recordLength.toLong
} else {
@@ -91,7 +83,5 @@
RecordReader[LongWritable, BytesWritable] = {
new FixedLengthBinaryRecordReader
}

var recordLength = -1

}
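The computeSplitSize comment above says each split must hold a whole number of records. A small sketch of that rounding (the formula is an assumption, since the else branch is elided in the hunk):

// assumed rounding: clamp up to one record, otherwise truncate the default
// split size down to a whole multiple of the record length
def roundSplitToRecords(defaultSize: Long, recordLength: Long): Long =
  if (defaultSize < recordLength) recordLength
  else (defaultSize / recordLength) * recordLength

// e.g. a 128 MB default split with 6-byte records:
// 134217728 bytes rounds down to 134217726, a whole multiple of 6
roundSplitToRecords(128L * 1024 * 1024, 6L)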
@@ -76,66 +76,48 @@ private[spark] class FixedLengthBinaryRecordReader

// the actual file we will be reading from
val file = fileSplit.getPath

// job configuration
val job = context.getConfiguration

// check compression
val codec = new CompressionCodecFactory(job).getCodec(file)
if (codec != null) {
throw new IOException("FixedLengthRecordReader does not support reading compressed files")
}

// get the record length
recordLength = FixedLengthBinaryInputFormat.getRecordLength(context)

// get the filesystem
val fs = file.getFileSystem(job)

// open the File
fileInputStream = fs.open(file)

// seek to the splitStart position
fileInputStream.seek(splitStart)

// set our current position
currentPosition = splitStart

}

override def nextKeyValue(): Boolean = {

if (recordKey == null) {
recordKey = new LongWritable()
}

// the key is a linear index of the record, given by the
// position where the record starts divided by the record length
recordKey.set(currentPosition / recordLength)

// the recordValue to place the bytes into
if (recordValue == null) {
recordValue = new BytesWritable(new Array[Byte](recordLength))
}

// read a record if the currentPosition is less than the split end
if (currentPosition < splitEnd) {

// setup a buffer to store the record
val buffer = recordValue.getBytes

fileInputStream.read(buffer, 0, recordLength)

// update our current position
currentPosition = currentPosition + recordLength

// return true
return true
}

false
}

var splitStart: Long = 0L
var splitEnd: Long = 0L
var currentPosition: Long = 0L
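nextKeyValue() above derives each key from the record's start offset. A one-line sketch of that index arithmetic:

// key = start position / record length: with 6-byte records, the record
// starting at byte 18 gets key 18 / 6 = 3, i.e. the fourth record in the file
val recordLength = 6L
val currentPosition = 18L
val recordIndex = currentPosition / recordLength  // 3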
5 changes: 1 addition & 4 deletions core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
@@ -17,14 +17,11 @@

package org.apache.spark.rdd

/** Allows better control of the partitioning
*
*/
import org.apache.hadoop.conf.{Configurable, Configuration}
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapreduce._
import org.apache.spark.{InterruptibleIterator, TaskContext, Partition, SparkContext}
import org.apache.spark.input.StreamFileInputFormat
import org.apache.spark.{Partition, SparkContext}

private[spark] class BinaryFileRDD[T](
sc : SparkContext,
4 changes: 1 addition & 3 deletions core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -844,7 +844,6 @@ public void binaryFiles() throws Exception {
// Reusing the wholeText files example
byte[] content1 = "spark is easy to use.\n".getBytes("utf-8");


String tempDirName = tempDir.getAbsolutePath();
File file1 = new File(tempDirName + "/part-00000");

@@ -866,7 +865,6 @@ public void binaryFilesCaching() throws Exception {
// Reusing the wholeText files example
byte[] content1 = "spark is easy to use.\n".getBytes("utf-8");


String tempDirName = tempDir.getAbsolutePath();
File file1 = new File(tempDirName + "/part-00000");

@@ -877,7 +875,7 @@ public void binaryFilesCaching() throws Exception {
channel1.write(bbuf);
channel1.close();

JavaPairRDD<String, PortableDataStream> readRDD = sc.binaryFiles(tempDirName,3).cache();
JavaPairRDD<String, PortableDataStream> readRDD = sc.binaryFiles(tempDirName).cache();
readRDD.foreach(new VoidFunction<Tuple2<String,PortableDataStream>>() {
@Override
public void call(Tuple2<String, PortableDataStream> stringPortableDataStreamTuple2) throws Exception {
60 changes: 60 additions & 0 deletions core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark
import java.io.{File, FileWriter}

import org.apache.spark.input.PortableDataStream
import org.apache.spark.storage.StorageLevel

import scala.io.Source

@@ -280,6 +281,37 @@ class FileSuite extends FunSuite with LocalSparkContext {
assert(indata.toArray === testOutput)
}

test("portabledatastream persist disk storage") {
sc = new SparkContext("local", "test")
val outFile = new File(tempDir, "record-bytestream-00000.bin")
val outFileName = outFile.getAbsolutePath()

// create file
val testOutput = Array[Byte](1,2,3,4,5,6)
val bbuf = java.nio.ByteBuffer.wrap(testOutput)
// write data to file
val file = new java.io.FileOutputStream(outFile)
val channel = file.getChannel
channel.write(bbuf)
channel.close()
file.close()

val inRdd = sc.binaryFiles(outFileName).persist(StorageLevel.DISK_ONLY)
inRdd.foreach{
curData: (String, PortableDataStream) =>
curData._2.toArray() // force the file to be read
}
val mappedRdd = inRdd.map{
curData: (String, PortableDataStream) =>
(curData._2.getPath(),curData._2)
}
val (infile: String, indata: PortableDataStream) = mappedRdd.first

// Try reading the output back from the persisted stream

assert(indata.toArray === testOutput)
}

test("portabledatastream flatmap tests") {
sc = new SparkContext("local", "test")
val outFile = new File(tempDir, "record-bytestream-00000.bin")
@@ -348,6 +380,34 @@ class FileSuite extends FunSuite with LocalSparkContext {
assert(indata === testOutput)
}

test ("negative binary record length should raise an exception") {
// a fixed length of 6 bytes
sc = new SparkContext("local", "test")

val outFile = new File(tempDir, "record-bytestream-00000.bin")
val outFileName = outFile.getAbsolutePath()

// create file
val testOutput = Array[Byte](1,2,3,4,5,6)
val testOutputCopies = 10

// write data to file
val file = new java.io.FileOutputStream(outFile)
val channel = file.getChannel
for(i <- 1 to testOutputCopies) {
val bbuf = java.nio.ByteBuffer.wrap(testOutput)
channel.write(bbuf)
}
channel.close()
file.close()

val inRdd = sc.binaryRecords(outFileName, -1)

intercept[SparkException] {
inRdd.count
}
}

test("file caching") {
sc = new SparkContext("local", "test")
val out = new FileWriter(tempDir + "/input")
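The new negative-length test above only exercises the failure path of binaryRecords. For contrast, a sketch of the happy path under the same fixture (it reuses outFileName, testOutput and testOutputCopies from that test; the assertions are assumptions, not part of the commit):

// read the same file back as fixed 6-byte records and check the contents
val positiveRdd = sc.binaryRecords(outFileName, testOutput.length)
assert(positiveRdd.count === testOutputCopies)
assert(positiveRdd.first === testOutput)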
