diff --git a/job-server-tests/src/main/scala/spark/jobserver/SparkTestJobs.scala b/job-server-tests/src/main/scala/spark/jobserver/SparkTestJobs.scala
index 326454877..1a5f69475 100644
--- a/job-server-tests/src/main/scala/spark/jobserver/SparkTestJobs.scala
+++ b/job-server-tests/src/main/scala/spark/jobserver/SparkTestJobs.scala
@@ -15,6 +15,13 @@ class MyErrorJob extends SparkTestJob {
   }
 }
 
+/** @see [[scala.util.control.NonFatal]] */
+class MyFatalErrorJob extends SparkTestJob {
+  def runJob(sc: SparkContext, config: Config): Any = {
+    throw new OutOfMemoryError("this is a fatal error")
+  }
+}
+
 class ConfigCheckerJob extends SparkTestJob {
   import scala.collection.JavaConverters._
 
diff --git a/job-server/src/main/scala/spark/jobserver/JobManagerActor.scala b/job-server/src/main/scala/spark/jobserver/JobManagerActor.scala
index e9c84e5ea..a74426c56 100644
--- a/job-server/src/main/scala/spark/jobserver/JobManagerActor.scala
+++ b/job-server/src/main/scala/spark/jobserver/JobManagerActor.scala
@@ -600,11 +600,13 @@ class JobManagerActor(daoActor: ActorRef, supervisorActorAddress: String, contex
       case e: java.lang.AbstractMethodError => {
         logger.error("Oops, there's an AbstractMethodError... maybe you compiled " +
           "your code with an older version of SJS? here's the exception:", e)
-        throw e
+        // wrap so it can complete as Failure even if not a scala.util.control.NonFatal
+        throw new RuntimeException(e)
       }
       case e: Throwable => {
         logger.error("Got Throwable", e)
-        throw e
+        // wrap so it can complete as Failure even if not a scala.util.control.NonFatal
+        throw new RuntimeException(e)
       };
     }
   }(executionContext).andThen {
@@ -620,7 +622,9 @@ class JobManagerActor(daoActor: ActorRef, supervisorActorAddress: String, contex
       // with context-per-jvm=true configuration
       resultActor ! JobResult(jobId, result)
       statusActor ! JobFinished(jobId, DateTime.now())
-    case Failure(error: Throwable) =>
+    case Failure(wrapped: Throwable) =>
+      // actual error was wrapped so we could process fatal errors, see #1161
+      val error = Option(wrapped.getCause).getOrElse(wrapped) // null-safe: fall back if this failure was never wrapped
       // If and only if job validation fails, JobErroredOut message is dropped silently in JobStatusActor.
       statusActor ! JobErroredOut(jobId, DateTime.now(), error)
       logger.error("Exception from job " + jobId + ": ", error)
diff --git a/job-server/src/test/scala/spark/jobserver/JobManagerActorSpec.scala b/job-server/src/test/scala/spark/jobserver/JobManagerActorSpec.scala
index 8d9cb6d5e..147d3dd3b 100644
--- a/job-server/src/test/scala/spark/jobserver/JobManagerActorSpec.scala
+++ b/job-server/src/test/scala/spark/jobserver/JobManagerActorSpec.scala
@@ -225,6 +225,16 @@ class JobManagerActorSpec extends JobSpecBase(JobManagerActorSpec.getNewSystem)
     errorMsg.err.getClass should equal (classOf[IllegalArgumentException])
   }
 
+  it("should return error if job throws a fatal error") {
+    manager ! JobManagerActor.Initialize(contextConfig, None, emptyActor)
+    expectMsgClass(initMsgWait, classOf[JobManagerActor.Initialized])
+
+    uploadTestJar()
+    manager ! JobManagerActor.StartJob("demo", classPrefix + "MyFatalErrorJob", emptyConfig, errorEvents)
+    val errorMsg = expectMsgClass(startJobWait, classOf[JobErroredOut])
+    errorMsg.err.getClass should equal (classOf[OutOfMemoryError])
+  }
+
   it("job should get jobConfig passed in to StartJob message") {
     val jobConfig = ConfigFactory.parseString("foo.bar.baz = 3")
     manager ! JobManagerActor.Initialize(contextConfig, None, emptyActor)