Skip to content

Commit

Permalink
ISPN-6314 ClusterTopologyManager.waitForJoinerView can wait forever
Browse files Browse the repository at this point in the history
  • Loading branch information
pruivo authored and danberindei committed Mar 4, 2016
1 parent cbd7a16 commit c8c2a27
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 22 deletions.
@@ -1,6 +1,7 @@
package org.infinispan.topology; package org.infinispan.topology;




import net.jcip.annotations.GuardedBy;
import org.infinispan.commands.ReplicableCommand; import org.infinispan.commands.ReplicableCommand;
import org.infinispan.commons.CacheException; import org.infinispan.commons.CacheException;
import org.infinispan.commons.util.CollectionFactory; import org.infinispan.commons.util.CollectionFactory;
Expand Down Expand Up @@ -32,13 +33,11 @@
import org.infinispan.remoting.transport.Address; import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.Transport; import org.infinispan.remoting.transport.Transport;
import org.infinispan.remoting.transport.jgroups.SuspectException; import org.infinispan.remoting.transport.jgroups.SuspectException;
import org.infinispan.util.TimeService;
import org.infinispan.util.concurrent.TimeoutException; import org.infinispan.util.concurrent.TimeoutException;
import org.infinispan.util.logging.Log; import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory; import org.infinispan.util.logging.LogFactory;
import org.infinispan.util.logging.events.EventLogCategory; import org.infinispan.util.logging.events.EventLogCategory;
import org.infinispan.util.logging.events.EventLogManager; import org.infinispan.util.logging.events.EventLogManager;
import org.infinispan.util.logging.events.EventLogger;


import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
Expand All @@ -50,6 +49,9 @@
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;


import static java.lang.String.format; import static java.lang.String.format;
import static org.infinispan.factories.KnownComponentNames.ASYNC_TRANSPORT_EXECUTOR; import static org.infinispan.factories.KnownComponentNames.ASYNC_TRANSPORT_EXECUTOR;
Expand Down Expand Up @@ -88,15 +90,15 @@ boolean isCoordinator() {
private GlobalComponentRegistry gcr; private GlobalComponentRegistry gcr;
private CacheManagerNotifier cacheManagerNotifier; private CacheManagerNotifier cacheManagerNotifier;
private EmbeddedCacheManager cacheManager; private EmbeddedCacheManager cacheManager;
private TimeService timeService;
private ExecutorService asyncTransportExecutor; private ExecutorService asyncTransportExecutor;
private SemaphoreCompletionService<Void> viewHandlingCompletionService; private SemaphoreCompletionService<Void> viewHandlingCompletionService;
private EventLogManager eventLogManager; private EventLogManager eventLogManager;


// These need to be volatile because they are sometimes read without holding the view handling lock. // These need to be volatile because they are sometimes read without holding the view handling lock.
private volatile int viewId = -1; private volatile int viewId = -1;
private volatile ClusterManagerStatus clusterManagerStatus = ClusterManagerStatus.INITIALIZING; private volatile ClusterManagerStatus clusterManagerStatus = ClusterManagerStatus.INITIALIZING;
private final Object clusterManagerLock = new Object(); private final Lock clusterManagerLock = new ReentrantLock();
private final Condition clusterStateChanged = clusterManagerLock.newCondition();




private final ConcurrentMap<String, ClusterCacheStatus> cacheStatusMap = CollectionFactory.makeConcurrentMap(); private final ConcurrentMap<String, ClusterCacheStatus> cacheStatusMap = CollectionFactory.makeConcurrentMap();
Expand All @@ -110,14 +112,13 @@ public void inject(Transport transport,
@ComponentName(ASYNC_TRANSPORT_EXECUTOR) ExecutorService asyncTransportExecutor, @ComponentName(ASYNC_TRANSPORT_EXECUTOR) ExecutorService asyncTransportExecutor,
GlobalConfiguration globalConfiguration, GlobalComponentRegistry gcr, GlobalConfiguration globalConfiguration, GlobalComponentRegistry gcr,
CacheManagerNotifier cacheManagerNotifier, EmbeddedCacheManager cacheManager, CacheManagerNotifier cacheManagerNotifier, EmbeddedCacheManager cacheManager,
TimeService timeService, EventLogManager eventLogManager) { EventLogManager eventLogManager) {
this.transport = transport; this.transport = transport;
this.asyncTransportExecutor = asyncTransportExecutor; this.asyncTransportExecutor = asyncTransportExecutor;
this.globalConfiguration = globalConfiguration; this.globalConfiguration = globalConfiguration;
this.gcr = gcr; this.gcr = gcr;
this.cacheManagerNotifier = cacheManagerNotifier; this.cacheManagerNotifier = cacheManagerNotifier;
this.cacheManager = cacheManager; this.cacheManager = cacheManager;
this.timeService = timeService;
this.eventLogManager = eventLogManager; this.eventLogManager = eventLogManager;
} }


