Skip to content

Commit

Permalink
Exposing which task context is active per thread.
Browse files Browse the repository at this point in the history
  • Loading branch information
DanielSperry committed Nov 24, 2015
1 parent 0d02c16 commit 0f7c53d
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 8 deletions.
Expand Up @@ -50,6 +50,10 @@ public static ActorTaskContext pushNew()
void setActor(final AbstractActor<?> actor)
{
this.actor = actor;
if (actor != null)
{
this.runtime = actor.runtime;
}
}

public AbstractActor<?> getActor()
Expand All @@ -67,6 +71,22 @@ public static ActorTaskContext current()
return null;
}

/**
* Finds out what task context is active in a certain thread.
* This method is is available here for debugging and profiling.
*
* @return the actor task context associated to the thread
*/
public static ActorTaskContext currentFor(Thread thread)
{
TaskContext current = TaskContext.currentFor(thread);
if (current instanceof ActorTaskContext)
{
return (ActorTaskContext) current;
}
return null;
}

void setRuntime(final ActorRuntime runtime)
{
this.runtime = runtime;
Expand Down
41 changes: 33 additions & 8 deletions commons/src/main/java/com/ea/orbit/concurrent/TaskContext.java
@@ -1,8 +1,10 @@
package com.ea.orbit.concurrent;

import java.util.LinkedList;
import java.util.Deque;
import java.util.Map;
import java.util.WeakHashMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
Expand All @@ -12,9 +14,11 @@

public class TaskContext
{
// TODO: replace the linked list with a compressed stack (i.e. repeated object == a counter)
private static ThreadLocal<LinkedList<TaskContext>> contextStacks = new ThreadLocal<>();
private static AtomicLong nextId = new AtomicLong(1);
private final static ThreadLocal<Deque<TaskContext>> contextStacks = new ThreadLocal<>();
private final static WeakHashMap<Thread, Deque<TaskContext>> contextStacksMap = new WeakHashMap<>();
private final static AtomicLong nextId = new AtomicLong(1);

// human friendly id, for debugging
private long id = nextId.getAndIncrement();

private ConcurrentHashMap<String, Object> properties = new ConcurrentHashMap<>();
Expand All @@ -24,11 +28,17 @@ public class TaskContext
*/
public void push()
{
LinkedList<TaskContext> stack = contextStacks.get();
Deque<TaskContext> stack = contextStacks.get();
if (stack == null)
{
stack = new LinkedList<>();
stack = new ConcurrentLinkedDeque<>();
contextStacks.set(stack);
final Thread currentThread = Thread.currentThread();
synchronized (contextStacksMap)
{
// this happens only once per thread, no need to optimize it
contextStacksMap.put(currentThread, stack);
}
}
stack.addLast(this);
}
Expand All @@ -39,7 +49,7 @@ public void push()
*/
public void pop()
{
LinkedList<TaskContext> stack = contextStacks.get();
Deque<TaskContext> stack = contextStacks.get();
if (stack == null || stack.size() == 0 || stack.getLast() != this)
{
throw new IllegalStateException("Invalid execution context stack state: " + stack + " trying to remove: " + this);
Expand All @@ -60,14 +70,29 @@ public String toString()
*/
public static TaskContext current()
{
final LinkedList<TaskContext> stack = contextStacks.get();
final Deque<TaskContext> stack = contextStacks.get();
if (stack == null || stack.size() == 0)
{
return null;
}
return stack.getLast();
}

/**
* Enables the application to peek into what is being executed in another thread.
* This method is intended for debugging and profiling.
*/
public static TaskContext currentFor(Thread thread)
{
final Deque<TaskContext> stack;
synchronized (contextStacksMap)
{
// this should not be called very often, it's for profiling
stack = contextStacksMap.get(thread);
}
return (stack != null) ? stack.peekLast() : null;
}

/**
* Wraps a Runnable in such a way the it will push the current execution context before any code gets executed and pop it afterwards
*
Expand Down

0 comments on commit 0f7c53d

Please sign in to comment.