Skip to content

Commit

Permalink
Merge branch 'master' of github.com:apache/spark into viz2
Browse files Browse the repository at this point in the history
  • Loading branch information
Andrew Or committed Apr 28, 2015
2 parents 7ef957c + 28b1af7 commit d19c4da
Show file tree
Hide file tree
Showing 111 changed files with 5,539 additions and 891 deletions.
1 change: 0 additions & 1 deletion assembly/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,6 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.4</version>
<executions>
<execution>
<id>dist</id>
Expand Down
5 changes: 4 additions & 1 deletion bin/spark-class2.cmd
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,10 @@ if not "x%JAVA_HOME%"=="x" set RUNNER=%JAVA_HOME%\bin\java

rem The launcher library prints the command to be executed in a single line suitable for being
rem executed by the batch interpreter. So read all the output of the launcher into a variable.
for /f "tokens=*" %%i in ('cmd /C ""%RUNNER%" -cp %LAUNCH_CLASSPATH% org.apache.spark.launcher.Main %*"') do (
set LAUNCHER_OUTPUT=%temp%\spark-class-launcher-output-%RANDOM%.txt
"%RUNNER%" -cp %LAUNCH_CLASSPATH% org.apache.spark.launcher.Main %* > %LAUNCHER_OUTPUT%
for /f "tokens=*" %%i in (%LAUNCHER_OUTPUT%) do (
set SPARK_CMD=%%i
)
del %LAUNCHER_OUTPUT%
%SPARK_CMD%
3 changes: 2 additions & 1 deletion conf/spark-env.sh.template
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# This file is sourced when running various Spark programs.
# Copy it as spark-env.sh and edit that to configure Spark for your site.

# Options read when launching programs locally with
# Options read when launching programs locally with
# ./bin/run-example or ./bin/spark-submit
# - HADOOP_CONF_DIR, to point Spark towards Hadoop configuration files
# - SPARK_LOCAL_IP, to set the IP address Spark binds to on this node
Expand Down Expand Up @@ -39,6 +39,7 @@
# - SPARK_WORKER_DIR, to set the working directory of worker processes
# - SPARK_WORKER_OPTS, to set config properties only for the worker (e.g. "-Dx=y")
# - SPARK_HISTORY_OPTS, to set config properties only for the history server (e.g. "-Dx=y")
# - SPARK_SHUFFLE_OPTS, to set config properties only for the external shuffle service (e.g. "-Dx=y")
# - SPARK_DAEMON_JAVA_OPTS, to set config properties for all daemons (e.g. "-Dx=y")
# - SPARK_PUBLIC_DNS, to set the public dns name of the master or workers

Expand Down
1 change: 0 additions & 1 deletion core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,6 @@
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>1.3.2</version>
<executions>
<execution>
<id>sparkr-pkg</id>
Expand Down
26 changes: 17 additions & 9 deletions core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,15 @@ private[spark] class HeartbeatReceiver(sc: SparkContext)

private var timeoutCheckingTask: ScheduledFuture[_] = null

private val timeoutCheckingThread =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("heartbeat-timeout-checking-thread")
// "eventLoopThread" is used to run some pretty fast actions. The actions running in it should not
// block the thread for a long time.
private val eventLoopThread =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("heartbeat-receiver-event-loop-thread")

private val killExecutorThread = ThreadUtils.newDaemonSingleThreadExecutor("kill-executor-thread")

override def onStart(): Unit = {
timeoutCheckingTask = timeoutCheckingThread.scheduleAtFixedRate(new Runnable {
timeoutCheckingTask = eventLoopThread.scheduleAtFixedRate(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
Option(self).foreach(_.send(ExpireDeadHosts))
}
Expand All @@ -99,11 +101,15 @@ private[spark] class HeartbeatReceiver(sc: SparkContext)
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case heartbeat @ Heartbeat(executorId, taskMetrics, blockManagerId) =>
if (scheduler != null) {
val unknownExecutor = !scheduler.executorHeartbeatReceived(
executorId, taskMetrics, blockManagerId)
val response = HeartbeatResponse(reregisterBlockManager = unknownExecutor)
executorLastSeen(executorId) = System.currentTimeMillis()
context.reply(response)
eventLoopThread.submit(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
val unknownExecutor = !scheduler.executorHeartbeatReceived(
executorId, taskMetrics, blockManagerId)
val response = HeartbeatResponse(reregisterBlockManager = unknownExecutor)
context.reply(response)
}
})
} else {
// Because Executor will sleep several seconds before sending the first "Heartbeat", this
// case rarely happens. However, if it really happens, log it and ask the executor to
Expand All @@ -125,7 +131,9 @@ private[spark] class HeartbeatReceiver(sc: SparkContext)
if (sc.supportDynamicAllocation) {
// Asynchronously kill the executor to avoid blocking the current thread
killExecutorThread.submit(new Runnable {
override def run(): Unit = sc.killExecutor(executorId)
override def run(): Unit = Utils.tryLogNonFatalError {
sc.killExecutor(executorId)
}
})
}
executorLastSeen.remove(executorId)
Expand All @@ -137,7 +145,7 @@ private[spark] class HeartbeatReceiver(sc: SparkContext)
if (timeoutCheckingTask != null) {
timeoutCheckingTask.cancel(true)
}
timeoutCheckingThread.shutdownNow()
eventLoopThread.shutdownNow()
killExecutorThread.shutdownNow()
}
}
Expand Down
90 changes: 89 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,74 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
Utils.timeStringAsMs(get(key, defaultValue))
}

