
Commit

added Apache headers, added DataInputStream directly as an output option for more complicated readers (HDF5 perhaps), and renamed several of the functions and files to be more consistent. Also added parallel functions to the Java API.
kmader committed Jul 31, 2014
1 parent 84035f1 commit 1cfa38a
Showing 5 changed files with 271 additions and 108 deletions.
28 changes: 25 additions & 3 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -40,7 +40,7 @@ import org.apache.mesos.MesosNativeLibrary
import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
import org.apache.spark.input.{WholeTextFileInputFormat,ByteInputFormat}
import org.apache.spark.input.{StreamInputFormat, StreamFileInputFormat, WholeTextFileInputFormat, ByteInputFormat}
import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
import org.apache.spark.rdd._
import org.apache.spark.scheduler._
@@ -517,11 +517,11 @@ class SparkContext(config: SparkConf) extends Logging {
*
* @note Small files are preferred; large files are also allowed, but may cause bad performance.
*/
def byteFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, Array[Byte])] = {
def binaryFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, Array[Byte])] = {
val job = new NewHadoopJob(hadoopConfiguration)
NewFileInputFormat.addInputPath(job, new Path(path))
val updateConf = job.getConfiguration
new BinaryFileRDD(
new RawFileRDD(
this,
classOf[ByteInputFormat],
classOf[String],
@@ -530,6 +530,28 @@
minPartitions).setName(path)
}
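For reference, a minimal usage sketch against the binaryFiles signature above (assuming an existing SparkContext named sc; the path and partition count are purely illustrative):
{{{
// each record is (file path, full file contents as a byte array)
val files = sc.binaryFiles("hdfs://a-hdfs-path", minPartitions = 8)

// report the size of each file without interpreting its contents
files.map { case (path, bytes) => (path, bytes.length) }
  .collect()
  .foreach { case (path, size) => println(s"$path: $size bytes") }
}}}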

/**
* Get an RDD for a Hadoop-readable dataset, as a DataInputStream for each file (useful for binary data).
* Care must be taken to close the streams afterwards.
*
* @param minPartitions A suggestion value for the minimal number of splits for the input data.
*
* @note Small files are preferred; large files are also allowed, but may cause bad performance.
*/
@DeveloperApi
def dataStreamFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, DataInputStream)] = {
val job = new NewHadoopJob(hadoopConfiguration)
NewFileInputFormat.addInputPath(job, new Path(path))
val updateConf = job.getConfiguration
new RawFileRDD(
this,
classOf[StreamInputFormat],
classOf[String],
classOf[DataInputStream],
updateConf,
minPartitions).setName(path)
}
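Likewise, a sketch of how the stream-based variant might be consumed (again assuming an existing SparkContext named sc; the path and the leading 4-byte header read here are illustrative), closing each stream inside the task as the doc comment above requires:
{{{
val headers = sc.dataStreamFiles("hdfs://a-hdfs-path", minPartitions = 8)
  .map { case (path, stream) =>
    try {
      // read only the first 4 bytes of each file as a big-endian Int
      (path, stream.readInt())
    } finally {
      stream.close()  // streams are not closed automatically
    }
  }
headers.collect().foreach { case (path, magic) => println(s"$path starts with $magic") }
}}}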

/**
* Get an RDD for a Hadoop-readable dataset from a Hadoop JobConf given its InputFormat and other
* necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable),
core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala

@@ -20,6 +20,8 @@ package org.apache.spark.api.java
import java.util
import java.util.{Map => JMap}

import java.io.DataInputStream

import scala.collection.JavaConversions
import scala.collection.JavaConversions._
import scala.language.implicitConversions
@@ -180,6 +182,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
def textFile(path: String, minPartitions: Int): JavaRDD[String] =
sc.textFile(path, minPartitions)



/**
* Read a directory of text files from HDFS, a local file system (available on all nodes), or any
* Hadoop-supported file system URI. Each file is read as a single record and returned in a
@@ -210,6 +214,66 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
def wholeTextFiles(path: String, minPartitions: Int): JavaPairRDD[String, String] =
new JavaPairRDD(sc.wholeTextFiles(path, minPartitions))

/**
* Read a directory of files from HDFS, a local file system (available on all nodes), or any
* Hadoop-supported file system URI as a DataInputStream per file. Each file is read as a single record and
* returned in a key-value pair, where the key is the path of each file and the value is a DataInputStream
* over the content of each file. Care must be taken to close the streams afterwards.
*
* <p> For example, if you have the following files:
* {{{
* hdfs://a-hdfs-path/part-00000
* hdfs://a-hdfs-path/part-00001
* ...
* hdfs://a-hdfs-path/part-nnnnn
* }}}
*
* Do `JavaPairRDD<String, DataInputStream> rdd = sparkContext.dataStreamFiles("hdfs://a-hdfs-path")`,
*
* <p> then `rdd` contains
* {{{
* (a-hdfs-path/part-00000, its content)
* (a-hdfs-path/part-00001, its content)
* ...
* (a-hdfs-path/part-nnnnn, its content)
* }}}
*
* @note Small files are preferred; large files are also allowed, but may cause bad performance.
*
* @param minPartitions A suggestion value for the minimal number of splits for the input data.
*/
def dataStreamFiles(path: String, minPartitions: Int = defaultMinPartitions): JavaPairRDD[String, DataInputStream] =
new JavaPairRDD(sc.dataStreamFiles(path, minPartitions))

/**
* Read a directory of binary files from HDFS, a local file system (available on all nodes), or any
* Hadoop-supported file system URI as a byte array per file. Each file is read as a single record and returned
* in a key-value pair, where the key is the path of each file and the value is the content of each file.
*
* <p> For example, if you have the following files:
* {{{
* hdfs://a-hdfs-path/part-00000
* hdfs://a-hdfs-path/part-00001
* ...
* hdfs://a-hdfs-path/part-nnnnn
* }}}
*
* Do `JavaPairRDD<String, byte[]> rdd = sparkContext.binaryFiles("hdfs://a-hdfs-path")`,
*
* <p> then `rdd` contains
* {{{
* (a-hdfs-path/part-00000, its content)
* (a-hdfs-path/part-00001, its content)
* ...
* (a-hdfs-path/part-nnnnn, its content)
* }}}
*
* @note Small files are preferred; large files are also allowed, but may cause bad performance.
*
* @param minPartitions A suggestion value for the minimal number of splits for the input data.
*/
def binaryFiles(path: String, minPartitions: Int = defaultMinPartitions): JavaPairRDD[String, Array[Byte]] =
new JavaPairRDD(sc.binaryFiles(path, minPartitions))
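A short sketch of how these Java-facing wrappers might be exercised (written in Scala against JavaSparkContext for brevity; sc is an assumed existing SparkContext and the path is illustrative, but from Java the calls look the same apart from syntax):
{{{
import org.apache.spark.api.java.JavaSparkContext

val jsc = new JavaSparkContext(sc)  // wraps an existing SparkContext

// byte-array variant: key = file path, value = full file contents
val asBytes = jsc.binaryFiles("hdfs://a-hdfs-path", 8)

// stream variant: key = file path, value = an open DataInputStream (close it after use)
val asStreams = jsc.dataStreamFiles("hdfs://a-hdfs-path", 8)
}}}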

/**
* Read a directory of text files from HDFS, a local file system (available on all nodes), or any
* Hadoop-supported file system URI. Each file is read as a single record and returned in a
102 changes: 0 additions & 102 deletions core/src/main/scala/org/apache/spark/input/BinaryFileInput.scala

This file was deleted.


0 comments on commit 1cfa38a
