Skip to content

Commit

Permalink
Executor instead of ThreadFactory for DebeziumMP
Browse files Browse the repository at this point in the history
Related to #8642

For consistency with other Spring requirements and realignment with virtual threads,
it is better to require a `TaskExecutor` injection instead of `ThreadFactory`

* Fix `DebeziumMessageProducer` to rely on a `TaskExecutor` API instead of `ThreadFactory`
and `ExecutorService`

* * Remove unused import from the `DebeziumMessageProducerSpec`
  • Loading branch information
artembilan committed Jun 13, 2023
1 parent b576748 commit a862692
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 47 deletions.
Expand Up @@ -18,23 +18,23 @@

import java.util.List;
import java.util.Optional;
import java.util.concurrent.ThreadFactory;

import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.Header;
import io.debezium.engine.format.SerializationFormat;

import org.springframework.core.task.TaskExecutor;
import org.springframework.integration.debezium.inbound.DebeziumMessageProducer;
import org.springframework.integration.debezium.support.DefaultDebeziumHeaderMapper;
import org.springframework.integration.dsl.MessageProducerSpec;
import org.springframework.messaging.support.HeaderMapper;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;

/**
* A {@link org.springframework.integration.dsl.MessageProducerSpec} for {@link DebeziumMessageProducer}.
*
* @author Christian Tzolov
* @author Artem Bilan
*
* @since 6.2
*/
Expand Down Expand Up @@ -74,19 +74,18 @@ public DebeziumMessageProducerSpec enableEmptyPayload(boolean enabled) {
}

/**
* Set a {@link ThreadFactory} for the Debezium executor. Defaults to the {@link CustomizableThreadFactory} with a
* {@code debezium:inbound-channel-adapter-thread-} prefix.
* @param threadFactory the {@link ThreadFactory} instance to use.
* Set a {@link TaskExecutor} for the Debezium engine.
* @param taskExecutor the {@link TaskExecutor} to use.
* @return the spec.
*/
public DebeziumMessageProducerSpec threadFactory(ThreadFactory threadFactory) {
this.target.setThreadFactory(threadFactory);
public DebeziumMessageProducerSpec taskExecutor(TaskExecutor taskExecutor) {
this.target.setTaskExecutor(taskExecutor);
return this;
}

/**
* Set the outbound message content type. Must be aligned with the {@link SerializationFormat} configuration used by
* the provided {@link DebeziumEngine}.
* Set the outbound message content type.
* Must be aligned with the {@link SerializationFormat} configuration used by the provided {@link DebeziumEngine}.
* @param contentType payload content type.
* @return the spec.
*/
Expand Down
Expand Up @@ -21,9 +21,6 @@
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

Expand All @@ -35,14 +32,15 @@
import io.debezium.engine.Header;
import io.debezium.engine.format.SerializationFormat;

import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.integration.debezium.support.DebeziumHeaders;
import org.springframework.integration.debezium.support.DefaultDebeziumHeaderMapper;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.HeaderMapper;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import org.springframework.util.Assert;

/**
Expand All @@ -60,12 +58,9 @@ public class DebeziumMessageProducer extends MessageProducerSupport {
private DebeziumEngine<ChangeEvent<byte[], byte[]>> debeziumEngine;

/**
* Debezium Engine is designed to be submitted to an {@link Executor}
* or {@link ExecutorService} for execution by a single thread.
* By default, a single-threaded ExecutorService instance is provided configured with a
* {@link CustomizableThreadFactory} and a `debezium-` thread prefix.
* Debezium Engine is designed to be submitted to an {@link Executor}.
*/
private ExecutorService executorService;
private TaskExecutor taskExecutor;

private String contentType = "application/json";

Expand All @@ -75,8 +70,6 @@ public class DebeziumMessageProducer extends MessageProducerSupport {

private boolean enableBatch = false;

private ThreadFactory threadFactory;

private volatile CountDownLatch lifecycleLatch = new CountDownLatch(0);

/**
Expand Down Expand Up @@ -116,14 +109,12 @@ public void setEnableEmptyPayload(boolean enabled) {
}

/**
* Set a {@link ThreadFactory} for the Debezium executor.
* Defaults to the {@link CustomizableThreadFactory} with a
* {@code debezium:inbound-channel-adapter-thread-} prefix.
* @param threadFactory the {@link ThreadFactory} instance to use.
* Set a {@link TaskExecutor} for the Debezium engine task.
* @param taskExecutor the {@link TaskExecutor} to use.
*/
public void setThreadFactory(ThreadFactory threadFactory) {
Assert.notNull(threadFactory, "'threadFactory' must not be null");
this.threadFactory = threadFactory;
public void setTaskExecutor(TaskExecutor taskExecutor) {
Assert.notNull(taskExecutor, "'taskExecutor' must not be null");
this.taskExecutor = taskExecutor;
}

/**
Expand Down Expand Up @@ -156,12 +147,10 @@ public String getComponentType() {
protected void onInit() {
super.onInit();

if (this.threadFactory == null) {
this.threadFactory = new CustomizableThreadFactory(getComponentName() + "-thread-");
if (this.taskExecutor == null) {
this.taskExecutor = new SimpleAsyncTaskExecutor(getComponentName() + "-thread-");
}

this.executorService = Executors.newSingleThreadExecutor(this.threadFactory);

if (!this.enableBatch) {
this.debeziumEngineBuilder.notifying(new StreamChangeEventConsumer<>());
}
Expand All @@ -178,7 +167,7 @@ protected void doStart() {
return;
}
this.lifecycleLatch = new CountDownLatch(1);
this.executorService.execute(() -> {
this.taskExecutor.execute(() -> {
try {
// Runs the debezium connector and deliver database changes to the registered consumer. This method
// blocks until the connector is stopped.
Expand Down Expand Up @@ -213,19 +202,6 @@ protected void doStop() {
}
}

@Override
public void destroy() {
super.destroy();

this.executorService.shutdown();
try {
this.executorService.awaitTermination(5, TimeUnit.SECONDS);
}
catch (InterruptedException e) {
throw new IllegalStateException("Debezium failed to close!", e);
}
}

@Nullable
private <T> Message<?> toMessage(ChangeEvent<T, T> changeEvent) {
Object key = changeEvent.key();
Expand Down
3 changes: 1 addition & 2 deletions src/reference/asciidoc/debezium.adoc
Expand Up @@ -78,8 +78,7 @@ Defaults to `false`.
- `headerMapper` - custom `HeaderMapper` implementation that allows for selecting and converting the `ChangeEvent` headers into `Message` headers.
The default `DefaultDebeziumHeaderMapper` implementation provides a setter for `setHeaderNamesToMap`.
By default, all headers are mapped.
- `threadFactory` - Set custom `ThreadFactory` for the Debezium executor service.
Debezium Engine is designed to be submitted to an `Executor` or `ExecutorService` for execution by single thread.
- `taskExecutor` - Set a custom `TaskExecutor` for the Debezium engine.

The following code snippets demonstrate various configuration for this channel adapter:

Expand Down

0 comments on commit a862692

Please sign in to comment.