
Commit

Merge branch 'master' into SPARK-6939
zsxwing committed May 2, 2015
2 parents 74307cf + 38d4e9e commit 203605d
Showing 125 changed files with 3,705 additions and 1,028 deletions.
1 change: 1 addition & 0 deletions .rat-excludes
@@ -15,6 +15,7 @@ TAGS
RELEASE
control
docs
docker.properties.template
fairscheduler.xml.template
spark-defaults.conf.template
log4j.properties
2 changes: 1 addition & 1 deletion bin/spark-shell2.cmd
@@ -19,7 +19,7 @@ rem

set SPARK_HOME=%~dp0..

echo "%*" | findstr " --help -h" >nul
echo "%*" | findstr " \<--help\> \<-h\>" >nul
if %ERRORLEVEL% equ 0 (
call :usage
exit /b 0
3 changes: 3 additions & 0 deletions conf/docker.properties.template
@@ -0,0 +1,3 @@
spark.mesos.executor.docker.image: <image built from `../docker/spark-mesos/Dockerfile`>
spark.mesos.executor.docker.volumes: /usr/local/lib:/host/usr/local/lib:ro
spark.mesos.executor.home: /opt/spark
101 changes: 49 additions & 52 deletions core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -27,13 +27,20 @@ import org.apache.spark.util.{ThreadUtils, Clock, SystemClock, Utils}
/**
* An agent that dynamically allocates and removes executors based on the workload.
*
* The add policy depends on whether there are backlogged tasks waiting to be scheduled. If
* the scheduler queue is not drained in N seconds, then new executors are added. If the queue
* persists for another M seconds, then more executors are added and so on. The number added
* in each round increases exponentially from the previous round until an upper bound on the
* number of executors has been reached. The upper bound is based both on a configured property
* and on the number of tasks pending: the policy will never increase the number of executor
* requests past the number needed to handle all pending tasks.
* The ExecutorAllocationManager maintains a moving target number of executors which is periodically
* synced to the cluster manager. The target starts at a configured initial value and changes with
* the number of pending and running tasks.
*
* Decreasing the target number of executors happens when the current target is more than needed to
* handle the current load. The target number of executors is always truncated to the number of
* executors that could run all current running and pending tasks at once.
*
* Increasing the target number of executors happens in response to backlogged tasks waiting to be
* scheduled. If the scheduler queue is not drained in N seconds, then new executors are added. If
* the queue persists for another M seconds, then more executors are added and so on. The number
* added in each round increases exponentially from the previous round until an upper bound has been
* reached. The upper bound is based both on a configured property and on the current number of
* running and pending tasks, as described above.
*
* The rationale for the exponential increase is twofold: (1) Executors should be added slowly
* in the beginning in case the number of extra executors needed turns out to be small. Otherwise,
@@ -105,8 +112,10 @@ private[spark] class ExecutorAllocationManager(
// Number of executors to add in the next round
private var numExecutorsToAdd = 1

// Number of executors that have been requested but have not registered yet
private var numExecutorsPending = 0
// The desired number of executors at this moment in time. If all our executors were to die, this
// is the number of executors we would immediately want from the cluster manager.
private var numExecutorsTarget =
conf.getInt("spark.dynamicAllocation.initialExecutors", minNumExecutors)

// Executors that have been requested to be removed but have not been killed yet
private val executorsPendingToRemove = new mutable.HashSet[String]
@@ -199,13 +208,6 @@ private[spark] class ExecutorAllocationManager(
executor.awaitTermination(10, TimeUnit.SECONDS)
}

/**
* The number of executors we would have if the cluster manager were to fulfill all our existing
* requests.
*/
private def targetNumExecutors(): Int =
numExecutorsPending + executorIds.size - executorsPendingToRemove.size

/**
* The maximum number of executors we would need under the current load to satisfy all running
* and pending tasks, rounded up.
@@ -227,7 +229,7 @@
private def schedule(): Unit = synchronized {
val now = clock.getTimeMillis

addOrCancelExecutorRequests(now)
updateAndSyncNumExecutorsTarget(now)

removeTimes.retain { case (executorId, expireTime) =>
val expired = now >= expireTime
@@ -239,26 +241,28 @@
}

/**
* Updates our target number of executors and syncs the result with the cluster manager.
*
* Check to see whether our existing allocation and the requests we've made previously exceed our
* current needs. If so, let the cluster manager know so that it can cancel pending requests that
* are unneeded.
* current needs. If so, truncate our target and let the cluster manager know so that it can
* cancel pending requests that are unneeded.
*
* If not, and the add time has expired, see if we can request new executors and refresh the add
* time.
*
* @return the delta in the target number of executors.
*/
private def addOrCancelExecutorRequests(now: Long): Int = synchronized {
val currentTarget = targetNumExecutors
private def updateAndSyncNumExecutorsTarget(now: Long): Int = synchronized {
val maxNeeded = maxNumExecutorsNeeded

if (maxNeeded < currentTarget) {
if (maxNeeded < numExecutorsTarget) {
// The target number exceeds the number we actually need, so stop adding new
// executors and inform the cluster manager to cancel the extra pending requests.
val newTotalExecutors = math.max(maxNeeded, minNumExecutors)
client.requestTotalExecutors(newTotalExecutors)
// executors and inform the cluster manager to cancel the extra pending requests
val oldNumExecutorsTarget = numExecutorsTarget
numExecutorsTarget = math.max(maxNeeded, minNumExecutors)
client.requestTotalExecutors(numExecutorsTarget)
numExecutorsToAdd = 1
updateNumExecutorsPending(newTotalExecutors)
numExecutorsTarget - oldNumExecutorsTarget
} else if (addTime != NOT_SET && now >= addTime) {
val delta = addExecutors(maxNeeded)
logDebug(s"Starting timer to add more executors (to " +
@@ -281,21 +285,30 @@
*/
private def addExecutors(maxNumExecutorsNeeded: Int): Int = {
// Do not request more executors if it would put our target over the upper bound
val currentTarget = targetNumExecutors
if (currentTarget >= maxNumExecutors) {
logDebug(s"Not adding executors because there are already ${executorIds.size} " +
s"registered and $numExecutorsPending pending executor(s) (limit $maxNumExecutors)")
if (numExecutorsTarget >= maxNumExecutors) {
val numExecutorsPending = numExecutorsTarget - executorIds.size
logDebug(s"Not adding executors because there are already ${executorIds.size} registered " +
s"and ${numExecutorsPending} pending executor(s) (limit $maxNumExecutors)")
numExecutorsToAdd = 1
return 0
}

val actualMaxNumExecutors = math.min(maxNumExecutors, maxNumExecutorsNeeded)
val newTotalExecutors = math.min(currentTarget + numExecutorsToAdd, actualMaxNumExecutors)
val addRequestAcknowledged = testing || client.requestTotalExecutors(newTotalExecutors)
val oldNumExecutorsTarget = numExecutorsTarget
// There's no point in wasting time ramping up to the number of executors we already have, so
// make sure our target is at least as much as our current allocation:
numExecutorsTarget = math.max(numExecutorsTarget, executorIds.size)
// Boost our target with the number to add for this round:
numExecutorsTarget += numExecutorsToAdd
// Ensure that our target doesn't exceed what we need at the present moment:
numExecutorsTarget = math.min(numExecutorsTarget, maxNumExecutorsNeeded)
// Ensure that our target fits within configured bounds:
numExecutorsTarget = math.max(math.min(numExecutorsTarget, maxNumExecutors), minNumExecutors)

val addRequestAcknowledged = testing || client.requestTotalExecutors(numExecutorsTarget)
if (addRequestAcknowledged) {
val delta = updateNumExecutorsPending(newTotalExecutors)
val delta = numExecutorsTarget - oldNumExecutorsTarget
logInfo(s"Requesting $delta new executor(s) because tasks are backlogged" +
s" (new desired total will be $newTotalExecutors)")
s" (new desired total will be $numExecutorsTarget)")
numExecutorsToAdd = if (delta == numExecutorsToAdd) {
numExecutorsToAdd * 2
} else {
@@ -304,23 +317,11 @@
delta
} else {
logWarning(
s"Unable to reach the cluster manager to request $newTotalExecutors total executors!")
s"Unable to reach the cluster manager to request $numExecutorsTarget total executors!")
0
}
}

/**
* Given the new target number of executors, update the number of pending executor requests,
* and return the delta from the old number of pending requests.
*/
private def updateNumExecutorsPending(newTotalExecutors: Int): Int = {
val newNumExecutorsPending =
newTotalExecutors - executorIds.size + executorsPendingToRemove.size
val delta = newNumExecutorsPending - numExecutorsPending
numExecutorsPending = newNumExecutorsPending
delta
}

/**
* Request the cluster manager to remove the given executor.
* Return whether the request is received.
@@ -372,10 +373,6 @@
// as idle again so as not to forget that it is a candidate for removal. (see SPARK-4951)
executorIds.filter(listener.isExecutorIdle).foreach(onExecutorIdle)
logInfo(s"New executor $executorId has registered (new total is ${executorIds.size})")
if (numExecutorsPending > 0) {
numExecutorsPending -= 1
logDebug(s"Decremented number of pending executors ($numExecutorsPending left)")
}
} else {
logWarning(s"Duplicate executor $executorId has registered")
}
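To make the ramp-up policy in the new class comment concrete, here is a minimal, self-contained Scala sketch of the target-update rule. The object name ExecutorTargetSim, the step helper, and the hard-coded bounds are illustrative only, not Spark API; the sketch mirrors the clamping order in the new addExecutors but leaves out the cluster-manager round trip, the listener state, and the separate cancel path.

object ExecutorTargetSim {
  // Illustrative bounds standing in for spark.dynamicAllocation.{min,max}Executors.
  val minExecutors = 0
  val maxExecutors = 8

  /** One scheduling round while tasks are backlogged.
   *  Returns (new target, next round's increment). */
  def step(target: Int, numToAdd: Int, registered: Int, maxNeeded: Int): (Int, Int) = {
    val boosted = math.max(target, registered) + numToAdd     // ramp up from what we already have
    val capped  = math.min(boosted, maxNeeded)                 // never exceed what the current load needs
    val bounded = math.max(math.min(capped, maxExecutors), minExecutors)
    val delta   = bounded - target
    // Double the next increment only when this round was granted in full, otherwise reset to 1.
    val nextToAdd = if (delta == numToAdd) numToAdd * 2 else 1
    (bounded, nextToAdd)
  }

  def main(args: Array[String]): Unit = {
    var (target, toAdd) = (1, 1)
    for (round <- 1 to 5) {
      val (newTarget, next) = step(target, toAdd, registered = 0, maxNeeded = 100)
      println(s"round $round: target $target -> $newTarget (next increment $next)")
      target = newTarget
      toAdd = next
    }
    // With maxExecutors = 8 this prints targets 2, 4, 8, 8, 8: exponential growth, then a hold.
  }
}

With an 8-executor cap the target doubles 1 -> 2 -> 4 -> 8 and then holds, which is the exponential-but-bounded behaviour the comment above describes.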
17 changes: 15 additions & 2 deletions core/src/main/scala/org/apache/spark/SecurityManager.scala
@@ -150,8 +150,13 @@ import org.apache.spark.util.Utils
* authorization. If no filter is in place the user is generally null and no authorization
* can take place.
*
* Connection encryption (SSL) configuration is organized hierarchically. The user can configure
* the default SSL settings which will be used for all the supported communication protocols unless
* When authentication is being used, encryption can also be enabled by setting the option
* spark.authenticate.enableSaslEncryption to true. This is only supported by communication
* channels that use the network-common library, and can be used as an alternative to SSL in those
* cases.
*
* SSL can be used for encryption for certain communication channels. The user can configure the
* default SSL settings which will be used for all the supported communication protocols unless
* they are overwritten by protocol specific settings. This way the user can easily provide the
* common settings for all the protocols without disabling the ability to configure each one
* individually.
@@ -412,6 +417,14 @@ private[spark] class SecurityManager(sparkConf: SparkConf)
*/
def isAuthenticationEnabled(): Boolean = authOn

/**
* Checks whether SASL encryption should be enabled.
* @return Whether to enable SASL encryption when connecting to services that support it.
*/
def isSaslEncryptionEnabled(): Boolean = {
sparkConf.getBoolean("spark.authenticate.enableSaslEncryption", false)
}

/**
* Gets the user used for authenticating HTTP connections.
* For now use a single hardcoded user.
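As background for the SecurityManager comment above, the SASL option it mentions comes down to two configuration keys. Below is a hedged sketch, spark-shell style, of how a user might turn it on; the secret value is a placeholder (on YARN the secret is typically managed automatically, elsewhere spark.authenticate.secret must be supplied), and in practice these settings usually live in spark-defaults.conf or are passed via spark-submit --conf.

import org.apache.spark.SparkConf

// Sketch: authentication plus SASL encryption for channels built on the
// network-common library (an alternative to SSL for those channels only).
val conf = new SparkConf()
  .set("spark.authenticate", "true")                       // required before SASL encryption applies
  .set("spark.authenticate.secret", "change-me")           // shared secret (placeholder value)
  .set("spark.authenticate.enableSaslEncryption", "true")  // the key referenced above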
13 changes: 13 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -347,6 +347,19 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
value
}

/** Control our logLevel. This overrides any user-defined log settings.
* @param logLevel The desired log level as a string.
* Valid log levels include: ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE, WARN
*/
def setLogLevel(logLevel: String) {
val validLevels = Seq("ALL", "DEBUG", "ERROR", "FATAL", "INFO", "OFF", "TRACE", "WARN")
if (!validLevels.contains(logLevel)) {
throw new IllegalArgumentException(
s"Supplied level $logLevel did not match one of: ${validLevels.mkString(",")}")
}
Utils.setLogLevel(org.apache.log4j.Level.toLevel(logLevel))
}

try {
_conf = config.clone()
_conf.validateSettings()
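A quick usage sketch for the setLogLevel method added above, assuming a live SparkContext named sc as in the spark-shell; the level strings are exactly the ones validated in the new code.

// Quiet the default INFO output while a noisy job runs, then restore it.
sc.setLogLevel("WARN")
// ... run jobs ...
sc.setLogLevel("INFO")

// Anything outside the valid set fails fast:
// sc.setLogLevel("VERBOSE")  // IllegalArgumentException: Supplied level VERBOSE did not match one of: ...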
27 changes: 9 additions & 18 deletions core/src/main/scala/org/apache/spark/TestUtils.scala
@@ -105,18 +105,23 @@ private[spark] object TestUtils {
URI.create(s"string:///${name.replace(".", "/")}${SOURCE.extension}")
}

private[spark] class JavaSourceFromString(val name: String, val code: String)
private class JavaSourceFromString(val name: String, val code: String)
extends SimpleJavaFileObject(createURI(name), SOURCE) {
override def getCharContent(ignoreEncodingErrors: Boolean): String = code
}

/** Creates a compiled class with the source file. Class file will be placed in destDir. */
/** Creates a compiled class with the given name. Class file will be placed in destDir. */
def createCompiledClass(
className: String,
destDir: File,
sourceFile: JavaSourceFromString,
classpathUrls: Seq[URL]): File = {
toStringValue: String = "",
baseClass: String = null,
classpathUrls: Seq[URL] = Seq()): File = {
val compiler = ToolProvider.getSystemJavaCompiler
val extendsText = Option(baseClass).map { c => s" extends ${c}" }.getOrElse("")
val sourceFile = new JavaSourceFromString(className,
"public class " + className + extendsText + " implements java.io.Serializable {" +
" @Override public String toString() { return \"" + toStringValue + "\"; }}")

// Calling this outputs a class file in pwd. It's easier to just rename the file than
// build a custom FileManager that controls the output location.
@@ -139,18 +144,4 @@ private[spark] object TestUtils {
assert(out.exists(), "Destination file not moved: " + out.getAbsolutePath())
out
}

/** Creates a compiled class with the given name. Class file will be placed in destDir. */
def createCompiledClass(
className: String,
destDir: File,
toStringValue: String = "",
baseClass: String = null,
classpathUrls: Seq[URL] = Seq()): File = {
val extendsText = Option(baseClass).map { c => s" extends ${c}" }.getOrElse("")
val sourceFile = new JavaSourceFromString(className,
"public class " + className + extendsText + " implements java.io.Serializable {" +
" @Override public String toString() { return \"" + toStringValue + "\"; }}")
createCompiledClass(className, destDir, sourceFile, classpathUrls)
}
}
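After this refactor the earlier two-step flow (build a JavaSourceFromString, then compile it) collapses into a single call. A sketch of how a test might use the consolidated signature; the wrapper object, class name, and output directory are illustrative, and the caller has to live in the org.apache.spark package because TestUtils is private[spark].

package org.apache.spark // TestUtils is private[spark], so the example sits in this package

import java.io.File

object CreateCompiledClassExample {
  def main(args: Array[String]): Unit = {
    // Compile a tiny serializable class into a scratch directory, as a test jar builder would.
    val destDir = new File(System.getProperty("java.io.tmpdir"), "generated-test-classes")
    destDir.mkdirs()
    val classFile: File = TestUtils.createCompiledClass(
      "HelloSpark",               // illustrative class name
      destDir,
      toStringValue = "hello",    // what the generated toString() returns
      baseClass = null)           // no superclass beyond java.io.Serializable
    println(s"compiled to ${classFile.getAbsolutePath}")
  }
}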
@@ -755,6 +755,14 @@ class JavaSparkContext(val sc: SparkContext)
*/
def getLocalProperty(key: String): String = sc.getLocalProperty(key)

/** Control our logLevel. This overrides any user-defined log settings.
* @param logLevel The desired log level as a string.
* Valid log levels include: ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE, WARN
*/
def setLogLevel(logLevel: String) {
sc.setLogLevel(logLevel)
}

/**
* Assigns a group ID to all the jobs started by this thread until the group ID is set to a
* different value or cleared.
