Skip to content

Commit 5dde5b2

Browse files
committed
[GH-3208] Feat: Allow configuring 'autoStartup' and 'phase' in StreamMessageListenerContainerOptions
Signed-off-by: Su Ko <rhtn1128@gmail.com>
1 parent f7382ff commit 5dde5b2

File tree

4 files changed

+94
-3
lines changed

4 files changed

+94
-3
lines changed

src/main/java/org/springframework/data/redis/stream/DefaultStreamMessageListenerContainer.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,14 @@ class DefaultStreamMessageListenerContainer<K, V extends Record<K, ?>> implement
9494
} else {
9595
this.streamOperations = this.template.opsForStream();
9696
}
97+
98+
if(containerOptions.isAutoStartup().isPresent()){
99+
this.autoStartup = containerOptions.isAutoStartup().get();
100+
}
101+
102+
if(containerOptions.getPhase().isPresent()){
103+
this.phase = containerOptions.getPhase().getAsInt();
104+
}
97105
}
98106

99107
private static StreamReadOptions getStreamReadOptions(StreamMessageListenerContainerOptions<?, ?> options) {

src/main/java/org/springframework/data/redis/stream/StreamMessageListenerContainer.java

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package org.springframework.data.redis.stream;
1717

1818
import java.time.Duration;
19+
import java.util.Optional;
1920
import java.util.OptionalInt;
2021
import java.util.concurrent.Executor;
2122
import java.util.function.Predicate;
@@ -107,6 +108,7 @@
107108
* @author Christoph Strobl
108109
* @author Christian Rest
109110
* @author DongCheol Kim
111+
* @author Su Ko
110112
* @param <K> Stream key and Stream field type.
111113
* @param <V> Stream value type.
112114
* @since 2.2
@@ -503,12 +505,14 @@ class StreamMessageListenerContainerOptions<K, V extends Record<K, ?>> {
503505
private final @Nullable HashMapper<Object, Object, Object> hashMapper;
504506
private final ErrorHandler errorHandler;
505507
private final Executor executor;
508+
private final @Nullable Integer phase;
509+
private final @Nullable Boolean autoStartup;
506510

507511
@SuppressWarnings({ "unchecked", "rawtypes" })
508512
private StreamMessageListenerContainerOptions(Duration pollTimeout, @Nullable Integer batchSize,
509513
RedisSerializer<K> keySerializer, RedisSerializer<Object> hashKeySerializer,
510514
RedisSerializer<Object> hashValueSerializer, @Nullable Class<?> targetType,
511-
@Nullable HashMapper<V, ?, ?> hashMapper, ErrorHandler errorHandler, Executor executor) {
515+
@Nullable HashMapper<V, ?, ?> hashMapper, ErrorHandler errorHandler, Executor executor,@Nullable Integer phase, @Nullable Boolean autoStartup) {
512516
this.pollTimeout = pollTimeout;
513517
this.batchSize = batchSize;
514518
this.keySerializer = keySerializer;
@@ -518,6 +522,8 @@ private StreamMessageListenerContainerOptions(Duration pollTimeout, @Nullable In
518522
this.hashMapper = (HashMapper) hashMapper;
519523
this.errorHandler = errorHandler;
520524
this.executor = executor;
525+
this.phase = phase;
526+
this.autoStartup = autoStartup;
521527
}
522528

523529
/**
@@ -598,6 +604,21 @@ public Executor getExecutor() {
598604
return executor;
599605
}
600606

607+
/**
608+
* @return the phase.
609+
* @since 4.0
610+
*/
611+
public OptionalInt getPhase() {
612+
return phase != null ? OptionalInt.of(phase) : OptionalInt.empty();
613+
}
614+
615+
/**
616+
* @return the autoStartup.
617+
* @since 4.0
618+
*/
619+
public Optional<Boolean> isAutoStartup() {
620+
return autoStartup != null ? Optional.of(autoStartup) : Optional.empty();
621+
}
601622
}
602623

603624
/**
@@ -618,6 +639,8 @@ class StreamMessageListenerContainerOptionsBuilder<K, V extends Record<K, ?>> {
618639
private @Nullable Class<?> targetType;
619640
private ErrorHandler errorHandler = LoggingErrorHandler.INSTANCE;
620641
private Executor executor = new SimpleAsyncTaskExecutor();
642+
private @Nullable Integer phase;
643+
private @Nullable Boolean autoStartup;
621644

622645
@SuppressWarnings("NullAway")
623646
private StreamMessageListenerContainerOptionsBuilder() {}
@@ -679,6 +702,28 @@ public StreamMessageListenerContainerOptionsBuilder<K, V> errorHandler(ErrorHand
679702
return this;
680703
}
681704

705+
/**
706+
* Configure a phase for the {@link SmartLifecycle}
707+
*
708+
* @return {@code this} {@link StreamMessageListenerContainerOptionsBuilder}.
709+
* @since 4.0
710+
*/
711+
public StreamMessageListenerContainerOptionsBuilder<K, V> phase(int phase) {
712+
this.phase = phase;
713+
return this;
714+
}
715+
716+
/**
717+
* Configure a autoStartup for the {@link SmartLifecycle}
718+
*
719+
* @return {@code this} {@link StreamMessageListenerContainerOptionsBuilder}.
720+
* @since 4.0
721+
*/
722+
public StreamMessageListenerContainerOptionsBuilder<K, V> autoStartup(boolean autoStartup) {
723+
this.autoStartup = autoStartup;
724+
return this;
725+
}
726+
682727
/**
683728
* Configure a key, hash key and hash value serializer.
684729
*
@@ -796,7 +841,7 @@ public StreamMessageListenerContainerOptions<K, V> build() {
796841
Assert.notNull(hashValueSerializer, "Hash Value Serializer must not be null");
797842

798843
return new StreamMessageListenerContainerOptions<>(pollTimeout, batchSize, keySerializer, hashKeySerializer,
799-
hashValueSerializer, targetType, hashMapper, errorHandler, executor);
844+
hashValueSerializer, targetType, hashMapper, errorHandler, executor,phase,autoStartup);
800845
}
801846

802847
}

src/test/java/org/springframework/data/redis/listener/RedisMessageListenerContainerUnitTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -252,7 +252,7 @@ void shouldApplyConfiguredPhase() {
252252
}
253253

254254
@Test // GH-3208
255-
void defaultAutoStartupShouldBeMaxValue() {
255+
void defaultAutoStartupShouldBeTrue() {
256256
assertThat(container.isAutoStartup()).isEqualTo(true);
257257
}
258258

src/test/java/org/springframework/data/redis/stream/AbstractStreamMessageListenerContainerIntegrationTests.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -384,6 +384,44 @@ void containerRestartShouldRestartSubscription() throws InterruptedException {
384384
cancelAwait(subscription);
385385
}
386386

387+
@Test // GH-3208
388+
void defaultPhaseShouldBeMaxValue() {
389+
StreamMessageListenerContainer<String, MapRecord<String, String, String>> container = StreamMessageListenerContainer
390+
.create(connectionFactory, containerOptions);
391+
392+
assertThat(container.getPhase()).isEqualTo(Integer.MAX_VALUE);
393+
}
394+
395+
@Test // GH-3208
396+
void shouldApplyConfiguredPhase() {
397+
StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options = StreamMessageListenerContainerOptions.builder()
398+
.phase(3208)
399+
.build();
400+
StreamMessageListenerContainer<String, MapRecord<String, String, String>> container = StreamMessageListenerContainer
401+
.create(connectionFactory, options);
402+
403+
assertThat(container.getPhase()).isEqualTo(3208);
404+
}
405+
406+
@Test // GH-3208
407+
void defaultAutoStartupShouldBeFalse() {
408+
StreamMessageListenerContainer<String, MapRecord<String, String, String>> container = StreamMessageListenerContainer
409+
.create(connectionFactory, containerOptions);
410+
411+
assertThat(container.isAutoStartup()).isEqualTo(false);
412+
}
413+
414+
@Test // GH-3208
415+
void shouldApplyConfiguredAutoStartup() {
416+
StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options = StreamMessageListenerContainerOptions.builder()
417+
.autoStartup(true)
418+
.build();
419+
StreamMessageListenerContainer<String, MapRecord<String, String, String>> container = StreamMessageListenerContainer
420+
.create(connectionFactory, options);
421+
422+
assertThat(container.isAutoStartup()).isEqualTo(true);
423+
}
424+
387425
private static void cancelAwait(Subscription subscription) {
388426

389427
subscription.cancel();

0 commit comments

Comments
 (0)