From 732f0da28413183f0c1e302875dbbeadc317523a Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Mon, 26 Sep 2022 18:32:36 -0400 Subject: [PATCH] GH-1465: Super Stream Support in Template --- .../stream/producer/RabbitStreamTemplate.java | 21 ++++++++++- .../producer/RabbitStreamTemplateTests.java | 25 +++++++++++++ src/reference/asciidoc/stream.adoc | 36 +++++++++++++++++++ 3 files changed, 81 insertions(+), 1 deletion(-) diff --git a/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/producer/RabbitStreamTemplate.java b/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/producer/RabbitStreamTemplate.java index e8483935fb..418c22bf37 100644 --- a/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/producer/RabbitStreamTemplate.java +++ b/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/producer/RabbitStreamTemplate.java @@ -17,6 +17,7 @@ package org.springframework.rabbit.stream.producer; import java.util.concurrent.CompletableFuture; +import java.util.function.Function; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessagePostProcessor; @@ -52,6 +53,8 @@ public class RabbitStreamTemplate implements RabbitStreamOperations, BeanNameAwa private final String streamName; + private Function superStreamRouting; + private MessageConverter messageConverter = new SimpleMessageConverter(); private StreamMessageConverter streamConverter = new DefaultStreamMessageConverter(); @@ -80,7 +83,13 @@ public RabbitStreamTemplate(Environment environment, String streamName) { private synchronized Producer createOrGetProducer() { if (this.producer == null) { ProducerBuilder builder = this.environment.producerBuilder(); - builder.stream(this.streamName); + if (this.superStreamRouting == null) { + builder.stream(this.streamName); + } + else { + builder.superStream(this.streamName) + .routing(this.superStreamRouting); + } this.producerCustomizer.accept(this.beanName, builder); this.producer = builder.build(); if (!this.streamConverterSet) { @@ -96,6 +105,16 @@ public synchronized void setBeanName(String name) { this.beanName = name; } + /** + * Add a routing function, making the stream a super stream. + * @param superStreamRouting the routing function. + * @since 3.0 + */ + public void setSuperStreamRouting(Function superStreamRouting) { + this.superStreamRouting = superStreamRouting; + } + + /** * Set a converter for {@link #convertAndSend(Object)} operations. * @param messageConverter the converter. diff --git a/spring-rabbit-stream/src/test/java/org/springframework/rabbit/stream/producer/RabbitStreamTemplateTests.java b/spring-rabbit-stream/src/test/java/org/springframework/rabbit/stream/producer/RabbitStreamTemplateTests.java index b0bb26414d..9d06f42abe 100644 --- a/spring-rabbit-stream/src/test/java/org/springframework/rabbit/stream/producer/RabbitStreamTemplateTests.java +++ b/spring-rabbit-stream/src/test/java/org/springframework/rabbit/stream/producer/RabbitStreamTemplateTests.java @@ -22,6 +22,8 @@ import static org.mockito.BDDMockito.given; import static org.mockito.BDDMockito.willAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -114,4 +116,27 @@ void handleConfirm() throws InterruptedException, ExecutionException { } } + @Test + void superStream() { + Environment env = mock(Environment.class); + ProducerBuilder pb = mock(ProducerBuilder.class); + given(pb.superStream(any())).willReturn(pb); + given(env.producerBuilder()).willReturn(pb); + Producer producer = mock(Producer.class); + given(pb.build()).willReturn(producer); + try (RabbitStreamTemplate template = new RabbitStreamTemplate(env, "foo")) { + SimpleMessageConverter messageConverter = new SimpleMessageConverter(); + template.setMessageConverter(messageConverter); + assertThat(template.messageConverter()).isSameAs(messageConverter); + StreamMessageConverter converter = mock(StreamMessageConverter.class); + given(converter.fromMessage(any())).willReturn(mock(Message.class)); + template.setStreamConverter(converter); + template.setSuperStreamRouting(msg -> "bar"); + template.convertAndSend("x"); + verify(pb).superStream("foo"); + verify(pb).routing(any()); + verify(pb, never()).stream("foo"); + } + } + } diff --git a/src/reference/asciidoc/stream.adoc b/src/reference/asciidoc/stream.adoc index 8de6ca0596..6adac16561 100644 --- a/src/reference/asciidoc/stream.adoc +++ b/src/reference/asciidoc/stream.adoc @@ -180,6 +180,42 @@ SuperStream superStream() { The `RabbitAdmin` detects this bean and will declare the exchange (`my.super.stream`) and 3 queues (partitions) - `my.super-stream-n` where `n` is `0`, `1`, `2`, bound with routing keys equal to `n`. +If you also wish to publish over AMQP to the exchange, you can provide custom routing keys: + +==== +[source, java] +---- +@Bean +SuperStream superStream() { + return new SuperStream("my.super.stream", 3, (q, i) -> IntStream.range(0, i) + .mapToObj(j -> "rk-" + j) + .collect(Collectors.toList())); +} +---- +==== + +The number of keys must equal the number of partitions. + +===== Producing to a SuperStream + +You must add a `superStreamRoutingFunction` to the `RabbitStreamTemplate`: + +==== +[source, java] +---- +@Bean +RabbitStreamTemplate streamTemplate(Environment env) { + RabbitStreamTemplate template = new RabbitStreamTemplate(env, "stream.queue1"); + template.setSuperStreamRouting(message -> { + // some logic to return a String for the client's hashing algorithm + }); + return template; +} +---- +==== + +You can also publish over AMQP, using the `RabbitTemplate`. + ===== Consuming Super Streams with Single Active Consumers Invoke the `superStream` method on the listener container to enable a single active consumer on a super stream.