merge master
mengxr committed Oct 14, 2014
2 parents c9d7301 + 7b4f39f commit 473a7c5
Showing 188 changed files with 1,633 additions and 261 deletions.
2 changes: 1 addition & 1 deletion bin/spark-class
@@ -105,7 +105,7 @@ else
exit 1
fi
fi
-JAVA_VERSION=$("$RUNNER" -version 2>&1 | sed 's/.* version "\(.*\)\.\(.*\)\..*"/\1\2/; 1q')
+JAVA_VERSION=$("$RUNNER" -version 2>&1 | grep 'version' | sed 's/.* version "\(.*\)\.\(.*\)\..*"/\1\2/; 1q')

# Set JAVA_OPTS to be able to load native libraries and to set heap size
if [ "$JAVA_VERSION" -ge 18 ]; then
core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -111,13 +111,14 @@ private[spark] class ExecutorRunner(
case "{{EXECUTOR_ID}}" => execId.toString
case "{{HOSTNAME}}" => host
case "{{CORES}}" => cores.toString
+case "{{APP_ID}}" => appId
case other => other
}

def getCommandSeq = {
val command = Command(
appDesc.command.mainClass,
-appDesc.command.arguments.map(substituteVariables) ++ Seq(appId),
+appDesc.command.arguments.map(substituteVariables),
appDesc.command.environment,
appDesc.command.classPathEntries,
appDesc.command.libraryPathEntries,
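The change above stops ExecutorRunner from appending the application id itself (the removed "++ Seq(appId)") and instead resolves an explicit {{APP_ID}} placeholder alongside the existing ones. A minimal, self-contained sketch of that substitution pattern, illustrative only and not the actual ExecutorRunner code:

object SubstitutionSketch {
  // Build one substitution function per executor launch; unknown arguments pass through unchanged.
  def substitutions(execId: Int, host: String, cores: Int, appId: String): String => String = {
    case "{{EXECUTOR_ID}}" => execId.toString
    case "{{HOSTNAME}}"    => host
    case "{{CORES}}"       => cores.toString
    case "{{APP_ID}}"      => appId
    case other             => other
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical argument template, similar to what a scheduler backend hands over.
    val template = Seq("driverUrl", "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}", "{{APP_ID}}")
    println(template.map(substitutions(execId = 7, host = "worker-1", cores = 4, appId = "app-0001")))
    // List(driverUrl, 7, worker-1, 4, app-0001)
  }
}

Keeping the id in the template rather than appending it keeps Command.arguments declarative, so each cluster manager fills in the same placeholder set.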
core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
@@ -41,8 +41,8 @@ private[spark] class WorkerArguments(args: Array[String], conf: SparkConf) {
if (System.getenv("SPARK_WORKER_CORES") != null) {
cores = System.getenv("SPARK_WORKER_CORES").toInt
}
-if (System.getenv("SPARK_WORKER_MEMORY") != null) {
-memory = Utils.memoryStringToMb(System.getenv("SPARK_WORKER_MEMORY"))
+if (conf.getenv("SPARK_WORKER_MEMORY") != null) {
+memory = Utils.memoryStringToMb(conf.getenv("SPARK_WORKER_MEMORY"))
}
if (System.getenv("SPARK_WORKER_WEBUI_PORT") != null) {
webUiPort = System.getenv("SPARK_WORKER_WEBUI_PORT").toInt
@@ -56,6 +56,8 @@ private[spark] class WorkerArguments(args: Array[String], conf: SparkConf) {

parse(args.toList)

+checkWorkerMemory()
+
def parse(args: List[String]): Unit = args match {
case ("--ip" | "-i") :: value :: tail =>
Utils.checkHost(value, "ip no longer supported, please use hostname " + value)
@@ -153,4 +155,11 @@
// Leave out 1 GB for the operating system, but don't return a negative memory size
math.max(totalMb - 1024, 512)
}

+def checkWorkerMemory(): Unit = {
+if (memory <= 0) {
+val message = "Memory can't be 0, missing a M or G on the end of the memory specification?"
+throw new IllegalStateException(message)
+}
+}
}
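The new checkWorkerMemory() makes the worker fail fast when the configured memory resolves to zero, which the added tests further below exercise with values such as "10000" and "50000". A rough sketch of why that happens; the suffix handling here is an assumption about Utils.memoryStringToMb, not the real implementation:

object MemoryCheckSketch {
  // Assumed behaviour, NOT the real Utils.memoryStringToMb: a plain number is read as bytes.
  def memoryStringToMbSketch(str: String): Int = {
    val lower = str.toLowerCase
    if (lower.endsWith("m")) lower.dropRight(1).toInt
    else if (lower.endsWith("g")) lower.dropRight(1).toInt * 1024
    else (lower.toLong / 1024 / 1024).toInt // "10000" -> 0 MB
  }

  // Mirrors the check added above: refuse to start with a zero or negative memory size.
  def checkWorkerMemory(memory: Int): Unit = {
    if (memory <= 0) {
      throw new IllegalStateException(
        "Memory can't be 0, missing a M or G on the end of the memory specification?")
    }
  }

  def main(args: Array[String]): Unit = {
    println(memoryStringToMbSketch("10000M")) // 10000
    println(memoryStringToMbSketch("5G"))     // 5120
    checkWorkerMemory(memoryStringToMbSketch("10000")) // throws IllegalStateException
  }
}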
core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -152,6 +152,9 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
"Usage: CoarseGrainedExecutorBackend <driverUrl> <executorId> <hostname> " +
"<cores> <appid> [<workerUrl>] ")
System.exit(1)

+// NB: These arguments are provided by SparkDeploySchedulerBackend (for standalone mode)
+// and CoarseMesosSchedulerBackend (for mesos mode).
case 5 =>
run(args(0), args(1), args(2), args(3).toInt, args(4), None)
case x if x > 5 =>
12 changes: 7 additions & 5 deletions core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
@@ -78,16 +78,18 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
// greater than totalParts because we actually cap it at totalParts in runJob.
var numPartsToTry = 1
if (partsScanned > 0) {
-// If we didn't find any rows after the first iteration, just try all partitions next.
+// If we didn't find any rows after the previous iteration, quadruple and retry.
// Otherwise, interpolate the number of partitions we need to try, but overestimate it
-// by 50%.
+// by 50%. We also cap the estimation in the end.
if (results.size == 0) {
-numPartsToTry = totalParts - 1
+numPartsToTry = partsScanned * 4
} else {
-numPartsToTry = (1.5 * num * partsScanned / results.size).toInt
+// the left side of max is >=1 whenever partsScanned >= 2
+numPartsToTry = Math.max(1,
+(1.5 * num * partsScanned / results.size).toInt - partsScanned)
+numPartsToTry = Math.min(numPartsToTry, partsScanned * 4)
}
}
-numPartsToTry = math.max(0, numPartsToTry) // guard against negative num of partitions

val left = num - results.size
val p = partsScanned until math.min(partsScanned + numPartsToTry, totalParts)
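The estimate of how many partitions to scan in the next round is reworked here and in RDD.take below: takeAsync now quadruples on an empty round instead of jumping to all remaining partitions, and both versions clamp the interpolated estimate to at most four times the partitions already scanned. A standalone sketch of the new estimate with a few hypothetical inputs, illustrative only and not the Spark code:

object TakeEstimateSketch {
  // num: rows requested, partsScanned: partitions scanned so far, resultsSoFar: rows found so far.
  def nextPartsToTry(num: Int, partsScanned: Int, resultsSoFar: Int): Int = {
    if (resultsSoFar == 0) {
      partsScanned * 4 // nothing found yet: quadruple and retry
    } else {
      // Interpolate, overestimate by 50%, subtract what was already scanned,
      // then clamp to the range [1, partsScanned * 4].
      val interpolated = (1.5 * num * partsScanned / resultsSoFar).toInt - partsScanned
      math.min(math.max(1, interpolated), partsScanned * 4)
    }
  }

  def main(args: Array[String]): Unit = {
    println(nextPartsToTry(num = 100, partsScanned = 4, resultsSoFar = 0))  // 16
    println(nextPartsToTry(num = 100, partsScanned = 4, resultsSoFar = 10)) // 16 (56 capped at 4 * 4)
    println(nextPartsToTry(num = 100, partsScanned = 4, resultsSoFar = 80)) // 3
  }
}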
8 changes: 5 additions & 3 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1079,15 +1079,17 @@ abstract class RDD[T: ClassTag](
// greater than totalParts because we actually cap it at totalParts in runJob.
var numPartsToTry = 1
if (partsScanned > 0) {
-// If we didn't find any rows after the previous iteration, quadruple and retry. Otherwise,
+// If we didn't find any rows after the previous iteration, quadruple and retry. Otherwise,
// interpolate the number of partitions we need to try, but overestimate it by 50%.
+// We also cap the estimation in the end.
if (buf.size == 0) {
numPartsToTry = partsScanned * 4
} else {
-numPartsToTry = (1.5 * num * partsScanned / buf.size).toInt
+// the left side of max is >=1 whenever partsScanned >= 2
+numPartsToTry = Math.max((1.5 * num * partsScanned / buf.size).toInt - partsScanned, 1)
+numPartsToTry = Math.min(numPartsToTry, partsScanned * 4)
}
}
-numPartsToTry = math.max(0, numPartsToTry) // guard against negative num of partitions

val left = num - buf.size
val p = partsScanned until math.min(partsScanned + numPartsToTry, totalParts)
core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -51,7 +51,8 @@ private[spark] class SparkDeploySchedulerBackend(
conf.get("spark.driver.host"),
conf.get("spark.driver.port"),
CoarseGrainedSchedulerBackend.ACTOR_NAME)
-val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}", "{{WORKER_URL}}")
+val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}", "{{APP_ID}}",
+"{{WORKER_URL}}")
val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")
.map(Utils.splitCommandString).getOrElse(Seq.empty)
val classPathEntries = sc.conf.getOption("spark.executor.extraClassPath").toSeq.flatMap { cp =>
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -150,17 +150,17 @@ private[spark] class CoarseMesosSchedulerBackend(
if (uri == null) {
val runScript = new File(executorSparkHome, "./bin/spark-class").getCanonicalPath
command.setValue(
-"\"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d".format(
-runScript, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores))
+"\"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d %s".format(
+runScript, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores, appId))
} else {
// Grab everything to the first '.'. We'll use that and '*' to
// glob the directory "correctly".
val basename = uri.split('/').last.split('.').head
command.setValue(
("cd %s*; " +
-"./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d")
+"./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d %s")
.format(basename, driverUrl, offer.getSlaveId.getValue,
-offer.getHostname, numCores))
+offer.getHostname, numCores, appId))
command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
}
command.build()
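The Mesos launch command now also passes the application id and, when an executor URI is set, keeps relying on a basename glob to cd into whatever directory the archive unpacks to. A quick standalone illustration of that basename logic with a hypothetical URI, not the backend code itself:

object BasenameSketch {
  // Everything up to the first '.' of the archive name; the shell glob matches the rest.
  def basename(uri: String): String = uri.split('/').last.split('.').head

  def main(args: Array[String]): Unit = {
    val uri = "http://host/path/spark-1.1.0-bin-hadoop2.4.tgz" // hypothetical executor URI
    println(basename(uri))                                  // spark-1
    println(s"cd ${basename(uri)}*; ./bin/spark-class ...") // cd spark-1*; ./bin/spark-class ...
  }
}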
26 changes: 14 additions & 12 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -340,8 +340,8 @@ private[spark] object Utils extends Logging {
val targetFile = new File(targetDir, filename)
val uri = new URI(url)
val fileOverwrite = conf.getBoolean("spark.files.overwrite", defaultValue = false)
-uri.getScheme match {
-case "http" | "https" | "ftp" =>
+Option(uri.getScheme) match {
+case Some("http") | Some("https") | Some("ftp") =>
logInfo("Fetching " + url + " to " + tempFile)

var uc: URLConnection = null
@@ -374,7 +374,7 @@
}
}
Files.move(tempFile, targetFile)
-case "file" | null =>
+case Some("file") | None =>
// In the case of a local file, copy the local file to the target directory.
// Note the difference between uri vs url.
val sourceFile = if (uri.isAbsolute) new File(uri) else new File(url)
@@ -403,7 +403,7 @@
logInfo("Copying " + sourceFile.getAbsolutePath + " to " + targetFile.getAbsolutePath)
Files.copy(sourceFile, targetFile)
}
-case _ =>
+case Some(other) =>
// Use the Hadoop filesystem library, which supports file://, hdfs://, s3://, and others
val fs = getHadoopFileSystem(uri, hadoopConf)
val in = fs.open(new Path(uri))
@@ -1368,16 +1368,17 @@
if (uri.getPath == null) {
throw new IllegalArgumentException(s"Given path is malformed: $uri")
}
-uri.getScheme match {
-case windowsDrive(d) if windows =>
+
+Option(uri.getScheme) match {
+case Some(windowsDrive(d)) if windows =>
new URI("file:/" + uri.toString.stripPrefix("/"))
-case null =>
+case None =>
// Preserve fragments for HDFS file name substitution (denoted by "#")
// For instance, in "abc.py#xyz.py", "xyz.py" is the name observed by the application
val fragment = uri.getFragment
val part = new File(uri.getPath).toURI
new URI(part.getScheme, part.getPath, fragment)
-case _ =>
+case Some(other) =>
uri
}
}
@@ -1399,10 +1400,11 @@
} else {
paths.split(",").filter { p =>
val formattedPath = if (windows) formatWindowsPath(p) else p
-new URI(formattedPath).getScheme match {
-case windowsDrive(d) if windows => false
-case "local" | "file" | null => false
-case _ => true
+val uri = new URI(formattedPath)
+Option(uri.getScheme) match {
+case Some(windowsDrive(d)) if windows => false
+case Some("local") | Some("file") | None => false
+case Some(other) => true
}
}
}
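The Utils changes replace raw uri.getScheme matches, which had to pattern-match on null, with Option(uri.getScheme), so a missing scheme becomes an explicit None case and every remaining branch is spelled out. A small, self-contained illustration of the idiom, not the actual Utils code:

import java.net.URI

object SchemeMatchSketch {
  // Option(null) is None, so a scheme-less path lands in the None branch
  // instead of relying on a bare null pattern.
  def describe(path: String): String = Option(new URI(path).getScheme) match {
    case Some("http") | Some("https") | Some("ftp") => "remote download"
    case Some("file") | None                        => "local file"
    case Some(other)                                => s"handled by the Hadoop FileSystem API ($other)"
  }

  def main(args: Array[String]): Unit = {
    println(describe("http://example.com/lib.jar"))   // remote download
    println(describe("/tmp/lib.jar"))                 // local file
    println(describe("hdfs://namenode:8020/lib.jar")) // handled by the Hadoop FileSystem API (hdfs)
  }
}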
core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
@@ -26,14 +26,12 @@ import org.apache.spark.SparkConf

class ExecutorRunnerTest extends FunSuite {
test("command includes appId") {
-def f(s:String) = new File(s)
+val appId = "12345-worker321-9876"
val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
val appDesc = new ApplicationDescription("app name", Some(8), 500,
-Command("foo", Seq(), Map(), Seq(), Seq(), Seq()), "appUiUrl")
-val appId = "12345-worker321-9876"
-val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321", f(sparkHome),
-f("ooga"), "blah", new SparkConf, ExecutorState.RUNNING)
-
+Command("foo", Seq(appId), Map(), Seq(), Seq(), Seq()), "appUiUrl")
+val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321",
+new File(sparkHome), new File("ooga"), "blah", new SparkConf, ExecutorState.RUNNING)
assert(er.getCommandSeq.last === appId)
}
}
core/src/test/scala/org/apache/spark/deploy/worker/WorkerArgumentsTest.scala (new file)
@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/


package org.apache.spark.deploy.worker

import org.apache.spark.SparkConf
import org.scalatest.FunSuite


class WorkerArgumentsTest extends FunSuite {

test("Memory can't be set to 0 when cmd line args leave off M or G") {
val conf = new SparkConf
val args = Array("-m", "10000", "spark://localhost:0000 ")
intercept[IllegalStateException] {
new WorkerArguments(args, conf)
}
}


test("Memory can't be set to 0 when SPARK_WORKER_MEMORY env property leaves off M or G") {
val args = Array("spark://localhost:0000 ")

class MySparkConf extends SparkConf(false) {
override def getenv(name: String) = {
if (name == "SPARK_WORKER_MEMORY") "50000"
else super.getenv(name)
}

override def clone: SparkConf = {
new MySparkConf().setAll(settings)
}
}
val conf = new MySparkConf()
intercept[IllegalStateException] {
new WorkerArguments(args, conf)
}
}

test("Memory correctly set when SPARK_WORKER_MEMORY env property appends G") {
val args = Array("spark://localhost:0000 ")

class MySparkConf extends SparkConf(false) {
override def getenv(name: String) = {
if (name == "SPARK_WORKER_MEMORY") "5G"
else super.getenv(name)
}

override def clone: SparkConf = {
new MySparkConf().setAll(settings)
}
}
val conf = new MySparkConf()
val workerArgs = new WorkerArguments(args, conf)
assert(workerArgs.memory === 5120)
}

test("Memory correctly set from args with M appended to memory value") {
val conf = new SparkConf
val args = Array("-m", "10000M", "spark://localhost:0000 ")

val workerArgs = new WorkerArguments(args, conf)
assert(workerArgs.memory === 10000)

}

}