diff --git a/job-server-api/src/main/scala/spark/jobserver/context/SparkContextFactory.scala b/job-server-api/src/main/scala/spark/jobserver/context/SparkContextFactory.scala
index 25d3bffa5..7ff83459d 100644
--- a/job-server-api/src/main/scala/spark/jobserver/context/SparkContextFactory.scala
+++ b/job-server-api/src/main/scala/spark/jobserver/context/SparkContextFactory.scala
@@ -63,6 +63,8 @@ trait SparkContextFactory {
     val sparkConf = configToSparkConf(config, contextConfig, contextName)
     makeContext(sparkConf, contextConfig, contextName)
   }
+
+  def updateConfig(config: Config): Config = config
 }
 
 case class ScalaJobContainer(job: api.SparkJobBase) extends JobContainer {
diff --git a/job-server-extras/src/main/scala/spark/jobserver/context/StreamingContextFactory.scala b/job-server-extras/src/main/scala/spark/jobserver/context/StreamingContextFactory.scala
index 3443a56fd..6f38ff6c2 100644
--- a/job-server-extras/src/main/scala/spark/jobserver/context/StreamingContextFactory.scala
+++ b/job-server-extras/src/main/scala/spark/jobserver/context/StreamingContextFactory.scala
@@ -1,9 +1,10 @@
 package spark.jobserver.context
 
-import com.typesafe.config.Config
+import com.typesafe.config.{Config, ConfigFactory}
 import org.apache.spark.SparkConf
 import org.apache.spark.streaming.{Milliseconds, StreamingContext}
-import spark.jobserver.{api, ContextLike, SparkStreamingJob}
+import spark.jobserver.{ContextLike, SparkStreamingJob, api}
+import spark.jobserver.util.JobserverConfig
 
 class StreamingContextFactory extends ScalaContextFactory {
 
@@ -22,4 +23,21 @@
       }
     }
   }
+
+  override def updateConfig(contextConfig: Config): Config = {
+    contextConfig.hasPath(JobserverConfig.STOP_CONTEXT_ON_JOB_ERROR) match {
+      case true => logger.info(
+        s"${JobserverConfig.STOP_CONTEXT_ON_JOB_ERROR} already configured, not changing the config")
+        contextConfig
+      case false =>
+        logger.warn(
+          s"""${JobserverConfig.STOP_CONTEXT_ON_JOB_ERROR} is not set. Streaming contexts have
+             |default set to true. On any error the context will stop. To change behavior,
+             |set ${JobserverConfig.STOP_CONTEXT_ON_JOB_ERROR} while creating context."""
+            .stripMargin.replaceAll("\n", " "))
+
+        contextConfig.withFallback(
+          ConfigFactory.parseString(s"${JobserverConfig.STOP_CONTEXT_ON_JOB_ERROR}=true"))
+    }
+  }
 }
diff --git a/job-server-extras/src/test/scala/spark/jobserver/StreamingJobSpec.scala b/job-server-extras/src/test/scala/spark/jobserver/StreamingJobSpec.scala
index 932cf2575..cde539afc 100644
--- a/job-server-extras/src/test/scala/spark/jobserver/StreamingJobSpec.scala
+++ b/job-server-extras/src/test/scala/spark/jobserver/StreamingJobSpec.scala
@@ -145,6 +145,16 @@ class StreamingJobSpec extends JobSpecBase(StreamingJobSpec.getNewSystem) {
 
       deathWatch.expectNoMsg(1.seconds)
     }
+
+    it("should automatically stop a streaming context if" +
+      s" job throws an exception (since ${JobserverConfig.STOP_CONTEXT_ON_JOB_ERROR} is by default true)") {
+      val deathWatch = TestProbe()
+      deathWatch.watch(manager)
+
+      triggerFailingStreamingJob(cfg)
+
+      deathWatch.expectTerminated(manager, 3.seconds)
+    }
   }
 
   private def triggerFailingStreamingJob(ctxConfig: Config): Unit = {
diff --git a/job-server/src/main/scala/spark/jobserver/JobManagerActor.scala b/job-server/src/main/scala/spark/jobserver/JobManagerActor.scala
index 7f4a69380..8e59327f7 100644
--- a/job-server/src/main/scala/spark/jobserver/JobManagerActor.scala
+++ b/job-server/src/main/scala/spark/jobserver/JobManagerActor.scala
@@ -344,6 +344,8 @@ class JobManagerActor(daoActor: ActorRef, supervisorActorAddress: String, contex
         jarLoader.addURL(new URL(convertJarUriSparkToJava(jarUri)))
       }
       factory = getContextFactory()
+      // Add defaults or update the config according to a specific context
+      contextConfig = factory.updateConfig(contextConfig)
       jobContext = factory.makeContext(config, contextConfig, contextName)
       jobContext.sparkContext.addSparkListener(sparkListener)
       sparkEnv = SparkEnv.get