From eacfaa6ca95dd059b943524df9ae037cbce724ed Mon Sep 17 00:00:00 2001
From: Kevin Mader
Date: Wed, 13 Aug 2014 16:06:04 +0200
Subject: [PATCH] Added FixedLengthBinaryInputFormat and RecordReader from
 freeman-lab and added them to both the JavaSparkContext and the SparkContext
 as fixedLengthBinaryFile

---
 .../scala/org/apache/spark/SparkContext.scala |  38 +++--
 .../spark/api/java/JavaSparkContext.scala     |  40 +++--
 .../input/FixedLengthBinaryInputFormat.scala  |  95 ++++++++++++
 .../input/FixedLengthBinaryRecordReader.scala | 144 ++++++++++++++++++
 4 files changed, 290 insertions(+), 27 deletions(-)
 create mode 100644 core/src/main/scala/org/apache/spark/input/FixedLengthBinaryInputFormat.scala
 create mode 100644 core/src/main/scala/org/apache/spark/input/FixedLengthBinaryRecordReader.scala

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 4fb226dee9f09..41c0e98ec1d10 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -17,40 +17,40 @@
 
 package org.apache.spark
 
-import scala.language.implicitConversions
-
 import java.io._
 import java.net.URI
+import java.util.UUID.randomUUID
 import java.util.concurrent.atomic.AtomicInteger
 import java.util.{Properties, UUID}
-import java.util.UUID.randomUUID
-import scala.collection.{Map, Set}
-import scala.collection.JavaConversions._
-import scala.collection.generic.Growable
-import scala.collection.mutable.HashMap
-import scala.reflect.{ClassTag, classTag}
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io.{ArrayWritable, BooleanWritable, BytesWritable, DoubleWritable, FloatWritable, IntWritable, LongWritable, NullWritable, Text, Writable}
 import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf, SequenceFileInputFormat, TextInputFormat}
-import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, Job => NewHadoopJob}
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
+import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, Job => NewHadoopJob}
 import org.apache.mesos.MesosNativeLibrary
-
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
-import org.apache.spark.input.{StreamInputFormat, StreamFileInputFormat, WholeTextFileInputFormat, ByteInputFormat}
+import org.apache.spark.input.{ByteInputFormat, FixedLengthBinaryInputFormat, StreamInputFormat, WholeTextFileInputFormat}
 import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd._
 import org.apache.spark.scheduler._
-import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SparkDeploySchedulerBackend, SimrSchedulerBackend}
 import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
+import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SimrSchedulerBackend, SparkDeploySchedulerBackend}
 import org.apache.spark.scheduler.local.LocalBackend
 import org.apache.spark.storage.{BlockManagerSource, RDDInfo, StorageStatus, StorageUtils}
 import org.apache.spark.ui.SparkUI
 import org.apache.spark.util.{CallSite, ClosureCleaner, MetadataCleaner, MetadataCleanerType, TimeStampedWeakValueHashMap, Utils}
 
+import scala.collection.JavaConversions._
+import scala.collection.generic.Growable
+import scala.collection.mutable.HashMap
+import scala.collection.{Map, Set}
+import scala.language.implicitConversions
+import scala.reflect.{ClassTag, classTag}
+
 /**
  * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
  * cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.
@@ -557,6 +557,20 @@ class SparkContext(config: SparkConf) extends Logging {
       minPartitions).setName(path)
   }
 
+  /**
+   * Load data from a flat binary file, assuming each record is a set of numbers
+   * with the specified numerical format (see ByteBuffer), and the number of
+   * bytes per record is constant (see FixedLengthBinaryInputFormat)
+   *
+   * @param path Directory containing the input data files
+   * @return An RDD of records as byte arrays, RDD[Array[Byte]]
+   */
+  def fixedLengthBinaryFiles(path: String): RDD[Array[Byte]] = {
+    val lines = newAPIHadoopFile[LongWritable, BytesWritable, FixedLengthBinaryInputFormat](path)
+    val data = lines.map { case (k, v) => v.getBytes }
+    data
+  }
+
   /**
    * Get an RDD for a Hadoop-readable dataset from a Hadoop JobConf given its InputFormat and other
    * necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable),
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index 98c84a779b9da..9eab7afed6505 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -17,27 +17,25 @@
 
 package org.apache.spark.api.java
 
+import java.io.DataInputStream
 import java.util
 import java.util.{Map => JMap}
-import java.io.DataInputStream
-
-import scala.collection.JavaConversions
-import scala.collection.JavaConversions._
-import scala.language.implicitConversions
-import scala.reflect.ClassTag
-
 import com.google.common.base.Optional
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapred.{InputFormat, JobConf}
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
-
-import org.apache.spark._
 import org.apache.spark.SparkContext.{DoubleAccumulatorParam, IntAccumulatorParam}
+import org.apache.spark._
 import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.{EmptyRDD, RDD}
 
+import scala.collection.JavaConversions
+import scala.collection.JavaConversions._
+import scala.language.implicitConversions
+import scala.reflect.ClassTag
+
 /**
  * A Java-friendly version of [[org.apache.spark.SparkContext]] that returns
  * [[org.apache.spark.api.java.JavaRDD]]s and works with Java collections instead of Scala ones.
@@ -214,6 +212,16 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
   def wholeTextFiles(path: String, minPartitions: Int): JavaPairRDD[String, String] =
     new JavaPairRDD(sc.wholeTextFiles(path, minPartitions))
 
+  /**
+   * Read a directory of text files from HDFS, a local file system (available on all nodes), or any
+   * Hadoop-supported file system URI. Each file is read as a single record and returned in a
+   * key-value pair, where the key is the path of each file, the value is the content of each file.
+   *
+   * @see `wholeTextFiles(path: String, minPartitions: Int)`.
+   */
+  def wholeTextFiles(path: String): JavaPairRDD[String, String] =
+    new JavaPairRDD(sc.wholeTextFiles(path))
+
   /**
    * Read a directory of binary files from HDFS, a local file system (available on all nodes),
    * or any Hadoop-supported file system URI as a byte array. Each file is read as a single
@@ -279,14 +287,16 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
     JavaPairRDD[String,Array[Byte]] = new JavaPairRDD(sc.binaryFiles(path,minPartitions))
 
   /**
-   * Read a directory of text files from HDFS, a local file system (available on all nodes), or any
-   * Hadoop-supported file system URI. Each file is read as a single record and returned in a
-   * key-value pair, where the key is the path of each file, the value is the content of each file.
+   * Load data from a flat binary file, assuming each record is a set of numbers
+   * with the specified numerical format (see ByteBuffer), and the number of
+   * bytes per record is constant (see FixedLengthBinaryInputFormat)
    *
-   * @see `wholeTextFiles(path: String, minPartitions: Int)`.
+   * @param path Directory containing the input data files
+   * @return An RDD of records as byte arrays, JavaRDD[Array[Byte]]
    */
-  def wholeTextFiles(path: String): JavaPairRDD[String, String] =
-    new JavaPairRDD(sc.wholeTextFiles(path))
+  def fixedLengthBinaryFiles(path: String): JavaRDD[Array[Byte]] = {
+    new JavaRDD(sc.fixedLengthBinaryFiles(path))
+  }
 
   /** Get an RDD for a Hadoop SequenceFile with given key and value types.
    *
diff --git a/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryInputFormat.scala b/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryInputFormat.scala
new file mode 100644
index 0000000000000..4292a63d7f301
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryInputFormat.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.input
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.{BytesWritable, LongWritable}
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
+import org.apache.hadoop.mapreduce.{InputSplit, JobContext, RecordReader, TaskAttemptContext}
+
+/**
+ * Custom Input Format for reading and splitting flat binary files that contain records, each of which
+ * has a fixed size in bytes. The fixed record size is specified through the parameter recordLength
+ * in the Hadoop configuration.
+ */
+
+object FixedLengthBinaryInputFormat {
+
+  /**
+   * Retrieves the record length by reading the recordLength parameter from the Hadoop configuration
+   *
+   */
+  def getRecordLength(context: JobContext): Int = {
+
+    // retrieve record length from configuration
+    context.getConfiguration.get("recordLength").toInt
+  }
+
+}
+
+class FixedLengthBinaryInputFormat extends FileInputFormat[LongWritable, BytesWritable] {
+
+
+  /**
+   * Override of isSplitable to ensure initial computation of the record length
+   */
+  override def isSplitable(context: JobContext, filename: Path): Boolean = {
+
+    if (recordLength == -1) {
+      recordLength = FixedLengthBinaryInputFormat.getRecordLength(context)
+    }
+    if (recordLength <= 0) {
+      println("record length is less than or equal to 0, file cannot be split")
+      false
+    } else {
+      true
+    }
+
+  }
+
+  /**
+   * This input format overrides computeSplitSize() to make sure that each split
+   * only contains full records. Each InputSplit passed to FixedLengthBinaryRecordReader
+   * will start at the first byte of a record, and the last byte will be the last byte of a record.
+   */
+  override def computeSplitSize(blockSize: Long, minSize: Long, maxSize: Long): Long = {
+
+    val defaultSize = super.computeSplitSize(blockSize, minSize, maxSize)
+
+    // If the default size is less than the length of a record, make it equal to it
+    // Otherwise, make sure the split size is as close as possible to the default size,
+    // but still contains a complete set of records, with the first record
+    // starting at the first byte in the split and the last record ending with the last byte
+
+    defaultSize match {
+      case x if x < recordLength => recordLength.toLong
+      case _ => (Math.floor(defaultSize / recordLength) * recordLength).toLong
+    }
+  }
+
+  /**
+   * Create a FixedLengthBinaryRecordReader
+   */
+  override def createRecordReader(split: InputSplit, context: TaskAttemptContext):
+    RecordReader[LongWritable, BytesWritable] = {
+    new FixedLengthBinaryRecordReader
+  }
+
+  var recordLength = -1
+
+}
\ No newline at end of file
diff --git a/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryRecordReader.scala b/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryRecordReader.scala
new file mode 100644
index 0000000000000..be61617898a7d
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryRecordReader.scala
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.input
+
+import java.io.IOException
+
+import org.apache.hadoop.fs.FSDataInputStream
+import org.apache.hadoop.io.compress.CompressionCodecFactory
+import org.apache.hadoop.io.{BytesWritable, LongWritable}
+import org.apache.hadoop.mapreduce.lib.input.FileSplit
+import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, TaskAttemptContext}
+
+/**
+ *
+ * FixedLengthBinaryRecordReader is returned by FixedLengthBinaryInputFormat.
+ * It uses the record length set in FixedLengthBinaryInputFormat to
+ * read one record at a time from the given InputSplit.
+ *
+ * Each call to nextKeyValue() updates the LongWritable KEY and BytesWritable VALUE.
+ *
+ * KEY = record index (Long)
+ * VALUE = the record itself (BytesWritable)
+ *
+ */
+class FixedLengthBinaryRecordReader extends RecordReader[LongWritable, BytesWritable] {
+
+  override def close() {
+    if (fileInputStream != null) {
+      fileInputStream.close()
+    }
+  }
+
+  override def getCurrentKey: LongWritable = {
+    recordKey
+  }
+
+  override def getCurrentValue: BytesWritable = {
+    recordValue
+  }
+
+  override def getProgress: Float = {
+    splitStart match {
+      case x if x == splitEnd => 0.0.toFloat
+      case _ => Math.min(((currentPosition - splitStart).toFloat / (splitEnd - splitStart)).toFloat, 1.0).toFloat
+    }
+  }
+
+  override def initialize(inputSplit: InputSplit, context: TaskAttemptContext) {
+
+    // the file input
+    val fileSplit = inputSplit.asInstanceOf[FileSplit]
+
+    // the byte position this fileSplit starts at
+    splitStart = fileSplit.getStart
+
+    // splitEnd byte marker that the fileSplit ends at
+    splitEnd = splitStart + fileSplit.getLength
+
+    // the actual file we will be reading from
+    val file = fileSplit.getPath
+
+    // job configuration
+    val job = context.getConfiguration
+
+    // check compression
+    val codec = new CompressionCodecFactory(job).getCodec(file)
+    if (codec != null) {
+      throw new IOException("FixedLengthRecordReader does not support reading compressed files")
+    }
+
+    // get the record length
+    recordLength = FixedLengthBinaryInputFormat.getRecordLength(context)
+
+    // get the filesystem
+    val fs = file.getFileSystem(job)
+
+    // open the File
+    fileInputStream = fs.open(file)
+
+    // seek to the splitStart position
+    fileInputStream.seek(splitStart)
+
+    // set our current position
+    currentPosition = splitStart
+
+  }
+
+  override def nextKeyValue(): Boolean = {
+
+    if (recordKey == null) {
+      recordKey = new LongWritable()
+    }
+
+    // the key is a linear index of the record, given by the
+    // position the record starts divided by the record length
+    recordKey.set(currentPosition / recordLength)
+
+    // the recordValue to place the bytes into
+    if (recordValue == null) {
+      recordValue = new BytesWritable(new Array[Byte](recordLength))
+    }
+
+    // read a record if the currentPosition is less than the split end
+    if (currentPosition < splitEnd) {
+
+      // setup a buffer to store the record
+      val buffer = recordValue.getBytes
+
+      fileInputStream.read(buffer, 0, recordLength)
+
+      // update our current position
+      currentPosition = currentPosition + recordLength
+
+      // return true
+      return true
+    }
+
+    false
+  }
+
+  var splitStart: Long = 0L
+  var splitEnd: Long = 0L
+  var currentPosition: Long = 0L
+  var recordLength: Int = 0
+  var fileInputStream: FSDataInputStream = null
+  var recordKey: LongWritable = null
+  var recordValue: BytesWritable = null
+
+}
\ No newline at end of file
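
A minimal usage sketch of the new SparkContext.fixedLengthBinaryFiles API (not part of the patch itself). It assumes 16-byte records holding two little-endian doubles and a hypothetical input directory; the record size reaches FixedLengthBinaryInputFormat through the "recordLength" key of the Hadoop configuration that newAPIHadoopFile picks up from the SparkContext:

  import java.nio.{ByteBuffer, ByteOrder}

  import org.apache.spark.{SparkConf, SparkContext}

  object FixedLengthBinaryExample {
    def main(args: Array[String]) {
      val sc = new SparkContext(new SparkConf().setAppName("fixedLengthBinaryFiles example"))

      // FixedLengthBinaryInputFormat reads the record size from this configuration key;
      // here each record is assumed to be two little-endian doubles (16 bytes).
      sc.hadoopConfiguration.set("recordLength", "16")

      // One Array[Byte] per 16-byte record under the (hypothetical) input directory.
      val records = sc.fixedLengthBinaryFiles("hdfs:///data/binary-records")

      // Decode each record with a ByteBuffer, as suggested by the scaladoc.
      val samples = records.map { bytes =>
        val buf = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN)
        (buf.getDouble, buf.getDouble)
      }

      println(samples.count())
      sc.stop()
    }
  }

Decoding each record inside the map, rather than collecting the raw arrays, also sidesteps the fact that FixedLengthBinaryRecordReader reuses a single BytesWritable buffer for every record it reads.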