diff --git a/README.md b/README.md
index 18eb3a72f..a66df7fe3 100644
--- a/README.md
+++ b/README.md
@@ -261,6 +261,7 @@ the REST API.
     GET /jobs                - Lists the last N jobs
     POST /jobs               - Starts a new job, use ?sync=true to wait for results
     GET /jobs/<jobId>        - Gets the result or status of a specific job
+    DELETE /jobs/<jobId>     - Kills the specified job
     GET /jobs/<jobId>/config - Gets the job configuration
 
 ### Context configuration
@@ -365,8 +366,6 @@ Copyright(c) 2014, Ooyala, Inc.
 - Add Swagger support.  See the spray-swagger project.
 - Implement an interactive SQL window.  See: [spark-admin](https://github.com/adatao/spark-admin)
-- Use `SparkContext.setJobGroup` with the job ID
-- Support job cancellation via `cancelJobGroup`
 - Stream the current job progress via a Listener
 - Add routes to return stage info for a job.  Persist it via DAO so that we can always retrieve
   stage / performance info even for historical jobs.  This would be pretty kickass.
diff --git a/doc/job-server-flow.md b/doc/job-server-flow.md
index 9bcde3c49..9d2d5999d 100644
--- a/doc/job-server-flow.md
+++ b/doc/job-server-flow.md
@@ -52,7 +52,7 @@ Context routes
 - delete a context with given contextName
 
-    user->WebApi: DELECT /contexts/<contextName>
+    user->WebApi: DELETE /contexts/<contextName>
     WebApi->LocalContextSupervisor: StopContext(contextName)
     opt If no such context
         LocalContextSupervisor->WebApi: NoSuchContext
@@ -164,3 +164,22 @@ Job routes
         note over JobResultActor: subscribers.remove(jobId)
     JobFuture->JobStatusActor: Unsubscribe(jobId, WebApi)
     JobFuture->JobResultActor: Unsubscribe(jobId, WebApi)
+
+- kill a job with a given jobId
+
+    user->WebApi: DELETE /jobs/<jobId>
+    WebApi->JobInfoActor: GetJobResult(jobId)
+    note over JobInfoActor: JobDao.getJobInfos.get(jobId)
+    opt if jobId not found:
+        JobInfoActor->WebApi: NoSuchJobId
+        WebApi->user: 404
+    end
+    opt if job is running:
+        WebApi->JobManager: KillJob(jobId)
+        JobManager->WebApi: future{}
+        WebApi->user: 200 + "KILLED"
+    end
+    opt if job has errored out:
+        JobInfoActor->WebApi: JobInfo
+        WebApi->user: 200 + "ERROR"
+    end
diff --git a/job-server/src/spark.jobserver/CommonMessages.scala b/job-server/src/spark.jobserver/CommonMessages.scala
index b52790812..789f2738d 100644
--- a/job-server/src/spark.jobserver/CommonMessages.scala
+++ b/job-server/src/spark.jobserver/CommonMessages.scala
@@ -14,6 +14,7 @@ object CommonMessages {
   case class JobFinished(jobId: String, endTime: DateTime) extends StatusMessage
   case class JobValidationFailed(jobId: String, endTime: DateTime, err: Throwable) extends StatusMessage
   case class JobErroredOut(jobId: String, endTime: DateTime, err: Throwable) extends StatusMessage
+  case class JobKilled(jobId: String, endTime: DateTime) extends StatusMessage
 
   /**
    * NOTE: For Subscribe, make sure to use `classOf[]` to get the Class for the case classes above.
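The two README TODO items removed above are exactly what this patch implements: each job is tagged with `SparkContext.setJobGroup` under its job ID, and a kill triggers `cancelJobGroup`. A minimal, self-contained sketch of that underlying Spark mechanism, outside the job server; the local master, thread structure, and job ID below are illustrative assumptions, not part of this patch:

    import org.apache.spark.{SparkConf, SparkContext}

    object JobGroupCancelSketch {
      def main(args: Array[String]) {
        val sc = new SparkContext(
          new SparkConf().setMaster("local[2]").setAppName("cancel-sketch"))
        val jobId = "job-123"  // illustrative; the job server uses its own jobId here

        val runner = new Thread(new Runnable {
          def run() {
            // setJobGroup is thread-local: it tags work submitted from this thread.
            // interruptOnCancel = true interrupts the running task threads on cancel.
            sc.setJobGroup(jobId, s"Job group for $jobId", true)
            try {
              sc.parallelize(1 to 10000000).map { i => Thread.sleep(1); i }.count()
            } catch {
              case e: Exception => println("Job was cancelled: " + e.getMessage)
            }
          }
        })
        runner.start()
        Thread.sleep(2000)
        sc.cancelJobGroup(jobId)  // what the KillJob handler below does with the job's ID
        runner.join()
        sc.stop()
      }
    }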
diff --git a/job-server/src/spark.jobserver/JobManagerActor.scala b/job-server/src/spark.jobserver/JobManagerActor.scala
index 16563be12..20da2eb20 100644
--- a/job-server/src/spark.jobserver/JobManagerActor.scala
+++ b/job-server/src/spark.jobserver/JobManagerActor.scala
@@ -18,6 +18,7 @@ object JobManagerActor {
   case object Initialize
   case class StartJob(appName: String, classPath: String, config: Config,
                       subscribedEvents: Set[Class[_]])
+  case class KillJob(jobId: String)
 
   // Results/Data
   case class Initialized(resultActor: ActorRef)
@@ -116,6 +117,11 @@ class JobManagerActor(dao: JobDAO,
     case StartJob(appName, classPath, jobConfig, events) =>
       startJobInternal(appName, classPath, jobConfig, events, jobContext, sparkEnv, rddManagerActor)
+
+    case KillJob(jobId: String) => {
+      jobContext.sparkContext.cancelJobGroup(jobId)
+      statusActor ! JobKilled(jobId, DateTime.now())
+    }
   }
 
   def startJobInternal(appName: String,
@@ -223,6 +229,8 @@ class JobManagerActor(dao: JobDAO,
         }
         case SparkJobValid => {
           statusActor ! JobStarted(jobId: String, contextName, jobInfo.startTime)
+          val sc = jobContext.sparkContext
+          sc.setJobGroup(jobId, s"Job group for $jobId and spark context ${sc.applicationId}", true)
           job.runJob(jobC, jobConfig)
         }
       }
diff --git a/job-server/src/spark.jobserver/JobStatusActor.scala b/job-server/src/spark.jobserver/JobStatusActor.scala
index cc0c34f11..fa23d7769 100644
--- a/job-server/src/spark.jobserver/JobStatusActor.scala
+++ b/job-server/src/spark.jobserver/JobStatusActor.scala
@@ -85,6 +85,12 @@ class JobStatusActor(jobDao: JobDAO) extends InstrumentedActor with YammerMetrics
       case (info, msg) =>
         info.copy(endTime = Some(msg.endTime), error = Some(msg.err))
     }
+
+    case msg: JobKilled =>
+      processStatus(msg, "killed", remove = true) {
+        case (info, msg) =>
+          info.copy(endTime = Some(msg.endTime))
+      }
   }
 
   private def processStatus[M <: StatusMessage](msg: M, logMessage: String, remove: Boolean = false)
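With JobKilled flowing through JobStatusActor, anything subscribed to job status events can treat it as a terminal state alongside JobFinished and JobErroredOut. A rough sketch of such a subscriber, using simplified stand-in case classes rather than the real spark.jobserver.CommonMessages types; the names and fields here are assumptions for illustration only:

    import akka.actor.{Actor, ActorSystem, Props}

    // Simplified stand-ins for the CommonMessages status types added above.
    case class JobFinished(jobId: String, endTime: Long)
    case class JobKilled(jobId: String, endTime: Long)

    class StatusSubscriber extends Actor {
      def receive = {
        // Both messages mean the job will emit no further events.
        case JobFinished(id, t) => println(s"job $id finished at $t")
        case JobKilled(id, t)   => println(s"job $id was killed at $t")
      }
    }

    object StatusSubscriberDemo extends App {
      val system = ActorSystem("demo")
      val subscriber = system.actorOf(Props[StatusSubscriber], "subscriber")
      subscriber ! JobFinished("job-1", System.currentTimeMillis)
      subscriber ! JobKilled("job-2", System.currentTimeMillis)
      Thread.sleep(500)   // let the actor print before shutting down
      system.shutdown()   // Akka 2.x API, matching the job server's Akka version
    }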
diff --git a/job-server/src/spark.jobserver/WebApi.scala b/job-server/src/spark.jobserver/WebApi.scala
index 38549c7ed..02084580b 100644
--- a/job-server/src/spark.jobserver/WebApi.scala
+++ b/job-server/src/spark.jobserver/WebApi.scala
@@ -217,6 +217,26 @@ class WebApi(system: ActorSystem,
           }
         }
       } ~
+      // DELETE /jobs/<jobId>
+      // Stops the specified job. All other jobs submitted to this spark context
+      // will continue to run.
+      (delete & path(Segment)) { jobId =>
+        val future = jobInfo ? GetJobResult(jobId)
+        respondWithMediaType(MediaTypes.`application/json`) { ctx =>
+          future.map {
+            case NoSuchJobId =>
+              notFound(ctx, "No such job ID " + jobId)
+            case JobInfo(_, contextName, _, classPath, _, None, _) =>
+              val jobManager = getJobManagerForContext(Some(contextName), config, classPath)
+              jobManager.get ! KillJob(jobId)
+              ctx.complete(Map(StatusKey -> "KILLED"))
+            case JobInfo(_, _, _, _, _, _, Some(ex)) =>
+              ctx.complete(Map(StatusKey -> "ERROR", "ERROR" -> formatException(ex)))
+            case JobResult(_, result) =>
+              ctx.complete(resultToTable(result))
+          }
+        }
+      } ~
       /**
        * GET /jobs -- returns a JSON list of hashes containing job status, ex:
        * [
diff --git a/job-server/test/spark.jobserver/JobManagerSpec.scala b/job-server/test/spark.jobserver/JobManagerSpec.scala
index 9d83de45a..2fde2e50e 100644
--- a/job-server/test/spark.jobserver/JobManagerSpec.scala
+++ b/job-server/test/spark.jobserver/JobManagerSpec.scala
@@ -1,6 +1,7 @@
 package spark.jobserver
 
 import com.typesafe.config.ConfigFactory
+import spark.jobserver.JobManagerActor.KillJob
 import scala.collection.mutable
 import spark.jobserver.io.JobDAO
 
@@ -195,5 +196,20 @@ abstract class JobManagerSpec extends JobSpecBase(JobManagerSpec.getNewSystem) {
       case JobResult(_, result: Int) => result should equal (1 + 2 + 3)
     }
   }
+
+  it("should be able to cancel a running job") {
+    manager ! JobManagerActor.Initialize
+    expectMsgClass(classOf[JobManagerActor.Initialized])
+
+    uploadTestJar()
+    manager ! JobManagerActor.StartJob("demo", classPrefix + "LongPiJob", stringConfig, allEvents)
+    expectMsgPF(1 seconds, "Did not get JobStarted") {
+      case JobStarted(id, _, _) => {
+        manager ! KillJob(id)
+        expectMsgClass(classOf[JobKilled])
+      }
+    }
+  }
+
 }
diff --git a/job-server/test/spark.jobserver/JobSpecBase.scala b/job-server/test/spark.jobserver/JobSpecBase.scala
index 21dfe40a2..74707338a 100644
--- a/job-server/test/spark.jobserver/JobSpecBase.scala
+++ b/job-server/test/spark.jobserver/JobSpecBase.scala
@@ -59,7 +59,7 @@ with FunSpecLike with Matchers with BeforeAndAfter with BeforeAndAfterAll with T
   import CommonMessages._
 
   val errorEvents: Set[Class[_]] = Set(classOf[JobErroredOut], classOf[JobValidationFailed],
-    classOf[NoJobSlotsAvailable])
+    classOf[NoJobSlotsAvailable], classOf[JobKilled])
   val asyncEvents = Set(classOf[JobStarted])
   val syncEvents = Set(classOf[JobResult])
   val allEvents = errorEvents ++ asyncEvents ++ syncEvents ++ Set(classOf[JobFinished])
diff --git a/job-server/test/spark.jobserver/WebApiSpec.scala b/job-server/test/spark.jobserver/WebApiSpec.scala
index ff1fad328..4355113ff 100644
--- a/job-server/test/spark.jobserver/WebApiSpec.scala
+++ b/job-server/test/spark.jobserver/WebApiSpec.scala
@@ -67,6 +67,7 @@ with ScalatestRouteTest with HttpService {
       case GetJobResult("_num") => sender ! JobResult("_num", 5000)
       case GetJobResult("_unk") => sender ! JobResult("_case", Seq(1, math.BigInt(101)))
+      case GetJobResult("job_to_kill") => sender ! baseJobInfo
       case GetJobResult(id) => sender ! JobResult(id, id + "!!!")
       case GetJobStatuses(limitOpt) =>
         sender ! Seq(baseJobInfo,
@@ -236,6 +237,15 @@ with ScalatestRouteTest with HttpService {
       }
     }
 
+    it("should be able to kill a job from the /jobs/<jobId> route") {
+      Delete("/jobs/job_to_kill") ~> sealRoute(routes) ~> check {
+        status should be (OK)
+        responseAs[Map[String, String]] should be (Map(
+          StatusKey -> "KILLED"
+        ))
+      }
+    }
+
     it("should be able to query job config from /jobs/<jobId>/config route") {
       Get("/jobs/foobar/config") ~> sealRoute(routes) ~> check {
         status should be (OK)
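The new route can also be exercised end to end with any HTTP client. A sketch using only the JDK, assuming a job server running on its default port 8090; the host and job ID are placeholders:

    import java.net.{HttpURLConnection, URL}
    import scala.io.Source

    object KillJobClient extends App {
      val jobId = "your-job-id"   // placeholder: a jobId returned by POST /jobs
      val conn = new URL("http://localhost:8090/jobs/" + jobId)
        .openConnection().asInstanceOf[HttpURLConnection]
      conn.setRequestMethod("DELETE")
      val code = conn.getResponseCode
      // Expect 200 with {"status": "KILLED"} for a running job,
      // 404 for an unknown jobId, or the ERROR map for a failed one.
      val stream = if (code < 400) conn.getInputStream else conn.getErrorStream
      println(code + ": " + Source.fromInputStream(stream).mkString)
      conn.disconnect()
    }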