Commit
Merge remote-tracking branch 'upstream/master' into ldaonline
Conflicts:
	mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala
hhbyyh committed Feb 6, 2015
2 parents d640d9c + 6d3b7cb commit 043e786
Showing 63 changed files with 2,110 additions and 927 deletions.
10 changes: 5 additions & 5 deletions build/mvn
@@ -34,14 +34,14 @@ install_app() {
   local binary="${_DIR}/$3"
 
   # setup `curl` and `wget` silent options if we're running on Jenkins
-  local curl_opts=""
+  local curl_opts="-L"
   local wget_opts=""
   if [ -n "$AMPLAB_JENKINS" ]; then
-    curl_opts="-s"
-    wget_opts="--quiet"
+    curl_opts="-s ${curl_opts}"
+    wget_opts="--quiet ${wget_opts}"
   else
-    curl_opts="--progress-bar"
-    wget_opts="--progress=bar:force"
+    curl_opts="--progress-bar ${curl_opts}"
+    wget_opts="--progress=bar:force ${wget_opts}"
   fi
 
   if [ -z "$3" -o ! -f "$binary" ]; then
7 changes: 7 additions & 0 deletions core/pom.xml
@@ -132,6 +132,13 @@
       <artifactId>jetty-servlet</artifactId>
       <scope>compile</scope>
     </dependency>
+    <!-- Because we mark jetty as provided and shade it, its dependency
+      orbit is ignored, so we explicitly list it here (see SPARK-5557).-->
+    <dependency>
+      <groupId>org.eclipse.jetty.orbit</groupId>
+      <artifactId>javax.servlet</artifactId>
+      <version>${orbit.version}</version>
+    </dependency>
 
     <dependency>
       <groupId>org.apache.commons</groupId>
68 changes: 56 additions & 12 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -25,29 +25,37 @@ import java.net.URI
 import java.util.{Arrays, Properties, UUID}
 import java.util.concurrent.atomic.AtomicInteger
 import java.util.UUID.randomUUID
 
 import scala.collection.{Map, Set}
 import scala.collection.JavaConversions._
 import scala.collection.generic.Growable
 import scala.collection.mutable.HashMap
 import scala.reflect.{ClassTag, classTag}
 
+import akka.actor.Props
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
-import org.apache.hadoop.io.{ArrayWritable, BooleanWritable, BytesWritable, DoubleWritable, FloatWritable, IntWritable, LongWritable, NullWritable, Text, Writable}
-import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf, SequenceFileInputFormat, TextInputFormat}
+import org.apache.hadoop.io.{ArrayWritable, BooleanWritable, BytesWritable, DoubleWritable,
+  FloatWritable, IntWritable, LongWritable, NullWritable, Text, Writable}
+import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf, SequenceFileInputFormat,
+  TextInputFormat}
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, Job => NewHadoopJob}
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
+
 import org.apache.mesos.MesosNativeLibrary
-import akka.actor.Props
 
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
 import org.apache.spark.executor.TriggerThreadDump
-import org.apache.spark.input.{StreamInputFormat, PortableDataStream, WholeTextFileInputFormat, FixedLengthBinaryInputFormat}
+import org.apache.spark.input.{StreamInputFormat, PortableDataStream, WholeTextFileInputFormat,
+  FixedLengthBinaryInputFormat}
 import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd._
 import org.apache.spark.scheduler._
-import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SparkDeploySchedulerBackend, SimrSchedulerBackend}
+import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend,
+  SparkDeploySchedulerBackend, SimrSchedulerBackend}
 import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
 import org.apache.spark.scheduler.local.LocalBackend
 import org.apache.spark.storage._
@@ -1016,12 +1024,48 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * filesystems), or an HTTP, HTTPS or FTP URI. To access the file in Spark jobs,
    * use `SparkFiles.get(fileName)` to find its download location.
    */
-  def addFile(path: String) {
+  def addFile(path: String): Unit = {
+    addFile(path, false)
+  }
+
+  /**
+   * Add a file to be downloaded with this Spark job on every node.
+   * The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported
+   * filesystems), or an HTTP, HTTPS or FTP URI. To access the file in Spark jobs,
+   * use `SparkFiles.get(fileName)` to find its download location.
+   *
+   * A directory can be given if the recursive option is set to true. Currently directories are only
+   * supported for Hadoop-supported filesystems.
+   */
+  def addFile(path: String, recursive: Boolean): Unit = {
     val uri = new URI(path)
-    val key = uri.getScheme match {
-      case null | "file" => env.httpFileServer.addFile(new File(uri.getPath))
-      case "local" => "file:" + uri.getPath
-      case _ => path
+    val schemeCorrectedPath = uri.getScheme match {
+      case null | "local" => "file:" + uri.getPath
+      case _ => path
     }
+
+    val hadoopPath = new Path(schemeCorrectedPath)
+    val scheme = new URI(schemeCorrectedPath).getScheme
+    if (!Array("http", "https", "ftp").contains(scheme)) {
+      val fs = hadoopPath.getFileSystem(hadoopConfiguration)
+      if (!fs.exists(hadoopPath)) {
+        throw new FileNotFoundException(s"Added file $hadoopPath does not exist.")
+      }
+      val isDir = fs.isDirectory(hadoopPath)
+      if (!isLocal && scheme == "file" && isDir) {
+        throw new SparkException(s"addFile does not support local directories when not running " +
+          "local mode.")
+      }
+      if (!recursive && isDir) {
+        throw new SparkException(s"Added file $hadoopPath is a directory and recursive is not " +
+          "turned on.")
+      }
+    }
+
+    val key = if (!isLocal && scheme == "file") {
+      env.httpFileServer.addFile(new File(uri.getPath))
+    } else {
+      schemeCorrectedPath
+    }
     val timestamp = System.currentTimeMillis
     addedFiles(key) = timestamp
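
Not part of the commit: a minimal driver-side sketch of the new API, to make the intent of the hunk above concrete. The URIs, file names, and master setting are hypothetical.

import org.apache.spark.{SparkConf, SparkContext, SparkFiles}

object AddFileRecursiveSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("addFile-recursive-sketch").setMaster("local[2]")
    val sc = new SparkContext(conf)

    // Single files work as before, from local, Hadoop-supported, HTTP(S) or FTP sources.
    sc.addFile("hdfs://namenode:8020/config/lookup.txt")  // hypothetical URI

    // Directories now require recursive = true and a Hadoop-supported filesystem;
    // passing a directory with recursive = false throws SparkException (see the hunk above).
    sc.addFile("hdfs://namenode:8020/config/reference-tables", recursive = true)  // hypothetical URI

    sc.parallelize(1 to 2).foreach { _ =>
      // On executors, the download location is resolved by name.
      println(SparkFiles.get("lookup.txt"))
    }
    sc.stop()
  }
}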
@@ -1633,8 +1677,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     val schedulingMode = getSchedulingMode.toString
     val addedJarPaths = addedJars.keys.toSeq
     val addedFilePaths = addedFiles.keys.toSeq
-    val environmentDetails =
-      SparkEnv.environmentDetails(conf, schedulingMode, addedJarPaths, addedFilePaths)
+    val environmentDetails = SparkEnv.environmentDetails(conf, schedulingMode, addedJarPaths,
+      addedFilePaths)
     val environmentUpdate = SparkListenerEnvironmentUpdate(environmentDetails)
     listenerBus.post(environmentUpdate)
   }
core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -21,7 +21,7 @@ import java.lang.reflect.Method
 import java.security.PrivilegedExceptionAction
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
 import org.apache.hadoop.fs.FileSystem.Statistics
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
@@ -191,6 +191,21 @@ class SparkHadoopUtil extends Logging {
     val method = context.getClass.getMethod("getConfiguration")
     method.invoke(context).asInstanceOf[Configuration]
   }
+
+  /**
+   * Get [[FileStatus]] objects for all leaf children (files) under the given base path. If the
+   * given path points to a file, return a single-element collection containing [[FileStatus]] of
+   * that file.
+   */
+  def listLeafStatuses(fs: FileSystem, basePath: Path): Seq[FileStatus] = {
+    def recurse(path: Path) = {
+      val (directories, leaves) = fs.listStatus(path).partition(_.isDir)
+      leaves ++ directories.flatMap(f => listLeafStatuses(fs, f.getPath))
+    }
+
+    val baseStatus = fs.getFileStatus(basePath)
+    if (baseStatus.isDir) recurse(basePath) else Array(baseStatus)
+  }
 }
 
 object SparkHadoopUtil {
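
Not part of the commit: a short sketch of how the new listLeafStatuses helper can be used. SparkHadoopUtil is an internal Spark helper, so this assumes code that can see it (in practice, code inside Spark itself); the base path is hypothetical.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.deploy.SparkHadoopUtil

object ListLeafStatusesSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical base path; any Hadoop-supported filesystem behaves the same way.
    val basePath = new Path("hdfs://namenode:8020/logs/2015-02-06")
    val fs = basePath.getFileSystem(new Configuration())

    // Walks the tree and returns FileStatus objects for files only; directories are recursed into.
    val leaves = SparkHadoopUtil.get.listLeafStatuses(fs, basePath)
    leaves.foreach(status => println(s"${status.getPath} -> ${status.getLen} bytes"))
  }
}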
88 changes: 73 additions & 15 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -386,8 +386,10 @@ private[spark] object Utils extends Logging {
   }
 
   /**
-   * Download a file to target directory. Supports fetching the file in a variety of ways,
-   * including HTTP, HDFS and files on a standard filesystem, based on the URL parameter.
+   * Download a file or directory to target directory. Supports fetching the file in a variety of
+   * ways, including HTTP, Hadoop-compatible filesystems, and files on a standard filesystem, based
+   * on the URL parameter. Fetching directories is only supported from Hadoop-compatible
+   * filesystems.
    *
    * If `useCache` is true, first attempts to fetch the file to a local cache that's shared
    * across executors running the same application. `useCache` is used mainly for
@@ -456,17 +458,18 @@
    *
    * @param url URL that `sourceFile` originated from, for logging purposes.
    * @param in InputStream to download.
-   * @param tempFile File path to download `in` to.
    * @param destFile File path to move `tempFile` to.
    * @param fileOverwrite Whether to delete/overwrite an existing `destFile` that does not match
    *                      `sourceFile`
    */
   private def downloadFile(
       url: String,
       in: InputStream,
-      tempFile: File,
       destFile: File,
       fileOverwrite: Boolean): Unit = {
+    val tempFile = File.createTempFile("fetchFileTemp", null,
+      new File(destFile.getParentFile.getAbsolutePath))
+    logInfo(s"Fetching $url to $tempFile")
 
     try {
       val out = new FileOutputStream(tempFile)
@@ -505,7 +508,7 @@ private[spark] object Utils extends Logging {
       removeSourceFile: Boolean = false): Unit = {
 
     if (destFile.exists) {
-      if (!Files.equal(sourceFile, destFile)) {
+      if (!filesEqualRecursive(sourceFile, destFile)) {
         if (fileOverwrite) {
           logInfo(
             s"File $destFile exists and does not match contents of $url, replacing it with $url"
@@ -540,13 +543,44 @@ private[spark] object Utils extends Logging {
       Files.move(sourceFile, destFile)
     } else {
       logInfo(s"Copying ${sourceFile.getAbsolutePath} to ${destFile.getAbsolutePath}")
-      Files.copy(sourceFile, destFile)
+      copyRecursive(sourceFile, destFile)
     }
   }
 
+  private def filesEqualRecursive(file1: File, file2: File): Boolean = {
+    if (file1.isDirectory && file2.isDirectory) {
+      val subfiles1 = file1.listFiles()
+      val subfiles2 = file2.listFiles()
+      if (subfiles1.size != subfiles2.size) {
+        return false
+      }
+      subfiles1.sortBy(_.getName).zip(subfiles2.sortBy(_.getName)).forall {
+        case (f1, f2) => filesEqualRecursive(f1, f2)
+      }
+    } else if (file1.isFile && file2.isFile) {
+      Files.equal(file1, file2)
+    } else {
+      false
+    }
+  }
+
+  private def copyRecursive(source: File, dest: File): Unit = {
+    if (source.isDirectory) {
+      if (!dest.mkdir()) {
+        throw new IOException(s"Failed to create directory ${dest.getPath}")
+      }
+      val subfiles = source.listFiles()
+      subfiles.foreach(f => copyRecursive(f, new File(dest, f.getName)))
+    } else {
+      Files.copy(source, dest)
+    }
+  }
+
   /**
-   * Download a file to target directory. Supports fetching the file in a variety of ways,
-   * including HTTP, HDFS and files on a standard filesystem, based on the URL parameter.
+   * Download a file or directory to target directory. Supports fetching the file in a variety of
+   * ways, including HTTP, Hadoop-compatible filesystems, and files on a standard filesystem, based
+   * on the URL parameter. Fetching directories is only supported from Hadoop-compatible
+   * filesystems.
    *
    * Throws SparkException if the target file already exists and has different contents than
    * the requested file.
@@ -558,14 +592,11 @@
       conf: SparkConf,
       securityMgr: SecurityManager,
       hadoopConf: Configuration) {
-    val tempFile = File.createTempFile("fetchFileTemp", null, new File(targetDir.getAbsolutePath))
     val targetFile = new File(targetDir, filename)
     val uri = new URI(url)
     val fileOverwrite = conf.getBoolean("spark.files.overwrite", defaultValue = false)
     Option(uri.getScheme).getOrElse("file") match {
       case "http" | "https" | "ftp" =>
-        logInfo("Fetching " + url + " to " + tempFile)
-
         var uc: URLConnection = null
         if (securityMgr.isAuthenticationEnabled()) {
           logDebug("fetchFile with security enabled")
@@ -583,17 +614,44 @@
         uc.setReadTimeout(timeout)
         uc.connect()
         val in = uc.getInputStream()
-        downloadFile(url, in, tempFile, targetFile, fileOverwrite)
+        downloadFile(url, in, targetFile, fileOverwrite)
       case "file" =>
         // In the case of a local file, copy the local file to the target directory.
         // Note the difference between uri vs url.
         val sourceFile = if (uri.isAbsolute) new File(uri) else new File(url)
         copyFile(url, sourceFile, targetFile, fileOverwrite)
       case _ =>
         // Use the Hadoop filesystem library, which supports file://, hdfs://, s3://, and others
         val fs = getHadoopFileSystem(uri, hadoopConf)
-        val in = fs.open(new Path(uri))
-        downloadFile(url, in, tempFile, targetFile, fileOverwrite)
+        val path = new Path(uri)
+        fetchHcfsFile(path, new File(targetDir, path.getName), fs, conf, hadoopConf, fileOverwrite)
     }
   }
 
+  /**
+   * Fetch a file or directory from a Hadoop-compatible filesystem.
+   *
+   * Visible for testing
+   */
+  private[spark] def fetchHcfsFile(
+      path: Path,
+      targetDir: File,
+      fs: FileSystem,
+      conf: SparkConf,
+      hadoopConf: Configuration,
+      fileOverwrite: Boolean): Unit = {
+    if (!targetDir.mkdir()) {
+      throw new IOException(s"Failed to create directory ${targetDir.getPath}")
+    }
+    fs.listStatus(path).foreach { fileStatus =>
+      val innerPath = fileStatus.getPath
+      if (fileStatus.isDir) {
+        fetchHcfsFile(innerPath, new File(targetDir, innerPath.getName), fs, conf, hadoopConf,
+          fileOverwrite)
+      } else {
+        val in = fs.open(innerPath)
+        val targetFile = new File(targetDir, innerPath.getName)
+        downloadFile(innerPath.toString, in, targetFile, fileOverwrite)
+      }
+    }
+  }
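
Not part of the commit: a throwaway test-style sketch of fetchHcfsFile, which is private[spark] and marked "Visible for testing", so the sketch assumes it lives under the org.apache.spark.util package. All directories and file names are made up, and a local file:// URI stands in for any Hadoop-compatible filesystem.

package org.apache.spark.util

import java.io.File

import com.google.common.io.Files
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.spark.SparkConf

object FetchHcfsFileSketch {
  def main(args: Array[String]): Unit = {
    // Build a small source tree on the local filesystem.
    val sourceDir = Files.createTempDir()
    Files.write("line one\n".getBytes("UTF-8"), new File(sourceDir, "part-00000"))

    // The target directory must not exist yet: fetchHcfsFile creates it with mkdir().
    val targetParent = Files.createTempDir()
    val targetDir = new File(targetParent, sourceDir.getName)

    val path = new Path("file://" + sourceDir.getAbsolutePath)
    val hadoopConf = new Configuration()
    val fs = FileSystem.get(path.toUri, hadoopConf)

    Utils.fetchHcfsFile(path, targetDir, fs, new SparkConf(), hadoopConf, fileOverwrite = false)

    // The source tree is mirrored under targetDir, one downloadFile call per leaf file.
    assert(new File(targetDir, "part-00000").exists())
  }
}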
(Diffs for the remaining 58 changed files are not shown here.)
