Permalink
Browse files

Changes to have Spark work with Twitter's internal version of Mesos.

  • Loading branch information...
1 parent a69c073 commit 94f0322f0b5d2d54d12b7eb9eb93346a6fd4754a root committed Apr 13, 2012
Binary file not shown.
View
Binary file not shown.
@@ -24,13 +24,9 @@ class Executor extends org.apache.mesos.Executor with Logging {
initLogging()
- override def registered(
- driver: ExecutorDriver,
- executorInfo: ExecutorInfo,
- frameworkInfo: FrameworkInfo,
- slaveInfo: SlaveInfo) {
+ override def init(driver: ExecutorDriver, args: ExecutorArgs) {
// Read spark.* system properties from executor arg
- val props = Utils.deserialize[Array[(String, String)]](executorInfo.getData.toByteArray)
+ val props = Utils.deserialize[Array[(String, String)]](args.getData.toByteArray)
for ((key, value) <- props) {
System.setProperty(key, value)
}
@@ -52,16 +48,12 @@ class Executor extends org.apache.mesos.Executor with Logging {
threadPool = new ThreadPoolExecutor(
1, 128, 600, TimeUnit.SECONDS, new SynchronousQueue[Runnable])
}
-
- override def disconnected(d: ExecutorDriver) {}
-
- override def reregistered(d: ExecutorDriver, s: SlaveInfo) {}
- override def launchTask(d: ExecutorDriver, task: TaskInfo) {
+ override def launchTask(d: ExecutorDriver, task: TaskDescription) {
threadPool.execute(new TaskRunner(task, d))
}
- class TaskRunner(info: TaskInfo, d: ExecutorDriver)
+ class TaskRunner(info: TaskDescription, d: ExecutorDriver)
extends Runnable {
override def run() = {
val tid = info.getTaskId.getValue
@@ -162,7 +154,7 @@ class Executor extends org.apache.mesos.Executor with Logging {
Utils.copyStream(in, out, true)
}
- override def error(d: ExecutorDriver, message: String) {
+ override def error(d: ExecutorDriver, code: Int, message: String) {
logError("Error from Mesos: " + message)
}
@@ -180,7 +172,7 @@ class Executor extends org.apache.mesos.Executor with Logging {
*/
object Executor extends Logging {
def main(args: Array[String]) {
- MesosNativeLibrary.load()
+ System.loadLibrary("mesos")
// Create a new Executor and start it running
val exec = new Executor
new MesosExecutorDriver(exec).run()
@@ -8,7 +8,7 @@ import org.apache.mesos.Protos._
* callbacks.
*/
abstract class Job(val runId: Int, val jobId: Int) {
- def slaveOffer(s: Offer, availableCpus: Double): Option[TaskInfo]
+ def slaveOffer(s: Offer, availableCpus: Double): Option[TaskDescription]
def statusUpdate(t: TaskStatus): Unit
@@ -108,8 +108,7 @@ private class MesosScheduler(
setDaemon(true)
override def run {
val sched = MesosScheduler.this
- val fwInfo = FrameworkInfo.newBuilder().setUser("").setName(frameworkName).build()
- driver = new MesosSchedulerDriver(sched, fwInfo, master)
+ driver = new MesosSchedulerDriver(sched, frameworkName, executorInfo, master)
try {
val ret = driver.run()
logInfo("driver.run() returned with code " + ret)
@@ -128,28 +127,25 @@ private class MesosScheduler(
"property, the SPARK_HOME environment variable or the SparkContext constructor")
}
val execScript = new File(sparkHome, "spark-executor").getCanonicalPath
- val environment = Environment.newBuilder()
+ val params = Params.newBuilder()
for (key <- ENV_VARS_TO_SEND_TO_EXECUTORS) {
if (System.getenv(key) != null) {
- environment.addVariables(Environment.Variable.newBuilder()
- .setName(key)
- .setValue(System.getenv(key))
- .build())
+ params.addParam(Param.newBuilder()
+ .setKey("env." + key)
+ .setValue(System.getenv(key))
+ .build())
}
}
val memory = Resource.newBuilder()
.setName("mem")
.setType(Value.Type.SCALAR)
.setScalar(Value.Scalar.newBuilder().setValue(EXECUTOR_MEMORY).build())
.build()
- val command = CommandInfo.newBuilder()
- .setValue(execScript)
- .setEnvironment(environment)
- .build()
ExecutorInfo.newBuilder()
.setExecutorId(ExecutorID.newBuilder().setValue("default").build())
- .setCommand(command)
+ .setUri(execScript)
.setData(ByteString.copyFrom(createExecArg()))
+ .setParams(params.build())
.addResources(memory)
.build()
}
@@ -178,7 +174,7 @@ private class MesosScheduler(
}
}
- override def registered(d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) {
+ override def registered(d: SchedulerDriver, frameworkId: FrameworkID) {
logInfo("Registered as framework ID " + frameworkId.getValue)
registeredLock.synchronized {
isRegistered = true
@@ -194,18 +190,14 @@ private class MesosScheduler(
}
}
- override def disconnected(d: SchedulerDriver) {}
-
- override def reregistered(d: SchedulerDriver, masterInfo: MasterInfo) {}
-
/**
   * Method called by Mesos to offer resources on slaves. We respond by asking our active jobs for
* tasks in FIFO order. We fill each node with tasks in a round-robin manner so that tasks are
* balanced across the cluster.
*/
override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
synchronized {
- val tasks = offers.map(o => new JArrayList[TaskInfo])
+ val tasks = offers.map(o => new JArrayList[TaskDescription])
val availableCpus = offers.map(o => getResource(o.getResourcesList(), "cpus"))
val enoughMem = offers.map(o => {
val mem = getResource(o.getResourcesList(), "mem")
@@ -292,7 +284,7 @@ private class MesosScheduler(
}
}
- override def error(d: SchedulerDriver, message: String) {
+ override def error(d: SchedulerDriver, code: Int, message: String) {
logError("Mesos error: " + message)
synchronized {
if (activeJobs.size > 0) {
@@ -378,18 +370,14 @@ private class MesosScheduler(
override def frameworkMessage(
d: SchedulerDriver,
- e: ExecutorID,
s: SlaveID,
+ e: ExecutorID,
b: Array[Byte]) {}
override def slaveLost(d: SchedulerDriver, s: SlaveID) {
slavesWithExecutors.remove(s.getValue)
}
- override def executorLost(d: SchedulerDriver, e: ExecutorID, s: SlaveID, status: Int) {
- slavesWithExecutors.remove(s.getValue)
- }
-
override def offerRescinded(d: SchedulerDriver, o: OfferID) {}
/**
@@ -141,7 +141,7 @@ class SimpleJob(
}
// Respond to an offer of a single slave from the scheduler by finding a task
- def slaveOffer(offer: Offer, availableCpus: Double): Option[TaskInfo] = {
+ def slaveOffer(offer: Offer, availableCpus: Double): Option[TaskDescription] = {
if (tasksLaunched < numTasks && availableCpus >= CPUS_PER_TASK) {
val time = System.currentTimeMillis
val localOnly = (time - lastPreferredLaunchTime < LOCALITY_WAIT)
@@ -173,7 +173,7 @@ class SimpleJob(
val serializedTask = Utils.serialize(task)
logDebug("Serialized size: " + serializedTask.size)
val taskName = "task %d:%d".format(jobId, index)
- return Some(TaskInfo.newBuilder()
+ return Some(TaskDescription.newBuilder()
.setTaskId(taskId)
.setSlaveId(offer.getSlaveId)
.setExecutor(sched.executorInfo)
@@ -28,8 +28,6 @@ import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
import org.apache.hadoop.mapreduce.{Job => NewHadoopJob}
-import org.apache.mesos.MesosNativeLibrary
-
import spark.broadcast._
class SparkContext(
@@ -74,7 +72,7 @@ class SparkContext(
case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
new LocalScheduler(threads.toInt, maxFailures.toInt)
case _ =>
- MesosNativeLibrary.load()
+ System.loadLibrary("mesos")
new MesosScheduler(this, master, frameworkName)
}
}

0 comments on commit 94f0322

Please sign in to comment.