Added FixedLengthBinaryInputFormat and RecordReader from freeman-lab
and added them to both the JavaSparkContext and the SparkContext as fixedLengthBinaryFiles
kmader committed Aug 13, 2014
1 parent 1622935 commit eacfaa6
Showing 4 changed files with 290 additions and 27 deletions.
38 changes: 26 additions & 12 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -17,40 +17,40 @@

package org.apache.spark

import scala.language.implicitConversions

import java.io._
import java.net.URI
import java.util.UUID.randomUUID
import java.util.concurrent.atomic.AtomicInteger
import java.util.{Properties, UUID}
import java.util.UUID.randomUUID
import scala.collection.{Map, Set}
import scala.collection.JavaConversions._
import scala.collection.generic.Growable
import scala.collection.mutable.HashMap
import scala.reflect.{ClassTag, classTag}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{ArrayWritable, BooleanWritable, BytesWritable, DoubleWritable, FloatWritable, IntWritable, LongWritable, NullWritable, Text, Writable}
import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf, SequenceFileInputFormat, TextInputFormat}
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, Job => NewHadoopJob}
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, Job => NewHadoopJob}
import org.apache.mesos.MesosNativeLibrary

import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
import org.apache.spark.input.{StreamInputFormat, StreamFileInputFormat, WholeTextFileInputFormat, ByteInputFormat}
import org.apache.spark.input.{ByteInputFormat, FixedLengthBinaryInputFormat, StreamInputFormat, WholeTextFileInputFormat}
import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
import org.apache.spark.rdd._
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SparkDeploySchedulerBackend, SimrSchedulerBackend}
import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SimrSchedulerBackend, SparkDeploySchedulerBackend}
import org.apache.spark.scheduler.local.LocalBackend
import org.apache.spark.storage.{BlockManagerSource, RDDInfo, StorageStatus, StorageUtils}
import org.apache.spark.ui.SparkUI
import org.apache.spark.util.{CallSite, ClosureCleaner, MetadataCleaner, MetadataCleanerType, TimeStampedWeakValueHashMap, Utils}

import scala.collection.JavaConversions._
import scala.collection.generic.Growable
import scala.collection.mutable.HashMap
import scala.collection.{Map, Set}
import scala.language.implicitConversions
import scala.reflect.{ClassTag, classTag}

/**
* Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
* cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.
@@ -557,6 +557,20 @@ class SparkContext(config: SparkConf) extends Logging {
minPartitions).setName(path)
}

/**
* Load data from a flat binary file, assuming each record is a set of numbers with the
* specified numerical format (see ByteBuffer) and that the number of bytes per record
* is constant (see FixedLengthBinaryInputFormat).
*
* @param path Directory containing the input data files
* @return An RDD of byte arrays, one element per record (RDD[Array[Byte]])
*/
def fixedLengthBinaryFiles(path: String): RDD[Array[Byte]] = {
val lines = newAPIHadoopFile[LongWritable, BytesWritable, FixedLengthBinaryInputFormat](path)
val data = lines.map { case (k, v) => v.getBytes }
data
}
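// Usage sketch (illustrative only, not part of this commit): FixedLengthBinaryInputFormat
// reads the record size from the Hadoop configuration key "recordLength", so a caller is
// expected to set that key before calling fixedLengthBinaryFiles. Assuming 8-byte records
// that each hold one Double:
//
//   val sc = new SparkContext(new SparkConf().setAppName("fixed-length-example"))
//   sc.hadoopConfiguration.setInt("recordLength", 8)
//   val records: RDD[Array[Byte]] = sc.fixedLengthBinaryFiles("hdfs:///data/doubles")
//   val doubles = records.map(bytes => java.nio.ByteBuffer.wrap(bytes).getDouble)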

/**
* Get an RDD for a Hadoop-readable dataset from a Hadoop JobConf given its InputFormat and other
* necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable),
40 changes: 25 additions & 15 deletions core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -17,27 +17,25 @@

package org.apache.spark.api.java

import java.io.DataInputStream
import java.util
import java.util.{Map => JMap}

import java.io.DataInputStream

import scala.collection.JavaConversions
import scala.collection.JavaConversions._
import scala.language.implicitConversions
import scala.reflect.ClassTag

import com.google.common.base.Optional
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.{InputFormat, JobConf}
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}

import org.apache.spark._
import org.apache.spark.SparkContext.{DoubleAccumulatorParam, IntAccumulatorParam}
import org.apache.spark._
import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.{EmptyRDD, RDD}

import scala.collection.JavaConversions
import scala.collection.JavaConversions._
import scala.language.implicitConversions
import scala.reflect.ClassTag

/**
* A Java-friendly version of [[org.apache.spark.SparkContext]] that returns
* [[org.apache.spark.api.java.JavaRDD]]s and works with Java collections instead of Scala ones.
@@ -214,6 +212,16 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
def wholeTextFiles(path: String, minPartitions: Int): JavaPairRDD[String, String] =
new JavaPairRDD(sc.wholeTextFiles(path, minPartitions))

/**
* Read a directory of text files from HDFS, a local file system (available on all nodes), or any
* Hadoop-supported file system URI. Each file is read as a single record and returned in a
* key-value pair, where the key is the path of each file, the value is the content of each file.
*
* @see `wholeTextFiles(path: String, minPartitions: Int)`.
*/
def wholeTextFiles(path: String): JavaPairRDD[String, String] =
new JavaPairRDD(sc.wholeTextFiles(path))
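// Illustrative example (not part of this commit): for a directory hdfs://a-hdfs-path containing
// part-00000 and part-00001,
//
//   val rdd = jsc.wholeTextFiles("hdfs://a-hdfs-path")
//
// yields one (path, content) pair per file, e.g. the key "hdfs://a-hdfs-path/part-00000" mapped
// to that file's full text. Here jsc is an existing JavaSparkContext.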

/**
* Read a directory of binary files from HDFS, a local file system (available on all nodes),
* or any Hadoop-supported file system URI as a byte array. Each file is read as a single
@@ -279,14 +287,16 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
JavaPairRDD[String,Array[Byte]] = new JavaPairRDD(sc.binaryFiles(path,minPartitions))

/**
* Read a directory of text files from HDFS, a local file system (available on all nodes), or any
* Hadoop-supported file system URI. Each file is read as a single record and returned in a
* key-value pair, where the key is the path of each file, the value is the content of each file.
* Load data from a flat binary file, assuming each record is a set of numbers with the
* specified numerical format (see ByteBuffer) and that the number of bytes per record
* is constant (see FixedLengthBinaryInputFormat).
*
* @see `wholeTextFiles(path: String, minPartitions: Int)`.
* @param path Directory containing the input data files
* @return A JavaRDD of byte arrays, one element per record (JavaRDD[Array[Byte]])
*/
def wholeTextFiles(path: String): JavaPairRDD[String, String] =
new JavaPairRDD(sc.wholeTextFiles(path))
def fixedLengthBinaryFiles(path: String): JavaRDD[Array[Byte]] = {
new JavaRDD(sc.fixedLengthBinaryFiles(path))
}
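// Usage sketch (illustrative only, not part of this commit): as with the Scala API, the record
// size must be present in the Hadoop configuration under "recordLength" before this is called:
//
//   val jsc = new JavaSparkContext(new SparkConf().setAppName("fixed-length-java-example"))
//   jsc.hadoopConfiguration().setInt("recordLength", 8)
//   val records: JavaRDD[Array[Byte]] = jsc.fixedLengthBinaryFiles("hdfs:///data/doubles")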

/** Get an RDD for a Hadoop SequenceFile with given key and value types.
*
95 changes: 95 additions & 0 deletions core/src/main/scala/org/apache/spark/input/FixedLengthBinaryInputFormat.scala
@@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.input

import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{BytesWritable, LongWritable}
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.{InputSplit, JobContext, RecordReader, TaskAttemptContext}

/**
* Custom Input Format for reading and splitting flat binary files that contain records,
* each of which is a fixed size in bytes. The fixed record size is specified through a
* parameter recordLength in the Hadoop configuration.
*/
object FixedLengthBinaryInputFormat {

/**
* Retrieves the record length by reading the "recordLength" parameter from the Hadoop
* configuration of the given job context.
*/
def getRecordLength(context: JobContext): Int = {
// retrieve the record length (in bytes) from the configuration
context.getConfiguration.get("recordLength").toInt
}

}
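// Illustrative example (assumption, not part of this file): the record length is supplied as a
// plain Hadoop configuration entry, e.g.
//
//   val conf = new org.apache.hadoop.conf.Configuration()
//   conf.setInt("recordLength", 8)
//
// after which getRecordLength returns 8 for any JobContext built from this configuration.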

class FixedLengthBinaryInputFormat extends FileInputFormat[LongWritable, BytesWritable] {


/**
* Override of isSplitable to ensure initial computation of the record length
*/
override def isSplitable(context: JobContext, filename: Path): Boolean = {
if (recordLength == -1) {
recordLength = FixedLengthBinaryInputFormat.getRecordLength(context)
}
if (recordLength <= 0) {
println("record length is less than or equal to 0, file cannot be split")
false
} else {
true
}
}

/**
* This input format overrides computeSplitSize() to make sure that each split
* only contains full records. Each InputSplit passed to FixedLengthBinaryRecordReader
* will start at the first byte of a record, and the last byte will be the last byte of a record.
*/
override def computeSplitSize(blockSize: Long, minSize: Long, maxSize: Long): Long = {

val defaultSize = super.computeSplitSize(blockSize, minSize, maxSize)

// If the default size is less than the length of a record, make it equal to the record length.
// Otherwise, make the split size as close as possible to the default size while still
// containing a whole number of records, so that the first record starts at the first byte of
// the split and the last record ends at the last byte of the split.
defaultSize match {
case x if x < recordLength => recordLength.toLong
// integer division floors, so this picks the largest multiple of recordLength <= defaultSize
case _ => (defaultSize / recordLength) * recordLength
}
}
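// Worked example (illustrative): with a 128 MB default split size (134217728 bytes) and a
// 100-byte record length, the adjusted split size is (134217728 / 100) * 100 = 134217700 bytes,
// so every split ends exactly on a record boundary.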

/**
* Create a FixedLengthBinaryRecordReader
*/
override def createRecordReader(split: InputSplit, context: TaskAttemptContext):
RecordReader[LongWritable, BytesWritable] = {
new FixedLengthBinaryRecordReader
}

// record length in bytes; initialized lazily from the Hadoop configuration in isSplitable()
var recordLength = -1

}
144 changes: 144 additions & 0 deletions core/src/main/scala/org/apache/spark/input/FixedLengthBinaryRecordReader.scala
@@ -0,0 +1,144 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.input

import java.io.IOException

import org.apache.hadoop.fs.FSDataInputStream
import org.apache.hadoop.io.compress.CompressionCodecFactory
import org.apache.hadoop.io.{BytesWritable, LongWritable}
import org.apache.hadoop.mapreduce.lib.input.FileSplit
import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, TaskAttemptContext}

/**
*
* FixedLengthBinaryRecordReader is returned by FixedLengthBinaryInputFormat.
* It uses the record length set in FixedLengthBinaryInputFormat to
* read one record at a time from the given InputSplit.
*
* Each call to nextKeyValue() updates the LongWritable KEY and BytesWritable VALUE.
*
* KEY = record index (Long)
* VALUE = the record itself (BytesWritable)
*
*/
class FixedLengthBinaryRecordReader extends RecordReader[LongWritable, BytesWritable] {

override def close() {
if (fileInputStream != null) {
fileInputStream.close()
}
}

override def getCurrentKey: LongWritable = {
recordKey
}

override def getCurrentValue: BytesWritable = {
recordValue
}

override def getProgress: Float = {
if (splitStart == splitEnd) {
0.0f
} else {
// use floating-point division: (Long / Long) would truncate progress to 0 until the split ends
Math.min((currentPosition - splitStart).toFloat / (splitEnd - splitStart), 1.0f)
}
}

override def initialize(inputSplit: InputSplit, context: TaskAttemptContext) {

// the file input
val fileSplit = inputSplit.asInstanceOf[FileSplit]

// the byte position this fileSplit starts at
splitStart = fileSplit.getStart

// the byte position at which this fileSplit ends
splitEnd = splitStart + fileSplit.getLength

// the actual file we will be reading from
val file = fileSplit.getPath

// job configuration
val job = context.getConfiguration

// check compression
val codec = new CompressionCodecFactory(job).getCodec(file)
if (codec != null) {
throw new IOException("FixedLengthRecordReader does not support reading compressed files")
}

// get the record length
recordLength = FixedLengthBinaryInputFormat.getRecordLength(context)

// get the filesystem
val fs = file.getFileSystem(job)

// open the File
fileInputStream = fs.open(file)

// seek to the splitStart position
fileInputStream.seek(splitStart)

// set our current position
currentPosition = splitStart

}

override def nextKeyValue(): Boolean = {

if (recordKey == null) {
recordKey = new LongWritable()
}

// the key is a linear index of the record, given by the
// position the record starts divided by the record length
recordKey.set(currentPosition / recordLength)
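// e.g. with recordLength = 100, a record starting at byte 1000 of the file gets key 10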

// the recordValue to place the bytes into
if (recordValue == null) {
recordValue = new BytesWritable(new Array[Byte](recordLength))
}

// read a record if the currentPosition is less than the split end
if (currentPosition < splitEnd) {

// setup a buffer to store the record
val buffer = recordValue.getBytes

// read a full record into the buffer (readFully avoids short reads)
fileInputStream.readFully(buffer, 0, recordLength)

// update our current position
currentPosition = currentPosition + recordLength

// return true
return true
}

false
}

// byte range of the file covered by this split: [splitStart, splitEnd)
var splitStart: Long = 0L
var splitEnd: Long = 0L
// absolute byte position of the next record to be read
var currentPosition: Long = 0L
// record length in bytes, read from the Hadoop configuration in initialize()
var recordLength: Int = 0
var fileInputStream: FSDataInputStream = null
var recordKey: LongWritable = null
var recordValue: BytesWritable = null

}
