Permalink
Browse files

Use a ReferenceQueue to track dying thread and remove their RubyThrea…

…d and

ThreadContext entries.

Modified to encapsulate the weak collection by Charles Oliver Nutter <headius@headius.com>
  • Loading branch information...
1 parent 93cd271 commit 364c41a1ead0666d217856347b1adf2b23d77a90 @chrisa chrisa committed with headius Mar 8, 2010
Showing with 105 additions and 17 deletions.
  1. +88 −0 src/org/jruby/internal/runtime/RubyThreadMap.java
  2. +17 −17 src/org/jruby/internal/runtime/ThreadService.java
@@ -0,0 +1,88 @@
+package org.jruby.internal.runtime;
+
+import java.lang.ref.ReferenceQueue;
+import java.lang.ref.WeakReference;
+import java.util.Hashtable;
+import java.util.Map;
+import java.util.Set;
+import org.jruby.RubyThread;
+import org.jruby.runtime.ThreadContext;
+
+public class RubyThreadMap {
+
+ private final Map<RubyThreadWeakReference<Object>, RubyThread> map = new Hashtable();
+ private final ReferenceQueue<Object> queue = new ReferenceQueue();
+ private final Map<RubyThread, ThreadContext> mapToClean;
+
+ public RubyThreadMap(Map<RubyThread, ThreadContext> mapToClean) {
+ this.mapToClean = mapToClean;
+ }
+
+ public static class RubyThreadWeakReference<T> extends WeakReference<T> {
+ private final RubyThread thread;
+ public int hashCode;
+ public RubyThreadWeakReference(T referrent, RubyThread thread) {
+ super(referrent);
+ hashCode = referrent.hashCode();
+ this.thread = thread;
+ }
+ public RubyThreadWeakReference(T referrent, ReferenceQueue<? super T> queue, RubyThread thread) {
+ super(referrent, queue);
+ hashCode = referrent.hashCode();
+ this.thread = thread;
+ }
+ public RubyThread getThread() {
+ return thread;
+ }
+ @Override
+ public int hashCode() {
+ return hashCode;
+ }
+ @Override
+ public boolean equals(Object other) {
+ Object myKey = get();
+ if (other instanceof RubyThreadWeakReference) {
+ Object otherKey = ((RubyThreadWeakReference)other).get();
+ if (myKey != otherKey) return false;
+ return true;
+ } else if (other instanceof Thread) {
+ return myKey == other;
+ } else {
+ return false;
+ }
+ }
+ }
+
+ private void cleanup() {
+ RubyThreadWeakReference<Object> ref;
+ while ((ref = (RubyThreadWeakReference<Object>) queue.poll()) != null) {
+ map.remove(ref);
+ mapToClean.remove(ref.getThread());
+ }
+ }
+
+ public int size() {
+ cleanup();
+ return map.size();
+ }
+
+ public Set<Map.Entry<RubyThreadWeakReference<Object>, RubyThread>> entrySet() {
+ return map.entrySet();
+ }
+
+ public RubyThread get(Object key) {
+ cleanup();
+ return map.get(key);
+ }
+
+ public RubyThread put(Object key, RubyThread value) {
+ cleanup();
+ return map.put(new RubyThreadWeakReference(key, value), value);
+ }
+
+ public RubyThread remove(Object key) {
+ cleanup();
+ RubyThread t = map.remove(key);
+ return t;
+ }
+}
@@ -30,22 +30,21 @@
***** END LICENSE BLOCK *****/
package org.jruby.internal.runtime;
-import java.io.IOException;
+import java.lang.ref.Reference;
import java.lang.ref.SoftReference;
-import java.nio.channels.Channel;
-import java.nio.channels.SelectableChannel;
-import java.nio.channels.Selector;
+import java.lang.ref.WeakReference;
+import java.lang.ref.ReferenceQueue;
+import java.util.Map.Entry;
+import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.WeakHashMap;
+import java.util.Hashtable;
import java.util.concurrent.Future;
import org.jruby.Ruby;
-import org.jruby.RubyIO;
import org.jruby.RubyThread;
import org.jruby.runtime.builtin.IRubyObject;
import org.jruby.runtime.ThreadContext;
@@ -55,10 +54,11 @@
private ThreadContext mainContext;
private ThreadLocal<SoftReference<ThreadContext>> localContext;
private ThreadGroup rubyThreadGroup;
- private Map<Object, RubyThread> rubyThreadMap;
+
+ private RubyThreadMap rubyThreadMap;
+ private Map<RubyThread,ThreadContext> threadContextMap;
private ReentrantLock criticalLock = new ReentrantLock();
- private Map<RubyThread,ThreadContext> threadContextMap;
public ThreadService(Ruby runtime) {
this.runtime = runtime;
@@ -71,8 +71,8 @@ public ThreadService(Ruby runtime) {
this.rubyThreadGroup = Thread.currentThread().getThreadGroup();
}
- this.rubyThreadMap = Collections.synchronizedMap(new WeakHashMap<Object, RubyThread>());
- this.threadContextMap = Collections.synchronizedMap(new WeakHashMap<RubyThread,ThreadContext>());
+ this.threadContextMap = new Hashtable<RubyThread, ThreadContext>();
+ this.rubyThreadMap = new RubyThreadMap(threadContextMap);
// Must be called from main thread (it is currently, but this bothers me)
localContext.set(new SoftReference<ThreadContext>(mainContext));
@@ -154,8 +154,10 @@ public void setMainThread(Thread thread, RubyThread rubyThread) {
synchronized(rubyThreadMap) {
List<RubyThread> rtList = new ArrayList<RubyThread>(rubyThreadMap.size());
- for (Map.Entry<Object, RubyThread> entry : rubyThreadMap.entrySet()) {
- Object key = entry.getKey();
+ for (Map.Entry<RubyThreadMap.RubyThreadWeakReference<Object>, RubyThread> entry : rubyThreadMap.entrySet()) {
+ Object key = entry.getKey().get();
+ if (key == null) continue;
+
if (key instanceof Thread) {
Thread t = (Thread)key;
@@ -178,10 +180,6 @@ public void setMainThread(Thread thread, RubyThread rubyThread) {
}
}
- public Map getRubyThreadMap() {
- return rubyThreadMap;
- }
-
public ThreadGroup getRubyThreadGroup() {
return rubyThreadGroup;
}
@@ -199,6 +197,8 @@ public synchronized ThreadContext registerNewThread(RubyThread thread) {
}
public synchronized void associateThread(Object threadOrFuture, RubyThread rubyThread) {
+ Reference r;
+
rubyThreadMap.put(threadOrFuture, rubyThread);
}

0 comments on commit 364c41a

Please sign in to comment.