/
LocalScheduler.scala
55 lines (47 loc) · 1.75 KB
/
LocalScheduler.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package spark
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent._
/**
 * A simple Scheduler implementation that runs tasks locally in a thread pool.
 *
 * Each submitted task is serialized and deserialized before running so that
 * accumulator updates happen on thread-local copies, matching how the Mesos
 * executor behaves.
 */
private class LocalScheduler(threads: Int) extends DAGScheduler with Logging {
  // Daemon threads so an idle pool does not keep the JVM alive.
  var threadPool: ExecutorService =
    Executors.newFixedThreadPool(threads, DaemonThreadFactory)

  // Captured on the driver thread so workers can install the same SparkEnv.
  val env = SparkEnv.get

  override def start() {}

  override def waitForRegister() {}

  /**
   * Submit each task to the thread pool as an independent Runnable.
   * Results and accumulator updates are reported back via taskEnded.
   */
  override def submitTasks(tasks: Seq[Task[_]]) {
    tasks.zipWithIndex.foreach { case (task, i) =>
      threadPool.submit(new Runnable {
        def run() {
          logInfo("Running task " + i)
          // Set the Spark execution environment for the worker thread
          SparkEnv.set(env)
          try {
            // Serialize and deserialize the task so that accumulators are
            // changed to thread-local ones; this adds a bit of unnecessary
            // overhead but matches how the Mesos Executor works.
            // Use the already-bound `task` rather than tasks(i): indexed
            // access on a Seq (e.g. List) is O(n).
            Accumulators.clear
            val bytes = Utils.serialize(task)
            logInfo("Size of task " + i + " is " + bytes.size + " bytes")
            val deserializedTask = Utils.deserialize[Task[_]](
              bytes, Thread.currentThread.getContextClassLoader)
            val result: Any = deserializedTask.run
            val accumUpdates = Accumulators.values
            logInfo("Finished task " + i)
            taskEnded(task, Success, result, accumUpdates)
          } catch {
            case t: Throwable =>
              // TODO: Do something nicer here; for now a task failure in
              // local mode is treated as fatal and aborts the process.
              logError("Exception in task " + i, t)
              System.exit(1)
          }
        }
      })
    }
  }

  override def stop() {
    // Release worker threads; previously the pool was leaked on stop().
    threadPool.shutdown()
  }

  override def numCores() = threads
}