Skip to content

Commit

Permalink
ISPN-2357 Serialize topology update commands
Browse files Browse the repository at this point in the history
Synchronize broadcasting of CH_UPDATE commands so that it's not possible
for the cluster members to receive them out-of-order.
  • Loading branch information
danberindei committed Oct 3, 2012
1 parent 9b9512a commit 908d857
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 15 deletions.
Expand Up @@ -334,13 +334,18 @@ public void updateCacheStatusAfterMerge(String cacheName, List<CacheTopology> pa
}

private void broadcastConsistentHashUpdate(String cacheName, ClusterCacheStatus cacheStatus) throws Exception {
CacheTopology cacheTopology = cacheStatus.getCacheTopology();
log.debugf("Updating cluster-wide consistent hash for cache %s, topology = %s",
cacheName, cacheTopology);
ReplicableCommand command = new CacheTopologyControlCommand(cacheName,
CacheTopologyControlCommand.Type.CH_UPDATE, transport.getAddress(), cacheTopology,
transport.getViewId());
executeOnClusterSync(command, getGlobalTimeout());
// Serialize CH update commands, so that they don't arrive on the other members out-of-order.
// We are ok with sending the same CH update twice, we just don't want the other members to receive
// an older CH after they got the latest CH.
synchronized (cacheStatus) {
CacheTopology cacheTopology = cacheStatus.getCacheTopology();
log.debugf("Updating cluster-wide consistent hash for cache %s, topology = %s",
cacheName, cacheTopology);
ReplicableCommand command = new CacheTopologyControlCommand(cacheName,
CacheTopologyControlCommand.Type.CH_UPDATE, transport.getAddress(), cacheTopology,
transport.getViewId());
executeOnClusterSync(command, getGlobalTimeout());
}
}

private void startRebalance(String cacheName) throws Exception {
Expand Down Expand Up @@ -438,7 +443,12 @@ private HashMap<String, List<CacheTopology>> recoverClusterStatus() throws Excep
topologyList = new ArrayList<CacheTopology>();
clusterCacheMap.put(cacheName, topologyList);
}
topologyList.add(cacheTopology);

// The cache topology could be null if the new node sent a join request to the old coordinator
// but didn't get a response back yet
if (cacheTopology != null) {
topologyList.add(cacheTopology);
}

// This node may have joined, and still not be in the current or pending CH
// because the old coordinator didn't manage to start the rebalance before shutting down
Expand All @@ -463,7 +473,7 @@ public void updateClusterMembers(List<Address> newClusterMembers) throws Excepti
}

private boolean onCacheMembershipChange(String cacheName, ClusterCacheStatus cacheStatus) throws Exception {
boolean topologyChanged = updateTopologyAfterMembershipChange(cacheStatus);
boolean topologyChanged = updateTopologyAfterMembershipChange(cacheName, cacheStatus);
if (!topologyChanged)
return true;

Expand All @@ -479,22 +489,26 @@ private boolean onCacheMembershipChange(String cacheName, ClusterCacheStatus cac
return false;
}

private boolean updateTopologyAfterMembershipChange(ClusterCacheStatus cacheStatus) {
/**
* @return {@code true} if the topology was changed, {@code false} otherwise
*/
private boolean updateTopologyAfterMembershipChange(String cacheName, ClusterCacheStatus cacheStatus) {
synchronized (cacheStatus) {
ConsistentHashFactory consistentHashFactory = cacheStatus.getJoinInfo().getConsistentHashFactory();
int topologyId = cacheStatus.getCacheTopology().getTopologyId();
ConsistentHash currentCH = cacheStatus.getCacheTopology().getCurrentCH();
ConsistentHash pendingCH = cacheStatus.getCacheTopology().getPendingCH();
if (!cacheStatus.needConsistentHashUpdate()) {
// The cache already had an empty CH, there's nothing left to do
log.tracef("Cache %s members list was updated, but the cache topology doesn't need to change: %s",
cacheName, cacheStatus.getCacheTopology());
return false;
}

List<Address> newCurrentMembers = cacheStatus.pruneInvalidMembers(currentCH.getMembers());
if (newCurrentMembers.isEmpty()) {
CacheTopology newTopology = new CacheTopology(topologyId + 1, null, null);
cacheStatus.updateCacheTopology(newTopology);
// Technically the topology changed, but there's nobody to broadcast that update to
log.tracef("Initial topology installed for cache %s: %s", cacheName, newTopology);
return false;
}
ConsistentHash newCurrentCH = consistentHashFactory.updateMembers(currentCH, newCurrentMembers);
Expand All @@ -505,6 +519,7 @@ private boolean updateTopologyAfterMembershipChange(ClusterCacheStatus cacheStat
}
CacheTopology newTopology = new CacheTopology(topologyId, newCurrentCH, newPendingCH);
cacheStatus.updateCacheTopology(newTopology);
log.tracef("Cache %s topology updated: %s", cacheName, newTopology);
return true;
}
}
Expand All @@ -515,6 +530,7 @@ private void waitForView(int viewId) throws InterruptedException {
}
synchronized (viewUpdateLock) {
while (this.viewId < viewId) {
// break out of the loop after state transfer timeout expires
viewUpdateLock.wait(1000);
}
}
Expand Down
Expand Up @@ -159,7 +159,7 @@ public Map<String, Object[]> handleStatusRequest(int viewId) {
@Override
public void handleConsistentHashUpdate(String cacheName, CacheTopology cacheTopology, int viewId) throws InterruptedException {
if (!running) {
log.debugf("Ignoring consistent hash update %s for cache %s, the local cache manager is shutting down",
log.debugf("Ignoring consistent hash update %s for cache %s, the local cache manager is not running",
cacheTopology.getTopologyId(), cacheName);
return;
}
Expand Down Expand Up @@ -198,7 +198,7 @@ public void handleConsistentHashUpdate(String cacheName, CacheTopology cacheTopo
@Override
public void handleRebalance(String cacheName, CacheTopology cacheTopology, int viewId) throws InterruptedException {
if (!running) {
log.debugf("Ignoring rebalance request %s for cache %s, the local cache manager is shutting down",
log.debugf("Ignoring rebalance request %s for cache %s, the local cache manager is not running",
cacheTopology.getTopologyId(), cacheName);
return;
}
Expand Down
Expand Up @@ -61,7 +61,7 @@ public boolean confirmRebalance(Address node, int receivedTopologyId) {

boolean removed = confirmationsNeeded.remove(node);
if (!removed) {
log.tracef("Rebalance confirmation collector %d@%s ignored confirmation for %s, which is not a member",
log.tracef("Rebalance confirmation collector %d@%s ignored confirmation for %s, which is already confirmed",
topologyId, cacheName, node);
return false;
}
Expand All @@ -79,6 +79,8 @@ public boolean updateMembers(Collection<Address> newMembers) {
synchronized (this) {
// only return true the first time
boolean modified = confirmationsNeeded.retainAll(newMembers);
log.tracef("Rebalance confirmation collector %d@s members list updated, remaining list is %s",
topologyId, cacheName, confirmationsNeeded);
return modified && confirmationsNeeded.isEmpty();
}
}
Expand Down

0 comments on commit 908d857

Please sign in to comment.