Commit
Let's use future instead of actors
Justin Ma committed Sep 13, 2010
1 parent 0896fd6 commit 366c09c
Showing 2 changed files with 24 additions and 38 deletions.
src/scala/spark/MesosScheduler.scala: 24 additions & 16 deletions
@@ -3,6 +3,7 @@ package spark
 import java.io.File
 
 import scala.collection.mutable.Map
+import scala.collection.mutable.Queue
 import scala.collection.mutable.HashMap
 import scala.collection.JavaConversions._


@@ -32,6 +33,7 @@ extends NScheduler with spark.Scheduler
   val registeredLock = new Object()
 
   // Current callback object (may be null)
+  var activeOpsQueue = new Queue[Int]
   var activeOps = new HashMap[Int, ParallelOperation]
   private var nextOpId = 0
   private[spark] var taskIdToOpId = new HashMap[Int, Int]
@@ -72,21 +74,23 @@ extends NScheduler with spark.Scheduler
 
   override def runTasks[T: ClassManifest](tasks: Array[Task[T]]): Array[T] = {
     var opId = 0
-    runTasksMutex.synchronized {
-      waitForRegister()
+    waitForRegister()
+    this.synchronized {
       opId = newOpId()
     }
     val myOp = new SimpleParallelOperation(this, tasks, opId)
 
     try {
       this.synchronized {
         this.activeOps(myOp.opId) = myOp
+        this.activeOpsQueue += myOp.opId
       }
       driver.reviveOffers();
       myOp.join();
     } finally {
       this.synchronized {
         this.activeOps.remove(myOp.opId)
+        this.activeOpsQueue.dequeueAll(x => (x == myOp.opId))
       }
     }
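For context on the hunk above: each call to runTasks now registers its operation in both activeOps and the new activeOpsQueue under this.synchronized, blocks on myOp.join(), and always unregisters in a finally block, with waitForRegister() moved outside any lock. A minimal standalone sketch of that register/join/unregister pattern (Op, runOp and finish are illustrative stand-ins for SimpleParallelOperation and the scheduler internals, not names from this diff):

import scala.collection.mutable.{HashMap, Queue}

object RunTasksSketch {
  // Stand-in for SimpleParallelOperation: something that can be joined on.
  class Op(val opId: Int) {
    private var done = false
    def finish(): Unit = synchronized { done = true; notifyAll() }
    def join(): Unit   = synchronized { while (!done) wait() }
  }

  private val activeOps      = new HashMap[Int, Op]
  private val activeOpsQueue = new Queue[Int]
  private var nextOpId       = 0

  def runOp(body: => Unit): Unit = {
    // Register under the scheduler lock, as the new runTasks does.
    val myOp = synchronized {
      val id = nextOpId
      nextOpId += 1
      val op = new Op(id)
      activeOps(id) = op
      activeOpsQueue += id
      op
    }
    try {
      // In MesosScheduler this is where driver.reviveOffers() kicks off task
      // launches; here another thread just runs the body and completes the op.
      new Thread(() => { body; myOp.finish() }).start()
      myOp.join()
    } finally {
      synchronized {
        activeOps.remove(myOp.opId)
        activeOpsQueue.dequeueAll(_ == myOp.opId)
      }
    }
  }

  def main(args: Array[String]): Unit = {
    runOp { println("job finished") }
  }
}

Because the lock is held only while registering and unregistering, several threads can be blocked inside runTasks at once, each waiting on its own operation.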


@@ -117,21 +121,24 @@ extends NScheduler with spark.Scheduler
     val tasks = new java.util.ArrayList[TaskDescription]
     val availableCpus = offers.map(_.getParams.get("cpus").toInt)
     val availableMem = offers.map(_.getParams.get("mem").toInt)
-    var resourcesAvailable = true
-    while (resourcesAvailable) {
-      resourcesAvailable = false
-      for (i <- 0 until offers.size.toInt; (opId, activeOp) <- activeOps) {
-        try {
-          activeOp.slaveOffer(offers.get(i), availableCpus(i), availableMem(i)) match {
-            case Some(task) =>
-              tasks.add(task)
-              availableCpus(i) -= task.getParams.get("cpus").toInt
-              availableMem(i) -= task.getParams.get("mem").toInt
-              resourcesAvailable = resourcesAvailable || true
-            case None => {}
-          }
-        } catch {
-          case e: Exception => e.printStackTrace
-        }
-      }
-    }
+    var launchedTask = true
+    for (opId <- activeOpsQueue) {
+      launchedTask = true
+      while (launchedTask) {
+        launchedTask = false
+        for (i <- 0 until offers.size.toInt) {
+          try {
+            activeOps(opId).slaveOffer(offers.get(i), availableCpus(i), availableMem(i)) match {
+              case Some(task) =>
+                tasks.add(task)
+                availableCpus(i) -= task.getParams.get("cpus").toInt
+                availableMem(i) -= task.getParams.get("mem").toInt
+                launchedTask = launchedTask || true
+              case None => {}
+            }
+          } catch {
+            case e: Exception => e.printStackTrace
+          }
+        }
+      }
+    }
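The rewritten offer handler above walks activeOpsQueue in FIFO order and keeps offering each slave's remaining CPU and memory to one operation until it declines everything, instead of iterating the activeOps HashMap in whatever order it happens to yield. A self-contained sketch of that allocation loop with simplified types (Offer, LaunchedTask and this slaveOffer signature are illustrative stand-ins, not the Mesos API used in the real code, and the try/catch is omitted):

import scala.collection.mutable.{ArrayBuffer, HashMap, Queue}

object OfferLoopSketch {
  case class Offer(slave: String, cpus: Int, mem: Int)
  case class LaunchedTask(opId: Int, slave: String, cpus: Int, mem: Int)

  // Stand-in for ParallelOperation.slaveOffer: returns a task if the operation
  // still has work that fits in the offered resources, or None to decline.
  trait Op {
    def slaveOffer(slave: String, cpus: Int, mem: Int): Option[LaunchedTask]
  }

  def resourceOffer(activeOpsQueue: Queue[Int],
                    activeOps: HashMap[Int, Op],
                    offers: Seq[Offer]): Seq[LaunchedTask] = {
    val tasks = new ArrayBuffer[LaunchedTask]
    val availableCpus = offers.map(_.cpus).toArray
    val availableMem  = offers.map(_.mem).toArray
    // Operations are served in the order they were submitted (FIFO).
    for (opId <- activeOpsQueue) {
      var launchedTask = true
      while (launchedTask) {   // keep going until this op declines every offer
        launchedTask = false
        for (i <- offers.indices) {
          activeOps(opId).slaveOffer(offers(i).slave, availableCpus(i), availableMem(i)) match {
            case Some(task) =>
              tasks += task
              availableCpus(i) -= task.cpus
              availableMem(i)  -= task.mem
              launchedTask = true
            case None =>
          }
        }
      }
    }
    tasks.toSeq
  }
}

The net effect is that an earlier operation gets first claim on every offer before later operations see what is left, which is what iterating the new activeOpsQueue provides.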
@@ -317,6 +324,7 @@ extends ParallelOperation
         println("Lost opId " + opId + " TID " + tid)
         if (!finished(tidToIndex(tid))) {
           launched(tidToIndex(tid)) = false
+          sched.taskIdToOpId.remove(tid)
           tasksLaunched -= 1
         } else {
           printf("Task %s had already finished, so ignoring it\n", tidToIndex(tid))
src/scala/spark/SparkContext.scala: 0 additions & 22 deletions
@@ -6,16 +6,6 @@ import java.util.UUID
 import scala.collection.mutable.ArrayBuffer
 import scala.actors.Actor._
 
-case class SparkAsyncLock(var finished: Boolean = false) {
-  def join() {
-    this.synchronized {
-      while (!finished) {
-        this.wait
-      }
-    }
-  }
-}
 
 class SparkContext(master: String, frameworkName: String) {
   Broadcast.initialize(true)


@@ -32,18 +22,6 @@ class SparkContext(master: String, frameworkName: String) {
   def broadcast[T](value: T) = new CentralizedHDFSBroadcast(value, local)
   //def broadcast[T](value: T) = new ChainedStreamingBroadcast(value, local)
 
-  def fork(f: => Unit): SparkAsyncLock = {
-    val thisLock = new SparkAsyncLock
-    actor {
-      f
-      thisLock.synchronized {
-        thisLock.finished = true
-        thisLock.notifyAll()
-      }
-    }
-    thisLock
-  }
 
   def textFile(path: String) = new HdfsTextFile(this, path)
 
   val LOCAL_REGEX = """local\[([0-9]+)\]""".r
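The removed fork handed back a hand-rolled SparkAsyncLock and completed it from an actor. The commit message points at futures as the replacement; the code that actually took over is not part of this diff, but a future-based equivalent would look roughly like the sketch below (written against the modern scala.concurrent API, which did not exist in this form in 2010, so treat it purely as an illustration of the idea):

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

object ForkWithFutures {
  // fork(f) used to return a SparkAsyncLock whose join() blocked until f had
  // run inside an actor. A Future is the same "start now, wait later" handle
  // without the hand-written lock, wait and notifyAll.
  def fork(f: => Unit): Future[Unit] = Future { f }

  def main(args: Array[String]): Unit = {
    val a = fork { println("first background job") }
    val b = fork { println("second background job") }
    Await.result(a, Duration.Inf)   // plays the role of the old join()
    Await.result(b, Duration.Inf)
  }
}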