Skip to content

Commit

Permalink
fix(jobserver): ensure job cleanup after fatal error - ref #1161
Browse files Browse the repository at this point in the history
`Future` only completes on success or when encountering a `NonFatal`
error; when encountering a fatal error, the future does not complete at
all and the error is passed to the execution context.

This change wraps all `Throwable` so the `Future` can complete and the
`Failure` handlers can cleanup the job and properly mark it as failed.
  • Loading branch information
sgautrin authored and bsikander committed Apr 29, 2019
1 parent a0a28fb commit b361c07
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 3 deletions.
Expand Up @@ -15,6 +15,13 @@ class MyErrorJob extends SparkTestJob {
}
}

/** @see [[scala.util.control.NonFatal]] */
class MyFatalErrorJob extends SparkTestJob {
def runJob(sc: SparkContext, config: Config): Any = {
throw new OutOfMemoryError("this is a fatal error")
}
}

class ConfigCheckerJob extends SparkTestJob {
import scala.collection.JavaConverters._

Expand Down
10 changes: 7 additions & 3 deletions job-server/src/main/scala/spark/jobserver/JobManagerActor.scala
Expand Up @@ -600,11 +600,13 @@ class JobManagerActor(daoActor: ActorRef, supervisorActorAddress: String, contex
case e: java.lang.AbstractMethodError => {
logger.error("Oops, there's an AbstractMethodError... maybe you compiled " +
"your code with an older version of SJS? here's the exception:", e)
throw e
// wrap so it can complete as Failure even if not a scala.util.control.NonFatal
throw new RuntimeException(e)
}
case e: Throwable => {
logger.error("Got Throwable", e)
throw e
// wrap so it can complete as Failure even if not a scala.util.control.NonFatal
throw new RuntimeException(e)
};
}
}(executionContext).andThen {
Expand All @@ -620,7 +622,9 @@ class JobManagerActor(daoActor: ActorRef, supervisorActorAddress: String, contex
// with context-per-jvm=true configuration
resultActor ! JobResult(jobId, result)
statusActor ! JobFinished(jobId, DateTime.now())
case Failure(error: Throwable) =>
case Failure(wrapped: Throwable) =>
// actual error was wrapped so we could process fatal errors, see #1161
val error = wrapped.getCause
// If and only if job validation fails, JobErroredOut message is dropped silently in JobStatusActor.
statusActor ! JobErroredOut(jobId, DateTime.now(), error)
logger.error("Exception from job " + jobId + ": ", error)
Expand Down
Expand Up @@ -225,6 +225,16 @@ class JobManagerActorSpec extends JobSpecBase(JobManagerActorSpec.getNewSystem)
errorMsg.err.getClass should equal (classOf[IllegalArgumentException])
}

it("should return error if job throws a fatal error") {
manager ! JobManagerActor.Initialize(contextConfig, None, emptyActor)
expectMsgClass(initMsgWait, classOf[JobManagerActor.Initialized])

uploadTestJar()
manager ! JobManagerActor.StartJob("demo", classPrefix + "MyFatalErrorJob", emptyConfig, errorEvents)
val errorMsg = expectMsgClass(startJobWait, classOf[JobErroredOut])
errorMsg.err.getClass should equal (classOf[OutOfMemoryError])
}

it("job should get jobConfig passed in to StartJob message") {
val jobConfig = ConfigFactory.parseString("foo.bar.baz = 3")
manager ! JobManagerActor.Initialize(contextConfig, None, emptyActor)
Expand Down

0 comments on commit b361c07

Please sign in to comment.