Skip to content

Commit

Permalink
KAFKA-7477: Improve Streams close timeout semantics (apache#5747)
Browse files Browse the repository at this point in the history
Reviewers: Matthias J. Sax <matthias@confluent.io>, John Roesler <john@confluent.io>, Bill Bejeck <bill@confluent.io>, Guozhang Wang <guozhang@confluent.io>
  • Loading branch information
nizhikov authored and pengxiaolong committed Jun 14, 2019
1 parent ef34dd5 commit 144501d
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 12 deletions.
39 changes: 27 additions & 12 deletions streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
Original file line number Diff line number Diff line change
Expand Up @@ -218,13 +218,7 @@ private boolean waitOnState(final State targetState, final long waitMs) {
synchronized (stateLock) {
long elapsedMs = 0L;
while (state != targetState) {
if (waitMs == 0) {
try {
stateLock.wait();
} catch (final InterruptedException e) {
// it is ok: just move on to the next iteration
}
} else if (waitMs > elapsedMs) {
if (waitMs > elapsedMs) {
final long remainingMs = waitMs - elapsedMs;
try {
stateLock.wait(remainingMs);
Expand Down Expand Up @@ -825,17 +819,30 @@ public void close() {
* threads to join.
* A {@code timeout} of 0 means to wait forever.
*
* @param timeout how long to wait for the threads to shutdown
* @param timeout how long to wait for the threads to shutdown. Can't be negative. If {@code timeout=0} just checking the state and return immediately.
* @param timeUnit unit of time used for timeout
* @return {@code true} if all threads were successfully stopped&mdash;{@code false} if the timeout was reached
* before all threads stopped
* Note that this method must not be called in the {@code onChange} callback of {@link StateListener}.
* @deprecated Use {@link #close(Duration)} instead
* @deprecated Use {@link #close(Duration)} instead; note, that {@link #close(Duration)} has different semantics and does not block on zero, e.g., `Duration.ofMillis(0)`.
*/
@Deprecated
public synchronized boolean close(final long timeout, final TimeUnit timeUnit) {
log.debug("Stopping Streams client with timeoutMillis = {} ms.", timeUnit.toMillis(timeout));
long timeoutMs = timeUnit.toMillis(timeout);

log.debug("Stopping Streams client with timeoutMillis = {} ms. You are using deprecated method. " +
"Please, consider update your code.", timeoutMs);

if (timeoutMs < 0) {
timeoutMs = 0;
} else if (timeoutMs == 0) {
timeoutMs = Long.MAX_VALUE;
}

return close(timeoutMs);
}

private boolean close(final long timeoutMs) {
if (!setState(State.PENDING_SHUTDOWN)) {
// if transition failed, it means it was either in PENDING_SHUTDOWN
// or NOT_RUNNING already; just check that all threads have been stopped
Expand Down Expand Up @@ -891,7 +898,7 @@ public void run() {
shutdownThread.start();
}

if (waitOnState(State.NOT_RUNNING, timeUnit.toMillis(timeout))) {
if (waitOnState(State.NOT_RUNNING, timeoutMs)) {
log.info("Streams client stopped completely");
return true;
} else {
Expand All @@ -913,7 +920,15 @@ public void run() {
*/
public synchronized boolean close(final Duration timeout) throws IllegalArgumentException {
ApiUtils.validateMillisecondDuration(timeout, "timeout");
return close(timeout.toMillis(), TimeUnit.MILLISECONDS);

final long timeoutMs = timeout.toMillis();
if (timeoutMs < 0) {
throw new IllegalArgumentException("Timeout can't be negative.");
}

log.debug("Stopping Streams client with timeoutMillis = {} ms.", timeoutMs);

return close(timeoutMs);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,34 @@ public void shouldCleanupOldStateDirs() throws InterruptedException {
}
}

@Test
public void shouldThrowOnNegativeTimeoutForClose() {
final KafkaStreams streams = new KafkaStreams(builder.build(), props);
try {
streams.close(Duration.ofMillis(-1L));
fail("should not accept negative close parameter");
} catch (final IllegalArgumentException e) {
// expected
} finally {
streams.close();
}
}

@Test
public void shouldNotBlockInCloseForZeroDuration() throws InterruptedException {
final KafkaStreams streams = new KafkaStreams(builder.build(), props);
final Thread th = new Thread(() -> streams.close(Duration.ofMillis(0L)));

th.start();

try {
th.join(30_000L);
assertFalse(th.isAlive());
} finally {
streams.close();
}
}

private void verifyCleanupStateDir(final String appDir, final File oldTaskDir) throws InterruptedException {
final File taskDir = new File(appDir, "0_0");
TestUtils.waitForCondition(
Expand Down

0 comments on commit 144501d

Please sign in to comment.