Navigation Menu

Skip to content

Commit

Permalink
Optimizing Hosting
Browse files Browse the repository at this point in the history
  • Loading branch information
Sperry, Daniel committed Dec 16, 2015
1 parent 35c0c42 commit c8482ee
Showing 1 changed file with 45 additions and 34 deletions.
79 changes: 45 additions & 34 deletions actors/stage/src/main/java/com/ea/orbit/actors/runtime/Hosting.java
Expand Up @@ -76,7 +76,7 @@ public class Hosting implements NodeCapabilities, Startable, PipelineExtension
private Stage stage;

// according to the micro benchmarks, a guava cache is much slower than using a ConcurrentHashMap here.
private Map<RemoteReference<?>, NodeAddress> localAddressCache = new ConcurrentHashMap<>();
private Map<RemoteReference<?>, Task<NodeAddress>> localAddressCache = new ConcurrentHashMap<>();

// don't use RemoteReferences, better to restrict keys to a small set of classes.
private volatile ConcurrentMap<RemoteKey, NodeAddress> distributedDirectory;
Expand All @@ -93,6 +93,8 @@ public class Hosting implements NodeCapabilities, Startable, PipelineExtension
@Config("orbit.actors.maxLocalAddressCacheCount")
private int maxLocalAddressCacheCount = 10_000;

private final Task<NodeAddress> nullAddress = Task.fromValue(null);

