diff --git a/job-server/src/main/resources/application.conf b/job-server/src/main/resources/application.conf
index 0dd5fd0..2d6ff64 100644
--- a/job-server/src/main/resources/application.conf
+++ b/job-server/src/main/resources/application.conf
@@ -1,6 +1,8 @@
# Settings for safe local mode development
spark {
master = "local[4]"
+ # spark web UI port
+ webUrlPort = 8080
jobserver {
port = 8090
diff --git a/job-server/src/spark.jobserver/JobServer.scala b/job-server/src/spark.jobserver/JobServer.scala
index be1df34..e4bcbbe 100644
--- a/job-server/src/spark.jobserver/JobServer.scala
+++ b/job-server/src/spark.jobserver/JobServer.scala
@@ -52,9 +52,10 @@ object JobServer {
val jarManager = system.actorOf(Props(classOf[JarManager], jobDAO), "jar-manager")
val supervisor = system.actorOf(Props(classOf[LocalContextSupervisorActor], jobDAO), "context-supervisor")
val jobInfo = system.actorOf(Props(classOf[JobInfoActor], jobDAO, supervisor), "job-info")
+ val sparkWebUi = system.actorOf(Props(classOf[SparkWebUiActor]), "spark-web-ui")
// Create initial contexts
supervisor ! ContextSupervisor.AddContextsFromConfig
- new WebApi(system, config, port, jarManager, supervisor, jobInfo).start()
+ new WebApi(system, config, port, jarManager, supervisor, jobInfo, sparkWebUi).start()
}
def main(args: Array[String]) {
diff --git a/job-server/src/spark.jobserver/SparkWebUiActor.scala b/job-server/src/spark.jobserver/SparkWebUiActor.scala
new file mode 100644
index 0000000..d4dda23
--- /dev/null
+++ b/job-server/src/spark.jobserver/SparkWebUiActor.scala
@@ -0,0 +1,90 @@
+package spark.jobserver
+
+import akka.actor.ActorSystem
+import akka.io.IO
+import akka.pattern.ask
+import akka.util.Timeout
+import ooyala.common.akka.InstrumentedActor
+import scala.util.{Success, Failure}
+import scala.concurrent.Future
+import spark.jobserver.SparkWebUiActor.{SparkWorkersErrorInfo, SparkWorkersInfo, GetWorkerStatus}
+import spray.can.Http
+import spray.client.pipelining.{Get, sendReceive, SendReceive}
+import spray.http.HttpResponse
+
+object SparkWebUiActor {
+ // Requests
+ case class GetWorkerStatus()
+
+ // Responses
+ case class SparkWorkersInfo(alive: Int, dead: Int)
+ case class SparkWorkersErrorInfo(message :String)
+}
+/**
+ * This actor pulls Spark worker status (ALIVE, DEAD, etc.) from the Spark master web UI.
+ * Scraping worker info from an HTML page is not ideal, but Spark does not yet expose a
+ * public API for worker status. The current implementation only works in Spark standalone mode.
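+ *
+ * Usage: ask with GetWorkerStatus(); the reply is either SparkWorkersInfo or SparkWorkersErrorInfo.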
+ */
+class SparkWebUiActor extends InstrumentedActor {
+ import actorSystem.dispatcher // execution context for futures
+ import scala.concurrent.duration._
+
+ implicit val actorSystem: ActorSystem = context.system
+
+ // Implicit timeout for the ask (?) to the Http manager below
+ implicit val shortTimeout = Timeout(3 seconds)
+
+ val config = context.system.settings.config
+
+ val sparkWebHostUrl = getSparkHostName()
+ val sparkWebHostPort = config.getInt("spark.webUrlPort")
+
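+ // spray-client host-level API: get a host connector to the Spark master web UI,
+ // then build a sendReceive pipeline bound to that connector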
+ val pipeline: Future[SendReceive] =
+ for (
+ Http.HostConnectorInfo(connector, _) <-
+ IO(Http) ? Http.HostConnectorSetup(sparkWebHostUrl, port = sparkWebHostPort)
+ ) yield sendReceive(connector)
+
+ override def postStop() {
+ logger.info("Shutting down actor system for SparkWebUiActor")
+ }
+
+ override def wrappedReceive: Receive = {
+ case GetWorkerStatus() =>
+ val request = Get("/")
+ logger.info("Get the request for spark web UI")
+
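+ // capture sender() now; it must not be referenced from the asynchronous onComplete callback below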
+ val theSender = sender
+ val responseFuture: Future[HttpResponse] = pipeline.flatMap(_(request))
+ responseFuture onComplete {
+ case Success(httpResponse) =>
+ val content = httpResponse.entity.asString
+
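+ // count the worker state cells in the master UI HTML (one <td> per worker)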
+ val aliveWorkerNum = "
ALIVE | ".r.findAllIn(content).length
+ val deadWorkerNum = "DEAD | ".r.findAllIn(content).length
+
+ theSender ! SparkWorkersInfo(aliveWorkerNum, deadWorkerNum)
+ case Failure(error) =>
+ val msg = s"Failed to retrieve Spark web UI $sparkWebHostUrl:$sparkWebHostPort"
+ logger.error( msg )
+ theSender ! SparkWorkersErrorInfo(msg)
+ }
+ }
+
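+ // Derive the web UI host from the spark.master setting (standalone URLs and local modes only)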
+ def getSparkHostName(): String = {
+ val master = config.getString("spark.master")
+ // Regular expression used for local[N] and local[*] master formats
+ val LOCAL_N_REGEX = """local\[([0-9\*]+)\]""".r
+ // Regular expression for connecting to Spark deploy clusters
+ val SPARK_REGEX = """spark://(.*):.*""".r
+
+ master match {
+ case "localhost" | "local" | LOCAL_N_REGEX(_) => "localhost"
+ case SPARK_REGEX(sparkUrl) => sparkUrl
+ case _ => throw new RuntimeException("Could not parse Master URL: '" + master + "'")
+ }
+ }
+
+
+}
diff --git a/job-server/src/spark.jobserver/WebApi.scala b/job-server/src/spark.jobserver/WebApi.scala
index ac8a4af..d157332 100644
--- a/job-server/src/spark.jobserver/WebApi.scala
+++ b/job-server/src/spark.jobserver/WebApi.scala
@@ -8,6 +8,7 @@ import java.util.NoSuchElementException
import ooyala.common.akka.web.{ WebService, CommonRoutes }
import org.joda.time.DateTime
import org.slf4j.LoggerFactory
+import spark.jobserver.SparkWebUiActor.{SparkWorkersErrorInfo, SparkWorkersInfo, GetWorkerStatus}
import scala.concurrent.{Await, ExecutionContext}
import scala.util.Try
import spark.jobserver.io.JobInfo
@@ -19,7 +20,7 @@ import spray.json.DefaultJsonProtocol._
import spray.routing.{ HttpService, Route, RequestContext }
class WebApi(system: ActorSystem, config: Config, port: Int,
- jarManager: ActorRef, supervisor: ActorRef, jobInfo: ActorRef)
+ jarManager: ActorRef, supervisor: ActorRef, jobInfo: ActorRef, sparkWebUiActor: ActorRef)
extends HttpService with CommonRoutes {
import CommonMessages._
import ContextSupervisor._
@@ -38,10 +39,11 @@ class WebApi(system: ActorSystem, config: Config, port: Int,
val contextTimeout = Try(config.getMilliseconds("spark.jobserver.context-creation-timeout").toInt / 1000)
.getOrElse(15)
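+ // Minimum number of alive Spark workers required for /sparkHealthz to report OK (defaults to 1)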
+ val sparkAliveWorkerThreshold = Try(config.getInt("spark.jobserver.sparkAliveWorkerThreshold")).getOrElse(1)
val logger = LoggerFactory.getLogger(getClass)
- val myRoutes = jarRoutes ~ contextRoutes ~ jobRoutes ~ otherRoutes
+ val myRoutes = jarRoutes ~ contextRoutes ~ jobRoutes ~ healthzRoutes ~ sparkHealthzRoutes ~ otherRoutes
def start() {
logger.info("Starting browser web service...")
@@ -141,6 +143,46 @@ class WebApi(system: ActorSystem, config: Config, port: Int,
}
}
+ /**
+ * Routes for getting the health status of the Spark cluster
+ * GET /sparkHealthz - returns OK or an error message
+ */
+ def sparkHealthzRoutes: Route = pathPrefix("sparkHealthz") {
+ get { ctx =>
+ logger.info("Receiving sparkHealthz check request")
+ val future = sparkWebUiActor ? GetWorkerStatus()
+ future.map {
+ case SparkWorkersInfo(alive, dead) =>
+ if (dead > 0) {
+ logger.warn("Spark dead worker count is non-zero: " + dead)
+ }
+ if (alive >= sparkAliveWorkerThreshold) {
+ ctx.complete("OK")
+ } else {
+ logger.error("Spark alive worker count is below threshold: " + alive)
+ ctx.complete("ERROR")
+ }
+
+ case SparkWorkersErrorInfo(_) =>
+ ctx.complete("ERROR")
+
+ }.recover {
+ case e: Exception => ctx.complete(500, errMap(e, "ERROR"))
+ }
+ }
+ }
+
+ /**
+ * Routes for getting the health status of the job server
+ * GET /healthz - returns OK or an error message
+ */
+ def healthzRoutes: Route = pathPrefix("healthz") {
+ get { ctx =>
+ logger.info("Receiving healthz check request")
+ ctx.complete("OK")
+ }
+ }
+
def otherRoutes: Route = get {
implicit val ar = actorRefFactory
@@ -150,8 +192,6 @@ class WebApi(system: ActorSystem, config: Config, port: Int,
} ~ pathPrefix("html") {
// Static files needed by index.html
getFromResourceDirectory("html")
- } ~ path("healthz") {
- complete("OK")
}
}
diff --git a/job-server/test/spark.jobserver/SparkWebUiActorSpec.scala b/job-server/test/spark.jobserver/SparkWebUiActorSpec.scala
new file mode 100644
index 0000000..9ac318f
--- /dev/null
+++ b/job-server/test/spark.jobserver/SparkWebUiActorSpec.scala
@@ -0,0 +1,133 @@
+package spark.jobserver
+
+import akka.actor.{Props, ActorRef, ActorSystem}
+import akka.io.IO
+import akka.pattern.ask
+import akka.testkit.{TestKit, ImplicitSender}
+import com.typesafe.config.ConfigFactory
+import org.scalatest.{FunSpec, BeforeAndAfter, BeforeAndAfterAll}
+import org.scalatest.matchers.ShouldMatchers
+import spray.client.pipelining._
+
+import scala.concurrent.{Await, Future}
+import scala.concurrent.duration._
+import akka.util.Timeout
+import akka.actor._
+import spray.can.Http
+import spray.http._
+import HttpMethods._
+import SparkWebUiActor._
+
+import scala.util.{Failure, Success}
+
+object SparkWebUiActorSpec {
+ val sparkWebUrl = "localhost"
+ val sparkWebPort = 8098
+ val config = ConfigFactory.parseString(s"""
+ spark {
+ master = "spark://localhost:7077"
+ webUrl = $sparkWebUrl
+ webUrlPort = $sparkWebPort
+ temp-contexts {
+ num-cpu-cores = 4 # Number of cores to allocate. Required.
+ memory-per-node = 512m # Executor memory per node, -Xmx style eg 512m, 1G, etc.
+ }
+ jobserver.job-result-cache-size = 100
+ jobserver.context-creation-timeout = 5 s
+ jobserver.context-factory = spark.jobserver.util.DefaultSparkContextFactory
+ contexts {
+ olap-demo {
+ num-cpu-cores = 4
+ memory-per-node = 512m
+ }
+ }
+ context-settings {
+ num-cpu-cores = 2
+ memory-per-node = 512m
+ }
+ }
+ akka.log-dead-letters = 0
+ """)
+
+ val system = ActorSystem("test", config)
+}
+
+/**
+ * Created by senqiang on 8/22/14.
+ */
+
+// simple http service
+class SimpleHttpServer extends Actor with ActorLogging {
+ implicit val timeout: Timeout = 1.second // for the actor 'asks'
+ import context.dispatcher // ExecutionContext for the futures and scheduler
+
+ def receive = {
+ // when a new connection comes in we register ourselves as the connection handler
+ case _: Http.Connected => sender ! Http.Register(self)
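+ // serve a minimal page that mimics the Spark master UI: one ALIVE and one DEAD worker cell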
+ case HttpRequest(GET, Uri.Path("/"), _, _, _) => {
+ sender ! HttpResponse(entity =
+ """ALIVE |
+ |DEAD |
+ """.stripMargin)
+ }
+ case HttpRequest(GET, Uri.Path("/stop"), _, _, _) =>
+ sender ! HttpResponse(entity = "Shutting down in 1 second ...")
+ sender ! Http.Close
+ context.system.scheduler.scheduleOnce(1.second) { context.system.shutdown() }
+ }
+}
+
+class SparkWebUiActorSpec extends TestKit(SparkWebUiActorSpec.system) with ImplicitSender
+ with FunSpec with ShouldMatchers with BeforeAndAfter with BeforeAndAfterAll {
+
+ // Implicit timeout for the asks (?) below
+ implicit val ShortTimeout = Timeout(3 seconds)
+
+ override def beforeAll(): Unit = {
+ import akka.actor.{ActorSystem, Props}
+ import akka.io.IO
+ import spray.can.Http
+
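+ // start a local spray-can server that stands in for the Spark master web UI in this spec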
+ implicit val system = ActorSystem()
+ // the handler actor replies to incoming HttpRequests
+ val handler = system.actorOf(Props[SimpleHttpServer], name = "simpleHttpServer")
+ IO(Http) ! Http.Bind(handler, interface = SparkWebUiActorSpec.sparkWebUrl, port = SparkWebUiActorSpec.sparkWebPort)
+ }
+
+ override def afterAll() {
+ ooyala.common.akka.AkkaTestUtils.shutdownAndWait(SparkWebUiActorSpec.system)
+
+ // close the web service
+ implicit val system = ActorSystem("test")
+ import system.dispatcher // execution context for futures below
+
+ val pipeline: Future[SendReceive] =
+ for (
+ Http.HostConnectorInfo(connector, _) <-
+ IO(Http) ? Http.HostConnectorSetup(SparkWebUiActorSpec.sparkWebUrl, port = SparkWebUiActorSpec.sparkWebPort)
+ ) yield sendReceive(connector)
+
+ val request = Get("/stop")
+ pipeline.flatMap(_(request)) // async call
+
+ }
+
+ var actor: ActorRef = _
+
+ before {
+ actor = SparkWebUiActorSpec.system.actorOf(Props(classOf[SparkWebUiActor]), "spark-web-ui")
+ }
+
+ after {
+ ooyala.common.akka.AkkaTestUtils.shutdownAndWait(actor)
+ }
+
+ describe("SparkWebUiActor") {
+ it("should get worker info") {
+ val future = actor ? GetWorkerStatus()
+ val result = Await.result(future, ShortTimeout.duration).asInstanceOf[SparkWorkersInfo]
+ result.alive should equal (1)
+ result.dead should equal (1)
+ }
+ }
+}
\ No newline at end of file
diff --git a/job-server/test/spark.jobserver/WebApiSpec.scala b/job-server/test/spark.jobserver/WebApiSpec.scala
index 343ce96..8be95e5 100644
--- a/job-server/test/spark.jobserver/WebApiSpec.scala
+++ b/job-server/test/spark.jobserver/WebApiSpec.scala
@@ -2,6 +2,7 @@ package spark.jobserver
import akka.actor.{Actor, Props}
import com.typesafe.config.ConfigFactory
+import spark.jobserver.SparkWebUiActor.{GetWorkerStatus, SparkWorkersInfo}
import spark.jobserver.io.{JobInfo, JarInfo}
import org.joda.time.DateTime
import org.scalatest.matchers.ShouldMatchers
@@ -30,7 +31,8 @@ with ScalatestRouteTest with HttpService {
// for actors declared as inner classes we need to pass this as first arg
val dummyActor = system.actorOf(Props(classOf[DummyActor], this))
val statusActor = system.actorOf(Props(classOf[JobStatusActor], new InMemoryDAO))
- val api = new WebApi(system, config, dummyPort, dummyActor, dummyActor, dummyActor)
+
+ val api = new WebApi(system, config, dummyPort, dummyActor, dummyActor, dummyActor, dummyActor)
val routes = api.myRoutes
val dt = DateTime.parse("2013-05-29T00Z")
@@ -94,7 +96,7 @@ with ScalatestRouteTest with HttpService {
case GetJobConfig("badjobid") => sender ! NoSuchJobId
case GetJobConfig(_) => sender ! config
-
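+ // stub the Spark web UI actor: report 2 alive and 0 dead workers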
+ case GetWorkerStatus() => sender ! SparkWorkersInfo(2,0)
}
}
@@ -362,5 +364,14 @@ with ScalatestRouteTest with HttpService {
}
}
}
+
+ describe("spark alive workers") {
+ it("should return OK") {
+ Get("/sparkHealthz") ~> sealRoute(routes) ~> check {
+ status should be (OK)
+ }
+ }
+ }
}
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index 32be86a..e94ca4d 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -22,6 +22,7 @@ object Dependencies {
"io.spray" %% "spray-json" % "1.2.5",
"io.spray" % "spray-can" % "1.2.0",
"io.spray" % "spray-routing" % "1.2.0",
+ "io.spray" % "spray-client" % "1.2.0",
yammerDeps
) ++ yodaDeps