Add job server, util source files + missing dependencies

1 parent dc01522 commit a52a895af4d786e6ff84a92840b6b163f5957be2 Evan Chan committed Nov 28, 2013
Showing with 2,202 additions and 1 deletion.
  1. +29 −0 jobserver/src/main/scala/ooyala.common.akka/ActorMetrics.scala
  2. +35 −0 jobserver/src/main/scala/ooyala.common.akka/ActorStack.scala
  3. +20 −0 jobserver/src/main/scala/ooyala.common.akka/InstrumentedActor.scala
  4. +40 −0 jobserver/src/main/scala/ooyala.common.akka/Slf4jLogging.scala
  5. +20 −0 jobserver/src/main/scala/ooyala.common.akka/metrics/YammerMetrics.scala
  6. +65 −0 jobserver/src/main/scala/ooyala.common.akka/web/JsonUtils.scala
  7. +47 −0 jobserver/src/main/scala/ooyala.common.akka/web/WebService.scala
  8. +36 −0 jobserver/src/main/scala/spark.jobserver/CommonMessages.scala
  9. +38 −0 jobserver/src/main/scala/spark.jobserver/HtmlUtils.scala
  10. +38 −0 jobserver/src/main/scala/spark.jobserver/JarManager.scala
  11. +77 −0 jobserver/src/main/scala/spark.jobserver/JarUtils.scala
  12. +34 −0 jobserver/src/main/scala/spark.jobserver/JobCache.scala
  13. +59 −0 jobserver/src/main/scala/spark.jobserver/JobInfoActor.scala
  14. +274 −0 jobserver/src/main/scala/spark.jobserver/JobManagerActor.scala
  15. +50 −0 jobserver/src/main/scala/spark.jobserver/JobResultActor.scala
  16. +56 −0 jobserver/src/main/scala/spark.jobserver/JobServer.scala
  17. +95 −0 jobserver/src/main/scala/spark.jobserver/JobServerNamedRdds.scala
  18. +125 −0 jobserver/src/main/scala/spark.jobserver/JobStatusActor.scala
  19. +186 −0 jobserver/src/main/scala/spark.jobserver/LocalContextSupervisorActor.scala
  20. +99 −0 jobserver/src/main/scala/spark.jobserver/NamedRddSupport.scala
  21. +92 −0 jobserver/src/main/scala/spark.jobserver/RddManagerActor.scala
  22. +41 −0 jobserver/src/main/scala/spark.jobserver/SparkJob.scala
  23. +329 −0 jobserver/src/main/scala/spark.jobserver/WebApi.scala
  24. +80 −0 jobserver/src/main/scala/spark.jobserver/io/JobDAO.scala
  25. +150 −0 jobserver/src/main/scala/spark.jobserver/io/JobFileDAO.scala
  26. +31 −0 jobserver/src/main/scala/spark.jobserver/util/DateUtils.scala
  27. +50 −0 jobserver/src/main/scala/spark.jobserver/util/LRUCache.scala
  28. +6 −1 project/SparkBuild.scala
29 jobserver/src/main/scala/ooyala.common.akka/ActorMetrics.scala
@@ -0,0 +1,29 @@
+package ooyala.common.akka
+
+import java.util.concurrent.TimeUnit
+
+import akka.actor.Actor
+import com.yammer.metrics.Metrics
+
+/**
+ * ActorMetrics is a trait that provides the following metrics:
+ * * message-handler.meter.{mean,m1,m5,m15} = moving avg of rate at which receive handler is called
+ * * message-handler.duration.{mean,p75,p99,p999} = histogram of wrappedReceive() running time
+ *
+ * NOTE: the number of incoming messages can be tracked using meter.count.
+ */
+trait ActorMetrics extends ActorStack {
+ // Timer includes a histogram of wrappedReceive() duration as well as moving avg of rate of invocation
+ val metricReceiveTimer = Metrics.newTimer(getClass, "message-handler",
+ TimeUnit.MILLISECONDS, TimeUnit.SECONDS)
+
+ override def receive: Receive = {
+ case x =>
+ val context = metricReceiveTimer.time()
+ try {
+ super.receive(x)
+ } finally {
+ context.stop()
+ }
+ }
+}
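For illustration (this actor is hypothetical, not part of the commit), a concrete actor only needs to mix in ActorMetrics and define wrappedReceive; every handled message is then timed under the message-handler metric:

    // Hypothetical example of mixing in ActorMetrics.
    class CountingActor extends ActorMetrics {
      private var count = 0
      def wrappedReceive: Receive = {
        case "inc" => count += 1
        case "get" => sender ! count
      }
    }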
35 jobserver/src/main/scala/ooyala.common.akka/ActorStack.scala
@@ -0,0 +1,35 @@
+package ooyala.common.akka
+
+import akka.actor.Actor
+
+/**
+ * A base trait for enabling stackable traits that enhance Actors.
+ * Examples of stackable traits are included, and add logging, metrics, etc.
+ *
+ * == Actor classes ==
+ * Actor classes that mix in this trait should define a wrappedReceive partial function
+ * instead of the standard receive.
+ *
+ * Messages not handled by wrappedReceive will go, as usual, to unhandled().
+ *
+ * == Stacking traits ==
+ * {{{
+ * trait MyActorTrait extends ActorStack {
+ * override def receive: Receive = {
+ * case x =>
+ * println("Before calling wrappedReceive... do something")
+ * super.receive(x)
+ * println("After calling wrappedReceive... do something else")
+ * }
+ * }
+ * }}}
+ */
+trait ActorStack extends Actor {
+ /** Actor classes should implement this partialFunction for standard actor message handling */
+ def wrappedReceive: Receive
+
+ /** Stackable traits should override and call super.receive(x) for stacking functionality */
+ def receive: Receive = {
+ case x => if (wrappedReceive.isDefinedAt(x)) wrappedReceive(x) else unhandled(x)
+ }
+}
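A slightly fuller sketch of the stacking pattern with made-up names: a stackable trait overrides receive and delegates to super.receive, while the concrete actor implements only wrappedReceive.

    // Hypothetical stackable trait: counts messages before handing them on.
    trait MessageCounting extends ActorStack {
      var handled = 0
      override def receive: Receive = {
        case x =>
          handled += 1
          super.receive(x)   // eventually reaches ActorStack.receive -> wrappedReceive
      }
    }

    // Hypothetical concrete actor; trait linearization wires the layers together.
    class WorkerActor extends ActorStack with MessageCounting {
      def wrappedReceive: Receive = {
        case msg => println("Handled: " + msg)
      }
    }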
20 jobserver/src/main/scala/ooyala.common.akka/InstrumentedActor.scala
@@ -0,0 +1,20 @@
+package ooyala.common.akka
+
+import akka.actor.Actor
+
+/**
+ * Base class that includes Slf4jLogging and ActorMetrics for convenience
+ */
+abstract class InstrumentedActor extends Actor with Slf4jLogging with ActorMetrics {
+ /** preRestart() is called when the actor is killed due to an exception and will be restarted. It is
+ * run on the current actor instance that is about to be killed. We just log errors.
+ * The super (original) method should call postStop() and shut down children as well.
+ */
+ override def preRestart(reason: Throwable, message: Option[Any]) {
+ logger.error("About to restart actor due to exception:", reason)
+ super.preRestart(reason, message)
+ }
+
+ /** postStop() is called when actor is stopped or restarted due to Exceptions **/
+ override def postStop() { logger.warn("Shutting down {}", getClass.getName) }
+}
40 jobserver/src/main/scala/ooyala.common.akka/Slf4jLogging.scala
@@ -0,0 +1,40 @@
+package ooyala.common.akka
+
+import akka.actor.Actor
+import org.slf4j.LoggerFactory
+
+/**
+ * Trait that adds Logback/SLF4J logging to actors. It adds the following members:
+ *
+ * * logger
+ *
+ * It also prints a message upon actor initialization.
+ * Also, it fills the akkaSource MDC variable with the current actor's path, making for easier
+ * log tracing of a single actor's messages.
+ */
+trait Slf4jLogging extends ActorStack {
+ val logger = LoggerFactory.getLogger(getClass)
+ private[this] val myPath = self.path.toString
+
+ withAkkaSourceLogging {
+ logger.info("Starting actor " + getClass.getName)
+ }
+
+ override def receive: Receive = {
+ case x =>
+ withAkkaSourceLogging {
+ super.receive(x)
+ }
+ }
+
+ private def withAkkaSourceLogging(fn: => Unit) {
+ // Because each actor receive invocation could happen in a different thread, and MDC is thread-based,
+ // we kind of have to set the MDC anew for each receive invocation. :(
+ try {
+ org.slf4j.MDC.put("akkaSource", myPath)
+ fn
+ } finally {
+ org.slf4j.MDC.remove("akkaSource")
+ }
+ }
+}
20 jobserver/src/main/scala/ooyala.common.akka/metrics/YammerMetrics.scala
@@ -0,0 +1,20 @@
+package ooyala.common.akka.metrics
+
+import com.yammer.metrics.Metrics
+import com.yammer.metrics.core.{Histogram, Meter, Gauge}
+import java.util.concurrent.TimeUnit
+
+/**
+ * Utility trait to make metrics creation slightly less verbose
+ */
+trait YammerMetrics {
+ def meter(name: String, eventType: String): Meter =
+ Metrics.newMeter(getClass, name, eventType, TimeUnit.SECONDS)
+
+ def gauge[T](name: String, metric: => T, scope: String = null): Gauge[T] =
+ Metrics.newGauge(getClass, name, scope, new Gauge[T] {
+ override def value(): T = metric
+ })
+
+ def histogram(name: String): Histogram = Metrics.newHistogram(getClass, name, true)
+}
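A minimal, hypothetical use of these helpers (class and metric names are illustrative only):

    // Hypothetical component reporting a request rate, payload sizes, and a gauge.
    class RequestTracker extends YammerMetrics {
      @volatile private var inFlight = 0

      val requests      = meter("requests", "requests")           // rate of requests/sec
      val payloadSizes  = histogram("payload-bytes")              // biased histogram
      val inFlightGauge = gauge("in-flight-requests", inFlight)   // re-reads inFlight each time it is sampled

      def track(bytes: Int) {
        inFlight += 1
        requests.mark()
        payloadSizes.update(bytes)
        inFlight -= 1
      }
    }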
65 jobserver/src/main/scala/ooyala.common.akka/web/JsonUtils.scala
@@ -0,0 +1,65 @@
+package ooyala.common.akka.web
+
+import spray.json._
+import spray.json.DefaultJsonProtocol._
+
+
+/**
+ * NOTE: Most of this code is needed because spray-json does not convert a generic Map
+ * with any kind of value.
+ */
+object JsonUtils {
+ // Allows the conversion of flexible Maps that hold ints, strings, lists, maps
+ // Note that this implicit conversion only applies in this scope;
+ // we have to be careful to keep implicits that convert Any no wider in scope than needed
+ implicit object AnyJsonFormat extends JsonFormat[Any] {
+ def write(x: Any) = x match {
+ case n: Int => JsNumber(n)
+ case l: Long => JsNumber(l)
+ case d: Double => JsNumber(d)
+ case f: Float => JsNumber(f.toDouble)
+ case s: String => JsString(s)
+ case x: Seq[_] => seqFormat[Any].write(x)
+ // Get the type of map keys from the first key, translate the rest the same way
+ case m: Map[_, _] => m.keys.head match {
+ case sym: Symbol =>
+ val map = m.asInstanceOf[Map[Symbol, _]]
+ val pairs = map.map { case (sym, v) => (sym.name -> write(v)) }
+ JsObject(pairs)
+ case s: String => mapFormat[String, Any].write(m.asInstanceOf[Map[String, Any]])
+ case a: Any =>
+ val map = m.asInstanceOf[Map[Any, _]]
+ val pairs = map.map { case (sym, v) => (sym.toString -> write(v)) }
+ JsObject(pairs)
+ }
+ case a: Array[_] => seqFormat[Any].write(a.toSeq)
+ case b: Boolean if b == true => JsTrue
+ case b: Boolean if b == false => JsFalse
+ case p: Product => seqFormat[Any].write(p.productIterator.toSeq)
+ case x => JsString(x.toString)
+ }
+ def read(value: JsValue) = value match {
+ case JsNumber(n) => n.intValue()
+ case JsString(s) => s
+ case a: JsArray => listFormat[Any].read(value)
+ case o: JsObject => mapFormat[String, Any].read(value)
+ case JsTrue => true
+ case JsFalse => false
+ case x => deserializationError("Do not understand how to deserialize " + x)
+ }
+ }
+
+ def mapToJson(map: Map[String, Any], compact: Boolean = true): String = {
+ val jsonAst = map.toJson
+ if (compact) jsonAst.compactPrint else jsonAst.prettyPrint
+ }
+
+ def listToJson(list: Seq[Any], compact: Boolean = true): String = {
+ val jsonAst = list.toJson
+ if (compact) jsonAst.compactPrint else jsonAst.prettyPrint
+ }
+
+ def mapFromJson(json: String) = json.asJson.convertTo[Map[String, Any]]
+
+ def listFromJson(json: String) = json.asJson.convertTo[Seq[Any]]
+}
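A hypothetical round trip with the helpers above (values chosen purely for illustration):

    import ooyala.common.akka.web.JsonUtils

    // String-keyed maps of mixed values go through AnyJsonFormat.
    val json = JsonUtils.mapToJson(Map("status" -> "OK", "count" -> 3, "tags" -> Seq("a", "b")))
    // e.g. {"status":"OK","count":3,"tags":["a","b"]}  (key order may vary)
    val back: Map[String, Any] = JsonUtils.mapFromJson(json)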
47 jobserver/src/main/scala/ooyala.common.akka/web/WebService.scala
@@ -0,0 +1,47 @@
+package ooyala.common.akka.web
+
+import akka.actor.{ActorSystem, Actor, ActorRef, Props}
+import spray.routing.{Route, HttpService}
+import spray.io.{SingletonHandler, IOExtension}
+import spray.can.server.HttpServer
+
+
+/**
+ * Contains methods for starting and stopping an embedded Spray web server.
+ */
+object WebService {
+ case object Stop
+
+ var service: ActorRef = _
+
+ class ServiceActor(route: Route) extends Actor with HttpService {
+ def handler: Receive = {
+ case Stop => context.system.shutdown()
+ }
+
+ // the HttpService trait defines only one abstract member, which
+ // connects the services environment to the enclosing actor or test
+ def actorRefFactory = context
+
+ def receive = runRoute(route) orElse handler
+ }
+
+ /**
+ * Starts a web server given a Route. Note that this call is meant to be made from an App or other top
+ * level scope, and not within an actor, as system.actorOf may block.
+ *
+ * @param route The spray Route for the service. Multiple routes can be combined like (route1 ~ route2).
+ * @param system the ActorSystem to use
+ * @param host The host string to bind to, defaults to "0.0.0.0"
+ * @param port The port number to bind to
+ */
+ def startServer(route: Route, system: ActorSystem,
+ host: String = "0.0.0.0", port: Int = 8080) {
+ service = system.actorOf(Props(new ServiceActor(route)), "sprayService")
+ val ioBridge = IOExtension(system).ioBridge
+ val server = system.actorOf(Props(new HttpServer(ioBridge, SingletonHandler(service))), "httpServer")
+ server ! HttpServer.Bind(host, port)
+ }
+
+ def stopServer() { service ! Stop }
+}
36 jobserver/src/main/scala/spark.jobserver/CommonMessages.scala
@@ -0,0 +1,36 @@
+package spark.jobserver
+
+import akka.actor.ActorRef
+import org.joda.time.DateTime
+
+trait StatusMessage {
+ val jobId: String
+}
+
+// Messages that are sent and received by multiple actors.
+object CommonMessages {
+ // job status messages
+ case class JobStarted(jobId: String, context: String, startTime: DateTime) extends StatusMessage
+ case class JobFinished(jobId: String, endTime: DateTime) extends StatusMessage
+ case class JobValidationFailed(jobId: String, endTime: DateTime, err: Throwable) extends StatusMessage
+ case class JobErroredOut(jobId: String, endTime: DateTime, err: Throwable) extends StatusMessage
+
+ /**
+ * NOTE: For Subscribe, make sure to use `classOf[]` to get the Class for the case classes above.
+ * Otherwise, `.getClass` will get the `java.lang.Class` of the companion object.
+ */
+ case class GetJobResult(jobId: String)
+ case class JobResult(jobId: String, result: Any)
+
+ case class Subscribe(jobId: String, receiver: ActorRef, events: Set[Class[_]]) {
+ require(events.nonEmpty, "Must subscribe to at least one type of event!")
+ }
+ case class Unsubscribe(jobId: String, receiver: ActorRef) // all events for this jobId and receiving actor
+
+ // errors
+ case object NoSuchJobId
+ case object NoSuchApplication
+ case object NoSuchClass
+ case object JobInitAlready
+ case class NoJobSlotsAvailable(maxJobSlots: Int) // TODO(ilyam): maybe rename this
+}
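A hypothetical subscription following the NOTE above; statusActor and self are assumed to be in scope inside an actor:

    import CommonMessages._

    // classOf[...] yields the Class of the case class itself, which is what the
    // status/result actors match on; .getClass on the companion object would not.
    val jobId = "abc123"   // hypothetical job id
    val events: Set[Class[_]] = Set(classOf[JobStarted], classOf[JobFinished], classOf[JobErroredOut])
    statusActor ! Subscribe(jobId, self, events)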
38 jobserver/src/main/scala/spark.jobserver/HtmlUtils.scala
@@ -0,0 +1,38 @@
+package spark.jobserver
+
+import spark.jobserver.io.JobInfo
+import org.joda.time.{ Duration, DateTime }
+import scala.xml.Elem
+
+object HtmlUtils {
+ def pageTemplate(body: Seq[Elem]): Elem =
+ <html>
+ <head><title>Spark Job Server</title></head>
+ <body>
+ { body }
+ </body>
+ </html>
+
+ def jobsList(title: String, jobs: Seq[JobInfo]): Elem =
+ pageTemplate(
+ Seq(
+ <h1>{ title }</h1>,
+ <table border="1">
+ <tr>
+ <th>ClassName</th>
+ <th>Elapsed (secs)</th>
+ </tr>
+ {
+ jobs map { job =>
+ <tr>
+ <td>
+ { job.classPath }
+ </td>
+ <td align="right">
+ { (new Duration(job.startTime, DateTime.now).getMillis / 1000).toString }
+ </td>
+ </tr>
+ }
+ }
+ </table>))
+}
38 jobserver/src/main/scala/spark.jobserver/JarManager.scala
@@ -0,0 +1,38 @@
+package spark.jobserver
+
+import ooyala.common.akka.InstrumentedActor
+import spark.jobserver.io.JobDAO
+import org.joda.time.DateTime
+
+// Messages to JarManager actor
+case class StoreJar(appName: String, jarBytes: Array[Byte])
+case object ListJars
+
+// Responses
+case object InvalidJar
+case object JarStored
+
+/**
+ * An Actor that manages the jars stored by the job server. It's important that threads do not try to
+ * load a class from a jar while a new one is replacing it, so using an actor to serialize requests is perfect.
+ */
+class JarManager(jobDao: JobDAO) extends InstrumentedActor {
+ override def wrappedReceive: Receive = {
+ case ListJars => sender ! createJarsList()
+
+ case StoreJar(appName, jarBytes) =>
+ logger.info("Storing jar for app {}, {} bytes", appName, jarBytes.size)
+ if (!JarUtils.validateJarBytes(jarBytes)) {
+ sender ! InvalidJar
+ } else {
+ val uploadTime = DateTime.now()
+ jobDao.saveJar(appName, uploadTime, jarBytes)
+ sender ! JarStored
+ }
+ }
+
+ private def createJarsList() = jobDao.getApps.map {
+ case (appName, uploadTimes) =>
+ appName -> uploadTimes.head
+ }.toMap
+}
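A hypothetical client-side exchange with this actor; the jarManager ref, the jar path, and the use of Java 7's Files API are assumptions, while the ask/Await style follows the Akka 2.0 usage seen elsewhere in this commit:

    import akka.dispatch.Await
    import akka.pattern.ask
    import akka.util.Timeout
    import akka.util.duration._
    import java.nio.file.{Files, Paths}

    implicit val timeout = Timeout(5 seconds)
    val jarBytes = Files.readAllBytes(Paths.get("/tmp/word-count.jar"))
    Await.result(jarManager ? StoreJar("word-count", jarBytes), 5 seconds) match {
      case JarStored  => println("jar accepted")
      case InvalidJar => println("rejected: not a valid zip/jar")
    }
    // Map of appName -> most recent upload time
    val apps = Await.result(jarManager ? ListJars, 5 seconds)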
77 jobserver/src/main/scala/spark.jobserver/JarUtils.scala
@@ -0,0 +1,77 @@
+package spark.jobserver
+
+import java.io.File
+import java.lang.ClassLoader
+import java.lang.reflect.Constructor
+import java.net.{ URL, URLClassLoader }
+import org.slf4j.LoggerFactory
+
+/**
+ * A set of utilities for dynamically loading classes from a jar file, and saving the jar file.
+ */
+object JarUtils {
+ val logger = LoggerFactory.getLogger(getClass)
+
+ /**
+ * Loads a Scala object or class from the jarPath.
+ * See http://stackoverflow.com/questions/3216780/problem-reloading-a-jar-using-urlclassloader?lq=1
+ * See http://stackoverflow.com/questions/8867766/scala-dynamic-object-class-loading
+ *
+ *
+ * @param classOrObjectName must be the fully qualified name of the Scala object or class that
+ * implements the SparkJob trait. If an object is used, do not include the
+ * trailing '$'.
+ * @param jarPath path to the jar file.
+ * @return a 2-tuple of (Function0[C], ClassLoader). Calling the function will return a reference to
+ * the object (for objects), or a new instance of a class (for classes) that implement the
+ * SparkJob trait.
+ */
+ def loadClassOrObjectFromJar[C](classOrObjectName: String, jarPath: String): (() => C, ClassLoader) = {
+ val jarUrl = new File(jarPath).getAbsoluteFile.toURI.toURL
+ val loader = new URLClassLoader(Array(jarUrl), getClass.getClassLoader())
+
+ def fallBackToClass(): (() => C, ClassLoader) = {
+ val constructor = loadConstructor[C](classOrObjectName, jarUrl, loader)
+ (() => constructor.newInstance(), loader)
+ }
+
+ // Try loading it as an object first, if that fails, then try it as a class
+ try {
+ val objectRef = loadObject[C](classOrObjectName + "$", jarUrl, loader)
+ (() => objectRef, loader)
+ } catch {
+ case e: java.lang.ClassNotFoundException => fallBackToClass()
+ case e: java.lang.ClassCastException => fallBackToClass()
+ case e: java.lang.NoSuchMethodException => fallBackToClass()
+ case e: java.lang.NoSuchFieldException => fallBackToClass()
+ }
+ }
+
+ private def loadConstructor[C](className: String, jarUrl: URL, loader: ClassLoader): Constructor[C] = {
+ logger.info("Loading class {} from URL {}, using loader {}", className, jarUrl.toString, loader)
+ val loadedClass = loader.loadClass(className).asInstanceOf[Class[C]]
+ val result = loadedClass.getConstructor()
+ if (loadedClass.getClassLoader != loader) {
+ logger.error("Wrong ClassLoader for class {}: Expected {} but got {}", loadedClass.getName,
+ loader.toString, loadedClass.getClassLoader.toString)
+ }
+ result
+ }
+
+ private def loadObject[C](objectName: String, jarUrl: URL, loader: ClassLoader): C = {
+ logger.info("Loading object {} from URL {}, using loader {}", objectName, jarUrl.toString, loader)
+ val loadedClass = loader.loadClass(objectName)
+ val objectRef = loadedClass.getField("MODULE$").get(null).asInstanceOf[C]
+ if (objectRef.getClass.getClassLoader != loader) {
+ logger.error("Wrong ClassLoader for object {}: Expected {} but got {}", objectRef.getClass.getName,
+ loader.toString, objectRef.getClass.getClassLoader.toString)
+ }
+ objectRef
+ }
+
+ def validateJarBytes(jarBytes: Array[Byte]): Boolean = {
+ jarBytes.size > 4 &&
+ // For now just check the first few bytes are the ZIP signature: 0x04034b50 little endian
+ jarBytes(0) == 0x50 && jarBytes(1) == 0x4b && jarBytes(2) == 0x03 && jarBytes(3) == 0x04
+ }
+}
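For example (class and jar names are hypothetical), loading a job implementation and instantiating it:

    // Returns a factory plus the URLClassLoader that loaded the class.
    val (factory, loader) =
      JarUtils.loadClassOrObjectFromJar[SparkJob]("com.example.WordCountJob", "/tmp/word-count.jar")
    val job: SparkJob = factory()   // new instance for a class, the singleton for an object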
34 jobserver/src/main/scala/spark.jobserver/JobCache.scala
@@ -0,0 +1,34 @@
+package spark.jobserver
+
+import org.joda.time.DateTime
+import spark.jobserver.io.JobDAO
+import spark.jobserver.util.LRUCache
+
+case class JobJarInfo(constructor: () => SparkJob,
+ className: String,
+ jarFilePath: String,
+ loader: ClassLoader)
+
+/**
+ * A cache for SparkJob classes. Jobs are often run repeatedly, and especially for low-latency
+ * jobs there is no need to retrieve the jar and load it every single time.
+ */
+class JobCache(maxEntries: Int, dao: JobDAO, retrievalFunc: JobJarInfo => Unit) {
+ private val cache = new LRUCache[(String, DateTime, String), JobJarInfo](maxEntries)
+
+ /**
+ * Retrieves the given SparkJob class from the cache if it's there, otherwise use the DAO to retrieve it.
+ * @param appName the appName under which the jar was uploaded
+ * @param uploadTime the upload time for the version of the jar wanted
+ * @param classPath the fully qualified name of the class/object to load
+ */
+ def getSparkJob(appName: String, uploadTime: DateTime, classPath: String): JobJarInfo = {
+ cache.get((appName, uploadTime, classPath), {
+ val jarFilePath = dao.retrieveJarFile(appName, uploadTime)
+ val (constructor, loader) = JarUtils.loadClassOrObjectFromJar[SparkJob](classPath, jarFilePath)
+ val info = JobJarInfo(constructor, classPath, new java.io.File(jarFilePath).getAbsolutePath(), loader)
+ retrievalFunc(info)
+ info
+ })
+ }
+}
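A hypothetical usage mirroring how JobManagerActor wires up the cache later in this commit (dao, sparkContext, and uploadTime are assumed to be in scope):

    // Jars fetched through the cache are also shipped to executors via addJar.
    val jobCache = new JobCache(500, dao, { info => sparkContext.addJar(info.jarFilePath) })
    val jarInfo  = jobCache.getSparkJob("word-count", uploadTime, "com.example.WordCountJob")
    val job: SparkJob = jarInfo.constructor()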
59 jobserver/src/main/scala/spark.jobserver/JobInfoActor.scala
@@ -0,0 +1,59 @@
+package spark.jobserver
+
+import akka.actor.ActorRef
+import akka.dispatch.Await
+import akka.pattern.ask
+import akka.util.Timeout
+import ooyala.common.akka.InstrumentedActor
+import spark.jobserver.ContextSupervisor.{GetContext, GetAdHocContext}
+import spark.jobserver.io.JobDAO
+
+object JobInfoActor {
+ case class GetJobStatuses(limit: Option[Int])
+}
+
+class JobInfoActor(jobDao: JobDAO, contextSupervisor: ActorRef) extends InstrumentedActor {
+ import akka.util.duration._
+ import CommonMessages._
+ import JobInfoActor._
+ import scala.util.control.Breaks._
+
+ // Used in the asks (?) below to request info from contextSupervisor and resultActor
+ implicit val ShortTimeout = Timeout(3 seconds)
+
+ override def wrappedReceive: Receive = {
+ case GetJobStatuses(limit) =>
+ val infos = jobDao.getJobInfos.values.toSeq.sortBy(_.startTime.toString())
+ if (limit.isDefined) {
+ sender ! infos.takeRight(limit.get)
+ } else {
+ sender ! infos
+ }
+
+ case GetJobResult(jobId) =>
+ breakable {
+ val jobInfoOpt = jobDao.getJobInfos.get(jobId)
+ if (!jobInfoOpt.isDefined) {
+ sender ! NoSuchJobId
+ break
+ }
+
+ jobInfoOpt.filter { job => job.isRunning || job.isErroredOut }
+ .foreach { jobInfo =>
+ sender ! jobInfo
+ break
+ }
+
+ // get the context from jobInfo
+ val context = jobInfoOpt.get.contextName
+
+ val future = (contextSupervisor ? ContextSupervisor.GetResultActor(context)).mapTo[ActorRef]
+ val resultActor = Await.result(future, 3 seconds)
+
+ val receiver = sender // must capture the sender since callbacks are run in a different thread
+ for (result <- (resultActor ? GetJobResult(jobId))) {
+ receiver ! result // a JobResult(jobId, result) object is sent
+ }
+ }
+ }
+}
274 jobserver/src/main/scala/spark.jobserver/JobManagerActor.scala
@@ -0,0 +1,274 @@
+package spark.jobserver
+
+import akka.actor.{ActorRef, Props, PoisonPill}
+import akka.dispatch.Future
+import com.typesafe.config.Config
+import java.net.URI
+import java.util.concurrent.atomic.AtomicInteger
+import ooyala.common.akka.InstrumentedActor
+import org.apache.spark.{ SparkEnv, SparkContext }
+import org.joda.time.DateTime
+import scala.util.Try
+import spark.jobserver.ContextSupervisor.StopContext
+import spark.jobserver.io.{ JobDAO, JobInfo, JarInfo }
+
+object JobManagerActor {
+ // Messages
+ case object Initialize
+ case class StartJob(appName: String, classPath: String, config: Config,
+ subscribedEvents: Set[Class[_]])
+
+ // Results/Data
+ case class Initialized(resultActor: ActorRef)
+ case class InitError(t: Throwable)
+ case class JobLoadingError(err: Throwable)
+}
+
+/**
+ * The JobManager actor supervises jobs running in a single SparkContext, as well as shared metadata.
+ * It creates a SparkContext.
+ * It also creates and supervises a JobResultActor and JobStatusActor, although an existing JobResultActor
+ * can be passed in as well.
+ *
+ * == Configuration ==
+ * {{{
+ * num-cpu-cores = 4 # Total # of CPU cores to allocate across the cluster
+ * memory-per-node = 512m # -Xmx style memory string for total memory to use for executor on one node
+ * dependent-jar-paths = ["local://opt/foo/my-foo-lib.jar"] # URIs for dependent jars to load for entire context
+ * max-jobs-per-context = 4 # Max # of jobs to run at the same time
+ * coarse-mesos-mode = true # per-context, rather than per-job, resource allocation
+ * rdd-ttl = 24 h # time-to-live for RDDs in a SparkContext. Don't specify = forever
+ * }}}
+ */
+class JobManagerActor(dao: JobDAO,
+ contextName: String,
+ sparkMaster: String,
+ contextConfig: Config,
+ isAdHoc: Boolean,
+ resultActorRef: Option[ActorRef] = None) extends InstrumentedActor {
+
+ import CommonMessages._
+ import JobManagerActor._
+ import scala.util.control.Breaks._
+ import collection.JavaConverters._
+
+ val config = context.system.settings.config
+
+ var sparkContext: SparkContext = _
+ var sparkEnv: SparkEnv = _
+ protected var rddManagerActor: ActorRef = _
+
+ private val maxRunningJobs = {
+ val cpuCores = Runtime.getRuntime.availableProcessors
+ Try(contextConfig.getInt("spark.jobserver.max-jobs-per-context")).getOrElse(cpuCores)
+ }
+ private val currentRunningJobs = new AtomicInteger(0)
+
+ // When the job cache retrieves a jar from the DAO, it also adds it to the SparkContext for distribution
+ // to executors. We do not want to add the same jar every time we start a new job, as that would cause
+ // the executors to re-download the jar every time and can cause race conditions.
+ private val jobCacheSize = Try(config.getInt("spark.job-cache.max-entries")).getOrElse(10000)
+ protected val jobCache = new JobCache(jobCacheSize, dao,
+ { jarInfo => sparkContext.addJar(jarInfo.jarFilePath) })
+
+ private val statusActor = context.actorOf(Props(new JobStatusActor(dao)), "status-actor")
+ protected val resultActor = resultActorRef.getOrElse(context.actorOf(Props[JobResultActor], "result-actor"))
+
+ override def postStop() {
+ logger.info("Shutting down SparkContext {}", contextName)
+ Option(sparkContext).foreach(_.stop())
+ }
+
+ def wrappedReceive: Receive = {
+ case Initialize =>
+ try {
+ sparkContext = createContextFromConfig()
+ sparkEnv = SparkEnv.get
+ rddManagerActor = context.actorOf(Props(new RddManagerActor(sparkContext)), "rdd-manager-actor")
+ getSideJars(contextConfig).foreach { jarPath => sparkContext.addJar(jarPath) }
+ sender ! Initialized(resultActor)
+ } catch {
+ case t: Throwable =>
+ logger.error("Failed to create context " + contextName + ", shutting down actor", t)
+ sender ! InitError(t)
+ self ! PoisonPill
+ }
+
+ case StartJob(appName, classPath, jobConfig, events) =>
+ startJobInternal(appName, classPath, jobConfig, events, sparkContext, sparkEnv, rddManagerActor)
+ }
+
+ def startJobInternal(appName: String,
+ classPath: String,
+ jobConfig: Config,
+ events: Set[Class[_]],
+ sparkContext: SparkContext,
+ sparkEnv: SparkEnv,
+ rddManagerActor: ActorRef): Option[Future[Any]] = {
+ var future: Option[Future[Any]] = None
+ breakable {
+ val lastUploadTime = dao.getLastUploadTime(appName)
+ if (!lastUploadTime.isDefined) {
+ sender ! NoSuchApplication
+ break
+ }
+
+ // Check appName, classPath from jar
+ val jarInfo = JarInfo(appName, lastUploadTime.get)
+ val jobId = java.util.UUID.randomUUID().toString()
+ logger.info("Loading class {} for app {}", classPath, appName: Any)
+ val jobJarInfo = try {
+ jobCache.getSparkJob(jarInfo.appName, jarInfo.uploadTime, classPath)
+ } catch {
+ case _: ClassNotFoundException =>
+ sender ! NoSuchClass
+ postEachJob()
+ break
+ null // needed for inferring type of return value
+ case err: Throwable =>
+ sender ! JobLoadingError(err)
+ postEachJob()
+ break
+ null
+ }
+
+ // Automatically subscribe the sender to events so it starts getting them right away
+ resultActor ! Subscribe(jobId, sender, events)
+ statusActor ! Subscribe(jobId, sender, events)
+
+ val jobInfo = JobInfo(jobId, contextName, jarInfo, classPath, DateTime.now(), None, None)
+ future =
+ Option(getJobFuture(jobJarInfo, jobInfo, jobConfig, sender, sparkContext, sparkEnv,
+ rddManagerActor))
+ }
+
+ future
+ }
+
+ private def getJobFuture(jobJarInfo: JobJarInfo,
+ jobInfo: JobInfo,
+ jobConfig: Config,
+ subscriber: ActorRef,
+ sparkContext: SparkContext,
+ sparkEnv: SparkEnv,
+ rddManagerActor: ActorRef): Future[Any] = {
+ // Use the SparkContext's ActorSystem threadpool for the futures, so we don't corrupt our own
+ implicit val executionContext = sparkEnv.actorSystem
+
+ val jobId = jobInfo.jobId
+ val constructor = jobJarInfo.constructor
+ logger.info("Starting Spark job {} [{}]...", jobId: Any, jobJarInfo.className)
+
+ // Atomically increment the number of currently running jobs. If the old value already exceeded the
+ // limit, decrement it back, send an error message to the sender, and return a dummy future with
+ // nothing in it.
+ if (currentRunningJobs.getAndIncrement() >= maxRunningJobs) {
+ currentRunningJobs.decrementAndGet()
+ sender ! NoJobSlotsAvailable(maxRunningJobs)
+ return Future[Any](None)
+ }
+
+ Future {
+ org.slf4j.MDC.put("jobId", jobId)
+ logger.info("Starting job future thread")
+
+ // Need to re-set the SparkEnv because it's thread-local and the Future runs on a diff thread
+ SparkEnv.set(sparkEnv)
+
+ // Set the thread classloader to our special one, otherwise nested function classes don't load
+ // Save the original classloader so we can restore it at the end, we don't want classloaders
+ // hanging around
+ val origLoader = Thread.currentThread.getContextClassLoader()
+ Thread.currentThread.setContextClassLoader(jobJarInfo.loader)
+ val job = constructor()
+ if (job.isInstanceOf[NamedRddSupport]) {
+ val namedRdds = job.asInstanceOf[NamedRddSupport].namedRddsPrivate
+ if (namedRdds.get() == null) {
+ namedRdds.compareAndSet(null, new JobServerNamedRdds(rddManagerActor))
+ }
+ }
+
+ try {
+ statusActor ! JobStatusActor.JobInit(jobInfo)
+
+ job.validate(sparkContext, jobConfig) match {
+ case SparkJobInvalid(reason) => {
+ val err = new Throwable(reason)
+ statusActor ! JobValidationFailed(jobId, DateTime.now(), err)
+ throw err
+ }
+ case SparkJobValid => {
+ statusActor ! JobStarted(jobId: String, contextName, jobInfo.startTime)
+ job.runJob(sparkContext, jobConfig)
+ }
+ }
+ } finally {
+ Thread.currentThread.setContextClassLoader(origLoader)
+ org.slf4j.MDC.remove("jobId")
+ }
+ }.andThen {
+ case Right(result: Any) =>
+ statusActor ! JobFinished(jobId, DateTime.now())
+ resultActor ! JobResult(jobId, result)
+ case Left(error: Throwable) =>
+ // If and only if job validation fails, JobErroredOut message is dropped silently in JobStatusActor.
+ statusActor ! JobErroredOut(jobId, DateTime.now(), error)
+ logger.warn("Exception from job " + jobId + ": ", error)
+ }.andThen {
+ case _ =>
+ // Make sure to decrement the count of running jobs when a job finishes, in both success and failure
+ // cases.
+ resultActor ! Unsubscribe(jobId, subscriber)
+ statusActor ! Unsubscribe(jobId, subscriber)
+ currentRunningJobs.getAndDecrement()
+ postEachJob()
+ }
+ }
+
+ def createContextFromConfig(contextName: String = contextName): SparkContext = {
+ for (cores <- Try(contextConfig.getInt("num-cpu-cores"))) {
+ System.setProperty("spark.cores.max", cores.toString)
+ }
+ // Should be a -Xmx style string eg "512m", "1G"
+ for (nodeMemStr <- Try(contextConfig.getString("memory-per-node"))) {
+ System.setProperty("spark.executor.memory", nodeMemStr)
+ }
+
+ val sparkHome: String = Try(config.getString("spark.home")).getOrElse(null)
+
+ // Set number of akka threads
+ // TODO(ilyam): need to figure out how many extra threads spark needs, besides the job threads
+ System.setProperty("spark.akka.threads", (maxRunningJobs + 4).toString)
+
+ // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+ System.clearProperty("spark.driver.port")
+ System.clearProperty("spark.hostPort")
+
+ // Set the Jetty port to 0 to find a random port
+ System.setProperty("spark.ui.port", "0")
+
+ val coarseMesos = Try(contextConfig.getBoolean("coarse-mesos-mode")).getOrElse(false)
+ System.setProperty("spark.mesos.coarse", coarseMesos.toString)
+
+ // TTL for cleaning cached RDDs
+ Try(contextConfig.getMilliseconds("rdd-ttl")).foreach { ttl =>
+ System.setProperty("spark.cleaner.ttl", (ttl / 1000L).toString)
+ }
+
+ new SparkContext(sparkMaster, contextName, sparkHome)
+ }
+
+ // This method should be called after each job succeeds or fails
+ private def postEachJob() {
+ // Delete the JobManagerActor after each adhoc job
+ if (isAdHoc)
+ context.parent ! StopContext(contextName) // its parent is LocalContextSupervisorActor
+ }
+
+ // "Side jars" are jars besides the main job jar that are needed for running the job.
+ // They are loaded from the context/job config.
+ // Each one should be a URL (http, ftp, hdfs, local, or file). local URLs are files
+ // present on every node, whereas file:// URLs are assumed to be present only on the driver node
+ private def getSideJars(config: Config): Seq[String] =
+ Try(config.getStringList("dependent-jar-paths").asScala.toSeq).getOrElse(Nil)
+}
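A hypothetical driver-side sketch; system and dao are assumed to be in scope, the job class name is made up, and it is assumed to run inside an actor so the replies and job events have somewhere to go. The constructor call mirrors LocalContextSupervisorActor elsewhere in this commit:

    import akka.actor.Props
    import com.typesafe.config.ConfigFactory

    val contextConfig = ConfigFactory.parseString("num-cpu-cores = 2, memory-per-node = 512m")
    val manager = system.actorOf(Props(
      new JobManagerActor(dao, "demo-context", "local[2]", contextConfig, false)))

    manager ! JobManagerActor.Initialize   // replies Initialized(resultActor) or InitError(t)
    manager ! JobManagerActor.StartJob("word-count", "com.example.WordCountJob",
      ConfigFactory.empty(), Set(classOf[CommonMessages.JobResult]))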
50 jobserver/src/main/scala/spark.jobserver/JobResultActor.scala
@@ -0,0 +1,50 @@
+package spark.jobserver
+
+import akka.actor.ActorRef
+import ooyala.common.akka.InstrumentedActor
+import ooyala.common.akka.metrics.YammerMetrics
+import scala.collection.mutable
+import spark.jobserver.util.LRUCache
+
+/**
+ * An actor that manages results returned from jobs.
+ *
+ * TODO: support multiple subscribers for same JobID
+ */
+class JobResultActor extends InstrumentedActor with YammerMetrics {
+ import CommonMessages._
+
+ private val config = context.system.settings.config
+ private val cache = new LRUCache[String, Any](config.getInt("spark.jobserver.job-result-cache-size"))
+ private val subscribers = mutable.HashMap.empty[String, ActorRef] // subscribers
+
+ // metrics
+ val metricSubscribers = gauge("subscribers-size", subscribers.size)
+ val metricResultCache = gauge("result-cache-size", cache.size)
+
+ def wrappedReceive: Receive = {
+ case Subscribe(jobId, receiver, events) =>
+ if (events.contains(classOf[JobResult])) {
+ subscribers(jobId) = receiver
+ logger.info("Added receiver {} to subscriber list for JobID {}", receiver, jobId: Any)
+ }
+
+ case Unsubscribe(jobId, receiver) =>
+ if (!subscribers.contains(jobId)) {
+ sender ! NoSuchJobId
+ } else {
+ subscribers.remove(jobId)
+ logger.info("Removed subscriber list for JobID {}", jobId)
+ }
+
+ case GetJobResult(jobId) =>
+ sender ! cache.get(jobId).map(JobResult(jobId, _)).getOrElse(NoSuchJobId)
+
+ case JobResult(jobId, result) =>
+ cache.put(jobId, result)
+ logger.debug("Received job results for JobID {}", jobId)
+ subscribers.get(jobId).foreach(_ ! JobResult(jobId, result))
+ subscribers.remove(jobId)
+ }
+
+}
56 jobserver/src/main/scala/spark.jobserver/JobServer.scala
@@ -0,0 +1,56 @@
+package spark.jobserver
+
+import akka.actor.ActorSystem
+import akka.actor.Props
+import com.typesafe.config.ConfigFactory
+import java.io.File
+import spark.jobserver.io.JobDAO
+import org.slf4j.LoggerFactory
+
+/**
+ * The Spark Job Server is a web service that allows users to submit and run Spark jobs, check status,
+ * and view results.
+ * It may offer other goodies in the future.
+ * It takes only one optional command line arg, a config file to override the default (and you can still
+ * use -Dsetting=value to override individual settings).
+ * -- Configuration --
+ * {{{
+ * spark {
+ * master = "local"
+ * jobserver {
+ * port = 8090
+ * }
+ * }
+ * }}}
+ *
+ * TODO(ev): Sumac. Cleanest config utility I've seen.
+ */
+object JobServer extends App {
+ val logger = LoggerFactory.getLogger(getClass)
+ val defaultConfig = ConfigFactory.load()
+ val config = if (args.length > 0) {
+ val configFile = new File(args(0))
+ if (!configFile.exists()) {
+ println("Could not find configuration file " + configFile)
+ sys.exit(1)
+ }
+ ConfigFactory.parseFile(configFile).withFallback(defaultConfig)
+ } else {
+ defaultConfig
+ }
+ logger.info("Starting JobServer with config {}", config.getConfig("spark").root.render())
+ val port = config.getInt("spark.jobserver.port")
+
+ // TODO(kelvin): Hardcode for now to get going. Make it configurable later.
+ val clazz = Class.forName(config.getString("spark.jobserver.jobdao"))
+ val ctor = clazz.getDeclaredConstructor(Class.forName("com.typesafe.config.Config"))
+ val jobDAO = ctor.newInstance(config).asInstanceOf[JobDAO]
+
+ val system = ActorSystem("JobServer", config)
+ val jarManager = system.actorOf(Props(new JarManager(jobDAO)), "jar-manager")
+ val supervisor = system.actorOf(Props(new LocalContextSupervisorActor(jobDAO)), "context-supervisor")
+ val jobInfo = system.actorOf(Props(new JobInfoActor(jobDAO, supervisor)), "job-info")
+ // Create initial contexts
+ supervisor ! ContextSupervisor.AddContextsFromConfig
+ new WebApi(system, config, port, jarManager, supervisor, jobInfo).start()
+}
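A hypothetical override file passed as the single command-line argument; the key names come from the code above and from JobResultActor, and the DAO class name assumes JobFileDAO lives in spark.jobserver.io as the file listing suggests:

    # local.conf -- overrides merged on top of ConfigFactory.load()
    spark {
      master = "local[4]"
      jobserver {
        port = 8090
        jobdao = spark.jobserver.io.JobFileDAO
        job-result-cache-size = 5000
      }
    }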
95 jobserver/src/main/scala/spark.jobserver/JobServerNamedRdds.scala
@@ -0,0 +1,95 @@
+package spark.jobserver
+
+import akka.actor.ActorRef
+import akka.dispatch.Await
+import akka.util.Timeout
+import org.apache.spark.rdd.RDD
+import org.apache.spark.storage.StorageLevel
+
+class JobServerNamedRdds(val rddManager: ActorRef) extends NamedRdds {
+ import RddManagerActorMessages._
+
+ require(rddManager != null, "rddManager ActorRef must not be null!")
+
+ def getOrElseCreate[T](name: String, rddGen: => RDD[T])
+ (implicit timeout: Timeout = defaultTimeout): RDD[T] = {
+ import akka.pattern.ask
+
+ val future = rddManager.ask(GetOrElseCreateRddRequest(name))(timeout)
+ val result: RDD[T] = Await.result(future, timeout.duration) match {
+ case Left(error: Throwable) =>
+ throw new RuntimeException("Failed to get named RDD '" + name + "'", error)
+ case Right(rdd: RDD[T]) => refreshRdd(rdd)
+ case None =>
+ // Try to generate the RDD and send the result of the operation to the rddManager.
+ try {
+ val rdd = createRdd(rddGen, name)
+ rddManager ! CreateRddResult(name, Right(rdd))
+ rdd
+ } catch {
+ case error: Throwable =>
+ rddManager ! CreateRddResult(name, Left(error))
+ throw new RuntimeException("Failed to create named RDD '" + name + "'", error)
+ }
+ }
+ result
+ }
+
+ def get[T](name: String)(implicit timeout: Timeout = defaultTimeout): Option[RDD[T]] = {
+ import akka.pattern.ask
+
+ val future = rddManager ? GetRddRequest(name)
+ Await.result(future, timeout.duration) match {
+ case rddOpt: Option[RDD[T]] => rddOpt.map { rdd => refreshRdd(rdd) }
+ }
+ }
+
+ def update[T](name: String, rddGen: => RDD[T]): RDD[T] = {
+ val rdd = createRdd(rddGen, name)
+ rddManager ! CreateRddResult(name, Right(rdd))
+ rdd
+ }
+
+ def destroy(name: String) {
+ rddManager ! DestroyRdd(name)
+ }
+
+ def getNames()(implicit timeout: Timeout = defaultTimeout): Iterable[String] = {
+ import akka.pattern.ask
+
+ val future = rddManager ? GetRddNames
+ Await.result(future, timeout.duration) match {
+ case answer: Iterable[String] => answer
+ }
+ }
+
+ /**
+ * Creates an RDD by calling the given generator, sets its name, persists it with the given storage level,
+ * and optionally forces its contents to be computed.
+ * @param rddGen a 0-ary function which will be called to generate the RDD in the caller's thread.
+ * @param name the name to assign to the RDD.
+ * @param storageLevel the storage level to persist the RDD with. Default: StorageLevel.MEMORY_ONLY.
+ * @param forceComputation if true, forces the RDD to be computed by calling count().
+ * @throws java.lang.IllegalArgumentException if forceComputation == true &&
+ * storageLevel == StorageLevel.NONE
+ */
+ private def createRdd[T](rddGen: => RDD[T],
+ name: String,
+ forceComputation: Boolean = true,
+ storageLevel: StorageLevel = defaultStorageLevel): RDD[T] = {
+ require(!forceComputation || storageLevel != StorageLevel.NONE,
+ "forceComputation implies storageLevel != NONE")
+ val rdd = rddGen
+ rdd.setName(name)
+ if (storageLevel != StorageLevel.NONE) rdd.persist(storageLevel)
+ // TODO(ilyam): figure out if there is a better way to force the RDD to be computed
+ if (forceComputation) rdd.count()
+ rdd
+ }
+
+ /** Calls rdd.persist(), which updates the RDD's cached timestamp, meaning it won't get
+ * garbage collected by Spark for some time.
+ * @param rdd the RDD
+ */
+ private def refreshRdd[T](rdd: RDD[T]): RDD[T] = rdd.persist(rdd.getStorageLevel)
+}
125 jobserver/src/main/scala/spark.jobserver/JobStatusActor.scala
@@ -0,0 +1,125 @@
+package spark.jobserver
+
+import akka.actor.ActorRef
+import com.yammer.metrics.core.Meter
+import ooyala.common.akka.InstrumentedActor
+import ooyala.common.akka.metrics.YammerMetrics
+import spark.jobserver.io.{ JobInfo, JobDAO }
+import scala.collection.mutable
+import scala.util.Try
+
+object JobStatusActor {
+ case class JobInit(jobInfo: JobInfo)
+ case class GetRunningJobStatus()
+}
+
+/**
+ * An actor that manages job status updates.
+ *
+ */
+class JobStatusActor(jobDao: JobDAO) extends InstrumentedActor with YammerMetrics {
+ import CommonMessages._
+ import JobStatusActor._
+ import spark.jobserver.util.DateUtils.dateTimeToScalaWrapper
+
+ // jobId to its JobInfo
+ private val infos = new mutable.HashMap[String, JobInfo]
+ // subscribers
+ private val subscribers = new mutable.HashMap[String, mutable.MultiMap[Class[_], ActorRef]]
+
+ // metrics
+ val metricNumSubscriptions = gauge("num-subscriptions", subscribers.size)
+ val metricNumJobInfos = gauge("num-running-jobs", infos.size)
+ val metricStatusRates = mutable.HashMap.empty[String, Meter]
+
+ override def wrappedReceive: Receive = {
+ case GetRunningJobStatus =>
+ sender ! infos.values.toSeq.sortBy(_.startTime) // TODO(kelvin): Use toVector instead in Scala 2.10
+
+ case Unsubscribe(jobId, receiver) =>
+ subscribers.get(jobId) match {
+ case Some(jobSubscribers) =>
+ jobSubscribers.transform { case (event, receivers) => receivers -= receiver }
+ .retain { case (event, receivers) => receivers.nonEmpty }
+ if (jobSubscribers.isEmpty) subscribers.remove(jobId)
+ case None =>
+ // TODO(ilyam): The message below is named poorly. There may be such a job id, there are just no
+ // registered subscribers for this job id.
+ logger.error("No such job id " + jobId)
+ sender ! NoSuchJobId
+ }
+
+ case Subscribe(jobId, receiver, events) =>
+ // Subscription is independent of job life cycles. So, don't need to check infos.
+ val jobSubscribers = subscribers.getOrElseUpdate(jobId, newMultiMap())
+ events.foreach { event => jobSubscribers.addBinding(event, receiver) }
+
+ case JobInit(jobInfo) =>
+ // TODO (kelvin): Check if the jobId exists in the persistence store already
+ if (!infos.contains(jobInfo.jobId)) {
+ infos(jobInfo.jobId) = jobInfo
+ } else {
+ sender ! JobInitAlready
+ }
+
+ case msg: JobStarted =>
+ processStatus(msg, "started") {
+ case (info, msg) =>
+ info.copy(startTime = msg.startTime)
+ }
+
+ case msg: JobFinished =>
+ processStatus(msg, "finished OK", remove = true) {
+ case (info, msg) =>
+ info.copy(endTime = Some(msg.endTime))
+ }
+
+ case msg: JobValidationFailed =>
+ processStatus(msg, "validation failed", remove = true) {
+ case (info, msg) =>
+ info.copy(endTime = Some(msg.endTime), error = Some(msg.err))
+ }
+
+ case msg: JobErroredOut =>
+ processStatus(msg, "finished with an error", remove = true) {
+ case (info, msg) =>
+ info.copy(endTime = Some(msg.endTime), error = Some(msg.err))
+ }
+ }
+
+ private def processStatus[M <: StatusMessage](msg: M, logMessage: String, remove: Boolean = false)
+ (infoModifier: (JobInfo, M) => JobInfo) {
+ if (infos.contains(msg.jobId)) {
+ infos(msg.jobId) = infoModifier(infos(msg.jobId), msg)
+ logger.info("Job {} {}", msg.jobId: Any, logMessage)
+ jobDao.saveJobInfo(infos(msg.jobId))
+ publishMessage(msg.jobId, msg)
+ updateMessageRate(msg)
+ if (remove) infos.remove(msg.jobId)
+ } else {
+ logger.error("No such job id " + msg.jobId)
+ sender ! NoSuchJobId
+ }
+ }
+
+ private def updateMessageRate(msg: StatusMessage) {
+ val msgClass = msg.getClass.getCanonicalName
+
+ lazy val getShortName = Try(msgClass.split('.').last).toOption.getOrElse(msgClass)
+
+ metricStatusRates.getOrElseUpdate(msgClass, meter(getShortName, "messages")).mark()
+ }
+
+ private def publishMessage(jobId: String, message: StatusMessage) {
+ for (
+ jobSubscribers <- subscribers.get(jobId);
+ receivers <- jobSubscribers.get(message.getClass);
+ receiver <- receivers
+ ) {
+ receiver ! message
+ }
+ }
+
+ private def newMultiMap(): mutable.MultiMap[Class[_], ActorRef] =
+ new mutable.HashMap[Class[_], mutable.Set[ActorRef]] with mutable.MultiMap[Class[_], ActorRef]
+}
186 jobserver/src/main/scala/spark.jobserver/LocalContextSupervisorActor.scala
@@ -0,0 +1,186 @@
+package spark.jobserver
+
+import akka.actor.{ Props, ActorRef, PoisonPill }
+import akka.pattern.ask
+import akka.util.duration._
+import akka.util.Timeout
+import com.typesafe.config.Config
+import com.typesafe.config.ConfigFactory
+import ooyala.common.akka.InstrumentedActor
+import spark.jobserver.io.JobDAO
+import scala.collection.mutable
+import scala.util.Try
+import org.joda.time.DateTime
+import org.joda.time.format.DateTimeFormat
+
+/** Messages common to all ContextSupervisors */
+object ContextSupervisor {
+ // Messages/actions
+ case object AddContextsFromConfig // Start up initial contexts
+ case object ListContexts
+ case class AddContext(name: String, contextConfig: Config)
+ case class GetAdHocContext(classPath: String, contextConfig: Config)
+ case class GetContext(name: String) // returns JobManager, JobResultActor
+ case class GetResultActor(name: String) // returns JobResultActor
+ case class StopContext(name: String)
+
+ // Errors/Responses
+ case object ContextInitialized
+ case class ContextInitError(t: Throwable)
+ case object ContextAlreadyExists
+ case object NoSuchContext
+ case object ContextStopped
+}
+
+/**
+ * This class starts and stops JobManagers / Contexts in-process.
+ * It is responsible for watching out for the death of contexts/JobManagers.
+ *
+ * == Auto context start configuration ==
+ * Contexts can be configured to be created automatically at job server initialization.
+ * Configuration example:
+ * {{{
+ * spark {
+ * contexts {
+ * olap-demo {
+ * num-cpu-cores = 4 # Number of cores to allocate. Required.
+ * memory-per-node = 1024m # Executor memory per node, -Xmx style eg 512m, 1G, etc.
+ * }
+ * }
+ * }
+ * }}}
+ *
+ * == Other configuration ==
+ * {{{
+ * spark {
+ * jobserver {
+ * context-creation-timeout = 15 s
+ * }
+ *
+ * # Default settings for all context creation
+ * context-settings {
+ * coarse-mesos-mode = true
+ * }
+ * }
+ * }}}
+ */
+class LocalContextSupervisorActor(dao: JobDAO) extends InstrumentedActor {
+ import ContextSupervisor._
+ import scala.collection.JavaConverters._
+
+ val config = context.system.settings.config
+ val master = config.getString("spark.master")
+ val defaultContextConfig = config.getConfig("spark.context-settings")
+ val contextTimeout = config.getMilliseconds("spark.jobserver.context-creation-timeout").toInt / 1000
+ implicit val ec = context.system
+
+ private val contexts = mutable.HashMap.empty[String, ActorRef]
+ private val resultActors = mutable.HashMap.empty[String, ActorRef]
+
+ // This is for capturing results for ad-hoc jobs. Otherwise, when an ad-hoc job dies, its resultActor also dies,
+ // and there is no way to retrieve results.
+ val globalResultActor = context.actorOf(Props[JobResultActor], "global-result-actor")
+
+ def wrappedReceive: Receive = {
+ case AddContextsFromConfig =>
+ addContextsFromConfig(config)
+
+ case ListContexts =>
+ sender ! contexts.keys.toSeq
+
+ case AddContext(name, contextConfig) =>
+ val originator = sender // Sender is a mutable reference, must capture in immutable val
+ val mergedConfig = contextConfig.withFallback(defaultContextConfig)
+ if (contexts contains name) {
+ originator ! ContextAlreadyExists
+ } else {
+ startContext(name, mergedConfig, false, contextTimeout) { contextMgr =>
+ originator ! ContextInitialized
+ } { err =>
+ originator ! ContextInitError(err)
+ }
+ }
+
+ case GetAdHocContext(classPath, contextConfig) =>
+ val originator = sender // Sender is a mutable reference, must capture in immutable val
+ logger.info("Creating SparkContext for adhoc jobs.")
+
+ val mergedConfig = contextConfig.withFallback(defaultContextConfig)
+
+ // Keep generating context name till there is no collision
+ var contextName = ""
+ do {
+ contextName = java.util.UUID.randomUUID().toString().substring(0, 8) + "-" + classPath
+ } while (contexts contains contextName)
+
+ // Create JobManagerActor and JobResultActor
+ startContext(contextName, mergedConfig, true, contextTimeout) { contextMgr =>
+ originator ! (contexts(contextName), resultActors(contextName))
+ } { err =>
+ originator ! ContextInitError(err)
+ }
+
+ case GetResultActor(name) =>
+ sender ! resultActors.get(name).getOrElse(globalResultActor)
+
+ case GetContext(name) =>
+ if (contexts contains name) {
+ sender ! (contexts(name), resultActors(name))
+ } else {
+ sender ! NoSuchContext
+ }
+
+ case StopContext(name) =>
+ if (contexts contains name) {
+ logger.info("Shutting down context {}", name)
+ contexts(name) ! PoisonPill
+ contexts.remove(name)
+ resultActors.remove(name)
+ sender ! ContextStopped
+ } else {
+ sender ! NoSuchContext
+ }
+ }
+
+ private def startContext(name: String, contextConfig: Config, isAdHoc: Boolean, timeoutSecs: Int = 1)
+ (successFunc: ActorRef => Unit)
+ (failureFunc: Throwable => Unit) {
+ require(!(contexts contains name), "There is already a context named " + name)
+ logger.info("Creating a SparkContext named {}", name)
+
+ val resultActorRef = if (isAdHoc) Some(globalResultActor) else None
+ val ref = context.actorOf(Props(
+ new JobManagerActor(dao, name, master, contextConfig, isAdHoc, resultActorRef)), name)
+ (ref ? JobManagerActor.Initialize)(Timeout(timeoutSecs.second)).onComplete {
+ case Left(e: Exception) =>
+ logger.error("Exception after sending Initialize to JobManagerActor", e)
+ // Make sure we try to shut down the context in case it gets created anyways
+ ref ! PoisonPill
+ failureFunc(e)
+ case Right(JobManagerActor.Initialized(resultActor)) =>
+ logger.info("SparkContext {} initialized", name)
+ contexts(name) = ref
+ resultActors(name) = resultActor
+ successFunc(ref)
+ case Right(JobManagerActor.InitError(t)) =>
+ ref ! PoisonPill
+ failureFunc(t)
+ case x =>
+ logger.warn("Unexpected message received by startContext: {}", x)
+ }
+ }
+
+ // Adds the contexts from the config file
+ private def addContextsFromConfig(config: Config) {
+ for (contexts <- Try(config.getObject("spark.contexts"))) {
+ contexts.keySet().asScala.foreach { contextName =>
+ val contextConfig = config.getConfig("spark.contexts." + contextName)
+ .withFallback(defaultContextConfig)
+ startContext(contextName, contextConfig, false, contextTimeout) { ref => } {
+ e => logger.error("Unable to start context " + contextName, e)
+ }
+ Thread sleep 500 // Give some spacing so multiple contexts can be created
+ }
+ }
+ }
+}
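A hypothetical request to create a long-running context up front, where supervisor is the context-supervisor actor created in JobServer.scala above:

    import akka.pattern.ask
    import akka.util.Timeout
    import akka.util.duration._
    import com.typesafe.config.ConfigFactory

    implicit val timeout = Timeout(20 seconds)
    val contextConfig = ConfigFactory.parseString("num-cpu-cores = 4, memory-per-node = 1024m")
    // Completes with ContextInitialized, ContextAlreadyExists, or ContextInitError(t)
    val reply = supervisor ? ContextSupervisor.AddContext("olap-demo", contextConfig)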
99 jobserver/src/main/scala/spark.jobserver/NamedRddSupport.scala
@@ -0,0 +1,99 @@
+package spark.jobserver
+
+import akka.util.Timeout
+import java.lang.ThreadLocal
+import java.util.concurrent.atomic.AtomicReference
+import org.apache.spark.rdd.RDD
+import org.apache.spark.storage.StorageLevel
+
+trait NamedRdds {
+ // Default timeout is 60 seconds. Hopefully that is enough to let most RDD generator functions finish.
+ val defaultTimeout = akka.util.Timeout(akka.util.Duration(60, java.util.concurrent.TimeUnit.SECONDS))
+
+ // Default level to cache RDDs at.
+ val defaultStorageLevel = StorageLevel.MEMORY_ONLY
+
+ /**
+ * Gets an RDD with the given name, or creates it if one doesn't already exist.
+ *
+ * If the given RDD has already been computed by another job and cached in memory, this method will return
+ * a reference to the cached RDD. If the RDD has never been computed, then the generator will be called
+ * to compute it, in the caller's thread, and the result will be cached and returned to the caller.
+ *
+ * If an RDD is requested by thread B while thread A is generating the RDD, thread B will block up to
+ * the duration specified by @timeout. If thread A finishes generating the RDD within that time, then
+ * thread B will get a reference to the newly-created RDD. If thread A does not finish generating the
+ * RDD within that time, then thread B will throw a timeout exception.
+ *
+ * @param name the unique name of the RDD. The uniqueness is scoped to the current SparkContext.
+ * @param rddGen a 0-ary function which will generate the RDD if it doesn't already exist.
+ * @param timeout if the RddManager doesn't respond within this timeout, an error will be thrown.
+ * @tparam T the generic type of the RDD.
+ * @return the RDD with the given name.
+ * @throws java.util.concurrent.TimeoutException if the request to the RddManager times out.
+ * @throws java.lang.RuntimeException wrapping any error that occurs within the generator function.
+ */
+ def getOrElseCreate[T](name: String, rddGen: => RDD[T])
+ (implicit timeout: Timeout = defaultTimeout): RDD[T]
+
+ /**
+ * Gets an RDD with the given name if it already exists and is cached by the RddManager.
+ * If the RDD does not exist, None is returned.
+ *
+ * Note that a previously-known RDD could 'disappear' if it hasn't been used for a while, because the
+ * SparkContext garbage-collects old cached RDDs.
+ *
+ * @param name the unique name of the RDD. The uniqueness is scoped to the current SparkContext.
+ * @param timeout if the RddManager doesn't respond within this timeout, an error will be thrown.
+ * @tparam T the generic type of the RDD.
+ * @return the RDD with the given name.
+ * @throws java.util.concurrent.TimeoutException if the request to the RddManager times out.
+ */
+ def get[T](name: String)(implicit timeout: Timeout = defaultTimeout): Option[RDD[T]]
+
+ /**
+ * Replaces an existing RDD with a given name with a new RDD. If an old RDD for the given name existed,
+ * it is un-persisted (non-blocking) and destroyed. It is safe to call this method when there is no
+ * existing RDD with the given name. If multiple threads call this around the same time, the end result
+ * is undefined - one of the generated RDDs will win and will be returned from future calls to get().
+ *
+ * The rdd generator function will be called from the caller's thread. Note that if this is called at the
+ * same time as getOrElseCreate() for the same name, and completes before the getOrElseCreate() call,
+ * then threads waiting for the result of getOrElseCreate() will unblock with the result of this
+ * update() call. When the getOrElseCreate() succeeds, it will replace the result of this update() call.
+ *
+ * @param name the unique name of the RDD. The uniqueness is scoped to the current SparkContext.
+ * @param rddGen a 0-ary function which will be called to generate the RDD in the caller's thread.
+ * @tparam T the generic type of the RDD.
+ * @return the RDD with the given name.
+ */
+ def update[T](name: String, rddGen: => RDD[T]): RDD[T]
+
+ /**
+ * Destroys an RDD with the given name, if one existed. Has no effect if no RDD with this name exists.
+ *
+ * @param name the unique name of the RDD. The uniqueness is scoped to the current SparkContext.
+ */
+ def destroy(name: String): Unit
+
+ /**
+ * Returns the names of all named RDDs that are managed by the RddManager.
+ *
+ * Note: this returns a snapshot of RDD names at one point in time. The caller should always expect
+ * that the data returned from this method may be stale and incorrect.
+ *
+ * @param timeout if the RddManager doesn't respond within this timeout, an error will be thrown.
+ * @return a collection of RDD names representing RDDs managed by the RddManager.
+ */
+ def getNames()(implicit timeout: Timeout = defaultTimeout): Iterable[String]
+}
+
+trait NamedRddSupport { self: SparkJob =>
+ // Note: the JobManagerActor sets the correct NamedRdds instance here before running our job.
+ private[jobserver] val namedRddsPrivate: AtomicReference[NamedRdds] = new AtomicReference[NamedRdds](null)
+
+ def namedRdds: NamedRdds = namedRddsPrivate.get() match {
+ case null => throw new NullPointerException("namedRdds value is null!")
+ case rdds: NamedRdds => rdds
+ }
+}
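A hypothetical job using the support trait; the validate/runJob signatures are inferred from how JobManagerActor invokes them, and the config key and data path are made up:

    import com.typesafe.config.Config
    import org.apache.spark.SparkContext

    object SharedCorpusJob extends SparkJob with NamedRddSupport {
      def validate(sc: SparkContext, config: Config): SparkJobValidation =
        if (config.hasPath("corpus.path")) SparkJobValid
        else SparkJobInvalid("config key corpus.path is missing")

      def runJob(sc: SparkContext, config: Config): Any = {
        // Computed once per SparkContext; later jobs get the cached RDD by name.
        val corpus = namedRdds.getOrElseCreate("corpus",
          sc.textFile(config.getString("corpus.path")))
        corpus.count()
      }
    }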
92 jobserver/src/main/scala/spark.jobserver/RddManagerActor.scala
@@ -0,0 +1,92 @@
+package spark.jobserver
+
+import akka.actor.ActorRef
+import ooyala.common.akka.InstrumentedActor
+import ooyala.common.akka.metrics.YammerMetrics
+import org.apache.spark.rdd.RDD
+import org.apache.spark.SparkContext
+import scala.collection.mutable
+
+object RddManagerActorMessages {
+ // Message which asks to retrieve an RDD by name. If no such RDD is found, None will be returned.
+ case class GetRddRequest(name: String)
+
+ // Message which asks to retrieve an RDD by name. Different from GetRddRequest, because it tells the
+ // RddManager that the client is willing to create the RDD with this name if one does not already exist.
+ case class GetOrElseCreateRddRequest(name: String)
+
+ // Message which tells the RddManager that a new RDD has been created, or that RDD generation failed.
+ case class CreateRddResult(name: String, rddOrError: Either[Throwable, RDD[_]])
+
+ // Message which tells the RddManager that an RDD should be destroyed and all of its cached blocks removed
+ case class DestroyRdd(name: String)
+
+ // Message which asks for the names of all RDDs currently managed by the RddManager
+ case object GetRddNames
+}
+
+class RddManagerActor(sparkContext: SparkContext) extends InstrumentedActor with YammerMetrics {
+ import RddManagerActorMessages._
+
+ private val namesToIds = new mutable.HashMap[String, Int]()
+ private val waiters =
+ new mutable.HashMap[String, mutable.Set[ActorRef]] with mutable.MultiMap[String, ActorRef]
+ private val inProgress = mutable.Set[String]()
+
+ def wrappedReceive: Receive = {
+ case GetRddRequest(name) => sender ! getExistingRdd(name)
+
+ case GetOrElseCreateRddRequest(name) if inProgress.contains(name) =>
+ logger.info("RDD [{}] already being created, actor {} added to waiters list", name: Any, sender.path)
+ waiters.addBinding(name, sender)
+
+ case GetOrElseCreateRddRequest(name) => getExistingRdd(name) match {
+ case Some(rdd) => sender ! Right(rdd)
+ case None =>
+ logger.info("RDD [{}] not found, starting creation", name)
+ inProgress.add(name)
+ sender ! None
+ }
+
+ // TODO(ilyam): log the error?
+ case CreateRddResult(name, Left(error)) => notifyAndClearWaiters(name, Left(error))
+
+ case CreateRddResult(name, Right(rdd)) =>
+ val oldRddOption = getExistingRdd(name)
+ namesToIds(name) = rdd.id
+ notifyAndClearWaiters(name, Right(rdd))
+ // Note: unpersist the old rdd we just replaced, if there was one
+ if (oldRddOption.isDefined && oldRddOption.get.id != rdd.id) {
+ oldRddOption.get.unpersist(blocking = false)
+ }
+
+ case DestroyRdd(name) => getExistingRdd(name).foreach { rdd =>
+ namesToIds.remove(name)
+ rdd.unpersist(blocking = false)
+ }
+
+ case GetRddNames =>
+ val persistentRdds = sparkContext.getPersistentRDDs
+ val result = namesToIds.collect { case (name, id) if persistentRdds.contains(id) => name }
+ // optimization: can remove stale names from our map if the SparkContext has unpersisted them.
+ (namesToIds.keySet -- result).foreach { staleName => namesToIds.remove(staleName) }
+ sender ! result
+ }
+
+ private def getExistingRdd(name: String): Option[RDD[_]] =
+ namesToIds.get(name).flatMap { id => sparkContext.getPersistentRDDs.get(id) } match {
+ case Some(rdd) => Some(rdd)
+ case None =>
+ // If this happens, maybe we never knew about this RDD, or maybe we had a name -> id mapping, but
+ // spark's MetadataCleaner has evicted this RDD from the cache because it was too old, and we need
+ // to forget about it. Remove it from our names -> ids map and respond as if we never knew about it.
+ namesToIds.remove(name)
+ None
+ }
+
+ private def notifyAndClearWaiters(name: String, message: Any) {
+ waiters.get(name).foreach { actors => actors.foreach { actor => actor ! message } }
+ waiters.remove(name) // Note: this removes all bindings for the key in the MultiMap
+ inProgress.remove(name) // this RDD is no longer being computed, clear in progress flag
+ }
+}
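The actor above implements a simple "first caller creates" protocol: an existing RDD is returned as Right(rdd), a None reply elects the caller to build the RDD and report back with CreateRddResult, and waiters queued in the meantime are notified with the eventual result. A rough client-side sketch of that flow (not part of this commit; object and method names are made up, and persistence/naming of the new RDD is omitted):

import akka.actor.ActorRef
import akka.dispatch.Await
import akka.pattern.ask
import akka.util.Timeout
import org.apache.spark.rdd.RDD
import spark.jobserver.RddManagerActorMessages._

object RddManagerClientSketch {
  /** Ask the manager for `name`; if this caller is elected to create it, run `gen` and report back. */
  def getOrCreate(rddManager: ActorRef, name: String)(gen: => RDD[_])
                 (implicit timeout: Timeout): RDD[_] =
    Await.result(rddManager ? GetOrElseCreateRddRequest(name), timeout.duration) match {
      case Right(rdd: RDD[_]) => rdd        // already cached, or created by a concurrent caller
      case None =>                          // we won the race: build the RDD and report the result
        val result = try Right(gen) catch { case t: Throwable => Left(t) }
        rddManager ! CreateRddResult(name, result)
        result.fold(t => throw t, identity)
      case Left(t: Throwable) => throw t    // creation by another caller failed
    }
}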
View
41 jobserver/src/main/scala/spark.jobserver/SparkJob.scala
@@ -0,0 +1,41 @@
+package spark.jobserver
+
+import com.typesafe.config.Config
+import org.apache.spark.SparkContext
+
+sealed trait SparkJobValidation {
+ // NOTE(harish): We tried using lazy evaluation here by passing in a function
+ // instead, which worked fine in tests, but when run with the job server it
+ // would just hang and time out. This is worth investigating.
+ def &&(sparkValidation: SparkJobValidation): SparkJobValidation = this match {
+ case SparkJobValid => sparkValidation
+ case x => x
+ }
+}
+case object SparkJobValid extends SparkJobValidation
+case class SparkJobInvalid(reason: String) extends SparkJobValidation
+
+/**
+ * This trait is the main API for Spark jobs submitted to the Job Server.
+ */
+trait SparkJob {
+ /**
+ * This is the entry point for a Spark Job Server to execute Spark jobs.
+ * This function should create or reuse RDDs and return the result at the end, which the
+ * Job Server will cache or display.
+ * @param sc a SparkContext for the job. May be reused across jobs.
+ * @param jobConfig the Typesafe Config object passed into the job request
+ * @return the job result
+ */
+ def runJob(sc: SparkContext, jobConfig: Config): Any
+
+ /**
+ * This method is called by the job server to allow jobs to validate their input and reject
+ * invalid job requests. If SparkJobInvalid is returned, then the job server returns 400
+ * to the user.
+ * NOTE: this method should return very quickly. If it responds slowly then the job server may time out
+ * trying to start this job.
+ * @return either SparkJobValid or SparkJobInvalid
+ */
+ def validate(sc: SparkContext, config: Config): SparkJobValidation
+}
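Since SparkJob is the main user-facing API, a small hypothetical implementation may help clarify the contract between validate() and runJob(). The config key "input.strings" and the object name are illustrative only.

import com.typesafe.config.Config
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._   // for reduceByKey on pair RDDs
import scala.collection.JavaConverters._
import spark.jobserver._

object WordCountExampleJob extends SparkJob {
  // Reject the request up front (HTTP 400) if the required input is missing.
  def validate(sc: SparkContext, config: Config): SparkJobValidation =
    if (config.hasPath("input.strings")) SparkJobValid
    else SparkJobInvalid("Missing config param input.strings")

  // The returned Map is what the job server caches or renders as the job result.
  def runJob(sc: SparkContext, jobConfig: Config): Any = {
    val words = jobConfig.getStringList("input.strings").asScala
    sc.parallelize(words).map(w => (w, 1)).reduceByKey(_ + _).collectAsMap()
  }
}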
View
329 jobserver/src/main/scala/spark.jobserver/WebApi.scala
@@ -0,0 +1,329 @@
+package spark.jobserver
+
+import akka.actor.{ ActorSystem, ActorRef }
+import akka.dispatch.Await
+import akka.pattern.ask
+import akka.util.Timeout
+import akka.util.duration._
+import com.typesafe.config.{ Config, ConfigFactory, ConfigException }
+import java.util.NoSuchElementException
+import ooyala.common.akka.web.WebService
+import spark.jobserver.io.JobInfo
+import scala.util.Try
+import org.joda.time.DateTime
+import org.slf4j.LoggerFactory
+import spray.http.HttpResponse
+import spray.http.MediaTypes
+import spray.http.StatusCodes
+import spray.httpx.SprayJsonSupport.sprayJsonMarshaller
+import spray.json.DefaultJsonProtocol._
+import spray.routing.{ HttpService, Route, RequestContext }
+
+class WebApi(system: ActorSystem, config: Config, port: Int,
+ jarManager: ActorRef, supervisor: ActorRef, jobInfo: ActorRef)
+ extends HttpService {
+ import CommonMessages._
+ import ContextSupervisor._
+
+ // Get spray-json type classes for serializing Map[String, Any]
+ import ooyala.common.akka.web.JsonUtils._
+
+ override def actorRefFactory: ActorSystem = system
+ implicit val ec = system
+ implicit val ShortTimeout = Timeout(3 seconds)
+ val DefaultSyncTimeout = Timeout(10 seconds)
+ val DefaultJobLimit = 50
+ val StatusKey = "status"
+ val ResultKey = "result"
+
+ val contextTimeout = Try(config.getMilliseconds("spark.jobserver.context-creation-timeout").toInt / 1000)
+ .getOrElse(15)
+
+ val logger = LoggerFactory.getLogger(getClass)
+
+ val myRoutes = jarRoutes ~ contextRoutes ~ jobRoutes ~ otherRoutes
+
+ def start() {
+ logger.info("Starting browser web service...")
+ WebService.startServer(myRoutes, system, "0.0.0.0", port)
+ }
+
+ /**
+ * Routes for listing and uploading jars
+ * GET /jars - lists all current jars
+ * POST /jars/<appName> - upload a new jar file
+ */
+ def jarRoutes: Route = pathPrefix("jars") {
+ // The GET /jars route returns a JSON map of app names to the last time a jar was uploaded for each.
+ get { ctx =>
+ val future = (jarManager ? ListJars).mapTo[collection.Map[String, DateTime]]
+ future.map { jarTimeMap =>
+ val stringTimeMap = jarTimeMap.map { case (app, dt) => (app, dt.toString()) }.toMap
+ ctx.complete(stringTimeMap)
+ }.recover {
+ case e: Exception => ctx.complete(500, errMap(e, "ERROR"))
+ }
+ } ~
+ // POST /jars/<appName>
+ // The <appName> needs to be unique; uploading a jar with the same appName will replace it.
+ post {
+ path(PathElement) { appName =>
+ entity(as[Array[Byte]]) { jarBytes =>
+ val future = jarManager ? StoreJar(appName, jarBytes)
+ respondWithMediaType(MediaTypes.`application/json`) { ctx =>
+ future.map {
+ case JarStored => ctx.complete(StatusCodes.OK)
+ case InvalidJar => badRequest(ctx, "Jar is not of the right format")
+ }.recover {
+ case e: Exception => ctx.complete(500, errMap(e, "ERROR"))
+ }
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Routes for listing, adding, and stopping contexts
+ * GET /contexts - lists all current contexts
+ * POST /contexts/<contextName> - creates a new context
+ * DELETE /contexts/<contextName> - stops a context and all jobs running in it
+ */
+ def contextRoutes: Route = pathPrefix("contexts") {
+ import ContextSupervisor._
+ import collection.JavaConverters._
+ get { ctx =>
+ (supervisor ? ListContexts).mapTo[Seq[String]]
+ .map { contexts => ctx.complete(contexts) }
+ } ~
+ post {
+ /**
+ * POST /contexts/<contextName>?<optional params> -
+ * Creates a long-running context with contextName and options for context creation
+ * All options are merged into the defaults in spark.context-settings
+ *
+ * @optional @param num-cpu-cores Int - Number of cores the context will use
+ * @optional @param mem-per-node String - -Xmx style string (512m, 1g, etc) for max memory per worker node
+ * @return the string "OK", or error if context exists or could not be initialized
+ */
+ path(PathElement) { (contextName) =>
+ // Enforce that user-supplied context names start with a letter
+ if (!contextName.head.isLetter) {
+ complete(StatusCodes.BadRequest, errMap("context name must start with letters"))
+ } else {
+ parameterMap { (params) =>
+ val config = ConfigFactory.parseMap(params.asJava)
+ val future = supervisor ? AddContext(contextName, config)
+ respondWithMediaType(MediaTypes.`application/json`) { ctx =>
+ future.map {
+ case ContextInitialized => ctx.complete(StatusCodes.OK)
+ case ContextAlreadyExists => badRequest(ctx, "context " + contextName + " exists")
+ case ContextInitError(e) => ctx.complete(500, errMap(e, "CONTEXT INIT ERROR"))
+ }
+ }
+ }
+ }
+ }
+ } ~
+ delete {
+ // DELETE /contexts/<contextName>
+ // Stop the context with the given name. Executors will be shut down and all cached RDDs
+ // and currently running jobs will be lost. Use with care!
+ path(PathElement) { (contextName) =>
+ val future = supervisor ? StopContext(contextName)
+ respondWithMediaType(MediaTypes.`text/plain`) { ctx =>
+ future.map {
+ case ContextStopped => ctx.complete(StatusCodes.OK)
+ case NoSuchContext => notFound(ctx, "context " + contextName + " not found")
+ }
+ }
+ }
+ }
+ }
+
+ def otherRoutes: Route = get {
+ // Main index.html page
+ path("") {
+ // respondWithMediaType(MediaTypes.`text/html`) { ctx =>
+ // // Marshal to HTML so page displays in browsers
+ // (supervisor ? ListJobs).mapTo[Seq[JobInfo]].map { jobs =>
+ // val currentJobs = jobs.filter { _.endTime == None }
+ // ctx.complete(HtmlUtils.jobsList("Current Jobs", currentJobs))
+ // }
+ // }
+ complete("Not implemented")
+ }
+ }
+
+ val errorEvents: Set[Class[_]] = Set(classOf[JobErroredOut], classOf[JobValidationFailed])
+ val asyncEvents = Set(classOf[JobStarted]) ++ errorEvents
+ val syncEvents = Set(classOf[JobResult]) ++ errorEvents
+
+ /**
+ * Main routes for starting a job, listing existing jobs, getting job results
+ */
+ def jobRoutes: Route = pathPrefix("jobs") {
+
+ import JobManagerActor._
+
+ // GET /jobs/<jobId> returns the job result in JSON form (see resultToTable).
+ // The JSON result always starts with: {"status": "ERROR" / "OK" / "RUNNING"}
+ // If the job isn't finished yet, {"status": "RUNNING"} is returned; if it failed, {"status": "ERROR"}.
+ (get & path(PathElement)) { jobId =>
+ val future = jobInfo ? GetJobResult(jobId)
+ respondWithMediaType(MediaTypes.`application/json`) { ctx =>
+ future.map {
+ case NoSuchJobId =>
+ notFound(ctx, "No such job ID " + jobId.toString)
+ case JobInfo(_, _, _, _, _, None, _) =>
+ ctx.complete(Map(StatusKey -> "RUNNING"))
+ case JobInfo(_, _, _, _, _, _, Some(ex)) =>
+ ctx.complete(Map(StatusKey -> "ERROR", "ERROR" -> formatException(ex)))
+ case JobResult(_, result) =>
+ ctx.complete(resultToTable(result))
+ }
+ }
+ } ~
+ /**
+ * GET /jobs -- returns a JSON list of hashes containing job status, ex:
+ * [
+ * {jobId: "word-count-2013-04-22", status: "RUNNING"}
+ * ]
+ * @optional @param limit Int - optional limit to number of jobs to display, defaults to 50
+ */
+ get {
+ parameters('limit.as[Int] ?) { (limitOpt) =>
+ val limit = limitOpt.getOrElse(DefaultJobLimit)
+ val future = (jobInfo ? JobInfoActor.GetJobStatuses(Some(limit))).mapTo[Seq[JobInfo]]
+ respondWithMediaType(MediaTypes.`application/json`) { ctx =>
+ future.map { infos =>
+ val jobReport = infos.map { info =>
+ Map("jobId" -> info.jobId,
+ "startTime" -> info.startTime.toString(),
+ "classPath" -> info.classPath,
+ "context" -> (if (info.contextName.isEmpty) "<<ad-hoc>>" else info.contextName),
+ "duration" -> getJobDurationString(info)) ++ (info match {
+ case JobInfo(_, _, _, _, _, None, _) => Map(StatusKey -> "RUNNING")
+ case JobInfo(_, _, _, _, _, _, Some(ex)) => Map(StatusKey -> "ERROR",
+ ResultKey -> formatException(ex))
+ case JobInfo(_, _, _, _, _, Some(e), None) => Map(StatusKey -> "FINISHED")
+ })
+ }
+ ctx.complete(jobReport)
+ }
+ }
+ }
+ } ~
+ /**
+ * POST /jobs -- Starts a new job. The job JAR must have been previously uploaded, and
+ * the classpath must refer to an object that implements SparkJob. The `validate()`
+ * API will be invoked before `runJob`.
+ *
+ * @entity The POST entity should be a Typesafe Config format file;
+ * It will be merged with the job server's config file at startup.
+ * @required @param appName String - the appName for the job JAR
+ * @required @param classPath String - the fully qualified class path for the job
+ * @optional @param context String - the name of the context to run the job under. If not specified,
+ * then a temporary context is allocated for the job
+ * @optional @param sync Boolean - if "true", wait for and return the results, otherwise return the job Id
+ * @optional @param timeout Int - the number of seconds to wait for sync results to come back
+ * @return JSON result of { StatusKey -> "OK" | "ERROR", ResultKey -> "result"}, where "result" is
+ * either the job id, or a result
+ */
+ post {
+ entity(as[String]) { configString =>
+ parameters('appName, 'classPath,
+ 'context ?, 'sync.as[Boolean] ?, 'timeout.as[Int] ?) {
+ (appName, classPath, contextOpt, syncOpt, timeoutOpt) =>
+ try {
+ val async = !syncOpt.getOrElse(false)
+ val jobConfig = ConfigFactory.parseString(configString).withFallback(config)
+ val contextConfig = Try(jobConfig.getConfig("spark.context-settings")).
+ getOrElse(ConfigFactory.empty)
+ val jobManager = getJobManagerForContext(contextOpt, contextConfig, classPath)
+ val events = if (async) asyncEvents else syncEvents
+ val timeout = timeoutOpt.map(t => Timeout(t.seconds)).getOrElse(DefaultSyncTimeout)
+ val future = jobManager.get.ask(
+ JobManagerActor.StartJob(appName, classPath, jobConfig, events))(timeout)
+ respondWithMediaType(MediaTypes.`application/json`) { ctx =>
+ future.map {
+ case JobResult(_, res) => ctx.complete(resultToTable(res))
+ case JobErroredOut(_, _, ex) => ctx.complete(errMap(ex, "ERROR"))
+ case JobStarted(jobId, context, _) =>
+ ctx.complete(202, Map[String, Any](
+ StatusKey -> "STARTED",
+ ResultKey -> Map("jobId" -> jobId, "context" -> context)))
+ case JobValidationFailed(_, _, ex) =>
+ ctx.complete(400, errMap(ex, "VALIDATION FAILED"))
+ case NoSuchApplication => notFound(ctx, "appName " + appName + " not found")
+ case NoSuchClass => notFound(ctx, "classPath " + classPath + " not found")
+ case JobLoadingError(err) =>
+ ctx.complete(500, errMap(err, "JOB LOADING FAILED"))
+ case NoJobSlotsAvailable(maxJobSlots) =>
+ val errorMsg = "Too many running jobs (" + maxJobSlots.toString +
+ ") for job context '" + contextOpt.getOrElse("ad-hoc") + "'"
+ ctx.complete(503, Map(StatusKey -> "NO SLOTS AVAILABLE", ResultKey -> errorMsg))
+ case ContextInitError(e) => ctx.complete(500, errMap(e, "CONTEXT INIT FAILED"))
+ }.recover {
+ case e: Exception => ctx.complete(500, errMap(e, "ERROR"))
+ }
+ }
+ } catch {
+ case e: NoSuchElementException =>
+ complete(StatusCodes.NotFound, errMap("context " + contextOpt.get + " not found"))
+ case e: ConfigException =>
+ complete(StatusCodes.BadRequest, errMap("Cannot parse config: " + e.getMessage))
+ case e: Exception =>
+ complete(500, errMap(e, "ERROR"))
+ }
+ }
+ }
+ }
+ }
+
+ private def badRequest(ctx: RequestContext, msg: String) =
+ ctx.complete(StatusCodes.BadRequest, errMap(msg))
+
+ private def notFound(ctx: RequestContext, msg: String) =
+ ctx.complete(StatusCodes.NotFound, errMap(msg))
+
+ private def errMap(errMsg: String) = Map(StatusKey -> "ERROR", ResultKey -> errMsg)
+
+ private def errMap(t: Throwable, status: String) =
+ Map(StatusKey -> status, ResultKey -> formatException(t))
+
+ private def getJobDurationString(info: JobInfo): String =
+ info.jobLengthMillis.map { ms => ms / 1000.0 + " secs" }.getOrElse("Job not done yet")
+
+ def resultToMap(result: Any): Map[String, Any] = result match {
+ case m: Map[_, _] => m.map { case (k, v) => (k.toString, v) }.toMap
+ case s: Seq[_] => s.zipWithIndex.map { case (item, idx) => (idx.toString, item) }.toMap
+ case a: Array[_] => a.toSeq.zipWithIndex.map { case (item, idx) => (idx.toString, item) }.toMap
+ case item => Map(ResultKey -> item)
+ }
+
+ def resultToTable(result: Any): Map[String, Any] = {
+ Map(StatusKey -> "OK", ResultKey -> result)
+ }
+
+ def formatException(t: Throwable): Any =
+ Map("message" -> t.getMessage(),
+ "errorClass" -> t.getClass.getName,
+ "stack" -> t.getStackTrace.map(_.toString).toSeq)
+
+ private def getJobManagerForContext(context: Option[String],
+ contextConfig: Config,
+ classPath: String): Option[ActorRef] = {
+ import ContextSupervisor._
+ val msg =
+ if (context.isDefined)
+ GetContext(context.get)
+ else
+ GetAdHocContext(classPath, contextConfig)
+ Await.result(supervisor ? msg, contextTimeout.seconds) match {
+ case (manager: ActorRef, resultActor: ActorRef) => Some(manager)
+ case NoSuchContext => None
+ case ContextInitError(err) => throw new RuntimeException(err)
+ }
+ }
+}
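As a rough illustration of the HTTP surface defined above (not part of this commit), a client could exercise the jar-upload and job-submission routes with nothing but the JDK. The host, port, jar path, app name, class path, and config key below are placeholders, and error handling is deliberately minimal.

import java.net.{ HttpURLConnection, URL }
import scala.io.Source

object WebApiClientSketch {
  private def post(url: String, body: Array[Byte], contentType: String): String = {
    val conn = new URL(url).openConnection().asInstanceOf[HttpURLConnection]
    conn.setRequestMethod("POST")
    conn.setDoOutput(true)
    conn.setRequestProperty("Content-Type", contentType)
    val out = conn.getOutputStream
    try out.write(body) finally out.close()
    // NOTE: getInputStream throws on 4xx/5xx responses; error handling is omitted here.
    val in = conn.getInputStream
    try Source.fromInputStream(in).mkString finally in.close()
  }

  def main(args: Array[String]) {
    val base = "http://localhost:8090"      // placeholder host:port; the real port comes from config
    // POST /jars/<appName> -- upload a jar; re-uploading under the same appName replaces it.
    val jarBytes = java.nio.file.Files.readAllBytes(java.nio.file.Paths.get("job.jar"))  // Java 7+
    println(post(base + "/jars/example-app", jarBytes, "application/java-archive"))
    // POST /jobs -- the body is a Typesafe Config snippet merged with the server config.
    val cfg = """input.strings = ["a", "b", "a"]"""
    println(post(base + "/jobs?appName=example-app&classPath=WordCountExampleJob&sync=true",
                 cfg.getBytes("UTF-8"), "text/plain"))
  }
}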
View
80 jobserver/src/main/scala/spark.jobserver/io/JobDAO.scala
@@ -0,0 +1,80 @@
+package spark.jobserver.io
+
+import com.typesafe.config.ConfigException
+import org.joda.time.{ Duration, DateTime }
+
+// Uniquely identifies the jar used to run a job
+case class JarInfo(appName: String, uploadTime: DateTime)
+
+// Both a response and used to track job progress
+// NOTE: if endTime is not None, then the job has finished.
+case class JobInfo(jobId: String, contextName: String,
+ jarInfo: JarInfo, classPath: String,
+ startTime: DateTime, endTime: Option[DateTime],
+ error: Option[Throwable]) {
+ def jobLengthMillis: Option[Long] = endTime.map { end => new Duration(startTime, end).getMillis() }
+
+ def isRunning: Boolean = !endTime.isDefined
+ def isErroredOut: Boolean = endTime.isDefined && error.isDefined
+}
+
+/**
+ * Core trait for data access objects for persisting data such as jars, applications, jobs, etc.
+ */
+trait JobDAO {
+ /**
+ * Persist a jar.
+ *
+ * @param appName the application name under which the jar is stored
+ * @param uploadTime the time of the upload, used to version the jar
+ * @param jarBytes the raw bytes of the jar file
+ */
+ def saveJar(appName: String, uploadTime: DateTime, jarBytes: Array[Byte])
+
+ /**
+ * Return all application names and their upload times.
+ *
+ * @return a map of app name to upload times, most recent first.
+ */
+ def getApps: Map[String, Seq[DateTime]]
+
+ /**
+ * TODO(kelvin): Remove this method later when JarManager doesn't use it anymore.
+ *
+ * @param appName the application name of the jar
+ * @param uploadTime the upload time identifying which version of the jar to retrieve
+ * @return the local file path of the retrieved jar file.
+ */
+ def retrieveJarFile(appName: String, uploadTime: DateTime): String
+
+ /**
+ * Persist a job info.
+ *
+ * @param jobInfo the job info to persist
+ */
+ def saveJobInfo(jobInfo: JobInfo)
+
+ /**
+ * Return a map of all job ids to their job info.
+ *
+ * @return a map of job id to JobInfo.
+ */
+ def getJobInfos: Map[String, JobInfo]
+
+ /**
+ * Returns the last upload time for a given app name.
+ * @return Some(lastUploadedTime) if the app exists and the list of times is nonempty, None otherwise
+ */
+ def getLastUploadTime(appName: String): Option[DateTime] =
+ getApps.get(appName).flatMap { uploadTimes => uploadTimes.headOption }
+
+ /**
+ * A safe API for getting values from Typesafe Config; returns a default if the
+ * value is missing. If the value is badly formatted, the error still propagates.
+ */
+ def getOrElse[T](getter: => T, default: T): T = {
+ try getter catch {
+ case e: ConfigException.Missing => default
+ }
+ }
+}
View
150 jobserver/src/main/scala/spark.jobserver/io/JobFileDAO.scala
@@ -0,0 +1,150 @@
+package spark.jobserver.io
+
+import com.typesafe.config.Config
+import java.io._
+import org.joda.time.DateTime
+import org.slf4j.LoggerFactory
+import scala.collection.mutable
+
+class JobFileDAO(config: Config) extends JobDAO {
+ private val logger = LoggerFactory.getLogger(getClass)
+
+ // appName to its upload times, in decreasing order (latest first)
+ private val apps = mutable.HashMap.empty[String, Seq[DateTime]]
+ // jobId to its JobInfo
+ private val jobs = mutable.HashMap.empty[String, JobInfo]
+
+ private val rootDir = getOrElse(config.getString("spark.jobserver.filedao.rootdir"),
+ "/tmp/spark-jobserver/filedao/data")
+ private val rootDirFile = new File(rootDir)
+ logger.info("rootDir is " + rootDirFile.getAbsolutePath)
+
+ private val jarsFile = new File(rootDirFile, "jars.data")
+ private var jarsOutputStream: DataOutputStream = null
+ private val jobsFile = new File(rootDirFile, "jobs.data")
+ private var jobsOutputStream: DataOutputStream = null
+
+ init()
+
+ private def init() {
+ // create the data directory if it doesn't exist
+ if (!rootDirFile.exists()) {
+ if (!rootDirFile.mkdirs()) {
+ throw new RuntimeException("Could not create directory " + rootDir)
+ }
+ }
+
+ // read back all apps info during startup
+ if (jarsFile.exists()) {
+ val in = new DataInputStream(new BufferedInputStream(new FileInputStream(jarsFile)))
+ try {
+ while (true) {
+ val jarInfo = readJarInfo(in)
+ addJar(jarInfo.appName, jarInfo.uploadTime)
+ }
+ } catch {
+ case e: EOFException => // do nothing
+
+ } finally {
+ in.close()
+ }
+ }
+
+ // read back all jobs info during startup
+ if (jobsFile.exists()) {
+ val in = new DataInputStream(new BufferedInputStream(new FileInputStream(jobsFile)))
+ try {
+ while (true) {
+ val jobInfo = readJobInfo(in)
+ jobs(jobInfo.jobId) = jobInfo
+ }
+ } catch {
+ case eof: EOFException => // do nothing
+ case e: Exception => throw e
+
+ } finally {
+ in.close()
+ }
+ }
+
+ // Don't buffer these streams: we want the app and job metadata written directly to the files.
+ // Otherwise a server crash would lose any buffered data.
+ jarsOutputStream = new DataOutputStream(new FileOutputStream(jarsFile, true))
+ jobsOutputStream = new DataOutputStream(new FileOutputStream(jobsFile, true))
+
+ }
+
+ override def saveJar(appName: String, uploadTime: DateTime, jarBytes: Array[Byte]) {
+ // The order is important: save the jar file first, then log it into jarsFile.
+ val outFile = new File(rootDir, createJarName(appName, uploadTime) + ".jar")
+ val bos = new BufferedOutputStream(new FileOutputStream(outFile))
+ try {
+ logger.debug("Writing {} bytes to file {}", jarBytes.size, outFile.getPath)
+ bos.write(jarBytes)
+ bos.flush()
+ } finally {
+ bos.close()
+ }
+
+ // log it into jarsFile
+ writeJarInfo(jarsOutputStream, JarInfo(appName, uploadTime))
+
+ // track the new jar in memory
+ addJar(appName, uploadTime)
+ }
+
+ private def writeJarInfo(out: DataOutputStream, jarInfo: JarInfo) {
+ out.writeUTF(jarInfo.appName)
+ out.writeLong(jarInfo.uploadTime.getMillis)
+ }
+
+ private def readJarInfo(in: DataInputStream) = JarInfo(in.readUTF, new DateTime(in.readLong))
+
+ private def addJar(appName: String, uploadTime: DateTime) {
+ if (apps.contains(appName)) {
+ apps(appName) = uploadTime +: apps(appName) // latest time comes first
+ } else {
+ apps(appName) = Seq(uploadTime)
+ }
+ }
+
+ def getApps: Map[String, Seq[DateTime]] = apps.toMap
+
+ override def retrieveJarFile(appName: String, uploadTime: DateTime): String =
+ new File(rootDir, createJarName(appName, uploadTime) + ".jar").getAbsolutePath
+
+ private def createJarName(appName: String, uploadTime: DateTime): String = appName + "-" + uploadTime
+
+ override def saveJobInfo(jobInfo: JobInfo) {
+ writeJobInfo(jobsOutputStream, jobInfo)
+ jobs(jobInfo.jobId) = jobInfo
+ }
+
+ private def writeJobInfo(out: DataOutputStream, jobInfo: JobInfo) {
+ out.writeUTF(jobInfo.jobId)
+ out.writeUTF(jobInfo.contextName)
+ writeJarInfo(out, jobInfo.jarInfo)
+ out.writeUTF(jobInfo.classPath)
+ out.writeLong(jobInfo.startTime.getMillis)
+ val time = if (jobInfo.endTime.isEmpty) jobInfo.startTime.getMillis else jobInfo.endTime.get.getMillis
+ out.writeLong(time)
+ val errorStr = if (jobInfo.error.isEmpty) "" else jobInfo.error.get.toString
+ out.writeUTF(errorStr)
+ }
+
+ private def readError(in: DataInputStream) = {
+ val error = in.readUTF()
+ if (error == "") None else Some(new Throwable(error))
+ }
+
+ private def readJobInfo(in: DataInputStream) = JobInfo(
+ in.readUTF,
+ in.readUTF,
+ readJarInfo(in),
+ in.readUTF,
+ new DateTime(in.readLong),
+ Some(new DateTime(in.readLong)),
+ readError(in))
+
+ override def getJobInfos: Map[String, JobInfo] = jobs.toMap
+}
View
31 jobserver/src/main/scala/spark.jobserver/util/DateUtils.scala
@@ -0,0 +1,31 @@
+package spark.jobserver.util
+
+import org.joda.time.format.ISODateTimeFormat
+import org.joda.time.{DateTime, DateTimeComparator, DateTimeZone}
+
+
+object DateUtils {
+ val ZeroTime = dtFromUtcSeconds(0)
+
+ private val iso8601format = ISODateTimeFormat.dateTimeNoMillis()
+ private val iso8601withMillis = ISODateTimeFormat.dateTime()
+ private val dateComparator = DateTimeComparator.getInstance()
+
+ def iso8601(dt: DateTime, fractions: Boolean = false): String =
+ if (fractions) iso8601withMillis.print(dt) else iso8601format.print(dt)
+
+ @inline def dtFromUtcSeconds(seconds: Int): DateTime = new DateTime(seconds * 1000L, DateTimeZone.UTC)
+
+ @inline def dtFromIso8601(isoString: String): DateTime = new DateTime(isoString, DateTimeZone.UTC)
+
+ /**
+ * Implicit conversions so we can use Scala comparison operators
+ * with JodaTime's DateTime
+ */
+ implicit def dateTimeToScalaWrapper(dt: DateTime): DateTimeWrapper = new DateTimeWrapper(dt)
+
+ class DateTimeWrapper(dt: DateTime) extends Ordered[DateTime] with Ordering[DateTime] {
+ def compare(that: DateTime): Int = dateComparator.compare(dt, that)
+ def compare(a: DateTime, b: DateTime): Int = dateComparator.compare(a, b)
+ }
+}
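A small illustration of the implicit wrapper above, which lets Scala comparison operators work directly on Joda DateTime values (the object name and dates are arbitrary):

import org.joda.time.DateTime
import spark.jobserver.util.DateUtils._

object DateUtilsExample extends App {
  val uploaded = dtFromIso8601("2013-11-28T00:00:00Z")
  val now = new DateTime()
  // dateTimeToScalaWrapper makes <, <=, >, >= available on DateTime.
  if (uploaded < now) println("jar was uploaded in the past: " + iso8601(uploaded))
}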
View
50 jobserver/src/main/scala/spark.jobserver/util/LRUCache.scala
@@ -0,0 +1,50 @@
+package spark.jobserver.util
+
+import java.util.Map.Entry
+import java.lang.ref.SoftReference
+
+/**
+ * A convenience class to define a Least-Recently-Used Cache with a maximum size.
+ * The oldest entries by time of last access will be removed when the number of entries exceeds
+ * cacheSize.
+ * For definitions of cacheSize and loadingFactor, see the docs for java.util.LinkedHashMap
+ * @see LinkedHashMap
+ */
+class LRUCache[K, V](cacheSize: Int, loadingFactor: Float = 0.75F) {
+
+ private val cache = {
+ val initialCapacity = math.ceil(cacheSize / loadingFactor).toInt + 1
+ new java.util.LinkedHashMap[K, V](initialCapacity, loadingFactor, true) {
+ protected override def removeEldestEntry(p1: Entry[K, V]): Boolean = size() > cacheSize
+ }
+ }
+
+ private var cacheMiss = 0
+ private var cacheHit = 0
+
+ /** size of the cache. This is an exact number and runs in constant time */
+ def size: Int = cache.size()
+
+ /** @return TRUE if the cache contains the key */
+ def containsKey(k: K): Boolean = cache.get(k) != null
+
+ /** @return the value in cache or load a new value into cache */
+ def get(k: K, v: => V): V = {
+ cache.get(k) match {
+ case null =>
+ val evaluatedV = v
+ cache.put(k, evaluatedV)
+ cacheMiss += 1
+ evaluatedV
+ case vv =>
+ cacheHit += 1
+ vv
+ }
+ }
+
+ def cacheHitRatio: Double = cacheHit.toDouble / math.max(cacheHit + cacheMiss, 1)
+
+ def put(k: K, v: V): V = cache.put(k, v)
+
+ def get(k: