Skip to content

Commit

Permalink
Change hgetall return type from Mono<Map> to Flux<KeyValue> #1434
Browse files Browse the repository at this point in the history
Enable streaming of the hash key value pairs.
  • Loading branch information
mp911de committed Sep 29, 2020
1 parent 4b6b5ee commit 52ae9c0
Show file tree
Hide file tree
Showing 8 changed files with 89 additions and 12 deletions.
1 change: 1 addition & 0 deletions RELEASE-NOTES.md
Expand Up @@ -197,6 +197,7 @@ With this release, we took the opportunity to introduce a series of changes that
* Remove Spring support classes #1358
* Replace io.lettuce.core.resource.Futures utility with Netty's PromiseCombiner #1283
* XGROUP DELCONSUMER should return pending message count #1377 (xgroupDelconsumer(…) now returns `Long`)
* Change hgetall return type from Mono<Map> to Flux<KeyValue> #1434
* Script Commands: `eval`, `digest`, `scriptLoad` methods now only accept `String` and `byte[]` argument types. Previously `digest` and `scriptLoad` accepted the script contents as Codec value type which caused issues especially when marshalling values using JSON or Java Serialization. The script charset can be configured via `ClientOptions` (`ClientOptions.builder().scriptCharset(StandardCharsets.US_ASCII).build();`), defaulting to UTF-8.
* Connection: Removal of deprecated timeout methods accepting `TimeUnit`. Use methods accepting `Duration` instead.
* Async Commands: `RedisAsyncCommands.select(…)` and `.auth(…)` methods return now futures instead if being blocking methods.
Expand Down
Expand Up @@ -785,8 +785,8 @@ public Mono<V> hget(K key, K field) {
}