public Hosting()
{
//
Expand Down Expand Up @@ -189,7 +191,7 @@ public Task<Void> nodeModeChanged(final NodeAddress nodeAddress, final NodeState

public Task<Void> moved(RemoteReference remoteReference, NodeAddress oldAddress, NodeAddress newAddress)
{
setCachedAddress(remoteReference, newAddress);
setCachedAddress(remoteReference, Task.fromValue(newAddress));
return Task.done();
}

Expand Down Expand Up @@ -275,7 +277,7 @@ public Task<NodeAddress> locateActor(final RemoteReference reference, final bool

private Task<NodeAddress> locateActiveActor(final RemoteReference<?> actorReference)
{
NodeAddress address = getCachedAddress(actorReference);
final NodeAddress address = await(getCachedAddressTask(actorReference));
if (address != null && activeNodes.containsKey(address))
{
return Task.fromValue(address);
Expand All @@ -285,9 +287,10 @@ private Task<NodeAddress> locateActiveActor(final RemoteReference<?> actorRefere
return Task.fromValue(getDistributedDirectory().get(createRemoteKey(actorReference)));
}

private NodeAddress getCachedAddress(final RemoteReference<?> actorReference)
private Task<NodeAddress> getCachedAddressTask(final RemoteReference<?> actorReference)
{
return localAddressCache.get(actorReference);
final Task<NodeAddress> addressTask = localAddressCache.get(actorReference);
return (addressTask == null || addressTask.isCompletedExceptionally()) ? nullAddress : addressTask;
}

public void actorDeactivated(RemoteReference remoteReference)
Expand All @@ -314,12 +317,12 @@ private Task<NodeAddress> locateAndActivateActor(final RemoteReference<?> actorR
else
{
final String interfaceClassName = interfaceClass.getName();
return Task.fromValue(selectNode(interfaceClassName, true));
return selectNode(interfaceClassName, true);
}
}

// Get the existing activation from the local cache (if any)
NodeAddress address = getCachedAddress(actorReference);
NodeAddress address = await(getCachedAddressTask(actorReference));

// Is this actor already activated and in the local cache? If so, we're done
if (address != null && activeNodes.containsKey(address))
Expand All @@ -328,7 +331,9 @@ private Task<NodeAddress> locateAndActivateActor(final RemoteReference<?> actorR
}

// There is no existing activation at this time or it's not in the local cache
final CompletableFuture<NodeAddress> async = CompletableFuture.supplyAsync(() ->


final Task<NodeAddress> async = Task.supplyAsync(() ->
{
NodeAddress nodeAddress = null;

Expand All @@ -343,8 +348,8 @@ private Task<NodeAddress> locateAndActivateActor(final RemoteReference<?> actorR
if (activeNodes.containsKey(nodeAddress))
{
// Cache locally
setCachedAddress(actorReference, nodeAddress);
return nodeAddress;
setCachedAddress(actorReference, Task.fromValue(nodeAddress));
return Task.fromValue(nodeAddress);
}
else
{
Expand All @@ -364,7 +369,7 @@ private Task<NodeAddress> locateAndActivateActor(final RemoteReference<?> actorR
if (nodeAddress == null)
{
// If not, select randomly
nodeAddress = selectNode(interfaceClass.getName(), true);
nodeAddress = await(selectNode(interfaceClass.getName(), true));
}

// Push our selection to the distributed cache (if possible)
Expand All @@ -377,17 +382,18 @@ private Task<NodeAddress> locateAndActivateActor(final RemoteReference<?> actorR
}

// Add to local cache
setCachedAddress(actorReference, nodeAddress);
setCachedAddress(actorReference, Task.fromValue(nodeAddress));

return nodeAddress;
return Task.fromValue(nodeAddress);

}, stage.getExecutionPool());
setCachedAddress(actorReference, async);

return Task.from(async);
return async;

}

private void setCachedAddress(final RemoteReference<?> actorReference, final NodeAddress nodeAddress)
private void setCachedAddress(final RemoteReference<?> actorReference, final Task<NodeAddress> nodeAddress)
{
cleanup();
localAddressCache.put(actorReference, nodeAddress);
Expand All @@ -414,7 +420,7 @@ private RemoteKey createRemoteKey(final RemoteReference actorReference)
String.valueOf(actorReference.id));
}

private NodeAddress selectNode(final String interfaceClassName, boolean allowToBlock)
private Task<NodeAddress> selectNode(final String interfaceClassName, boolean allowToBlock)
{
List<NodeInfo> potentialNodes;
long start = System.currentTimeMillis();
Expand All @@ -441,21 +447,7 @@ private NodeAddress selectNode(final String interfaceClassName, boolean allowToB
{
return null;
}
// waits for servers
synchronized (serverNodesUpdateMutex)
{
if (serverNodes.size() == 0)
{
try
{
serverNodesUpdateMutex.wait(5);
}
catch (InterruptedException e)
{
throw new UncheckedException(e);
}
}
}
waitForServers();
}
else
{
Expand All @@ -467,7 +459,7 @@ private NodeAddress selectNode(final String interfaceClassName, boolean allowToB
// ask if the node can activate this type of actor.
try
{
canActivate = nodeInfo.nodeCapabilities.canActivate(interfaceClassName).join();
canActivate = await(nodeInfo.nodeCapabilities.canActivate(interfaceClassName));
if (canActivate == actorSupported_noneSupported)
{
nodeInfo.cannotHostActors = true;
Expand All @@ -487,7 +479,26 @@ private NodeAddress selectNode(final String interfaceClassName, boolean allowToB
}
if (canActivate == actorSupported_yes)
{
return nodeInfo.address;
return Task.fromValue(nodeInfo.address);
}
}
}
}

private void waitForServers()
{
// waits for servers
synchronized (serverNodesUpdateMutex)
{
if (serverNodes.size() == 0)
{
try
{
serverNodesUpdateMutex.wait(5);
}
catch (InterruptedException e)
{
throw new UncheckedException(e);
}
}
}
Expand Down Expand Up @@ -592,7 +603,7 @@ private void onInvocation(final HandlerContext ctx, final Invocation invocation)
ctx.fireRead(invocation);
return;
}
final NodeAddress cachedAddress = getCachedAddress(toReference);
final NodeAddress cachedAddress = await(getCachedAddressTask(toReference));
if (Objects.equals(cachedAddress, localAddress))
{
ctx.fireRead(invocation);
Expand Down

0 comments on commit c8482ee

Please sign in to comment.