Permalink
Browse files

Fixes memory leak when using reflection

References to Threads would be retained long after their termination if
reflection is used in them. This led to a steady, long memory leak in
applications using reflection in thread pools.
  • Loading branch information...
1 parent c4df20d commit d367c5f925e7055e9ee5b4071328e92cf48ce0ed @timcharper timcharper committed Oct 29, 2014
@@ -198,6 +198,11 @@ filter {
{
matchName="scala.collection.immutable.Stream.scala$collection$immutable$Stream$$loop$4"
problemName=MissingMethodProblem
+ },
+ // SI-8946
+ {
+ matchName="scala.reflect.runtime.ThreadLocalStorage#MyThreadLocalStorage.values"
+ problemName=MissingMethodProblem
}
]
}
@@ -11,12 +11,16 @@ private[reflect] trait ThreadLocalStorage {
trait ThreadLocalStorage[T] { def get: T; def set(newValue: T): Unit }
private class MyThreadLocalStorage[T](initialValue: => T) extends ThreadLocalStorage[T] {
// TODO: how do we use org.cliffc.high_scale_lib.NonBlockingHashMap here?
- val values = new java.util.concurrent.ConcurrentHashMap[Thread, T]()
+ // (we would need a version that uses weak keys)
+ private val values = java.util.Collections.synchronizedMap(new java.util.WeakHashMap[Thread, T]())
def get: T = {
if (values containsKey currentThread) values.get(currentThread)
else {
val value = initialValue
- values.putIfAbsent(currentThread, value)
+ // since the key is currentThread, and `values` is private, it
+ // would be impossible for a value to have been set after the
+ // above containsKey check. `putIfAbsent` is not necessary.
+ values.put(currentThread, value)
value
}
}
@@ -0,0 +1,29 @@
+// Tests to assert that references to threads are not strongly held when scala-reflection is used inside of them.
+object Test {
+ import scala.ref.WeakReference
+
+ def forceGc() = {
+ var obj = new Object
+ val ref = new WeakReference(obj)
+ obj = null;
+ while(ref.get.nonEmpty)
+ Array.ofDim[Byte](16 * 1024 * 1024)
+ }
+
+ def main(args: Array[String]): Unit = {
+ val threads = for (i <- (1 to 16)) yield {
+ val t = new Thread {
+ override def run(): Unit = {
+ import reflect.runtime.universe._
+ typeOf[List[String]] <:< typeOf[Seq[_]]
+ }
+ }
+ t.start()
+ t.join()
+ WeakReference(t)
+ }
+ forceGc()
+ val nonGCdThreads = threads.filter(_.get.nonEmpty).length
+ assert(nonGCdThreads == 0, s"${nonGCdThreads} threads were retained; expected 0.")
+ }
+}

0 comments on commit d367c5f

Please sign in to comment.