From ffa5f8e11db26dd616e85b9d941de3590ca3643e Mon Sep 17 00:00:00 2001 From: jerryshao Date: Wed, 18 Sep 2013 17:33:24 +0800 Subject: [PATCH 1/2] Fix issue when local properties pass from parent to child thread --- .../scala/org/apache/spark/SparkContext.scala | 6 ++- .../org/apache/spark/ThreadingSuite.scala | 38 ++++++++++++++++++- 2 files changed, 42 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 72540c712a..3922e9a7fa 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -256,7 +256,9 @@ class SparkContext( private[spark] var checkpointDir: Option[String] = None // Thread Local variable that can be used by users to pass information down the stack - private val localProperties = new ThreadLocal[Properties] + private val localProperties = new InheritableThreadLocal[Properties] { + override protected def childValue(parent: Properties): Properties = new Properties(parent) + } def initLocalProperties() { localProperties.set(new Properties()) @@ -273,6 +275,8 @@ class SparkContext( } } + def getLocalProperty(key: String): String = Option(localProperties.get).map(_.getProperty(key)).getOrElse(null) + /** Set a human readable description of the current job. */ def setJobDescription(value: String) { setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, value) diff --git a/core/src/test/scala/org/apache/spark/ThreadingSuite.scala b/core/src/test/scala/org/apache/spark/ThreadingSuite.scala index 69383ddfb8..331f79dba1 100644 --- a/core/src/test/scala/org/apache/spark/ThreadingSuite.scala +++ b/core/src/test/scala/org/apache/spark/ThreadingSuite.scala @@ -40,7 +40,7 @@ object ThreadingSuiteState { } class ThreadingSuite extends FunSuite with LocalSparkContext { - + test("accessing SparkContext form a different thread") { sc = new SparkContext("local", "test") val nums = sc.parallelize(1 to 10, 2) @@ -149,4 +149,40 @@ class ThreadingSuite extends FunSuite with LocalSparkContext { fail("One or more threads didn't see runningThreads = 4") } } + + test("set local properties in different thread") { + sc = new SparkContext("local", "test") + + val threads = (1 to 5).map{ i => + new Thread() { + override def run() { + sc.setLocalProperty("test", i.toString) + assert(sc.getLocalProperty("test") === i.toString) + } + } + } + + threads.foreach(_.start()) + + assert(sc.getLocalProperty("test") === null) + } + + test("set and get local properties in parent-children thread") { + sc = new SparkContext("local", "test") + sc.setLocalProperty("test", "parent") + + val threads = (1 to 5).map{ i => + new Thread() { + override def run() { + assert(sc.getLocalProperty("test") === "parent") + sc.setLocalProperty("test", i.toString) + assert(sc.getLocalProperty("test") === i.toString) + } + } + } + + threads.foreach(_.start()) + assert(sc.getLocalProperty("test") === "parent") + assert(sc.getLocalProperty("Foo") === null) + } } From aa0c29f74779bc5af70250c7481dbd7052ee39cf Mon Sep 17 00:00:00 2001 From: jerryshao Date: Sun, 22 Sep 2013 09:40:40 +0800 Subject: [PATCH 2/2] Add barrier for local properties unit test and fix some styles --- .../main/scala/org/apache/spark/SparkContext.scala | 3 ++- .../test/scala/org/apache/spark/ThreadingSuite.scala | 11 +++++++++-- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 3922e9a7fa..6bab1f31d0 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -275,7 +275,8 @@ class SparkContext( } } - def getLocalProperty(key: String): String = Option(localProperties.get).map(_.getProperty(key)).getOrElse(null) + def getLocalProperty(key: String): String = + Option(localProperties.get).map(_.getProperty(key)).getOrElse(null) /** Set a human readable description of the current job. */ def setJobDescription(value: String) { diff --git a/core/src/test/scala/org/apache/spark/ThreadingSuite.scala b/core/src/test/scala/org/apache/spark/ThreadingSuite.scala index 331f79dba1..75d6493e33 100644 --- a/core/src/test/scala/org/apache/spark/ThreadingSuite.scala +++ b/core/src/test/scala/org/apache/spark/ThreadingSuite.scala @@ -152,36 +152,43 @@ class ThreadingSuite extends FunSuite with LocalSparkContext { test("set local properties in different thread") { sc = new SparkContext("local", "test") + val sem = new Semaphore(0) - val threads = (1 to 5).map{ i => + val threads = (1 to 5).map { i => new Thread() { override def run() { sc.setLocalProperty("test", i.toString) assert(sc.getLocalProperty("test") === i.toString) + sem.release() } } } threads.foreach(_.start()) + sem.acquire(5) assert(sc.getLocalProperty("test") === null) } test("set and get local properties in parent-children thread") { sc = new SparkContext("local", "test") sc.setLocalProperty("test", "parent") + val sem = new Semaphore(0) - val threads = (1 to 5).map{ i => + val threads = (1 to 5).map { i => new Thread() { override def run() { assert(sc.getLocalProperty("test") === "parent") sc.setLocalProperty("test", i.toString) assert(sc.getLocalProperty("test") === i.toString) + sem.release() } } } threads.foreach(_.start()) + + sem.acquire(5) assert(sc.getLocalProperty("test") === "parent") assert(sc.getLocalProperty("Foo") === null) }