
Commit

Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new
witgo committed May 14, 2014
2 parents 3ea1ca1 + 5c0dafc commit 0e29eac
Showing 92 changed files with 1,048 additions and 453 deletions.
2 changes: 1 addition & 1 deletion bagel/src/test/resources/log4j.properties
@@ -19,7 +19,7 @@
log4j.rootCategory=INFO, file
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.append=false
log4j.appender.file.file=bagel/target/unit-tests.log
log4j.appender.file.file=target/unit-tests.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n

5 changes: 2 additions & 3 deletions bin/spark-shell.cmd
@@ -17,7 +17,6 @@ rem See the License for the specific language governing permissions and
rem limitations under the License.
rem

rem Find the path of sbin
set BIN=%~dp0..\bin\
set SPARK_HOME=%~dp0..

cmd /V /E /C %BIN%spark-class2.cmd org.apache.spark.repl.Main %*
cmd /V /E /C %SPARK_HOME%\bin\spark-submit.cmd spark-internal %* --class org.apache.spark.repl.Main
56 changes: 56 additions & 0 deletions bin/spark-submit.cmd
@@ -0,0 +1,56 @@
@echo off

rem
rem Licensed to the Apache Software Foundation (ASF) under one or more
rem contributor license agreements. See the NOTICE file distributed with
rem this work for additional information regarding copyright ownership.
rem The ASF licenses this file to You under the Apache License, Version 2.0
rem (the "License"); you may not use this file except in compliance with
rem the License. You may obtain a copy of the License at
rem
rem http://www.apache.org/licenses/LICENSE-2.0
rem
rem Unless required by applicable law or agreed to in writing, software
rem distributed under the License is distributed on an "AS IS" BASIS,
rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
rem See the License for the specific language governing permissions and
rem limitations under the License.
rem

set SPARK_HOME=%~dp0..
set ORIG_ARGS=%*

rem Clear the values of all variables used
set DEPLOY_MODE=
set DRIVER_MEMORY=
set SPARK_SUBMIT_LIBRARY_PATH=
set SPARK_SUBMIT_CLASSPATH=
set SPARK_SUBMIT_OPTS=
set SPARK_DRIVER_MEMORY=

:loop
if [%1] == [] goto continue
if [%1] == [--deploy-mode] (
set DEPLOY_MODE=%2
) else if [%1] == [--driver-memory] (
set DRIVER_MEMORY=%2
) else if [%1] == [--driver-library-path] (
set SPARK_SUBMIT_LIBRARY_PATH=%2
) else if [%1] == [--driver-class-path] (
set SPARK_SUBMIT_CLASSPATH=%2
) else if [%1] == [--driver-java-options] (
set SPARK_SUBMIT_OPTS=%2
)
shift
goto loop
:continue

if [%DEPLOY_MODE%] == [] (
set DEPLOY_MODE=client
)

if not [%DRIVER_MEMORY%] == [] if [%DEPLOY_MODE%] == [client] (
set SPARK_DRIVER_MEMORY=%DRIVER_MEMORY%
)

cmd /V /E /C %SPARK_HOME%\bin\spark-class.cmd org.apache.spark.deploy.SparkSubmit %ORIG_ARGS%
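For readers less familiar with cmd.exe syntax: the :loop/shift block above peels --deploy-mode and the --driver-* options off the command line (each flag is followed by its value), falls back to a client deploy mode, exports the driver memory only in client mode, and then forwards the untouched ORIG_ARGS to SparkSubmit. A rough Scala sketch of the same option-peeling idea (illustrative only; this is not Spark's actual argument parser):

// Sketch of the flag/value peeling loop used by spark-submit.cmd above.
// The recognized flags mirror the cmd script; everything else is left alone.
def peelDriverOptions(args: List[String]): Map[String, String] = {
  val recognized = Set("--deploy-mode", "--driver-memory",
    "--driver-library-path", "--driver-class-path", "--driver-java-options")
  @annotation.tailrec
  def loop(rest: List[String], acc: Map[String, String]): Map[String, String] = rest match {
    case flag :: value :: tail if recognized(flag) =>
      loop(value :: tail, acc + (flag -> value))  // record the pair, then shift once like the cmd loop
    case _ :: tail => loop(tail, acc)             // unrecognized token: just shift
    case Nil => acc
  }
  val opts = loop(args, Map.empty)
  opts.updated("--deploy-mode", opts.getOrElse("--deploy-mode", "client"))  // default, as in the script
}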
3 changes: 2 additions & 1 deletion conf/spark-defaults.conf.template
@@ -2,6 +2,7 @@
# This is useful for setting default environmental settings.

# Example:
# spark.master spark://master:7077
# spark.master spark://master:7077
# spark.eventLog.enabled true
# spark.eventLog.dir hdfs://namenode:8021/directory
# spark.serializer org.apache.spark.serializer.KryoSerializer
6 changes: 3 additions & 3 deletions conf/spark-env.sh.template
@@ -30,14 +30,14 @@

# Options for the daemons used in the standalone deploy mode:
# - SPARK_MASTER_IP, to bind the master to a different IP address or hostname
# - SPARK_MASTER_PORT / SPARK_MASTER_WEBUI_PORT, to use non-default ports
# - SPARK_MASTER_PORT / SPARK_MASTER_WEBUI_PORT, to use non-default ports for the master
# - SPARK_MASTER_OPTS, to set config properties only for the master (e.g. "-Dx=y")
# - SPARK_WORKER_CORES, to set the number of cores to use on this machine
# - SPARK_WORKER_MEMORY, to set how much total memory workers have to give executors (e.g. 1000m, 2g)
# - SPARK_WORKER_PORT / SPARK_WORKER_WEBUI_PORT
# - SPARK_WORKER_PORT / SPARK_WORKER_WEBUI_PORT, to use non-default ports for the worker
# - SPARK_WORKER_INSTANCES, to set the number of worker processes per node
# - SPARK_WORKER_DIR, to set the working directory of worker processes
# - SPARK_WORKER_OPTS, to set config properties only for the worker (e.g. "-Dx=y")
# - SPARK_HISTORY_OPTS, to set config properties only for the history server (e.g. "-Dx=y")
# - SPARK_DAEMON_OPTS, to set config properties for all daemons (e.g. "-Dx=y")
# - SPARK_DAEMON_JAVA_OPTS, to set config properties for all daemons (e.g. "-Dx=y")
# - SPARK_PUBLIC_DNS, to set the public dns name of the master or workers
3 changes: 2 additions & 1 deletion core/pom.xml
@@ -140,6 +140,7 @@
<dependency>
<groupId>org.apache.mesos</groupId>
<artifactId>mesos</artifactId>
<classifier>${mesos.classifier}</classifier>
</dependency>
<dependency>
<groupId>io.netty</groupId>
@@ -322,7 +323,7 @@
</configuration>
</plugin>
</plugins>

<resources>
<resource>
<directory>src/main/resources</directory>
11 changes: 6 additions & 5 deletions core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -23,6 +23,7 @@ import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.util.Utils

/**
* Classes that represent cleaning tasks.
@@ -110,7 +111,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
}

/** Keep cleaning RDD, shuffle, and broadcast state. */
private def keepCleaning() {
private def keepCleaning(): Unit = Utils.logUncaughtExceptions {
while (!stopped) {
try {
val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
@@ -128,7 +129,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
}
}
} catch {
case t: Throwable => logError("Error in cleaning thread", t)
case e: Exception => logError("Error in cleaning thread", e)
}
}
}
@@ -141,7 +142,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
listeners.foreach(_.rddCleaned(rddId))
logInfo("Cleaned RDD " + rddId)
} catch {
case t: Throwable => logError("Error cleaning RDD " + rddId, t)
case e: Exception => logError("Error cleaning RDD " + rddId, e)
}
}

@@ -154,7 +155,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
listeners.foreach(_.shuffleCleaned(shuffleId))
logInfo("Cleaned shuffle " + shuffleId)
} catch {
case t: Throwable => logError("Error cleaning shuffle " + shuffleId, t)
case e: Exception => logError("Error cleaning shuffle " + shuffleId, e)
}
}

@@ -166,7 +167,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
listeners.foreach(_.broadcastCleaned(broadcastId))
logInfo("Cleaned broadcast " + broadcastId)
} catch {
case t: Throwable => logError("Error cleaning broadcast " + broadcastId, t)
case e: Exception => logError("Error cleaning broadcast " + broadcastId, e)
}
}

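The cleaning thread above is now wrapped in Utils.logUncaughtExceptions and its catch clauses were narrowed from Throwable to Exception, so fatal JVM errors (OutOfMemoryError and the like) are no longer swallowed inside the loop; they escape, get logged once, and still propagate. A minimal sketch of that wrapper pattern, assuming a logError helper is in scope (not necessarily the exact Spark implementation):

// Run the block; log anything that escapes, then rethrow so fatal errors
// still reach the JVM's normal uncaught-exception handling.
def logUncaughtExceptions[T](block: => T): T = {
  try {
    block
  } catch {
    case t: Throwable =>
      logError("Uncaught exception in thread " + Thread.currentThread().getName, t)
      throw t
  }
}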
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -247,7 +247,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
| - ./spark-submit with conf/spark-defaults.conf to set defaults for an application
| - ./spark-submit with --driver-java-options to set -X options for a driver
| - spark.executor.extraJavaOptions to set -X options for executors
| - SPARK_DAEMON_OPTS to set java options for standalone daemons (i.e. master, worker)
| - SPARK_DAEMON_JAVA_OPTS to set java options for standalone daemons (master or worker)
""".stripMargin
logError(error)

25 changes: 11 additions & 14 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -50,15 +50,13 @@ import org.apache.spark.ui.SparkUI
import org.apache.spark.util.{ClosureCleaner, MetadataCleaner, MetadataCleanerType, TimeStampedWeakValueHashMap, Utils}

/**
* :: DeveloperApi ::
* Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
* cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.
*
* @param config a Spark Config object describing the application configuration. Any settings in
* this config overrides the default configs as well as system properties.
*/

@DeveloperApi
class SparkContext(config: SparkConf) extends Logging {

// This is used only by YARN for now, but should be relevant to other cluster types (Mesos,
@@ -276,27 +274,26 @@ class SparkContext(config: SparkConf) extends Logging {
.getOrElse(512)

// Environment variables to pass to our executors.
// NOTE: This should only be used for test related settings.
private[spark] val testExecutorEnvs = HashMap[String, String]()
private[spark] val executorEnvs = HashMap[String, String]()

// Convert java options to env vars as a work around
// since we can't set env vars directly in sbt.
for { (envKey, propKey) <- Seq(("SPARK_TESTING", "spark.testing"))
value <- Option(System.getenv(envKey)).orElse(Option(System.getProperty(propKey)))} {
testExecutorEnvs(envKey) = value
executorEnvs(envKey) = value
}
// The Mesos scheduler backend relies on this environment variable to set executor memory.
// TODO: Set this only in the Mesos scheduler.
testExecutorEnvs("SPARK_EXECUTOR_MEMORY") = executorMemory + "m"
testExecutorEnvs ++= conf.getExecutorEnv
executorEnvs("SPARK_EXECUTOR_MEMORY") = executorMemory + "m"
executorEnvs ++= conf.getExecutorEnv

// Set SPARK_USER for user who is running SparkContext.
val sparkUser = Option {
Option(System.getProperty("user.name")).getOrElse(System.getenv("SPARK_USER"))
}.getOrElse {
SparkContext.SPARK_UNKNOWN_USER
}
testExecutorEnvs("SPARK_USER") = sparkUser
executorEnvs("SPARK_USER") = sparkUser

// Create and start the scheduler
private[spark] var taskScheduler = SparkContext.createTaskScheduler(this, master)
@@ -1494,8 +1491,8 @@ object SparkContext extends Logging {
} catch {
// TODO: Enumerate the exact reasons why it can fail
// But irrespective of it, it means we cannot proceed !
case th: Throwable => {
throw new SparkException("YARN mode not available ?", th)
case e: Exception => {
throw new SparkException("YARN mode not available ?", e)
}
}
val backend = new CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
@@ -1510,8 +1507,8 @@
cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl]

} catch {
case th: Throwable => {
throw new SparkException("YARN mode not available ?", th)
case e: Exception => {
throw new SparkException("YARN mode not available ?", e)
}
}

@@ -1521,8 +1518,8 @@
val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext])
cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend]
} catch {
case th: Throwable => {
throw new SparkException("YARN mode not available ?", th)
case e: Exception => {
throw new SparkException("YARN mode not available ?", e)
}
}

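The three YARN hunks above share one reflective-construction pattern: resolve a scheduler class by name, call its constructor, and convert any failure into a SparkException, now catching Exception rather than Throwable so truly fatal errors abort the JVM instead of being wrapped. A hedged sketch of that pattern (the class name below is a placeholder, not the real YARN scheduler class):

// Sketch: reflective instantiation with Exception-only error handling.
// "com.example.ClusterScheduler" is a placeholder class name.
def instantiateScheduler(sc: SparkContext): TaskSchedulerImpl =
  try {
    val clazz = Class.forName("com.example.ClusterScheduler")
    val cons = clazz.getConstructor(classOf[SparkContext])
    cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl]
  } catch {
    // Exception, not Throwable: OutOfMemoryError and friends still propagate.
    case e: Exception => throw new SparkException("YARN mode not available ?", e)
  }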
1 change: 1 addition & 0 deletions core/src/main/scala/org/apache/spark/TestUtils.scala
@@ -43,6 +43,7 @@ private[spark] object TestUtils {
*/
def createJarWithClasses(classNames: Seq[String], value: String = ""): URL = {
val tempDir = Files.createTempDir()
tempDir.deleteOnExit()
val files = for (name <- classNames) yield createCompiledClass(name, tempDir, value)
val jarFile = new File(tempDir, "testJar-%s.jar".format(System.currentTimeMillis()))
createJar(files, jarFile)
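The added deleteOnExit() call registers the freshly created temp directory for removal at JVM shutdown, so jars built for tests do not pile up between runs. A small self-contained sketch of the same idea using the JDK API (the diff itself uses Files.createTempDir from Guava):

import java.io.File
import java.nio.file.Files

// Create a scratch directory and ask the JVM to remove it on exit.
// Note: deleteOnExit only removes the directory if it is empty at shutdown,
// so files created inside it should be cleaned up (or registered) as well.
val tempDir: File = Files.createTempDirectory("spark-test").toFile
tempDir.deleteOnExit()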
@@ -171,7 +171,7 @@ private[spark] class PythonRDD[T: ClassTag](
this.interrupt()
}

override def run() {
override def run(): Unit = Utils.logUncaughtExceptions {
try {
SparkEnv.set(env)
val stream = new BufferedOutputStream(worker.getOutputStream, bufferSize)
@@ -282,7 +282,6 @@
}
} catch {
case eof: EOFException => {}
case e: Throwable => throw e
}
JavaRDD.fromRDD(sc.sc.parallelize(objs, parallelism))
}
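The deleted "case e: Throwable => throw e" clause was a redundant catch-and-rethrow; after this change only the expected end-of-stream is swallowed and everything else propagates on its own. The surrounding code reads length-prefixed records until the stream ends, roughly like this sketch (names are illustrative, not the exact PythonRDD code):

import java.io.{DataInputStream, EOFException}
import scala.collection.mutable.ArrayBuffer

// Read length-prefixed byte records until the stream runs out.
// EOFException is the normal termination signal; anything else propagates.
def readRecords(in: DataInputStream): Seq[Array[Byte]] = {
  val objs = new ArrayBuffer[Array[Byte]]
  try {
    while (true) {
      val length = in.readInt()
      val obj = new Array[Byte](length)
      in.readFully(obj)
      objs += obj
    }
  } catch {
    case _: EOFException => // end of stream: stop reading
  }
  objs
}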
@@ -71,7 +71,6 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
stopDaemon()
startDaemon()
new Socket(daemonHost, daemonPort)
case e: Throwable => throw e
}
}
}
@@ -112,7 +112,7 @@ private[spark] object HttpBroadcast extends Logging {
private var securityManager: SecurityManager = null

// TODO: This shouldn't be a global variable so that multiple SparkContexts can coexist
private val files = new TimeStampedHashSet[String]
private val files = new TimeStampedHashSet[File]
private val httpReadTimeout = TimeUnit.MILLISECONDS.convert(5, TimeUnit.MINUTES).toInt
private var compressionCodec: CompressionCodec = null
private var cleaner: MetadataCleaner = null
@@ -173,7 +173,7 @@ private[spark] object HttpBroadcast extends Logging {
val serOut = ser.serializeStream(out)
serOut.writeObject(value)
serOut.close()
files += file.getAbsolutePath
files += file
}

def read[T: ClassTag](id: Long): T = {
@@ -216,7 +216,7 @@ private[spark] object HttpBroadcast extends Logging {
SparkEnv.get.blockManager.master.removeBroadcast(id, removeFromDriver, blocking)
if (removeFromDriver) {
val file = getFile(id)
files.remove(file.toString)
files.remove(file)
deleteBroadcastFile(file)
}
}
@@ -232,7 +232,7 @@ private[spark] object HttpBroadcast extends Logging {
val (file, time) = (entry.getKey, entry.getValue)
if (time < cleanupTime) {
iterator.remove()
deleteBroadcastFile(new File(file.toString))
deleteBroadcastFile(file)
}
}
}
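Keying the timestamped set by File instead of by path string lets the writer, the remover, and the metadata cleaner all work with the same handle rather than rebuilding a File from a string each time. The cleanup pass itself is a simple age check, sketched below with a plain mutable Map standing in for Spark's TimeStampedHashSet (illustrative only):

import java.io.File
import scala.collection.mutable

// Sketch: drop entries older than cleanupTime and delete their backing files.
def cleanupOldFiles(files: mutable.Map[File, Long], cleanupTime: Long): Unit = {
  val expired = files.collect { case (file, timestamp) if timestamp < cleanupTime => file }
  expired.foreach { file =>
    files.remove(file)
    if (file.exists() && !file.delete()) {
      System.err.println(s"Could not delete broadcast file ${file.getAbsolutePath}")
    }
  }
}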
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -157,7 +157,7 @@ object Client {
// TODO: See if we can initialize akka so return messages are sent back using the same TCP
// flow. Else, this (sadly) requires the DriverClient be routable from the Master.
val (actorSystem, _) = AkkaUtils.createActorSystem(
"driverClient", Utils.localHostName(), 0, false, conf, new SecurityManager(conf))
"driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))

actorSystem.actorOf(Props(classOf[ClientActor], driverArgs, conf))

@@ -103,7 +103,7 @@ object SparkHadoopUtil {
.newInstance()
.asInstanceOf[SparkHadoopUtil]
} catch {
case th: Throwable => throw new SparkException("Unable to load YARN support", th)
case e: Exception => throw new SparkException("Unable to load YARN support", e)
}
} else {
new SparkHadoopUtil
@@ -70,7 +70,7 @@ class HistoryServer(
* TODO: Add a mechanism to update manually.
*/
private val logCheckingThread = new Thread {
override def run() {
override def run(): Unit = Utils.logUncaughtExceptions {
while (!stopped) {
val now = System.currentTimeMillis
if (now - lastLogCheckTime > UPDATE_INTERVAL_MS) {
@@ -154,7 +154,7 @@ class HistoryServer(
numCompletedApplications = logInfos.size

} catch {
case t: Throwable => logError("Exception in checking for event log updates", t)
case e: Exception => logError("Exception in checking for event log updates", e)
}
} else {
logWarning("Attempted to check for event log updates before binding the server.")
@@ -231,8 +231,8 @@ class HistoryServer(
dir.getModificationTime
}
} catch {
case t: Throwable =>
logError("Exception in accessing modification time of %s".format(dir.getPath), t)
case e: Exception =>
logError("Exception in accessing modification time of %s".format(dir.getPath), e)
-1L
}
}
@@ -684,8 +684,8 @@ private[spark] class Master(
webUi.attachSparkUI(ui)
return true
} catch {
case t: Throwable =>
logError("Exception in replaying log for application %s (%s)".format(appName, app.id), t)
case e: Exception =>
logError("Exception in replaying log for application %s (%s)".format(appName, app.id), e)
}
} else {
logWarning("Application %s (%s) has no valid logs: %s".format(appName, app.id, eventLogDir))
@@ -31,7 +31,7 @@ object DriverWrapper {
case workerUrl :: mainClass :: extraArgs =>
val conf = new SparkConf()
val (actorSystem, _) = AkkaUtils.createActorSystem("Driver",
Utils.localHostName(), 0, false, conf, new SecurityManager(conf))
Utils.localHostName(), 0, conf, new SecurityManager(conf))
actorSystem.actorOf(Props(classOf[WorkerWatcher], workerUrl), name = "workerWatcher")

// Delegate to supplied main class
(Remaining changed files not shown.)
