Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for RabbitListener.queuesToDeclare #352

Merged
merged 7 commits into from
Sep 15, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions springwolf-plugins/springwolf-amqp-plugin/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ dependencies {
implementation "org.springframework.amqp:spring-amqp"
implementation "org.springframework.amqp:spring-rabbit"

implementation "org.apache.commons:commons-lang3:${commonsLang3Version}"

implementation "com.fasterxml.jackson.core:jackson-core:${jacksonVersion}"
implementation "com.fasterxml.jackson.core:jackson-databind:${jacksonVersion}"

Expand All @@ -33,6 +35,7 @@ dependencies {

testImplementation "org.assertj:assertj-core:${assertjCoreVersion}"
testImplementation "org.junit.jupiter:junit-jupiter-api:${junitJupiterVersion}"
testImplementation("org.junit.jupiter:junit-jupiter-params:${junitJupiterVersion}")
testImplementation "org.mockito:mockito-core:${mockitoCoreVersion}"

testImplementation "org.springframework.boot:spring-boot-test"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.asyncapi.v2.binding.operation.OperationBinding;
import com.asyncapi.v2.binding.operation.amqp.AMQPOperationBinding;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ArrayUtils;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.ExchangeTypes;
Expand Down Expand Up @@ -34,13 +35,12 @@ public class RabbitListenerUtil {
private static final String DEFAULT_EXCHANGE_TYPE = ExchangeTypes.DIRECT;

public static String getChannelName(RabbitListener annotation, StringValueResolver resolver) {
Stream<String> annotationQueueNames = Arrays.stream(annotation.queues());
Stream<String> annotationBindingChannelNames = Arrays.stream(annotation.bindings())
.flatMap(binding -> Stream.concat(
Stream.of(binding.key()), // if routing key is configured, prefer it
Stream.of(binding.value().name())));

return Stream.concat(annotationQueueNames, annotationBindingChannelNames)
return Stream.concat(streamQueueNames(annotation), annotationBindingChannelNames)
.map(resolver::resolveStringValue)
.filter(Objects::nonNull)
.peek(queue -> log.debug("Resolved channel name: {}", queue))
Expand All @@ -51,11 +51,10 @@ public static String getChannelName(RabbitListener annotation, StringValueResolv
}

public static String getQueueName(RabbitListener annotation, StringValueResolver resolver) {
Stream<String> annotationQueueNames = Arrays.stream(annotation.queues());
Stream<String> annotationBindingChannelNames = Arrays.stream(annotation.bindings())
.flatMap(binding -> Stream.of(binding.value().name()));

return Stream.concat(annotationQueueNames, annotationBindingChannelNames)
return Stream.concat(streamQueueNames(annotation), annotationBindingChannelNames)
.map(resolver::resolveStringValue)
.filter(Objects::nonNull)
.peek(queue -> log.debug("Resolved queue name: {}", queue))
Expand All @@ -65,6 +64,25 @@ public static String getQueueName(RabbitListener annotation, StringValueResolver
"No queue name was found in @RabbitListener annotation (neither in queues nor bindings property)"));
}

/**
*
* @param rabbitListenerAnnotation a RabbitListener annotation
* @return A stream of ALL queue names as defined in the following 'locations':
* <UL>
* <LI>{@link RabbitListener#queues()}</LI>
* <LI>{@link RabbitListener#queuesToDeclare()}.name</LI>
* </UL>
* Note: queues, queuesToDeclare (and bindings) are mutually exclusive
* @see <a href="https://docs.spring.io/spring-amqp/api/org/springframework/amqp/rabbit/annotation/RabbitListener.html#queuesToDeclare()">RabbitListener.queuesToDeclare</a>
* */
private static Stream<String> streamQueueNames(RabbitListener rabbitListenerAnnotation) {
return Arrays.stream(ArrayUtils.addAll(
rabbitListenerAnnotation.queues(),
Arrays.stream(rabbitListenerAnnotation.queuesToDeclare())
.map(queue -> queue.name())
.toArray(String[]::new)));
}

public static Map<String, ? extends ChannelBinding> buildChannelBinding(
RabbitListener annotation, StringValueResolver resolver, RabbitListenerUtilContext context) {
AMQPChannelBinding.AMQPChannelBindingBuilder channelBinding = AMQPChannelBinding.builder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,14 @@
import io.github.stavshamir.springwolf.asyncapi.types.channel.operation.message.PayloadReference;
import io.github.stavshamir.springwolf.asyncapi.types.channel.operation.message.header.AsyncHeaders;
import io.github.stavshamir.springwolf.asyncapi.types.channel.operation.message.header.HeaderReference;
import io.github.stavshamir.springwolf.configuration.AsyncApiDocket;
import io.github.stavshamir.springwolf.schemas.DefaultSchemasService;
import io.github.stavshamir.springwolf.schemas.example.ExampleJsonGenerator;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
Expand Down Expand Up @@ -61,9 +62,6 @@ class MethodLevelRabbitListenerScannerIntegrationTest {
@MockBean
private ComponentClassScanner componentsScanner;

@MockBean
private AsyncApiDocket asyncApiDocket;
sam0r040 marked this conversation as resolved.
Show resolved Hide resolved

private static final String QUEUE = "test-queue";
private static final Map<String, Object> defaultOperationBinding =
Map.of("amqp", AMQPOperationBinding.builder().cc(List.of(QUEUE)).build());
Expand Down Expand Up @@ -103,10 +101,15 @@ void scan_componentHasNoRabbitListenerMethods() {
assertThat(channels).isEmpty();
}

@Test
void scan_componentHasRabbitListenerMethods_hardCodedTopic() {
@ParameterizedTest
@ValueSource(
classes = {
ClassWithRabbitListenerAnnotationHardCodedTopic.class,
ClassWithRabbitListenerAnnotationUsingQueuesToDeclare.class
})
void scan_componentHasRabbitListenerMethods_hardCodedTopic(Class<?> classWithRabbitListenerAnnotation) {
// Given a class with methods annotated with RabbitListener, whose queues attribute is hard coded
setClassToScan(ClassWithRabbitListenerAnnotationHardCodedTopic.class);
setClassToScan(classWithRabbitListenerAnnotation);

// When scan is called
Map<String, ChannelItem> actualChannelItems = rabbitListenerScanner.scan();
Expand All @@ -128,7 +131,7 @@ void scan_componentHasRabbitListenerMethods_hardCodedTopic() {

Operation operation = Operation.builder()
.description("Auto-generated description")
.operationId("test-queue_publish_methodWithAnnotation")
.operationId(QUEUE + "_publish_methodWithAnnotation")
.bindings(defaultOperationBinding)
.message(message)
.build();
Expand Down Expand Up @@ -177,7 +180,7 @@ void scan_componentHasRabbitListenerMethods_embeddedValueTopic() {

Operation operation = Operation.builder()
.description("Auto-generated description")
.operationId("test-queue_publish_methodWithAnnotation1")
.operationId(QUEUE + "_publish_methodWithAnnotation1")
.bindings(defaultOperationBinding)
.message(message)
.build();
Expand Down Expand Up @@ -223,7 +226,7 @@ void scan_componentHasRabbitListenerMethods_bindingsAnnotation() {

Operation operation = Operation.builder()
.description("Auto-generated description")
.operationId("key_publish_methodWithAnnotation1")
.operationId("key_publish_methodWithAnnotation")
.bindings(Map.of(
"amqp",
AMQPOperationBinding.builder()
Expand Down Expand Up @@ -329,7 +332,7 @@ void scan_componentHasRabbitListenerMethods_multipleParamsWithPayloadAnnotation(

Operation operation = Operation.builder()
.description("Auto-generated description")
.operationId("test-queue_publish_methodWithAnnotation")
.operationId(QUEUE + "_publish_methodWithAnnotation")
.bindings(defaultOperationBinding)
.message(message)
.build();
Expand Down Expand Up @@ -362,9 +365,18 @@ private static class ClassWithRabbitListenerAnnotationUsingBindings {
@QueueBinding(
exchange = @Exchange(name = "name", type = "topic"),
key = "key",
value = @Queue(name = "test-queue"))
value = @Queue(name = QUEUE))
})
private void methodWithAnnotation1(SimpleFoo payload) {}
private void methodWithAnnotation(SimpleFoo payload) {}
}

/**
* Note: bindings, queues, and queuesToDeclare are mutually exclusive
* @see <a href="https://docs.spring.io/spring-amqp/api/org/springframework/amqp/rabbit/annotation/RabbitListener.html#queuesToDeclare()">RabbitListener.queuesToDeclare</a>
*/
private static class ClassWithRabbitListenerAnnotationUsingQueuesToDeclare {
@RabbitListener(queuesToDeclare = @Queue(name = QUEUE))
private void methodWithAnnotation(SimpleFoo payload) {}
}

public static class ClassWithRabbitListenerAnnotationsBindingBean {
Expand Down