From 0e0ae4164bedeee6b44c102ff241dfe97057600e Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Tue, 16 Dec 2025 11:37:48 -0500 Subject: [PATCH] GH-10624: PartitionedChannel: customize worker queue size Fixes: https://github.com/spring-projects/spring-integration/issues/10624 * Expose `workerQueueSize` from the `PartitionedDispatcher` and respective delegates from the `PartitionedChannel` and `PartitionedChannelSpec` **Auto-cherry-pick to `6.5.x` & `6.4.x`** --- .../channel/PartitionedChannel.java | 14 +++++++-- .../dispatcher/PartitionedDispatcher.java | 31 +++++++++++++++++-- .../dsl/PartitionedChannelSpec.java | 15 +++++++++ .../channel/PartitionedChannelTests.java | 25 ++++++++++++++- 4 files changed, 80 insertions(+), 5 deletions(-) diff --git a/spring-integration-core/src/main/java/org/springframework/integration/channel/PartitionedChannel.java b/spring-integration-core/src/main/java/org/springframework/integration/channel/PartitionedChannel.java index b9ff9aca893..bac0def5c32 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/channel/PartitionedChannel.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/channel/PartitionedChannel.java @@ -36,7 +36,7 @@ * The {@code partitionKeyFunction} is used to determine to which partition the message * has to be dispatched. * By default, the {@link IntegrationMessageHeaderAccessor#CORRELATION_ID} message header is used - * for partition key. + * for a partition key. *

* The actual dispatching and threading logic is implemented in the {@link PartitionedDispatcher}. *

@@ -71,7 +71,7 @@ public PartitionedChannel(int partitionCount) { } /** - * Instantiate based on a provided number of partitions and function for partition key against + * Instantiate based on a provided number of partitions and function for a partition key against * the message. * @param partitionCount the number of partitions in this channel. * @param partitionKeyFunction the function to resolve a partition key against the message @@ -123,6 +123,16 @@ public void setLoadBalancingStrategy(@Nullable LoadBalancingStrategy loadBalanci getDispatcher().setLoadBalancingStrategy(loadBalancingStrategy); } + /** + * Provide a size of the queue in the partition executor's worker. + * Default to zero. + * @param workerQueueSize the size of the partition executor's worker queue. + * @since 6.4.10 + */ + public void setWorkerQueueSize(int workerQueueSize) { + getDispatcher().setWorkerQueueSize(workerQueueSize); + } + @Override protected PartitionedDispatcher getDispatcher() { return (PartitionedDispatcher) this.dispatcher; diff --git a/spring-integration-core/src/main/java/org/springframework/integration/dispatcher/PartitionedDispatcher.java b/spring-integration-core/src/main/java/org/springframework/integration/dispatcher/PartitionedDispatcher.java index 14d443333d6..fed9cea71ee 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/dispatcher/PartitionedDispatcher.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/dispatcher/PartitionedDispatcher.java @@ -19,10 +19,14 @@ import java.util.ArrayList; import java.util.List; import java.util.Set; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Function; @@ -30,6 +34,7 @@ import org.jspecify.annotations.Nullable; +import org.springframework.integration.util.CallerBlocksPolicy; import org.springframework.integration.util.ErrorHandlingTaskExecutor; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHandler; @@ -78,6 +83,8 @@ public class PartitionedDispatcher extends AbstractDispatcher { private final Lock lock = new ReentrantLock(); + private int workerQueueSize; + /** * Instantiate based on a provided number of partitions and function for a partition key against * the message to dispatch. @@ -153,6 +160,17 @@ public void setMessageHandlingTaskDecorator(MessageHandlingTaskDecorator message this.messageHandlingTaskDecorator = messageHandlingTaskDecorator; } + /** + * Provide a size of the queue in the partition executor's worker. + * Default to zero. + * @param workerQueueSize the size of the partition executor's worker queue. + * @since 6.4.10 + */ + public void setWorkerQueueSize(int workerQueueSize) { + Assert.isTrue(workerQueueSize >= 0, "'workerQueueSize' must be greater than or equal to 0."); + this.workerQueueSize = workerQueueSize; + } + /** * Shutdown this dispatcher on application close. * The partition executors are shutdown and the internal state of this instance is cleared. @@ -188,7 +206,16 @@ private void populatedPartitions() { } private UnicastingDispatcher newPartition() { - ExecutorService executor = Executors.newSingleThreadExecutor(this.threadFactory); + BlockingQueue workQueue = + this.workerQueueSize == 0 + ? new SynchronousQueue<>() + : new LinkedBlockingQueue<>(this.workerQueueSize); + ExecutorService executor = + new ThreadPoolExecutor(1, 1, + 0L, TimeUnit.MILLISECONDS, + workQueue, + this.threadFactory, + new CallerBlocksPolicy(Long.MAX_VALUE)); this.executors.add(executor); Executor effectiveExecutor = this.errorHandler != null diff --git a/spring-integration-core/src/main/java/org/springframework/integration/dsl/PartitionedChannelSpec.java b/spring-integration-core/src/main/java/org/springframework/integration/dsl/PartitionedChannelSpec.java index 300a71ca2e1..fbc85acb1fd 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/dsl/PartitionedChannelSpec.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/dsl/PartitionedChannelSpec.java @@ -39,6 +39,8 @@ public class PartitionedChannelSpec extends LoadBalancingChannelSpec message, MessageChannel ch, MessageHa String partitionForLastMessage = partitionedMessages.keySet().iterator().next(); assertThat(partitionForLastMessage).isIn(allocatedPartitions); + List partitionExecutors = TestUtils.getPropertyValue(partitionedChannel, "dispatcher.executors", List.class); + BlockingQueue workQueue = ((ThreadPoolExecutor) partitionExecutors.get(0)).getQueue(); + + assertThat(workQueue).isInstanceOf(SynchronousQueue.class); + partitionedChannel.destroy(); } @@ -138,6 +150,9 @@ public void afterMessageHandled(Message message, MessageChannel ch, MessageHa @Autowired PollableChannel resultChannel; + @Autowired + PartitionedChannel testChannel; + @Test void messagesArePartitionedByCorrelationId() { this.inputChannel.send(new GenericMessage<>(IntStream.range(0, 5).toArray())); @@ -153,6 +168,14 @@ void messagesArePartitionedByCorrelationId() { Set strings = new HashSet<>((Collection) receive.getPayload()); assertThat(strings).hasSize(1) .allMatch(value -> value.startsWith("testChannel-partition-thread-")); + + List partitionExecutors = TestUtils.getPropertyValue(this.testChannel, "dispatcher.executors", List.class); + BlockingQueue workQueue = ((ThreadPoolExecutor) partitionExecutors.get(0)).getQueue(); + + assertThat(workQueue) + .asInstanceOf(type(LinkedBlockingQueue.class)) + .extracting(LinkedBlockingQueue::remainingCapacity) + .isEqualTo(1); } @Configuration @@ -163,7 +186,7 @@ public static class TestConfiguration { IntegrationFlow someFlow() { return f -> f .split() - .channel(c -> c.partitioned("testChannel", 10)) + .channel(c -> c.partitioned("testChannel", 10).workerQueueSize(1)) .transform(p -> Thread.currentThread().getName()) .aggregate() .channel(c -> c.queue("resultChannel"));