/**
* Get a size parameter as bytes; throws a NoSuchElementException if it's not set. If no
* suffix is provided then bytes are assumed.
* @throws NoSuchElementException
*/
def getSizeAsBytes(key: String): Long = {
Utils.byteStringAsBytes(get(key))
}

/**
* Get a size parameter as bytes, falling back to a default if not set. If no
* suffix is provided then bytes are assumed.
*/
def getSizeAsBytes(key: String, defaultValue: String): Long = {
Utils.byteStringAsBytes(get(key, defaultValue))
}

/**
* Get a size parameter as Kibibytes; throws a NoSuchElementException if it's not set. If no
* suffix is provided then Kibibytes are assumed.
* @throws NoSuchElementException
*/
def getSizeAsKb(key: String): Long = {
Utils.byteStringAsKb(get(key))
}

/**
* Get a size parameter as Kibibytes, falling back to a default if not set. If no
* suffix is provided then Kibibytes are assumed.
*/
def getSizeAsKb(key: String, defaultValue: String): Long = {
Utils.byteStringAsKb(get(key, defaultValue))
}

/**
* Get a size parameter as Mebibytes; throws a NoSuchElementException if it's not set. If no
* suffix is provided then Mebibytes are assumed.
* @throws NoSuchElementException
*/
def getSizeAsMb(key: String): Long = {
Utils.byteStringAsMb(get(key))
}

/**
* Get a size parameter as Mebibytes, falling back to a default if not set. If no
* suffix is provided then Mebibytes are assumed.
*/
def getSizeAsMb(key: String, defaultValue: String): Long = {
Utils.byteStringAsMb(get(key, defaultValue))
}

/**
* Get a size parameter as Gibibytes; throws a NoSuchElementException if it's not set. If no
* suffix is provided then Gibibytes are assumed.
* @throws NoSuchElementException
*/
def getSizeAsGb(key: String): Long = {
Utils.byteStringAsGb(get(key))
}

/**
* Get a size parameter as Gibibytes, falling back to a default if not set. If no
* suffix is provided then Gibibytes are assumed.
*/
def getSizeAsGb(key: String, defaultValue: String): Long = {
Utils.byteStringAsGb(get(key, defaultValue))
}

/** Get a parameter as an Option */
def getOption(key: String): Option[String] = {
Option(settings.get(key)).orElse(getDeprecatedConfig(key, this))
Expand Down Expand Up @@ -407,7 +474,13 @@ private[spark] object SparkConf extends Logging {
"The spark.cache.class property is no longer being used! Specify storage levels using " +
"the RDD.persist() method instead."),
DeprecatedConfig("spark.yarn.user.classpath.first", "1.3",
"Please use spark.{driver,executor}.userClassPathFirst instead."))
"Please use spark.{driver,executor}.userClassPathFirst instead."),
DeprecatedConfig("spark.kryoserializer.buffer.mb", "1.4",
"Please use spark.kryoserializer.buffer instead. The default value for " +
"spark.kryoserializer.buffer.mb was previously specified as '0.064'. Fractional values " +
"are no longer accepted. To specify the equivalent now, one may use '64k'.")
)

Map(configs.map { cfg => (cfg.key -> cfg) }:_*)
}

Expand All @@ -432,6 +505,21 @@ private[spark] object SparkConf extends Logging {
AlternateConfig("spark.yarn.applicationMaster.waitTries", "1.3",
// Translate old value to a duration, with 10s wait time per try.
translation = s => s"${s.toLong * 10}s")),
"spark.reducer.maxSizeInFlight" -> Seq(
AlternateConfig("spark.reducer.maxMbInFlight", "1.4")),
"spark.kryoserializer.buffer" ->
Seq(AlternateConfig("spark.kryoserializer.buffer.mb", "1.4",
translation = s => s"${s.toDouble * 1000}k")),
"spark.kryoserializer.buffer.max" -> Seq(
AlternateConfig("spark.kryoserializer.buffer.max.mb", "1.4")),
"spark.shuffle.file.buffer" -> Seq(
AlternateConfig("spark.shuffle.file.buffer.kb", "1.4")),
"spark.executor.logs.rolling.maxSize" -> Seq(
AlternateConfig("spark.executor.logs.rolling.size.maxBytes", "1.4")),
"spark.io.compression.snappy.blockSize" -> Seq(
AlternateConfig("spark.io.compression.snappy.block.size", "1.4")),
"spark.io.compression.lz4.blockSize" -> Seq(
AlternateConfig("spark.io.compression.lz4.block.size", "1.4")),
"spark.rpc.numRetries" -> Seq(
AlternateConfig("spark.akka.num.retries", "1.4")),
"spark.rpc.retry.wait" -> Seq(
Expand Down
5 changes: 5 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1413,6 +1413,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* Register an RDD to be persisted in memory and/or disk storage
*/
private[spark] def persistRDD(rdd: RDD[_]) {
_executorAllocationManager.foreach { _ =>
logWarning(
s"Dynamic allocation currently does not support cached RDDs. Cached data for RDD " +
s"${rdd.id} will be lost when executors are removed.")
}
persistentRdds(rdd.id) = rdd
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
} else {
None
}
blockSize = conf.getInt("spark.broadcast.blockSize", 4096) * 1024
// Note: use getSizeAsKb (not bytes) to maintain compatiblity if no units are provided
blockSize = conf.getSizeAsKb("spark.broadcast.blockSize", "4m").toInt * 1024
}
setConf(SparkEnv.get.conf)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,17 @@
* limitations under the License.
*/

