Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ default List<ByteRecord> xClaim(byte[] key, String group, String newOwner, Durat

/**
* @author Christoph Strobl
* @author Dengliming
* @since 2.3
*/
class XClaimOptions {
Expand All @@ -239,16 +240,18 @@ class XClaimOptions {
private final @Nullable Instant unixTime;
private final @Nullable Long retryCount;
private final boolean force;
private final boolean justId;

private XClaimOptions(List<RecordId> ids, Duration minIdleTime, @Nullable Duration idleTime,
@Nullable Instant unixTime, @Nullable Long retryCount, boolean force) {
@Nullable Instant unixTime, @Nullable Long retryCount, boolean force, boolean justId) {

this.ids = new ArrayList<>(ids);
this.minIdleTime = minIdleTime;
this.idleTime = idleTime;
this.unixTime = unixTime;
this.retryCount = retryCount;
this.force = force;
this.justId = justId;
}

/**
Expand Down Expand Up @@ -281,7 +284,7 @@ public static XClaimOptionsBuilder minIdleMs(long millis) {
* @return {@code this}.
*/
public XClaimOptions idle(Duration idleTime) {
return new XClaimOptions(ids, minIdleTime, idleTime, unixTime, retryCount, force);
return new XClaimOptions(ids, minIdleTime, idleTime, unixTime, retryCount, force, justId);
}

/**
Expand All @@ -292,7 +295,7 @@ public XClaimOptions idle(Duration idleTime) {
* @return {@code this}.
*/
public XClaimOptions time(Instant unixTime) {
return new XClaimOptions(ids, minIdleTime, idleTime, unixTime, retryCount, force);
return new XClaimOptions(ids, minIdleTime, idleTime, unixTime, retryCount, force, justId);
}

/**
Expand All @@ -302,7 +305,7 @@ public XClaimOptions time(Instant unixTime) {
* @return new instance of {@link XClaimOptions}.
*/
public XClaimOptions retryCount(long retryCount) {
return new XClaimOptions(ids, minIdleTime, idleTime, unixTime, retryCount, force);
return new XClaimOptions(ids, minIdleTime, idleTime, unixTime, retryCount, force, justId);
}

/**
Expand All @@ -312,7 +315,16 @@ public XClaimOptions retryCount(long retryCount) {
* @return new instance of {@link XClaimOptions}.
*/
public XClaimOptions force() {
return new XClaimOptions(ids, minIdleTime, idleTime, unixTime, retryCount, true);
return new XClaimOptions(ids, minIdleTime, idleTime, unixTime, retryCount, true, justId);
}

/**
* Set the JUSTID flag.
*
* @return new instance of {@link XClaimOptions}.
*/
public XClaimOptions justId() {
return new XClaimOptions(ids, minIdleTime, idleTime, unixTime, retryCount, force, true);
}

/**
Expand Down Expand Up @@ -381,6 +393,15 @@ public boolean isForce() {
return force;
}

/**
* Get the JUSTID flag.
*
* @return
*/
public boolean isJustId() {
return justId;
}

public static class XClaimOptionsBuilder {

private final Duration minIdleTime;
Expand All @@ -404,7 +425,7 @@ public XClaimOptions ids(List<?> ids) {
.map(it -> it instanceof RecordId ? (RecordId) it : RecordId.of(it.toString()))
.collect(Collectors.toList());

return new XClaimOptions(idList, minIdleTime, null, null, null, false);
return new XClaimOptions(idList, minIdleTime, null, null, null, false, false);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,6 @@ public Flux<CommandResponse<AddStreamRecord, RecordId>> xAdd(Publisher<AddStream
@Override
public Flux<CommandResponse<XClaimCommand, Flux<RecordId>>> xClaimJustId(Publisher<XClaimCommand> commands) {

if (true /* TODO: set the JUSTID flag */ ) {
throw new UnsupportedOperationException("Lettuce does not support XCLAIM with JUSTID. (Ref: lettuce-io#1233)");
}

return connection.execute(cmd -> Flux.from(commands).map(command -> {

String[] ids = command.getOptions().getIdsAsStringArray();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
* @author Mark Paluch
* @author Tugdual Grall
* @author Dejan Jankov
* @author Dengliming
* @since 2.2
*/
class LettuceStreamCommands implements RedisStreamCommands {
Expand Down Expand Up @@ -136,10 +137,6 @@ public List<RecordId> xClaimJustId(byte[] key, String group, String newOwner, XC
LettuceConverters.toBytes(newOwner));
XClaimArgs args = StreamConverters.toXClaimArgs(options);

if (true /* TODO: set the JUSTID flag */ ) {
throw new UnsupportedOperationException("Lettuce does not support XCLAIM with JUSTID. (Ref: lettuce-io#1233)");
}

try {
if (isPipelined()) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,9 @@ public XClaimArgs convert(XClaimOptions source) {
if (source.getUnixTime() != null) {
args.time(source.getUnixTime());
}

if (source.isJustId()) {
args.justid();
}
return args;

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -488,4 +488,22 @@ public void xinfoConsumersNoConsumer() {
connection.streamCommands().xInfoConsumers(KEY_1_BBUFFER, "my-group").as(StepVerifier::create).verifyComplete();
}

@Test // DATAREDIS-1226
public void xClaimJustId() {

String initialMessage = nativeCommands.xadd(KEY_1, KEY_1, VALUE_1);
nativeCommands.xgroupCreate(XReadArgs.StreamOffset.from(KEY_1, initialMessage), "my-group");

String expected = nativeCommands.xadd(KEY_1, KEY_2, VALUE_2);

connection.streamCommands()
.xReadGroup(Consumer.from("my-group", "my-consumer"),
StreamOffset.create(KEY_1_BBUFFER, ReadOffset.lastConsumed())) //
.delayElements(Duration.ofMillis(5)).next() //
.flatMapMany(record -> connection.streamCommands().xClaimJustId(KEY_1_BBUFFER, "my-group", "my-consumer",
XClaimOptions.minIdle(Duration.ofMillis(1)).ids(record.getId()).justId())
).as(StepVerifier::create) //
.assertNext(it -> assertThat(it.getValue()).isEqualTo(expected)) //
.verifyComplete();
}
}