Merge pull request #96 from petro-rudenko/cancel_job3
Cancel job functionality
velvia committed Apr 3, 2015
2 parents e5d6155 + 9c8de1d commit e5cd03f
Showing 9 changed files with 83 additions and 4 deletions.
README.md (3 changes: 1 addition & 2 deletions)
@@ -261,6 +261,7 @@ the REST API.
GET /jobs - Lists the last N jobs
POST /jobs - Starts a new job, use ?sync=true to wait for results
GET /jobs/<jobId> - Gets the result or status of a specific job
+DELETE /jobs/<jobId> - Kills the specified job
GET /jobs/<jobId>/config - Gets the job configuration

### Context configuration
@@ -365,8 +366,6 @@ Copyright(c) 2014, Ooyala, Inc.
- Add Swagger support. See the spray-swagger project.
- Implement an interactive SQL window. See: [spark-admin](https://github.com/adatao/spark-admin)

-- Use `SparkContext.setJobGroup` with the job ID
-- Support job cancellation via `cancelJobGroup`
- Stream the current job progress via a Listener
- Add routes to return stage info for a job. Persist it via DAO so that we can always retrieve stage / performance info
even for historical jobs. This would be pretty kickass.
doc/job-server-flow.md (21 changes: 20 additions & 1 deletion)
@@ -52,7 +52,7 @@ Context routes

- delete a context with given contextName

-user->WebApi: DELECT /contexts/<contextName>
+user->WebApi: DELETE /contexts/<contextName>
WebApi->LocalContextSupervisor: StopContext(contextName)
opt If no such context
LocalContextSupervisor->WebApi: NoSuchContext
@@ -164,3 +164,22 @@ Job routes
note over JobResultActor: subscribers.remove(jobId)
JobFuture->JobStatusActor: Unsubscribe(jobId, WebApi)
JobFuture->JobResultActor: Unsubscribe(jobId, WebApi)

+- kill a job with jobId
+
+user->WebApi: DELETE /jobs/<jobId>
+WebApi->JobInfoActor: GetJobResult(jobId)
+note over JobInfoActor: JobDao.getJobInfos.get(jobId)
+opt if jobId not found:
+JobInfoActor->WebApi: NoSuchJobId
+WebApi->user: 404
+end
+opt if job is running:
+WebApi->JobManager: KillJob(jobId)
+JobManager->WebApi: future{}
+WebApi->user: 200 + "KILLED"
+end
+opt if job has errored out:
+JobInfoActor->WebApi: JobInfo
+WebApi->user: 200 + "ERROR"
+end
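
Together with the README addition above, this flow can be exercised end to end from any HTTP client. Below is a minimal client-side sketch, not part of this commit: the host and port assume the job server's usual default of localhost:8090, the job ID is hypothetical, and only the JDK's `HttpURLConnection` is used.

```scala
import java.net.{HttpURLConnection, URL}
import scala.io.Source

object KillJobClient extends App {
  val jobId = "abc123"  // hypothetical ID, as returned by POST /jobs
  val conn = new URL(s"http://localhost:8090/jobs/$jobId")
    .openConnection().asInstanceOf[HttpURLConnection]
  conn.setRequestMethod("DELETE")

  val status = conn.getResponseCode  // 200 for KILLED/ERROR, 404 for unknown IDs
  val stream = if (status < 400) conn.getInputStream else conn.getErrorStream
  println(s"$status ${Source.fromInputStream(stream).mkString}")
  conn.disconnect()
}
```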
job-server/src/spark.jobserver/CommonMessages.scala (1 change: 1 addition & 0 deletions)
@@ -14,6 +14,7 @@ object CommonMessages {
case class JobFinished(jobId: String, endTime: DateTime) extends StatusMessage
case class JobValidationFailed(jobId: String, endTime: DateTime, err: Throwable) extends StatusMessage
case class JobErroredOut(jobId: String, endTime: DateTime, err: Throwable) extends StatusMessage
+case class JobKilled(jobId: String, endTime: DateTime) extends StatusMessage

/**
* NOTE: For Subscribe, make sure to use `classOf[]` to get the Class for the case classes above.
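
The note above applies to the new message as well: an observer that wants to hear about kills has to subscribe with the runtime class of `JobKilled`. A hedged sketch, assuming the `Subscribe(jobId, receiver, events)` shape the note refers to; the watcher actor itself is hypothetical.

```scala
import akka.actor.{Actor, ActorRef}
import spark.jobserver.CommonMessages.{JobKilled, Subscribe}

// Hypothetical observer: logs when one particular job is killed.
class KillWatcher(statusActor: ActorRef, jobId: String) extends Actor {
  override def preStart(): Unit =
    // classOf[JobKilled] supplies the Class object the subscription matches on
    statusActor ! Subscribe(jobId, self, Set(classOf[JobKilled]))

  def receive = {
    case JobKilled(id, endTime) => println(s"job $id was killed at $endTime")
  }
}
```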
job-server/src/spark.jobserver/JobManagerActor.scala (8 changes: 8 additions & 0 deletions)
@@ -18,6 +18,7 @@ object JobManagerActor {
case object Initialize
case class StartJob(appName: String, classPath: String, config: Config,
subscribedEvents: Set[Class[_]])
+case class KillJob(jobId: String)

// Results/Data
case class Initialized(resultActor: ActorRef)
@@ -116,6 +117,11 @@ class JobManagerActor(dao: JobDAO,

case StartJob(appName, classPath, jobConfig, events) =>
startJobInternal(appName, classPath, jobConfig, events, jobContext, sparkEnv, rddManagerActor)

+case KillJob(jobId: String) => {
+  jobContext.sparkContext.cancelJobGroup(jobId)
+  statusActor ! JobKilled(jobId, DateTime.now())
+}
}

def startJobInternal(appName: String,
@@ -223,6 +229,8 @@ class JobManagerActor(dao: JobDAO,
}
case SparkJobValid => {
statusActor ! JobStarted(jobId: String, contextName, jobInfo.startTime)
+val sc = jobContext.sparkContext
+sc.setJobGroup(jobId, s"Job group for $jobId and spark context ${sc.applicationId}", true)
job.runJob(jobC, jobConfig)
}
}
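
These two lines plus the `KillJob` handler above are the whole mechanism: `setJobGroup` tags every Spark job launched from the current thread with the job server's jobId, and `cancelJobGroup(jobId)` later cancels everything carrying that tag. The third argument, `true`, is `interruptOnCancel`, which asks Spark to interrupt running task threads rather than merely skip queued tasks. A self-contained sketch of the same pattern outside the job server; the `local[*]` master, timings, and group name are illustrative.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object JobGroupDemo extends App {
  val sc = new SparkContext(
    new SparkConf().setMaster("local[*]").setAppName("job-group-demo"))

  val worker = new Thread {
    override def run(): Unit = {
      // setJobGroup is thread-local: it tags jobs launched from this thread.
      sc.setJobGroup("job-1", "cancellable demo job", true)
      try sc.parallelize(1 to 1000000, 8).map { i => Thread.sleep(1); i }.count()
      catch { case e: Exception => println(s"cancelled: ${e.getMessage}") }
    }
  }
  worker.start()
  Thread.sleep(2000)           // let a few tasks start
  sc.cancelJobGroup("job-1")   // the same call the KillJob handler makes
  worker.join()
  sc.stop()
}
```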
job-server/src/spark.jobserver/JobStatusActor.scala (6 changes: 6 additions & 0 deletions)
@@ -85,6 +85,12 @@ class JobStatusActor(jobDao: JobDAO) extends InstrumentedActor with YammerMetric
case (info, msg) =>
info.copy(endTime = Some(msg.endTime), error = Some(msg.err))
}

+case msg: JobKilled =>
+  processStatus(msg, "killed", remove = true) {
+    case (info, msg) =>
+      info.copy(endTime = Some(msg.endTime))
+  }
}

private def processStatus[M <: StatusMessage](msg: M, logMessage: String, remove: Boolean = false)
job-server/src/spark.jobserver/WebApi.scala (20 changes: 20 additions & 0 deletions)
@@ -217,6 +217,26 @@ class WebApi(system: ActorSystem,
}
}
} ~
+// DELETE /jobs/<jobId>
+// Stop the specified job. All other jobs submitted to this Spark context
+// will continue to run.
+(delete & path(Segment)) { jobId =>
+  val future = jobInfo ? GetJobResult(jobId)
+  respondWithMediaType(MediaTypes.`application/json`) { ctx =>
+    future.map {
+      case NoSuchJobId =>
+        notFound(ctx, "No such job ID " + jobId)
+      case JobInfo(_, contextName, _, classPath, _, None, _) =>
+        val jobManager = getJobManagerForContext(Some(contextName), config, classPath)
+        jobManager.get ! KillJob(jobId)
+        ctx.complete(Map(StatusKey -> "KILLED"))
+      case JobInfo(_, _, _, _, _, _, Some(ex)) =>
+        ctx.complete(Map(StatusKey -> "ERROR", "ERROR" -> formatException(ex)))
+      case JobResult(_, result) =>
+        ctx.complete(resultToTable(result))
+    }
+  }
+} ~
/**
* GET /jobs -- returns a JSON list of hashes containing job status, ex:
* [
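
One detail worth calling out in the route above: `jobInfo ? GetJobResult(jobId)` is Akka's ask pattern, which only compiles with an implicit `Timeout` in scope and returns a `Future` the route then maps over, while the kill itself is fire-and-forget (`!`). A stripped-down sketch of the same shape outside spray; the message types are the ones in the diff, but their import paths, the actor refs, and the timeout value are assumptions.

```scala
import akka.actor.ActorRef
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

def killJob(jobInfoActor: ActorRef, jobManager: ActorRef, jobId: String): Unit = {
  implicit val timeout = Timeout(10.seconds)  // `?` needs an implicit Timeout
  (jobInfoActor ? GetJobResult(jobId)).foreach {
    case NoSuchJobId =>
      println(s"404 - no such job $jobId")
    case JobInfo(_, _, _, _, _, None, _) =>          // endTime = None: still running
      jobManager ! JobManagerActor.KillJob(jobId)    // fire-and-forget, as in the route
      println("200 - KILLED")
    case other =>
      println(s"200 - $other")                       // already finished or errored out
  }
}
```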
job-server/test/spark.jobserver/JobManagerSpec.scala (16 changes: 16 additions & 0 deletions)
@@ -1,6 +1,7 @@
package spark.jobserver

import com.typesafe.config.ConfigFactory
+import spark.jobserver.JobManagerActor.KillJob
import scala.collection.mutable
import spark.jobserver.io.JobDAO

@@ -195,5 +196,20 @@ abstract class JobManagerSpec extends JobSpecBase(JobManagerSpec.getNewSystem) {
case JobResult(_, result: Int) => result should equal (1 + 2 + 3)
}
}

it("should be able to cancel running job") {
manager ! JobManagerActor.Initialize
expectMsgClass(classOf[JobManagerActor.Initialized])

uploadTestJar()
manager ! JobManagerActor.StartJob("demo", classPrefix + "LongPiJob", stringConfig, allEvents)
expectMsgPF(1 seconds, "Did not get JobResult") {
case JobStarted(id, _, _) => {
manager ! KillJob(id)
expectMsgClass(classOf[JobKilled])
}
}
}

}
}
job-server/test/spark.jobserver/JobSpecBase.scala (2 changes: 1 addition & 1 deletion)
@@ -59,7 +59,7 @@ with FunSpecLike with Matchers with BeforeAndAfter with BeforeAndAfterAll with T
import CommonMessages._

val errorEvents: Set[Class[_]] = Set(classOf[JobErroredOut], classOf[JobValidationFailed],
-classOf[NoJobSlotsAvailable])
+classOf[NoJobSlotsAvailable], classOf[JobKilled])
val asyncEvents = Set(classOf[JobStarted])
val syncEvents = Set(classOf[JobResult])
val allEvents = errorEvents ++ asyncEvents ++ syncEvents ++ Set(classOf[JobFinished])
job-server/test/spark.jobserver/WebApiSpec.scala (10 changes: 10 additions & 0 deletions)
@@ -67,6 +67,7 @@ with ScalatestRouteTest with HttpService {
case GetJobResult("_num") =>
sender ! JobResult("_num", 5000)
case GetJobResult("_unk") => sender ! JobResult("_case", Seq(1, math.BigInt(101)))
case GetJobResult("job_to_kill") => sender ! baseJobInfo
case GetJobResult(id) => sender ! JobResult(id, id + "!!!")
case GetJobStatuses(limitOpt) =>
sender ! Seq(baseJobInfo,
@@ -236,6 +237,15 @@
}
}

it("should be able to kill job from /jobs/<id> route") {
Delete("/jobs/job_to_kill") ~> sealRoute(routes) ~> check {
status should be (OK)
responseAs[Map[String, String]] should be (Map(
StatusKey -> "KILLED"
))
}
}

it("should be able to query job config from /jobs/<id>/config route") {
Get("/jobs/foobar/config") ~> sealRoute(routes) ~> check {
status should be (OK)
