Skip to content

Commit

Permalink
Refactor local address cache to allow for clearing cache if node drops from cluster. (#235)

Browse files Browse the repository at this point in the history
  • Loading branch information
johnou authored and JoeHegarty committed May 5, 2017
1 parent ea9ec0c commit ff54f6f
Showing 1 changed file with 46 additions and 65 deletions.
111 changes: 46 additions & 65 deletions actors/runtime/src/main/java/cloud/orbit/actors/runtime/Hosting.java
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -55,11 +55,11 @@
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.Random;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors; import java.util.stream.Collectors;


import static com.ea.async.Async.await; import static com.ea.async.Async.await;
Expand All @@ -76,13 +76,12 @@ public class Hosting implements NodeCapabilities, Startable, PipelineExtension
private Stage stage; private Stage stage;


// according to the micro benchmarks, a guava cache is much slower than using a ConcurrentHashMap here. // according to the micro benchmarks, a guava cache is much slower than using a ConcurrentHashMap here.
private final Map<RemoteReference<?>, Task<NodeAddress>> localAddressCache = new ConcurrentHashMap<>(); private final Map<RemoteReference<?>, NodeAddress> localAddressCache = new ConcurrentHashMap<>();


// don't use RemoteReferences, better to restrict keys to a small set of classes. // don't use RemoteReferences, better to restrict keys to a small set of classes.
private volatile ConcurrentMap<RemoteKey, NodeAddress> distributedDirectory; private volatile ConcurrentMap<RemoteKey, NodeAddress> distributedDirectory;


private long timeToWaitForServersMillis = 30000; private long timeToWaitForServersMillis = 30000;
private Random random = new Random();


private TreeMap<String, NodeInfo> consistentHashNodeTree = new TreeMap<>(); private TreeMap<String, NodeInfo> consistentHashNodeTree = new TreeMap<>();
private final AnnotationCache<OnlyIfActivated> onlyIfActivateCache = new AnnotationCache<>(OnlyIfActivated.class); private final AnnotationCache<OnlyIfActivated> onlyIfActivateCache = new AnnotationCache<>(OnlyIfActivated.class);
Expand All @@ -91,8 +90,6 @@ public class Hosting implements NodeCapabilities, Startable, PipelineExtension


private int maxLocalAddressCacheCount = 10_000; private int maxLocalAddressCacheCount = 10_000;


private final Task<NodeAddress> nullAddress = Task.fromValue(null);

private NodeSelectorExtension nodeSelector; private NodeSelectorExtension nodeSelector;


public Hosting() public Hosting()
Expand Down Expand Up @@ -177,6 +174,8 @@ public Task<Void> nodeModeChanged(final NodeAddress nodeAddress, final NodeState
node.state = newState; node.state = newState;
if (node.state != NodeState.RUNNING) if (node.state != NodeState.RUNNING)
{ {
// clear local cache
localAddressCache.values().remove(nodeAddress);
// clear list of actors this node can activate // clear list of actors this node can activate
node.canActivate.clear(); node.canActivate.clear();
} }
Expand All @@ -191,7 +190,7 @@ public Task<Void> moved(RemoteReference remoteReference, NodeAddress oldAddress,
{ {
logger.debug("Move {} to from {} to {}.", remoteReference, oldAddress, newAddress); logger.debug("Move {} to from {} to {}.", remoteReference, oldAddress, newAddress);
} }
setCachedAddress(remoteReference, Task.fromValue(newAddress)); setCachedAddress(remoteReference, newAddress);
return Task.done(); return Task.done();
} }


Expand Down Expand Up @@ -227,7 +226,6 @@ private void onClusterViewChanged(final Collection<NodeAddress> nodes)


final HashMap<NodeAddress, NodeInfo> oldNodes = new HashMap<>(activeNodes); final HashMap<NodeAddress, NodeInfo> oldNodes = new HashMap<>(activeNodes);
final HashMap<NodeAddress, NodeInfo> newNodes = new HashMap<>(nodes.size()); final HashMap<NodeAddress, NodeInfo> newNodes = new HashMap<>(nodes.size());
final List<NodeInfo> justAddedNodes = new ArrayList<>(Math.max(1, nodes.size() - oldNodes.size()));


final TreeMap<String, NodeInfo> newHashes = new TreeMap<>(); final TreeMap<String, NodeInfo> newHashes = new TreeMap<>();


Expand All @@ -239,7 +237,6 @@ private void onClusterViewChanged(final Collection<NodeAddress> nodes)
nodeInfo = new NodeInfo(a); nodeInfo = new NodeInfo(a);
nodeInfo.nodeCapabilities = stage.getRemoteObserverReference(a, NodeCapabilities.class, ""); nodeInfo.nodeCapabilities = stage.getRemoteObserverReference(a, NodeCapabilities.class, "");
nodeInfo.active = true; nodeInfo.active = true;
justAddedNodes.add(nodeInfo);
} }
newNodes.put(a, nodeInfo); newNodes.put(a, nodeInfo);


