Commit

changing the line lengths to make jenkins happy
kmader committed Jul 31, 2014
1 parent 1cfa38a commit 1622935
Showing 4 changed files with 48 additions and 40 deletions.
19 changes: 12 additions & 7 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -289,7 +289,7 @@ class SparkContext(config: SparkConf) extends Logging {
value <- Option(System.getenv(envKey)).orElse(Option(System.getProperty(propKey)))} {
executorEnvs(envKey) = value
}
Option(System.getenv("SPARK_PREPEND_CLASSES")).foreach { v =>
Option(System.getenv("SPARK_PREPEND_CLASSES")).foreach { v =>
executorEnvs("SPARK_PREPEND_CLASSES") = v
}
// The Mesos scheduler backend relies on this environment variable to set executor memory.
@@ -511,13 +511,15 @@ class SparkContext(config: SparkConf) extends Logging {
}

/**
* Get an RDD for a Hadoop-readable dataset as byte-streams for each file (useful for binary data)
* Get an RDD for a Hadoop-readable dataset as byte-streams for each file
* (useful for binary data)
*
* @param minPartitions A suggestion value of the minimal splitting number for input data.
*
* @note Small files are preferred, large file is also allowable, but may cause bad performance.
*/
def binaryFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, Array[Byte])] = {
def binaryFiles(path: String, minPartitions: Int = defaultMinPartitions):
RDD[(String, Array[Byte])] = {
val job = new NewHadoopJob(hadoopConfiguration)
NewFileInputFormat.addInputPath(job, new Path(path))
val updateConf = job.getConfiguration
@@ -531,15 +533,18 @@ class SparkContext(config: SparkConf) extends Logging {
}

/**
* Get an RDD for a Hadoop-readable dataset as DataInputStreams for each file (useful for binary data)
* Care must be taken to close the files afterwards
* Get an RDD for a Hadoop-readable dataset as DataInputStreams for each file
* (useful for binary data)
*
*
* @param minPartitions A suggestion value of the minimal splitting number for input data.
*
* @note Care must be taken to close the files afterwards
* @note Small files are preferred, large file is also allowable, but may cause bad performance.
*/
@DeveloperApi
def dataStreamFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, DataInputStream)] = {
def dataStreamFiles(path: String, minPartitions: Int = defaultMinPartitions):
RDD[(String, DataInputStream)] = {
val job = new NewHadoopJob(hadoopConfiguration)
NewFileInputFormat.addInputPath(job, new Path(path))
val updateConf = job.getConfiguration
@@ -1250,7 +1255,7 @@ class SparkContext(config: SparkConf) extends Logging {
* If <tt>checkSerializable</tt> is set, <tt>clean</tt> will also proactively
* check to see if <tt>f</tt> is serializable and throw a <tt>SparkException</tt>
* if not.
*
*
* @param f the closure to clean
* @param checkSerializable whether or not to immediately check <tt>f</tt> for serializability
* @throws <tt>SparkException<tt> if <tt>checkSerializable</tt> is set but <tt>f</tt> is not
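The hunks above only re-wrap the binaryFiles and dataStreamFiles declarations so the lines pass the style check; behaviour is unchanged. A minimal usage sketch against those signatures follows — the app name, local master, and input path are illustrative placeholders, not part of this commit:

// Sketch only: exercises the APIs declared in the hunks above.
import java.io.DataInputStream
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object BinaryFilesSketch {
  def main(args: Array[String]): Unit = {
    // Local master and app name are placeholders for illustration.
    val conf = new SparkConf().setAppName("binary-files-sketch").setMaster("local[2]")
    val sc = new SparkContext(conf)

    // binaryFiles: each record is (file path, whole file as a byte array).
    val asBytes: RDD[(String, Array[Byte])] = sc.binaryFiles("/some/binary/dir")
    val sizes = asBytes.map { case (path, bytes) => (path, bytes.length) }

    // dataStreamFiles: each record is (file path, open DataInputStream);
    // per the @note above, the caller is responsible for closing the stream.
    val asStreams: RDD[(String, DataInputStream)] = sc.dataStreamFiles("/some/binary/dir")
    val firstBytes = asStreams.map { case (path, in) =>
      try (path, in.readByte()) finally in.close()
    }

    sizes.collect().foreach { case (path, n) => println(s"$path: $n bytes") }
    firstBytes.collect().foreach { case (path, b) => println(s"$path starts with byte $b") }
    sc.stop()
  }
}

The stream variant is the one to reach for when whole files should not be materialized as a single byte array per record; the byte-array variant is simpler when files are small.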
28 changes: 16 additions & 12 deletions core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -215,9 +215,10 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
new JavaPairRDD(sc.wholeTextFiles(path, minPartitions))

/**
* Read a directory of binary files from HDFS, a local file system (available on all nodes), or any
* Hadoop-supported file system URI as a byte array. Each file is read as a single record and returned in a
* key-value pair, where the key is the path of each file, the value is the content of each file.
* Read a directory of binary files from HDFS, a local file system (available on all nodes),
* or any Hadoop-supported file system URI as a byte array. Each file is read as a single
* record and returned in a key-value pair, where the key is the path of each file,
* the value is the content of each file.
*
* <p> For example, if you have the following files:
* {{{
@@ -227,7 +228,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
* hdfs://a-hdfs-path/part-nnnnn
* }}}
*
* Do `JavaPairRDD<String, byte[]> rdd = sparkContext.dataStreamFiles("hdfs://a-hdfs-path")`,
* Do
* `JavaPairRDD<String, byte[]> rdd = sparkContext.dataStreamFiles("hdfs://a-hdfs-path")`,
*
* <p> then `rdd` contains
* {{{
@@ -241,13 +243,14 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
*
* @param minPartitions A suggestion value of the minimal splitting number for input data.
*/
def dataStreamFiles(path: String, minPartitions: Int = defaultMinPartitions): JavaPairRDD[String,DataInputStream] =
new JavaPairRDD(sc.dataStreamFiles(path,minPartitions))
def dataStreamFiles(path: String, minPartitions: Int = defaultMinPartitions):
JavaPairRDD[String,DataInputStream] = new JavaPairRDD(sc.dataStreamFiles(path,minPartitions))

/**
* Read a directory of files as DataInputStreams from HDFS, a local file system (available on all nodes), or any
* Hadoop-supported file system URI as a byte array. Each file is read as a single record and returned in a
* key-value pair, where the key is the path of each file, the value is the content of each file.
* Read a directory of files as DataInputStream from HDFS,
* a local file system (available on all nodes), or any Hadoop-supported file system URI
* as a byte array. Each file is read as a single record and returned in a
* key-value pair, where the key is the path of each file, the value is the content of each.
*
* <p> For example, if you have the following files:
* {{{
@@ -257,7 +260,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
* hdfs://a-hdfs-path/part-nnnnn
* }}}
*
* Do `JavaPairRDD<String, DataInputStream> rdd = sparkContext.binaryFiles("hdfs://a-hdfs-path")`,
* Do
* `JavaPairRDD<String,DataInputStream> rdd = sparkContext.binaryFiles("hdfs://a-hdfs-path")`,
*
* <p> then `rdd` contains
* {{{
@@ -271,8 +275,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
*
* @param minPartitions A suggestion value of the minimal splitting number for input data.
*/
def binaryFiles(path: String, minPartitions: Int = defaultMinPartitions): JavaPairRDD[String,Array[Byte]] =
new JavaPairRDD(sc.binaryFiles(path,minPartitions))
def binaryFiles(path: String, minPartitions: Int = defaultMinPartitions):
JavaPairRDD[String,Array[Byte]] = new JavaPairRDD(sc.binaryFiles(path,minPartitions))

/**
* Read a directory of text files from HDFS, a local file system (available on all nodes), or any
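The Java-facing wrappers above just delegate to the SparkContext methods and re-wrap their scaladoc for line length. A short sketch of calling them through JavaSparkContext, written in Scala to match the other examples here; the path is again a placeholder:

import java.io.DataInputStream
import org.apache.spark.SparkContext
import org.apache.spark.api.java.{JavaPairRDD, JavaSparkContext}

object JavaWrapperSketch {
  // Shows the return types declared above; JavaSparkContext wraps an existing SparkContext.
  def run(sc: SparkContext): Unit = {
    val jsc = new JavaSparkContext(sc)

    val asBytes: JavaPairRDD[String, Array[Byte]] = jsc.binaryFiles("/some/binary/dir")
    val asStreams: JavaPairRDD[String, DataInputStream] = jsc.dataStreamFiles("/some/binary/dir")

    println(s"byte-array records: ${asBytes.count()}, stream records: ${asStreams.count()}")
  }
}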
24 changes: 16 additions & 8 deletions core/src/main/scala/org/apache/spark/input/RawFileInput.scala
@@ -31,7 +31,8 @@ import java.io.DataInputStream


/**
* A general format for reading whole files in as streams, byte arrays, or other functions to be added
* A general format for reading whole files in as streams, byte arrays,
* or other functions to be added
*/
abstract class StreamFileInputFormat[T]
extends CombineFileInputFormat[String,T] {
@@ -49,12 +50,14 @@ abstract class StreamFileInputFormat[T]
super.setMaxSplitSize(maxSplitSize)
}

def createRecordReader(split: InputSplit, taContext: TaskAttemptContext): RecordReader[String,T]
def createRecordReader(split: InputSplit, taContext: TaskAttemptContext):
RecordReader[String,T]

}

/**
* An abstract class of [[org.apache.hadoop.mapreduce.RecordReader RecordReader]] to reading files out as streams
* An abstract class of [[org.apache.hadoop.mapreduce.RecordReader RecordReader]]
* to reading files out as streams
*/
abstract class StreamBasedRecordReader[T](
split: CombineFileSplit,
@@ -111,17 +114,20 @@ class StreamRecordReader(
}

/**
* A class for extracting the information from the file using the BinaryRecordReader (as Byte array)
* A class for extracting the information from the file using the
* BinaryRecordReader (as Byte array)
*/
class StreamInputFormat extends StreamFileInputFormat[DataInputStream] {
override def createRecordReader(split: InputSplit, taContext: TaskAttemptContext)=
{
new CombineFileRecordReader[String,DataInputStream](split.asInstanceOf[CombineFileSplit],taContext,classOf[StreamRecordReader])
new CombineFileRecordReader[String,DataInputStream](
split.asInstanceOf[CombineFileSplit],taContext,classOf[StreamRecordReader]
)
}
}

/**
* A [[org.apache.hadoop.mapreduce.RecordReader RecordReader]] for reading a single whole binary file
* A [[org.apache.hadoop.mapreduce.RecordReader RecordReader]] for reading a single binary file
* out in a key-value pair, where the key is the file path and the value is the entire content of
* the file as a byte array
*/
@@ -150,12 +156,14 @@ class ByteRecordReader(
}

/**
* A class for extracting the information from the file using the BinaryRecordReader (as Byte array)
* A class for reading the file using the BinaryRecordReader (as Byte array)
*/
class ByteInputFormat extends StreamFileInputFormat[Array[Byte]] {
override def createRecordReader(split: InputSplit, taContext: TaskAttemptContext)=
{
new CombineFileRecordReader[String,Array[Byte]](split.asInstanceOf[CombineFileSplit],taContext,classOf[ByteRecordReader])
new CombineFileRecordReader[String,Array[Byte]](
split.asInstanceOf[CombineFileSplit],taContext,classOf[ByteRecordReader]
)
}
}

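The classes above plug into Hadoop's CombineFileInputFormat machinery: the format packs many small files into a single split, and CombineFileRecordReader hands each file in that split to a StreamRecordReader or ByteRecordReader. As a hedged sketch of the same wiring — assuming ByteInputFormat remains publicly constructible as declared here, and using the generic newAPIHadoopFile loader rather than the exact code path this commit touches:

import org.apache.spark.SparkContext
import org.apache.spark.input.ByteInputFormat
import org.apache.spark.rdd.RDD

object CombineFormatSketch {
  // Drives the combine-file byte format through the generic new-Hadoop-API
  // entry point; conceptually similar to what binaryFiles sets up, shown
  // here only to illustrate the InputFormat wiring.
  def bytesViaHadoopApi(sc: SparkContext, path: String): RDD[(String, Array[Byte])] =
    sc.newAPIHadoopFile[String, Array[Byte], ByteInputFormat](path)
}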
17 changes: 4 additions & 13 deletions core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
@@ -20,21 +20,10 @@ package org.apache.spark.rdd
/** Allows better control of the partitioning
*
*/
import java.text.SimpleDateFormat
import java.util.Date

import org.apache.hadoop.conf.{Configurable, Configuration}
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapreduce._

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.input.WholeTextFileInputFormat
import org.apache.spark.InterruptibleIterator
import org.apache.spark.Logging
import org.apache.spark.Partition
import org.apache.spark.SerializableWritable
import org.apache.spark.{SparkContext, TaskContext}

import org.apache.spark.{Partition, SparkContext}
import org.apache.spark.input.StreamFileInputFormat

private[spark] class RawFileRDD[T](
@@ -58,7 +47,9 @@ private[spark] class RawFileRDD[T](
val rawSplits = inputFormat.getSplits(jobContext).toArray
val result = new Array[Partition](rawSplits.size)
for (i <- 0 until rawSplits.size) {
result(i) = new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
result(i) = new NewHadoopPartition(
id, i, rawSplits(i).asInstanceOf[InputSplit with Writable]
)
}
result
}
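RawFileRDD turns each CombineFileSplit returned by the input format into one partition, so the minPartitions argument threaded through the APIs above is only a sizing hint. A small sketch (placeholder path) for checking what that hint actually produced:

import org.apache.spark.SparkContext

object PartitionHintSketch {
  // The requested minimum is a suggestion: the real partition count is the
  // number of CombineFileSplits the format builds from the files it finds.
  def show(sc: SparkContext, path: String): Unit = {
    val rdd = sc.binaryFiles(path, minPartitions = 16)  // 16 is an arbitrary hint
    println(s"asked for about 16 partitions, got ${rdd.partitions.length}")
  }
}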
