Commit a9ec384

Merge branch 'master' of github.com:apache/spark into fix-drop-events
andrewor14 committed Jul 31, 2014
2 parents b12fcd7 + f68105d commit a9ec384
Showing 33 changed files with 460 additions and 129 deletions.
43 changes: 43 additions & 0 deletions core/src/main/scala/org/apache/spark/api/java/JavaHadoopRDD.scala
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.api.java

import scala.collection.JavaConversions._
import scala.reflect.ClassTag

import org.apache.hadoop.mapred.InputSplit

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.api.java.JavaSparkContext._
import org.apache.spark.api.java.function.{Function2 => JFunction2}
import org.apache.spark.rdd.HadoopRDD

@DeveloperApi
class JavaHadoopRDD[K, V](rdd: HadoopRDD[K, V])
(implicit override val kClassTag: ClassTag[K], implicit override val vClassTag: ClassTag[V])
extends JavaPairRDD[K, V](rdd) {

/** Maps over a partition, providing the InputSplit that was used as the base of the partition. */
@DeveloperApi
def mapPartitionsWithInputSplit[R](
f: JFunction2[InputSplit, java.util.Iterator[(K, V)], java.util.Iterator[R]],
preservesPartitioning: Boolean = false): JavaRDD[R] = {
new JavaRDD(rdd.mapPartitionsWithInputSplit((a, b) => f.call(a, asJavaIterator(b)),
preservesPartitioning)(fakeClassTag))(fakeClassTag)
}
}
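
To illustrate what the new wrapper exposes: a minimal Scala sketch of the underlying HadoopRDD.mapPartitionsWithInputSplit call (not part of the commit), assuming an existing SparkContext `sc` and a made-up input path. The cast mirrors the one used in JavaSparkContext further down.

    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapred.{FileSplit, TextInputFormat}
    import org.apache.spark.rdd.HadoopRDD

    // Hypothetical text input; hadoopFile returns a HadoopRDD at runtime, hence the cast.
    val rdd = sc.hadoopFile[LongWritable, Text, TextInputFormat]("hdfs:///data/events")
      .asInstanceOf[HadoopRDD[LongWritable, Text]]

    // Tag every record with the name of the file its partition was read from.
    // Assumes a file-based InputFormat, so the split is a FileSplit.
    val withFileNames = rdd.mapPartitionsWithInputSplit { (split, iter) =>
      val fileName = split.asInstanceOf[FileSplit].getPath.getName
      iter.map { case (_, line) => (fileName, line.toString) }
    }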
43 changes: 43 additions & 0 deletions core/src/main/scala/org/apache/spark/api/java/JavaNewHadoopRDD.scala
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.api.java

import scala.collection.JavaConversions._
import scala.reflect.ClassTag

import org.apache.hadoop.mapreduce.InputSplit

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.api.java.JavaSparkContext._
import org.apache.spark.api.java.function.{Function2 => JFunction2}
import org.apache.spark.rdd.NewHadoopRDD

@DeveloperApi
class JavaNewHadoopRDD[K, V](rdd: NewHadoopRDD[K, V])
(implicit override val kClassTag: ClassTag[K], implicit override val vClassTag: ClassTag[V])
extends JavaPairRDD[K, V](rdd) {

/** Maps over a partition, providing the InputSplit that was used as the base of the partition. */
@DeveloperApi
def mapPartitionsWithInputSplit[R](
f: JFunction2[InputSplit, java.util.Iterator[(K, V)], java.util.Iterator[R]],
preservesPartitioning: Boolean = false): JavaRDD[R] = {
new JavaRDD(rdd.mapPartitionsWithInputSplit((a, b) => f.call(a, asJavaIterator(b)),
preservesPartitioning)(fakeClassTag))(fakeClassTag)
}
}
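
The new-API variant works the same way; only the mapreduce InputSplit/FileSplit classes and the SparkContext entry point change (again a sketch, with the same assumptions as above):

    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapreduce.lib.input.{FileSplit, TextInputFormat}
    import org.apache.spark.rdd.NewHadoopRDD

    val newRdd = sc.newAPIHadoopFile[LongWritable, Text, TextInputFormat]("hdfs:///data/events")
      .asInstanceOf[NewHadoopRDD[LongWritable, Text]]

    val withFileNamesNew = newRdd.mapPartitionsWithInputSplit { (split, iter) =>
      // File-based new-API formats also hand back a FileSplit.
      val fileName = split.asInstanceOf[FileSplit].getPath.getName
      iter.map { case (_, line) => (fileName, line.toString) }
    }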
core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -34,7 +34,7 @@ import org.apache.spark._
import org.apache.spark.SparkContext.{DoubleAccumulatorParam, IntAccumulatorParam}
import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.{EmptyRDD, RDD}
import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, NewHadoopRDD, RDD}

/**
* A Java-friendly version of [[org.apache.spark.SparkContext]] that returns
@@ -294,7 +294,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
): JavaPairRDD[K, V] = {
implicit val ctagK: ClassTag[K] = ClassTag(keyClass)
implicit val ctagV: ClassTag[V] = ClassTag(valueClass)
new JavaPairRDD(sc.hadoopRDD(conf, inputFormatClass, keyClass, valueClass, minPartitions))
val rdd = sc.hadoopRDD(conf, inputFormatClass, keyClass, valueClass, minPartitions)
new JavaHadoopRDD(rdd.asInstanceOf[HadoopRDD[K, V]])
}

/**
@@ -314,7 +315,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
): JavaPairRDD[K, V] = {
implicit val ctagK: ClassTag[K] = ClassTag(keyClass)
implicit val ctagV: ClassTag[V] = ClassTag(valueClass)
new JavaPairRDD(sc.hadoopRDD(conf, inputFormatClass, keyClass, valueClass))
val rdd = sc.hadoopRDD(conf, inputFormatClass, keyClass, valueClass)
new JavaHadoopRDD(rdd.asInstanceOf[HadoopRDD[K, V]])
}

/** Get an RDD for a Hadoop file with an arbitrary InputFormat.
@@ -333,7 +335,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
): JavaPairRDD[K, V] = {
implicit val ctagK: ClassTag[K] = ClassTag(keyClass)
implicit val ctagV: ClassTag[V] = ClassTag(valueClass)
new JavaPairRDD(sc.hadoopFile(path, inputFormatClass, keyClass, valueClass, minPartitions))
val rdd = sc.hadoopFile(path, inputFormatClass, keyClass, valueClass, minPartitions)
new JavaHadoopRDD(rdd.asInstanceOf[HadoopRDD[K, V]])
}

/** Get an RDD for a Hadoop file with an arbitrary InputFormat
@@ -351,8 +354,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
): JavaPairRDD[K, V] = {
implicit val ctagK: ClassTag[K] = ClassTag(keyClass)
implicit val ctagV: ClassTag[V] = ClassTag(valueClass)
new JavaPairRDD(sc.hadoopFile(path,
inputFormatClass, keyClass, valueClass))
val rdd = sc.hadoopFile(path, inputFormatClass, keyClass, valueClass)
new JavaHadoopRDD(rdd.asInstanceOf[HadoopRDD[K, V]])
}

/**
@@ -372,7 +375,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
conf: Configuration): JavaPairRDD[K, V] = {
implicit val ctagK: ClassTag[K] = ClassTag(kClass)
implicit val ctagV: ClassTag[V] = ClassTag(vClass)
new JavaPairRDD(sc.newAPIHadoopFile(path, fClass, kClass, vClass, conf))
val rdd = sc.newAPIHadoopFile(path, fClass, kClass, vClass, conf)
new JavaNewHadoopRDD(rdd.asInstanceOf[NewHadoopRDD[K, V]])
}

/**
@@ -391,7 +395,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
vClass: Class[V]): JavaPairRDD[K, V] = {
implicit val ctagK: ClassTag[K] = ClassTag(kClass)
implicit val ctagV: ClassTag[V] = ClassTag(vClass)
new JavaPairRDD(sc.newAPIHadoopRDD(conf, fClass, kClass, vClass))
val rdd = sc.newAPIHadoopRDD(conf, fClass, kClass, vClass)
new JavaNewHadoopRDD(rdd.asInstanceOf[NewHadoopRDD[K, V]])
}

/** Build the union of two or more RDDs. */
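The net effect of the JavaSparkContext changes above: the returned JavaPairRDD is now backed by JavaHadoopRDD or JavaNewHadoopRDD, so a caller that needs the InputSplit can downcast. A sketch from the Scala side (not part of the commit; `sc` and the path are assumptions):

    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapred.TextInputFormat
    import org.apache.spark.api.java.{JavaHadoopRDD, JavaSparkContext}

    val jsc = new JavaSparkContext(sc)
    val pairs = jsc.hadoopFile("hdfs:///data/events",
      classOf[TextInputFormat], classOf[LongWritable], classOf[Text])

    // Static type is still JavaPairRDD[LongWritable, Text]; runtime type is now JavaHadoopRDD.
    val hadoopPairs = pairs.asInstanceOf[JavaHadoopRDD[LongWritable, Text]]
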
11 changes: 7 additions & 4 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -184,7 +184,7 @@ object SparkSubmit {
OptionAssigner(args.archives, YARN, CLIENT, sysProp = "spark.yarn.dist.archives"),

// Yarn cluster only
OptionAssigner(args.name, YARN, CLUSTER, clOption = "--name", sysProp = "spark.app.name"),
OptionAssigner(args.name, YARN, CLUSTER, clOption = "--name"),
OptionAssigner(args.driverMemory, YARN, CLUSTER, clOption = "--driver-memory"),
OptionAssigner(args.queue, YARN, CLUSTER, clOption = "--queue"),
OptionAssigner(args.numExecutors, YARN, CLUSTER, clOption = "--num-executors"),
@@ -268,14 +268,17 @@ object SparkSubmit {
}
}

// Properties given with --conf are superseded by other options, but take precedence over
// properties in the defaults file.
for ((k, v) <- args.sparkProperties) {
sysProps.getOrElseUpdate(k, v)
}

// Read from default spark properties, if any
for ((k, v) <- args.getDefaultSparkProperties) {
sysProps.getOrElseUpdate(k, v)
}

// Spark properties included on command line take precedence
sysProps ++= args.sparkProperties

(childArgs, childClasspath, sysProps, childMainClass)
}
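
To make the precedence concrete, a small self-contained sketch of the getOrElseUpdate pattern used here (illustrative values only): entries written first win, so explicit options beat --conf entries, which in turn beat the defaults file.

    import scala.collection.mutable

    val sysProps = mutable.HashMap("spark.master" -> "yarn")                      // explicit option
    val confFlags = Map("spark.master" -> "local[2]", "spark.ui.port" -> "4050")  // --conf entries
    val defaultsFile = Map("spark.ui.port" -> "4040", "spark.eventLog.enabled" -> "true")

    for ((k, v) <- confFlags) sysProps.getOrElseUpdate(k, v)     // --conf fills only missing keys
    for ((k, v) <- defaultsFile) sysProps.getOrElseUpdate(k, v)  // defaults fill what is still missing

    // sysProps: spark.master -> yarn, spark.ui.port -> 4050, spark.eventLog.enabled -> true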

core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -58,7 +58,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
val sparkProperties: HashMap[String, String] = new HashMap[String, String]()

parseOpts(args.toList)
loadDefaults()
mergeSparkProperties()
checkRequiredArguments()

/** Return default present in the currently defined defaults file. */
@@ -79,9 +79,11 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
defaultProperties
}

/** Fill in any undefined values based on the current properties file or built-in defaults. */
private def loadDefaults(): Unit = {

/**
* Fill in any undefined values based on the default properties file or options passed in through
* the '--conf' flag.
*/
private def mergeSparkProperties(): Unit = {
// Use common defaults file, if not specified by user
if (propertiesFile == null) {
sys.env.get("SPARK_HOME").foreach { sparkHome =>
@@ -94,18 +96,20 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
}
}

val defaultProperties = getDefaultSparkProperties
val properties = getDefaultSparkProperties
properties.putAll(sparkProperties)

// Use properties file as fallback for values which have a direct analog to
// arguments in this script.
master = Option(master).getOrElse(defaultProperties.get("spark.master").orNull)
master = Option(master).getOrElse(properties.get("spark.master").orNull)
executorMemory = Option(executorMemory)
.getOrElse(defaultProperties.get("spark.executor.memory").orNull)
.getOrElse(properties.get("spark.executor.memory").orNull)
executorCores = Option(executorCores)
.getOrElse(defaultProperties.get("spark.executor.cores").orNull)
.getOrElse(properties.get("spark.executor.cores").orNull)
totalExecutorCores = Option(totalExecutorCores)
.getOrElse(defaultProperties.get("spark.cores.max").orNull)
name = Option(name).getOrElse(defaultProperties.get("spark.app.name").orNull)
jars = Option(jars).getOrElse(defaultProperties.get("spark.jars").orNull)
.getOrElse(properties.get("spark.cores.max").orNull)
name = Option(name).getOrElse(properties.get("spark.app.name").orNull)
jars = Option(jars).getOrElse(properties.get("spark.jars").orNull)

// This supports env vars in older versions of Spark
master = Option(master).getOrElse(System.getenv("MASTER"))
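The same merge seen from the argument side, sketched with made-up values: the defaults file is loaded first, --conf entries overwrite it, and an explicit flag such as --master still wins because it only falls back to the merged map when unset.

    import scala.collection.mutable

    val properties = mutable.HashMap("spark.master" -> "local[2]",      // from the defaults file
                                     "spark.app.name" -> "default-app")
    val sparkProperties = Map("spark.app.name" -> "conf-app")           // from --conf
    properties ++= sparkProperties                                      // --conf overrides defaults

    var master: String = "yarn"                                         // given via --master
    var name: String = null                                             // not given on the command line
    master = Option(master).getOrElse(properties.get("spark.master").orNull)  // stays "yarn"
    name = Option(name).getOrElse(properties.get("spark.app.name").orNull)    // becomes "conf-app"
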
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -36,11 +36,11 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
conf.getInt("spark.history.updateInterval", 10)) * 1000

private val logDir = conf.get("spark.history.fs.logDirectory", null)
if (logDir == null) {
throw new IllegalArgumentException("Logging directory must be specified.")
}
private val resolvedLogDir = Option(logDir)
.map { d => Utils.resolveURI(d) }
.getOrElse { throw new IllegalArgumentException("Logging directory must be specified.") }

private val fs = Utils.getHadoopFileSystem(logDir)
private val fs = Utils.getHadoopFileSystem(resolvedLogDir)

// A timestamp of when the disk was last accessed to check for log updates
private var lastLogCheckTimeMs = -1L
@@ -76,14 +76,14 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis

private def initialize() {
// Validate the log directory.
val path = new Path(logDir)
val path = new Path(resolvedLogDir)
if (!fs.exists(path)) {
throw new IllegalArgumentException(
"Logging directory specified does not exist: %s".format(logDir))
"Logging directory specified does not exist: %s".format(resolvedLogDir))
}
if (!fs.getFileStatus(path).isDir) {
throw new IllegalArgumentException(
"Logging directory specified is not a directory: %s".format(logDir))
"Logging directory specified is not a directory: %s".format(resolvedLogDir))
}

checkForLogs()
@@ -95,15 +95,16 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis

override def getAppUI(appId: String): SparkUI = {
try {
val appLogDir = fs.getFileStatus(new Path(logDir, appId))
loadAppInfo(appLogDir, true)._2
val appLogDir = fs.getFileStatus(new Path(resolvedLogDir.toString, appId))
val (_, ui) = loadAppInfo(appLogDir, renderUI = true)
ui
} catch {
case e: FileNotFoundException => null
}
}

override def getConfig(): Map[String, String] =
Map(("Event Log Location" -> logDir))
Map("Event Log Location" -> resolvedLogDir.toString)

/**
* Builds the application list based on the current contents of the log directory.
@@ -114,14 +115,14 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
lastLogCheckTimeMs = getMonotonicTimeMs()
logDebug("Checking for logs. Time is now %d.".format(lastLogCheckTimeMs))
try {
val logStatus = fs.listStatus(new Path(logDir))
val logStatus = fs.listStatus(new Path(resolvedLogDir))
val logDirs = if (logStatus != null) logStatus.filter(_.isDir).toSeq else Seq[FileStatus]()
val logInfos = logDirs.filter {
dir => fs.isFile(new Path(dir.getPath(), EventLoggingListener.APPLICATION_COMPLETE))
val logInfos = logDirs.filter { dir =>
fs.isFile(new Path(dir.getPath, EventLoggingListener.APPLICATION_COMPLETE))
}

val currentApps = Map[String, ApplicationHistoryInfo](
appList.map(app => (app.id -> app)):_*)
appList.map(app => app.id -> app):_*)

// For any application that either (i) is not listed or (ii) has changed since the last time
// the listing was created (defined by the log dir's modification time), load the app's info.
@@ -131,7 +132,8 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
val curr = currentApps.getOrElse(dir.getPath().getName(), null)
if (curr == null || curr.lastUpdated < getModificationTime(dir)) {
try {
newApps += loadAppInfo(dir, false)._1
val (app, _) = loadAppInfo(dir, renderUI = false)
newApps += app
} catch {
case e: Exception => logError(s"Failed to load app info from directory $dir.")
}
@@ -159,9 +161,9 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
* @return A 2-tuple `(app info, ui)`. `ui` will be null if `renderUI` is false.
*/
private def loadAppInfo(logDir: FileStatus, renderUI: Boolean) = {
val elogInfo = EventLoggingListener.parseLoggingInfo(logDir.getPath(), fs)
val path = logDir.getPath
val appId = path.getName
val elogInfo = EventLoggingListener.parseLoggingInfo(path, fs)
val replayBus = new ReplayListenerBus(elogInfo.logPaths, fs, elogInfo.compressionCodec)
val appListener = new ApplicationEventListener
replayBus.addListener(appListener)
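A quick sketch of why the log directory is resolved to a URI before the FileSystem is built (the resolution behavior is assumed from Utils.resolveURI, and Utils is private[spark], so this only compiles inside that package): a bare local path gains an explicit scheme, while an already-qualified URI passes through.

    package org.apache.spark.deploy.history

    import org.apache.spark.util.Utils

    object ResolveSketch {
      // Assumed: "/tmp/spark-events" resolves to file:/tmp/spark-events.
      val local = Utils.resolveURI("/tmp/spark-events")
      // Assumed: a qualified URI is returned unchanged.
      val remote = Utils.resolveURI("hdfs://nn:8020/spark-history")
      // The FileSystem is then chosen from the URI's scheme.
      val fs = Utils.getHadoopFileSystem(remote)
    }
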
core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
@@ -45,7 +45,7 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
<div class="row-fluid">
<div class="span12">
<ul class="unstyled">
{ providerConfig.map(e => <li><strong>{e._1}:</strong> {e._2}</li>) }
{providerConfig.map { case (k, v) => <li><strong>{k}:</strong> {v}</li> }}
</ul>
{
if (allApps.size > 0) {
core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -25,9 +25,9 @@ import org.eclipse.jetty.servlet.{ServletContextHandler, ServletHolder}

import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.ui.{WebUI, SparkUI, UIUtils}
import org.apache.spark.ui.{SparkUI, UIUtils, WebUI}
import org.apache.spark.ui.JettyUtils._
import org.apache.spark.util.{SignalLogger, Utils}
import org.apache.spark.util.SignalLogger

/**
* A web server that renders SparkUIs of completed applications.
@@ -177,7 +177,7 @@ object HistoryServer extends Logging {
def main(argStrings: Array[String]) {
SignalLogger.register(log)
initSecurity()
val args = new HistoryServerArguments(conf, argStrings)
new HistoryServerArguments(conf, argStrings)
val securityManager = new SecurityManager(conf)

val providerName = conf.getOption("spark.history.provider")
core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala
@@ -18,7 +18,6 @@
package org.apache.spark.deploy.history

import org.apache.spark.SparkConf
import org.apache.spark.util.Utils

/**
* Command-line parser for the master.
@@ -32,6 +31,7 @@ private[spark] class HistoryServerArguments(conf: SparkConf, args: Array[String]
args match {
case ("--dir" | "-d") :: value :: tail =>
logDir = value
conf.set("spark.history.fs.logDirectory", value)
parse(tail)

case ("--help" | "-h") :: tail =>
@@ -42,9 +42,6 @@ private[spark] class HistoryServerArguments(conf: SparkConf, args: Array[String]
case _ =>
printUsageAndExit(1)
}
if (logDir != null) {
conf.set("spark.history.fs.logDirectory", logDir)
}
}

private def printUsageAndExit(exitCode: Int) {
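For context, a short sketch of how the two configuration routes now converge (hypothetical path): parsing --dir (or -d) simply writes spark.history.fs.logDirectory into the SparkConf, so FsHistoryProvider only ever reads that one property and fails when it is missing.

    import org.apache.spark.SparkConf

    // Equivalent to launching the history server with: --dir hdfs:///shared/spark-logs
    val conf = new SparkConf().set("spark.history.fs.logDirectory", "hdfs:///shared/spark-logs")
    // Without this property, FsHistoryProvider throws
    // "Logging directory must be specified."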