Proactively informing observers when shutting down (#15)
Rather than waiting for edge failure detection to kick in when a node has been shut down, this change proactively informs the observers of that node with a new LeaveMessage. In turn, the observers trigger edge failure alerting immediately.

Adds Cluster.leaveGracefully() and Cluster.shutdown() APIs for graceful and forced shutdowns, respectively. Accessing membership state after either of these APIs has been invoked is illegal and results in an IllegalStateException.


* Proactively informs observer nodes that the node is leaving when the cluster is shut down

* Leave notifications delivered in parallel, call to leave() protected by try/finally

* Fixing parallel leave message sending

- tolerating failure in delivering the messages, i.e. not cancelling other notifications
- adjusting test intervals in order to reach agreement faster

* Throw exceptions when trying to access membership state after shutting down
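
To make the new contract concrete, the sketch below shows how a caller might use the two shutdown paths. It is not part of the change itself: it assumes an already-started Cluster instance, and the NodeStopper/stop names are purely illustrative.

import com.vrg.rapid.Cluster;

final class NodeStopper {
    // Sketch only: `cluster` is assumed to be an already-started Cluster instance.
    static void stop(final Cluster cluster, final boolean graceful) {
        if (graceful) {
            // Sends a LeaveMessage to this node's observers so they trigger edge
            // failure alerting immediately, then shuts the whole system down.
            cluster.leaveGracefully();
        } else {
            // Forced variant: shuts down the RPC server, membership service and
            // shared resources without notifying observers first.
            cluster.shutdown();
        }
        // Membership state is illegal to access after either call.
        try {
            cluster.getMemberlist();
        } catch (final IllegalStateException expected) {
            // e.g. "Can't access the memberlist after having shut down"
        }
    }
}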
manuelbernhardt committed Feb 27, 2020
1 parent ec079d2 commit b04d66683ab31ced86b18d71f0f44a5f6dbf92cf
Showing 5 changed files with 94 additions and 3 deletions.
@@ -0,0 +1,3 @@
target/
*.iml
.idea
@@ -77,6 +77,7 @@ public final class Cluster {
private final IMessagingServer rpcServer;
private final SharedResources sharedResources;
private final Endpoint listenAddress;
private boolean hasShutdown = false;

private Cluster(final IMessagingServer rpcServer,
final MembershipService membershipService,
@@ -92,26 +93,38 @@ private Cluster(final IMessagingServer rpcServer,
* Returns the list of endpoints currently in the membership set.
*
* @return list of endpoints in the membership set
* @throws IllegalStateException when trying to get the memberlist after shutting down
*/
public List<Endpoint> getMemberlist() {
if (hasShutdown) {
throw new IllegalStateException("Can't access the memberlist after having shut down");
}
return membershipService.getMembershipView();
}

/**
* Returns the number of endpoints currently in the membership set.
*
* @return the number of endpoints in the membership set
* @throws IllegalStateException when trying to get the membership size after shutting down
*/
public int getMembershipSize() {
if (hasShutdown) {
throw new IllegalStateException("Can't access the memberlist after having shut down");
}
return membershipService.getMembershipSize();
}

/**
* Returns the metadata of the endpoints currently in the membership set.
*
* @return map of metadata for the endpoints in the membership set
* @throws IllegalStateException when trying to get the cluster metadata after shutting down
*/
public Map<String, Metadata> getClusterMetadata() {
if (hasShutdown) {
throw new IllegalStateException("Can't access the cluster metadata after having shut down");
}
return membershipService.getMetadata();
}

@@ -127,13 +140,23 @@ public void registerSubscription(final ClusterEvents event,
}

/**
* Shutdown the RpcServer
* Gracefully leaves the cluster by informing observers of the intent and then shuts down the entire system
*/
public void leaveGracefully() {
LOG.debug("Leaving the membership group and shutting down");
membershipService.leave();
shutdown();
}

/**
* Shuts down the entire system
*/
public void shutdown() {
LOG.debug("Shutting down RpcServer and MembershipService");
rpcServer.shutdown();
membershipService.shutdown();
sharedResources.shutdown();
this.hasShutdown = true;
}

public static class Builder {
@@ -33,6 +33,7 @@
import com.vrg.rapid.pb.ProbeResponse;
import com.vrg.rapid.pb.RapidRequest;
import com.vrg.rapid.pb.RapidResponse;
import com.vrg.rapid.pb.LeaveMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -53,12 +54,13 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;


/**
* Membership server class that implements the Rapid protocol.
*
@@ -72,6 +74,7 @@ public final class MembershipService {
private static final int BATCHING_WINDOW_IN_MS = 100;
private static final int DEFAULT_FAILURE_DETECTOR_INITIAL_DELAY_IN_MS = 0;
static final int DEFAULT_FAILURE_DETECTOR_INTERVAL_IN_MS = 1000;
private static final int LEAVE_MESSAGE_TIMEOUT = 1500;
private final MembershipView membershipView;
private final MultiNodeCutDetector cutDetection;
private final Endpoint myAddr;
@@ -179,6 +182,8 @@ public ListenableFuture<RapidResponse> handleMessage(final RapidRequest msg) {
case PHASE2AMESSAGE:
case PHASE2BMESSAGE:
return handleConsensusMessages(msg);
case LEAVEMESSAGE:
return handleLeaveMessage(msg);
case CONTENT_NOT_SET:
default:
throw new IllegalArgumentException("Unidentified RapidRequest type " + msg.getContentCase());
@@ -341,6 +346,15 @@ private ListenableFuture<RapidResponse> handleConsensusMessages(final RapidReque
return future;
}

/**
* Propagates the intent of a node to leave the group
*/
private ListenableFuture<RapidResponse> handleLeaveMessage(final RapidRequest request) {
final SettableFuture<RapidResponse> future = SettableFuture.create();
edgeFailureNotification(request.getLeaveMessage().getSender(), membershipView.getCurrentConfigurationId());
future.set(null);
return future;
}

/**
* This is invoked by FastPaxos modules when they arrive at a decision.
@@ -499,6 +513,32 @@ void shutdown() {
messagingClient.shutdown();
}

/**
* Leaves the cluster by telling all of this node's observers to proactively trigger edge failure alerting.
* This operation is blocking, as we need to wait for the leave messages to be sent before shutting down the rest
* of the system.
*/
void leave() {
final LeaveMessage leaveMessage = LeaveMessage.newBuilder().setSender(myAddr).build();
final RapidRequest leave = RapidRequest.newBuilder().setLeaveMessage(leaveMessage).build();

try {
final List<Endpoint> observers = membershipView.getObserversOf(myAddr);
final ListenableFuture<List<RapidResponse>> leaveResponses = Futures.successfulAsList(observers.stream()
.map(endpoint -> messagingClient.sendMessageBestEffort(endpoint, leave))
.collect(Collectors.toList()));
try {
leaveResponses.get(LEAVE_MESSAGE_TIMEOUT, TimeUnit.MILLISECONDS);
} catch (final InterruptedException | ExecutionException e) {
LOG.trace("Exception while leaving", e);
} catch (final TimeoutException e) {
LOG.trace("Timeout while leaving", e);
}
} catch (final MembershipView.NodeNotInRingException e) {
// we already were removed, so that's fine
LOG.trace("Node was already removed prior to leaving", e);
}
}

/**
* Queues an AlertMessage to be broadcast after potentially being batched.
*
@@ -30,6 +30,7 @@ message RapidRequest
Phase1bMessage phase1bMessage = 7;
Phase2aMessage phase2aMessage = 8;
Phase2bMessage phase2bMessage = 9;
LeaveMessage leaveMessage = 10;
}
}

@@ -178,6 +179,12 @@ message Metadata
map<string, bytes> metadata = 1;
}

// ******* Leave protocol *******

message LeaveMessage
{
Endpoint sender = 1;
}

// ******* Used by simple probing failure detector *******

@@ -308,7 +308,7 @@ public void failRandomThirdOfNodes() throws IOException, InterruptedException {
final Set<Endpoint> failingNodes = getRandomHosts(numFailingNodes);
staticFds.values().forEach(e -> e.addFailedNodes(failingNodes));
failingNodes.forEach(h -> instances.remove(h).shutdown());
waitAndVerifyAgreement(numNodes - failingNodes.size(), 20, 1000);
waitAndVerifyAgreement(numNodes - failingNodes.size(), 20, 1500);
// Nodes do not actually shutdown(), but are detected as faulty. The faulty nodes have active
// cluster instances and identify themselves as kicked out.
verifyNumClusterInstances(numNodes - failingNodes.size());
@@ -502,6 +502,24 @@ public void testRejoinMultipleNodes() throws IOException, InterruptedException {
executor.shutdownNow();
}

/**
* Test a node proactively leaving the cluster
*/
@Test(timeout = 30000)
public void testLeaving() throws IOException, InterruptedException {
final int numNodes = 10;
final Endpoint seedEndpoint = Utils.hostFromParts("127.0.0.1", basePort);
createCluster(1, seedEndpoint); // Only bootstrap a seed.
verifyCluster(1);
for (int i = 0; i < numNodes; i++) {
extendCluster(1, seedEndpoint);
waitAndVerifyAgreement(i + 2, 5, 1000);
}
instances.get(seedEndpoint).leaveGracefully();
instances.remove(seedEndpoint);
waitAndVerifyAgreement(numNodes, 2, 1000);
}

/**
* Creates a cluster of size {@code numNodes} with a seed {@code seedEndpoint}.
*
