
Cancel job functionality #96

Merged · 3 commits · Apr 3, 2015
3 changes: 1 addition & 2 deletions README.md
@@ -258,6 +258,7 @@ the REST API.
GET /jobs - Lists the last N jobs
POST /jobs - Starts a new job, use ?sync=true to wait for results
GET /jobs/<jobId> - Gets the result or status of a specific job
DELETE /jobs/<jobId> - Kills the specified job
GET /jobs/<jobId>/config - Gets the job configuration

### Context configuration
@@ -362,8 +363,6 @@ Copyright(c) 2014, Ooyala, Inc.
- Add Swagger support. See the spray-swagger project.
- Implement an interactive SQL window. See: [spark-admin](https://github.com/adatao/spark-admin)

- Use `SparkContext.setJobGroup` with the job ID
- Support job cancellation via `cancelJobGroup`
- Stream the current job progress via a Listener
- Add routes to return stage info for a job. Persist it via DAO so that we can always retrieve stage / performance info
even for historical jobs. This would be pretty kickass.
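As a usage note for the new DELETE route above: a minimal sketch of killing a job from a Scala client (the localhost:8090 address and the job ID are illustrative assumptions; error handling is omitted):

```scala
import java.net.{HttpURLConnection, URL}

object KillJobClient extends App {
  val jobId = "ab12cd34"  // hypothetical ID returned earlier by POST /jobs
  val conn = new URL(s"http://localhost:8090/jobs/$jobId")
    .openConnection().asInstanceOf[HttpURLConnection]
  conn.setRequestMethod("DELETE")
  // Per the README: 200 with status "KILLED" if the job was running, 404 if unknown.
  println(conn.getResponseCode)
  conn.disconnect()
}
```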
21 changes: 20 additions & 1 deletion doc/job-server-flow.md
@@ -52,7 +52,7 @@ Context routes

- delete a context with given contextName

user->WebApi: DELECT /contexts/<contextName>
user->WebApi: DELETE /contexts/<contextName>
WebApi->LocalContextSupervisor: StopContext(contextName)
opt If no such context
LocalContextSupervisor->WebApi: NoSuchContext
@@ -164,3 +164,22 @@ Job routes
note over JobResultActor: subscribers.remove(jobId)
JobFuture->JobStatusActor: Unsubscribe(jobId, WebApi)
JobFuture->JobResultActor: Unsubscribe(jobId, WebApi)

- kill a job with given jobId

user->WebApi: DELETE /jobs/<jobId>
WebApi->JobInfoActor: GetJobResult(jobId)
note over JobInfoActor: JobDao.getJobInfos.get(jobId)
opt if jobId not found:
JobInfoActor->WebApi: NoSuchJobId
WebApi->user: 404
end
opt if job is running:
WebApi->JobManager: KillJob(jobId)
JobManager->WebApi: future{}
WebApi->user: 200 + "KILLED"
end
opt if job has errored out:
JobInfoActor->WebApi: JobInfo
WebApi->user: 200 + "ERROR"
end
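The kill flow above is the usual ask-then-match pattern on the WebApi side; a condensed sketch using the message types from the diffs below (responses reduced to comments, timeout value an assumption):

```scala
import akka.actor.ActorRef
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

def handleDeleteJob(jobInfoActor: ActorRef, jobManager: ActorRef, jobId: String): Unit = {
  implicit val timeout = Timeout(10.seconds)
  (jobInfoActor ? GetJobResult(jobId)).map {
    case NoSuchJobId                           => () // respond 404
    case info: JobInfo if info.error.isDefined => () // respond 200 + "ERROR"
    case info: JobInfo                         => jobManager ! KillJob(jobId) // 200 + "KILLED"
    case other                                 => () // e.g. JobResult, omitted here
  }
}
```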
1 change: 1 addition & 0 deletions job-server/src/spark.jobserver/CommonMessages.scala
@@ -14,6 +14,7 @@ object CommonMessages {
case class JobFinished(jobId: String, endTime: DateTime) extends StatusMessage
case class JobValidationFailed(jobId: String, endTime: DateTime, err: Throwable) extends StatusMessage
case class JobErroredOut(jobId: String, endTime: DateTime, err: Throwable) extends StatusMessage
case class JobKilled(jobId: String, endTime: DateTime) extends StatusMessage

/**
* NOTE: For Subscribe, make sure to use `classOf[]` to get the Class for the case classes above.
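A one-line illustration of that NOTE, assuming the Subscribe(jobId, receiver, events) shape used elsewhere in the job server:

```scala
// Subscribe `self` to terminal status events for this job, including the new JobKilled.
statusActor ! Subscribe(jobId, self, Set(classOf[JobKilled], classOf[JobFinished], classOf[JobErroredOut]))
```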
8 changes: 8 additions & 0 deletions job-server/src/spark.jobserver/JobManagerActor.scala
@@ -18,6 +18,7 @@ object JobManagerActor {
case object Initialize
case class StartJob(appName: String, classPath: String, config: Config,
subscribedEvents: Set[Class[_]])
case class KillJob(jobId: String)

// Results/Data
case class Initialized(resultActor: ActorRef)
@@ -116,6 +117,11 @@ class JobManagerActor(dao: JobDAO,

case StartJob(appName, classPath, jobConfig, events) =>
startJobInternal(appName, classPath, jobConfig, events, jobContext, sparkEnv, rddManagerActor)

case KillJob(jobId: String) => {
jobContext.sparkContext.cancelJobGroup(jobId)
statusActor ! JobKilled(jobId, DateTime.now())
}
}

def startJobInternal(appName: String,
@@ -225,6 +231,8 @@ class JobManagerActor(dao: JobDAO,
}
case SparkJobValid => {
statusActor ! JobStarted(jobId: String, contextName, jobInfo.startTime)
val sc = jobContext.sparkContext
sc.setJobGroup(jobId, s"Job group for $jobId and spark context ${sc.applicationId}", true)
job.runJob(jobC, jobConfig)
}
}
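The handler above leans on two Spark primitives: setJobGroup tags every job submitted from the calling thread with a group ID, and cancelJobGroup cancels that group (cancelling an unknown group is a harmless no-op, as the review discussion further down notes). A self-contained sketch of the mechanism, with illustrative names only:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object JobGroupDemo extends App {
  val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("demo"))

  val worker = new Thread(new Runnable {
    def run(): Unit = {
      // Tag jobs from this thread; interruptOnCancel = true mirrors the PR's setJobGroup call.
      sc.setJobGroup("my-job-id", "demo group", true)
      try sc.parallelize(1 to 1000000).map { i => Thread.sleep(1); i }.count()
      catch { case e: Exception => println(s"cancelled: ${e.getMessage}") }
    }
  })
  worker.start()
  Thread.sleep(2000)
  sc.cancelJobGroup("my-job-id")      // cancels the tagged job
  sc.cancelJobGroup("no-such-group")  // safe no-op
  worker.join()
  sc.stop()
}
```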
6 changes: 6 additions & 0 deletions job-server/src/spark.jobserver/JobStatusActor.scala
@@ -85,6 +85,12 @@ class JobStatusActor(jobDao: JobDAO) extends InstrumentedActor with YammerMetric
case (info, msg) =>
info.copy(endTime = Some(msg.endTime), error = Some(msg.err))
}

case msg: JobKilled =>
processStatus(msg, "killed", remove = true) {
case (info, msg) =>
info.copy(endTime = Some(msg.endTime))
}
}

private def processStatus[M <: StatusMessage](msg: M, logMessage: String, remove: Boolean = false)
20 changes: 20 additions & 0 deletions job-server/src/spark.jobserver/WebApi.scala
@@ -217,6 +217,26 @@ class WebApi(system: ActorSystem,
}
}
} ~
// DELETE /jobs/<jobId>
// Stop the current job. All other jobs submitted with this Spark context
// will continue to run

Member: nit: It stops the job with the specified jobId, right?

Contributor Author: Yes, the jobserver job ID (the one returned after the job is submitted to the jobserver).

Member: Yes, but what I'm trying to say is that the documentation says "current job"; there might be several jobs running at once, and you only want to cancel one of them.

(delete & path(Segment)) { jobId =>
val future = jobInfo ? GetJobResult(jobId)
respondWithMediaType(MediaTypes.`application/json`) { ctx =>
future.map {
case NoSuchJobId =>
notFound(ctx, "No such job ID " + jobId.toString)
case JobInfo(_, contextName, _, classPath, _, None, _) =>
val jobManager = getJobManagerForContext(Some(contextName), config, classPath)
jobManager.get ! KillJob(jobId)
ctx.complete(Map(StatusKey -> "KILLED"))
case JobInfo(_, _, _, _, _, _, Some(ex)) =>
ctx.complete(Map(StatusKey -> "ERROR", "ERROR" -> formatException(ex)))
case JobResult(_, result) =>
ctx.complete(resultToTable(result))
}
}
} ~
/**
* GET /jobs -- returns a JSON list of hashes containing job status, ex:
* [
16 changes: 16 additions & 0 deletions job-server/test/spark.jobserver/JobManagerSpec.scala
@@ -1,6 +1,7 @@
package spark.jobserver

import com.typesafe.config.ConfigFactory
import spark.jobserver.JobManagerActor.KillJob
import scala.collection.mutable
import spark.jobserver.io.JobDAO

@@ -195,5 +196,20 @@ abstract class JobManagerSpec extends JobSpecBase(JobManagerSpec.getNewSystem) {
case JobResult(_, result: Int) => result should equal (1 + 2 + 3)
}
}

it("should be able to cancel running job") {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a test for what happens when the job is not running?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will just call sparkContext.cancelJobGroup(NONEXISTING_GROUP_ID) which is safe (will not raise any exception) and will do actually nothing. In WebAPI it will actually handle this:
https://github.com/petro-rudenko/spark-jobserver/blob/cancel_job3/job-server/src/spark.jobserver/WebApi.scala#L228

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, I'm saying there's no test for that scenario, the user should receive a message that no such job exists right?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also, if you try to delete a job that already finished what would be the result?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@zeitos It seems from the code that deleting a nonexistant job would get the user the NotFound message. I think the same thing if the job already finished.

manager ! JobManagerActor.Initialize
expectMsgClass(classOf[JobManagerActor.Initialized])

uploadTestJar()
manager ! JobManagerActor.StartJob("demo", classPrefix + "LongPiJob", stringConfig, allEvents)
expectMsgPF(1 seconds, "Did not get JobStarted") {
case JobStarted(id, _, _) => {
manager ! KillJob(id)
expectMsgClass(classOf[JobKilled])
}
}
}

}
}
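The scenario the reviewers raise is not covered by the PR; a sketch of what a manager-level test might look like, assuming (per the discussion) that cancelling an unknown group is a no-op and that no status event reaches a caller that never subscribed:

```scala
it("should quietly ignore a kill request for a job that was never started") {
  manager ! JobManagerActor.Initialize
  expectMsgClass(classOf[JobManagerActor.Initialized])
  // cancelJobGroup on an unknown group does nothing, and nothing is subscribed
  // to this job ID, so no JobKilled event should arrive here (assumption).
  manager ! KillJob("no-such-job-id")
  expectNoMsg(1.second)
}
```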
2 changes: 1 addition & 1 deletion job-server/test/spark.jobserver/JobSpecBase.scala
@@ -59,7 +59,7 @@ with FunSpecLike with Matchers with BeforeAndAfter with BeforeAndAfterAll with T
import CommonMessages._

val errorEvents: Set[Class[_]] = Set(classOf[JobErroredOut], classOf[JobValidationFailed],
classOf[NoJobSlotsAvailable])
classOf[NoJobSlotsAvailable], classOf[JobKilled])
val asyncEvents = Set(classOf[JobStarted])
val syncEvents = Set(classOf[JobResult])
val allEvents = errorEvents ++ asyncEvents ++ syncEvents ++ Set(classOf[JobFinished])
10 changes: 10 additions & 0 deletions job-server/test/spark.jobserver/WebApiSpec.scala
@@ -67,6 +67,7 @@ with ScalatestRouteTest with HttpService {
case GetJobResult("_num") =>
sender ! JobResult("_num", 5000)
case GetJobResult("_unk") => sender ! JobResult("_case", Seq(1, math.BigInt(101)))
case GetJobResult("job_to_kill") => sender ! baseJobInfo
case GetJobResult(id) => sender ! JobResult(id, id + "!!!")
case GetJobStatuses(limitOpt) =>
sender ! Seq(baseJobInfo,
@@ -236,6 +237,15 @@ with ScalatestRouteTest with HttpService {
}
}

it("should be able to kill job from /jobs/<id> route") {
Delete("/jobs/job_to_kill") ~> sealRoute(routes) ~> check {
status should be (OK)
responseAs[Map[String, String]] should be (Map(
StatusKey -> "KILLED"
))
}
}
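The reviewers' 404 question applies at the route level too; a sketch of the missing test, assuming the mock actor above were extended with a case GetJobResult("no_such_job") => sender ! NoSuchJobId clause:

```scala
it("should return 404 when killing a non-existing job") {
  Delete("/jobs/no_such_job") ~> sealRoute(routes) ~> check {
    status should be (NotFound)
  }
}
```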

it("should be able to query job config from /jobs/<id>/config route") {
Get("/jobs/foobar/config") ~> sealRoute(routes) ~> check {
status should be (OK)