From a862692546f7eff88f187d38c25c0292d6b9b54f Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Tue, 13 Jun 2023 16:06:37 -0400 Subject: [PATCH] Executor instead of ThreadFactory for DebeziumMP Related to https://github.com/spring-projects/spring-integration/issues/8642 For consistency with other Spring requirements and realignment with virtual threads, it is better to require a `TaskExecutor` injection instead of `ThreadFactory` * Fix `DebeziumMessageProducer` to rely on a `TaskExecutor` API instead of `ThreadFactory` and `ExecutorService` * * Remove unused import from the `DebeziumMessageProducerSpec` --- .../dsl/DebeziumMessageProducerSpec.java | 17 ++++--- .../inbound/DebeziumMessageProducer.java | 48 +++++-------------- src/reference/asciidoc/debezium.adoc | 3 +- 3 files changed, 21 insertions(+), 47 deletions(-) diff --git a/spring-integration-debezium/src/main/java/org/springframework/integration/debezium/dsl/DebeziumMessageProducerSpec.java b/spring-integration-debezium/src/main/java/org/springframework/integration/debezium/dsl/DebeziumMessageProducerSpec.java index d01e4af589b..f79b1bc3a63 100644 --- a/spring-integration-debezium/src/main/java/org/springframework/integration/debezium/dsl/DebeziumMessageProducerSpec.java +++ b/spring-integration-debezium/src/main/java/org/springframework/integration/debezium/dsl/DebeziumMessageProducerSpec.java @@ -18,23 +18,23 @@ import java.util.List; import java.util.Optional; -import java.util.concurrent.ThreadFactory; import io.debezium.engine.ChangeEvent; import io.debezium.engine.DebeziumEngine; import io.debezium.engine.Header; import io.debezium.engine.format.SerializationFormat; +import org.springframework.core.task.TaskExecutor; import org.springframework.integration.debezium.inbound.DebeziumMessageProducer; import org.springframework.integration.debezium.support.DefaultDebeziumHeaderMapper; import org.springframework.integration.dsl.MessageProducerSpec; import org.springframework.messaging.support.HeaderMapper; -import org.springframework.scheduling.concurrent.CustomizableThreadFactory; /** * A {@link org.springframework.integration.dsl.MessageProducerSpec} for {@link DebeziumMessageProducer}. * * @author Christian Tzolov + * @author Artem Bilan * * @since 6.2 */ @@ -74,19 +74,18 @@ public DebeziumMessageProducerSpec enableEmptyPayload(boolean enabled) { } /** - * Set a {@link ThreadFactory} for the Debezium executor. Defaults to the {@link CustomizableThreadFactory} with a - * {@code debezium:inbound-channel-adapter-thread-} prefix. - * @param threadFactory the {@link ThreadFactory} instance to use. + * Set a {@link TaskExecutor} for the Debezium engine. + * @param taskExecutor the {@link TaskExecutor} to use. * @return the spec. */ - public DebeziumMessageProducerSpec threadFactory(ThreadFactory threadFactory) { - this.target.setThreadFactory(threadFactory); + public DebeziumMessageProducerSpec taskExecutor(TaskExecutor taskExecutor) { + this.target.setTaskExecutor(taskExecutor); return this; } /** - * Set the outbound message content type. Must be aligned with the {@link SerializationFormat} configuration used by - * the provided {@link DebeziumEngine}. + * Set the outbound message content type. + * Must be aligned with the {@link SerializationFormat} configuration used by the provided {@link DebeziumEngine}. * @param contentType payload content type. * @return the spec. */ diff --git a/spring-integration-debezium/src/main/java/org/springframework/integration/debezium/inbound/DebeziumMessageProducer.java b/spring-integration-debezium/src/main/java/org/springframework/integration/debezium/inbound/DebeziumMessageProducer.java index 3f6877a1b4e..51ce2df1982 100644 --- a/spring-integration-debezium/src/main/java/org/springframework/integration/debezium/inbound/DebeziumMessageProducer.java +++ b/spring-integration-debezium/src/main/java/org/springframework/integration/debezium/inbound/DebeziumMessageProducer.java @@ -21,9 +21,6 @@ import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; @@ -35,6 +32,8 @@ import io.debezium.engine.Header; import io.debezium.engine.format.SerializationFormat; +import org.springframework.core.task.SimpleAsyncTaskExecutor; +import org.springframework.core.task.TaskExecutor; import org.springframework.integration.debezium.support.DebeziumHeaders; import org.springframework.integration.debezium.support.DefaultDebeziumHeaderMapper; import org.springframework.integration.endpoint.MessageProducerSupport; @@ -42,7 +41,6 @@ import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.support.HeaderMapper; -import org.springframework.scheduling.concurrent.CustomizableThreadFactory; import org.springframework.util.Assert; /** @@ -60,12 +58,9 @@ public class DebeziumMessageProducer extends MessageProducerSupport { private DebeziumEngine> debeziumEngine; /** - * Debezium Engine is designed to be submitted to an {@link Executor} - * or {@link ExecutorService} for execution by a single thread. - * By default, a single-threaded ExecutorService instance is provided configured with a - * {@link CustomizableThreadFactory} and a `debezium-` thread prefix. + * Debezium Engine is designed to be submitted to an {@link Executor}. */ - private ExecutorService executorService; + private TaskExecutor taskExecutor; private String contentType = "application/json"; @@ -75,8 +70,6 @@ public class DebeziumMessageProducer extends MessageProducerSupport { private boolean enableBatch = false; - private ThreadFactory threadFactory; - private volatile CountDownLatch lifecycleLatch = new CountDownLatch(0); /** @@ -116,14 +109,12 @@ public void setEnableEmptyPayload(boolean enabled) { } /** - * Set a {@link ThreadFactory} for the Debezium executor. - * Defaults to the {@link CustomizableThreadFactory} with a - * {@code debezium:inbound-channel-adapter-thread-} prefix. - * @param threadFactory the {@link ThreadFactory} instance to use. + * Set a {@link TaskExecutor} for the Debezium engine task. + * @param taskExecutor the {@link TaskExecutor} to use. */ - public void setThreadFactory(ThreadFactory threadFactory) { - Assert.notNull(threadFactory, "'threadFactory' must not be null"); - this.threadFactory = threadFactory; + public void setTaskExecutor(TaskExecutor taskExecutor) { + Assert.notNull(taskExecutor, "'taskExecutor' must not be null"); + this.taskExecutor = taskExecutor; } /** @@ -156,12 +147,10 @@ public String getComponentType() { protected void onInit() { super.onInit(); - if (this.threadFactory == null) { - this.threadFactory = new CustomizableThreadFactory(getComponentName() + "-thread-"); + if (this.taskExecutor == null) { + this.taskExecutor = new SimpleAsyncTaskExecutor(getComponentName() + "-thread-"); } - this.executorService = Executors.newSingleThreadExecutor(this.threadFactory); - if (!this.enableBatch) { this.debeziumEngineBuilder.notifying(new StreamChangeEventConsumer<>()); } @@ -178,7 +167,7 @@ protected void doStart() { return; } this.lifecycleLatch = new CountDownLatch(1); - this.executorService.execute(() -> { + this.taskExecutor.execute(() -> { try { // Runs the debezium connector and deliver database changes to the registered consumer. This method // blocks until the connector is stopped. @@ -213,19 +202,6 @@ protected void doStop() { } } - @Override - public void destroy() { - super.destroy(); - - this.executorService.shutdown(); - try { - this.executorService.awaitTermination(5, TimeUnit.SECONDS); - } - catch (InterruptedException e) { - throw new IllegalStateException("Debezium failed to close!", e); - } - } - @Nullable private Message toMessage(ChangeEvent changeEvent) { Object key = changeEvent.key(); diff --git a/src/reference/asciidoc/debezium.adoc b/src/reference/asciidoc/debezium.adoc index ee65f32002c..d4c4aa8ed6e 100644 --- a/src/reference/asciidoc/debezium.adoc +++ b/src/reference/asciidoc/debezium.adoc @@ -78,8 +78,7 @@ Defaults to `false`. - `headerMapper` - custom `HeaderMapper` implementation that allows for selecting and converting the `ChangeEvent` headers into `Message` headers. The default `DefaultDebeziumHeaderMapper` implementation provides a setter for `setHeaderNamesToMap`. By default, all headers are mapped. -- `threadFactory` - Set custom `ThreadFactory` for the Debezium executor service. -Debezium Engine is designed to be submitted to an `Executor` or `ExecutorService` for execution by single thread. +- `taskExecutor` - Set a custom `TaskExecutor` for the Debezium engine. The following code snippets demonstrate various configuration for this channel adapter: