From 0f7c53dfd2c91cd394b83c2edcb1ef66df1c98ff Mon Sep 17 00:00:00 2001 From: Daniel Sperry Date: Mon, 23 Nov 2015 23:33:49 -0500 Subject: [PATCH] Exposing which task context is active per thread. --- .../actors/runtime/ActorTaskContext.java | 20 +++++++++ .../com/ea/orbit/concurrent/TaskContext.java | 41 +++++++++++++++---- 2 files changed, 53 insertions(+), 8 deletions(-) diff --git a/actors/core/src/main/java/com/ea/orbit/actors/runtime/ActorTaskContext.java b/actors/core/src/main/java/com/ea/orbit/actors/runtime/ActorTaskContext.java index eacc75be4..b33dec19e 100644 --- a/actors/core/src/main/java/com/ea/orbit/actors/runtime/ActorTaskContext.java +++ b/actors/core/src/main/java/com/ea/orbit/actors/runtime/ActorTaskContext.java @@ -50,6 +50,10 @@ public static ActorTaskContext pushNew() void setActor(final AbstractActor actor) { this.actor = actor; + if (actor != null) + { + this.runtime = actor.runtime; + } } public AbstractActor getActor() @@ -67,6 +71,22 @@ public static ActorTaskContext current() return null; } + /** + * Finds out what task context is active in a certain thread. + * This method is is available here for debugging and profiling. + * + * @return the actor task context associated to the thread + */ + public static ActorTaskContext currentFor(Thread thread) + { + TaskContext current = TaskContext.currentFor(thread); + if (current instanceof ActorTaskContext) + { + return (ActorTaskContext) current; + } + return null; + } + void setRuntime(final ActorRuntime runtime) { this.runtime = runtime; diff --git a/commons/src/main/java/com/ea/orbit/concurrent/TaskContext.java b/commons/src/main/java/com/ea/orbit/concurrent/TaskContext.java index 099ee20cd..eeee25d8a 100644 --- a/commons/src/main/java/com/ea/orbit/concurrent/TaskContext.java +++ b/commons/src/main/java/com/ea/orbit/concurrent/TaskContext.java @@ -1,8 +1,10 @@ package com.ea.orbit.concurrent; -import java.util.LinkedList; +import java.util.Deque; import java.util.Map; +import java.util.WeakHashMap; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiConsumer; import java.util.function.BiFunction; @@ -12,9 +14,11 @@ public class TaskContext { - // TODO: replace the linked list with a compressed stack (i.e. repeated object == a counter) - private static ThreadLocal> contextStacks = new ThreadLocal<>(); - private static AtomicLong nextId = new AtomicLong(1); + private final static ThreadLocal> contextStacks = new ThreadLocal<>(); + private final static WeakHashMap> contextStacksMap = new WeakHashMap<>(); + private final static AtomicLong nextId = new AtomicLong(1); + + // human friendly id, for debugging private long id = nextId.getAndIncrement(); private ConcurrentHashMap properties = new ConcurrentHashMap<>(); @@ -24,11 +28,17 @@ public class TaskContext */ public void push() { - LinkedList stack = contextStacks.get(); + Deque stack = contextStacks.get(); if (stack == null) { - stack = new LinkedList<>(); + stack = new ConcurrentLinkedDeque<>(); contextStacks.set(stack); + final Thread currentThread = Thread.currentThread(); + synchronized (contextStacksMap) + { + // this happens only once per thread, no need to optimize it + contextStacksMap.put(currentThread, stack); + } } stack.addLast(this); } @@ -39,7 +49,7 @@ public void push() */ public void pop() { - LinkedList stack = contextStacks.get(); + Deque stack = contextStacks.get(); if (stack == null || stack.size() == 0 || stack.getLast() != this) { throw new IllegalStateException("Invalid execution context stack state: " + stack + " trying to remove: " + this); @@ -60,7 +70,7 @@ public String toString() */ public static TaskContext current() { - final LinkedList stack = contextStacks.get(); + final Deque stack = contextStacks.get(); if (stack == null || stack.size() == 0) { return null; @@ -68,6 +78,21 @@ public static TaskContext current() return stack.getLast(); } + /** + * Enables the application to peek into what is being executed in another thread. + * This method is intended for debugging and profiling. + */ + public static TaskContext currentFor(Thread thread) + { + final Deque stack; + synchronized (contextStacksMap) + { + // this should not be called very often, it's for profiling + stack = contextStacksMap.get(thread); + } + return (stack != null) ? stack.peekLast() : null; + } + /** * Wraps a Runnable in such a way the it will push the current execution context before any code gets executed and pop it afterwards *