Expand Down Expand Up @@ -289,8 +286,8 @@ public Task<NodeAddress> locateActor(final RemoteReference reference, final bool


private Task<NodeAddress> locateActiveActor(final RemoteReference<?> actorReference) private Task<NodeAddress> locateActiveActor(final RemoteReference<?> actorReference)
{ {
final NodeAddress address = await(getCachedAddressTask(actorReference)); final NodeAddress address = localAddressCache.get(actorReference);
if (address != null && address != nullAddress && activeNodes.containsKey(address)) if (address != null && activeNodes.containsKey(address))
{ {
return Task.fromValue(address); return Task.fromValue(address);
} }
Expand All @@ -299,12 +296,6 @@ private Task<NodeAddress> locateActiveActor(final RemoteReference<?> actorRefere
return Task.fromValue(getDistributedDirectory().get(createRemoteKey(actorReference))); return Task.fromValue(getDistributedDirectory().get(createRemoteKey(actorReference)));
} }


private Task<NodeAddress> getCachedAddressTask(final RemoteReference<?> actorReference)
{
final Task<NodeAddress> addressTask = localAddressCache.get(actorReference);
return (addressTask == null || addressTask.isCompletedExceptionally()) ? nullAddress : addressTask;
}

public void actorDeactivated(RemoteReference remoteReference) public void actorDeactivated(RemoteReference remoteReference)
{ {
// removing the reference from the cluster directory and local caches // removing the reference from the cluster directory and local caches
Expand Down Expand Up @@ -340,73 +331,63 @@ private Task<NodeAddress> locateAndActivateActor(final RemoteReference<?> actorR
} }


// Get the existing activation from the local cache (if any) // Get the existing activation from the local cache (if any)
final NodeAddress address = await(getCachedAddressTask(actorReference)); final NodeAddress address = localAddressCache.get(actorReference);


// Is this actor already activated and in the local cache? If so, we're done // Is this actor already activated and in the local cache? If so, we're done
if (address != null && address != nullAddress && activeNodes.containsKey(address)) if (address != null && activeNodes.containsKey(address))
{ {
return Task.fromValue(address); return Task.fromValue(address);
} }


// There is no existing activation at this time or it's not in the local cache // Get the distributed cache if needed
final Task<NodeAddress> async = Task.supplyAsync(() -> final ConcurrentMap<RemoteKey, NodeAddress> distributedDirectory = getDistributedDirectory();
{
NodeAddress nodeAddress = null;

// Get the distributed cache if needed
final ConcurrentMap<RemoteKey, NodeAddress> distributedDirectory = getDistributedDirectory();

// Get the existing activation from the distributed cache (if any)
nodeAddress = distributedDirectory.get(remoteKey);
if (nodeAddress != null)
{
// Target node still valid?
if (activeNodes.containsKey(nodeAddress))
{
return Task.fromValue(nodeAddress);
}
else
{
// Target node is now dead, remove this activation from distributed cache
distributedDirectory.remove(remoteKey, nodeAddress);
}
}
nodeAddress = null;


// Should place locally? // Get the existing activation from the distributed cache (if any)
if (shouldPlaceLocally(interfaceClass)) NodeAddress nodeAddress = distributedDirectory.get(remoteKey);
if (nodeAddress != null)
{
// Target node still valid?
if (activeNodes.containsKey(nodeAddress))
{ {
nodeAddress = clusterPeer.localAddress(); return Task.fromValue(nodeAddress);
} }

else
// Do we have a target node yet?
if (nodeAddress == null)
{ {
// If not, select randomly // Target node is now dead, remove this activation from distributed cache
nodeAddress = await(selectNode(interfaceClass.getName())); distributedDirectory.remove(remoteKey, nodeAddress);
} }
}
nodeAddress = null;


// Push our selection to the distributed cache (if possible) // Should place locally?
final NodeAddress otherNodeAddress = distributedDirectory.putIfAbsent(remoteKey, nodeAddress); if (shouldPlaceLocally(interfaceClass))
{
nodeAddress = clusterPeer.localAddress();
}


// Someone else beat us to placement, use that node // Do we have a target node yet?
if (otherNodeAddress != null) if (nodeAddress == null)
{ {
nodeAddress = otherNodeAddress; // If not, select randomly
} nodeAddress = await(selectNode(interfaceClass.getName()));
}


return Task.fromValue(nodeAddress); // Push our selection to the distributed cache (if possible)
NodeAddress otherNodeAddress = distributedDirectory.putIfAbsent(remoteKey, nodeAddress);


}, stage.getExecutionPool()); // Someone else beat us to placement, use that node
if (otherNodeAddress != null)
{
nodeAddress = otherNodeAddress;
}


// Cache locally // Cache locally
setCachedAddress(actorReference, async); setCachedAddress(actorReference, nodeAddress);

return async;


return Task.fromValue(nodeAddress);
} }


private void setCachedAddress(final RemoteReference<?> actorReference, final Task<NodeAddress> nodeAddress) private void setCachedAddress(final RemoteReference<?> actorReference, final NodeAddress nodeAddress)
{ {
cleanup(); cleanup();
localAddressCache.put(actorReference, nodeAddress); localAddressCache.put(actorReference, nodeAddress);
Expand Down Expand Up @@ -526,7 +507,7 @@ private boolean shouldPlaceLocally(final Class<?> interfaceClass)
nodeType == NodeTypeEnum.SERVER && stage.canActivateActor(interfaceClassName)) nodeType == NodeTypeEnum.SERVER && stage.canActivateActor(interfaceClassName))
{ {
final int percentile = interfaceClass.getAnnotation(PreferLocalPlacement.class).percentile(); final int percentile = interfaceClass.getAnnotation(PreferLocalPlacement.class).percentile();
if (random.nextInt(100) < percentile) if (ThreadLocalRandom.current().nextInt(100) < percentile)
{ {
return true; return true;
} }
Expand Down Expand Up @@ -617,7 +598,7 @@ private Task onInvocation(final HandlerContext ctx, final Invocation invocation)
ctx.fireRead(invocation); ctx.fireRead(invocation);
return Task.done(); return Task.done();
} }
final NodeAddress cachedAddress = await(getCachedAddressTask(toReference)); final NodeAddress cachedAddress = localAddressCache.get(toReference);
if (Objects.equals(cachedAddress, localAddress)) if (Objects.equals(cachedAddress, localAddress))
{ {
ctx.fireRead(invocation); ctx.fireRead(invocation);
Expand Down

0 comments on commit ff54f6f

Please sign in to comment.