ISPN-9801 ClusterTopologyManagerImpl hangs when restarting a FORK node
* ClusterTopologyManagerImpl should process new views while waiting
  for member heartbeats for a previous view
* Member availability check should ignore nodes that didn't join
  any cache
* The test doesn't reproduce the problem described in the issue,
  but it's still worth having
* Small changes in other tests
danberindei authored and wburns committed Jan 30, 2019
1 parent b7e75ba commit 8dce1c9
Showing 12 changed files with 272 additions and 46 deletions.
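The core of the fix is visible in the first two hunks below: the coordinator no longer blocks its view-handling thread on a synchronous heartbeat RPC, and every deferred members update carries the view id it was scheduled for, so a stale update is dropped once a newer view arrives. The following is a minimal standalone sketch of that guard pattern, using only java.util.concurrent types and hypothetical names (onViewChange, checkHeartbeats, applyMembers) rather than the actual Infinispan APIs:

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class ViewGuardSketch {
   private final AtomicInteger currentViewId = new AtomicInteger();
   private final Executor viewExecutor = Executors.newSingleThreadExecutor();

   // Called for every new cluster view; must never block the caller.
   void onViewChange(int viewId, List<String> members) {
      currentViewId.set(viewId);
      // The heartbeat check completes asynchronously, so newer views can be
      // installed while it is still in flight.
      checkHeartbeats(members).whenCompleteAsync((ignored, throwable) -> {
         if (currentViewId.get() != viewId) {
            // A newer view supersedes this update; drop it instead of hanging.
            System.out.printf("Skipping members update for view %d, newer view %d installed%n",
                              viewId, currentViewId.get());
            return;
         }
         if (throwable != null) {
            System.out.printf("Member check failed for view %d: %s%n", viewId, throwable);
            return;
         }
         applyMembers(viewId, members);
      }, viewExecutor);
   }

   // Stand-in for the heartbeat RPC; here it succeeds immediately.
   CompletableFuture<Void> checkHeartbeats(List<String> members) {
      return CompletableFuture.completedFuture(null);
   }

   void applyMembers(int viewId, List<String> members) {
      System.out.printf("Applying members %s for view %d%n", members, viewId);
   }
}

In the actual change, doHandleClusterView(int viewId) and doUpdateCacheMembers(int viewId, Throwable throwable) perform the same comparison against transport.getViewId() before touching any cache topology.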
@@ -379,14 +379,20 @@ private void updateMembers() {
}
}

public synchronized void doHandleClusterView() {
public synchronized void doHandleClusterView(int viewId) {
// TODO Clean up ClusterCacheStatus instances once they no longer have any members
if (currentTopology == null)
return;

List<Address> newClusterMembers = transport.getMembers();
int newViewId = transport.getViewId();
if (newViewId != viewId) {
log.debugf("Cache %s skipping members update for view %d, newer view received: %d",
cacheName, viewId, newViewId);
return;
}
if (trace) log.tracef("Cache %s updating members for view %d: %s", cacheName, viewId, newClusterMembers);
boolean cacheMembersModified = retainMembers(newClusterMembers);
if (trace) log.tracef("Cache %s doHandleClusterView newClusterMembers=%s", cacheName, newClusterMembers);
availabilityStrategy.onClusterViewChange(this, newClusterMembers);

if (cacheMembersModified) {
@@ -17,6 +17,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
@@ -26,6 +27,7 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import net.jcip.annotations.GuardedBy;
import org.infinispan.commands.ReplicableCommand;
import org.infinispan.commons.CacheException;
import org.infinispan.commons.util.CollectionFactory;
@@ -65,6 +67,7 @@
import org.infinispan.remoting.rpc.ResponseMode;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.Transport;
import org.infinispan.remoting.transport.impl.VoidResponseCollector;
import org.infinispan.remoting.transport.jgroups.SuspectException;
import org.infinispan.statetransfer.RebalanceType;
import org.infinispan.util.concurrent.CompletableFutures;
@@ -75,8 +78,6 @@
import org.infinispan.util.logging.events.EventLogManager;
import org.infinispan.util.logging.events.EventLogger;

import net.jcip.annotations.GuardedBy;

/**
* The {@code ClusterTopologyManager} implementation.
*
@@ -318,7 +319,7 @@ private void handleClusterView(boolean mergeView, int newViewId) {
if (clusterManagerStatus == ClusterManagerStatus.COORDINATOR) {
// If we have recovered the cluster status, we rebalance the caches to include minor partitions
// If we processed a regular view, we prune members that left.
updateCacheMembers(transport.getMembers());
updateCacheMembers(newViewId);
}
} catch (Throwable t) {
log.viewHandlingError(newViewId, t);
@@ -500,31 +501,56 @@ private void recoverClusterStatus(int newViewId, final List<Address> clusterMemb
latch.await(getGlobalTimeout(), TimeUnit.MILLISECONDS);
}

public void updateCacheMembers(List<Address> newClusterMembers) {
private void updateCacheMembers(int viewId) {
// Confirm that view's members are all available first, so in a network split scenario
// we can enter degraded mode without starting a rebalance first
// We don't really need to run on the view handling executor because ClusterCacheStatus
// has its own synchronization
confirmMembersAvailable().whenCompleteAsync((ignored, throwable) -> {
doUpdateCacheMembers(viewId, throwable);
}, viewHandlingExecutor);
}

private void doUpdateCacheMembers(int viewId, Throwable throwable) {
try {
log.tracef("Updating cluster members for all the caches. New list is %s", newClusterMembers);
try {
// If we get a SuspectException here, it means we will have a new view soon and we can ignore this one.
confirmMembersAvailable();
} catch (SuspectException e) {
log.tracef("Node %s left while updating cache members", e.getSuspect());
if (throwable == null) {
int newViewId = transport.getViewId();
if (newViewId != viewId) {
log.debugf("Skipping cache members update for view %d, newer view received: %d", viewId, newViewId);
return;
}
for (ClusterCacheStatus cacheStatus : cacheStatusMap.values()) {
cacheStatus.doHandleClusterView(viewId);
}
} else if (throwable instanceof SuspectException) {
Address leaver = ((SuspectException) throwable).getSuspect();
log.debugf("Skipping cache members update for view %d, node %s left", viewId, leaver);
return;
}
} catch (Throwable t) {
throwable = t;
}
if (clusterManagerStatus.isRunning()) {
log.errorUpdatingMembersList(viewId, throwable);
}
}

private CompletionStage<Void> confirmMembersAvailable() {
try {
Set<Address> expectedMembers = new HashSet<>();
for (ClusterCacheStatus cacheStatus : cacheStatusMap.values()) {
cacheStatus.doHandleClusterView();
expectedMembers.addAll(cacheStatus.getExpectedMembers());
}
expectedMembers.retainAll(transport.getMembers());
return transport.invokeCommandOnAll(expectedMembers, HeartBeatCommand.INSTANCE,
VoidResponseCollector.validOnly(),
DeliverOrder.NONE, getGlobalTimeout() / CLUSTER_RECOVERY_ATTEMPTS,
TimeUnit.MILLISECONDS);
} catch (Exception e) {
if (clusterManagerStatus.isRunning()) {
log.errorUpdatingMembersList(e);
}
return CompletableFutures.completedExceptionFuture(e);
}
}

private void confirmMembersAvailable() throws Exception {
transport.invokeRemotely(null, HeartBeatCommand.INSTANCE, ResponseMode.SYNCHRONOUS, getGlobalTimeout(), null, DeliverOrder.NONE, false);
}

/**
* Wait until we have received view {@code joinerViewId} and we have finished recovering the cluster state.
* <p>
4 changes: 2 additions & 2 deletions core/src/main/java/org/infinispan/util/logging/Log.java
@@ -735,8 +735,8 @@ void preparedTxAlreadyExists(RecoveryAwareTransaction previous,
void failedToRecoverClusterState(@Cause Throwable cause);

@LogMessage(level = WARN)
@Message(value = "Error updating cluster member list", id = 197)
void errorUpdatingMembersList(@Cause Throwable cause);
@Message(value = "Error updating cluster member list for view %d, waiting for next view", id = 197)
void errorUpdatingMembersList(int viewId, @Cause Throwable cause);

@LogMessage(level = INFO)
@Message(value = "Unable to register MBeans for default cache", id = 198)
@@ -68,6 +68,7 @@ public void init(int viewId, List<Address> members) {
}

public void updateView(int viewId, List<Address> members) {
log.debugf("Installing view %d %s", viewId, members);
this.viewId = viewId;
this.members = members;

@@ -145,7 +146,7 @@ public Map<Address, Response> invokeRemotely(Collection<Address> recipients, Rep
DeliverOrder deliverOrder, boolean anycast) throws Exception {
Collection<Address> targets = recipients != null ? recipients : members;
MapResponseCollector collector = MapResponseCollector.ignoreLeavers(shouldIgnoreLeavers(mode), targets.size());
CompletableFuture<Map<Address, Response>> rpcFuture = blockRequest(rpcCommand, collector);
CompletableFuture<Map<Address, Response>> rpcFuture = blockRequest(recipients, rpcCommand, collector);
if (mode.isAsynchronous()) {
return Collections.emptyMap();
} else {
@@ -165,22 +166,22 @@ public CompletableFuture<Map<Address, Response>> invokeRemotelyAsync(Collection<
Collection<Address> targets = recipients != null ? recipients : members;
MapResponseCollector collector =
mode.isSynchronous() ? MapResponseCollector.ignoreLeavers(shouldIgnoreLeavers(mode), targets.size()) : null;
return blockRequest(rpcCommand, collector);
return blockRequest(recipients, rpcCommand, collector);
}

@Override
public void sendTo(Address destination, ReplicableCommand rpcCommand, DeliverOrder deliverOrder) {
blockRequest(rpcCommand, null);
blockRequest(Collections.singleton(destination), rpcCommand, null);
}

@Override
public void sendToMany(Collection<Address> destinations, ReplicableCommand rpcCommand, DeliverOrder deliverOrder) {
blockRequest(rpcCommand, null);
blockRequest(destinations, rpcCommand, null);
}

@Override
public void sendToAll(ReplicableCommand rpcCommand, DeliverOrder deliverOrder) throws Exception {
blockRequest(rpcCommand, null);
blockRequest(members, rpcCommand, null);
}

@Override
@@ -280,34 +281,34 @@ public Set<String> getSitesView() {
@Override
public <T> CompletionStage<T> invokeCommand(Address target, ReplicableCommand command, ResponseCollector<T>
collector, DeliverOrder deliverOrder, long timeout, TimeUnit unit) {
return blockRequest(command, collector);
return blockRequest(Collections.singleton(target), command, collector);
}

@Override
public <T> CompletionStage<T> invokeCommand(Collection<Address> targets, ReplicableCommand command,
ResponseCollector<T> collector, DeliverOrder deliverOrder, long
timeout, TimeUnit unit) {
return blockRequest(command, collector);
return blockRequest(targets, command, collector);
}

@Override
public <T> CompletionStage<T> invokeCommandOnAll(ReplicableCommand command, ResponseCollector<T> collector,
DeliverOrder deliverOrder, long timeout, TimeUnit unit) {
return blockRequest(command, collector);
return blockRequest(members, command, collector);
}

@Override
public <T> CompletableFuture<T> invokeCommandOnAll(Collection<Address> requiredTargets, ReplicableCommand command,
ResponseCollector<T> collector, DeliverOrder deliverOrder,
long timeout, TimeUnit unit) {
return blockRequest(command, collector);
return blockRequest(requiredTargets, command, collector);
}

@Override
public <T> CompletionStage<T> invokeCommandStaggered(Collection<Address> targets, ReplicableCommand command,
ResponseCollector<T> collector, DeliverOrder deliverOrder,
long timeout, TimeUnit unit) {
return blockRequest(command, collector);
return blockRequest(targets, command, collector);
}

@Override
@@ -316,8 +317,8 @@ public <T> CompletionStage<T> invokeCommands(Collection<Address> targets, Functi
throw new UnsupportedOperationException();
}

private <T> CompletableFuture<T> blockRequest(ReplicableCommand command, ResponseCollector<T> collector) {
log.debugf("Intercepted command %s", command);
private <T> CompletableFuture<T> blockRequest(Collection<Address> targets, ReplicableCommand command, ResponseCollector<T> collector) {
log.debugf("Intercepted command %s to %s", command, targets);
BlockedRequest request = new BlockedRequest(command, collector);
blockedRequests.add(request);
return request.getResultFuture();
@@ -207,6 +207,8 @@ public void testCoordinatorLeaves() throws InterruptedException, ExecutionExcept
oteBarrier.await(10, TimeUnit.SECONDS);

assertEquals("value", future.get());
((DelayedViewJGroupsTransport) transport1.getDelegate()).assertUnblocked();
((DelayedViewJGroupsTransport) manager(2).getTransport()).assertUnblocked();
}

private void assertOwners(BlockedTopology t, boolean current, int segmentId, Address... address) {
@@ -104,7 +104,7 @@ public void testConcurrentStart(int eagerManager, int lazyManager) throws Except
private EmbeddedCacheManager createCacheManager(String name1, JChannel ch1) {
GlobalConfigurationBuilder gcb1 = new GlobalConfigurationBuilder();
gcb1.transport().nodeName(ch1.getName()).distributedSyncTimeout(10, SECONDS);
CustomChannelLookup.registerChannel(gcb1, ch1, name1, false);
CustomChannelLookup.configureTransportWithChannel(gcb1, ch1, name1, false);

ConfigurationBuilder replCfg = new ConfigurationBuilder();
replCfg.clustering().cacheMode(CacheMode.REPL_SYNC);
@@ -96,12 +96,7 @@ public void testConcurrentStart(int eagerManager, int lazyManager) throws Except
}

private EmbeddedCacheManager createCacheManager(ConfigurationBuilder cacheCfg, String name,
JChannel channel) throws
Exception {
GlobalConfigurationBuilder gcb = new GlobalConfigurationBuilder();
gcb.transport().nodeName(channel.getName());
gcb.transport().distributedSyncTimeout(30, TimeUnit.SECONDS);

JChannel channel) throws Exception {
FORK fork = new FORK();
fork.setUnknownForkHandler(new UnknownForkHandler() {
@Override
@@ -134,7 +129,11 @@ private Object handle(Message message) {
});
channel.getProtocolStack().addProtocol(fork);
ForkChannel fch = new ForkChannel(channel, "stack1", "channel1");
CustomChannelLookup.registerChannel(gcb, fch, name, true);

GlobalConfigurationBuilder gcb = new GlobalConfigurationBuilder();
gcb.transport().transport(new JGroupsTransport(fch));
gcb.transport().nodeName(channel.getName());
gcb.transport().distributedSyncTimeout(30, TimeUnit.SECONDS);

EmbeddedCacheManager cm = new DefaultCacheManager(gcb.build(), cacheCfg.build(), false);
registerCacheManager(cm);
@@ -20,8 +20,8 @@ public class CustomChannelLookup implements JGroupsChannelLookup {
private static final Map<String, JChannel> channelMap = CollectionFactory.makeConcurrentMap();
private boolean connect;

public static void registerChannel(GlobalConfigurationBuilder gcb, JChannel channel, String nodeName,
boolean connect) {
public static void configureTransportWithChannel(GlobalConfigurationBuilder gcb, JChannel channel, String nodeName,
boolean connect) {
TransportConfigurationBuilder tcb = gcb.transport();
tcb.defaultTransport();
tcb.addProperty(JGroupsTransport.CHANNEL_LOOKUP, CustomChannelLookup.class.getName());
