Skip to content

Commit

Permalink
feat(jobserver): Make stop-context-on-job-error default for streaming jobs

Browse files Browse the repository at this point in the history

Since Spark kills the driver if a streaming job has an error,
we want the same behavior in jobserver. This default is
being used to mimic that behavior.

Change-Id: If1902bf5cdbe95e9e2817166a071b0eafb2c82fa
  • Loading branch information
bsikander committed Nov 14, 2019
1 parent a4ac6ca commit e068585
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 2 deletions.
Expand Up @@ -63,6 +63,8 @@ trait SparkContextFactory {
val sparkConf = configToSparkConf(config, contextConfig, contextName)
makeContext(sparkConf, contextConfig, contextName)
}

def updateConfig(config: Config): Config = config
}

case class ScalaJobContainer(job: api.SparkJobBase) extends JobContainer {
Expand Down
@@ -1,9 +1,10 @@
package spark.jobserver.context

import com.typesafe.config.Config
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import spark.jobserver.{api, ContextLike, SparkStreamingJob}
import spark.jobserver.{ContextLike, SparkStreamingJob, api}
import spark.jobserver.util.JobserverConfig

class StreamingContextFactory extends ScalaContextFactory {

Expand All @@ -22,4 +23,21 @@ class StreamingContextFactory extends ScalaContextFactory {
}
}
}

/**
 * Applies streaming-specific defaults to the context configuration.
 *
 * If the user has not explicitly set `STOP_CONTEXT_ON_JOB_ERROR`, it is
 * defaulted to `true` via a config fallback, so the streaming context stops
 * when a job fails (mirroring Spark killing the driver on a streaming error).
 * An explicitly configured value is left untouched.
 *
 * @param contextConfig the context configuration supplied at context creation
 * @return the config, with the streaming default added as a fallback if unset
 */
override def updateConfig(contextConfig: Config): Config = {
  // Plain if/else instead of matching on a Boolean (a well-known Scala
  // anti-pattern flagged by common linters); behavior is unchanged.
  if (contextConfig.hasPath(JobserverConfig.STOP_CONTEXT_ON_JOB_ERROR)) {
    logger.info(
      s"${JobserverConfig.STOP_CONTEXT_ON_JOB_ERROR} already configured, not changing the config")
    contextConfig
  } else {
    logger.warn(
      s"""${JobserverConfig.STOP_CONTEXT_ON_JOB_ERROR} is not set. Streaming contexts have
|default set to true. On any error the context will stop. To change behavior,
|set ${JobserverConfig.STOP_CONTEXT_ON_JOB_ERROR} while creating context."""
      .stripMargin.replaceAll("\n", " "))

    // withFallback only fills in keys the user did not supply, so this cannot
    // override an explicit setting.
    contextConfig.withFallback(
      ConfigFactory.parseString(s"${JobserverConfig.STOP_CONTEXT_ON_JOB_ERROR}=true"))
  }
}
}
Expand Up @@ -145,6 +145,16 @@ class StreamingJobSpec extends JobSpecBase(StreamingJobSpec.getNewSystem) {

deathWatch.expectNoMsg(1.seconds)
}

// Verifies the new default behavior: with STOP_CONTEXT_ON_JOB_ERROR defaulting
// to true for streaming contexts, a failing job terminates the context manager.
it("should automatically stop a streaming context if" +
  s" job throws an exception (since ${JobserverConfig.STOP_CONTEXT_ON_JOB_ERROR} is by default true)") {
  // Watch the manager actor so we can observe its termination.
  val managerWatcher = TestProbe()
  managerWatcher.watch(manager)

  triggerFailingStreamingJob(cfg)

  // The manager should die once the streaming job errors out.
  managerWatcher.expectTerminated(manager, 3.seconds)
}
}

private def triggerFailingStreamingJob(ctxConfig: Config): Unit = {
Expand Down
Expand Up @@ -344,6 +344,8 @@ class JobManagerActor(daoActor: ActorRef, supervisorActorAddress: String, contex
jarLoader.addURL(new URL(convertJarUriSparkToJava(jarUri)))
}
factory = getContextFactory()
// Add defaults or update the config according to a specific context
contextConfig = factory.updateConfig(contextConfig)
jobContext = factory.makeContext(config, contextConfig, contextName)
jobContext.sparkContext.addSparkListener(sparkListener)
sparkEnv = SparkEnv.get
Expand Down

0 comments on commit e068585

Please sign in to comment.