diff --git a/pom.xml b/pom.xml index 4a9d06ed..9bb4b4e4 100644 --- a/pom.xml +++ b/pom.xml @@ -42,7 +42,7 @@ UTF-8 6.1.0-RC1 3.2.0-RC1 - + 3.0.2 @@ -407,6 +407,13 @@ limitations under the License. pom import + + io.awspring.cloud + spring-cloud-aws-dependencies + ${spring-cloud-aws-bom.version} + pom + import + diff --git a/spring-modulith-events/pom.xml b/spring-modulith-events/pom.xml index c3d3d896..0ca73fcb 100644 --- a/spring-modulith-events/pom.xml +++ b/spring-modulith-events/pom.xml @@ -16,6 +16,8 @@ spring-modulith-events-amqp spring-modulith-events-api + spring-modulith-events-aws-sns + spring-modulith-events-aws-sqs spring-modulith-events-core spring-modulith-events-jackson spring-modulith-events-jdbc diff --git a/spring-modulith-events/spring-modulith-events-aws-sns/pom.xml b/spring-modulith-events/spring-modulith-events-aws-sns/pom.xml new file mode 100644 index 00000000..0454d680 --- /dev/null +++ b/spring-modulith-events/spring-modulith-events-aws-sns/pom.xml @@ -0,0 +1,96 @@ + + + 4.0.0 + + + org.springframework.modulith + spring-modulith-events + 1.1.0-SNAPSHOT + + + Spring Modulith - Events - AWS SNS support + spring-modulith-events-aws-sns + + + org.springframework.modulith.events.aws.sns + + + + + + org.springframework.modulith + spring-modulith-api + ${project.version} + + + + org.springframework.modulith + spring-modulith-events-core + ${project.version} + + + + io.awspring.cloud + spring-cloud-aws-sns + + + + com.fasterxml.jackson.core + jackson-databind + true + + + + + + org.springframework.modulith + spring-modulith-starter-jdbc + ${project.version} + test + + + + com.h2database + h2 + test + + + + org.springframework.boot + spring-boot-starter-json + test + + + + org.springframework.boot + spring-boot-starter-test + test + + + + org.springframework.boot + spring-boot-testcontainers + test + + + + io.awspring.cloud + spring-cloud-aws-starter-sns + test + + + + io.awspring.cloud + spring-cloud-aws-starter-sqs + test + + + + org.testcontainers + localstack + test + + + + + diff --git a/spring-modulith-events/spring-modulith-events-aws-sns/src/main/java/org/springframework/modulith/events/aws/sns/SnsEventExternalizerConfiguration.java b/spring-modulith-events/spring-modulith-events-aws-sns/src/main/java/org/springframework/modulith/events/aws/sns/SnsEventExternalizerConfiguration.java new file mode 100644 index 00000000..fee0e5ad --- /dev/null +++ b/spring-modulith-events/spring-modulith-events-aws-sns/src/main/java/org/springframework/modulith/events/aws/sns/SnsEventExternalizerConfiguration.java @@ -0,0 +1,85 @@ +/* + * Copyright 2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.modulith.events.aws.sns; + +import io.awspring.cloud.sns.core.SnsNotification; +import io.awspring.cloud.sns.core.SnsOperations; +import io.awspring.cloud.sns.core.SnsTemplate; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.sns.model.InvalidParameterException; + +import org.springframework.beans.factory.BeanFactory; +import org.springframework.boot.autoconfigure.AutoConfiguration; +import org.springframework.boot.autoconfigure.AutoConfigureAfter; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Bean; +import org.springframework.context.expression.BeanFactoryResolver; +import org.springframework.expression.spel.support.StandardEvaluationContext; +import org.springframework.messaging.MessageDeliveryException; +import org.springframework.modulith.events.EventExternalizationConfiguration; +import org.springframework.modulith.events.config.EventExternalizationAutoConfiguration; +import org.springframework.modulith.events.support.BrokerRouting; +import org.springframework.modulith.events.support.DelegatingEventExternalizer; + +/** + * Auto-configuration to set up a {@link DelegatingEventExternalizer} to externalize events to SNS. + * + * @author Maciej Walkowiak + * @since 1.1 + */ +@AutoConfiguration +@AutoConfigureAfter(EventExternalizationAutoConfiguration.class) +@ConditionalOnClass(SnsTemplate.class) +@ConditionalOnProperty(name = "spring.modulith.events.externalization.enabled", + havingValue = "true", + matchIfMissing = true) +class SnsEventExternalizerConfiguration { + + private static final Logger logger = LoggerFactory.getLogger(SnsEventExternalizerConfiguration.class); + + @Bean + DelegatingEventExternalizer snsEventExternalizer(EventExternalizationConfiguration configuration, + SnsOperations operations, BeanFactory factory) { + + logger.debug("Registering domain event externalization to SNS…"); + + var context = new StandardEvaluationContext(); + context.setBeanResolver(new BeanFactoryResolver(factory)); + + return new DelegatingEventExternalizer(configuration, (target, payload) -> { + + var routing = BrokerRouting.of(target, context); + + var builder = SnsNotification.builder(payload); + var key = routing.getKey(payload); + // when routing key is set, SNS topic must be a FIFO topic + if (key != null) { + builder.groupId(key); + } + try { + operations.sendNotification(routing.getTarget(), builder.build()); + } catch (MessageDeliveryException e) { + // message delivery may fail if groupId is set and topic is not a FIFO topic, or content based deduplication has not been set on topic attributes. + if (e.getCause() instanceof InvalidParameterException) { + logger.error("Failed to send notification to SNS topic {}:{}", routing.getTarget(), e.getCause().getMessage()); + } + throw e; + } + }); + } +} diff --git a/spring-modulith-events/spring-modulith-events-aws-sns/src/main/java/org/springframework/modulith/events/aws/sns/package-info.java b/spring-modulith-events/spring-modulith-events-aws-sns/src/main/java/org/springframework/modulith/events/aws/sns/package-info.java new file mode 100644 index 00000000..4b089111 --- /dev/null +++ b/spring-modulith-events/spring-modulith-events-aws-sns/src/main/java/org/springframework/modulith/events/aws/sns/package-info.java @@ -0,0 +1,5 @@ +/** + * SNS event externalization support. + */ +@org.springframework.lang.NonNullApi +package org.springframework.modulith.events.aws.sns; diff --git a/spring-modulith-events/spring-modulith-events-aws-sns/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/spring-modulith-events/spring-modulith-events-aws-sns/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports new file mode 100644 index 00000000..16c99bfe --- /dev/null +++ b/spring-modulith-events/spring-modulith-events-aws-sns/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports @@ -0,0 +1 @@ +org.springframework.modulith.events.aws.sns.SnsEventExternalizerConfiguration diff --git a/spring-modulith-events/spring-modulith-events-aws-sns/src/test/java/org/springframework/modulith/events/aws/sns/SnsEventExternalizerConfigurationIntegrationTests.java b/spring-modulith-events/spring-modulith-events-aws-sns/src/test/java/org/springframework/modulith/events/aws/sns/SnsEventExternalizerConfigurationIntegrationTests.java new file mode 100644 index 00000000..3469eda1 --- /dev/null +++ b/spring-modulith-events/spring-modulith-events-aws-sns/src/test/java/org/springframework/modulith/events/aws/sns/SnsEventExternalizerConfigurationIntegrationTests.java @@ -0,0 +1,64 @@ +/* + * Copyright 2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.modulith.events.aws.sns; + +import io.awspring.cloud.sns.core.SnsOperations; +import org.junit.jupiter.api.Test; + +import org.springframework.boot.autoconfigure.AutoConfigurations; +import org.springframework.boot.test.context.runner.ApplicationContextRunner; +import org.springframework.modulith.events.EventExternalizationConfiguration; +import org.springframework.modulith.events.support.DelegatingEventExternalizer; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; + +/** + * Integration tests for {@link SnsEventExternalizerConfiguration}. + * + * @author Maciej Walkowiak + * @since 1.1 + */ +class SnsEventExternalizerConfigurationIntegrationTests { + + @Test // GH-342 + void registersExternalizerByDefault() { + + basicSetup() + .run(ctxt -> { + assertThat(ctxt).hasSingleBean(DelegatingEventExternalizer.class); + }); + } + + @Test // GH-342 + void disablesExternalizationIfConfigured() { + + basicSetup() + .withPropertyValues("spring.modulith.events.externalization.enabled=false") + .run(ctxt -> { + assertThat(ctxt).doesNotHaveBean(DelegatingEventExternalizer.class); + }); + } + + private ApplicationContextRunner basicSetup() { + + return new ApplicationContextRunner() + .withConfiguration( + AutoConfigurations.of(SnsEventExternalizerConfiguration.class)) + .withBean(EventExternalizationConfiguration.class, () -> EventExternalizationConfiguration.disabled()) + .withBean(SnsOperations.class, () -> mock(SnsOperations.class)); + } +} diff --git a/spring-modulith-events/spring-modulith-events-aws-sns/src/test/java/org/springframework/modulith/events/aws/sns/SnsEventPublicationIntegrationTests.java b/spring-modulith-events/spring-modulith-events-aws-sns/src/test/java/org/springframework/modulith/events/aws/sns/SnsEventPublicationIntegrationTests.java new file mode 100644 index 00000000..ad7fec33 --- /dev/null +++ b/spring-modulith-events/spring-modulith-events-aws-sns/src/test/java/org/springframework/modulith/events/aws/sns/SnsEventPublicationIntegrationTests.java @@ -0,0 +1,170 @@ +/* + * Copyright 2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.modulith.events.aws.sns; + +import java.util.Map; + +import lombok.RequiredArgsConstructor; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.localstack.LocalStackContainer; +import org.testcontainers.utility.DockerImageName; +import software.amazon.awssdk.services.sns.SnsClient; +import software.amazon.awssdk.services.sqs.SqsAsyncClient; +import software.amazon.awssdk.services.sqs.model.QueueAttributeName; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.context.ApplicationEventPublisher; +import org.springframework.context.annotation.Bean; +import org.springframework.modulith.events.ApplicationModuleListener; +import org.springframework.modulith.events.Externalized; +import org.springframework.test.context.DynamicPropertyRegistry; +import org.springframework.transaction.annotation.Transactional; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +/** + * Integration tests for SQS-based event publication. + * + * @author Maciej Walkowiak + */ +@SpringBootTest +class SnsEventPublicationIntegrationTests { + + @Autowired TestPublisher publisher; + @Autowired SnsClient snsClient; + @Autowired SqsAsyncClient sqsAsyncClient; + + @SpringBootApplication + static class TestConfiguration { + + @Bean + LocalStackContainer localStackContainer(DynamicPropertyRegistry registry) { + var localstack = new LocalStackContainer(DockerImageName.parse("localstack/localstack:2.3.2")); + registry.add("spring.cloud.aws.endpoint", localstack::getEndpoint); + registry.add("spring.cloud.aws.credentials.access-key", localstack::getAccessKey); + registry.add("spring.cloud.aws.credentials.secret-key", localstack::getSecretKey); + registry.add("spring.cloud.aws.region.static", localstack::getRegion); + return localstack; + } + + @Bean + TestPublisher testPublisher(ApplicationEventPublisher publisher) { + return new TestPublisher(publisher); + } + + @Bean + TestListener testListener() { + return new TestListener(); + } + } + + @Test + void publishesEventToSns() { + + var topicArn = snsClient.createTopic(request -> request.name("target")).topicArn(); + + var queueUrl = sqsAsyncClient.createQueue(request -> request.queueName("queue")) + .join() + .queueUrl(); + + var queueArn = sqsAsyncClient + .getQueueAttributes(r -> r.queueUrl(queueUrl).attributeNames(QueueAttributeName.QUEUE_ARN)) + .join().attributes().get(QueueAttributeName.QUEUE_ARN); + snsClient.subscribe(r -> r.topicArn(topicArn).protocol("sqs").endpoint(queueArn)); + + publisher.publishEvent(); + + await().untilAsserted(() -> { + var response = sqsAsyncClient.receiveMessage(r -> r.queueUrl(queueUrl)).join(); + + assertThat(response.hasMessages()).isTrue(); + }); + } + + @Test + void publishesEventWithGroupIdToSns() { + + var topicArn = snsClient.createTopic(request -> request.name("target.fifo") + .attributes(Map.of( + "FifoTopic", "true", + "ContentBasedDeduplication", "true" + ))) + .topicArn(); + + var queueUrl = sqsAsyncClient.createQueue(request -> request.queueName("queue.fifo") + .attributes(Map.of(QueueAttributeName.FIFO_QUEUE, "true"))) + .join() + .queueUrl(); + + var queueArn = sqsAsyncClient + .getQueueAttributes(r -> r.queueUrl(queueUrl).attributeNames(QueueAttributeName.QUEUE_ARN)) + .join().attributes().get(QueueAttributeName.QUEUE_ARN); + snsClient.subscribe(r -> r.topicArn(topicArn).protocol("sqs").endpoint(queueArn)); + + publisher.publishEventWithKey(); + + await().untilAsserted(() -> { + var response = sqsAsyncClient.receiveMessage(r -> r.queueUrl(queueUrl)).join(); + assertThat(response.hasMessages()).isTrue(); + }); + } + + @Externalized("target") + static class TestEvent { } + + @Externalized("target.fifo::#{getKey()}") + static class TestEventWithKey { + private final String key; + + TestEventWithKey(String key) { + this.key = key; + } + + public String getKey() { + return key; + } + } + + @RequiredArgsConstructor + static class TestPublisher { + + private final ApplicationEventPublisher events; + + @Transactional + void publishEvent() { + events.publishEvent(new TestEvent()); + } + + @Transactional + void publishEventWithKey() { + events.publishEvent(new TestEventWithKey("aKey")); + } + } + + static class TestListener { + + @ApplicationModuleListener + void on(TestEvent event) { + } + + @ApplicationModuleListener + void on(TestEventWithKey event) { + } + } +} diff --git a/spring-modulith-events/spring-modulith-events-aws-sns/src/test/resources/application.properties b/spring-modulith-events/spring-modulith-events-aws-sns/src/test/resources/application.properties new file mode 100644 index 00000000..79d64cb2 --- /dev/null +++ b/spring-modulith-events/spring-modulith-events-aws-sns/src/test/resources/application.properties @@ -0,0 +1 @@ +spring.modulith.events.jdbc.schema-initialization.enabled=true diff --git a/spring-modulith-events/spring-modulith-events-aws-sns/src/test/resources/logback.xml b/spring-modulith-events/spring-modulith-events-aws-sns/src/test/resources/logback.xml new file mode 100644 index 00000000..48661bd1 --- /dev/null +++ b/spring-modulith-events/spring-modulith-events-aws-sns/src/test/resources/logback.xml @@ -0,0 +1,16 @@ + + + + + + + + + + + + + + + + diff --git a/spring-modulith-events/spring-modulith-events-aws-sqs/pom.xml b/spring-modulith-events/spring-modulith-events-aws-sqs/pom.xml new file mode 100644 index 00000000..7bd4b8f7 --- /dev/null +++ b/spring-modulith-events/spring-modulith-events-aws-sqs/pom.xml @@ -0,0 +1,90 @@ + + + 4.0.0 + + + org.springframework.modulith + spring-modulith-events + 1.1.0-SNAPSHOT + + + Spring Modulith - Events - AWS SQS support + spring-modulith-events-aws-sqs + + + org.springframework.modulith.events.aws.sqs + + + + + + org.springframework.modulith + spring-modulith-api + ${project.version} + + + + org.springframework.modulith + spring-modulith-events-core + ${project.version} + + + + io.awspring.cloud + spring-cloud-aws-sqs + + + + com.fasterxml.jackson.core + jackson-databind + true + + + + + + org.springframework.modulith + spring-modulith-starter-jdbc + ${project.version} + test + + + + com.h2database + h2 + test + + + + org.springframework.boot + spring-boot-starter-json + test + + + + org.springframework.boot + spring-boot-starter-test + test + + + + org.springframework.boot + spring-boot-testcontainers + test + + + + io.awspring.cloud + spring-cloud-aws-starter-sqs + test + + + + org.testcontainers + localstack + test + + + + + diff --git a/spring-modulith-events/spring-modulith-events-aws-sqs/src/main/java/org/springframework/modulith/events/aws/sqs/SqsEventExternalizerConfiguration.java b/spring-modulith-events/spring-modulith-events-aws-sqs/src/main/java/org/springframework/modulith/events/aws/sqs/SqsEventExternalizerConfiguration.java new file mode 100644 index 00000000..87b46ca4 --- /dev/null +++ b/spring-modulith-events/spring-modulith-events-aws-sqs/src/main/java/org/springframework/modulith/events/aws/sqs/SqsEventExternalizerConfiguration.java @@ -0,0 +1,74 @@ +/* + * Copyright 2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.modulith.events.aws.sqs; + +import io.awspring.cloud.sqs.operations.SqsOperations; +import io.awspring.cloud.sqs.operations.SqsTemplate; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.springframework.beans.factory.BeanFactory; +import org.springframework.boot.autoconfigure.AutoConfiguration; +import org.springframework.boot.autoconfigure.AutoConfigureAfter; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Bean; +import org.springframework.context.expression.BeanFactoryResolver; +import org.springframework.expression.spel.support.StandardEvaluationContext; +import org.springframework.modulith.events.EventExternalizationConfiguration; +import org.springframework.modulith.events.config.EventExternalizationAutoConfiguration; +import org.springframework.modulith.events.support.BrokerRouting; +import org.springframework.modulith.events.support.DelegatingEventExternalizer; + +/** + * Auto-configuration to set up a {@link DelegatingEventExternalizer} to externalize events to SQS. + * + * @author Maciej Walkowiak + * @since 1.1 + */ +@AutoConfiguration +@AutoConfigureAfter(EventExternalizationAutoConfiguration.class) +@ConditionalOnClass(SqsTemplate.class) +@ConditionalOnProperty(name = "spring.modulith.events.externalization.enabled", + havingValue = "true", + matchIfMissing = true) +class SqsEventExternalizerConfiguration { + + private static final Logger logger = LoggerFactory.getLogger(SqsEventExternalizerConfiguration.class); + + @Bean + DelegatingEventExternalizer sqsEventExternalizer(EventExternalizationConfiguration configuration, + SqsOperations operations, BeanFactory factory) { + + logger.debug("Registering domain event externalization to SQS…"); + + var context = new StandardEvaluationContext(); + context.setBeanResolver(new BeanFactoryResolver(factory)); + + return new DelegatingEventExternalizer(configuration, (target, payload) -> { + + var routing = BrokerRouting.of(target, context); + + operations.send(sqsSendOptions -> { + var options = sqsSendOptions.queue(routing.getTarget()).payload(payload); + var key = routing.getKey(payload); + if (key != null) { + options.messageGroupId(key); + } + }); + }); + } +} diff --git a/spring-modulith-events/spring-modulith-events-aws-sqs/src/main/java/org/springframework/modulith/events/aws/sqs/package-info.java b/spring-modulith-events/spring-modulith-events-aws-sqs/src/main/java/org/springframework/modulith/events/aws/sqs/package-info.java new file mode 100644 index 00000000..f914b12a --- /dev/null +++ b/spring-modulith-events/spring-modulith-events-aws-sqs/src/main/java/org/springframework/modulith/events/aws/sqs/package-info.java @@ -0,0 +1,5 @@ +/** + * SQS event externalization support. + */ +@org.springframework.lang.NonNullApi +package org.springframework.modulith.events.aws.sqs; diff --git a/spring-modulith-events/spring-modulith-events-aws-sqs/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/spring-modulith-events/spring-modulith-events-aws-sqs/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports new file mode 100644 index 00000000..4e80bd15 --- /dev/null +++ b/spring-modulith-events/spring-modulith-events-aws-sqs/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports @@ -0,0 +1 @@ +org.springframework.modulith.events.aws.sqs.SqsEventExternalizerConfiguration diff --git a/spring-modulith-events/spring-modulith-events-aws-sqs/src/test/java/org/springframework/modulith/events/aws/sqs/SqsEventExternalizerConfigurationIntegrationTests.java b/spring-modulith-events/spring-modulith-events-aws-sqs/src/test/java/org/springframework/modulith/events/aws/sqs/SqsEventExternalizerConfigurationIntegrationTests.java new file mode 100644 index 00000000..40e19f46 --- /dev/null +++ b/spring-modulith-events/spring-modulith-events-aws-sqs/src/test/java/org/springframework/modulith/events/aws/sqs/SqsEventExternalizerConfigurationIntegrationTests.java @@ -0,0 +1,64 @@ +/* + * Copyright 2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.modulith.events.aws.sqs; + +import io.awspring.cloud.sqs.operations.SqsOperations; +import org.junit.jupiter.api.Test; + +import org.springframework.boot.autoconfigure.AutoConfigurations; +import org.springframework.boot.test.context.runner.ApplicationContextRunner; +import org.springframework.modulith.events.EventExternalizationConfiguration; +import org.springframework.modulith.events.support.DelegatingEventExternalizer; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; + +/** + * Integration tests for {@link SqsEventExternalizerConfiguration}. + * + * @author Maciej Walkowiak + * @since 1.1 + */ +class SqsEventExternalizerConfigurationIntegrationTests { + + @Test // GH-342 + void registersExternalizerByDefault() { + + basicSetup() + .run(ctxt -> { + assertThat(ctxt).hasSingleBean(DelegatingEventExternalizer.class); + }); + } + + @Test // GH-342 + void disablesExternalizationIfConfigured() { + + basicSetup() + .withPropertyValues("spring.modulith.events.externalization.enabled=false") + .run(ctxt -> { + assertThat(ctxt).doesNotHaveBean(DelegatingEventExternalizer.class); + }); + } + + private ApplicationContextRunner basicSetup() { + + return new ApplicationContextRunner() + .withConfiguration( + AutoConfigurations.of(SqsEventExternalizerConfiguration.class)) + .withBean(EventExternalizationConfiguration.class, () -> EventExternalizationConfiguration.disabled()) + .withBean(SqsOperations.class, () -> mock(SqsOperations.class)); + } +} diff --git a/spring-modulith-events/spring-modulith-events-aws-sqs/src/test/java/org/springframework/modulith/events/aws/sqs/SqsEventPublicationIntegrationTests.java b/spring-modulith-events/spring-modulith-events-aws-sqs/src/test/java/org/springframework/modulith/events/aws/sqs/SqsEventPublicationIntegrationTests.java new file mode 100644 index 00000000..468d9866 --- /dev/null +++ b/spring-modulith-events/spring-modulith-events-aws-sqs/src/test/java/org/springframework/modulith/events/aws/sqs/SqsEventPublicationIntegrationTests.java @@ -0,0 +1,148 @@ +/* + * Copyright 2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.modulith.events.aws.sqs; + +import java.util.Map; + +import lombok.RequiredArgsConstructor; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.localstack.LocalStackContainer; +import org.testcontainers.utility.DockerImageName; +import software.amazon.awssdk.services.sqs.SqsAsyncClient; +import software.amazon.awssdk.services.sqs.model.QueueAttributeName; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.context.ApplicationEventPublisher; +import org.springframework.context.annotation.Bean; +import org.springframework.modulith.events.ApplicationModuleListener; +import org.springframework.modulith.events.Externalized; +import org.springframework.test.context.DynamicPropertyRegistry; +import org.springframework.transaction.annotation.Transactional; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +/** + * Integration tests for SQS-based event publication. + * + * @author Maciej Walkowiak + */ +@SpringBootTest +class SqsEventPublicationIntegrationTests { + + @Autowired TestPublisher publisher; + @Autowired SqsAsyncClient sqsAsyncClient; + + @SpringBootApplication + static class TestConfiguration { + + @Bean + LocalStackContainer localStackContainer(DynamicPropertyRegistry registry) { + var localstack = new LocalStackContainer(DockerImageName.parse("localstack/localstack:2.3.2")); + registry.add("spring.cloud.aws.endpoint", localstack::getEndpoint); + registry.add("spring.cloud.aws.credentials.access-key", localstack::getAccessKey); + registry.add("spring.cloud.aws.credentials.secret-key", localstack::getSecretKey); + registry.add("spring.cloud.aws.region.static", localstack::getRegion); + return localstack; + } + + @Bean + TestPublisher testPublisher(ApplicationEventPublisher publisher) { + return new TestPublisher(publisher); + } + + @Bean + TestListener testListener() { + return new TestListener(); + } + } + + @Test + void publishesEventToSqs() throws Exception { + + var queueUrl = sqsAsyncClient.createQueue(request -> request.queueName("target")) + .join() + .queueUrl(); + + publisher.publishEvent(); + + await().untilAsserted(() -> { + var response = sqsAsyncClient.receiveMessage(r -> r.queueUrl(queueUrl)).join(); + + assertThat(response.hasMessages()).isTrue(); + }); + } + + @Test + void publishesEventWithGroupIdToSqs() throws Exception { + + var queueUrl = sqsAsyncClient.createQueue(request -> request.queueName("target.fifo") + .attributes(Map.of(QueueAttributeName.FIFO_QUEUE, "true"))) + .join() + .queueUrl(); + + publisher.publishEventWithKey(); + + await().untilAsserted(() -> { + var response = sqsAsyncClient.receiveMessage(r -> r.queueUrl(queueUrl)).join(); + + assertThat(response.hasMessages()).isTrue(); + }); + } + + @Externalized("target") + static class TestEvent {} + + @Externalized("target.fifo::#{getKey()}") + static class TestEventWithKey { + private final String key; + + TestEventWithKey(String key) { + this.key = key; + } + + public String getKey() { + return key; + } + } + + @RequiredArgsConstructor + static class TestPublisher { + + private final ApplicationEventPublisher events; + + @Transactional + void publishEvent() { + events.publishEvent(new TestEvent()); + } + + @Transactional + void publishEventWithKey() { + events.publishEvent(new TestEventWithKey("aKey")); + } + } + + static class TestListener { + + @ApplicationModuleListener + void on(TestEvent event) {} + + @ApplicationModuleListener + void on(TestEventWithKey event) {} + } +} diff --git a/spring-modulith-events/spring-modulith-events-aws-sqs/src/test/resources/application.properties b/spring-modulith-events/spring-modulith-events-aws-sqs/src/test/resources/application.properties new file mode 100644 index 00000000..62b9893f --- /dev/null +++ b/spring-modulith-events/spring-modulith-events-aws-sqs/src/test/resources/application.properties @@ -0,0 +1,2 @@ +spring.artemis.embedded.topics=target +spring.modulith.events.jdbc.schema-initialization.enabled=true diff --git a/spring-modulith-events/spring-modulith-events-aws-sqs/src/test/resources/logback.xml b/spring-modulith-events/spring-modulith-events-aws-sqs/src/test/resources/logback.xml new file mode 100644 index 00000000..f98102e7 --- /dev/null +++ b/spring-modulith-events/spring-modulith-events-aws-sqs/src/test/resources/logback.xml @@ -0,0 +1,16 @@ + + + + + + + + + + + + + + + + diff --git a/src/docs/antora/modules/ROOT/pages/events.adoc b/src/docs/antora/modules/ROOT/pages/events.adoc index d4dfdf01..2516c0bd 100644 --- a/src/docs/antora/modules/ROOT/pages/events.adoc +++ b/src/docs/antora/modules/ROOT/pages/events.adoc @@ -316,6 +316,18 @@ The logical routing key will be used as AMQP routing key. |`spring-modulith-events-jms` |Uses Spring's core JMS support. Does not support routing keys. + +|SQS +|`spring-modulith-events-aws-sqs` +|Uses Spring Cloud AWS SQS support. +The logical routing key will be used as SQS message group id. +When routing key is set, requires SQS queue to be configured as a FIFO queue. + +|SNS +|`spring-modulith-events-aws-sns` +|Uses Spring Cloud AWS SNS support. +The logical routing key will be used as SNS message group id. +When routing key is set, requires SNS to be configured as a FIFO topic with content based deduplication enabled. |=== [[externalization.fundamentals]]