Skip to content

Commit

Permalink
Adds support to lPop or rPop N elements from a Redis List in Reactive…
Browse files Browse the repository at this point in the history
…ListOperations.

Closes spring-projects#2692
  • Loading branch information
jxblum committed Sep 8, 2023
1 parent 7c54dd5 commit d96693c
Show file tree
Hide file tree
Showing 5 changed files with 165 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
*
* @author Mark Paluch
* @author Christoph Strobl
* @author John Blum
* @since 2.0
*/
class DefaultReactiveListOperations<K, V> implements ReactiveListOperations<K, V> {
Expand Down Expand Up @@ -244,13 +245,21 @@ public Mono<V> leftPop(K key, Duration timeout) {

Assert.notNull(key, "Key must not be null");
Assert.notNull(timeout, "Duration must not be null");
Assert.isTrue(isZeroOrGreater1Second(timeout), "Duration must be either zero or greater or equal to 1 second");
Assert.isTrue(isZeroOrGreaterOneSecond(timeout), "Duration must be either zero or greater or equal to 1 second");

return createMono(listCommands ->
listCommands.blPop(Collections.singletonList(rawKey(key)), timeout)
.map(popResult -> readValue(popResult.getValue())));
}

@Override
public Flux<V> leftPop(K key, long count) {

Assert.notNull(key, "Key must not be null");

return createFlux(listCommands -> listCommands.lPop(rawKey(key), count).map(this::readValue));
}

@Override
public Mono<V> rightPop(K key) {

Expand All @@ -264,13 +273,21 @@ public Mono<V> rightPop(K key, Duration timeout) {

Assert.notNull(key, "Key must not be null");
Assert.notNull(timeout, "Duration must not be null");
Assert.isTrue(isZeroOrGreater1Second(timeout), "Duration must be either zero or greater or equal to 1 second");
Assert.isTrue(isZeroOrGreaterOneSecond(timeout), "Duration must be either zero or greater or equal to 1 second");

return createMono(listCommands ->
listCommands.brPop(Collections.singletonList(rawKey(key)), timeout)
.map(popResult -> readValue(popResult.getValue())));
}

@Override
public Flux<V> rightPop(K key, long count) {

Assert.notNull(key, "Key must not be null");

return createFlux(listCommands -> listCommands.rPop(rawKey(key), count).map(this::readValue));
}

@Override
public Mono<V> rightPopAndLeftPush(K sourceKey, K destinationKey) {

Expand All @@ -287,7 +304,7 @@ public Mono<V> rightPopAndLeftPush(K sourceKey, K destinationKey, Duration timeo
Assert.notNull(sourceKey, "Source key must not be null");
Assert.notNull(destinationKey, "Destination key must not be null");
Assert.notNull(timeout, "Duration must not be null");
Assert.isTrue(isZeroOrGreater1Second(timeout), "Duration must be either zero or greater or equal to 1 second");
Assert.isTrue(isZeroOrGreaterOneSecond(timeout), "Duration must be either zero or greater or equal to 1 second");

return createMono(listCommands ->
listCommands.bRPopLPush(rawKey(sourceKey), rawKey(destinationKey), timeout).map(this::readValue));
Expand Down Expand Up @@ -315,7 +332,7 @@ private <T> Flux<T> createFlux(Function<ReactiveListCommands, Publisher<T>> func
return template.doCreateFlux(connection -> function.apply(connection.listCommands()));
}

private boolean isZeroOrGreater1Second(Duration timeout) {
private boolean isZeroOrGreaterOneSecond(Duration timeout) {
return timeout.isZero() || timeout.getNano() % TimeUnit.NANOSECONDS.convert(1, TimeUnit.SECONDS) == 0;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
*
* @author Mark Paluch
* @author Christoph Strobl
* @author John Blum
* @see <a href="https://redis.io/commands#list">Redis Documentation: List Commands</a>
* @since 2.0
*/
Expand Down Expand Up @@ -325,6 +326,16 @@ default Mono<V> move(MoveFrom<K> from, MoveTo<K> to, Duration timeout) {
*/
Mono<V> leftPop(K key, Duration timeout);

/**
* Removes {@link Long count} elements from the left-side of the Redis list stored at key.
*
* @param key {@link K Key} referring to the list stored in Redis; must not be {@literal null}.
* @param count {@link Long count} of the number of elements to remove from the left-side of the Redis list.
* @return a {@link Flux} containing the elements removed from the Redis list.
* @since 3.2
*/
Flux<V> leftPop(K key, long count);

/**
* Removes and returns last element in list stored at {@code key}.
*
Expand All @@ -347,6 +358,16 @@ default Mono<V> move(MoveFrom<K> from, MoveTo<K> to, Duration timeout) {
*/
Mono<V> rightPop(K key, Duration timeout);

/**
* Removes {@link Long count} elements from the right-side of the Redis list stored at key.
*
* @param key {@link K Key} referring to the list stored in Redis; must not be {@literal null}.
* @param count {@link Long count} of the number of elements to remove from the right-side of the Redis list.
* @return a {@link Flux} containing the elements removed from the Redis list.
* @since 3.2
*/
Flux<V> rightPop(K key, long count);

/**
* Remove the last element from list at {@code sourceKey}, append it to {@code destinationKey} and return its value.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,15 @@ suspend fun <K : Any, V : Any> ReactiveListOperations<K, V>.leftPopAndAwait(key:
suspend fun <K : Any, V : Any> ReactiveListOperations<K, V>.leftPopAndAwait(key: K, timeout: Duration): V? =
leftPop(key, timeout).awaitFirstOrNull()

/**
* Coroutines variant of [ReactiveListOperations.leftPop] with count.
*
* @author John Blum
* @since 3.2
*/
fun <K : Any, V : Any> ReactiveListOperations<K, V>.leftPopAsFlow(key: K, count :Long): Flow<V> =
leftPop(key, count).asFlow()

/**
* Coroutines variant of [ReactiveListOperations.rightPop].
*
Expand All @@ -201,6 +210,15 @@ suspend fun <K : Any, V : Any> ReactiveListOperations<K, V>.rightPopAndAwait(key
suspend fun <K : Any, V : Any> ReactiveListOperations<K, V>.rightPopAndAwait(key: K, timeout: Duration): V? =
rightPop(key, timeout).awaitFirstOrNull()

/**
* Coroutines variant of [ReactiveListOperations.rightPop] with count.
*
* @author John Blum
* @since 3.2
*/
fun <K : Any, V : Any> ReactiveListOperations<K, V>.rightPopAsFlow(key: K, count:Long): Flow<V> =
rightPop(key, count).asFlow()

/**
* Coroutines variant of [ReactiveListOperations.rightPopAndLeftPush].
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,8 @@
*/
package org.springframework.data.redis.core;

import static org.assertj.core.api.Assertions.*;
import static org.assertj.core.api.Assumptions.*;

import reactor.test.StepVerifier;
import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
import static org.assertj.core.api.Assumptions.assumeThat;

import java.time.Duration;
import java.util.Collection;
Expand All @@ -35,11 +33,14 @@
import org.springframework.data.redis.test.extension.parametrized.MethodSource;
import org.springframework.data.redis.test.extension.parametrized.ParameterizedRedisTest;

import reactor.test.StepVerifier;

/**
* Integration tests for {@link DefaultReactiveListOperations}.
*
* @author Mark Paluch
* @author Christoph Strobl
* @author John Blum
*/
@MethodSource("testParams")
@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -458,6 +459,38 @@ void leftPop() {
listOperations.leftPop(key).as(StepVerifier::create).expectNext(value2).verifyComplete();
}

@ParameterizedRedisTest // GH-2692
@SuppressWarnings("all")
void leftPopWithNullKey() {

assertThatIllegalArgumentException()
.isThrownBy(() -> this.listOperations.leftPop(null, 100L))
.withMessage("Key must not be null")
.withNoCause();
}

@ParameterizedRedisTest // GH-2692
void leftPopWithCount() {

assumeThat(this.valueFactory).isInstanceOf(ByteBufferObjectFactory.class);

K key = keyFactory.instance();
V value1 = valueFactory.instance();
V value2 = valueFactory.instance();
V value3 = valueFactory.instance();

listOperations.leftPushAll(key, value1, value2, value3)
.as(StepVerifier::create)
.expectNext(3L)
.verifyComplete();

listOperations.leftPop(key, 2)
.as(StepVerifier::create)
.expectNext(value3)
.expectNext(value2)
.verifyComplete();
}

@ParameterizedRedisTest // DATAREDIS-602
void rightPop() {

Expand All @@ -472,6 +505,38 @@ void rightPop() {
listOperations.rightPop(key).as(StepVerifier::create).expectNext(value2).verifyComplete();
}

@ParameterizedRedisTest // GH-2692
@SuppressWarnings("all")
void rightPopWithNullKey() {

assertThatIllegalArgumentException()
.isThrownBy(() -> this.listOperations.rightPop(null, 100L))
.withMessage("Key must not be null")
.withNoCause();
}

@ParameterizedRedisTest // GH-2692
void rightPopWithCount() {

assumeThat(this.valueFactory).isInstanceOf(ByteBufferObjectFactory.class);

K key = keyFactory.instance();
V value1 = valueFactory.instance();
V value2 = valueFactory.instance();
V value3 = valueFactory.instance();

listOperations.rightPushAll(key, value3, value2, value1)
.as(StepVerifier::create)
.expectNext(3L)
.verifyComplete();

listOperations.rightPop(key, 2)
.as(StepVerifier::create)
.expectNext(value1)
.expectNext(value2)
.verifyComplete();
}

@ParameterizedRedisTest // DATAREDIS-602
void leftPopWithTimeout() {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import java.time.Duration
* @author Mark Paluch
* @author Sebastien Deleuze
* @author Wonwoo Lee
* @author John Blum
*/
class ReactiveListOperationsExtensionsUnitTests {

Expand Down Expand Up @@ -290,11 +291,27 @@ class ReactiveListOperationsExtensionsUnitTests {
}
}

@Test // GH-2692
fun leftPopWithCount() {

val operations = mockk<ReactiveListOperations<String, String>>()

every { operations.leftPop(any(), any<Long>()) } returns Flux.just("foo", "bar", "baz")

runBlocking {
assertThat(operations.leftPopAsFlow("TestKey", 3L).toList()).containsExactly("foo", "bar", "baz")
}

verify {
operations.leftPop("TestKey", 3L)
}
}

@Test // DATAREDIS-937
fun blockingLeftPop() {

val operations = mockk<ReactiveListOperations<String, String>>()
every { operations.leftPop(any(), any()) } returns Mono.just("foo")
every { operations.leftPop(any(), any<Duration>()) } returns Mono.just("foo")

runBlocking {
assertThat(operations.leftPopAndAwait("foo", Duration.ofDays(1))).isEqualTo("foo")
Expand All @@ -320,11 +337,28 @@ class ReactiveListOperationsExtensionsUnitTests {
}
}

@Test // GH-2692
fun rightPopWithCount() {

val operations = mockk<ReactiveListOperations<String, String>>()

every { operations.rightPop(any(), any<Long>()) } returns Flux.just("foo", "bar", "baz")

runBlocking {
assertThat(operations.rightPopAsFlow("TestKey", 3L).toList())
.containsExactly("foo", "bar", "baz")
}

verify {
operations.rightPop("TestKey", 3L)
}
}

@Test // DATAREDIS-937
fun blockingRightPop() {

val operations = mockk<ReactiveListOperations<String, String>>()
every { operations.rightPop(any(), any()) } returns Mono.just("foo")
every { operations.rightPop(any(), any<Duration>()) } returns Mono.just("foo")

runBlocking {
assertThat(operations.rightPopAndAwait("foo", Duration.ofDays(1))).isEqualTo("foo")
Expand Down

0 comments on commit d96693c

Please sign in to comment.