diff --git a/src/main/java/org/springframework/data/redis/listener/RedisMessageListenerContainer.java b/src/main/java/org/springframework/data/redis/listener/RedisMessageListenerContainer.java index 3487a61f1d..5773cf4389 100644 --- a/src/main/java/org/springframework/data/redis/listener/RedisMessageListenerContainer.java +++ b/src/main/java/org/springframework/data/redis/listener/RedisMessageListenerContainer.java @@ -103,6 +103,7 @@ * @author Mark Paluch * @author John Blum * @author Seongjun Lee + * @author Su Ko * @see MessageListener * @see SubscriptionListener */ @@ -168,6 +169,9 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab private @Nullable Subscriber subscriber; + private int phase = Integer.MAX_VALUE; + private boolean autoStartup = true; + /** * Set an ErrorHandler to be invoked in case of any uncaught exceptions thrown while processing a Message. By default, * there will be no ErrorHandler so that error-level logging is the only result. @@ -618,6 +622,40 @@ public void removeMessageListener(MessageListener listener) { removeMessageListener(listener, Collections.emptySet()); } + @Override + public int getPhase() { + return this.phase; + } + + /** + * Specify the lifecycle phase for this container. + * Lower values start earlier and stop later. + * The default is {@code Integer.MAX_VALUE}. + * + * @see SmartLifecycle#getPhase() + * @since 4.0 + */ + public void setPhase(int phase) { + this.phase = phase; + } + + @Override + public boolean isAutoStartup() { + return this.autoStartup; + } + + /** + * Configure if this Lifecycle connection factory should get started automatically by the container at the time that + * the containing ApplicationContext gets refreshed. + * The default is {@code true}. + * + * @see SmartLifecycle#isAutoStartup() + * @since 4.0 + */ + public void setAutoStartup(boolean autoStartup) { + this.autoStartup = autoStartup; + } + private void initMapping(Map> listeners) { // stop the listener if currently running diff --git a/src/main/java/org/springframework/data/redis/stream/DefaultStreamMessageListenerContainer.java b/src/main/java/org/springframework/data/redis/stream/DefaultStreamMessageListenerContainer.java index 12635762b9..9be3ae51f7 100644 --- a/src/main/java/org/springframework/data/redis/stream/DefaultStreamMessageListenerContainer.java +++ b/src/main/java/org/springframework/data/redis/stream/DefaultStreamMessageListenerContainer.java @@ -50,6 +50,7 @@ * * @author Mark Paluch * @author Christoph Strobl + * @author Su Ko * @since 2.2 */ class DefaultStreamMessageListenerContainer> implements StreamMessageListenerContainer { @@ -67,6 +68,9 @@ class DefaultStreamMessageListenerContainer> implement private boolean running = false; + private int phase = Integer.MAX_VALUE; + private boolean autoStartup = false; + /** * Create a new {@link DefaultStreamMessageListenerContainer}. * @@ -90,6 +94,14 @@ class DefaultStreamMessageListenerContainer> implement } else { this.streamOperations = this.template.opsForStream(); } + + if(containerOptions.isAutoStartup().isPresent()){ + this.autoStartup = containerOptions.isAutoStartup().get(); + } + + if(containerOptions.getPhase().isPresent()){ + this.phase = containerOptions.getPhase().getAsInt(); + } } private static StreamReadOptions getStreamReadOptions(StreamMessageListenerContainerOptions options) { @@ -123,9 +135,21 @@ private RedisTemplate createRedisTemplate(RedisConnectionFactory connectio @Override public boolean isAutoStartup() { - return false; + return this.autoStartup; } + /** + * Configure if this Lifecycle connection factory should get started automatically by the container at the time that + * the containing ApplicationContext gets refreshed. + * The default is {@code false}. + * + * @see org.springframework.context.SmartLifecycle#isAutoStartup() + * @since 4.0 + */ + public void setAutoStartup(boolean autoStartup) { + this.autoStartup = autoStartup; + } + @Override public void stop(Runnable callback) { @@ -177,9 +201,21 @@ public boolean isRunning() { @Override public int getPhase() { - return Integer.MAX_VALUE; + return this.phase; } + /** + * Specify the lifecycle phase for this container. + * Lower values start earlier and stop later. + * The default is {@code Integer.MAX_VALUE}. + * + * @see org.springframework.context.SmartLifecycle#getPhase() + * @since 4.0 + */ + public void setPhase(int phase) { + this.phase = phase; + } + @Override public Subscription register(StreamReadRequest streamRequest, StreamListener listener) { diff --git a/src/main/java/org/springframework/data/redis/stream/StreamMessageListenerContainer.java b/src/main/java/org/springframework/data/redis/stream/StreamMessageListenerContainer.java index 009d6c3f93..f30e81d07a 100644 --- a/src/main/java/org/springframework/data/redis/stream/StreamMessageListenerContainer.java +++ b/src/main/java/org/springframework/data/redis/stream/StreamMessageListenerContainer.java @@ -16,6 +16,7 @@ package org.springframework.data.redis.stream; import java.time.Duration; +import java.util.Optional; import java.util.OptionalInt; import java.util.concurrent.Executor; import java.util.function.Predicate; @@ -107,6 +108,7 @@ * @author Christoph Strobl * @author Christian Rest * @author DongCheol Kim + * @author Su Ko * @param Stream key and Stream field type. * @param Stream value type. * @since 2.2 @@ -503,12 +505,14 @@ class StreamMessageListenerContainerOptions> { private final @Nullable HashMapper hashMapper; private final ErrorHandler errorHandler; private final Executor executor; + private final @Nullable Integer phase; + private final @Nullable Boolean autoStartup; @SuppressWarnings({ "unchecked", "rawtypes" }) private StreamMessageListenerContainerOptions(Duration pollTimeout, @Nullable Integer batchSize, RedisSerializer keySerializer, RedisSerializer hashKeySerializer, RedisSerializer hashValueSerializer, @Nullable Class targetType, - @Nullable HashMapper hashMapper, ErrorHandler errorHandler, Executor executor) { + @Nullable HashMapper hashMapper, ErrorHandler errorHandler, Executor executor,@Nullable Integer phase, @Nullable Boolean autoStartup) { this.pollTimeout = pollTimeout; this.batchSize = batchSize; this.keySerializer = keySerializer; @@ -518,6 +522,8 @@ private StreamMessageListenerContainerOptions(Duration pollTimeout, @Nullable In this.hashMapper = (HashMapper) hashMapper; this.errorHandler = errorHandler; this.executor = executor; + this.phase = phase; + this.autoStartup = autoStartup; } /** @@ -598,6 +604,21 @@ public Executor getExecutor() { return executor; } + /** + * @return the phase. + * @since 4.0 + */ + public OptionalInt getPhase() { + return phase != null ? OptionalInt.of(phase) : OptionalInt.empty(); + } + + /** + * @return the autoStartup. + * @since 4.0 + */ + public Optional isAutoStartup() { + return autoStartup != null ? Optional.of(autoStartup) : Optional.empty(); + } } /** @@ -618,6 +639,8 @@ class StreamMessageListenerContainerOptionsBuilder> { private @Nullable Class targetType; private ErrorHandler errorHandler = LoggingErrorHandler.INSTANCE; private Executor executor = new SimpleAsyncTaskExecutor(); + private @Nullable Integer phase; + private @Nullable Boolean autoStartup; @SuppressWarnings("NullAway") private StreamMessageListenerContainerOptionsBuilder() {} @@ -679,6 +702,28 @@ public StreamMessageListenerContainerOptionsBuilder errorHandler(ErrorHand return this; } + /** + * Configure a phase for the {@link SmartLifecycle} + * + * @return {@code this} {@link StreamMessageListenerContainerOptionsBuilder}. + * @since 4.0 + */ + public StreamMessageListenerContainerOptionsBuilder phase(int phase) { + this.phase = phase; + return this; + } + + /** + * Configure a autoStartup for the {@link SmartLifecycle} + * + * @return {@code this} {@link StreamMessageListenerContainerOptionsBuilder}. + * @since 4.0 + */ + public StreamMessageListenerContainerOptionsBuilder autoStartup(boolean autoStartup) { + this.autoStartup = autoStartup; + return this; + } + /** * Configure a key, hash key and hash value serializer. * @@ -796,7 +841,7 @@ public StreamMessageListenerContainerOptions build() { Assert.notNull(hashValueSerializer, "Hash Value Serializer must not be null"); return new StreamMessageListenerContainerOptions<>(pollTimeout, batchSize, keySerializer, hashKeySerializer, - hashValueSerializer, targetType, hashMapper, errorHandler, executor); + hashValueSerializer, targetType, hashMapper, errorHandler, executor,phase,autoStartup); } } diff --git a/src/test/java/org/springframework/data/redis/listener/RedisMessageListenerContainerUnitTests.java b/src/test/java/org/springframework/data/redis/listener/RedisMessageListenerContainerUnitTests.java index ea7dfeb557..2e1c3056e9 100644 --- a/src/test/java/org/springframework/data/redis/listener/RedisMessageListenerContainerUnitTests.java +++ b/src/test/java/org/springframework/data/redis/listener/RedisMessageListenerContainerUnitTests.java @@ -239,4 +239,27 @@ void shouldRemoveAllListenersWhenListenerIsNull() { assertThatNoException().isThrownBy(() -> container.removeMessageListener(null, Collections.singletonList(topic))); } + + @Test // GH-3208 + void defaultPhaseShouldBeMaxValue() { + assertThat(container.getPhase()).isEqualTo(Integer.MAX_VALUE); + } + + @Test // GH-3208 + void shouldApplyConfiguredPhase() { + container.setPhase(3208); + assertThat(container.getPhase()).isEqualTo(3208); + } + + @Test // GH-3208 + void defaultAutoStartupShouldBeTrue() { + assertThat(container.isAutoStartup()).isEqualTo(true); + } + + @Test // GH-3208 + void shouldApplyConfiguredAutoStartup() { + container.setAutoStartup(false); + assertThat(container.isAutoStartup()).isEqualTo(false); + } + } diff --git a/src/test/java/org/springframework/data/redis/stream/AbstractStreamMessageListenerContainerIntegrationTests.java b/src/test/java/org/springframework/data/redis/stream/AbstractStreamMessageListenerContainerIntegrationTests.java index 26318b1448..302b0a1ca8 100644 --- a/src/test/java/org/springframework/data/redis/stream/AbstractStreamMessageListenerContainerIntegrationTests.java +++ b/src/test/java/org/springframework/data/redis/stream/AbstractStreamMessageListenerContainerIntegrationTests.java @@ -384,6 +384,44 @@ void containerRestartShouldRestartSubscription() throws InterruptedException { cancelAwait(subscription); } + @Test // GH-3208 + void defaultPhaseShouldBeMaxValue() { + StreamMessageListenerContainer> container = StreamMessageListenerContainer + .create(connectionFactory, containerOptions); + + assertThat(container.getPhase()).isEqualTo(Integer.MAX_VALUE); + } + + @Test // GH-3208 + void shouldApplyConfiguredPhase() { + StreamMessageListenerContainerOptions> options = StreamMessageListenerContainerOptions.builder() + .phase(3208) + .build(); + StreamMessageListenerContainer> container = StreamMessageListenerContainer + .create(connectionFactory, options); + + assertThat(container.getPhase()).isEqualTo(3208); + } + + @Test // GH-3208 + void defaultAutoStartupShouldBeFalse() { + StreamMessageListenerContainer> container = StreamMessageListenerContainer + .create(connectionFactory, containerOptions); + + assertThat(container.isAutoStartup()).isEqualTo(false); + } + + @Test // GH-3208 + void shouldApplyConfiguredAutoStartup() { + StreamMessageListenerContainerOptions> options = StreamMessageListenerContainerOptions.builder() + .autoStartup(true) + .build(); + StreamMessageListenerContainer> container = StreamMessageListenerContainer + .create(connectionFactory, options); + + assertThat(container.isAutoStartup()).isEqualTo(true); + } + private static void cancelAwait(Subscription subscription) { subscription.cancel();