diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index aa0f8e4ce0101..4fb226dee9f09 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -289,7 +289,7 @@ class SparkContext(config: SparkConf) extends Logging {
     value <- Option(System.getenv(envKey)).orElse(Option(System.getProperty(propKey)))} {
     executorEnvs(envKey) = value
   }
-  Option(System.getenv("SPARK_PREPEND_CLASSES")).foreach { v =>
+  Option(System.getenv("SPARK_PREPEND_CLASSES")).foreach { v =>
     executorEnvs("SPARK_PREPEND_CLASSES") = v
   }
   // The Mesos scheduler backend relies on this environment variable to set executor memory.
@@ -511,13 +511,15 @@ class SparkContext(config: SparkConf) extends Logging {
   }
 
   /**
-   * Get an RDD for a Hadoop-readable dataset as byte-streams for each file (useful for binary data)
+   * Get an RDD for a Hadoop-readable dataset as byte-streams for each file
+   * (useful for binary data)
    *
    * @param minPartitions A suggestion value of the minimal splitting number for input data.
    *
    * @note Small files are preferred, large file is also allowable, but may cause bad performance.
    */
-  def binaryFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, Array[Byte])] = {
+  def binaryFiles(path: String, minPartitions: Int = defaultMinPartitions):
+  RDD[(String, Array[Byte])] = {
     val job = new NewHadoopJob(hadoopConfiguration)
     NewFileInputFormat.addInputPath(job, new Path(path))
     val updateConf = job.getConfiguration
@@ -531,15 +533,18 @@ class SparkContext(config: SparkConf) extends Logging {
   }
 
   /**
-   * Get an RDD for a Hadoop-readable dataset as DataInputStreams for each file (useful for binary data)
-   * Care must be taken to close the files afterwards
+   * Get an RDD for a Hadoop-readable dataset as DataInputStreams for each file
+   * (useful for binary data)
+   *
    *
    * @param minPartitions A suggestion value of the minimal splitting number for input data.
    *
+   * @note Care must be taken to close the files afterwards
    * @note Small files are preferred, large file is also allowable, but may cause bad performance.
    */
   @DeveloperApi
-  def dataStreamFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, DataInputStream)] = {
+  def dataStreamFiles(path: String, minPartitions: Int = defaultMinPartitions):
+  RDD[(String, DataInputStream)] = {
     val job = new NewHadoopJob(hadoopConfiguration)
     NewFileInputFormat.addInputPath(job, new Path(path))
     val updateConf = job.getConfiguration
@@ -1250,7 +1255,7 @@ class SparkContext(config: SparkConf) extends Logging {
    * If checkSerializable is set, clean will also proactively
    * check to see if f is serializable and throw a SparkException
    * if not.
-   * 
+   *
    * @param f the closure to clean
    * @param checkSerializable whether or not to immediately check f for serializability
    * @throws SparkException if checkSerializable is set but f is not
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index 7690d009fa2b7..98c84a779b9da 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -215,9 +215,10 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
     new JavaPairRDD(sc.wholeTextFiles(path, minPartitions))
 
   /**
-   * Read a directory of binary files from HDFS, a local file system (available on all nodes), or any
-   * Hadoop-supported file system URI as a byte array. Each file is read as a single record and returned in a
-   * key-value pair, where the key is the path of each file, the value is the content of each file.
+   * Read a directory of binary files from HDFS, a local file system (available on all nodes),
+   * or any Hadoop-supported file system URI as a byte array. Each file is read as a single
+   * record and returned in a key-value pair, where the key is the path of each file,
+   * the value is the content of each file.
    *
    * <p> For example, if you have the following files:
    * {{{
@@ -227,7 +228,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
    *   hdfs://a-hdfs-path/part-nnnnn
    * }}}
    *
-   * Do `JavaPairRDD<String, DataInputStream> rdd = sparkContext.dataStreamFiles("hdfs://a-hdfs-path")`,
+   * Do
+   * `JavaPairRDD<String, DataInputStream> rdd = sparkContext.dataStreamFiles("hdfs://a-hdfs-path")`,
    *
    * <p> then `rdd` contains
    * {{{
@@ -241,13 +243,14 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
    *
    * @param minPartitions A suggestion value of the minimal splitting number for input data.
    */
-  def dataStreamFiles(path: String, minPartitions: Int = defaultMinPartitions): JavaPairRDD[String,DataInputStream] =
-    new JavaPairRDD(sc.dataStreamFiles(path,minPartitions))
+  def dataStreamFiles(path: String, minPartitions: Int = defaultMinPartitions):
+  JavaPairRDD[String,DataInputStream] = new JavaPairRDD(sc.dataStreamFiles(path,minPartitions))
 
   /**
-   * Read a directory of files as DataInputStreams from HDFS, a local file system (available on all nodes), or any
-   * Hadoop-supported file system URI as a byte array. Each file is read as a single record and returned in a
-   * key-value pair, where the key is the path of each file, the value is the content of each file.
+   * Read a directory of files as DataInputStreams from HDFS,
+   * a local file system (available on all nodes), or any Hadoop-supported file system URI.
+   * Each file is read as a single record and returned in a key-value pair, where the key is
+   * the path of each file and the value is a DataInputStream over the content of each file.
    *
    * <p> For example, if you have the following files:
    * {{{
@@ -257,7 +260,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
    *   hdfs://a-hdfs-path/part-nnnnn
    * }}}
    *
-   * Do `JavaPairRDD<String, byte[]> rdd = sparkContext.binaryFiles("hdfs://a-hdfs-path")`,
+   * Do
+   * `JavaPairRDD<String, byte[]> rdd = sparkContext.binaryFiles("hdfs://a-hdfs-path")`,
    *
    * <p> then `rdd` contains
    * {{{
@@ -271,8 +275,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
    *
    * @param minPartitions A suggestion value of the minimal splitting number for input data.
    */
-  def binaryFiles(path: String, minPartitions: Int = defaultMinPartitions): JavaPairRDD[String,Array[Byte]] =
-    new JavaPairRDD(sc.binaryFiles(path,minPartitions))
+  def binaryFiles(path: String, minPartitions: Int = defaultMinPartitions):
+  JavaPairRDD[String,Array[Byte]] = new JavaPairRDD(sc.binaryFiles(path,minPartitions))
 
   /**
    * Read a directory of text files from HDFS, a local file system (available on all nodes), or any
diff --git a/core/src/main/scala/org/apache/spark/input/RawFileInput.scala b/core/src/main/scala/org/apache/spark/input/RawFileInput.scala
index 3f299508fe790..eca82184f587f 100644
--- a/core/src/main/scala/org/apache/spark/input/RawFileInput.scala
+++ b/core/src/main/scala/org/apache/spark/input/RawFileInput.scala
@@ -31,7 +31,8 @@
 import java.io.DataInputStream
 
 /**
- * A general format for reading whole files in as streams, byte arrays, or other functions to be added
+ * A general format for reading whole files in as streams, byte arrays,
+ * or other functions to be added
  */
 abstract class StreamFileInputFormat[T]
   extends CombineFileInputFormat[String,T] {
@@ -49,12 +50,14 @@ abstract class StreamFileInputFormat[T]
     super.setMaxSplitSize(maxSplitSize)
   }
 
-  def createRecordReader(split: InputSplit, taContext: TaskAttemptContext): RecordReader[String,T]
+  def createRecordReader(split: InputSplit, taContext: TaskAttemptContext):
+  RecordReader[String,T]
 
 }
 
 /**
- * An abstract class of [[org.apache.hadoop.mapreduce.RecordReader RecordReader]] to reading files out as streams
+ * An abstract class of [[org.apache.hadoop.mapreduce.RecordReader RecordReader]]
+ * for reading files out as streams
  */
 abstract class StreamBasedRecordReader[T](
     split: CombineFileSplit,
@@ -111,17 +114,20 @@ class StreamRecordReader(
 }
 
 /**
- * A class for extracting the information from the file using the BinaryRecordReader (as Byte array)
+ * A class for extracting the information from the file using the
+ * StreamRecordReader (as a DataInputStream)
  */
 
 class StreamInputFormat extends StreamFileInputFormat[DataInputStream] {
   override def createRecordReader(split: InputSplit, taContext: TaskAttemptContext)= {
-    new CombineFileRecordReader[String,DataInputStream](split.asInstanceOf[CombineFileSplit],taContext,classOf[StreamRecordReader])
+    new CombineFileRecordReader[String,DataInputStream](
+      split.asInstanceOf[CombineFileSplit],taContext,classOf[StreamRecordReader]
+    )
   }
 }
 
 /**
- * A [[org.apache.hadoop.mapreduce.RecordReader RecordReader]] for reading a single whole binary file
+ * A [[org.apache.hadoop.mapreduce.RecordReader RecordReader]] for reading a single binary file
  * out in a key-value pair, where the key is the file path and the value is the entire content of
 * the file as a byte array
 */
@@ -150,12 +156,14 @@ class ByteRecordReader(
 }
 
 /**
- * A class for extracting the information from the file using the BinaryRecordReader (as Byte array)
+ * A class for reading the file using the ByteRecordReader (as a byte array)
  */
 
 class ByteInputFormat extends StreamFileInputFormat[Array[Byte]] {
   override def createRecordReader(split: InputSplit, taContext: TaskAttemptContext)= {
-    new CombineFileRecordReader[String,Array[Byte]](split.asInstanceOf[CombineFileSplit],taContext,classOf[ByteRecordReader])
+    new CombineFileRecordReader[String,Array[Byte]](
+      split.asInstanceOf[CombineFileSplit],taContext,classOf[ByteRecordReader]
+    )
   }
 }
 
diff --git a/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala b/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
index e0bacb6dc8db1..7c31e2b50ab75 100644
--- a/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
@@ -20,21 +20,10 @@ package org.apache.spark.rdd
 /** Allows better control of the partitioning
  *
  */
-import java.text.SimpleDateFormat
-import java.util.Date
-
 import org.apache.hadoop.conf.{Configurable, Configuration}
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapreduce._
-
-import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.input.WholeTextFileInputFormat
-import org.apache.spark.InterruptibleIterator
-import org.apache.spark.Logging
-import org.apache.spark.Partition
-import org.apache.spark.SerializableWritable
-import org.apache.spark.{SparkContext, TaskContext}
-
+import org.apache.spark.{Partition, SparkContext}
 import org.apache.spark.input.StreamFileInputFormat
 
 private[spark] class RawFileRDD[T](
@@ -58,7 +47,9 @@ private[spark] class RawFileRDD[T](
     val rawSplits = inputFormat.getSplits(jobContext).toArray
     val result = new Array[Partition](rawSplits.size)
     for (i <- 0 until rawSplits.size) {
-      result(i) = new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
+      result(i) = new NewHadoopPartition(
+        id, i, rawSplits(i).asInstanceOf[InputSplit with Writable]
+      )
     }
     result
   }
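
Usage sketch (not part of the patch): a minimal Scala example exercising the two new SparkContext entry points introduced above, binaryFiles and dataStreamFiles. It assumes a SparkContext `sc` built from this branch and the illustrative input directory `hdfs://a-hdfs-path`; the helper name `printBinaryInputs` is hypothetical.

import java.io.DataInputStream

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Hypothetical helper, assuming a SparkContext built from this branch.
def printBinaryInputs(sc: SparkContext, path: String): Unit = {
  // Whole-file byte arrays: one (path, contents) record per input file.
  val bytes: RDD[(String, Array[Byte])] = sc.binaryFiles(path)
  bytes.map { case (file, content) => (file, content.length) }
    .collect()
    .foreach { case (file, n) => println(s"$file: $n bytes") }

  // Stream variant: one (path, DataInputStream) record per input file.
  // Per the @note above, the caller must close each stream after use.
  val streams: RDD[(String, DataInputStream)] = sc.dataStreamFiles(path)
  streams.map { case (file, in) =>
    val firstByte = try in.read() finally in.close()  // -1 for an empty file
    (file, firstByte)
  }.collect().foreach { case (file, b) => println(s"$file: first byte = $b") }
}

// e.g. printBinaryInputs(sc, "hdfs://a-hdfs-path")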