Skip to content

Commit

Permalink
Cleanup invocation handler. (#223)
Browse files Browse the repository at this point in the history
  • Loading branch information
johnou authored and JoeHegarty committed Mar 6, 2017
1 parent b09bae8 commit 16bf24f
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 42 deletions.
6 changes: 6 additions & 0 deletions actors/pom.xml
Expand Up @@ -65,6 +65,12 @@ THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
<artifactId>ea-async</artifactId> <artifactId>ea-async</artifactId>
<optional>true</optional> <optional>true</optional>
</dependency> </dependency>
<dependency>
<groupId>com.intellij</groupId>
<artifactId>annotations</artifactId>
<version>12.0</version>
<scope>provided</scope>
</dependency>
</dependencies> </dependencies>


<dependencyManagement> <dependencyManagement>
Expand Down
Expand Up @@ -30,12 +30,10 @@


import cloud.orbit.actors.Actor; import cloud.orbit.actors.Actor;
import cloud.orbit.actors.Stage; import cloud.orbit.actors.Stage;
import cloud.orbit.actors.annotation.Reentrant;
import cloud.orbit.actors.exceptions.ObserverNotFound; import cloud.orbit.actors.exceptions.ObserverNotFound;
import cloud.orbit.actors.net.HandlerContext; import cloud.orbit.actors.net.HandlerContext;
import cloud.orbit.concurrent.Task; import cloud.orbit.concurrent.Task;
import cloud.orbit.lifecycle.Startable; import cloud.orbit.lifecycle.Startable;
import cloud.orbit.util.AnnotationCache;


import java.util.Objects; import java.util.Objects;


Expand All @@ -46,8 +44,6 @@ public class Execution extends AbstractExecution implements Startable


private InvocationHandler invocationHandler; private InvocationHandler invocationHandler;


private final AnnotationCache<Reentrant> reentrantCache = new AnnotationCache<>(Reentrant.class);

@Override @Override
public Task<Void> cleanup() public Task<Void> cleanup()
{ {
Expand Down Expand Up @@ -164,7 +160,7 @@ protected Task<Object> performInvocation(
} }


final ObjectInvoker invoker = DefaultDescriptorFactory.get().getInvoker(target.getObject().getClass()); final ObjectInvoker invoker = DefaultDescriptorFactory.get().getInvoker(target.getObject().getClass());
return invocationHandler.invoke(runtime, reentrantCache, invocation, entry, target, invoker); return invocationHandler.invoke(runtime, invocation, entry, target, invoker);
} }
catch (Throwable exception) catch (Throwable exception)
{ {
Expand Down
Expand Up @@ -28,6 +28,7 @@


package cloud.orbit.actors.runtime; package cloud.orbit.actors.runtime;


import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


Expand All @@ -42,6 +43,8 @@ public class InvocationHandler
{ {
private static final Logger logger = LoggerFactory.getLogger(InvocationHandler.class); private static final Logger logger = LoggerFactory.getLogger(InvocationHandler.class);


private final AnnotationCache<Reentrant> reentrantCache = new AnnotationCache<>(Reentrant.class);

private boolean performanceLoggingEnabled = true; private boolean performanceLoggingEnabled = true;
private double slowInvokeThresholdMs = 250; private double slowInvokeThresholdMs = 250;
private double slowTaskThresholdMs = 1000; private double slowTaskThresholdMs = 1000;
Expand All @@ -61,15 +64,15 @@ public void setSlowTaskThresholdMs(double slowTaskThresholdMs)
this.slowTaskThresholdMs = slowTaskThresholdMs; this.slowTaskThresholdMs = slowTaskThresholdMs;
} }


public void beforeInvoke(Invocation invocation, Method method) protected void beforeInvoke(Invocation invocation, @Nullable Method method)
{ {
if (logger.isDebugEnabled()) if (logger.isDebugEnabled())
{ {
logger.debug("Invoking: {}.{}", invocation.getToReference().toString(), method.getName()); logger.debug("Invoking: {}.{}", invocation.getToReference().toString(), method != null ? method.getName() : invocation.getMethodId());
} }
} }


public void afterInvoke(long startTimeNanos, Invocation invocation, Method method) protected void afterInvoke(long startTimeNanos, Invocation invocation, @Nullable Method method)
{ {
if (performanceLoggingEnabled && logger.isWarnEnabled()) if (performanceLoggingEnabled && logger.isWarnEnabled())
{ {
Expand All @@ -78,12 +81,12 @@ public void afterInvoke(long startTimeNanos, Invocation invocation, Method metho
if (durationMs > slowInvokeThresholdMs) if (durationMs > slowInvokeThresholdMs)
{ {
logger.warn("Slow task: {}. {} in {} ms", logger.warn("Slow task: {}. {} in {} ms",
invocation.getToReference().toString(), method.getName(), durationMs); invocation.getToReference().toString(), method != null ? method.getName() : invocation.getMethodId(), durationMs);
} }
} }
} }


public void taskComplete(long startTimeNanos, Invocation invocation, Method method) protected void taskComplete(long startTimeNanos, Invocation invocation, @Nullable Method method)
{ {
if (performanceLoggingEnabled && logger.isWarnEnabled()) if (performanceLoggingEnabled && logger.isWarnEnabled())
{ {
Expand All @@ -92,17 +95,18 @@ public void taskComplete(long startTimeNanos, Invocation invocation, Method meth
if (durationMs > slowTaskThresholdMs) if (durationMs > slowTaskThresholdMs)
{ {
logger.warn("Slow chain: {}. {} in {} ms", logger.warn("Slow chain: {}. {} in {} ms",
invocation.getToReference().toString(), method.getName(), durationMs); invocation.getToReference().toString(), method != null ? method.getName() : invocation.getMethodId(), durationMs);
} }
} }
} }


@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public Task<Object> invoke(Stage runtime, AnnotationCache<Reentrant> reentrantCache, Invocation invocation, LocalObjects.LocalObjectEntry entry, LocalObjects.LocalObjectEntry target, ObjectInvoker invoker) Task<Object> invoke(Stage runtime, Invocation invocation, LocalObjects.LocalObjectEntry entry, LocalObjects.LocalObjectEntry target, ObjectInvoker invoker)
{ {
boolean reentrant = false; runtime.bind();


final Method method; final Method method = invoker.getMethod(invocation.getMethodId());
final boolean reentrant = reentrantCache.isAnnotated(method);


final ActorTaskContext context = ActorTaskContext.current(); final ActorTaskContext context = ActorTaskContext.current();
if (context != null) if (context != null)
Expand All @@ -118,11 +122,8 @@ public Task<Object> invoke(Stage runtime, AnnotationCache<Reentrant> reentrantCa
}); });
} }


method = invoker.getMethod(invocation.getMethodId()); if (reentrant)

if (reentrantCache.isAnnotated(method))
{ {
reentrant = true;
context.setDefaultExecutor(r -> entry.run(o -> context.setDefaultExecutor(r -> entry.run(o ->
{ {
r.run(); r.run();
Expand All @@ -131,33 +132,16 @@ public Task<Object> invoke(Stage runtime, AnnotationCache<Reentrant> reentrantCa
} }
context.setRuntime(runtime); context.setRuntime(runtime);
} }
else
{
method = null;
runtime.bind();
}


final Task<Object> result; beforeInvoke(invocation, method);
if (method != null) final Task<Object> result = invoker.safeInvoke(target.getObject(), invocation.getMethodId(), invocation.getParams());
{ final long startTimeNanos = System.nanoTime();
beforeInvoke(invocation, method); afterInvoke(startTimeNanos, invocation, method);
result = invoker.safeInvoke(target.getObject(), invocation.getMethodId(), invocation.getParams()); if (invocation.getCompletion() != null)
final long startTimeNanos = System.nanoTime();
afterInvoke(startTimeNanos, invocation, method);
if (invocation.getCompletion() != null)
{
InternalUtils.linkFutures(result, invocation.getCompletion());
}
result.thenAccept(n -> taskComplete(startTimeNanos, invocation, method)); // handle instead of thenAccept?
}
else
{ {
result = invoker.safeInvoke(target.getObject(), invocation.getMethodId(), invocation.getParams()); InternalUtils.linkFutures(result, invocation.getCompletion());
if (invocation.getCompletion() != null)
{
InternalUtils.linkFutures(result, invocation.getCompletion());
}
} }
result.whenComplete((o, throwable) -> taskComplete(startTimeNanos, invocation, method));


if (reentrant) if (reentrant)
{ {
Expand Down

0 comments on commit 16bf24f

Please sign in to comment.