Permalink
Browse files

Merge pull request #505 from stephenh/volatile

Make Executor fields volatile since they're read from the thread pool.
  • Loading branch information...
2 parents fd53f2f + 4f42153 commit b8949cab889da4ba0f613a17b2eef52a32476410 @mateiz mateiz committed Mar 23, 2013
@@ -16,66 +16,61 @@ import java.nio.ByteBuffer
/**
* The Mesos executor for Spark.
*/
-private[spark] class Executor extends Logging {
- var urlClassLoader : ExecutorURLClassLoader = null
- var threadPool: ExecutorService = null
- var env: SparkEnv = null
-
+private[spark] class Executor(executorId: String, slaveHostname: String, properties: Seq[(String, String)]) extends Logging {
+
// Application dependencies (added through SparkContext) that we've fetched so far on this node.
// Each map holds the master's timestamp for the version of that file or JAR we got.
- val currentFiles: HashMap[String, Long] = new HashMap[String, Long]()
- val currentJars: HashMap[String, Long] = new HashMap[String, Long]()
+ private val currentFiles: HashMap[String, Long] = new HashMap[String, Long]()
+ private val currentJars: HashMap[String, Long] = new HashMap[String, Long]()
- val EMPTY_BYTE_BUFFER = ByteBuffer.wrap(new Array[Byte](0))
+ private val EMPTY_BYTE_BUFFER = ByteBuffer.wrap(new Array[Byte](0))
initLogging()
- def initialize(executorId: String, slaveHostname: String, properties: Seq[(String, String)]) {
- // Make sure the local hostname we report matches the cluster scheduler's name for this host
- Utils.setCustomHostname(slaveHostname)
+ // Make sure the local hostname we report matches the cluster scheduler's name for this host
+ Utils.setCustomHostname(slaveHostname)
- // Set spark.* system properties from executor arg
- for ((key, value) <- properties) {
- System.setProperty(key, value)
- }
+ // Set spark.* system properties from executor arg
+ for ((key, value) <- properties) {
+ System.setProperty(key, value)
+ }
+
+ // Create our ClassLoader and set it on this thread
+ private val urlClassLoader = createClassLoader()
+ Thread.currentThread.setContextClassLoader(urlClassLoader)
- // Create our ClassLoader and set it on this thread
- urlClassLoader = createClassLoader()
- Thread.currentThread.setContextClassLoader(urlClassLoader)
-
- // Make any thread terminations due to uncaught exceptions kill the entire
- // executor process to avoid surprising stalls.
- Thread.setDefaultUncaughtExceptionHandler(
- new Thread.UncaughtExceptionHandler {
- override def uncaughtException(thread: Thread, exception: Throwable) {
- try {
- logError("Uncaught exception in thread " + thread, exception)
-
- // We may have been called from a shutdown hook. If so, we must not call System.exit().
- // (If we do, we will deadlock.)
- if (!Utils.inShutdown()) {
- if (exception.isInstanceOf[OutOfMemoryError]) {
- System.exit(ExecutorExitCode.OOM)
- } else {
- System.exit(ExecutorExitCode.UNCAUGHT_EXCEPTION)
- }
+ // Make any thread terminations due to uncaught exceptions kill the entire
+ // executor process to avoid surprising stalls.
+ Thread.setDefaultUncaughtExceptionHandler(
+ new Thread.UncaughtExceptionHandler {
+ override def uncaughtException(thread: Thread, exception: Throwable) {
+ try {
+ logError("Uncaught exception in thread " + thread, exception)
+
+ // We may have been called from a shutdown hook. If so, we must not call System.exit().
+ // (If we do, we will deadlock.)
+ if (!Utils.inShutdown()) {
+ if (exception.isInstanceOf[OutOfMemoryError]) {
+ System.exit(ExecutorExitCode.OOM)
+ } else {
+ System.exit(ExecutorExitCode.UNCAUGHT_EXCEPTION)
}
- } catch {
- case oom: OutOfMemoryError => Runtime.getRuntime.halt(ExecutorExitCode.OOM)
- case t: Throwable => Runtime.getRuntime.halt(ExecutorExitCode.UNCAUGHT_EXCEPTION_TWICE)
}
+ } catch {
+ case oom: OutOfMemoryError => Runtime.getRuntime.halt(ExecutorExitCode.OOM)
+ case t: Throwable => Runtime.getRuntime.halt(ExecutorExitCode.UNCAUGHT_EXCEPTION_TWICE)
}
}
- )
+ }
+ )
- // Initialize Spark environment (using system properties read above)
- env = SparkEnv.createFromSystemProperties(executorId, slaveHostname, 0, false, false)
- SparkEnv.set(env)
+ // Initialize Spark environment (using system properties read above)
+ val env = SparkEnv.createFromSystemProperties(executorId, slaveHostname, 0, false, false)
+ SparkEnv.set(env)
- // Start worker thread pool
- threadPool = new ThreadPoolExecutor(
- 1, 128, 600, TimeUnit.SECONDS, new SynchronousQueue[Runnable])
- }
+ // Start worker thread pool
+ val threadPool = new ThreadPoolExecutor(
+ 1, 128, 600, TimeUnit.SECONDS, new SynchronousQueue[Runnable])
def launchTask(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer) {
threadPool.execute(new TaskRunner(context, taskId, serializedTask))
@@ -8,11 +8,12 @@ import com.google.protobuf.ByteString
import spark.{Utils, Logging}
import spark.TaskState
-private[spark] class MesosExecutorBackend(executor: Executor)
+private[spark] class MesosExecutorBackend
extends MesosExecutor
with ExecutorBackend
with Logging {
+ var executor: Executor = null
var driver: ExecutorDriver = null
override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
@@ -32,16 +33,19 @@ private[spark] class MesosExecutorBackend(executor: Executor)
logInfo("Registered with Mesos as executor ID " + executorInfo.getExecutorId.getValue)
this.driver = driver
val properties = Utils.deserialize[Array[(String, String)]](executorInfo.getData.toByteArray)
- executor.initialize(
+ executor = new Executor(
executorInfo.getExecutorId.getValue,
slaveInfo.getHostname,
- properties
- )
+ properties)
}
override def launchTask(d: ExecutorDriver, taskInfo: TaskInfo) {
val taskId = taskInfo.getTaskId.getValue.toLong
- executor.launchTask(this, taskId, taskInfo.getData.asReadOnlyByteBuffer)
+ if (executor == null) {
+ logError("Received launchTask but executor was null")
+ } else {
+ executor.launchTask(this, taskId, taskInfo.getData.asReadOnlyByteBuffer)
+ }
}
override def error(d: ExecutorDriver, message: String) {
@@ -68,7 +72,7 @@ private[spark] object MesosExecutorBackend {
def main(args: Array[String]) {
MesosNativeLibrary.load()
// Create a new Executor and start it running
- val runner = new MesosExecutorBackend(new Executor)
+ val runner = new MesosExecutorBackend()
new MesosExecutorDriver(runner).run()
}
}
@@ -14,7 +14,6 @@ import spark.scheduler.cluster.RegisterExecutorFailed
import spark.scheduler.cluster.RegisterExecutor
private[spark] class StandaloneExecutorBackend(
- executor: Executor,
driverUrl: String,
executorId: String,
hostname: String,
@@ -23,6 +22,7 @@ private[spark] class StandaloneExecutorBackend(
with ExecutorBackend
with Logging {
+ var executor: Executor = null
var driver: ActorRef = null
override def preStart() {
@@ -36,15 +36,20 @@ private[spark] class StandaloneExecutorBackend(
override def receive = {
case RegisteredExecutor(sparkProperties) =>
logInfo("Successfully registered with driver")
- executor.initialize(executorId, hostname, sparkProperties)
+ executor = new Executor(executorId, hostname, sparkProperties)
case RegisterExecutorFailed(message) =>
logError("Slave registration failed: " + message)
System.exit(1)
case LaunchTask(taskDesc) =>
logInfo("Got assigned task " + taskDesc.taskId)
- executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask)
+ if (executor == null) {
+ logError("Received launchTask but executor was null")
+ System.exit(1)
+ } else {
+ executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask)
+ }
case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) =>
logError("Driver terminated or disconnected! Shutting down.")
@@ -62,7 +67,7 @@ private[spark] object StandaloneExecutorBackend {
// before getting started with all our system properties, etc
val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0)
val actor = actorSystem.actorOf(
- Props(new StandaloneExecutorBackend(new Executor, driverUrl, executorId, hostname, cores)),
+ Props(new StandaloneExecutorBackend(driverUrl, executorId, hostname, cores)),
name = "Executor")
actorSystem.awaitTermination()
}

0 comments on commit b8949ca

Please sign in to comment.