Skip to content

Commit

Permalink
Merge pull request #5 from spark-jobserver/sc-hang
Browse files Browse the repository at this point in the history
Code & unit test for detecting hang Spark workers
  • Loading branch information
myxjtu committed Sep 2, 2014
2 parents 2c61b38 + 2a2fec9 commit d0ab8b6
Show file tree
Hide file tree
Showing 7 changed files with 285 additions and 7 deletions.
2 changes: 2 additions & 0 deletions job-server/src/main/resources/application.conf
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# Settings for safe local mode development
spark {
master = "local[4]"
# spark web UI port
webUrlPort = 8080

jobserver {
port = 8090
Expand Down
3 changes: 2 additions & 1 deletion job-server/src/spark.jobserver/JobServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,10 @@ object JobServer {
val jarManager = system.actorOf(Props(classOf[JarManager], jobDAO), "jar-manager")
val supervisor = system.actorOf(Props(classOf[LocalContextSupervisorActor], jobDAO), "context-supervisor")
val jobInfo = system.actorOf(Props(classOf[JobInfoActor], jobDAO, supervisor), "job-info")
val sparkWebUi = system.actorOf(Props(classOf[SparkWebUiActor]), "spark-web-ui")
// Create initial contexts
supervisor ! ContextSupervisor.AddContextsFromConfig
new WebApi(system, config, port, jarManager, supervisor, jobInfo).start()
new WebApi(system, config, port, jarManager, supervisor, jobInfo, sparkWebUi).start()
}

def main(args: Array[String]) {
Expand Down
90 changes: 90 additions & 0 deletions job-server/src/spark.jobserver/SparkWebUiActor.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package spark.jobserver

import akka.actor.ActorSystem
import akka.io.IO
import akka.pattern.ask
import akka.util.Timeout
import ooyala.common.akka.InstrumentedActor
import scala.util.{Success, Failure}
import scala.concurrent.Future
import spark.jobserver.SparkWebUiActor.{SparkWorkersErrorInfo, SparkWorkersInfo, GetWorkerStatus}
import spray.can.Http
import spray.client.pipelining.{Get, sendReceive, SendReceive}
import spray.http.HttpResponse

object SparkWebUiActor {
// Requests
case class GetWorkerStatus()

// Responses
case class SparkWorkersInfo(alive: Int, dead: Int)
case class SparkWorkersErrorInfo(message :String)
}
/**
* This actor pulls Spark worker status info (ALIVE, DEAD etc) from Spark admin web ui
* Collecting worker info from HTML page is not ideal.
* But at this time Spark does not provide public API yet to expose worker status.
* Also, the current implementation only works for Spark standalone mode
*/
class SparkWebUiActor extends InstrumentedActor {
import actorSystem.dispatcher // execution context for futures
import scala.concurrent.duration._

implicit val actorSystem: ActorSystem = context.system

// Used in the asks (?) below to request info from contextSupervisor and resultActor
implicit val shortTimeout = Timeout(3 seconds)

val config = context.system.settings.config

val sparkWebHostUrl = getSparkHostName()
val sparkWebHostPort = config.getInt("spark.webUrlPort")

val pipeline: Future[SendReceive] =
for (
Http.HostConnectorInfo(connector, _) <-
IO(Http) ? Http.HostConnectorSetup(sparkWebHostUrl, port = sparkWebHostPort)
) yield sendReceive(connector)

override def postStop() {
logger.info("Shutting down actor system for SparkWebUiActor")
}

override def wrappedReceive: Receive = {
case GetWorkerStatus() =>
val request = Get("/")
logger.info("Get the request for spark web UI")

val theSender = sender
val responseFuture: Future[HttpResponse] = pipeline.flatMap(_(request))
responseFuture onComplete {
case Success(httpResponse) =>
val content = httpResponse.entity.asString;

val aliveWorkerNum = "<td>ALIVE</td>".r.findAllIn(content).length
val deadWorkerNum = "<td>DEAD</td>".r.findAllIn(content).length

theSender ! SparkWorkersInfo(aliveWorkerNum, deadWorkerNum)
case Failure(error) =>
val msg = s"Failed to retrieve Spark web UI $sparkWebHostUrl:$sparkWebHostPort"
logger.error( msg )
theSender ! SparkWorkersErrorInfo(msg)
}
}

def getSparkHostName(): String = {
val master = config.getString("spark.master")
// Regular expression used for local[N] and local[*] master formats
val LOCAL_N_REGEX = """local\[([0-9\*]+)\]""".r
// Regular expression for connecting to Spark deploy clusters
val SPARK_REGEX = """spark://(.*):.*""".r

master match {
case "localhost" | "local" | LOCAL_N_REGEX(_) => "localhost"
case SPARK_REGEX(sparkUrl) => sparkUrl
case _ => throw new RuntimeException("Could not parse Master URL: '" + master + "'")
}
}


}
48 changes: 44 additions & 4 deletions job-server/src/spark.jobserver/WebApi.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import java.util.NoSuchElementException
import ooyala.common.akka.web.{ WebService, CommonRoutes }
import org.joda.time.DateTime
import org.slf4j.LoggerFactory
import spark.jobserver.SparkWebUiActor.{SparkWorkersErrorInfo, SparkWorkersInfo, GetWorkerStatus}
import scala.concurrent.{Await, ExecutionContext}
import scala.util.Try
import spark.jobserver.io.JobInfo
Expand All @@ -19,7 +20,7 @@ import spray.json.DefaultJsonProtocol._
import spray.routing.{ HttpService, Route, RequestContext }

class WebApi(system: ActorSystem, config: Config, port: Int,
jarManager: ActorRef, supervisor: ActorRef, jobInfo: ActorRef)
jarManager: ActorRef, supervisor: ActorRef, jobInfo: ActorRef, sparkWebUiActor: ActorRef)
extends HttpService with CommonRoutes {
import CommonMessages._
import ContextSupervisor._
Expand All @@ -38,10 +39,11 @@ class WebApi(system: ActorSystem, config: Config, port: Int,

val contextTimeout = Try(config.getMilliseconds("spark.jobserver.context-creation-timeout").toInt / 1000)
.getOrElse(15)
val sparkAliveWorkerThreshold = Try(config.getInt("spark.jobserver.sparkAliveWorkerThreshold")).getOrElse(1)

val logger = LoggerFactory.getLogger(getClass)

val myRoutes = jarRoutes ~ contextRoutes ~ jobRoutes ~ otherRoutes
val myRoutes = jarRoutes ~ contextRoutes ~ jobRoutes ~ healthzRoutes ~ sparkHealthzRoutes ~ otherRoutes

def start() {
logger.info("Starting browser web service...")
Expand Down Expand Up @@ -141,6 +143,46 @@ class WebApi(system: ActorSystem, config: Config, port: Int,
}
}

/**
* Routes for getting health status of Spark cluster
* GET /sparkHealthz - return OK or error message
*/
def sparkHealthzRoutes: Route = pathPrefix("sparkHealthz") {
get { ctx =>
logger.info("Receiving sparkHealthz check request")
val future = sparkWebUiActor ? GetWorkerStatus()
future.map {
case SparkWorkersInfo(dead, alive) =>
if ( dead > 0 ) {
logger.warn( "Spark dead worker non-zero: " + dead)
}
if ( alive > sparkAliveWorkerThreshold ) {
ctx.complete("OK")
} else {
logger.error( "Spark alive worker below threshold: " + alive)
ctx.complete("ERROR")
}

case SparkWorkersErrorInfo =>
ctx.complete("ERROR")

}.recover {
case e: Exception => ctx.complete(500, errMap(e, "ERROR"))
}
}
}

/**
* Routes for getting health status of job server
* GET /healthz - return OK or error message
*/
def healthzRoutes: Route = pathPrefix("healthz") {
get { ctx =>
logger.info("Receiving healthz check request")
ctx.complete("OK")
}
}

def otherRoutes: Route = get {
implicit val ar = actorRefFactory

Expand All @@ -150,8 +192,6 @@ class WebApi(system: ActorSystem, config: Config, port: Int,
} ~ pathPrefix("html") {
// Static files needed by index.html
getFromResourceDirectory("html")
} ~ path("healthz") {
complete("OK")
}
}

Expand Down
133 changes: 133 additions & 0 deletions job-server/test/spark.jobserver/SparkWebUiActorSpec.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package spark.jobserver

import akka.actor.{Props, ActorRef, ActorSystem}
import akka.io.IO
import akka.pattern.ask
import akka.testkit.{TestKit, ImplicitSender}
import com.typesafe.config.ConfigFactory
import org.scalatest.{FunSpec, BeforeAndAfter, BeforeAndAfterAll}
import org.scalatest.matchers.ShouldMatchers
import spray.client.pipelining._

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import akka.util.Timeout
import akka.actor._
import spray.can.Http
import spray.http._
import HttpMethods._
import SparkWebUiActor._

import scala.util.{Failure, Success}

object SparkWebUiActorSpec {
val sparkWebUrl = "localhost"
val sparkWebPort = 8098
val config = ConfigFactory.parseString(s"""
spark {
master = "spark://localhost:7077"
webUrl = $sparkWebUrl
webUrlPort = $sparkWebPort
temp-contexts {
num-cpu-cores = 4 # Number of cores to allocate. Required.
memory-per-node = 512m # Executor memory per node, -Xmx style eg 512m, 1G, etc.
}
jobserver.job-result-cache-size = 100
jobserver.context-creation-timeout = 5 s
jobserver.context-factory = spark.jobserver.util.DefaultSparkContextFactory
contexts {
olap-demo {
num-cpu-cores = 4
memory-per-node = 512m
}
}
context-settings {
num-cpu-cores = 2
memory-per-node = 512m
}
}
akka.log-dead-letters = 0
""")

val system = ActorSystem("test", config)
}

/**
* Created by senqiang on 8/22/14.
*/

// simple http service
class SimpleHttpServer extends Actor with ActorLogging {
implicit val timeout: Timeout = 1.second // for the actor 'asks'
import context.dispatcher // ExecutionContext for the futures and scheduler

def receive = {
// when a new connection comes in we register ourselves as the connection handler
case _: Http.Connected => sender ! Http.Register(self)
case HttpRequest(GET, Uri.Path("/"), _, _, _) => {
sender ! HttpResponse(entity =
"""<td>ALIVE</td>
|<td>DEAD</td>
""".stripMargin)
}
case HttpRequest(GET, Uri.Path("/stop"), _, _, _) =>
sender ! HttpResponse(entity = "Shutting down in 1 second ...")
sender ! Http.Close
context.system.scheduler.scheduleOnce(1.second) { context.system.shutdown() }
}
}

class SparkWebUiActorSpec extends TestKit(SparkWebUiActorSpec.system) with ImplicitSender
with FunSpec with ShouldMatchers with BeforeAndAfter with BeforeAndAfterAll {

// Used in the asks (?) below to request info from contextSupervisor and resultActor
implicit val ShortTimeout = Timeout(3 seconds)

override def beforeAll(): Unit = {
import akka.actor.{ActorSystem, Props}
import akka.io.IO
import spray.can.Http

implicit val system = ActorSystem()
// the handler actor replies to incoming HttpRequests
val handler = system.actorOf(Props[SimpleHttpServer], name = "simpleHttpServer")
IO(Http) ! Http.Bind(handler, interface = SparkWebUiActorSpec.sparkWebUrl, port = SparkWebUiActorSpec.sparkWebPort)
}

override def afterAll() {
ooyala.common.akka.AkkaTestUtils.shutdownAndWait(SparkWebUiActorSpec.system)

// close the web service
implicit val system = ActorSystem("test")
import system.dispatcher // execution context for futures below

val pipeline: Future[SendReceive] =
for (
Http.HostConnectorInfo(connector, _) <-
IO(Http) ? Http.HostConnectorSetup(SparkWebUiActorSpec.sparkWebUrl, port = SparkWebUiActorSpec.sparkWebPort)
) yield sendReceive(connector)

val request = Get("/stop")
pipeline.flatMap(_(request)) // async call

}

var actor: ActorRef = _

before {
actor = SparkWebUiActorSpec.system.actorOf(Props(classOf[SparkWebUiActor]), "spark-web-ui")
}

after {
ooyala.common.akka.AkkaTestUtils.shutdownAndWait(actor)
}

describe("SparkWebUiActor") {
it("should get worker info") {
val future = actor ? GetWorkerStatus()
val result = Await.result(future, ShortTimeout.duration).asInstanceOf[SparkWorkersInfo]
result.alive should equal (1)
result.dead should equal (1)
}
}
}
15 changes: 13 additions & 2 deletions job-server/test/spark.jobserver/WebApiSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package spark.jobserver

import akka.actor.{Actor, Props}
import com.typesafe.config.ConfigFactory
import spark.jobserver.SparkWebUiActor.{GetWorkerStatus, SparkWorkersInfo}
import spark.jobserver.io.{JobInfo, JarInfo}
import org.joda.time.DateTime
import org.scalatest.matchers.ShouldMatchers
Expand Down Expand Up @@ -30,7 +31,8 @@ with ScalatestRouteTest with HttpService {
// for actors declared as inner classes we need to pass this as first arg
val dummyActor = system.actorOf(Props(classOf[DummyActor], this))
val statusActor = system.actorOf(Props(classOf[JobStatusActor], new InMemoryDAO))
val api = new WebApi(system, config, dummyPort, dummyActor, dummyActor, dummyActor)

val api = new WebApi(system, config, dummyPort, dummyActor, dummyActor, dummyActor, dummyActor)
val routes = api.myRoutes

val dt = DateTime.parse("2013-05-29T00Z")
Expand Down Expand Up @@ -94,7 +96,7 @@ with ScalatestRouteTest with HttpService {

case GetJobConfig("badjobid") => sender ! NoSuchJobId
case GetJobConfig(_) => sender ! config

case GetWorkerStatus() => sender ! SparkWorkersInfo(2,0)
}
}

Expand Down Expand Up @@ -362,5 +364,14 @@ with ScalatestRouteTest with HttpService {
}
}
}

describe("spark alive workers") {
it("should return OK") {
// responseAs[] uses spray-json to convert JSON results back to types for easier checking
Get("/healthz") ~> sealRoute(routes) ~> check {
status should be (OK)
}
}
}
}

1 change: 1 addition & 0 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ object Dependencies {
"io.spray" %% "spray-json" % "1.2.5",
"io.spray" % "spray-can" % "1.2.0",
"io.spray" % "spray-routing" % "1.2.0",
"io.spray" % "spray-client" % "1.2.0",
yammerDeps
) ++ yodaDeps

Expand Down

0 comments on commit d0ab8b6

Please sign in to comment.