From 954de8e2930b8ac4af50fe775bc2911b55775e64 Mon Sep 17 00:00:00 2001 From: dengliming Date: Wed, 7 Oct 2020 14:18:14 +0800 Subject: [PATCH] Add support for xClaimJustId --- .../redis/connection/RedisStreamCommands.java | 33 +++++++++++++++---- .../LettuceReactiveStreamCommands.java | 4 --- .../lettuce/LettuceStreamCommands.java | 5 +-- .../connection/lettuce/StreamConverters.java | 4 ++- .../LettuceReactiveStreamCommandsTests.java | 18 ++++++++++ 5 files changed, 49 insertions(+), 15 deletions(-) diff --git a/src/main/java/org/springframework/data/redis/connection/RedisStreamCommands.java b/src/main/java/org/springframework/data/redis/connection/RedisStreamCommands.java index 016b1f575d..fa90e203fc 100644 --- a/src/main/java/org/springframework/data/redis/connection/RedisStreamCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/RedisStreamCommands.java @@ -229,6 +229,7 @@ default List xClaim(byte[] key, String group, String newOwner, Durat /** * @author Christoph Strobl + * @author Dengliming * @since 2.3 */ class XClaimOptions { @@ -239,9 +240,10 @@ class XClaimOptions { private final @Nullable Instant unixTime; private final @Nullable Long retryCount; private final boolean force; + private final boolean justId; private XClaimOptions(List ids, Duration minIdleTime, @Nullable Duration idleTime, - @Nullable Instant unixTime, @Nullable Long retryCount, boolean force) { + @Nullable Instant unixTime, @Nullable Long retryCount, boolean force, boolean justId) { this.ids = new ArrayList<>(ids); this.minIdleTime = minIdleTime; @@ -249,6 +251,7 @@ private XClaimOptions(List ids, Duration minIdleTime, @Nullable Durati this.unixTime = unixTime; this.retryCount = retryCount; this.force = force; + this.justId = justId; } /** @@ -281,7 +284,7 @@ public static XClaimOptionsBuilder minIdleMs(long millis) { * @return {@code this}. */ public XClaimOptions idle(Duration idleTime) { - return new XClaimOptions(ids, minIdleTime, idleTime, unixTime, retryCount, force); + return new XClaimOptions(ids, minIdleTime, idleTime, unixTime, retryCount, force, justId); } /** @@ -292,7 +295,7 @@ public XClaimOptions idle(Duration idleTime) { * @return {@code this}. */ public XClaimOptions time(Instant unixTime) { - return new XClaimOptions(ids, minIdleTime, idleTime, unixTime, retryCount, force); + return new XClaimOptions(ids, minIdleTime, idleTime, unixTime, retryCount, force, justId); } /** @@ -302,7 +305,7 @@ public XClaimOptions time(Instant unixTime) { * @return new instance of {@link XClaimOptions}. */ public XClaimOptions retryCount(long retryCount) { - return new XClaimOptions(ids, minIdleTime, idleTime, unixTime, retryCount, force); + return new XClaimOptions(ids, minIdleTime, idleTime, unixTime, retryCount, force, justId); } /** @@ -312,7 +315,16 @@ public XClaimOptions retryCount(long retryCount) { * @return new instance of {@link XClaimOptions}. */ public XClaimOptions force() { - return new XClaimOptions(ids, minIdleTime, idleTime, unixTime, retryCount, true); + return new XClaimOptions(ids, minIdleTime, idleTime, unixTime, retryCount, true, justId); + } + + /** + * Set the JUSTID flag. + * + * @return new instance of {@link XClaimOptions}. + */ + public XClaimOptions justId() { + return new XClaimOptions(ids, minIdleTime, idleTime, unixTime, retryCount, force, true); } /** @@ -381,6 +393,15 @@ public boolean isForce() { return force; } + /** + * Get the JUSTID flag. + * + * @return + */ + public boolean isJustId() { + return justId; + } + public static class XClaimOptionsBuilder { private final Duration minIdleTime; @@ -404,7 +425,7 @@ public XClaimOptions ids(List ids) { .map(it -> it instanceof RecordId ? (RecordId) it : RecordId.of(it.toString())) .collect(Collectors.toList()); - return new XClaimOptions(idList, minIdleTime, null, null, null, false); + return new XClaimOptions(idList, minIdleTime, null, null, null, false, false); } /** diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommands.java b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommands.java index 77a4fb308f..69d531afbc 100644 --- a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommands.java @@ -123,10 +123,6 @@ public Flux> xAdd(Publisher>> xClaimJustId(Publisher commands) { - if (true /* TODO: set the JUSTID flag */ ) { - throw new UnsupportedOperationException("Lettuce does not support XCLAIM with JUSTID. (Ref: lettuce-io#1233)"); - } - return connection.execute(cmd -> Flux.from(commands).map(command -> { String[] ids = command.getOptions().getIdsAsStringArray(); diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceStreamCommands.java b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceStreamCommands.java index 62d07c0105..6066b0fe4f 100644 --- a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceStreamCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceStreamCommands.java @@ -49,6 +49,7 @@ * @author Mark Paluch * @author Tugdual Grall * @author Dejan Jankov + * @author Dengliming * @since 2.2 */ class LettuceStreamCommands implements RedisStreamCommands { @@ -136,10 +137,6 @@ public List xClaimJustId(byte[] key, String group, String newOwner, XC LettuceConverters.toBytes(newOwner)); XClaimArgs args = StreamConverters.toXClaimArgs(options); - if (true /* TODO: set the JUSTID flag */ ) { - throw new UnsupportedOperationException("Lettuce does not support XCLAIM with JUSTID. (Ref: lettuce-io#1233)"); - } - try { if (isPipelined()) { diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/StreamConverters.java b/src/main/java/org/springframework/data/redis/connection/lettuce/StreamConverters.java index 733f107951..6152bd64e4 100644 --- a/src/main/java/org/springframework/data/redis/connection/lettuce/StreamConverters.java +++ b/src/main/java/org/springframework/data/redis/connection/lettuce/StreamConverters.java @@ -236,7 +236,9 @@ public XClaimArgs convert(XClaimOptions source) { if (source.getUnixTime() != null) { args.time(source.getUnixTime()); } - + if (source.isJustId()) { + args.justid(); + } return args; } diff --git a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommandsTests.java b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommandsTests.java index 61773cb71d..cf419f4762 100644 --- a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommandsTests.java +++ b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommandsTests.java @@ -488,4 +488,22 @@ public void xinfoConsumersNoConsumer() { connection.streamCommands().xInfoConsumers(KEY_1_BBUFFER, "my-group").as(StepVerifier::create).verifyComplete(); } + @Test // DATAREDIS-1226 + public void xClaimJustId() { + + String initialMessage = nativeCommands.xadd(KEY_1, KEY_1, VALUE_1); + nativeCommands.xgroupCreate(XReadArgs.StreamOffset.from(KEY_1, initialMessage), "my-group"); + + String expected = nativeCommands.xadd(KEY_1, KEY_2, VALUE_2); + + connection.streamCommands() + .xReadGroup(Consumer.from("my-group", "my-consumer"), + StreamOffset.create(KEY_1_BBUFFER, ReadOffset.lastConsumed())) // + .delayElements(Duration.ofMillis(5)).next() // + .flatMapMany(record -> connection.streamCommands().xClaimJustId(KEY_1_BBUFFER, "my-group", "my-consumer", + XClaimOptions.minIdle(Duration.ofMillis(1)).ids(record.getId()).justId()) + ).as(StepVerifier::create) // + .assertNext(it -> assertThat(it.getValue()).isEqualTo(expected)) // + .verifyComplete(); + } }