Skip to content

Commit

Permalink
Execute the requested shutdown in a separate thread.
Browse files Browse the repository at this point in the history
Fix for Issue awslabs#167
  • Loading branch information
pfifer committed Jun 12, 2017
1 parent a3e4df6 commit 9686d7c
Show file tree
Hide file tree
Showing 5 changed files with 412 additions and 473 deletions.
Original file line number Diff line number Diff line change
@@ -1,102 +1,123 @@
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class RequestedShutdownCoordinator {
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

class RequestedShutdownCoordinator {

private final ExecutorService executorService;
static Future<Boolean> startRequestedShutdown(Callable<Boolean> shutdownCallable) {
FutureTask<Boolean> task = new FutureTask<>(shutdownCallable);
Thread shutdownThread = new Thread(task, "RequestedShutdownThread");
shutdownThread.start();
return task;

RequestedShutdownCoordinator(ExecutorService executorService) {
this.executorService = executorService;
}

static class RequestedShutdownCallable implements Callable<Void> {
static Callable<Boolean> createRequestedShutdownCallable(CountDownLatch shutdownCompleteLatch,
CountDownLatch notificationCompleteLatch, Worker worker) {
return new RequestedShutdownCallable(shutdownCompleteLatch, notificationCompleteLatch, worker);
}

static class RequestedShutdownCallable implements Callable<Boolean> {

private static final Log log = LogFactory.getLog(RequestedShutdownCallable.class);

private final CountDownLatch shutdownCompleteLatch;
private final CountDownLatch notificationCompleteLatch;
private final Worker worker;
private final ExecutorService shutdownExecutor;

RequestedShutdownCallable(CountDownLatch shutdownCompleteLatch, CountDownLatch notificationCompleteLatch, Worker worker, ExecutorService shutdownExecutor) {
RequestedShutdownCallable(CountDownLatch shutdownCompleteLatch, CountDownLatch notificationCompleteLatch,
Worker worker) {
this.shutdownCompleteLatch = shutdownCompleteLatch;
this.notificationCompleteLatch = notificationCompleteLatch;
this.worker = worker;
this.shutdownExecutor = shutdownExecutor;
}

private boolean isWorkerShutdownComplete() {
return worker.isShutdownComplete() || worker.getShardInfoShardConsumerMap().isEmpty();
}

private long outstandingRecordProcessors(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
private String awaitingLogMessage() {
long awaitingNotification = notificationCompleteLatch.getCount();
long awaitingFinalShutdown = shutdownCompleteLatch.getCount();

return String.format(
"Waiting for %d record process to complete shutdown notification, and %d record processor to complete final shutdown ",
awaitingNotification, awaitingFinalShutdown);
}

private String awaitingFinalShutdownMessage() {
long outstanding = shutdownCompleteLatch.getCount();
return String.format("Waiting for %d record processors to complete final shutdown", outstanding);
}

final long startNanos = System.nanoTime();
private boolean waitForRecordProcessors() {

//
// Awaiting for all ShardConsumer/RecordProcessors to be notified that a shutdown has been requested.
// There is the possibility of a race condition where a lease is terminated after the shutdown request
// notification is started, but before the ShardConsumer is sent the notification. In this case the
// ShardConsumer would start the lease loss shutdown, and may never call the notification methods.
//
if (!notificationCompleteLatch.await(timeout, unit)) {
long awaitingNotification = notificationCompleteLatch.getCount();
long awaitingFinalShutdown = shutdownCompleteLatch.getCount();
log.info("Awaiting " + awaitingNotification + " record processors to complete shutdown notification, and "
+ awaitingFinalShutdown + " awaiting final shutdown");
if (awaitingFinalShutdown != 0) {
//
// The number of record processor awaiting final shutdown should be a superset of the those awaiting
// notification
//
return checkWorkerShutdownMiss(awaitingFinalShutdown);
try {
while (!notificationCompleteLatch.await(1, TimeUnit.SECONDS)) {
if (Thread.interrupted()) {
throw new InterruptedException();
}
log.info(awaitingLogMessage());
if (workerShutdownWithRemaining(shutdownCompleteLatch.getCount())) {
return false;
}
}
} catch (InterruptedException ie) {
log.warn("Interrupted while waiting for notification complete, terminating shutdown. "
+ awaitingLogMessage());
return false;
}

long remaining = remainingTimeout(timeout, unit, startNanos);
throwTimeoutMessageIfExceeded(remaining, "Notification hasn't completed within timeout time.");
if (Thread.interrupted()) {
log.warn("Interrupted before worker shutdown, terminating shutdown");
return false;
}

//
// Once all record processors have been notified of the shutdown it is safe to allow the worker to
// start its shutdown behavior. Once shutdown starts it will stop renewer, and drop any remaining leases.
//
worker.shutdown();
remaining = remainingTimeout(timeout, unit, startNanos);
throwTimeoutMessageIfExceeded(remaining, "Shutdown hasn't completed within timeout time.");

if (Thread.interrupted()) {
log.warn("Interrupted after worker shutdown, terminating shutdown");
return false;
}

//
// Want to wait for all the remaining ShardConsumers/RecordProcessor's to complete their final shutdown
// processing. This should really be a no-op since as part of the notification completion the lease for
// ShardConsumer is terminated.
//
if (!shutdownCompleteLatch.await(remaining, TimeUnit.NANOSECONDS)) {
long outstanding = shutdownCompleteLatch.getCount();
log.info("Awaiting " + outstanding + " record processors to complete final shutdown");

return checkWorkerShutdownMiss(outstanding);
}
return 0;
}

private long remainingTimeout(long timeout, TimeUnit unit, long startNanos) {
long checkNanos = System.nanoTime() - startNanos;
return unit.toNanos(timeout) - checkNanos;
}

private void throwTimeoutMessageIfExceeded(long remainingNanos, String message) throws TimeoutException {
if (remainingNanos <= 0) {
throw new TimeoutException(message);
try {
while (!shutdownCompleteLatch.await(1, TimeUnit.SECONDS)) {
if (Thread.interrupted()) {
throw new InterruptedException();
}
log.info(awaitingFinalShutdownMessage());
if (workerShutdownWithRemaining(shutdownCompleteLatch.getCount())) {
return false;
}
}
} catch (InterruptedException ie) {
log.warn("Interrupted while waiting for shutdown completion, terminating shutdown. "
+ awaitingFinalShutdownMessage());
return false;
}
return true;
}

/**
Expand All @@ -106,24 +127,23 @@ private void throwTimeoutMessageIfExceeded(long remainingNanos, String message)
*
* @param outstanding
* the number of record processor still awaiting shutdown.
* @return the number of record processors awaiting shutdown, or 0 if the worker believes it's shutdown already.
*/
private long checkWorkerShutdownMiss(long outstanding) {
private boolean workerShutdownWithRemaining(long outstanding) {
if (isWorkerShutdownComplete()) {
if (outstanding != 0) {
log.info("Shutdown completed, but shutdownCompleteLatch still had outstanding " + outstanding
+ " with a current value of " + shutdownCompleteLatch.getCount() + ". shutdownComplete: "
+ worker.isShutdownComplete() + " -- Consumer Map: "
+ worker.getShardInfoShardConsumerMap().size());
return true;
}
return 0;
}
return outstanding;
return false;
}

@Override
public Void call() throws Exception {
return null;
public Boolean call() throws Exception {
return waitForRecordProcessors();
}
}
}

This file was deleted.

Loading

0 comments on commit 9686d7c

Please sign in to comment.