diff --git a/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/listener/StreamListenerContainer.java b/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/listener/StreamListenerContainer.java index 749492bf34..33926f5e3c 100644 --- a/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/listener/StreamListenerContainer.java +++ b/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/listener/StreamListenerContainer.java @@ -16,10 +16,11 @@ package org.springframework.rabbit.stream.listener; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import org.aopalliance.aop.Advice; -import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.springframework.amqp.core.Message; @@ -29,6 +30,7 @@ import org.springframework.aop.framework.ProxyFactory; import org.springframework.aop.support.DefaultPointcutAdvisor; import org.springframework.beans.factory.BeanNameAware; +import org.springframework.core.log.LogAccessor; import org.springframework.lang.Nullable; import org.springframework.rabbit.stream.support.StreamMessageProperties; import org.springframework.rabbit.stream.support.converter.DefaultStreamMessageConverter; @@ -49,15 +51,21 @@ */ public class StreamListenerContainer implements MessageListenerContainer, BeanNameAware { - protected Log logger = LogFactory.getLog(getClass()); // NOSONAR + protected LogAccessor logger = new LogAccessor(LogFactory.getLog(getClass())); // NOSONAR private final ConsumerBuilder builder; + private final Collection consumers = new ArrayList<>(); + private StreamMessageConverter streamConverter; private ConsumerCustomizer consumerCustomizer = (id, con) -> { }; - private Consumer consumer; + private boolean simpleStream; + + private boolean superStream; + + private int concurrency = 1; private String listenerId; @@ -96,22 +104,41 @@ public StreamListenerContainer(Environment environment, @Nullable Codec codec) { */ @Override public void setQueueNames(String... queueNames) { + Assert.isTrue(!this.superStream, "setQueueNames() and superStream() are mutually exclusive"); Assert.isTrue(queueNames != null && queueNames.length == 1, "Only one stream is supported"); this.builder.stream(queueNames[0]); + this.simpleStream = true; + } + + /** + * Enable Single Active Consumer on a Super Stream, with one consumer. + * Mutually exclusive with {@link #setQueueNames(String...)}. + * @param streamName the stream. + * @param name the consumer name. + * @since 3.0 + */ + public void superStream(String streamName, String name) { + superStream(streamName, name, 1); } /** - * Enable Single Active Consumer on a Super Stream. + * Enable Single Active Consumer on a Super Stream with the provided number of consumers. + * There must be at least that number of partitions in the Super Stream. * Mutually exclusive with {@link #setQueueNames(String...)}. - * @param superStream the stream. + * @param streamName the stream. * @param name the consumer name. + * @param consumers the number of consumers. * @since 3.0 */ - public void superStream(String superStream, String name) { - Assert.notNull(superStream, "'superStream' cannot be null"); - this.builder.superStream(superStream) + public void superStream(String streamName, String name, int consumers) { + Assert.isTrue(consumers > 0, () -> "'concurrency' must be greater than zero, not " + consumers); + this.concurrency = consumers; + Assert.isTrue(!this.simpleStream, "setQueueNames() and superStream() are mutually exclusive"); + Assert.notNull(streamName, "'superStream' cannot be null"); + this.builder.superStream(streamName) .singleActiveConsumer() .name(name); + this.superStream = true; } /** @@ -201,23 +228,35 @@ public Object getMessageListener() { @Override public synchronized boolean isRunning() { - return this.consumer != null; + return this.consumers.size() > 0; } @Override public synchronized void start() { - if (this.consumer == null) { + if (this.consumers.size() == 0) { this.consumerCustomizer.accept(getListenerId(), this.builder); - this.consumer = this.builder.build(); + if (this.simpleStream) { + this.consumers.add(this.builder.build()); + } + else { + for (int i = 0; i < this.concurrency; i++) { + this.consumers.add(this.builder.build()); + } + } } } @Override public synchronized void stop() { - if (this.consumer != null) { - this.consumer.close(); - this.consumer = null; - } + this.consumers.forEach(consumer -> { + try { + consumer.close(); + } + catch (RuntimeException ex) { + this.logger.error(ex, "Failed to close consumer"); + } + }); + this.consumers.clear(); } @Override @@ -233,8 +272,8 @@ public void setupMessageListener(MessageListener messageListener) { try { ((ChannelAwareMessageListener) this.messageListener).onMessage(message2, null); } - catch (Exception e) { // NOSONAR - this.logger.error("Listner threw an exception", e); + catch (Exception ex) { // NOSONAR + this.logger.error(ex, "Listner threw an exception"); } } else { diff --git a/spring-rabbit-stream/src/test/java/org/springframework/rabbit/stream/listener/SuperStreamConcurrentSACTests.java b/spring-rabbit-stream/src/test/java/org/springframework/rabbit/stream/listener/SuperStreamConcurrentSACTests.java new file mode 100644 index 0000000000..f089a2ca8b --- /dev/null +++ b/spring-rabbit-stream/src/test/java/org/springframework/rabbit/stream/listener/SuperStreamConcurrentSACTests.java @@ -0,0 +1,133 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.rabbit.stream.listener; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.junit.jupiter.api.Test; + +import org.springframework.amqp.core.Declarables; +import org.springframework.amqp.core.DirectExchange; +import org.springframework.amqp.core.Queue; +import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; +import org.springframework.amqp.rabbit.connection.ConnectionFactory; +import org.springframework.amqp.rabbit.core.RabbitAdmin; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.rabbit.stream.config.SuperStream; +import org.springframework.rabbit.stream.support.AbstractIntegrationTests; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; + +import com.rabbitmq.stream.Address; +import com.rabbitmq.stream.Environment; +import com.rabbitmq.stream.OffsetSpecification; + +/** + * @author Gary Russell + * @since 3.0 + * + */ +@SpringJUnitConfig +public class SuperStreamConcurrentSACTests extends AbstractIntegrationTests { + + @Test + void concurrent(@Autowired StreamListenerContainer container, @Autowired RabbitTemplate template, + @Autowired Config config, @Autowired RabbitAdmin admin, + @Autowired Declarables superStream) throws InterruptedException { + + template.getConnectionFactory().createConnection(); + container.start(); + assertThat(config.consumerLatch.await(10, TimeUnit.SECONDS)).isTrue(); + template.convertAndSend("ss.sac.concurrency.test", "0", "foo"); + template.convertAndSend("ss.sac.concurrency.test", "1", "bar"); + template.convertAndSend("ss.sac.concurrency.test", "2", "baz"); + assertThat(config.messageLatch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(config.threads).hasSize(3); + container.stop(); + clean(admin, superStream); + } + + private void clean(RabbitAdmin admin, Declarables declarables) { + declarables.getDeclarablesByType(Queue.class).forEach(queue -> admin.deleteQueue(queue.getName())); + declarables.getDeclarablesByType(DirectExchange.class).forEach(ex -> admin.deleteExchange(ex.getName())); + } + + @Configuration + public static class Config { + + final Set threads = new HashSet<>(); + + final CountDownLatch consumerLatch = new CountDownLatch(3); + + final CountDownLatch messageLatch = new CountDownLatch(3); + + @Bean + CachingConnectionFactory cf() { + return new CachingConnectionFactory("localhost", amqpPort()); + } + + @Bean + RabbitAdmin admin(ConnectionFactory cf) { + return new RabbitAdmin(cf); + } + + @Bean + RabbitTemplate template(ConnectionFactory cf) { + return new RabbitTemplate(cf); + } + + @Bean + SuperStream superStream() { + return new SuperStream("ss.sac.concurrency.test", 3); + } + + @Bean + static Environment environment() { + return Environment.builder() + .addressResolver(add -> new Address("localhost", streamPort())) + .maxConsumersByConnection(1) + .build(); + } + + @Bean + StreamListenerContainer concurrentContainer(Environment env) { + StreamListenerContainer container = new StreamListenerContainer(env); + container.superStream("ss.sac.concurrency.test", "concurrent", 3); + container.setupMessageListener(msg -> { + this.threads.add(Thread.currentThread().getName()); + this.messageLatch.countDown(); + }); + container.setConsumerCustomizer((id, builder) -> { + builder.consumerUpdateListener(context -> { + this.consumerLatch.countDown(); + return OffsetSpecification.last(); + }); + }); + container.setAutoStartup(false); + return container; + } + + } + +} diff --git a/spring-rabbit-stream/src/test/java/org/springframework/rabbit/stream/support/AbstractIntegrationTests.java b/spring-rabbit-stream/src/test/java/org/springframework/rabbit/stream/support/AbstractIntegrationTests.java index 7afd67fd02..9b6f1575d5 100644 --- a/spring-rabbit-stream/src/test/java/org/springframework/rabbit/stream/support/AbstractIntegrationTests.java +++ b/spring-rabbit-stream/src/test/java/org/springframework/rabbit/stream/support/AbstractIntegrationTests.java @@ -33,7 +33,7 @@ public abstract class AbstractIntegrationTests { static { if (System.getProperty("spring.rabbit.use.local.server") == null && System.getenv("SPRING_RABBIT_USE_LOCAL_SERVER") == null) { - String image = "rabbitmq:3.11"; + String image = "rabbitmq:3.11-management"; String cache = System.getenv().get("IMAGE_CACHE"); if (cache != null) { image = cache + image; diff --git a/src/reference/asciidoc/amqp.adoc b/src/reference/asciidoc/amqp.adoc index 9095eff965..0c83b0a3b6 100644 --- a/src/reference/asciidoc/amqp.adoc +++ b/src/reference/asciidoc/amqp.adoc @@ -5741,7 +5741,7 @@ Enable this feature by calling `ConnectionFactoryUtils.enableAfterCompletionFail ==== Message Listener Container Configuration There are quite a few options for configuring a `SimpleMessageListenerContainer` (SMLC) and a `DirectMessageListenerContainer` (DMLC) related to transactions and quality of service, and some of them interact with each other. -Properties that apply to the SMLC, DMLC, or `StreamListenerContainer` (StLC) (see <> are indicated by the check mark in the appropriate column. +Properties that apply to the SMLC, DMLC, or `StreamListenerContainer` (StLC) (see <>) are indicated by the check mark in the appropriate column. See <> for information to help you decide which container is appropriate for your application. The following table shows the container property names and their equivalent attribute names (in parentheses) when using the namespace to configure a ``. @@ -5894,10 +5894,11 @@ a| |The number of concurrent consumers to initially start for each listener. See <>. +For the `StLC`, concurrency is controlled via an overloaded `superStream` method; see <>. a|image::images/tickmark.png[] a| -a| +a|image::images/tickmark.png[] |[[connectionFactory]]<> + (connection-factory) diff --git a/src/reference/asciidoc/stream.adoc b/src/reference/asciidoc/stream.adoc index 6adac16561..01a72ccdd3 100644 --- a/src/reference/asciidoc/stream.adoc +++ b/src/reference/asciidoc/stream.adoc @@ -216,6 +216,7 @@ RabbitStreamTemplate streamTemplate(Environment env) { You can also publish over AMQP, using the `RabbitTemplate`. +[[super-stream-consumer]] ===== Consuming Super Streams with Single Active Consumers Invoke the `superStream` method on the listener container to enable a single active consumer on a super stream. @@ -227,7 +228,7 @@ Invoke the `superStream` method on the listener container to enable a single act @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) StreamListenerContainer container(Environment env, String name) { StreamListenerContainer container = new StreamListenerContainer(env); - container.superStream("ss.sac", "myConsumer"); + container.superStream("ss.sac", "myConsumer", 3); // concurrency = 3 container.setupMessageListener(msg -> { ... }); @@ -236,3 +237,6 @@ StreamListenerContainer container(Environment env, String name) { } ---- ==== + +IMPORTANT: At this time, when the concurrency is greater than 1, the actual concurrency is further controlled by the `Environment`; to achieve full concurrency, set the environment's `maxConsumersByConnection` to 1. +See https://rabbitmq.github.io/rabbitmq-stream-java-client/snapshot/htmlsingle/#configuring-the-environment[Configuring the Environment].