Skip to content

Commit

Permalink
KAFKA-14913: Using ThreadUtils.shutdownExecutorServiceQuietly to clos…
Browse files Browse the repository at this point in the history
…e executors in Connect Runtime (apache#13594)

apache#13557 introduced a utils method to close executors silently. This PR leverages that method to close executors in connect runtime. There was duplicate code while closing the executors which isn't the case with this PR.

Note that there are a few more executors used in Connect runtime but their close methods don't follow this pattern of shutdown, await and shutdown. Some of them have some logic like executor like Worker, so not changing at such places.

---------

Co-authored-by: Sagar Rao <sagarrao@Sagars-MacBook-Pro.local>

Reviewers: Daniel Urban <durban@cloudera.com>, Yash Mayya <yash.mayya@gmail.com>, Viktor Somogyi-Vass <viktorsomogyi@gmail.com>
  • Loading branch information
vamossagar12 committed May 8, 2023
1 parent 2b98f85 commit 86daf8c
Show file tree
Hide file tree
Showing 7 changed files with 29 additions and 62 deletions.
Expand Up @@ -17,6 +17,9 @@

package org.apache.kafka.common.utils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
Expand All @@ -26,6 +29,8 @@
* Utilities for working with threads.
*/
public class ThreadUtils {

private static final Logger log = LoggerFactory.getLogger(ThreadUtils.class);
/**
* Create a new ThreadFactory.
*
Expand Down Expand Up @@ -56,20 +61,30 @@ public Thread newThread(Runnable r) {
}

/**
* Shuts down an executor service with a timeout. After the timeout/on interrupt, the service is forcefully closed.
* Shuts down an executor service in two phases, first by calling shutdown to reject incoming tasks,
* and then calling shutdownNow, if necessary, to cancel any lingering tasks.
* After the timeout/on interrupt, the service is forcefully closed.
* @param executorService The service to shut down.
* @param timeout The timeout of the shutdown.
* @param timeUnit The time unit of the shutdown timeout.
*/
public static void shutdownExecutorServiceQuietly(ExecutorService executorService,
long timeout, TimeUnit timeUnit) {
executorService.shutdown();
executorService.shutdown(); // Disable new tasks from being submitted
try {
// Wait a while for existing tasks to terminate
if (!executorService.awaitTermination(timeout, timeUnit)) {
executorService.shutdownNow();
executorService.shutdownNow(); // Cancel currently executing tasks
// Wait a while for tasks to respond to being cancelled
if (!executorService.awaitTermination(timeout, timeUnit)) {
log.error("Executor {} did not terminate in time", executorService);
}
}
} catch (InterruptedException e) {
// (Re-)Cancel if current thread also interrupted
executorService.shutdownNow();
// Preserve interrupt status
Thread.currentThread().interrupt();
}
}
}
Expand Up @@ -68,14 +68,7 @@ public SourceTaskOffsetCommitter(WorkerConfig config) {
}

public void close(long timeoutMs) {
commitExecutorService.shutdown();
try {
if (!commitExecutorService.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) {
log.error("Graceful shutdown of offset commitOffsets thread timed out.");
}
} catch (InterruptedException e) {
// ignore and allow to exit immediately
}
ThreadUtils.shutdownExecutorServiceQuietly(commitExecutorService, timeoutMs, TimeUnit.MILLISECONDS);
}

public void schedule(final ConnectorTaskId id, final WorkerSourceTask workerTask) {
Expand Down
Expand Up @@ -32,6 +32,7 @@
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.common.config.provider.ConfigProvider;
import org.apache.kafka.common.utils.ThreadUtils;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.connector.Connector;
Expand Down Expand Up @@ -247,20 +248,7 @@ public void stop() {
connectorStatusMetricsGroup.close();

workerConfigTransformer.close();
executor.shutdown();
try {
// Wait a while for existing tasks to terminate
if (!executor.awaitTermination(EXECUTOR_SHUTDOWN_TERMINATION_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
executor.shutdownNow(); //cancel current executing threads
// Wait a while for tasks to respond to being cancelled
if (!executor.awaitTermination(EXECUTOR_SHUTDOWN_TERMINATION_TIMEOUT_MS, TimeUnit.MILLISECONDS))
log.error("Executor did not terminate in time");
}
} catch (InterruptedException e) {
executor.shutdownNow(); // (Re-)Cancel if current thread also interrupted
// Preserve interrupt status
Thread.currentThread().interrupt();
}
ThreadUtils.shutdownExecutorServiceQuietly(executor, EXECUTOR_SHUTDOWN_TERMINATION_TIMEOUT_MS, TimeUnit.MILLISECONDS);
}

/**
Expand Down
Expand Up @@ -804,22 +804,9 @@ public void stop() {

stopping.set(true);
member.wakeup();
herderExecutor.shutdown();
try {
if (!herderExecutor.awaitTermination(herderExecutorTimeoutMs(), TimeUnit.MILLISECONDS))
herderExecutor.shutdownNow();

forwardRequestExecutor.shutdown();
startAndStopExecutor.shutdown();

if (!forwardRequestExecutor.awaitTermination(FORWARD_REQUEST_SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS))
forwardRequestExecutor.shutdownNow();
if (!startAndStopExecutor.awaitTermination(START_AND_STOP_SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS))
startAndStopExecutor.shutdownNow();
} catch (InterruptedException e) {
// ignore
}

ThreadUtils.shutdownExecutorServiceQuietly(herderExecutor, herderExecutorTimeoutMs(), TimeUnit.MILLISECONDS);
ThreadUtils.shutdownExecutorServiceQuietly(forwardRequestExecutor, FORWARD_REQUEST_SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS);
ThreadUtils.shutdownExecutorServiceQuietly(startAndStopExecutor, START_AND_STOP_SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS);
log.info("Herder stopped");
running = false;
}
Expand Down
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.kafka.connect.runtime.standalone;

import org.apache.kafka.common.utils.ThreadUtils;
import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.errors.AlreadyExistsException;
import org.apache.kafka.connect.errors.ConnectException;
Expand Down Expand Up @@ -105,14 +106,7 @@ public synchronized void start() {
@Override
public synchronized void stop() {
log.info("Herder stopping");
requestExecutorService.shutdown();
try {
if (!requestExecutorService.awaitTermination(30, TimeUnit.SECONDS))
requestExecutorService.shutdownNow();
} catch (InterruptedException e) {
// ignore
}

ThreadUtils.shutdownExecutorServiceQuietly(requestExecutorService, 30, TimeUnit.SECONDS);
// There's no coordination/hand-off to do here since this is all standalone. Instead, we
// should just clean up the stuff we normally would, i.e. cleanly checkpoint and shutdown all
// the tasks.
Expand Down
Expand Up @@ -16,7 +16,6 @@
*/
package org.apache.kafka.connect.storage;

import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.common.utils.ThreadUtils;
Expand Down Expand Up @@ -61,17 +60,7 @@ public void start() {
@Override
public void stop() {
if (executor != null) {
executor.shutdown();
// Best effort wait for any get() and set() tasks (and caller's callbacks) to complete.
try {
executor.awaitTermination(30, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
if (!executor.shutdownNow().isEmpty()) {
throw new ConnectException("Failed to stop MemoryOffsetBackingStore. Exiting without cleanly " +
"shutting down pending tasks and/or callbacks.");
}
ThreadUtils.shutdownExecutorServiceQuietly(executor, 30, TimeUnit.SECONDS);
executor = null;
}
}
Expand Down
Expand Up @@ -17,6 +17,7 @@
package org.apache.kafka.connect.runtime;

import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.common.utils.ThreadUtils;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import org.apache.kafka.connect.util.ConnectorTaskId;
Expand Down Expand Up @@ -99,7 +100,7 @@ public void testCloseTimeout() throws Exception {
// Normal termination, where termination times out.
when(executor.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)).thenReturn(false);

try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(SourceTaskOffsetCommitter.class)) {
try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(ThreadUtils.class)) {
committer.close(timeoutMs);
assertTrue(logCaptureAppender.getEvents().stream().anyMatch(e -> e.getLevel().equals("ERROR")));
}
Expand Down

0 comments on commit 86daf8c

Please sign in to comment.