[SPARK-4180] Prevent creation of multiple active SparkContexts.
JoshRosen committed Nov 5, 2014
1 parent 23f966f commit afaa7e3
Showing 2 changed files with 124 additions and 24 deletions.
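
In short, this patch makes SparkContext construction fail fast when another SparkContext is already active in the same JVM, and the error points at the creation site of the existing context. A minimal sketch of the new behavior from a driver program's point of view (app name and master are placeholders, and the error text is paraphrased from the diff below):

  val conf = new SparkConf().setAppName("demo").setMaster("local")
  val sc = new SparkContext(conf)
  // A second construction attempt in the same JVM now throws, roughly:
  //   org.apache.spark.SparkException: Only one SparkContext may be active in this JVM
  //   (see SPARK-2243). The currently active SparkContext was created at <call site>
  val sc2 = new SparkContext(conf)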
100 changes: 78 additions & 22 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -179,6 +179,30 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
  conf.setIfMissing("spark.driver.host", Utils.localHostName())
  conf.setIfMissing("spark.driver.port", "0")

  // This is placed after the configuration validation so that common configuration errors, like
  // forgetting to pass a master url or app name, don't prevent subsequent SparkContexts from being
  // constructed.
  SparkContext.SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
    SparkContext.activeSparkContextCreationSite.foreach { creationSite =>
      val errMsg = "Only one SparkContext may be active in this JVM (see SPARK-2243)."
      val errDetails = if (SparkContext.activeSparkContextIsFullyConstructed) {
        s"The currently active SparkContext was created at ${creationSite.shortForm}"
      } else {
        s"Another SparkContext, created at ${creationSite.shortForm}, is either being constructed" +
          " or threw an exception from its constructor; please restart your JVM in order to" +
          " create a new SparkContext."
      }
      val exception = new SparkException(s"$errMsg $errDetails")
      if (conf.getBoolean("spark.driver.disableMultipleSparkContextsErrorChecking", false)) {
        logWarning("Multiple SparkContext error detection is disabled!", exception)
      } else {
        throw exception
      }
    }
    SparkContext.activeSparkContextCreationSite = Some(Utils.getCallSite())
    SparkContext.activeSparkContextIsFullyConstructed = false
  }

  val jars: Seq[String] =
    conf.getOption("spark.jars").map(_.split(",")).map(_.filter(_.size != 0)).toSeq.flatten

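The check above can be bypassed through the undocumented spark.driver.disableMultipleSparkContextsErrorChecking property, in which case the second construction only logs the would-be exception as a warning. A hedged sketch of opting out via SparkConf (the commit's own test exercises the same flag through a system property; this variant is illustrative, not part of the diff):

  val conf = new SparkConf()
    .setAppName("demo")
    .setMaster("local")
    .set("spark.driver.disableMultipleSparkContextsErrorChecking", "true")
  val first = new SparkContext(conf)
  // Logs "Multiple SparkContext error detection is disabled!" instead of throwing:
  val second = new SparkContext(conf)
  second.stop()
  first.stop()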
@@ -1071,27 +1095,31 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {

  /** Shut down the SparkContext. */
  def stop() {
    postApplicationEnd()
    ui.foreach(_.stop())
    // Do this only if not stopped already - best case effort.
    // prevent NPE if stopped more than once.
    val dagSchedulerCopy = dagScheduler
    dagScheduler = null
    if (dagSchedulerCopy != null) {
      env.metricsSystem.report()
      metadataCleaner.cancel()
      env.actorSystem.stop(heartbeatReceiver)
      cleaner.foreach(_.stop())
      dagSchedulerCopy.stop()
      taskScheduler = null
      // TODO: Cache.stop()?
      env.stop()
      SparkEnv.set(null)
      listenerBus.stop()
      eventLogger.foreach(_.stop())
      logInfo("Successfully stopped SparkContext")
    } else {
      logInfo("SparkContext already stopped")
    SparkContext.SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
      SparkContext.activeSparkContextCreationSite = None
      SparkContext.activeSparkContextIsFullyConstructed = false
      postApplicationEnd()
      ui.foreach(_.stop())
      // Do this only if not stopped already - best case effort.
      // prevent NPE if stopped more than once.
      val dagSchedulerCopy = dagScheduler
      dagScheduler = null
      if (dagSchedulerCopy != null) {
        env.metricsSystem.report()
        metadataCleaner.cancel()
        env.actorSystem.stop(heartbeatReceiver)
        cleaner.foreach(_.stop())
        dagSchedulerCopy.stop()
        taskScheduler = null
        // TODO: Cache.stop()?
        env.stop()
        SparkEnv.set(null)
        listenerBus.stop()
        eventLogger.foreach(_.stop())
        logInfo("Successfully stopped SparkContext")
      } else {
        logInfo("SparkContext already stopped")
      }
    }
  }

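Because stop() now clears activeSparkContextCreationSite and activeSparkContextIsFullyConstructed under the same SPARK_CONTEXT_CONSTRUCTOR_LOCK used by the constructor, a create / stop / create sequence in a single JVM remains legal. A small sketch (local master assumed):

  val conf = new SparkConf().setAppName("demo").setMaster("local")
  val first = new SparkContext(conf)
  first.stop()                           // releases the "active context" slot under the constructor lock
  val second = new SparkContext(conf)    // succeeds, since no creation site is recorded any more
  second.stop()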
@@ -1157,7 +1185,7 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
    if (dagScheduler == null) {
      throw new SparkException("SparkContext has been shutdown")
    }
    val callSite = getCallSite
    val callSite = Utils.getCallSite()
    val cleanedFunc = clean(func)
    logInfo("Starting job: " + callSite.shortForm)
    dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,
@@ -1380,6 +1408,10 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
  private[spark] def cleanup(cleanupTime: Long) {
    persistentRdds.clearOldValues(cleanupTime)
  }

  SparkContext.SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
    SparkContext.activeSparkContextIsFullyConstructed = true
  }
}

/**
@@ -1388,6 +1420,30 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
*/
object SparkContext extends Logging {

  /**
   * Lock that prevents multiple threads from being in the SparkContext constructor at the same
   * time.
   */
  private[spark] val SPARK_CONTEXT_CONSTRUCTOR_LOCK = new Object()

  /**
   * Records the creation site of the last SparkContext to successfully enter the constructor.
   * This may be an active SparkContext, or a SparkContext that is currently under construction.
   *
   * Access to this field should be guarded by SPARK_CONTEXT_CONSTRUCTOR_LOCK
   */
  private[spark] var activeSparkContextCreationSite: Option[CallSite] = None

  /**
   * Tracks whether `activeSparkContextCreationSite` refers to a fully-constructed SparkContext
   * or a partially-constructed one that is either still executing its constructor or threw
   * an exception from its constructor. This is used to enable better error-reporting when
   * SparkContext construction fails due to existing contexts.
   *
   * Access to this field should be guarded by SPARK_CONTEXT_CONSTRUCTOR_LOCK
   */
  private[spark] var activeSparkContextIsFullyConstructed: Boolean = false

  private[spark] val SPARK_JOB_DESCRIPTION = "spark.job.description"

  private[spark] val SPARK_JOB_GROUP_ID = "spark.jobGroup.id"
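Taken together, these companion-object fields and the two synchronized blocks in the class body implement an "at most one live instance per JVM" guard: the creation site is claimed at the top of the constructor, and the fully-constructed flag is flipped only by the constructor's last statement, so an exception anywhere in between leaves evidence of a partially-constructed instance behind. The same pattern in isolation, with all names hypothetical and unrelated to Spark's actual code:

class Service {
  Service.LOCK.synchronized {
    Service.creationSite.foreach { site =>
      val detail =
        if (Service.fullyConstructed) s"an active Service was created at $site"
        else s"a Service created at $site is mid-construction or threw from its constructor"
      throw new IllegalStateException(s"Only one Service may be active in this JVM; $detail")
    }
    Service.creationSite = Some(new Throwable().getStackTrace.take(3).mkString(" <- "))
    Service.fullyConstructed = false
  }

  // ... the real constructor work would go here; an exception thrown from it
  // leaves fullyConstructed == false for the next caller to report on ...

  def shutdown(): Unit = Service.LOCK.synchronized {
    Service.creationSite = None
    Service.fullyConstructed = false
  }

  // Last statement of the primary constructor: only now is the instance "fully constructed".
  Service.LOCK.synchronized {
    Service.fullyConstructed = true
  }
}

object Service {
  private val LOCK = new Object()
  private var creationSite: Option[String] = None
  private var fullyConstructed: Boolean = false
}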
48 changes: 46 additions & 2 deletions core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -21,9 +21,53 @@ import org.scalatest.FunSuite

import org.apache.hadoop.io.BytesWritable

class SparkContextSuite extends FunSuite {
  //Regression test for SPARK-3121
class SparkContextSuite extends FunSuite with LocalSparkContext {

  test("Only one SparkContext may be active at a time") {
    // Regression test for SPARK-4180
    val conf = new SparkConf().setAppName("test").setMaster("local")
    sc = new SparkContext(conf)
    // A SparkContext is already running, so we shouldn't be able to create a second one
    intercept[SparkException] { new SparkContext(conf) }
    // After stopping the running context, we should be able to create a new one
    resetSparkContext()
    sc = new SparkContext(conf)
  }

  test("Can still construct a new SparkContext after failing due to missing app name or master") {
    val missingMaster = new SparkConf()
    val missingAppName = missingMaster.clone.setMaster("local")
    val validConf = missingAppName.clone.setAppName("test")
    // We shouldn't be able to construct SparkContexts because these are invalid configurations
    intercept[SparkException] { new SparkContext(missingMaster) }
    intercept[SparkException] { new SparkContext(missingAppName) }
    // Even though those earlier calls failed, we should still be able to create a new SparkContext
    sc = new SparkContext(validConf)
  }

  test("Check for multiple SparkContexts can be disabled via undocumented debug option") {
    val propertyName = "spark.driver.disableMultipleSparkContextsErrorChecking"
    val originalPropertyValue = System.getProperty(propertyName)
    var secondSparkContext: SparkContext = null
    try {
      System.setProperty(propertyName, "true")
      val conf = new SparkConf().setAppName("test").setMaster("local")
      sc = new SparkContext(conf)
      secondSparkContext = new SparkContext(conf)
    } finally {
      if (secondSparkContext != null) {
        secondSparkContext.stop()
      }
      if (originalPropertyValue != null) {
        System.setProperty(propertyName, originalPropertyValue)
      } else {
        System.clearProperty(propertyName)
      }
    }
  }

test("BytesWritable implicit conversion is correct") {
// Regression test for SPARK-3121
val bytesWritable = new BytesWritable()
val inputArray = (1 to 10).map(_.toByte).toArray
bytesWritable.set(inputArray, 0, 10)
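One more note on the tests: the suite now mixes in LocalSparkContext, which is what makes the bare sc assignments and the resetSparkContext() call above work. Judging from this usage, the trait holds the shared sc variable, stops it on demand, and cleans up after each test so one test's context cannot trip the new multiple-context check in the next test. A rough, hypothetical sketch of such a helper trait (not Spark's actual LocalSparkContext):

import org.scalatest.{BeforeAndAfterEach, Suite}
import org.apache.spark.SparkContext

trait LocalSparkContextLike extends BeforeAndAfterEach { self: Suite =>

  @transient var sc: SparkContext = _

  // Stop the shared context (if any) and forget it, so a later test can create a fresh one.
  def resetSparkContext(): Unit = {
    if (sc != null) {
      sc.stop()
    }
    sc = null
  }

  override def afterEach(): Unit = {
    try {
      resetSparkContext()
    } finally {
      super.afterEach()
    }
  }
}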