Expand Down Expand Up @@ -159,9 +160,12 @@ protected void fetchRebalancingStatusFromCoordinator() {
@Stop(priority = 100) @Stop(priority = 100)
public void stop() { public void stop() {
// Stop blocking cache topology commands. // Stop blocking cache topology commands.
synchronized (clusterManagerLock) { clusterManagerLock.lock();
try {
clusterManagerStatus = ClusterManagerStatus.STOPPING; clusterManagerStatus = ClusterManagerStatus.STOPPING;
clusterManagerLock.notifyAll(); clusterStateChanged.signalAll();
} finally {
clusterManagerLock.unlock();
} }


cacheManagerNotifier.removeListener(viewListener); cacheManagerNotifier.removeListener(viewListener);
Expand All @@ -171,7 +175,8 @@ public void stop() {
public CacheStatusResponse handleJoin(String cacheName, Address joiner, CacheJoinInfo joinInfo, public CacheStatusResponse handleJoin(String cacheName, Address joiner, CacheJoinInfo joinInfo,
int joinerViewId) throws Exception { int joinerViewId) throws Exception {
ClusterCacheStatus cacheStatus; ClusterCacheStatus cacheStatus;
synchronized (clusterManagerLock) { clusterManagerLock.lock();
try {
waitForJoinerView(joiner, joinerViewId, joinInfo.getTimeout()); waitForJoinerView(joiner, joinerViewId, joinInfo.getTimeout());


if (!clusterManagerStatus.isRunning()) { if (!clusterManagerStatus.isRunning()) {
Expand All @@ -186,6 +191,8 @@ public CacheStatusResponse handleJoin(String cacheName, Address joiner, CacheJoi
} }


cacheStatus = initCacheStatusIfAbsent(cacheName); cacheStatus = initCacheStatusIfAbsent(cacheName);
} finally {
clusterManagerLock.unlock();
} }
return cacheStatus.doJoin(joiner, joinInfo); return cacheStatus.doJoin(joiner, joinInfo);
} }
Expand Down Expand Up @@ -285,7 +292,8 @@ public boolean needMoreResponses() {


@Override @Override
public void handleClusterView(boolean mergeView, int newViewId) { public void handleClusterView(boolean mergeView, int newViewId) {
synchronized (clusterManagerLock) { clusterManagerLock.lock();
try {
if (newViewId < transport.getViewId()) { if (newViewId < transport.getViewId()) {
log.tracef("Ignoring old cluster view notification: %s", newViewId); log.tracef("Ignoring old cluster view notification: %s", newViewId);
return; return;
Expand All @@ -308,7 +316,9 @@ public void handleClusterView(boolean mergeView, int newViewId) {


// notify threads that might be waiting to join // notify threads that might be waiting to join
viewId = newViewId; viewId = newViewId;
clusterManagerLock.notifyAll(); clusterStateChanged.signalAll();
} finally {
clusterManagerLock.unlock();
} }


// The SemaphoreCompletionService acts as a critical section, so we don't need to worry about // The SemaphoreCompletionService acts as a critical section, so we don't need to worry about
Expand All @@ -321,14 +331,17 @@ public void handleClusterView(boolean mergeView, int newViewId) {
try { try {
recoverClusterStatus(newViewId, mergeView, transport.getMembers()); recoverClusterStatus(newViewId, mergeView, transport.getMembers());


synchronized (clusterManagerLock) { clusterManagerLock.lock();
try {
if (viewId != newViewId) { if (viewId != newViewId) {
log.debugf("View updated while we were recovering the cluster for view %d", newViewId); log.debugf("View updated while we were recovering the cluster for view %d", newViewId);
return; return;
} }
clusterManagerStatus = ClusterManagerStatus.COORDINATOR; clusterManagerStatus = ClusterManagerStatus.COORDINATOR;
// notify threads that might be waiting to join // notify threads that might be waiting to join
clusterManagerLock.notifyAll(); clusterStateChanged.signalAll();
} finally {
clusterManagerLock.unlock();
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
if (trace) if (trace)
Expand Down Expand Up @@ -446,9 +459,12 @@ private void confirmMembersAvailable() throws Exception {
* Wait until we have received view {@code joinerViewId} and we have finished recovering the cluster state. * Wait until we have received view {@code joinerViewId} and we have finished recovering the cluster state.
* <p> * <p>
* Returns early if the node is shutting down. * Returns early if the node is shutting down.
* <p>
* This method should be invoked with the lock held.
* *
* @throws TimeoutException if the timeout expired. * @throws TimeoutException if the timeout expired.
*/ */
@GuardedBy("clusterManagerLock")
private void waitForJoinerView(Address joiner, int joinerViewId, long timeout) private void waitForJoinerView(Address joiner, int joinerViewId, long timeout)
throws InterruptedException { throws InterruptedException {
if (joinerViewId > viewId || clusterManagerStatus == ClusterManagerStatus.RECOVERING_CLUSTER) { if (joinerViewId > viewId || clusterManagerStatus == ClusterManagerStatus.RECOVERING_CLUSTER) {
Expand All @@ -460,16 +476,13 @@ private void waitForJoinerView(Address joiner, int joinerViewId, long timeout)
log.tracef("Waiting to recover cluster status before processing join request from %s", joiner); log.tracef("Waiting to recover cluster status before processing join request from %s", joiner);
} }
} }
long endTime = timeService.expectedEndTime(timeout, TimeUnit.MILLISECONDS); long nanosTimeout = TimeUnit.MILLISECONDS.toNanos(timeout);
synchronized (clusterManagerLock) { while ((viewId < joinerViewId || clusterManagerStatus == ClusterManagerStatus.RECOVERING_CLUSTER) &&
while (viewId < joinerViewId || clusterManagerStatus == ClusterManagerStatus.RECOVERING_CLUSTER) { clusterManagerStatus.isRunning()) {
if (timeService.isTimeExpired(endTime) || !clusterManagerStatus.isRunning()) if (nanosTimeout <= 0) {
break; throw log.timeoutWaitingForView(joinerViewId);
clusterManagerLock.wait(timeService.remainingTime(endTime, TimeUnit.MILLISECONDS));
} }
} nanosTimeout = clusterStateChanged.awaitNanos(nanosTimeout);
if (timeService.isTimeExpired(endTime)) {
throw new TimeoutException("Timed out waiting for view " + joinerViewId);
} }
} }
} }
Expand Down
3 changes: 3 additions & 0 deletions core/src/main/java/org/infinispan/util/logging/Log.java
Expand Up @@ -1397,5 +1397,8 @@ void preparedTxAlreadyExists(RecoveryAwareTransaction previous,


@Message(value = "Node %s timed out, time : %s %s", id = 401) @Message(value = "Node %s timed out, time : %s %s", id = 401)
TimeoutException remoteNodeTimedOut(Address address, long time, TimeUnit unit); TimeoutException remoteNodeTimedOut(Address address, long time, TimeUnit unit);

@Message(value = "Timeout waiting for view %d.")
TimeoutException timeoutWaitingForView(int viewId);
} }


0 comments on commit c8c2a27

Please sign in to comment.