Skip to content

Commit

Permalink
Merge pull request #937 from jerryshao/localProperties-fix
Browse files Browse the repository at this point in the history
Fix PR926 local properties issues in Spark Streaming like scenarios
  • Loading branch information
rxin committed Sep 22, 2013
2 parents f06f2da + aa0c29f commit a2ea069
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 2 deletions.
7 changes: 6 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,9 @@ class SparkContext(
private[spark] var checkpointDir: Option[String] = None

// Thread Local variable that can be used by users to pass information down the stack
private val localProperties = new ThreadLocal[Properties]
private val localProperties = new InheritableThreadLocal[Properties] {
override protected def childValue(parent: Properties): Properties = new Properties(parent)
}

def initLocalProperties() {
localProperties.set(new Properties())
Expand All @@ -273,6 +275,9 @@ class SparkContext(
}
}

def getLocalProperty(key: String): String =
Option(localProperties.get).map(_.getProperty(key)).getOrElse(null)

/** Set a human readable description of the current job. */
def setJobDescription(value: String) {
setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, value)
Expand Down
45 changes: 44 additions & 1 deletion core/src/test/scala/org/apache/spark/ThreadingSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ object ThreadingSuiteState {
}

class ThreadingSuite extends FunSuite with LocalSparkContext {

test("accessing SparkContext form a different thread") {
sc = new SparkContext("local", "test")
val nums = sc.parallelize(1 to 10, 2)
Expand Down Expand Up @@ -149,4 +149,47 @@ class ThreadingSuite extends FunSuite with LocalSparkContext {
fail("One or more threads didn't see runningThreads = 4")
}
}

test("set local properties in different thread") {
sc = new SparkContext("local", "test")
val sem = new Semaphore(0)

val threads = (1 to 5).map { i =>
new Thread() {
override def run() {
sc.setLocalProperty("test", i.toString)
assert(sc.getLocalProperty("test") === i.toString)
sem.release()
}
}
}

threads.foreach(_.start())

sem.acquire(5)
assert(sc.getLocalProperty("test") === null)
}

test("set and get local properties in parent-children thread") {
sc = new SparkContext("local", "test")
sc.setLocalProperty("test", "parent")
val sem = new Semaphore(0)

val threads = (1 to 5).map { i =>
new Thread() {
override def run() {
assert(sc.getLocalProperty("test") === "parent")
sc.setLocalProperty("test", i.toString)
assert(sc.getLocalProperty("test") === i.toString)
sem.release()
}
}
}

threads.foreach(_.start())

sem.acquire(5)
assert(sc.getLocalProperty("test") === "parent")
assert(sc.getLocalProperty("Foo") === null)
}
}

0 comments on commit a2ea069

Please sign in to comment.