diff --git a/spring-integration-core/src/main/java/org/springframework/integration/channel/PartitionedChannel.java b/spring-integration-core/src/main/java/org/springframework/integration/channel/PartitionedChannel.java index b9ff9aca89..bac0def5c3 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/channel/PartitionedChannel.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/channel/PartitionedChannel.java @@ -36,7 +36,7 @@ * The {@code partitionKeyFunction} is used to determine to which partition the message * has to be dispatched. * By default, the {@link IntegrationMessageHeaderAccessor#CORRELATION_ID} message header is used - * for partition key. + * for a partition key. *

* The actual dispatching and threading logic is implemented in the {@link PartitionedDispatcher}. *

@@ -71,7 +71,7 @@ public PartitionedChannel(int partitionCount) { } /** - * Instantiate based on a provided number of partitions and function for partition key against + * Instantiate based on a provided number of partitions and function for a partition key against * the message. * @param partitionCount the number of partitions in this channel. * @param partitionKeyFunction the function to resolve a partition key against the message @@ -123,6 +123,16 @@ public void setLoadBalancingStrategy(@Nullable LoadBalancingStrategy loadBalanci getDispatcher().setLoadBalancingStrategy(loadBalancingStrategy); } + /** + * Provide a size of the queue in the partition executor's worker. + * Default to zero. + * @param workerQueueSize the size of the partition executor's worker queue. + * @since 6.4.10 + */ + public void setWorkerQueueSize(int workerQueueSize) { + getDispatcher().setWorkerQueueSize(workerQueueSize); + } + @Override protected PartitionedDispatcher getDispatcher() { return (PartitionedDispatcher) this.dispatcher; diff --git a/spring-integration-core/src/main/java/org/springframework/integration/dispatcher/PartitionedDispatcher.java b/spring-integration-core/src/main/java/org/springframework/integration/dispatcher/PartitionedDispatcher.java index 14d443333d..fed9cea71e 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/dispatcher/PartitionedDispatcher.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/dispatcher/PartitionedDispatcher.java @@ -19,10 +19,14 @@ import java.util.ArrayList; import java.util.List; import java.util.Set; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Function; @@ -30,6 +34,7 @@ import org.jspecify.annotations.Nullable; +import org.springframework.integration.util.CallerBlocksPolicy; import org.springframework.integration.util.ErrorHandlingTaskExecutor; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHandler; @@ -78,6 +83,8 @@ public class PartitionedDispatcher extends AbstractDispatcher { private final Lock lock = new ReentrantLock(); + private int workerQueueSize; + /** * Instantiate based on a provided number of partitions and function for a partition key against * the message to dispatch. @@ -153,6 +160,17 @@ public void setMessageHandlingTaskDecorator(MessageHandlingTaskDecorator message this.messageHandlingTaskDecorator = messageHandlingTaskDecorator; } + /** + * Provide a size of the queue in the partition executor's worker. + * Default to zero. + * @param workerQueueSize the size of the partition executor's worker queue. + * @since 6.4.10 + */ + public void setWorkerQueueSize(int workerQueueSize) { + Assert.isTrue(workerQueueSize >= 0, "'workerQueueSize' must be greater than or equal to 0."); + this.workerQueueSize = workerQueueSize; + } + /** * Shutdown this dispatcher on application close. * The partition executors are shutdown and the internal state of this instance is cleared. @@ -188,7 +206,16 @@ private void populatedPartitions() { } private UnicastingDispatcher newPartition() { - ExecutorService executor = Executors.newSingleThreadExecutor(this.threadFactory); + BlockingQueue workQueue = + this.workerQueueSize == 0 + ? new SynchronousQueue<>() + : new LinkedBlockingQueue<>(this.workerQueueSize); + ExecutorService executor = + new ThreadPoolExecutor(1, 1, + 0L, TimeUnit.MILLISECONDS, + workQueue, + this.threadFactory, + new CallerBlocksPolicy(Long.MAX_VALUE)); this.executors.add(executor); Executor effectiveExecutor = this.errorHandler != null diff --git a/spring-integration-core/src/main/java/org/springframework/integration/dsl/PartitionedChannelSpec.java b/spring-integration-core/src/main/java/org/springframework/integration/dsl/PartitionedChannelSpec.java index 300a71ca2e..fbc85acb1f 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/dsl/PartitionedChannelSpec.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/dsl/PartitionedChannelSpec.java @@ -39,6 +39,8 @@ public class PartitionedChannelSpec extends LoadBalancingChannelSpec message, MessageChannel ch, MessageHa String partitionForLastMessage = partitionedMessages.keySet().iterator().next(); assertThat(partitionForLastMessage).isIn(allocatedPartitions); + List partitionExecutors = TestUtils.getPropertyValue(partitionedChannel, "dispatcher.executors", List.class); + BlockingQueue workQueue = ((ThreadPoolExecutor) partitionExecutors.get(0)).getQueue(); + + assertThat(workQueue).isInstanceOf(SynchronousQueue.class); + partitionedChannel.destroy(); } @@ -138,6 +150,9 @@ public void afterMessageHandled(Message message, MessageChannel ch, MessageHa @Autowired PollableChannel resultChannel; + @Autowired + PartitionedChannel testChannel; + @Test void messagesArePartitionedByCorrelationId() { this.inputChannel.send(new GenericMessage<>(IntStream.range(0, 5).toArray())); @@ -153,6 +168,14 @@ void messagesArePartitionedByCorrelationId() { Set strings = new HashSet<>((Collection) receive.getPayload()); assertThat(strings).hasSize(1) .allMatch(value -> value.startsWith("testChannel-partition-thread-")); + + List partitionExecutors = TestUtils.getPropertyValue(this.testChannel, "dispatcher.executors", List.class); + BlockingQueue workQueue = ((ThreadPoolExecutor) partitionExecutors.get(0)).getQueue(); + + assertThat(workQueue) + .asInstanceOf(type(LinkedBlockingQueue.class)) + .extracting(LinkedBlockingQueue::remainingCapacity) + .isEqualTo(1); } @Configuration @@ -163,7 +186,7 @@ public static class TestConfiguration { IntegrationFlow someFlow() { return f -> f .split() - .channel(c -> c.partitioned("testChannel", 10)) + .channel(c -> c.partitioned("testChannel", 10).workerQueueSize(1)) .transform(p -> Thread.currentThread().getName()) .aggregate() .channel(c -> c.queue("resultChannel"));