ISPN-14793 Join recovering caches after view merge
* After receiving a CacheStatusRequest from the coordinator, the nodes
will send a join request for the caches which need to be recovered from
the persistent state.
jabolina committed Apr 20, 2023
1 parent 3cc3c30 commit 697379f
Showing 3 changed files with 276 additions and 14 deletions.
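The change has three parts: when a (possibly new) coordinator requests the cluster status, the local topology manager now re-sends join requests for caches that are still awaiting recovery from persistent state; a LocalCacheStatus.needRecovery() helper makes that condition explicit; and a new test restarts a whole cluster under a network partition to exercise the path. Below is a minimal, self-contained sketch of the re-join idea, using hypothetical simplified types rather than the real Infinispan API:

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of the recovery re-join flow in this commit; the real
// classes (LocalTopologyManagerImpl, LocalCacheStatus) are far more involved.
class RecoveryRejoinSketch {
   static final class CacheStatus {
      volatile Object currentTopology;  // stays null until a join completes
      final int stableMembersSize;      // member count read from the persisted state
      CacheStatus(int stableMembersSize) { this.stableMembersSize = stableMembersSize; }
      boolean needRecovery() { return stableMembersSize > 0; }
   }

   final Map<String, CacheStatus> runningCaches = new ConcurrentHashMap<>();

   // Invoked when the (possibly new) coordinator requests this node's cache status.
   CompletableFuture<Void> handleStatusRequest() {
      CompletableFuture<Void> joins = CompletableFuture.completedFuture(null);
      for (Map.Entry<String, CacheStatus> e : runningCaches.entrySet()) {
         CacheStatus status = e.getValue();
         // A cache that was restored from disk but has no topology yet must
         // join again: the coordinator dropped its previous cache information
         // and only rebuilds the cache once every expected member reports in.
         if (status.currentTopology == null && status.needRecovery()) {
            joins = CompletableFuture.allOf(joins, sendJoinRequest(e.getKey()));
         }
      }
      return joins;
   }

   CompletableFuture<Void> sendJoinRequest(String cacheName) {
      // Stand-in for the RPC to the coordinator; completes once the
      // coordinator has gathered state from all expected members.
      return CompletableFuture.completedFuture(null);
   }
}

The real implementation additionally orders each join per cache and leaves the cache if its re-join fails, as the first diff below shows.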
@@ -63,6 +63,7 @@
import org.infinispan.remoting.transport.impl.VoidResponseCollector;
import org.infinispan.remoting.transport.jgroups.SuspectException;
import org.infinispan.util.concurrent.ActionSequencer;
+import org.infinispan.util.concurrent.AggregateCompletionStage;
import org.infinispan.util.concurrent.BlockingManager;
import org.infinispan.commons.util.concurrent.CompletableFutures;
import org.infinispan.util.concurrent.CompletionStages;
@@ -184,7 +185,20 @@ public CompletionStage<CacheTopology> join(String cacheName, CacheJoinInfo joinInfo,
});
}

-   public CompletionStage<CacheStatusResponse> sendJoinRequest(String cacheName, CacheJoinInfo joinInfo, long timeout,
+   private CompletionStage<CacheTopology> join(String cacheName, LocalCacheStatus cacheStatus) {
+      return orderOnCache(cacheName, () -> {
+         if (runningCaches.get(cacheName) != cacheStatus) {
+            throw new IllegalStateException("Cache status changed while joining");
+         }
+
+         long timeout = cacheStatus.getJoinInfo().getTimeout();
+         long endTime = timeService.expectedEndTime(timeout, MILLISECONDS);
+         return sendJoinRequest(cacheName, cacheStatus.getJoinInfo(), timeout, endTime)
+               .thenCompose(joinResponse -> handleJoinResponse(cacheName, cacheStatus, joinResponse));
+      });
+   }
+
+   private CompletionStage<CacheStatusResponse> sendJoinRequest(String cacheName, CacheJoinInfo joinInfo, long timeout,
long endTime) {
int viewId = transport.getViewId();
ReplicableCommand command = new CacheJoinCommand(cacheName, transport.getAddress(), joinInfo, viewId);
@@ -295,6 +309,7 @@ public CompletionStage<ManagerStatusResponse> handleStatusRequest(int viewId) {
// As long as we have an older view, we can still process topologies from the old coordinator
return withView(viewId, getGlobalTimeout(), MILLISECONDS).thenApply(ignored -> {
Map<String, CacheStatusResponse> caches = new HashMap<>();
+         AggregateCompletionStage<Void> joins = CompletionStages.aggregateCompletionStage();
synchronized (runningCaches) {
latestStatusResponseViewId = viewId;

@@ -304,8 +319,21 @@ public CompletionStage<ManagerStatusResponse> handleStatusRequest(int viewId) {
// Ignore caches that haven't finished joining yet.
// They will either wait for recovery to finish (if started in the current view)
// or retry (if started in a previous view).
-               if (cacheStatus.getCurrentTopology() == null)
+               if (cacheStatus.getCurrentTopology() == null) {
+                  // If the cache has a persistent state, it tries to join again.
+                  // The coordinator has cleared the previous information about running caches,
+                  // so we need to send a join again for the caches awaiting recovery in order to
+                  // reconstruct them from the persistent state.
+                  // This join only completes *after* the coordinator receives the state from all nodes.
+                  if (cacheStatus.needRecovery()) {
+                     final String name = cacheName;
+                     joins.dependsOn(join(name, cacheStatus)
+                           .whenComplete((ignore, t) -> {
+                              if (t != null) leave(name, getGlobalTimeout());
+                           }));
+                  }
                   continue;
+               }

caches.put(e.getKey(), new CacheStatusResponse(cacheStatus.getJoinInfo(),
cacheStatus.getCurrentTopology(),
@@ -316,6 +344,7 @@ public CompletionStage<ManagerStatusResponse> handleStatusRequest(int viewId) {
}
}

+         joins.freeze();
log.debugf("Sending cluster status response for view %d", viewId);
return new ManagerStatusResponse(caches, gcr.getClusterTopologyManager().isRebalancingEnabled());
});
@@ -535,6 +564,7 @@ private CompletionStage<Void> doHandleStableTopologyUpdate(String cacheName, Cac
CacheTopology stableTopology = cacheStatus.getStableTopology();
if (stableTopology == null || stableTopology.getTopologyId() < newStableTopology.getTopologyId()) {
log.tracef("Updating stable topology for cache %s: %s", cacheName, newStableTopology);
+         //System.out.printf("[%s] Updating stable topology for cache %s: %s\n", transport.getAddress(), cacheName, newStableTopology);
cacheStatus.setStableTopology(newStableTopology);
if (newStableTopology != null && cacheStatus.getJoinInfo().getPersistentUUID() != null) {
// Don't use the current CH state for the next restart
@@ -900,7 +930,11 @@ CompletionStage<Boolean> getStableTopologyCompletion() {
}

boolean isStableTopologyRestored() {
-      return (stable.isDone() && stableTopology != null) || stableMembersSize < 0;
+      return (stable.isDone() && stableTopology != null) || !needRecovery();
   }

+   boolean needRecovery() {
+      return stableMembersSize > 0;
+   }

int getStableMembersSize() {
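The recovery condition itself lives in LocalCacheStatus: needRecovery() is true only while stableMembersSize is positive, i.e. a persisted stable topology was read at startup and its members have not all been re-established. None of this applies unless global state persistence is enabled; a minimal configuration sketch mirroring the new test below (directory and cache name are illustrative):

import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.configuration.global.GlobalConfigurationBuilder;
import org.infinispan.manager.DefaultCacheManager;

public class GlobalStateExample {
   public static void main(String[] args) {
      String stateDir = "/tmp/node-A"; // illustrative; the test uses per-node temp directories
      GlobalConfigurationBuilder global = GlobalConfigurationBuilder.defaultClusteredBuilder();
      // With global state enabled, a graceful shutdown writes a <cache>.state
      // file under stateDir, which drives the recovery path above.
      global.globalState().enable().persistentLocation(stateDir);

      ConfigurationBuilder config = new ConfigurationBuilder();
      config.persistence().addSingleFileStore().location(stateDir);

      try (DefaultCacheManager manager = new DefaultCacheManager(global.build())) {
         manager.defineConfiguration("testCache", config.build());
         manager.getCache("testCache").put("k", "v");
      }
   }
}

After a graceful shutdown, the testCache.state file written under stateDir is what the restarted node uses to request recovery; the test below asserts that this file exists before restarting.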
@@ -0,0 +1,219 @@
package org.infinispan.globalstate;

import static org.infinispan.commons.test.CommonsTestingUtil.tmpDirectory;
import static org.testng.AssertJUnit.assertEquals;
import static org.testng.AssertJUnit.assertNotNull;
import static org.testng.AssertJUnit.assertTrue;

import java.io.File;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

import org.infinispan.Cache;
import org.infinispan.commons.test.Exceptions;
import org.infinispan.commons.util.Util;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.configuration.global.GlobalConfigurationBuilder;
import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.manager.DefaultCacheManager;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.partitionhandling.BasePartitionHandlingTest;
import org.infinispan.partitionhandling.PartitionHandling;
import org.infinispan.remoting.transport.jgroups.JGroupsAddress;
import org.infinispan.test.TestingUtil;
import org.infinispan.topology.LocalTopologyManager;
import org.infinispan.topology.MissingMembersException;
import org.infinispan.topology.PersistentUUID;
import org.infinispan.topology.PersistentUUIDManager;
import org.testng.annotations.Test;

@Test(groups = "functional", testName = "globalstate.NodeRestartPartitionHandlingTest")
public class NodeRestartPartitionHandlingTest extends BasePartitionHandlingTest {

public static final int DATA_SIZE = 100;
public static final String CACHE_NAME = "testCache";

{
partitionHandling = PartitionHandling.ALLOW_READ_WRITES;
numMembersInCluster = 2;
}

protected int getClusterSize() {
return numMembersInCluster;
}

@Override
protected String customCacheName() {
return CACHE_NAME;
}

@Override
protected void createCacheManagers() throws Throwable {
Util.recursiveFileRemove(tmpDirectory(this.getClass().getSimpleName()));
createStatefulCacheManagers(true);
}

public void testRestartDuringNetworkPartition() throws Throwable {
Map<JGroupsAddress, PersistentUUID> addressMappings = createInitialCluster();
ConsistentHash oldConsistentHash = advancedCache(0, CACHE_NAME).getDistributionManager().getWriteConsistentHash();

for (int i = 0; i < getClusterSize(); i++) {
((DefaultCacheManager) manager(i)).shutdownAllCaches();
}

TestingUtil.killCacheManagers(this.cacheManagers);

// Verify that the cache state file exists
for (int i = 0; i < getClusterSize(); i++) {
String persistentLocation = manager(i).getCacheManagerConfiguration().globalState().persistentLocation();
File[] listFiles = new File(persistentLocation).listFiles((dir, name) -> name.equals(CACHE_NAME + ".state"));
assertEquals(Arrays.toString(listFiles), 1, listFiles.length);
}
cacheManagers.clear();

createStatefulCacheManagers(false);

// We split the cluster. This should prevent the caches from restoring.
splitCluster(new int[]{0}, new int[]{1});
partition(0).assertDegradedMode();
partition(1).assertDegradedMode();

// We restart the cluster, completely. Caches should issue join requests during partition.
for (int i = 0; i < getClusterSize(); i++) {
cache(i, CACHE_NAME);
}

// Assert we are still partitioned.
partition(0).assertDegradedMode();
partition(1).assertDegradedMode();

// Since the cluster is partitioned, the caches have not recovered, so operations should fail.
assertOperationsFail();

// Merge the cluster. This should allow the caches to restore.
partition(0).merge(partition(1), false);
waitForClusterToForm(CACHE_NAME);
assertHealthyCluster(addressMappings, oldConsistentHash);
}

protected void createStatefulCacheManagers(boolean clear) {
for (int i = 0; i < getClusterSize(); i++) {
createStatefulCacheManager(Character.toString((char) ('A' + i)), clear);
}
}

void createStatefulCacheManager(String id, boolean clear) {
String stateDirectory = tmpDirectory(this.getClass().getSimpleName(), id);
if (clear)
Util.recursiveFileRemove(stateDirectory);
GlobalConfigurationBuilder global = GlobalConfigurationBuilder.defaultClusteredBuilder();
global.globalState().enable().persistentLocation(stateDirectory);

ConfigurationBuilder config = new ConfigurationBuilder();
partitionHandlingBuilder(config);
config.persistence().addSingleFileStore().location(stateDirectory).fetchPersistentState(true);
EmbeddedCacheManager manager = addClusterEnabledCacheManager(global, null);
manager.defineConfiguration(CACHE_NAME, config.build());
}

Map<JGroupsAddress, PersistentUUID> createInitialCluster() {
waitForClusterToForm(CACHE_NAME);
Map<JGroupsAddress, PersistentUUID> addressMappings = new LinkedHashMap<>();

for (int i = 0; i < getClusterSize(); i++) {
LocalTopologyManager ltm = TestingUtil.extractGlobalComponent(manager(i), LocalTopologyManager.class);
PersistentUUID uuid = ltm.getPersistentUUID();
assertNotNull(uuid);
addressMappings.put((JGroupsAddress) manager(i).getAddress(), uuid);
}

fillData();
checkData();

return addressMappings;
}

private void assertOperationsFail() {
for (int i = 0; i < cacheManagers.size(); i++) {
for (int v = 0; v < DATA_SIZE; v++) {
final Cache<Object, Object> cache = cache(i, CACHE_NAME);
String key = String.valueOf(v);
// The read does not succeed; a message about the topology not being stable yet is logged.
Exceptions.expectException(MissingMembersException.class,
"ISPN000689: Recovering cache 'testCache' but there are missing members, known members \\[.*\\] of a total of 2$",
() -> cache.get(key));
}
}
}

private void fillData() {
// Fill some data
for (int i = 0; i < DATA_SIZE; i++) {
cache(0, CACHE_NAME).put(String.valueOf(i), String.valueOf(i));
}
}

void checkData() {
// Ensure that the cache contains the right data
assertEquals(DATA_SIZE, cache(0, CACHE_NAME).size());
for (int i = 0; i < DATA_SIZE; i++) {
assertEquals(cache(0, CACHE_NAME).get(String.valueOf(i)), String.valueOf(i));
}
}

protected void assertHealthyCluster(Map<JGroupsAddress, PersistentUUID> addressMappings, ConsistentHash oldConsistentHash) throws Throwable {
// Healthy cluster
waitForClusterToForm(CACHE_NAME);

checkClusterRestartedCorrectly(addressMappings);
checkData();

ConsistentHash newConsistentHash =
advancedCache(0, CACHE_NAME).getDistributionManager().getWriteConsistentHash();
PersistentUUIDManager persistentUUIDManager = TestingUtil.extractGlobalComponent(manager(0), PersistentUUIDManager.class);
assertEquivalent(addressMappings, oldConsistentHash, newConsistentHash, persistentUUIDManager);
}

void checkClusterRestartedCorrectly(Map<JGroupsAddress, PersistentUUID> addressMappings) throws Exception {
Iterator<Map.Entry<JGroupsAddress, PersistentUUID>> addressIterator = addressMappings.entrySet().iterator();
Set<PersistentUUID> uuids = new HashSet<>();
for (int i = 0; i < cacheManagers.size(); i++) {
LocalTopologyManager ltm = TestingUtil.extractGlobalComponent(manager(i), LocalTopologyManager.class);
assertTrue(uuids.add(ltm.getPersistentUUID()));
}

for (int i = 0; i < cacheManagers.size(); i++) {
LocalTopologyManager ltm = TestingUtil.extractGlobalComponent(manager(i), LocalTopologyManager.class);
// Ensure that nodes have the old UUID
Map.Entry<JGroupsAddress, PersistentUUID> entry = addressIterator.next();
assertTrue(entry.getKey() + " is mapping to the wrong UUID: " +
"Expected: " + entry.getValue() + " not found in: " + uuids, uuids.contains(entry.getValue()));
// Ensure that rebalancing is enabled for the cache
assertTrue(ltm.isCacheRebalancingEnabled(CACHE_NAME));
}
}

void assertEquivalent(Map<JGroupsAddress, PersistentUUID> addressMappings, ConsistentHash oldConsistentHash,
ConsistentHash newConsistentHash, PersistentUUIDManager persistentUUIDManager) {
assertTrue(isEquivalent(addressMappings, oldConsistentHash, newConsistentHash, persistentUUIDManager));
}

private boolean isEquivalent(Map<JGroupsAddress, PersistentUUID> addressMapping, ConsistentHash oldConsistentHash, ConsistentHash newConsistentHash, PersistentUUIDManager persistentUUIDManager) {
if (oldConsistentHash.getNumSegments() != newConsistentHash.getNumSegments()) return false;
for (int i = 0; i < oldConsistentHash.getMembers().size(); i++) {
JGroupsAddress oldAddress = (JGroupsAddress) oldConsistentHash.getMembers().get(i);
JGroupsAddress remappedOldAddress = (JGroupsAddress) persistentUUIDManager.getAddress(addressMapping.get(oldAddress));
JGroupsAddress newAddress = (JGroupsAddress) newConsistentHash.getMembers().get(i);
if (!remappedOldAddress.equals(newAddress)) return false;
Set<Integer> oldSegmentsForOwner = oldConsistentHash.getSegmentsForOwner(oldAddress);
Set<Integer> newSegmentsForOwner = newConsistentHash.getSegmentsForOwner(newAddress);
if (!oldSegmentsForOwner.equals(newSegmentsForOwner)) return false;
}

return true;
}
}
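While recovery is pending, reads fail with the MissingMembersException asserted above (ISPN000689). A client-side sketch of tolerating that window, with an illustrative retry policy rather than anything Infinispan prescribes:

import org.infinispan.Cache;
import org.infinispan.topology.MissingMembersException;

final class RecoveringCacheReader {
   // Retries a read while a restored cache waits for its previous members to rejoin.
   static <K, V> V getWithRetry(Cache<K, V> cache, K key, int attempts) throws InterruptedException {
      for (int i = 0; i < attempts; i++) {
         try {
            return cache.get(key);
         } catch (MissingMembersException e) {
            Thread.sleep(1_000); // give the remaining members time to rejoin
         }
      }
      throw new IllegalStateException("cache did not recover after " + attempts + " attempts");
   }
}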
@@ -74,6 +74,22 @@ public BasePartitionHandlingTest() {
@Override
protected void createCacheManagers() throws Throwable {
ConfigurationBuilder dcc = cacheConfiguration();
+      partitionHandlingBuilder(dcc);
+      customizeCacheConfiguration(dcc);
+      createClusteredCaches(numMembersInCluster, serializationContextInitializer(), dcc,
+            new TransportFlags().withFD(true).withMerge(true));
+      waitForClusterToForm();
+   }
+
+   protected String customCacheName() {
+      return null;
+   }
+
+   protected void customizeCacheConfiguration(ConfigurationBuilder dcc) {
+
+   }
+
+   protected void partitionHandlingBuilder(ConfigurationBuilder dcc) {
dcc.clustering().cacheMode(cacheMode).partitionHandling().whenSplit(partitionHandling).mergePolicy(mergePolicy);
if (cacheMode == CacheMode.DIST_SYNC) {
dcc.clustering().hash().numOwners(numberOfOwners);
@@ -84,14 +100,6 @@ protected void createCacheManagers() throws Throwable {
if (lockingMode != null) {
dcc.transaction().lockingMode(lockingMode);
}
-      customizeCacheConfiguration(dcc);
-      createClusteredCaches(numMembersInCluster, serializationContextInitializer(), dcc,
-            new TransportFlags().withFD(true).withMerge(true));
-      waitForClusterToForm();
-   }
-
-   protected void customizeCacheConfiguration(ConfigurationBuilder dcc) {
-
-   }

@AfterMethod(alwaysRun = true)
@@ -334,7 +342,7 @@ private String printView(ArrayList<JChannel> view1) {
}

private void waitForPartitionToForm(boolean waitForNoRebalance) {
-      List<Cache<Object, Object>> caches = new ArrayList<>(getCaches(null));
+      List<Cache<Object, Object>> caches = new ArrayList<>(getCaches(customCacheName()));
caches.removeIf(objectObjectCache -> !channels.contains(channel(objectObjectCache)));
Cache<Object, Object> cache = caches.get(0);
blockUntilViewsReceived(10000, caches);
@@ -400,8 +408,9 @@ public void assertKeyNotAvailableForRead(Object key) {

private <K,V> List<Cache<K,V>> cachesInThisPartition() {
List<Cache<K,V>> caches = new ArrayList<>();
-      for (final Cache<K,V> c : BasePartitionHandlingTest.this.<K,V>caches()) {
-         if (channels.contains(channel(c))) {
+      for (final Cache<K,V> c : BasePartitionHandlingTest.this.<K,V>caches(customCacheName())) {
+         JChannel ch = channel(c);
+         if (channels.contains(ch)) {
caches.add(c);
}
}