Skip to content

Commit

Permalink
Correct usage of await (onInvocation return type cannot be void..). (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
johnou authored and JoeHegarty committed Nov 13, 2016
1 parent b6f9c64 commit 4b94aaf
Showing 1 changed file with 15 additions and 18 deletions.
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -28,23 +28,23 @@


package cloud.orbit.actors.runtime; package cloud.orbit.actors.runtime;


import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import cloud.orbit.actors.Stage; import cloud.orbit.actors.Stage;
import cloud.orbit.actors.annotation.OnlyIfActivated;
import cloud.orbit.actors.annotation.PreferLocalPlacement; import cloud.orbit.actors.annotation.PreferLocalPlacement;
import cloud.orbit.actors.annotation.StatelessWorker; import cloud.orbit.actors.annotation.StatelessWorker;
import cloud.orbit.actors.cluster.ClusterPeer; import cloud.orbit.actors.cluster.ClusterPeer;
import cloud.orbit.actors.cluster.NodeAddress; import cloud.orbit.actors.cluster.NodeAddress;
import cloud.orbit.actors.exceptions.ObserverNotFound; import cloud.orbit.actors.exceptions.ObserverNotFound;
import cloud.orbit.actors.extensions.PipelineExtension; import cloud.orbit.actors.extensions.PipelineExtension;
import cloud.orbit.actors.net.HandlerContext; import cloud.orbit.actors.net.HandlerContext;
import cloud.orbit.actors.annotation.OnlyIfActivated;
import cloud.orbit.concurrent.Task; import cloud.orbit.concurrent.Task;
import cloud.orbit.lifecycle.Startable;
import cloud.orbit.exception.UncheckedException; import cloud.orbit.exception.UncheckedException;
import cloud.orbit.lifecycle.Startable;
import cloud.orbit.util.AnnotationCache; import cloud.orbit.util.AnnotationCache;


import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.security.MessageDigest; import java.security.MessageDigest;
import java.util.ArrayList; import java.util.ArrayList;
Expand Down Expand Up @@ -283,7 +283,7 @@ public Task<NodeAddress> locateActor(final RemoteReference reference, final bool
private Task<NodeAddress> locateActiveActor(final RemoteReference<?> actorReference) private Task<NodeAddress> locateActiveActor(final RemoteReference<?> actorReference)
{ {
final NodeAddress address = await(getCachedAddressTask(actorReference)); final NodeAddress address = await(getCachedAddressTask(actorReference));
if (address != null && activeNodes.containsKey(address)) if (address != null && address != nullAddress && activeNodes.containsKey(address))
{ {
return Task.fromValue(address); return Task.fromValue(address);
} }
Expand Down Expand Up @@ -337,7 +337,7 @@ private Task<NodeAddress> locateAndActivateActor(final RemoteReference<?> actorR
NodeAddress address = await(getCachedAddressTask(actorReference)); NodeAddress address = await(getCachedAddressTask(actorReference));


// Is this actor already activated and in the local cache? If so, we're done // Is this actor already activated and in the local cache? If so, we're done
if (address != null && activeNodes.containsKey(address)) if (address != null && address != nullAddress && activeNodes.containsKey(address))
{ {
return Task.fromValue(address); return Task.fromValue(address);
} }
Expand All @@ -357,8 +357,6 @@ private Task<NodeAddress> locateAndActivateActor(final RemoteReference<?> actorR
// Target node still valid? // Target node still valid?
if (activeNodes.containsKey(nodeAddress)) if (activeNodes.containsKey(nodeAddress))
{ {
// Cache locally
setCachedAddress(actorReference, Task.fromValue(nodeAddress));
return Task.fromValue(nodeAddress); return Task.fromValue(nodeAddress);
} }
else else
Expand Down Expand Up @@ -391,12 +389,11 @@ private Task<NodeAddress> locateAndActivateActor(final RemoteReference<?> actorR
nodeAddress = otherNodeAddress; nodeAddress = otherNodeAddress;
} }


// Add to local cache
setCachedAddress(actorReference, Task.fromValue(nodeAddress));

return Task.fromValue(nodeAddress); return Task.fromValue(nodeAddress);


}, stage.getExecutionPool()); }, stage.getExecutionPool());

// Cache locally
setCachedAddress(actorReference, async); setCachedAddress(actorReference, async);


return async; return async;
Expand Down Expand Up @@ -603,28 +600,28 @@ public void onRead(final HandlerContext ctx, final Object msg) throws Exception
} }
} }


private void onInvocation(final HandlerContext ctx, final Invocation invocation) private Task onInvocation(final HandlerContext ctx, final Invocation invocation)
{ {
final RemoteReference toReference = invocation.getToReference(); final RemoteReference toReference = invocation.getToReference();
final NodeAddress localAddress = getNodeAddress(); final NodeAddress localAddress = getNodeAddress();


if (Objects.equals(toReference.address, localAddress)) if (Objects.equals(toReference.address, localAddress))
{ {
ctx.fireRead(invocation); ctx.fireRead(invocation);
return; return Task.done();
} }
final NodeAddress cachedAddress = await(getCachedAddressTask(toReference)); final NodeAddress cachedAddress = await(getCachedAddressTask(toReference));
if (Objects.equals(cachedAddress, localAddress)) if (Objects.equals(cachedAddress, localAddress))
{ {
ctx.fireRead(invocation); ctx.fireRead(invocation);
return; return Task.done();
} }
if (toReference._interfaceClass().isAnnotationPresent(StatelessWorker.class) if (toReference._interfaceClass().isAnnotationPresent(StatelessWorker.class)
&& stage.canActivateActor(toReference._interfaceClass().getName())) && stage.canActivateActor(toReference._interfaceClass().getName()))
{ {
// accepting stateless worker call // accepting stateless worker call
ctx.fireRead(invocation); ctx.fireRead(invocation);
return; return Task.done();
} }


if (logger.isDebugEnabled()) if (logger.isDebugEnabled())
Expand All @@ -636,7 +633,7 @@ private void onInvocation(final HandlerContext ctx, final Invocation invocation)
// since we received this message someone thinks that this node is the right one. // since we received this message someone thinks that this node is the right one.
// so we remove that entry from the local cache and query the global cache again // so we remove that entry from the local cache and query the global cache again
localAddressCache.remove(toReference); localAddressCache.remove(toReference);
locateActor(invocation.getToReference(), true) return locateActor(invocation.getToReference(), true)
.whenComplete((r, e) -> { .whenComplete((r, e) -> {
if (e != null) if (e != null)
{ {
Expand Down Expand Up @@ -685,7 +682,7 @@ else if (r != null)
// don't know what to do with it... // don't know what to do with it...
if (logger.isErrorEnabled()) if (logger.isErrorEnabled())
{ {
logger.error("Can't find a destination for {}", invocation); logger.error("Failed to find destination for {}", invocation);
} }
} }
}); });
Expand Down

0 comments on commit 4b94aaf

Please sign in to comment.