Merge branch 'master' of github.com:apache/spark into window4
Davies Liu committed May 23, 2015
2 parents 8936ade + 63a5ce7 commit ef55132
Showing 28 changed files with 1,047 additions and 156 deletions.
@@ -91,15 +91,15 @@ private[spark] class ExecutorAllocationManager(

// How long there must be backlogged tasks for before an addition is triggered (seconds)
private val schedulerBacklogTimeoutS = conf.getTimeAsSeconds(
"spark.dynamicAllocation.schedulerBacklogTimeout", "5s")
"spark.dynamicAllocation.schedulerBacklogTimeout", "1s")

// Same as above, but used only after `schedulerBacklogTimeoutS` is exceeded
private val sustainedSchedulerBacklogTimeoutS = conf.getTimeAsSeconds(
"spark.dynamicAllocation.sustainedSchedulerBacklogTimeout", s"${schedulerBacklogTimeoutS}s")

// How long an executor must be idle for before it is removed (seconds)
private val executorIdleTimeoutS = conf.getTimeAsSeconds(
"spark.dynamicAllocation.executorIdleTimeout", "600s")
"spark.dynamicAllocation.executorIdleTimeout", "60s")

// During testing, the methods to actually kill and add executors are mocked out
private val testing = conf.getBoolean("spark.dynamicAllocation.testing", false)
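
As a side note on these new, more aggressive defaults: a minimal sketch (not part of this commit) of how an application could restore the previous timeouts through SparkConf; any duration string with a time suffix is accepted by getTimeAsSeconds:

    import org.apache.spark.SparkConf

    // Hypothetical app-side configuration restoring the previous defaults
    // ("5s" backlog timeout, "600s" idle timeout) that this commit lowers.
    val conf = new SparkConf()
      .set("spark.dynamicAllocation.enabled", "true")
      .set("spark.dynamicAllocation.schedulerBacklogTimeout", "5s")
      .set("spark.dynamicAllocation.executorIdleTimeout", "600s")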
@@ -268,6 +268,8 @@ private[spark] class ExecutorAllocationManager(
numExecutorsTarget = math.max(maxNeeded, minNumExecutors)
client.requestTotalExecutors(numExecutorsTarget)
numExecutorsToAdd = 1
logInfo(s"Lowering target number of executors to $numExecutorsTarget because " +
s"not all requests are actually needed (previously $oldNumExecutorsTarget)")
numExecutorsTarget - oldNumExecutorsTarget
} else if (addTime != NOT_SET && now >= addTime) {
val delta = addExecutors(maxNeeded)
@@ -292,9 +294,8 @@
private def addExecutors(maxNumExecutorsNeeded: Int): Int = {
// Do not request more executors if it would put our target over the upper bound
if (numExecutorsTarget >= maxNumExecutors) {
-   val numExecutorsPending = numExecutorsTarget - executorIds.size
-   logDebug(s"Not adding executors because there are already ${executorIds.size} registered " +
-     s"and ${numExecutorsPending} pending executor(s) (limit $maxNumExecutors)")
+   logDebug(s"Not adding executors because our current target total " +
+     s"is already $numExecutorsTarget (limit $maxNumExecutors)")
numExecutorsToAdd = 1
return 0
}
@@ -310,10 +311,19 @@
// Ensure that our target fits within configured bounds:
numExecutorsTarget = math.max(math.min(numExecutorsTarget, maxNumExecutors), minNumExecutors)

+ val delta = numExecutorsTarget - oldNumExecutorsTarget
+
+ // If our target has not changed, do not send a message
+ // to the cluster manager and reset our exponential growth
+ if (delta == 0) {
+   numExecutorsToAdd = 1
+   return 0
+ }
+
val addRequestAcknowledged = testing || client.requestTotalExecutors(numExecutorsTarget)
if (addRequestAcknowledged) {
-   val delta = numExecutorsTarget - oldNumExecutorsTarget
-   logInfo(s"Requesting $delta new executor(s) because tasks are backlogged" +
+   val executorsString = "executor" + { if (delta > 1) "s" else "" }
+   logInfo(s"Requesting $delta new $executorsString because tasks are backlogged" +
s" (new desired total will be $numExecutorsTarget)")
numExecutorsToAdd = if (delta == numExecutorsToAdd) {
numExecutorsToAdd * 2
@@ -420,7 +430,7 @@ private[spark] class ExecutorAllocationManager(
* This resets all variables used for adding executors.
*/
private def onSchedulerQueueEmpty(): Unit = synchronized {
logDebug(s"Clearing timer to add executors because there are no more pending tasks")
logDebug("Clearing timer to add executors because there are no more pending tasks")
addTime = NOT_SET
numExecutorsToAdd = 1
}
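
The surrounding methods implement an exponential ramp-up: each round in which the full requested delta is granted doubles the next batch, while any shortfall or an empty queue resets it to 1. A standalone sketch of that policy, simplified from the code above (illustrative only):

    // Simplified model of how numExecutorsToAdd evolves round to round.
    def nextBatchSize(delta: Int, numExecutorsToAdd: Int): Int =
      if (delta == numExecutorsToAdd) numExecutorsToAdd * 2 else 1

    // When every request is granted in full, batch sizes grow 1, 2, 4, 8, ...
    assert(Seq(1, 2, 4).scanLeft(1)((batch, delta) => nextBatchSize(delta, batch))
      == Seq(1, 2, 4, 8))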
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1884,7 +1884,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
*
* @param f the closure to clean
* @param checkSerializable whether or not to immediately check <tt>f</tt> for serializability
- * @throws <tt>SparkException<tt> if <tt>checkSerializable</tt> is set but <tt>f</tt> is not
+ * @throws SparkException if <tt>checkSerializable</tt> is set but <tt>f</tt> is not
* serializable
*/
private[spark] def clean[F <: AnyRef](f: F, checkSerializable: Boolean = true): F = {
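
For illustration, a hypothetical fragment of what the corrected @throws contract means in practice, given a SparkContext sc (clean is private[spark], so this would only compile from Spark-internal code):

    // f captures a reference that is not java.io.Serializable, so with
    // checkSerializable left at its default of true, sc.clean(f) fails
    // fast with SparkException instead of at task-serialization time.
    val notSerializable = new Object
    val f = () => notSerializable.hashCode
    // sc.clean(f)  // would throw SparkException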
27 changes: 18 additions & 9 deletions core/src/main/scala/org/apache/spark/TestUtils.scala
@@ -105,23 +105,18 @@ private[spark] object TestUtils {
URI.create(s"string:///${name.replace(".", "/")}${SOURCE.extension}")
}

- private class JavaSourceFromString(val name: String, val code: String)
+ private[spark] class JavaSourceFromString(val name: String, val code: String)
extends SimpleJavaFileObject(createURI(name), SOURCE) {
override def getCharContent(ignoreEncodingErrors: Boolean): String = code
}

- /** Creates a compiled class with the given name. Class file will be placed in destDir. */
+ /** Creates a compiled class with the source file. Class file will be placed in destDir. */
def createCompiledClass(
className: String,
destDir: File,
-     toStringValue: String = "",
-     baseClass: String = null,
-     classpathUrls: Seq[URL] = Seq()): File = {
+     sourceFile: JavaSourceFromString,
+     classpathUrls: Seq[URL]): File = {
val compiler = ToolProvider.getSystemJavaCompiler
-   val extendsText = Option(baseClass).map { c => s" extends ${c}" }.getOrElse("")
-   val sourceFile = new JavaSourceFromString(className,
-     "public class " + className + extendsText + " implements java.io.Serializable {" +
-     " @Override public String toString() { return \"" + toStringValue + "\"; }}")

// Calling this outputs a class file in pwd. It's easier to just rename the file than
// build a custom FileManager that controls the output location.
@@ -144,4 +139,18 @@ private[spark] object TestUtils {
assert(out.exists(), "Destination file not moved: " + out.getAbsolutePath())
out
}

+ /** Creates a compiled class with the given name. Class file will be placed in destDir. */
+ def createCompiledClass(
+     className: String,
+     destDir: File,
+     toStringValue: String = "",
+     baseClass: String = null,
+     classpathUrls: Seq[URL] = Seq()): File = {
+   val extendsText = Option(baseClass).map { c => s" extends ${c}" }.getOrElse("")
+   val sourceFile = new JavaSourceFromString(className,
+     "public class " + className + extendsText + " implements java.io.Serializable {" +
+     " @Override public String toString() { return \"" + toStringValue + "\"; }}")
+   createCompiledClass(className, destDir, sourceFile, classpathUrls)
+ }
}
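
A hypothetical test fragment showing the two overloads side by side after this refactoring; the original signature still generates its source internally, while the new one accepts any JavaSourceFromString:

    import java.io.File
    import org.apache.spark.TestUtils._

    val destDir = new File(System.getProperty("java.io.tmpdir"))
    // Old entry point: source code is generated from the given name.
    val fooClass = createCompiledClass("Foo", destDir, toStringValue = "foo")
    // New entry point: caller supplies the source directly.
    val barSource = new JavaSourceFromString("Bar", "public class Bar {}")
    val barClass = createCompiledClass("Bar", destDir, barSource, Seq.empty)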
@@ -43,6 +43,8 @@ class LocalSparkCluster(
private val localHostname = Utils.localHostName()
private val masterActorSystems = ArrayBuffer[ActorSystem]()
private val workerActorSystems = ArrayBuffer[ActorSystem]()
+ // exposed for testing
+ var masterWebUIPort = -1

def start(): Array[String] = {
logInfo("Starting a local Spark cluster with " + numWorkers + " workers.")
@@ -53,7 +55,9 @@
.set("spark.shuffle.service.enabled", "false")

/* Start the Master */
- val (masterSystem, masterPort, _, _) = Master.startSystemAndActor(localHostname, 0, 0, _conf)
+ val (masterSystem, masterPort, webUiPort, _) =
+   Master.startSystemAndActor(localHostname, 0, 0, _conf)
+ masterWebUIPort = webUiPort
masterActorSystems += masterSystem
val masterUrl = "spark://" + Utils.localHostNameForURI() + ":" + masterPort
val masters = Array(masterUrl)
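
A hypothetical test fragment (the constructor shape is assumed from this file) exercising the newly exposed field:

    val cluster = new LocalSparkCluster(2, 1, 512, new SparkConf())
    cluster.start()
    assert(cluster.masterWebUIPort != -1, "start() should record the master web UI port")
    cluster.stop()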
128 changes: 68 additions & 60 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -753,7 +753,9 @@ private[spark] object SparkSubmitUtils {
* @param artifactId the artifactId of the coordinate
* @param version the version of the coordinate
*/
- private[deploy] case class MavenCoordinate(groupId: String, artifactId: String, version: String)
+ private[deploy] case class MavenCoordinate(groupId: String, artifactId: String, version: String) {
+   override def toString: String = s"$groupId:$artifactId:$version"
+ }

/**
* Extracts maven coordinates from a comma-delimited string. Coordinates should be provided
@@ -776,6 +778,10 @@
}
}
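
Together with the new toString above, coordinate handling round-trips the groupId:artifactId:version form users pass to --packages. An illustrative fragment (these members are private[deploy], so it belongs in that package):

    val coords = SparkSubmitUtils.extractMavenCoordinates(
      "org.apache.spark:spark-streaming-kafka_2.10:1.3.0,com.databricks:spark-csv_2.10:1.0.3")
    // The new toString turns a parsed coordinate back into its input form.
    assert(coords.head.toString == "org.apache.spark:spark-streaming-kafka_2.10:1.3.0")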

+ /** Path of the local Maven cache. */
+ private[spark] def m2Path: File = new File(System.getProperty("user.home"),
+   ".m2" + File.separator + "repository" + File.separator)

/**
* Extracts maven coordinates from a comma-delimited string
* @param remoteRepos Comma-delimited string of remote repositories
@@ -789,8 +795,7 @@

val localM2 = new IBiblioResolver
localM2.setM2compatible(true)
val m2Path = ".m2" + File.separator + "repository" + File.separator
localM2.setRoot(new File(System.getProperty("user.home"), m2Path).toURI.toString)
localM2.setRoot(m2Path.toURI.toString)
localM2.setUsepoms(true)
localM2.setName("local-m2-cache")
cr.add(localM2)
@@ -915,69 +920,72 @@ private[spark] object SparkSubmitUtils {
""
} else {
val sysOut = System.out
// To prevent ivy from logging to system out
System.setOut(printStream)
val artifacts = extractMavenCoordinates(coordinates)
// Default configuration name for ivy
val ivyConfName = "default"
// set ivy settings for location of cache
val ivySettings: IvySettings = new IvySettings
// Directories for caching downloads through ivy and storing the jars when maven coordinates
// are supplied to spark-submit
val alternateIvyCache = ivyPath.getOrElse("")
val packagesDirectory: File =
if (alternateIvyCache.trim.isEmpty) {
new File(ivySettings.getDefaultIvyUserDir, "jars")
try {
// To prevent ivy from logging to system out
System.setOut(printStream)
val artifacts = extractMavenCoordinates(coordinates)
// Default configuration name for ivy
val ivyConfName = "default"
// set ivy settings for location of cache
val ivySettings: IvySettings = new IvySettings
// Directories for caching downloads through ivy and storing the jars when maven coordinates
// are supplied to spark-submit
val alternateIvyCache = ivyPath.getOrElse("")
val packagesDirectory: File =
if (alternateIvyCache.trim.isEmpty) {
new File(ivySettings.getDefaultIvyUserDir, "jars")
} else {
ivySettings.setDefaultIvyUserDir(new File(alternateIvyCache))
ivySettings.setDefaultCache(new File(alternateIvyCache, "cache"))
new File(alternateIvyCache, "jars")
}
printStream.println(
s"Ivy Default Cache set to: ${ivySettings.getDefaultCache.getAbsolutePath}")
printStream.println(s"The jars for the packages stored in: $packagesDirectory")
// create a pattern matcher
ivySettings.addMatcher(new GlobPatternMatcher)
// create the dependency resolvers
val repoResolver = createRepoResolvers(remoteRepos, ivySettings)
ivySettings.addResolver(repoResolver)
ivySettings.setDefaultResolver(repoResolver.getName)

val ivy = Ivy.newInstance(ivySettings)
// Set resolve options to download transitive dependencies as well
val resolveOptions = new ResolveOptions
resolveOptions.setTransitive(true)
val retrieveOptions = new RetrieveOptions
// Turn downloading and logging off for testing
if (isTest) {
resolveOptions.setDownload(false)
resolveOptions.setLog(LogOptions.LOG_QUIET)
retrieveOptions.setLog(LogOptions.LOG_QUIET)
} else {
ivySettings.setDefaultIvyUserDir(new File(alternateIvyCache))
ivySettings.setDefaultCache(new File(alternateIvyCache, "cache"))
new File(alternateIvyCache, "jars")
resolveOptions.setDownload(true)
}
printStream.println(
s"Ivy Default Cache set to: ${ivySettings.getDefaultCache.getAbsolutePath}")
printStream.println(s"The jars for the packages stored in: $packagesDirectory")
// create a pattern matcher
ivySettings.addMatcher(new GlobPatternMatcher)
// create the dependency resolvers
val repoResolver = createRepoResolvers(remoteRepos, ivySettings)
ivySettings.addResolver(repoResolver)
ivySettings.setDefaultResolver(repoResolver.getName)

val ivy = Ivy.newInstance(ivySettings)
// Set resolve options to download transitive dependencies as well
val resolveOptions = new ResolveOptions
resolveOptions.setTransitive(true)
val retrieveOptions = new RetrieveOptions
// Turn downloading and logging off for testing
if (isTest) {
resolveOptions.setDownload(false)
resolveOptions.setLog(LogOptions.LOG_QUIET)
retrieveOptions.setLog(LogOptions.LOG_QUIET)
} else {
resolveOptions.setDownload(true)
}

// A Module descriptor must be specified. Entries are dummy strings
val md = getModuleDescriptor
md.setDefaultConf(ivyConfName)
// A Module descriptor must be specified. Entries are dummy strings
val md = getModuleDescriptor
md.setDefaultConf(ivyConfName)

// Add exclusion rules for Spark and Scala Library
addExclusionRules(ivySettings, ivyConfName, md)
// add all supplied maven artifacts as dependencies
addDependenciesToIvy(md, artifacts, ivyConfName)
// Add exclusion rules for Spark and Scala Library
addExclusionRules(ivySettings, ivyConfName, md)
// add all supplied maven artifacts as dependencies
addDependenciesToIvy(md, artifacts, ivyConfName)

// resolve dependencies
val rr: ResolveReport = ivy.resolve(md, resolveOptions)
if (rr.hasError) {
throw new RuntimeException(rr.getAllProblemMessages.toString)
// resolve dependencies
val rr: ResolveReport = ivy.resolve(md, resolveOptions)
if (rr.hasError) {
throw new RuntimeException(rr.getAllProblemMessages.toString)
}
// retrieve all resolved dependencies
ivy.retrieve(rr.getModuleDescriptor.getModuleRevisionId,
packagesDirectory.getAbsolutePath + File.separator +
"[organization]_[artifact]-[revision].[ext]",
retrieveOptions.setConfs(Array(ivyConfName)))
resolveDependencyPaths(rr.getArtifacts.toArray, packagesDirectory)
} finally {
System.setOut(sysOut)
}
// retrieve all resolved dependencies
ivy.retrieve(rr.getModuleDescriptor.getModuleRevisionId,
packagesDirectory.getAbsolutePath + File.separator +
"[organization]_[artifact]-[revision].[ext]",
retrieveOptions.setConfs(Array(ivyConfName)))
System.setOut(sysOut)
resolveDependencyPaths(rr.getArtifacts.toArray, packagesDirectory)
}
}
}
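
The substance of this change is wrapping the whole resolution in try/finally so that stdout is restored even when ivy.resolve throws. A minimal sketch of the pattern in isolation:

    import java.io.PrintStream

    // Redirect System.out for the duration of a noisy operation and
    // restore it in finally, so an exception cannot leave stdout captured.
    def withRedirectedStdOut[T](sink: PrintStream)(body: => T): T = {
      val saved = System.out
      System.setOut(sink)
      try body finally System.setOut(saved)
    }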
@@ -177,6 +177,7 @@ private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends Serializ

override def serialize[T: ClassTag](t: T): ByteBuffer = {
output.clear()
+ kryo.reset() // We must reset in case this serializer instance was reused (see SPARK-7766)
try {
kryo.writeClassAndObject(output, t)
} catch {
@@ -202,6 +203,7 @@
}

override def serializeStream(s: OutputStream): SerializationStream = {
+ kryo.reset() // We must reset in case this serializer instance was reused (see SPARK-7766)
new KryoSerializationStream(kryo, s)
}
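
A standalone sketch (assuming the Kryo 2.x API, outside Spark) of the failure mode behind SPARK-7766: with auto-reset disabled and reference tracking on, a reused Kryo keeps its reference table across writes, so a fresh stream can contain back-references the reader has never seen; reset() clears that state:

    import com.esotericsoftware.kryo.Kryo
    import com.esotericsoftware.kryo.io.Output

    val kryo = new Kryo()
    kryo.setReferences(true)
    kryo.setAutoReset(false) // mirrors how the serializer instance is reused
    val out = new Output(4096)
    val payload = new java.util.ArrayList[String]()
    payload.add("a")

    kryo.writeClassAndObject(out, payload)
    out.clear()
    kryo.reset() // without this, the next write could emit a stale back-reference
    kryo.writeClassAndObject(out, payload)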

3 changes: 3 additions & 0 deletions core/src/main/scala/org/apache/spark/ui/WebUI.scala
@@ -77,7 +77,10 @@ private[spark] abstract class WebUI(
val pagePath = "/" + page.prefix
val renderHandler = createServletHandler(pagePath,
(request: HttpServletRequest) => page.render(request), securityManager, basePath)
+ val renderJsonHandler = createServletHandler(pagePath.stripSuffix("/") + "/json",
+   (request: HttpServletRequest) => page.renderJson(request), securityManager, basePath)
attachHandler(renderHandler)
+ attachHandler(renderJsonHandler)
pageToHandlers.getOrElseUpdate(page, ArrayBuffer[ServletContextHandler]())
.append(renderHandler)
}
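
With this change, every attached page is also served at its path plus /json via renderJson. An illustrative page opting in (renderJson returns JNothing by default, so existing pages are unaffected):

    import javax.servlet.http.HttpServletRequest
    import org.json4s.JValue
    import org.json4s.JsonDSL._
    import scala.xml.Node

    class FooPage extends WebUIPage("foo") {
      override def render(request: HttpServletRequest): Seq[Node] = <p>foo</p>
      // Served at <ui>/foo/json once the page is attached.
      override def renderJson(request: HttpServletRequest): JValue = "status" -> "ok"
    }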