@Override
public Mono<Map<K, V>> hgetall(K key) {
return createMono(() -> commandBuilder.hgetall(key));
public Flux<KeyValue<K, V>> hgetall(K key) {
return createDissolvingFlux(() -> commandBuilder.hgetallKeyValue(key));
}

@Override
Expand Down
6 changes: 6 additions & 0 deletions src/main/java/io/lettuce/core/RedisCommandBuilder.java
Expand Up @@ -939,6 +939,12 @@ Command<K, V, Map<K, V>> hgetall(K key) {
return createCommand(HGETALL, new MapOutput<>(codec), key);
}

Command<K, V, List<KeyValue<K, V>>> hgetallKeyValue(K key) {
notNullKey(key);

return createCommand(HGETALL, new KeyValueListOutput<>(codec), key);
}

Command<K, V, Long> hgetall(KeyValueStreamingChannel<K, V> channel, K key) {
notNullKey(key);
notNull(channel);
Expand Down
Expand Up @@ -94,7 +94,7 @@ public interface RedisHashReactiveCommands<K, V> {
* @return Map&lt;K,V&gt; array-reply list of fields and their values stored in the hash, or an empty list when {@code key}
* does not exist.
*/
Mono<Map<K, V>> hgetall(K key);
Flux<KeyValue<K, V>> hgetall(K key);

/**
* Stream over all the fields and values in a hash.
Expand Down
36 changes: 29 additions & 7 deletions src/main/java/io/lettuce/core/output/KeyValueListOutput.java
Expand Up @@ -25,7 +25,8 @@
import io.lettuce.core.internal.LettuceAssert;

/**
* {@link List} of values output.
* {@link List} of {@link KeyValue} output. Can be either used to decode key-value tuples (e.g. {@code HGETALL}) of for a pure
* value response where keys are supplied as input (for e.g. {@code HMGET}).
*
* @param <K> Key type.
* @param <V> Value type.
Expand All @@ -39,10 +40,18 @@ public class KeyValueListOutput<K, V> extends CommandOutput<K, V, List<KeyValue<

private Subscriber<KeyValue<K, V>> subscriber;

private Iterable<K> keys;
private final Iterable<K> keys;

private Iterator<K> keyIterator;

private K key;

public KeyValueListOutput(RedisCodec<K, V> codec) {
super(codec, Collections.emptyList());
setSubscriber(ListSubscriber.instance());
this.keys = null;
}

public KeyValueListOutput(RedisCodec<K, V> codec, Iterable<K> keys) {
super(codec, Collections.emptyList());
setSubscriber(ListSubscriber.instance());
Expand All @@ -52,18 +61,31 @@ public KeyValueListOutput(RedisCodec<K, V> codec, Iterable<K> keys) {
@Override
public void set(ByteBuffer bytes) {

if (keyIterator == null) {
keyIterator = keys.iterator();
}
if (keys == null) {
if (key == null) {
key = codec.decodeKey(bytes);
return;
}

K key = this.key;
this.key = null;
subscriber.onNext(output, KeyValue.fromNullable(key, bytes == null ? null : codec.decodeValue(bytes)));

subscriber.onNext(output, KeyValue.fromNullable(keyIterator.next(), bytes == null ? null : codec.decodeValue(bytes)));
} else {
if (keyIterator == null) {
keyIterator = keys.iterator();
}

subscriber.onNext(output,
KeyValue.fromNullable(keyIterator.next(), bytes == null ? null : codec.decodeValue(bytes)));
}
}

@Override
public void multi(int count) {

if (!initialized) {
output = OutputFactory.newList(count);
output = OutputFactory.newList(keys == null ? count / 2 : count);
initialized = true;
}
}
Expand Down
Expand Up @@ -17,6 +17,9 @@

import javax.inject.Inject;

import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
import io.lettuce.core.commands.HashCommandIntegrationTests;
import io.lettuce.test.ReactiveSyncInvocationHandler;
Expand All @@ -30,4 +33,16 @@ class HashClusterReactiveCommandIntegrationTests extends HashCommandIntegrationT
HashClusterReactiveCommandIntegrationTests(StatefulRedisClusterConnection<String, String> connection) {
super(ReactiveSyncInvocationHandler.sync(connection));
}

@Test
@Disabled("API differences")
public void hgetall() {

}

@Test
@Disabled("API differences")
public void hgetallStreaming() {

}
}
Expand Up @@ -88,7 +88,7 @@ void hget() {
}

@Test
void hgetall() {
public void hgetall() {
assertThat(redis.hgetall(key).isEmpty()).isTrue();

redis.hset(key, "zero", "0");
Expand All @@ -102,7 +102,7 @@ void hgetall() {
}

@Test
void hgetallStreaming() {
public void hgetallStreaming() {

KeyValueStreamingAdapter<String, String> adapter = new KeyValueStreamingAdapter<>();

Expand Down
Expand Up @@ -15,8 +15,18 @@
*/
package io.lettuce.core.commands.reactive;

import static org.assertj.core.api.Assertions.assertThat;

import java.util.stream.Collectors;

import javax.inject.Inject;

import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

import reactor.test.StepVerifier;
import io.lettuce.core.KeyValue;
import io.lettuce.core.Value;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.commands.HashCommandIntegrationTests;
import io.lettuce.test.ReactiveSyncInvocationHandler;
Expand All @@ -26,8 +36,31 @@
*/
class HashReactiveCommandIntegrationTests extends HashCommandIntegrationTests {

private final StatefulRedisConnection<String, String> connection;

@Inject
HashReactiveCommandIntegrationTests(StatefulRedisConnection<String, String> connection) {
super(ReactiveSyncInvocationHandler.sync(connection));
this.connection = connection;
}

@Test
public void hgetall() {

connection.sync().hset(key, "zero", "0");
connection.sync().hset(key, "one", "1");
connection.sync().hset(key, "two", "2");

connection.reactive().hgetall(key).collect(Collectors.toMap(KeyValue::getKey, Value::getValue)).as(StepVerifier::create)
.assertNext(actual -> {

assertThat(actual).containsEntry("zero", "0").containsEntry("one", "1").containsEntry("two", "2");
}).verifyComplete();
}

@Test
@Disabled("API differences")
public void hgetallStreaming() {

}
}

0 comments on commit 52ae9c0

Please sign in to comment.