package org.apache.spark.deploy.worker
package org.apache.spark.deploy

import java.util.concurrent.CountDownLatch

import org.apache.spark.{Logging, SparkConf, SecurityManager}
import org.apache.spark.network.TransportContext
import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.network.sasl.SaslRpcHandler
import org.apache.spark.network.server.TransportServer
import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler
import org.apache.spark.util.Utils

/**
* Provides a server from which Executors can read shuffle files (rather than reading directly from
Expand All @@ -31,8 +34,8 @@ import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler
*
* Optionally requires SASL authentication in order to read. See [[SecurityManager]].
*/
private[worker]
class StandaloneWorkerShuffleService(sparkConf: SparkConf, securityManager: SecurityManager)
private[deploy]
class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityManager)
extends Logging {

private val enabled = sparkConf.getBoolean("spark.shuffle.service.enabled", false)
Expand All @@ -51,16 +54,58 @@ class StandaloneWorkerShuffleService(sparkConf: SparkConf, securityManager: Secu
/** Starts the external shuffle service if the user has configured us to. */
def startIfEnabled() {
if (enabled) {
require(server == null, "Shuffle server already started")
logInfo(s"Starting shuffle service on port $port with useSasl = $useSasl")
server = transportContext.createServer(port)
start()
}
}

/** Start the external shuffle service */
def start() {
require(server == null, "Shuffle server already started")
logInfo(s"Starting shuffle service on port $port with useSasl = $useSasl")
server = transportContext.createServer(port)
}

def stop() {
if (enabled && server != null) {
if (server != null) {
server.close()
server = null
}
}
}

/**
* A main class for running the external shuffle service.
*/
object ExternalShuffleService extends Logging {
@volatile
private var server: ExternalShuffleService = _

private val barrier = new CountDownLatch(1)

def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf
Utils.loadDefaultSparkProperties(sparkConf)
val securityManager = new SecurityManager(sparkConf)

// we override this value since this service is started from the command line
// and we assume the user really wants it to be running
sparkConf.set("spark.shuffle.service.enabled", "true")
server = new ExternalShuffleService(sparkConf, securityManager)
server.start()

installShutdownHook()

// keep running until the process is terminated
barrier.await()
}

private def installShutdownHook(): Unit = {
Runtime.getRuntime.addShutdownHook(new Thread("External Shuffle Service shutdown thread") {
override def run() {
logInfo("Shutting down shuffle service.")
server.stop()
barrier.countDown()
}
})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import org.json4s._
import org.json4s.jackson.JsonMethods

import org.apache.spark.{Logging, SparkConf, SparkContext}
import org.apache.spark.deploy.master.{RecoveryState, SparkCuratorUtil}
import org.apache.spark.deploy.master.RecoveryState
import org.apache.spark.util.Utils

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.spark.deploy.master
package org.apache.spark.deploy

import scala.collection.JavaConversions._

Expand All @@ -25,15 +25,17 @@ import org.apache.zookeeper.KeeperException

import org.apache.spark.{Logging, SparkConf}

private[deploy] object SparkCuratorUtil extends Logging {
private[spark] object SparkCuratorUtil extends Logging {

private val ZK_CONNECTION_TIMEOUT_MILLIS = 15000
private val ZK_SESSION_TIMEOUT_MILLIS = 60000
private val RETRY_WAIT_MILLIS = 5000
private val MAX_RECONNECT_ATTEMPTS = 3

def newClient(conf: SparkConf): CuratorFramework = {
val ZK_URL = conf.get("spark.deploy.zookeeper.url")
def newClient(
conf: SparkConf,
zkUrlConf: String = "spark.deploy.zookeeper.url"): CuratorFramework = {
val ZK_URL = conf.get(zkUrlConf)
val zk = CuratorFrameworkFactory.newClient(ZK_URL,
ZK_SESSION_TIMEOUT_MILLIS, ZK_CONNECTION_TIMEOUT_MILLIS,
new ExponentialBackoffRetry(RETRY_WAIT_MILLIS, MAX_RECONNECT_ATTEMPTS))
Expand Down
Loading

0 comments on commit d19c4da

Please sign in to comment.