diff --git a/pom.xml b/pom.xml index 0e3024e099..0dfebdef85 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ org.springframework.data spring-data-redis - 4.0.0-SNAPSHOT + 4.0.0-3232-SNAPSHOT Spring Data Redis Spring Data module for Redis diff --git a/src/main/antora/modules/ROOT/pages/appendix.adoc b/src/main/antora/modules/ROOT/pages/appendix.adoc index 669bf82204..8c9addf6ec 100644 --- a/src/main/antora/modules/ROOT/pages/appendix.adoc +++ b/src/main/antora/modules/ROOT/pages/appendix.adoc @@ -184,6 +184,22 @@ link:https://www.springframework.org/schema/redis/spring-redis-1.0.xsd[Spring Da |UNSUBSCRIBE |X |UNWATCH |X |WATCH |X +|XACK |X +|XACKDEL |X +|XADD |X +|XAUTOCLAIM |X +|XCLAIM |X +|XDEL |X +|XDELEX |X +|XGROUP |X +|XINFO |X +|XLEN |X +|XPENDING |X +|XRANGE |X +|XREAD |X +|XREADGROUP |X +|XREVRANGE |X +|XTRIM |X |ZADD |X |ZCARD |X |ZCOUNT |X diff --git a/src/main/java/org/springframework/data/redis/connection/DefaultStringRedisConnection.java b/src/main/java/org/springframework/data/redis/connection/DefaultStringRedisConnection.java index ad35115b73..ba8db75508 100644 --- a/src/main/java/org/springframework/data/redis/connection/DefaultStringRedisConnection.java +++ b/src/main/java/org/springframework/data/redis/connection/DefaultStringRedisConnection.java @@ -2905,6 +2905,18 @@ public Long xDel(String key, RecordId... recordIds) { return convertAndReturn(delegate.xDel(serialize(key), recordIds), Converters.identityConverter()); } + @Override + public List xDelEx(String key, XDelOptions options, RecordId... recordIds) { + return convertAndReturn(delegate.xDelEx(serialize(key), options, recordIds), + Converters.identityConverter()); + } + + @Override + public List xAckDel(String key, String group, XDelOptions options, RecordId... recordIds) { + return convertAndReturn(delegate.xAckDel(serialize(key), group, options, recordIds), + Converters.identityConverter()); + } + @Override public String xGroupCreate(String key, ReadOffset readOffset, String group) { return convertAndReturn(delegate.xGroupCreate(serialize(key), group, readOffset), Converters.identityConverter()); @@ -3021,6 +3033,11 @@ public Long xTrim(String key, long count, boolean approximateTrimming) { return convertAndReturn(delegate.xTrim(serialize(key), count, approximateTrimming), Converters.identityConverter()); } + @Override + public Long xTrim(String key, XTrimOptions options) { + return convertAndReturn(delegate.xTrim(serialize(key), options), Converters.identityConverter()); + } + @Override public Long xAck(byte[] key, String group, RecordId... recordIds) { return delegate.xAck(key, group, recordIds); @@ -3046,6 +3063,16 @@ public Long xDel(byte[] key, RecordId... recordIds) { return delegate.xDel(key, recordIds); } + @Override + public List xDelEx(byte[] key, XDelOptions options, RecordId... recordIds) { + return delegate.xDelEx(key, options, recordIds); + } + + @Override + public List xAckDel(byte[] key, String group, XDelOptions options, RecordId... recordIds) { + return delegate.xAckDel(key, group, options, recordIds); + } + @Override public String xGroupCreate(byte[] key, String groupName, ReadOffset readOffset) { return delegate.xGroupCreate(key, groupName, readOffset); @@ -3129,6 +3156,11 @@ public Long xTrim(byte[] key, long count, boolean approximateTrimming) { return delegate.xTrim(key, count, approximateTrimming); } + @Override + public Long xTrim(byte[] key, XTrimOptions options) { + return delegate.xTrim(key, options); + } + /** * Specifies if pipelined and tx results should be deserialized to Strings. If false, results of * {@link #closePipeline()} and {@link #exec()} will be of the type returned by the underlying connection diff --git a/src/main/java/org/springframework/data/redis/connection/DefaultedRedisConnection.java b/src/main/java/org/springframework/data/redis/connection/DefaultedRedisConnection.java index dd02f85661..4ba0292d2f 100644 --- a/src/main/java/org/springframework/data/redis/connection/DefaultedRedisConnection.java +++ b/src/main/java/org/springframework/data/redis/connection/DefaultedRedisConnection.java @@ -550,6 +550,20 @@ default Long xDel(byte[] key, RecordId... recordIds) { return streamCommands().xDel(key, recordIds); } + /** @deprecated in favor of {@link RedisConnection#streamCommands()}}. */ + @Override + @Deprecated + default List xDelEx(byte[] key, XDelOptions options, RecordId... recordIds) { + return streamCommands().xDelEx(key, options, recordIds); + } + + /** @deprecated in favor of {@link RedisConnection#streamCommands()}}. */ + @Override + @Deprecated + default List xAckDel(byte[] key, String group, XDelOptions options, RecordId... recordIds) { + return streamCommands().xAckDel(key, group, options, recordIds); + } + /** @deprecated in favor of {@link RedisConnection#streamCommands()}}. */ @Override @Deprecated @@ -686,12 +700,20 @@ default Long xTrim(byte[] key, long count) { return xTrim(key, count, false); } + /** @deprecated in favor of {@link RedisConnection#streamCommands()}}. */ @Override @Deprecated default Long xTrim(byte[] key, long count, boolean approximateTrimming) { return streamCommands().xTrim(key, count, approximateTrimming); } + /** @deprecated in favor of {@link RedisConnection#streamCommands()}}. */ + @Override + @Deprecated + default Long xTrim(byte[] key, XTrimOptions options) { + return streamCommands().xTrim(key, options); + } + // LIST COMMANDS /** @deprecated in favor of {@link RedisConnection#listCommands()}}. */ diff --git a/src/main/java/org/springframework/data/redis/connection/ReactiveStreamCommands.java b/src/main/java/org/springframework/data/redis/connection/ReactiveStreamCommands.java index 9796288f50..dbd3bf81a4 100644 --- a/src/main/java/org/springframework/data/redis/connection/ReactiveStreamCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/ReactiveStreamCommands.java @@ -35,7 +35,10 @@ import org.springframework.data.redis.connection.ReactiveRedisConnection.NumericResponse; import org.springframework.data.redis.connection.RedisStreamCommands.XAddOptions; import org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions; +import org.springframework.data.redis.connection.RedisStreamCommands.XDelOptions; import org.springframework.data.redis.connection.RedisStreamCommands.XPendingOptions; +import org.springframework.data.redis.connection.RedisStreamCommands.XTrimOptions; +import org.springframework.data.redis.connection.RedisStreamCommands.StreamEntryDeletionResult; import org.springframework.data.redis.connection.stream.ByteBufferRecord; import org.springframework.data.redis.connection.stream.Consumer; import org.springframework.data.redis.connection.stream.PendingMessage; @@ -200,20 +203,13 @@ default Mono xAck(ByteBuffer key, String group, RecordId... recordIds) { class AddStreamRecord extends KeyCommand { private final ByteBufferRecord record; - private final boolean nomkstream; - private final @Nullable Long maxlen; - private final boolean approximateTrimming; - private final @Nullable RecordId minId; + private final XAddOptions options; - private AddStreamRecord(ByteBufferRecord record, @Nullable Long maxlen, boolean nomkstream, - boolean approximateTrimming, @Nullable RecordId minId) { + private AddStreamRecord(ByteBufferRecord record, XAddOptions options) { super(record.getStream()); this.record = record; - this.maxlen = maxlen; - this.nomkstream = nomkstream; - this.approximateTrimming = approximateTrimming; - this.minId = minId; + this.options = options; } /** @@ -226,7 +222,7 @@ public static AddStreamRecord of(ByteBufferRecord record) { Assert.notNull(record, "Record must not be null"); - return new AddStreamRecord(record, null, false, false, null); + return new AddStreamRecord(record, XAddOptions.none()); } /** @@ -239,7 +235,7 @@ public static AddStreamRecord body(Map body) { Assert.notNull(body, "Body must not be null"); - return new AddStreamRecord(StreamRecords.rawBuffer(body), null, false, false, null); + return new AddStreamRecord(StreamRecords.rawBuffer(body), XAddOptions.none()); } /** @@ -249,7 +245,7 @@ public static AddStreamRecord body(Map body) { * @return a new {@link ReactiveGeoCommands.GeoAddCommand} with {@literal key} applied. */ public AddStreamRecord to(ByteBuffer key) { - return new AddStreamRecord(record.withStreamKey(key), maxlen, nomkstream, approximateTrimming, minId); + return new AddStreamRecord(record.withStreamKey(key), options); } /** @@ -259,7 +255,7 @@ public AddStreamRecord to(ByteBuffer key) { * @since 2.6 */ public AddStreamRecord makeNoStream() { - return new AddStreamRecord(record, maxlen, true, approximateTrimming, minId); + return new AddStreamRecord(record, XAddOptions.makeNoStream()); } /** @@ -270,7 +266,7 @@ public AddStreamRecord makeNoStream() { * @since 2.6 */ public AddStreamRecord makeNoStream(boolean makeNoStream) { - return new AddStreamRecord(record, maxlen, makeNoStream, approximateTrimming, minId); + return new AddStreamRecord(record, XAddOptions.makeNoStream(makeNoStream)); } /** @@ -279,7 +275,7 @@ public AddStreamRecord makeNoStream(boolean makeNoStream) { * @return new instance of {@link AddStreamRecord}. */ public AddStreamRecord maxlen(long maxlen) { - return new AddStreamRecord(record, maxlen, nomkstream, approximateTrimming, minId); + return new AddStreamRecord(record, XAddOptions.maxlen(maxlen)); } /** @@ -290,7 +286,7 @@ public AddStreamRecord maxlen(long maxlen) { * @since 2.7 */ public AddStreamRecord minId(RecordId minId) { - return new AddStreamRecord(record, maxlen, nomkstream, approximateTrimming, minId); + return new AddStreamRecord(record, options.minId(minId)); } /** @@ -299,7 +295,23 @@ public AddStreamRecord minId(RecordId minId) { * @return new instance of {@link AddStreamRecord}. */ public AddStreamRecord approximateTrimming(boolean approximateTrimming) { - return new AddStreamRecord(record, maxlen, nomkstream, approximateTrimming, minId); + return new AddStreamRecord(record, options.approximateTrimming(approximateTrimming)); + } + + /** + * Apply the given {@link XAddOptions} to configure the {@literal XADD} command. + *

+ * This method allows setting all XADD options at once, including trimming strategies + * ({@literal MAXLEN}, {@literal MINID}), stream creation behavior ({@literal NOMKSTREAM}), + * and other parameters. Constructs a new command instance with all previously configured + * properties except the options, which are replaced by the provided {@link XAddOptions}. + * + * @param options the {@link XAddOptions} to apply. Must not be {@literal null}. + * @return a new {@link AddStreamRecord} with the specified options applied. + * @since 4.0 + */ + public AddStreamRecord withOptions(XAddOptions options) { + return new AddStreamRecord(record, options); } /** @@ -318,7 +330,7 @@ public ByteBufferRecord getRecord() { * @since 2.6 */ public boolean isNoMkStream() { - return nomkstream; + return options.isNoMkStream(); } /** @@ -328,23 +340,21 @@ public boolean isNoMkStream() { * @since 2.3 */ public @Nullable Long getMaxlen() { - return maxlen; + return options.getMaxlen(); } /** * @return {@literal true} if {@literal MAXLEN} is set. * @since 2.3 */ - public boolean hasMaxlen() { - return maxlen != null; - } + public boolean hasMaxlen() { return options.hasMaxlen(); } /** * @return {@literal true} if {@literal approximateTrimming} is set. * @since 2.7 */ public boolean isApproximateTrimming() { - return approximateTrimming; + return options.isApproximateTrimming(); } /** @@ -352,7 +362,7 @@ public boolean isApproximateTrimming() { * @since 2.7 */ public @Nullable RecordId getMinId() { - return minId; + return options.getMinId(); } /** @@ -360,7 +370,15 @@ public boolean isApproximateTrimming() { * @since 2.7 */ public boolean hasMinId() { - return minId != null; + return options.hasMinId(); + } + + /** + * @return the XAddOptions options. + * @since 4.0 + */ + public XAddOptions getOptions() { + return options; } } @@ -409,18 +427,8 @@ default Mono xAdd(ByteBufferRecord record, XAddOptions xAddOptions) { Assert.notNull(record, "Record must not be null"); Assert.notNull(xAddOptions, "XAddOptions must not be null"); - AddStreamRecord addStreamRecord = AddStreamRecord.of(record) - .approximateTrimming(xAddOptions.isApproximateTrimming()).makeNoStream(xAddOptions.isNoMkStream()); - - if (xAddOptions.hasMaxlen()) { - addStreamRecord = addStreamRecord.maxlen(xAddOptions.getMaxlen()); - } - - if (xAddOptions.hasMinId()) { - addStreamRecord = addStreamRecord.minId(xAddOptions.getMinId()); - } - - return xAdd(Mono.just(addStreamRecord)).next().map(CommandResponse::getOutput); + return xAdd(Mono.just(AddStreamRecord.of(record).withOptions(xAddOptions))).next() + .map(CommandResponse::getOutput); } /** @@ -602,6 +610,194 @@ public List getRecordIds() { } } + /** + * {@code XDELEX} command parameters. + * + * @author Viktoriya Kutsarova + * @since 4.0 + * @see Redis Documentation: XDELEX + */ + class DeleteExCommand extends KeyCommand { + + private final List recordIds; + private final XDelOptions options; + + private DeleteExCommand(@Nullable ByteBuffer key, List recordIds, XDelOptions options) { + + super(key); + this.recordIds = recordIds; + this.options = options; + } + + /** + * Creates a new {@link DeleteExCommand} given a {@link ByteBuffer key}. + * + * @param key must not be {@literal null}. + * @return a new {@link DeleteExCommand} for {@link ByteBuffer key}. + */ + public static DeleteExCommand stream(ByteBuffer key) { + + Assert.notNull(key, "Key must not be null"); + + return new DeleteExCommand(key, Collections.emptyList(), XDelOptions.defaultOptions()); + } + + /** + * Applies the {@literal recordIds}. Constructs a new command instance with all previously configured properties. + * + * @param recordIds must not be {@literal null}. + * @return a new {@link DeleteExCommand} with {@literal recordIds} applied. + */ + public DeleteExCommand records(String... recordIds) { + + Assert.notNull(recordIds, "RecordIds must not be null"); + + return records(Arrays.stream(recordIds).map(RecordId::of).toArray(RecordId[]::new)); + } + + /** + * Applies the {@literal recordIds}. Constructs a new command instance with all previously configured properties. + * + * @param recordIds must not be {@literal null}. + * @return a new {@link DeleteExCommand} with {@literal recordIds} applied. + */ + public DeleteExCommand records(RecordId... recordIds) { + + Assert.notNull(recordIds, "RecordIds must not be null"); + + List newRecordIds = new ArrayList<>(getRecordIds().size() + recordIds.length); + newRecordIds.addAll(getRecordIds()); + newRecordIds.addAll(Arrays.asList(recordIds)); + + return new DeleteExCommand(getKey(), newRecordIds, options); + } + + /** + * Applies the {@link XDelOptions}. Constructs a new command instance with all previously configured properties. + * + * @param options must not be {@literal null}. + * @return a new {@link DeleteExCommand} with {@link XDelOptions} applied. + */ + public DeleteExCommand withOptions(XDelOptions options) { + + Assert.notNull(options, "XDelOptions must not be null"); + + return new DeleteExCommand(getKey(), recordIds, options); + } + + public List getRecordIds() { + return recordIds; + } + + public XDelOptions getOptions() { + return options; + } + } + + /** + * {@code XACKDEL} command parameters. + * + * @author Viktoriya Kutsarova + * @since 4.0 + * @see Redis Documentation: XACKDEL + */ + class AcknowledgeDeleteCommand extends KeyCommand { + + private final @Nullable String group; + private final List recordIds; + private final XDelOptions options; + + private AcknowledgeDeleteCommand(@Nullable ByteBuffer key, @Nullable String group, List recordIds, + XDelOptions options) { + + super(key); + this.group = group; + this.recordIds = recordIds; + this.options = options; + } + + /** + * Creates a new {@link AcknowledgeDeleteCommand} given a {@link ByteBuffer key}. + * + * @param key must not be {@literal null}. + * @return a new {@link AcknowledgeDeleteCommand} for {@link ByteBuffer key}. + */ + public static AcknowledgeDeleteCommand stream(ByteBuffer key) { + + Assert.notNull(key, "Key must not be null"); + + return new AcknowledgeDeleteCommand(key, null, Collections.emptyList(), XDelOptions.defaultOptions()); + } + + /** + * Applies the {@literal group}. Constructs a new command instance with all previously configured properties. + * + * @param group must not be {@literal null}. + * @return a new {@link AcknowledgeDeleteCommand} with {@literal group} applied. + */ + public AcknowledgeDeleteCommand group(String group) { + + Assert.notNull(group, "Group must not be null"); + + return new AcknowledgeDeleteCommand(getKey(), group, recordIds, options); + } + + /** + * Applies the {@literal recordIds}. Constructs a new command instance with all previously configured properties. + * + * @param recordIds must not be {@literal null}. + * @return a new {@link AcknowledgeDeleteCommand} with {@literal recordIds} applied. + */ + public AcknowledgeDeleteCommand records(String... recordIds) { + + Assert.notNull(recordIds, "RecordIds must not be null"); + + return records(Arrays.stream(recordIds).map(RecordId::of).toArray(RecordId[]::new)); + } + + /** + * Applies the {@literal recordIds}. Constructs a new command instance with all previously configured properties. + * + * @param recordIds must not be {@literal null}. + * @return a new {@link AcknowledgeDeleteCommand} with {@literal recordIds} applied. + */ + public AcknowledgeDeleteCommand records(RecordId... recordIds) { + + Assert.notNull(recordIds, "RecordIds must not be null"); + + List newRecordIds = new ArrayList<>(getRecordIds().size() + recordIds.length); + newRecordIds.addAll(getRecordIds()); + newRecordIds.addAll(Arrays.asList(recordIds)); + + return new AcknowledgeDeleteCommand(getKey(), group, newRecordIds, options); + } + + /** + * Applies the {@link XDelOptions}. Constructs a new command instance with all previously configured properties. + * + * @param options must not be {@literal null}. + * @return a new {@link AcknowledgeDeleteCommand} with {@link XDelOptions} applied. + */ + public AcknowledgeDeleteCommand withOptions(XDelOptions options) { + + Assert.notNull(options, "XDelOptions must not be null"); + + return new AcknowledgeDeleteCommand(getKey(), group, recordIds, options); + } + + public @Nullable String getGroup() { + return group; + } + + public List getRecordIds() { + return recordIds; + } + + public XDelOptions getOptions() { + return options; + } + } + /** * Removes the specified entries from the stream. Returns the number of items deleted, that may be different from the * number of IDs passed in case certain IDs do not exist. @@ -646,6 +842,128 @@ default Mono xDel(ByteBuffer key, RecordId... recordIds) { */ Flux> xDel(Publisher commands); + /** + * Deletes one or multiple entries from the stream at the specified key with extended options. + *

+ * XDELEX is an extension of the Redis Streams XDEL command that provides more control over how message entries + * are deleted concerning consumer groups. + * + * @param key the stream key. + * @param options the {@link XDelOptions} specifying deletion policy. + * @param recordIds stream record Id's. + * @return {@link Flux} emitting {@link StreamEntryDeletionResult} for each ID. + * @see Redis Documentation: XDELEX + * @since 4.0 + */ + default Flux xDelEx(ByteBuffer key, + XDelOptions options, String... recordIds) { + + Assert.notNull(key, "Key must not be null"); + Assert.notNull(options, "XDelOptions must not be null"); + Assert.notNull(recordIds, "RecordIds must not be null"); + + return xDelEx(Mono.just(DeleteExCommand.stream(key).withOptions(options).records(recordIds))) + .flatMap(response -> Flux.fromIterable(response.getOutput())); + } + + /** + * Deletes one or multiple entries from the stream at the specified key with extended options. + *

+ * XDELEX is an extension of the Redis Streams XDEL command that provides more control over how message entries + * are deleted concerning consumer groups. + * + * @param key the stream key. + * @param options the {@link XDelOptions} specifying deletion policy. + * @param recordIds stream record Id's. + * @return {@link Flux} emitting {@link StreamEntryDeletionResult} for each ID. + * @see Redis Documentation: XDELEX + * @since 4.0 + */ + default Flux xDelEx(ByteBuffer key, + XDelOptions options, RecordId... recordIds) { + + Assert.notNull(key, "Key must not be null"); + Assert.notNull(options, "XDelOptions must not be null"); + Assert.notNull(recordIds, "RecordIds must not be null"); + + return xDelEx(Mono.just(DeleteExCommand.stream(key).withOptions(options).records(recordIds))) + .flatMap(response -> Flux.fromIterable(response.getOutput())); + } + + /** + * Deletes one or multiple entries from the stream with extended options. + * + * @param commands must not be {@literal null}. + * @return {@link Flux} emitting a list of {@link StreamEntryDeletionResult} per {@link DeleteExCommand}. + * @see Redis Documentation: XDELEX + * @since 4.0 + */ + Flux>> xDelEx( + Publisher commands); + + /** + * Acknowledges and conditionally deletes one or multiple entries for a stream consumer group at the specified key. + *

+ * XACKDEL combines the functionality of XACK and XDEL in Redis Streams. It acknowledges the specified entry IDs in the + * given consumer group and simultaneously attempts to delete the corresponding entries from the stream. + * + * @param key the stream key. + * @param group name of the consumer group. + * @param options the {@link XDelOptions} specifying deletion policy. + * @param recordIds stream record Id's. + * @return {@link Flux} emitting {@link StreamEntryDeletionResult} for each ID. + * @see Redis Documentation: XACKDEL + * @since 4.0 + */ + default Flux xAckDel(ByteBuffer key, String group, + XDelOptions options, String... recordIds) { + + Assert.notNull(key, "Key must not be null"); + Assert.notNull(group, "Group must not be null"); + Assert.notNull(options, "XDelOptions must not be null"); + Assert.notNull(recordIds, "RecordIds must not be null"); + + return xAckDel(Mono.just(AcknowledgeDeleteCommand.stream(key).group(group).withOptions(options).records(recordIds))) + .flatMap(response -> Flux.fromIterable(response.getOutput())); + } + + /** + * Acknowledges and conditionally deletes one or multiple entries for a stream consumer group at the specified key. + *

+ * XACKDEL combines the functionality of XACK and XDEL in Redis Streams. It acknowledges the specified entry IDs in the + * given consumer group and simultaneously attempts to delete the corresponding entries from the stream. + * + * @param key the stream key. + * @param group name of the consumer group. + * @param options the {@link XDelOptions} specifying deletion policy. + * @param recordIds stream record Id's. + * @return {@link Flux} emitting {@link StreamEntryDeletionResult} for each ID. + * @see Redis Documentation: XACKDEL + * @since 4.0 + */ + default Flux xAckDel(ByteBuffer key, String group, + XDelOptions options, RecordId... recordIds) { + + Assert.notNull(key, "Key must not be null"); + Assert.notNull(group, "Group must not be null"); + Assert.notNull(options, "XDelOptions must not be null"); + Assert.notNull(recordIds, "RecordIds must not be null"); + + return xAckDel(Mono.just(AcknowledgeDeleteCommand.stream(key).group(group).withOptions(options).records(recordIds))) + .flatMap(response -> Flux.fromIterable(response.getOutput())); + } + + /** + * Acknowledges and conditionally deletes one or multiple entries for a stream consumer group. + * + * @param commands must not be {@literal null}. + * @return {@link Flux} emitting a list of {@link StreamEntryDeletionResult} per {@link AcknowledgeDeleteCommand}. + * @see Redis Documentation: XACKDEL + * @since 4.0 + */ + Flux>> xAckDel( + Publisher commands); + /** * Get the size of the stream stored at {@literal key}. * @@ -1565,13 +1883,11 @@ default Flux xRevRange(ByteBuffer key, Range range, Li */ class TrimCommand extends KeyCommand { - private @Nullable Long count; - private boolean approximateTrimming; + private final XTrimOptions options; - private TrimCommand(@Nullable ByteBuffer key, @Nullable Long count, boolean approximateTrimming) { + private TrimCommand(@Nullable ByteBuffer key, XTrimOptions options) { super(key); - this.count = count; - this.approximateTrimming = approximateTrimming; + this.options = options; } /** @@ -1584,18 +1900,18 @@ public static TrimCommand stream(ByteBuffer key) { Assert.notNull(key, "Key must not be null"); - return new TrimCommand(key, null, false); + return new TrimCommand(key, XTrimOptions.none()); } /** - * Applies the numeric {@literal count}. Constructs a new command instance with all previously configured + * Applies the numeric {@literal limit}. Constructs a new command instance with all previously configured * properties. * - * @param count - * @return a new {@link TrimCommand} with {@literal count} applied. + * @param limit + * @return a new {@link TrimCommand} with {@literal limit} applied. */ - public TrimCommand to(long count) { - return new TrimCommand(getKey(), count, approximateTrimming); + public TrimCommand to(long limit) { + return new TrimCommand(getKey(), options.limit(limit)); } /** @@ -1616,18 +1932,39 @@ public TrimCommand approximate() { * @since 2.4 */ public TrimCommand approximate(boolean approximateTrimming) { - return new TrimCommand(getKey(), count, approximateTrimming); + return new TrimCommand(getKey(), options.approximateTrimming(approximateTrimming)); + } + + /** + * Apply the given {@link XTrimOptions} to configure the {@literal XTRIM} command. + *

+ * This method allows setting all XTRIM options at once, including trimming strategies + * ({@literal MAXLEN}, {@literal MINID}), stream creation behavior ({@literal NOMKSTREAM}), + * and other parameters. Constructs a new command instance with all previously configured + * properties except the options, which are replaced by the provided {@link XTrimOptions}. + * + * @param options the {@link XTrimOptions} to apply. Must not be {@literal null}. + * @return a new {@link TrimCommand} with the specified options applied. + * @since 4.0 + */ + public TrimCommand withOptions(XTrimOptions options) { + return new TrimCommand(getKey(), options); } /** * @return can be {@literal null}. */ public @Nullable Long getCount() { - return count; + return options.getLimit(); } + public boolean isApproximateTrimming() { - return approximateTrimming; + return options.isApproximateTrimming(); + } + + public XTrimOptions getOptions() { + return options; } } @@ -1661,6 +1998,14 @@ default Mono xTrim(ByteBuffer key, long count, boolean approximateTrimming .map(NumericResponse::getOutput); } + default Mono xTrim(ByteBuffer key, XTrimOptions options) { + + Assert.notNull(key, "Key must not be null"); + + return xTrim(Mono.just(TrimCommand.stream(key).withOptions(options))).next() + .map(NumericResponse::getOutput); + } + /** * Trims the stream to {@code count} elements. * diff --git a/src/main/java/org/springframework/data/redis/connection/RedisStreamCommands.java b/src/main/java/org/springframework/data/redis/connection/RedisStreamCommands.java index 2204513eda..7a9ee81fd0 100644 --- a/src/main/java/org/springframework/data/redis/connection/RedisStreamCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/RedisStreamCommands.java @@ -46,6 +46,7 @@ * @author Dengliming * @author Mark John Moreno * @author Jeonggyu Choi + * @author Viktoriya Kutsarova * @since 2.2 * @see RedisCommands * @see Redis Documentation - Streams @@ -112,134 +113,210 @@ default RecordId xAdd(@NonNull MapRecord record) { */ RecordId xAdd(MapRecord record, @NonNull XAddOptions options); - /** - * Additional options applicable for {@literal XADD} command. - * - * @author Christoph Strobl - * @author Mark John Moreno - * @author Liming Deng - * @since 2.3 - */ - @NullMarked - class XAddOptions { - private static final XAddOptions NONE = new XAddOptions(null, false, false, null); + interface TrimStrategy { + T threshold(); + } - private final @Nullable Long maxlen; - private final boolean nomkstream; - private final boolean approximateTrimming; - private final @Nullable RecordId minId; + record MaxLenTrimStrategy(Long threshold) implements TrimStrategy { + } - private XAddOptions(@Nullable Long maxlen, boolean nomkstream, boolean approximateTrimming, - @Nullable RecordId minId) { - this.maxlen = maxlen; - this.nomkstream = nomkstream; - this.approximateTrimming = approximateTrimming; - this.minId = minId; + record MinIdTrimStrategy(RecordId threshold) implements TrimStrategy { + } + + enum TrimOperator { + EXACT, + APPROXIMATE + } + + @NullMarked + class TrimOptions { + + private final TrimStrategy trimStrategy; + private final TrimOperator trimOperator; + private final @Nullable Long limit; + private final @Nullable StreamDeletionPolicy deletionPolicy; + + private TrimOptions(TrimStrategy trimStrategy, TrimOperator trimOperator, @Nullable Long limit, @Nullable StreamDeletionPolicy deletionPolicy) { + this.trimStrategy = trimStrategy; + this.trimOperator = trimOperator; + this.limit = limit; + this.deletionPolicy = deletionPolicy; } - /** - * @return - */ - public static XAddOptions none() { - return NONE; + public static TrimOptions maxLen(Long maxLen) { + return new TrimOptions(new MaxLenTrimStrategy(maxLen), TrimOperator.EXACT, null, null); } - /** - * Disable creation of stream if it does not already exist. - * - * @return new instance of {@link XAddOptions}. - * @since 2.6 - */ - public static XAddOptions makeNoStream() { - return new XAddOptions(null, true, false, null); + public static TrimOptions minId(RecordId minId) { + return new TrimOptions(new MinIdTrimStrategy(minId), TrimOperator.EXACT, null, null); } /** - * Disable creation of stream if it does not already exist. + * Apply specified trim operator. + *

+ * This is a member method that preserves all other options. * - * @param makeNoStream {@code true} to not create a stream if it does not already exist. - * @return new instance of {@link XAddOptions}. - * @since 2.6 + * @param trimOperator the operator to use when trimming + * @return new instance of {@link XTrimOptions}. */ - public static XAddOptions makeNoStream(boolean makeNoStream) { - return new XAddOptions(null, makeNoStream, false, null); + public TrimOptions withTrimOperator(TrimOperator trimOperator) { + return new TrimOptions(trimStrategy, trimOperator, limit, deletionPolicy); } /** - * Limit the size of the stream to the given maximum number of elements. + * Limit the maximum number of entries considered when trimming. + *

+ * This is a member method that preserves all other options. * - * @return new instance of {@link XAddOptions}. + * @param limit the maximum number of entries to examine for trimming. + * @return new instance of {@link XTrimOptions}. */ - public static XAddOptions maxlen(long maxlen) { - return new XAddOptions(maxlen, false, false, null); + public TrimOptions withLimit(long limit) { + return new TrimOptions(trimStrategy, trimOperator, limit, deletionPolicy); } /** - * Apply {@code MINID} trimming strategy, that evicts entries with IDs lower than the one specified. + * Set the deletion policy for trimming. + *

+ * This is a member method that preserves all other options. * - * @param minId the minimum record Id to retain. - * @return new instance of {@link XAddOptions}. - * @since 2.7 + * @param deletionPolicy the deletion policy to apply. + * @return new instance of {@link XTrimOptions}. */ - public XAddOptions minId(RecordId minId) { - return new XAddOptions(maxlen, nomkstream, approximateTrimming, minId); + public TrimOptions withDeletionPolicy(StreamDeletionPolicy deletionPolicy) { + return new TrimOptions(trimStrategy, trimOperator, limit, deletionPolicy); + } + + public TrimStrategy getTrimStrategy() { + return trimStrategy; } /** - * Apply efficient trimming for capped streams using the {@code ~} flag. - * - * @return new instance of {@link XAddOptions}. + * @return strategy to use when trimming entries */ - public XAddOptions approximateTrimming(boolean approximateTrimming) { - return new XAddOptions(maxlen, nomkstream, approximateTrimming, minId); + public TrimOperator getTrimOperator() { + return trimOperator; } /** - * @return {@literal true} if {@literal NOMKSTREAM} is set. - * @since 2.6 + * @return the limit to retain during trimming. + * @since 4.0 */ - public boolean isNoMkStream() { - return nomkstream; + public @Nullable Long getLimit() { + return limit; } /** - * Limit the size of the stream to the given maximum number of elements. - * - * @return can be {@literal null}. + * @return {@literal true} if {@literal LIMIT} is set. + * @since 4.0 */ - public @Nullable Long getMaxlen() { - return maxlen; + public boolean hasLimit() { + return limit != null; } /** - * @return {@literal true} if {@literal MAXLEN} is set. + * @return the deletion policy. + * @since 4.0 */ - public boolean hasMaxlen() { - return maxlen != null; + public @Nullable StreamDeletionPolicy getDeletionPolicy() { + return deletionPolicy; } /** - * @return {@literal true} if {@literal approximateTrimming} is set. + * @return {@literal true} if {@literal DELETION_POLICY} is set. + * @since 4.0 */ - public boolean isApproximateTrimming() { - return approximateTrimming; + public boolean hasDeletionPolicy() { + return deletionPolicy != null; + } + + @Override + public boolean equals(@Nullable Object o) { + if (this == o) { + return true; + } + if (!(o instanceof TrimOptions that)) { + return false; + } + if (this.trimStrategy.equals(that.trimStrategy)) { + return false; + } + if (this.trimOperator.equals(that.trimOperator)) { + return false; + } + return ObjectUtils.nullSafeEquals(deletionPolicy, that.deletionPolicy); + } + + @Override + public int hashCode() { + int result = trimStrategy.hashCode(); + result = 31 * result + trimOperator.hashCode(); + result = 31 * result + ObjectUtils.nullSafeHashCode(limit); + result = 31 * result + ObjectUtils.nullSafeHashCode(deletionPolicy); + return result; + } + } + + @NullMarked + record XTrimOptions(TrimOptions trimOptions) { + + public static XTrimOptions of(TrimOptions trimOptions) { + return new XTrimOptions(trimOptions); + } + } + + /** + * Additional options applicable for {@literal XADD} command. + * + * @author Christoph Strobl + * @author Mark John Moreno + * @author Liming Deng + * @since 2.3 + */ + @NullMarked + class XAddOptions { + + private final boolean nomkstream; + private final @Nullable TrimOptions trimOptions; + + private XAddOptions(boolean nomkstream, @Nullable TrimOptions trimOptions) { + this.nomkstream = nomkstream; + this.trimOptions = trimOptions; } /** - * @return the minimum record Id to retain during trimming. - * @since 2.7 + * Create default add options. + * + * @return new instance of {@link XAddOptions} with defaults values + * @since 2.6 */ - public @Nullable RecordId getMinId() { - return minId; + public static XAddOptions none() { + return new XAddOptions(false, null); + } + + public XAddOptions withNoMkStream(boolean nomkstream) { + return new XAddOptions(nomkstream, trimOptions); + } + + public XAddOptions withTrimOptions(@Nullable TrimOptions trimOptions) { + return new XAddOptions(nomkstream, trimOptions); } /** - * @return {@literal true} if {@literal MINID} is set. - * @since 2.7 + * @return {@literal true} if {@literal NOMKSTREAM} is set. + * @since 2.6 */ - public boolean hasMinId() { - return minId != null; + public boolean isNoMkStream() { + return nomkstream; + } + + public boolean hasTrimOptions() { + return trimOptions != null; + } + + public @Nullable TrimOptions getTrimOptions() { + return trimOptions; } @Override @@ -250,28 +327,96 @@ public boolean equals(@Nullable Object o) { if (!(o instanceof XAddOptions that)) { return false; } - if (nomkstream != that.nomkstream) { - return false; - } - if (approximateTrimming != that.approximateTrimming) { + if (!(ObjectUtils.nullSafeEquals(this.trimOptions, that.trimOptions))) { return false; } - if (!ObjectUtils.nullSafeEquals(maxlen, that.maxlen)) { - return false; - } - return ObjectUtils.nullSafeEquals(minId, that.minId); + return nomkstream == that.nomkstream; } @Override public int hashCode() { - int result = ObjectUtils.nullSafeHashCode(maxlen); + int result = ObjectUtils.nullSafeHashCode(this.trimOptions); result = 31 * result + (nomkstream ? 1 : 0); - result = 31 * result + (approximateTrimming ? 1 : 0); - result = 31 * result + ObjectUtils.nullSafeHashCode(minId); return result; } } + /** + * Deletion policy for stream entries. + * + * @author Viktoriya Kutsarova + * @since 4.0 + */ + enum StreamDeletionPolicy { + /** + * Remove entries according to the specified strategy, but preserve existing references. + */ + KEEP_REFERENCES, + /** + * Remove entries according to the specified strategy and remove references. + */ + DELETE_REFERENCES, + /** + * Remove entries that are read and acknowledged and remove references. + */ + ACKNOWLEDGED + } + + /** + * Result of a stream entry deletion operation for {@literal XDELEX} and {@literal XACKDEL} commands. + * + * @author Viktoriya Kutsarova + * @since 4.0 + */ + enum StreamEntryDeletionResult { + + UNKNOWN(-2L), + /** + * The entry ID does not exist in the stream. + */ + NOT_FOUND(-1L), + /** + * The entry was successfully deleted from the stream. + */ + DELETED(1L), + /** + * The entry was acknowledged but not deleted (when using ACKED deletion policy with dangling references). + */ + NOT_DELETED_UNACKNOWLEDGED_OR_STILL_REFERENCED(2L); + + private final long code; + + StreamEntryDeletionResult(long code) { + this.code = code; + } + + /** + * Get the numeric code for this deletion result. + * + * @return the numeric code: -1 for NOT_FOUND, 1 for DELETED, 2 for NOT_DELETED_UNACKNOWLEDGED_OR_STILL_REFERENCED + */ + public long getCode() { + return code; + } + + /** + * Convert a numeric code to a {@link StreamEntryDeletionResult}. + * + * @param code the numeric code + * @return the corresponding {@link StreamEntryDeletionResult} + * @throws IllegalArgumentException if the code is not valid + */ + public static StreamEntryDeletionResult fromCode(long code) { + return switch ((int) code) { + case -2 -> UNKNOWN; + case -1 -> NOT_FOUND; + case 1 -> DELETED; + case 2 -> NOT_DELETED_UNACKNOWLEDGED_OR_STILL_REFERENCED; + default -> throw new IllegalArgumentException("Invalid deletion result code: " + code); + }; + } + } + /** * Change the ownership of a pending message to the given new {@literal consumer} without increasing the delivered * count. @@ -543,6 +688,143 @@ default Long xDel(byte @NonNull [] key, @NonNull String @NonNull... recordIds) { */ Long xDel(byte @NonNull [] key, @NonNull RecordId @NonNull... recordIds); + /** + * Additional options applicable for {@literal XDELEX} and {@literal XACKDEL} commands. + * + * @author Viktoriya Kutsarova + * @since 4.0 + */ + class XDelOptions { + + private static final XDelOptions DEFAULT = new XDelOptions(StreamDeletionPolicy.KEEP_REFERENCES); + + private final @NonNull StreamDeletionPolicy deletionPolicy; + + private XDelOptions(@NonNull StreamDeletionPolicy deletionPolicy) { + this.deletionPolicy = deletionPolicy; + } + + /** + * Create an {@link XDelOptions} instance with default options. + *

+ * This returns the default options for the {@literal XDELEX} and {@literal XACKDEL} commands + * with {@link StreamDeletionPolicy#KEEP_REFERENCES} as the deletion policy, which preserves + * existing references in consumer groups' PELs (similar to the behavior of {@literal XDEL}). + * + * @return a default {@link XDelOptions} instance with {@link StreamDeletionPolicy#KEEP_REFERENCES}. + */ + public static XDelOptions defaults() { + return DEFAULT; + } + + /** + * Set the deletion policy for the delete operation. + * + * @param deletionPolicy the deletion policy to apply. + * @return new instance of {@link XDelOptions}. + */ + public static XDelOptions deletionPolicy(StreamDeletionPolicy deletionPolicy) { + return new XDelOptions(deletionPolicy); + } + + /** + * @return the deletion policy. + */ + @NonNull + public StreamDeletionPolicy getDeletionPolicy() { + return deletionPolicy; + } + + @Override + public boolean equals(@Nullable Object o) { + if (this == o) { + return true; + } + if (!(o instanceof XDelOptions that)) { + return false; + } + return deletionPolicy.equals(that.deletionPolicy); + } + + @Override + public int hashCode() { + return deletionPolicy.hashCode(); + } + } + + /** + * Deletes one or multiple entries from the stream at the specified key. + *

+ * XDELEX is an extension of the Redis Streams XDEL command that provides more control over how message entries + * are deleted concerning consumer groups. + * + * @param key the {@literal key} the stream is stored at. + * @param options the {@link XDelOptions} specifying deletion policy. Use {@link XDelOptions#defaults()} ()} for default behavior. + * @param recordIds the id's of the records to remove. + * @return list of {@link StreamEntryDeletionResult} for each ID: {@link StreamEntryDeletionResult#NOT_FOUND} if no such ID exists, + * {@link StreamEntryDeletionResult#DELETED} if the entry was deleted, {@link StreamEntryDeletionResult#NOT_DELETED_UNACKNOWLEDGED_OR_STILL_REFERENCED} + * if the entry was not deleted but there are still dangling references (ACKED deletion policy). + * Returns {@literal null} when used in pipeline / transaction. + * @see Redis Documentation: XDELEX + */ + default List xDelEx(byte @NonNull [] key, XDelOptions options, @NonNull String @NonNull... recordIds) { + return xDelEx(key, options, Arrays.stream(recordIds).map(RecordId::of).toArray(RecordId[]::new)); + } + + /** + * Deletes one or multiple entries from the stream at the specified key. + *

+ * XDELEX is an extension of the Redis Streams XDEL command that provides more control over how message entries + * are deleted concerning consumer groups. + * + * @param key the {@literal key} the stream is stored at. + * @param options the {@link XDelOptions} specifying deletion policy. Use {@link XDelOptions#defaults()} ()} for default behavior. + * @param recordIds the id's of the records to remove. + * @return list of {@link StreamEntryDeletionResult} for each ID: {@link StreamEntryDeletionResult#NOT_FOUND} if no such ID exists, + * {@link StreamEntryDeletionResult#DELETED} if the entry was deleted, {@link StreamEntryDeletionResult#NOT_DELETED_UNACKNOWLEDGED_OR_STILL_REFERENCED} + * if the entry was not deleted but there are still dangling references (ACKED deletion policy). + * Returns {@literal null} when used in pipeline / transaction. + * @see Redis Documentation: XDELEX + */ + List xDelEx(byte @NonNull [] key, XDelOptions options, @NonNull RecordId @NonNull... recordIds); + + /** + * Acknowledges and conditionally deletes one or multiple entries (messages) for a stream consumer group at the specified key. + *

+ * XACKDEL combines the functionality of XACK and XDEL in Redis Streams. It acknowledges the specified entry IDs in the + * given consumer group and simultaneously attempts to delete the corresponding entries from the stream. + * + * @param key the {@literal key} the stream is stored at. + * @param group name of the consumer group. + * @param options the {@link XDelOptions} specifying deletion policy. Use {@link XDelOptions#defaults()} ()} for default behavior. + * @param recordIds the id's of the records to acknowledge and remove. + * @return list of {@link StreamEntryDeletionResult} for each ID: {@link StreamEntryDeletionResult#DELETED} if the entry was acknowledged and deleted, + * {@link StreamEntryDeletionResult#NOT_FOUND} if no such ID exists, {@link StreamEntryDeletionResult#NOT_DELETED_UNACKNOWLEDGED_OR_STILL_REFERENCED} + * if the entry was acknowledged but not deleted (when using ACKED deletion policy). + * Returns {@literal null} when used in pipeline / transaction. + * @see Redis Documentation: XACKDEL + */ + default List xAckDel(byte @NonNull [] key, @NonNull String group, XDelOptions options, @NonNull String @NonNull... recordIds) { + return xAckDel(key, group, options, Arrays.stream(recordIds).map(RecordId::of).toArray(RecordId[]::new)); + } + + /** + * Acknowledges and conditionally deletes one or multiple entries (messages) for a stream consumer group at the specified key. + *

+ * XACKDEL combines the functionality of XACK and XDEL in Redis Streams. It acknowledges the specified entry IDs in the + * given consumer group and simultaneously attempts to delete the corresponding entries from the stream. + * + * @param key the {@literal key} the stream is stored at. + * @param group name of the consumer group. + * @param options the {@link XDelOptions} specifying deletion policy. Use {@link XDelOptions#defaults()} ()} for default behavior. + * @param recordIds the id's of the records to acknowledge and remove. + * @return list of {@link StreamEntryDeletionResult} for each ID: {@link StreamEntryDeletionResult#DELETED} if the entry was acknowledged and deleted, + * {@link StreamEntryDeletionResult#NOT_FOUND} if no such ID exists, {@link StreamEntryDeletionResult#NOT_DELETED_UNACKNOWLEDGED_OR_STILL_REFERENCED} + * if the entry was acknowledged but not deleted (when using ACKED deletion policy). + * Returns {@literal null} when used in pipeline / transaction. + * @see Redis Documentation: XACKDEL + */ + List xAckDel(byte @NonNull [] key, @NonNull String group, XDelOptions options, @NonNull RecordId @NonNull... recordIds); /** * Create a consumer group. * @@ -1061,4 +1343,14 @@ public boolean hasMinIdleTime() { * @see Redis Documentation: XTRIM */ Long xTrim(byte @NonNull [] key, long count, boolean approximateTrimming); + + /** + * Trims the stream to {@code count} elements. + * + * @param key the stream key. + * @param options the trimming options. + * @return number of removed entries. {@literal null} when used in pipeline / transaction. + * @see Redis Documentation: XTRIM + */ + Long xTrim(byte @NonNull [] key, @NonNull XTrimOptions options); } diff --git a/src/main/java/org/springframework/data/redis/connection/StringRedisConnection.java b/src/main/java/org/springframework/data/redis/connection/StringRedisConnection.java index 8b4a3c4302..6b40aa2703 100644 --- a/src/main/java/org/springframework/data/redis/connection/StringRedisConnection.java +++ b/src/main/java/org/springframework/data/redis/connection/StringRedisConnection.java @@ -3095,6 +3095,53 @@ default Long xDel(@NonNull String key, @NonNull String @NonNull... entryIds) { Long xDel(@NonNull String key, @NonNull RecordId @NonNull... recordIds); + /** + * Deletes one or multiple entries from the stream at the specified key. + *

+ * XDELEX is an extension of the Redis Streams XDEL command that provides more control over how message entries + * are deleted concerning consumer groups. + * + * @param key the {@literal key} the stream is stored at. + * @param options the {@link XDelOptions} specifying deletion policy. Use {@link XDelOptions#defaultOptions()} for default behavior. + * @param recordIds the id's of the records to remove. + * @return list of {@link StreamEntryDeletionResult} for each ID: {@link StreamEntryDeletionResult#NOT_FOUND} if no such ID exists, + * {@link StreamEntryDeletionResult#DELETED} if the entry was deleted, + * {@link StreamEntryDeletionResult#NOT_DELETED_UNACKNOWLEDGED_OR_STILL_REFERENCED} + * if the entry was not deleted but there are still dangling references (ACKED deletion policy). + * Returns {@literal null} when used in pipeline / transaction. + * @see Redis Documentation: XDELEX + */ + default List xDelEx(@NonNull String key, XDelOptions options, @NonNull String @NonNull... recordIds) { + return xDelEx(key, options, entryIds(recordIds)); + } + + List xDelEx(@NonNull String key, XDelOptions options, @NonNull RecordId @NonNull... recordIds); + + /** + * Acknowledges and conditionally deletes one or multiple entries (messages) for a stream consumer group at the specified key. + *

+ * XACKDEL combines the functionality of XACK and XDEL in Redis Streams. It acknowledges the specified entry IDs in the + * given consumer group and simultaneously attempts to delete the corresponding entries from the stream. + * + * @param key the {@literal key} the stream is stored at. + * @param group name of the consumer group. + * @param options the {@link XDelOptions} specifying deletion policy. Use {@link XDelOptions#defaultOptions()} for default behavior. + * @param recordIds the id's of the records to acknowledge and remove. + * @return list of {@link StreamEntryDeletionResult} for each ID: {@link StreamEntryDeletionResult#DELETED} if + * the entry was acknowledged and deleted, {@link StreamEntryDeletionResult#NOT_FOUND} if no such ID exists, + * {@link StreamEntryDeletionResult#NOT_DELETED_UNACKNOWLEDGED_OR_STILL_REFERENCED} if the entry was acknowledged + * but not deleted (when using ACKED deletion policy). + * Returns {@literal null} when used in pipeline / transaction. + * @see Redis Documentation: XACKDEL + */ + default List xAckDel(@NonNull String key, @NonNull String group, XDelOptions options, + @NonNull String @NonNull... recordIds) { + return xAckDel(key, group, options, entryIds(recordIds)); + } + + List xAckDel(@NonNull String key, @NonNull String group, XDelOptions options, + @NonNull RecordId @NonNull... recordIds); + /** * Create a consumer group. * @@ -3522,4 +3569,14 @@ List xRevRange(@NonNull String key, org.springframework.data.domai * @see Redis Documentation: XTRIM */ Long xTrim(@NonNull String key, long count, boolean approximateTrimming); + + /** + * Trims the stream to {@code count} elements. + * + * @param key the stream key. + * @param options the trimming options. + * @return number of removed entries. {@literal null} when used in pipeline / transaction. + * @see Redis Documentation: XTRIM + */ + Long xTrim(@NonNull String key, @NonNull XTrimOptions options); } diff --git a/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterStreamCommands.java b/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterStreamCommands.java index 5e832e9af1..00b3dcd8c1 100644 --- a/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterStreamCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterStreamCommands.java @@ -33,6 +33,7 @@ import org.springframework.data.domain.Range; import org.springframework.data.redis.connection.Limit; import org.springframework.data.redis.connection.RedisStreamCommands; +import org.springframework.data.redis.connection.RedisStreamCommands.StreamEntryDeletionResult; import org.springframework.data.redis.connection.stream.ByteRecord; import org.springframework.data.redis.connection.stream.Consumer; import org.springframework.data.redis.connection.stream.MapRecord; @@ -44,6 +45,7 @@ import org.springframework.data.redis.connection.stream.StreamOffset; import org.springframework.data.redis.connection.stream.StreamReadOptions; import org.springframework.util.Assert; +import redis.clients.jedis.params.XTrimParams; /** * @author Dengliming @@ -144,6 +146,37 @@ public Long xDel(byte[] key, RecordId... recordIds) { } } + @Override + public List xDelEx(byte[] key, XDelOptions options, RecordId... recordIds) { + + Assert.notNull(key, "Key must not be null"); + Assert.notNull(recordIds, "recordIds must not be null"); + + try { + return StreamConverters.toStreamEntryDeletionResults(connection.getCluster().xdelex(key, + StreamConverters.toStreamDeletionPolicy(options), + entryIdsToBytes(Arrays.asList(recordIds)))); + } catch (Exception ex) { + throw convertJedisAccessException(ex); + } + } + + @Override + public List xAckDel(byte[] key, String group, XDelOptions options, RecordId... recordIds) { + + Assert.notNull(key, "Key must not be null"); + Assert.notNull(group, "Group must not be null"); + Assert.notNull(recordIds, "recordIds must not be null"); + + try { + return StreamConverters.toStreamEntryDeletionResults(connection.getCluster().xackdel(key, JedisConverters.toBytes(group), + StreamConverters.toStreamDeletionPolicy(options), + entryIdsToBytes(Arrays.asList(recordIds)))); + } catch (Exception ex) { + throw convertJedisAccessException(ex); + } + } + @Override public String xGroupCreate(byte[] key, String groupName, ReadOffset readOffset) { return xGroupCreate(key, groupName, readOffset, false); @@ -380,6 +413,20 @@ public Long xTrim(byte[] key, long count, boolean approximateTrimming) { } } + @Override + public Long xTrim(byte[] key, XTrimOptions options) { + + Assert.notNull(key, "Key must not be null"); + + XTrimParams xTrimParams = StreamConverters.toXTrimParams(options); + + try { + return connection.getCluster().xtrim(key, xTrimParams); + } catch (Exception ex) { + throw convertJedisAccessException(ex); + } + } + private DataAccessException convertJedisAccessException(Exception ex) { return connection.convertJedisAccessException(ex); } diff --git a/src/main/java/org/springframework/data/redis/connection/jedis/JedisStreamCommands.java b/src/main/java/org/springframework/data/redis/connection/jedis/JedisStreamCommands.java index d6d6570743..f619068b6a 100644 --- a/src/main/java/org/springframework/data/redis/connection/jedis/JedisStreamCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/jedis/JedisStreamCommands.java @@ -24,6 +24,7 @@ import redis.clients.jedis.params.XPendingParams; import redis.clients.jedis.params.XReadGroupParams; import redis.clients.jedis.params.XReadParams; +import redis.clients.jedis.params.XTrimParams; import redis.clients.jedis.resps.StreamConsumerInfo; import redis.clients.jedis.resps.StreamGroupInfo; @@ -132,6 +133,33 @@ public Long xDel(byte @NonNull [] key, @NonNull RecordId @NonNull... recordIds) StreamConverters.entryIdsToBytes(Arrays.asList(recordIds))); } + @Override + public List xDelEx(byte @NonNull [] key, @NonNull XDelOptions options, + @NonNull RecordId @NonNull... recordIds) { + + Assert.notNull(key, "Key must not be null"); + Assert.notNull(options, "Options must not be null"); + Assert.notNull(recordIds, "recordIds must not be null"); + + return connection.invoke().from(Jedis::xdelex, ResponseCommands::xdelex, key, + StreamConverters.toStreamDeletionPolicy(options), StreamConverters.entryIdsToBytes(Arrays.asList(recordIds))) + .get(StreamConverters::toStreamEntryDeletionResults); + } + + @Override + public List xAckDel(byte @NonNull [] key, @NonNull String group, @NonNull XDelOptions options, + @NonNull RecordId @NonNull... recordIds) { + + Assert.notNull(key, "Key must not be null"); + Assert.notNull(group, "Group must not be null"); + Assert.notNull(options, "Options must not be null"); + Assert.notNull(recordIds, "recordIds must not be null"); + + return connection.invoke().from(Jedis::xackdel, ResponseCommands::xackdel, key, JedisConverters.toBytes(group), + StreamConverters.toStreamDeletionPolicy(options), StreamConverters.entryIdsToBytes(Arrays.asList(recordIds))) + .get(StreamConverters::toStreamEntryDeletionResults); + } + @Override public String xGroupCreate(byte @NonNull [] key, @NonNull String groupName, @NonNull ReadOffset readOffset) { return xGroupCreate(key, groupName, readOffset, false); @@ -319,4 +347,15 @@ public Long xTrim(byte @NonNull [] key, long count, boolean approximateTrimming) return connection.invoke().just(Jedis::xtrim, PipelineBinaryCommands::xtrim, key, count, approximateTrimming); } + @Override + public Long xTrim(byte @NonNull [] key, @NonNull XTrimOptions options) { + + Assert.notNull(key, "Key must not be null"); + Assert.notNull(options, "XTrimOptions must not be null"); + + XTrimParams xTrimParams = StreamConverters.toXTrimParams(options); + + return connection.invoke().just(Jedis::xtrim, PipelineBinaryCommands::xtrim, key, xTrimParams); + } + } diff --git a/src/main/java/org/springframework/data/redis/connection/jedis/StreamConverters.java b/src/main/java/org/springframework/data/redis/connection/jedis/StreamConverters.java index 5a534864d4..ff5b31ecbb 100644 --- a/src/main/java/org/springframework/data/redis/connection/jedis/StreamConverters.java +++ b/src/main/java/org/springframework/data/redis/connection/jedis/StreamConverters.java @@ -15,13 +15,16 @@ */ package org.springframework.data.redis.connection.jedis; +import org.springframework.data.redis.connection.RedisStreamCommands; import redis.clients.jedis.BuilderFactory; import redis.clients.jedis.StreamEntryID; +import redis.clients.jedis.args.StreamDeletionPolicy; import redis.clients.jedis.params.XAddParams; import redis.clients.jedis.params.XClaimParams; import redis.clients.jedis.params.XPendingParams; import redis.clients.jedis.params.XReadGroupParams; import redis.clients.jedis.params.XReadParams; +import redis.clients.jedis.params.XTrimParams; import redis.clients.jedis.resps.StreamEntry; import redis.clients.jedis.resps.StreamPendingEntry; @@ -37,7 +40,12 @@ import org.jspecify.annotations.Nullable; import org.springframework.data.domain.Range; -import org.springframework.data.redis.connection.RedisStreamCommands; +import org.springframework.data.redis.connection.RedisStreamCommands.XAddOptions; +import org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions; +import org.springframework.data.redis.connection.RedisStreamCommands.XPendingOptions; +import org.springframework.data.redis.connection.RedisStreamCommands.XTrimOptions; +import org.springframework.data.redis.connection.RedisStreamCommands.XDelOptions; +import org.springframework.data.redis.connection.RedisStreamCommands.StreamEntryDeletionResult; import org.springframework.data.redis.connection.stream.ByteRecord; import org.springframework.data.redis.connection.stream.Consumer; import org.springframework.data.redis.connection.stream.PendingMessage; @@ -57,6 +65,7 @@ * @author dengliming * @author Mark Paluch * @author Jeonggyu Choi + * @author Viktoriya Kutsarova * @since 2.3 */ class StreamConverters { @@ -207,7 +216,7 @@ static org.springframework.data.redis.connection.stream.PendingMessages toPendin } @SuppressWarnings("NullAway") - public static XAddParams toXAddParams(RecordId recordId, RedisStreamCommands.XAddOptions options) { + public static XAddParams toXAddParams(RecordId recordId, XAddOptions options) { XAddParams params = new XAddParams(); params.id(toStreamEntryId(recordId.getValue())); @@ -228,6 +237,49 @@ public static XAddParams toXAddParams(RecordId recordId, RedisStreamCommands.XAd params.approximateTrimming(); } + if (options.isExactTrimming()) { + params.exactTrimming(); + } + + if (options.hasLimit()) { + params.limit(options.getLimit()); + } + + if (options.hasDeletionPolicy()) { + params.trimmingMode(toStreamDeletionPolicy(options.getDeletionPolicy())); + } + + return params; + } + + public static XTrimParams toXTrimParams(XTrimOptions options) { + + XTrimParams params = new XTrimParams(); + + if (options.hasMaxlen()) { + params.maxLen(options.getMaxlen()); + } + + if (options.hasMinId()) { + params.minId(options.getMinId().getValue()); + } + + if (options.isApproximateTrimming()) { + params.approximateTrimming(); + } + + if (options.isExactTrimming()) { + params.exactTrimming(); + } + + if (options.hasLimit()) { + params.limit(options.getLimit()); + } + + if (options.hasDeletionPolicy()) { + params.trimmingMode(toStreamDeletionPolicy(options.getDeletionPolicy())); + } + return params; } @@ -248,7 +300,16 @@ private static StreamEntryID toStreamEntryId(String value) { return new StreamEntryID(value); } - public static XClaimParams toXClaimParams(RedisStreamCommands.XClaimOptions options) { + private static StreamDeletionPolicy toStreamDeletionPolicy(RedisStreamCommands.StreamDeletionPolicy deletionPolicy) { + + return switch (deletionPolicy) { + case KEEP_REFERENCES -> StreamDeletionPolicy.KEEP_REFERENCES; + case DELETE_REFERENCES -> StreamDeletionPolicy.DELETE_REFERENCES; + case ACKNOWLEDGED -> StreamDeletionPolicy.ACKNOWLEDGED; + }; + } + + public static XClaimParams toXClaimParams(XClaimOptions options) { XClaimParams params = XClaimParams.xClaimParams(); @@ -305,7 +366,7 @@ public static XReadGroupParams toXReadGroupParams(StreamReadOptions readOptions) } @SuppressWarnings("NullAway") - public static XPendingParams toXPendingParams(RedisStreamCommands.XPendingOptions options) { + public static XPendingParams toXPendingParams(XPendingOptions options) { Range range = (Range) options.getRange(); XPendingParams xPendingParams = XPendingParams.xPendingParams(StreamConverters.getLowerValue(range), @@ -321,4 +382,39 @@ public static XPendingParams toXPendingParams(RedisStreamCommands.XPendingOption return xPendingParams; } + public static StreamDeletionPolicy toStreamDeletionPolicy(XDelOptions options) { + return toStreamDeletionPolicy(options.getDeletionPolicy()); + } + + /** + * Convert Jedis {@link redis.clients.jedis.resps.StreamEntryDeletionResult} to Spring Data Redis + * {@link RedisStreamCommands.StreamEntryDeletionResult}. + * + * @param result the Jedis deletion result enum + * @return the corresponding Spring Data Redis enum + * @since 4.0 + */ + public static RedisStreamCommands.StreamEntryDeletionResult toStreamEntryDeletionResult( + redis.clients.jedis.resps.StreamEntryDeletionResult result) { + return switch (result) { + case NOT_FOUND -> RedisStreamCommands.StreamEntryDeletionResult.NOT_FOUND; + case DELETED -> RedisStreamCommands.StreamEntryDeletionResult.DELETED; + case NOT_DELETED_UNACKNOWLEDGED_OR_STILL_REFERENCED -> + RedisStreamCommands.StreamEntryDeletionResult.NOT_DELETED_UNACKNOWLEDGED_OR_STILL_REFERENCED; + }; + } + + /** + * Convert a list of Jedis {@link redis.clients.jedis.resps.StreamEntryDeletionResult} to a {@link List} of Spring Data Redis + * {@link RedisStreamCommands.StreamEntryDeletionResult}. + * + * @param results the list of Jedis deletion result enums + * @return the list of Spring Data Redis deletion result enums + * @since 4.0 + */ + public static List toStreamEntryDeletionResults( + List results) { + return results.stream().map(StreamConverters::toStreamEntryDeletionResult).collect(Collectors.toList()); + } + } diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnection.java b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnection.java index 1103b5400d..f1b714e0af 100644 --- a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnection.java +++ b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnection.java @@ -1147,6 +1147,8 @@ static class TypeHints { COMMAND_OUTPUT_TYPE_MAPPING.put(SUNIONSTORE, IntegerOutput.class); COMMAND_OUTPUT_TYPE_MAPPING.put(STRLEN, IntegerOutput.class); COMMAND_OUTPUT_TYPE_MAPPING.put(TTL, IntegerOutput.class); + COMMAND_OUTPUT_TYPE_MAPPING.put(XACK, IntegerOutput.class); + COMMAND_OUTPUT_TYPE_MAPPING.put(XDEL, IntegerOutput.class); COMMAND_OUTPUT_TYPE_MAPPING.put(XLEN, IntegerOutput.class); COMMAND_OUTPUT_TYPE_MAPPING.put(XTRIM, IntegerOutput.class); COMMAND_OUTPUT_TYPE_MAPPING.put(ZADD, IntegerOutput.class); @@ -1232,6 +1234,7 @@ static class TypeHints { COMMAND_OUTPUT_TYPE_MAPPING.put(TYPE, StatusOutput.class); COMMAND_OUTPUT_TYPE_MAPPING.put(WATCH, StatusOutput.class); COMMAND_OUTPUT_TYPE_MAPPING.put(UNWATCH, StatusOutput.class); + COMMAND_OUTPUT_TYPE_MAPPING.put(XGROUP, StatusOutput.class); // VALUE LIST COMMAND_OUTPUT_TYPE_MAPPING.put(HMGET, ValueListOutput.class); @@ -1277,6 +1280,10 @@ static class TypeHints { COMMAND_OUTPUT_TYPE_MAPPING.put(SINTER, ValueSetOutput.class); COMMAND_OUTPUT_TYPE_MAPPING.put(SMEMBERS, ValueSetOutput.class); COMMAND_OUTPUT_TYPE_MAPPING.put(SUNION, ValueSetOutput.class); + + // ENUM SET + COMMAND_OUTPUT_TYPE_MAPPING.put(XACKDEL, EnumSetOutput.class); + COMMAND_OUTPUT_TYPE_MAPPING.put(XDELEX, EnumSetOutput.class); } /** diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommands.java b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommands.java index 6291583d3f..264b235e9f 100644 --- a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommands.java @@ -36,7 +36,10 @@ import org.springframework.data.redis.connection.ReactiveRedisConnection.KeyCommand; import org.springframework.data.redis.connection.ReactiveRedisConnection.NumericResponse; import org.springframework.data.redis.connection.ReactiveStreamCommands; +import org.springframework.data.redis.connection.ReactiveStreamCommands.AcknowledgeDeleteCommand; +import org.springframework.data.redis.connection.ReactiveStreamCommands.DeleteExCommand; import org.springframework.data.redis.connection.ReactiveStreamCommands.GroupCommand.GroupCommandAction; +import org.springframework.data.redis.connection.RedisStreamCommands.StreamEntryDeletionResult; import org.springframework.data.redis.connection.stream.ByteBufferRecord; import org.springframework.data.redis.connection.stream.Consumer; import org.springframework.data.redis.connection.stream.PendingMessages; @@ -99,18 +102,7 @@ public Flux> xAdd(Publisher new CommandResponse<>(command, RecordId.of(value))); @@ -160,6 +152,43 @@ public Flux> xDel(Publisher })); } + @Override + public Flux>> xDelEx(Publisher commands) { + + return connection.execute(cmd -> Flux.from(commands).concatMap(command -> { + + Assert.notNull(command.getKey(), "Key must not be null"); + Assert.notNull(command.getRecordIds(), "recordIds must not be null"); + + return cmd.xdelex(command.getKey(), + StreamConverters.toXDelArgs(command.getOptions()), + entryIdsToString(command.getRecordIds())) + .map(StreamConverters::toStreamEntryDeletionResult) + .collectList() + .map(results -> new CommandResponse<>(command, results)); + })); + } + + @Override + public Flux>> xAckDel( + Publisher commands) { + + return connection.execute(cmd -> Flux.from(commands).concatMap(command -> { + + Assert.notNull(command.getKey(), "Key must not be null"); + Assert.notNull(command.getGroup(), "Group must not be null"); + Assert.notNull(command.getRecordIds(), "recordIds must not be null"); + + return cmd.xackdel(command.getKey(), + ByteUtils.getByteBuffer(command.getGroup()), + StreamConverters.toXDelArgs(command.getOptions()), + entryIdsToString(command.getRecordIds())) + .map(StreamConverters::toStreamEntryDeletionResult) + .collectList() + .map(results -> new CommandResponse<>(command, results)); + })); + } + @Override @SuppressWarnings({ "unchecked", "rawtypes" }) public Flux> xGroup(Publisher commands) { @@ -370,9 +399,9 @@ public Flux> xTrim(Publisher comm return connection.execute(cmd -> Flux.from(commands).concatMap(command -> { Assert.notNull(command.getKey(), "Key must not be null"); - Assert.notNull(command.getCount(), "Count must not be null"); + Assert.notNull(command.getOptions(), "Options must not be null"); - return cmd.xtrim(command.getKey(), command.isApproximateTrimming(), command.getCount()) + return cmd.xtrim(command.getKey(), StreamConverters.toXTrimArgs(command.getOptions())) .map(value -> new NumericResponse<>(command, value)); })); } diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceStreamCommands.java b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceStreamCommands.java index d663f17d86..e47c3fbe16 100644 --- a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceStreamCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceStreamCommands.java @@ -20,6 +20,7 @@ import io.lettuce.core.XGroupCreateArgs; import io.lettuce.core.XPendingArgs; import io.lettuce.core.XReadArgs; +import io.lettuce.core.XTrimArgs; import io.lettuce.core.api.async.RedisStreamAsyncCommands; import io.lettuce.core.cluster.api.async.RedisClusterAsyncCommands; @@ -84,16 +85,7 @@ public RecordId xAdd(@NonNull MapRecord xDelEx(byte @NonNull [] key, @NonNull XDelOptions options, + @NonNull RecordId @NonNull... recordIds) { + + Assert.notNull(key, "Key must not be null"); + Assert.notNull(options, "Options must not be null"); + Assert.notNull(recordIds, "recordIds must not be null"); + + return connection.invoke().from(RedisStreamAsyncCommands::xdelex, key, StreamConverters.toXDelArgs(options), + entryIdsToString(recordIds)).get(StreamConverters::toStreamEntryDeletionResults); + } + + @Override + public List xAckDel(byte @NonNull [] key, @NonNull String group, + @NonNull XDelOptions options, @NonNull RecordId @NonNull... recordIds) { + + Assert.notNull(key, "Key must not be null"); + Assert.notNull(group, "Group must not be null"); + Assert.notNull(options, "Options must not be null"); + Assert.notNull(recordIds, "recordIds must not be null"); + + return connection.invoke().from(RedisStreamAsyncCommands::xackdel, key, LettuceConverters.toBytes(group), + StreamConverters.toXDelArgs(options), entryIdsToString(recordIds)) + .get(StreamConverters::toStreamEntryDeletionResults); + } + @Override public String xGroupCreate(byte @NonNull [] key, @NonNull String groupName, @NonNull ReadOffset readOffset) { return xGroupCreate(key, groupName, readOffset, false); @@ -324,6 +342,17 @@ public Long xTrim(byte @NonNull [] key, long count, boolean approximateTrimming) return connection.invoke().just(RedisStreamAsyncCommands::xtrim, key, approximateTrimming, count); } + @Override + public Long xTrim(byte @NonNull [] key, @NonNull XTrimOptions options) { + + Assert.notNull(key, "Key must not be null"); + Assert.notNull(options, "XTrimOptions must not be null"); + + XTrimArgs xTrimArgs = StreamConverters.toXTrimArgs(options); + + return connection.invoke().just(RedisStreamAsyncCommands::xtrim, key, xTrimArgs); + } + RedisClusterAsyncCommands getAsyncDedicatedConnection() { return connection.getAsyncDedicatedConnection(); } diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/StreamConverters.java b/src/main/java/org/springframework/data/redis/connection/lettuce/StreamConverters.java index ae17a9ca70..82b1cba101 100644 --- a/src/main/java/org/springframework/data/redis/connection/lettuce/StreamConverters.java +++ b/src/main/java/org/springframework/data/redis/connection/lettuce/StreamConverters.java @@ -15,17 +15,25 @@ */ package org.springframework.data.redis.connection.lettuce; +import io.lettuce.core.StreamDeletionPolicy; import io.lettuce.core.StreamMessage; +import io.lettuce.core.XAddArgs; import io.lettuce.core.XClaimArgs; import io.lettuce.core.XReadArgs; +import io.lettuce.core.XTrimArgs; import io.lettuce.core.models.stream.PendingMessage; import io.lettuce.core.models.stream.PendingMessages; +import io.lettuce.core.models.stream.StreamEntryDeletionResult; import java.time.Duration; import java.util.List; import org.springframework.core.convert.converter.Converter; +import org.springframework.data.redis.connection.RedisStreamCommands; import org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions; +import org.springframework.data.redis.connection.RedisStreamCommands.XAddOptions; +import org.springframework.data.redis.connection.RedisStreamCommands.XTrimOptions; +import org.springframework.data.redis.connection.RedisStreamCommands.XDelOptions; import org.springframework.data.redis.connection.stream.ByteRecord; import org.springframework.data.redis.connection.stream.Consumer; import org.springframework.data.redis.connection.stream.PendingMessagesSummary; @@ -41,6 +49,7 @@ * * @author Mark Paluch * @author Christoph Strobl + * @author Viktoriya Kutsarova * @since 2.2 */ @SuppressWarnings({ "rawtypes" }) @@ -67,6 +76,22 @@ static XClaimArgs toXClaimArgs(XClaimOptions options) { return XClaimOptionsToXClaimArgsConverter.INSTANCE.convert(options); } + static XAddArgs toXAddArgs(RecordId recordId, XAddOptions options) { + XAddArgs args = XAddOptionsToXAddArgsConverter.INSTANCE.convert(options); + if (!recordId.shouldBeAutoGenerated()) { + args.id(recordId.getValue()); + } + return args; + } + + static XTrimArgs toXTrimArgs(XTrimOptions options) { + return XTrimOptionsToXTrimArgsConverter.INSTANCE.convert(options); + } + + static StreamDeletionPolicy toXDelArgs(XDelOptions options) { + return toStreamDeletionPolicy(options.getDeletionPolicy()); + } + static Converter, ByteRecord> byteRecordConverter() { return (it) -> StreamRecords.newRecord().in(it.getStream()).withId(it.getId()).ofBytes(it.getBody()); } @@ -169,4 +194,121 @@ public XClaimArgs convert(XClaimOptions source) { } } + + /** + * {@link Converter} to convert {@link XAddOptions} to Lettuce's {@link XAddArgs}. + * + * @since 4.0 + */ + enum XAddOptionsToXAddArgsConverter implements Converter { + + INSTANCE; + + @Override + public XAddArgs convert(XAddOptions source) { + + XAddArgs args = new XAddArgs(); + + args.nomkstream(source.isNoMkStream()); + + if (!source.hasTrimOptions()) { + return args; + } + + RedisStreamCommands.TrimOptions trimOptions = source.getTrimOptions(); + RedisStreamCommands.TrimStrategy trimStrategy = trimOptions.getTrimStrategy(); + if (trimStrategy instanceof RedisStreamCommands.MaxLenTrimStrategy maxLenTrimStrategy) { + args.maxlen(maxLenTrimStrategy.threshold()); + } + else if (trimStrategy instanceof RedisStreamCommands.MinIdTrimStrategy minIdTrimStrategy) { + args.minId(minIdTrimStrategy.threshold().getValue()); + } + + if (trimOptions.hasLimit()) { + args.limit(trimOptions.getLimit()); + } + + args.exactTrimming(trimOptions.getTrimOperator() == RedisStreamCommands.TrimOperator.EXACT); + args.approximateTrimming(trimOptions.getTrimOperator() == RedisStreamCommands.TrimOperator.APPROXIMATE); + + if (trimOptions.hasDeletionPolicy()) { + args.trimmingMode(toStreamDeletionPolicy(trimOptions.getDeletionPolicy())); + } + + return args; + } + } + + enum XTrimOptionsToXTrimArgsConverter implements Converter { + INSTANCE; + + @Override + public XTrimArgs convert(XTrimOptions source) { + + XTrimArgs args = new XTrimArgs(); + + RedisStreamCommands.TrimOptions trimOptions = source.trimOptions(); + RedisStreamCommands.TrimStrategy trimStrategy = trimOptions.getTrimStrategy(); + if (trimStrategy instanceof RedisStreamCommands.MaxLenTrimStrategy maxLenTrimStrategy) { + args.maxlen(maxLenTrimStrategy.threshold()); + } + else if (trimStrategy instanceof RedisStreamCommands.MinIdTrimStrategy minIdTrimStrategy) { + args.minId(minIdTrimStrategy.threshold().getValue()); + } + + if (trimOptions.hasLimit()) { + args.limit(trimOptions.getLimit()); + } + + args.exactTrimming(trimOptions.getTrimOperator() == RedisStreamCommands.TrimOperator.EXACT); + args.approximateTrimming(trimOptions.getTrimOperator() == RedisStreamCommands.TrimOperator.APPROXIMATE); + + if (trimOptions.hasDeletionPolicy()) { + args.trimmingMode(toStreamDeletionPolicy(trimOptions.getDeletionPolicy())); + } + + return args; + } + } + + public static StreamDeletionPolicy toStreamDeletionPolicy(RedisStreamCommands.StreamDeletionPolicy deletionPolicy) { + + return switch (deletionPolicy) { + case KEEP_REFERENCES -> StreamDeletionPolicy.KEEP_REFERENCES; + case DELETE_REFERENCES -> StreamDeletionPolicy.DELETE_REFERENCES; + case ACKNOWLEDGED -> StreamDeletionPolicy.ACKNOWLEDGED; + }; + } + + /** + * Convert Lettuce {@link io.lettuce.core.models.stream.StreamEntryDeletionResult} to Spring Data Redis + * {@link StreamEntryDeletionResult}. + * + * @param result the Lettuce deletion result enum + * @return the corresponding Spring Data Redis enum + * @since 4.0 + */ + static RedisStreamCommands.StreamEntryDeletionResult toStreamEntryDeletionResult( + StreamEntryDeletionResult result) { + return switch (result) { + case UNKNOWN -> RedisStreamCommands.StreamEntryDeletionResult.UNKNOWN; + case NOT_FOUND -> RedisStreamCommands.StreamEntryDeletionResult.NOT_FOUND; + case DELETED -> RedisStreamCommands.StreamEntryDeletionResult.DELETED; + case NOT_DELETED_UNACKNOWLEDGED_OR_STILL_REFERENCED -> + RedisStreamCommands.StreamEntryDeletionResult.NOT_DELETED_UNACKNOWLEDGED_OR_STILL_REFERENCED; + }; + } + + /** + * Convert a list of Lettuce {@link io.lettuce.core.models.stream.StreamEntryDeletionResult} to a {@link List} of Spring Data Redis + * {@link RedisStreamCommands.StreamEntryDeletionResult}. + * + * @param results the list of Lettuce deletion result enums + * @return the list of Spring Data Redis deletion result enums + * @since 4.0 + */ + static List toStreamEntryDeletionResults( + List results) { + return results.stream().map(StreamConverters::toStreamEntryDeletionResult).toList(); + } } diff --git a/src/main/java/org/springframework/data/redis/core/BoundStreamOperations.java b/src/main/java/org/springframework/data/redis/core/BoundStreamOperations.java index 46f23598cf..4b7a56afef 100644 --- a/src/main/java/org/springframework/data/redis/core/BoundStreamOperations.java +++ b/src/main/java/org/springframework/data/redis/core/BoundStreamOperations.java @@ -22,7 +22,11 @@ import org.jspecify.annotations.NullUnmarked; import org.springframework.data.domain.Range; import org.springframework.data.redis.connection.Limit; +import org.springframework.data.redis.connection.RedisStreamCommands; import org.springframework.data.redis.connection.RedisStreamCommands.XAddOptions; +import org.springframework.data.redis.connection.RedisStreamCommands.XTrimOptions; +import org.springframework.data.redis.connection.RedisStreamCommands.XDelOptions; +import org.springframework.data.redis.connection.RedisStreamCommands.StreamEntryDeletionResult; import org.springframework.data.redis.connection.stream.Consumer; import org.springframework.data.redis.connection.stream.MapRecord; import org.springframework.data.redis.connection.stream.ReadOffset; @@ -35,6 +39,7 @@ * @author Mark Paluch * @author Christoph Strobl * @author Dengliming + * @author Viktoriya Kutsarova * @since 2.2 */ @NullUnmarked @@ -80,6 +85,54 @@ public interface BoundStreamOperations { */ Long delete(@NonNull String @NonNull... recordIds); + /** + * Deletes one or multiple entries from the stream at the specified key with extended options. + * + * @param options the {@link XDelOptions} specifying deletion policy. + * @param recordIds stream record Id's as strings. + * @return list of {@link StreamEntryDeletionResult} for each ID. + * @see Redis Documentation: XDELEX + * @since 4.0 + */ + List deleteWithOptions(@NonNull XDelOptions options, @NonNull String @NonNull ... recordIds); + + /** + * Deletes one or multiple entries from the stream at the specified key with extended options. + * + * @param options the {@link XDelOptions} specifying deletion policy. + * @param recordIds stream record Id's. + * @return list of {@link StreamEntryDeletionResult} for each ID. + * @see Redis Documentation: XDELEX + * @since 4.0 + */ + List deleteWithOptions(@NonNull XDelOptions options, @NonNull RecordId @NonNull ... recordIds); + + /** + * Acknowledges and conditionally deletes one or multiple entries for a stream consumer group at the specified key. + * + * @param group name of the consumer group. + * @param options the {@link XDelOptions} specifying deletion policy. + * @param recordIds stream record Id's as strings. + * @return list of {@link StreamEntryDeletionResult} for each ID. + * @see Redis Documentation: XACKDEL + * @since 4.0 + */ + List acknowledgeAndDelete(@NonNull String group, @NonNull XDelOptions options, + @NonNull String @NonNull ... recordIds); + + /** + * Acknowledges and conditionally deletes one or multiple entries for a stream consumer group at the specified key. + * + * @param group name of the consumer group. + * @param options the {@link XDelOptions} specifying deletion policy. + * @param recordIds stream record Id's. + * @return list of {@link StreamEntryDeletionResult} for each ID. + * @see Redis Documentation: XACKDEL + * @since 4.0 + */ + List acknowledgeAndDelete(@NonNull String group, @NonNull XDelOptions options, + @NonNull RecordId @NonNull ... recordIds); + /** * Create a consumer group. * @@ -219,4 +272,18 @@ public interface BoundStreamOperations { * @see Redis Documentation: XTRIM */ Long trim(long count, boolean approximateTrimming); + + /** + * Trims the stream according to the specified {@link RedisStreamCommands.XTrimOptions}. + *

+ * Supports various trimming strategies including {@literal MAXLEN} (limit by count) and + * {@literal MINID} (evict entries older than a specific ID), with options for approximate + * or exact trimming. + * + * @param options the trimming options specifying the strategy and parameters. Must not be {@literal null}. + * @return number of removed entries. {@literal null} when used in pipeline / transaction. + * @since 2.7.4 + * @see Redis Documentation: XTRIM + */ + Long trim(@NonNull XTrimOptions options); } diff --git a/src/main/java/org/springframework/data/redis/core/DefaultReactiveStreamOperations.java b/src/main/java/org/springframework/data/redis/core/DefaultReactiveStreamOperations.java index b87d0b1a0b..0979d97c92 100644 --- a/src/main/java/org/springframework/data/redis/core/DefaultReactiveStreamOperations.java +++ b/src/main/java/org/springframework/data/redis/core/DefaultReactiveStreamOperations.java @@ -15,6 +15,7 @@ */ package org.springframework.data.redis.core; +import org.springframework.data.redis.connection.RedisStreamCommands; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -35,6 +36,9 @@ import org.springframework.data.redis.connection.ReactiveStreamCommands; import org.springframework.data.redis.connection.RedisStreamCommands.XAddOptions; import org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions; +import org.springframework.data.redis.connection.RedisStreamCommands.XTrimOptions; +import org.springframework.data.redis.connection.RedisStreamCommands.XDelOptions; +import org.springframework.data.redis.connection.RedisStreamCommands.StreamEntryDeletionResult; import org.springframework.data.redis.connection.convert.Converters; import org.springframework.data.redis.connection.stream.ByteBufferRecord; import org.springframework.data.redis.connection.stream.Consumer; @@ -179,6 +183,30 @@ public Mono delete(@NonNull K key, RecordId @NonNull... recordIds) { return createMono(streamCommands -> streamCommands.xDel(rawKey(key), recordIds)); } + @Override + public Flux deleteWithOptions(@NonNull K key, @NonNull XDelOptions options, + @NonNull RecordId @NonNull... recordIds) { + + Assert.notNull(key, "Key must not be null"); + Assert.notNull(options, "XDelOptions must not be null"); + Assert.notNull(recordIds, "RecordIds must not be null"); + + return createFlux(streamCommands -> streamCommands.xDelEx(rawKey(key), options, recordIds)); + } + + @Override + public Flux acknowledgeAndDelete(@NonNull K key, @NonNull String group, + @NonNull XDelOptions options, + @NonNull RecordId @NonNull... recordIds) { + + Assert.notNull(key, "Key must not be null"); + Assert.hasText(group, "Group must not be null or empty"); + Assert.notNull(options, "XDelOptions must not be null"); + Assert.notNull(recordIds, "RecordIds must not be null"); + + return createFlux(streamCommands -> streamCommands.xAckDel(rawKey(key), group, options, recordIds)); + } + @Override public Mono createGroup(@NonNull K key, @NonNull ReadOffset readOffset, @NonNull String group) { @@ -330,6 +358,14 @@ public Mono trim(@NonNull K key, long count, boolean approximateTrimming) return createMono(streamCommands -> streamCommands.xTrim(rawKey(key), count, approximateTrimming)); } + @Override + public Mono trim(@NonNull K key, @NonNull XTrimOptions options) { + Assert.notNull(key, "Key must not be null"); + Assert.notNull(options, "XTrimOptions must not be null"); + + return createMono(streamCommands -> streamCommands.xTrim(rawKey(key), options)); + } + @Override public HashMapper getHashMapper(@NonNull Class targetType) { return objectMapper.getHashMapper(targetType); diff --git a/src/main/java/org/springframework/data/redis/core/DefaultStreamOperations.java b/src/main/java/org/springframework/data/redis/core/DefaultStreamOperations.java index 1e114678a5..3739136eef 100644 --- a/src/main/java/org/springframework/data/redis/core/DefaultStreamOperations.java +++ b/src/main/java/org/springframework/data/redis/core/DefaultStreamOperations.java @@ -30,8 +30,12 @@ import org.springframework.data.domain.Range; import org.springframework.data.redis.connection.Limit; import org.springframework.data.redis.connection.RedisConnection; +import org.springframework.data.redis.connection.RedisStreamCommands; import org.springframework.data.redis.connection.RedisStreamCommands.XAddOptions; import org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions; +import org.springframework.data.redis.connection.RedisStreamCommands.XTrimOptions; +import org.springframework.data.redis.connection.RedisStreamCommands.XDelOptions; +import org.springframework.data.redis.connection.RedisStreamCommands.StreamEntryDeletionResult; import org.springframework.data.redis.connection.stream.ByteRecord; import org.springframework.data.redis.connection.stream.Consumer; import org.springframework.data.redis.connection.stream.MapRecord; @@ -177,6 +181,25 @@ public Long delete(@NonNull K key, @NonNull RecordId @NonNull... recordIds) { return execute(connection -> connection.xDel(rawKey, recordIds)); } + @Override + public List deleteWithOptions(@NonNull K key, @NonNull XDelOptions options, + @NonNull String @NonNull... recordIds) { + + byte[] rawKey = rawKey(key); + RecordId[] recordIdArray = Arrays.stream(recordIds).map(RecordId::of).toArray(RecordId[]::new); + return execute(connection -> connection.streamCommands().xDelEx(rawKey, options, recordIdArray)); + } + + @Override + public List acknowledgeAndDelete(@NonNull K key, @NonNull String group, + @NonNull XDelOptions options, + @NonNull String @NonNull... recordIds) { + + byte[] rawKey = rawKey(key); + RecordId[] recordIdArray = Arrays.stream(recordIds).map(RecordId::of).toArray(RecordId[]::new); + return execute(connection -> connection.streamCommands().xAckDel(rawKey, group, options, recordIdArray)); + } + @Override public String createGroup(@NonNull K key, @NonNull ReadOffset readOffset, @NonNull String group) { @@ -328,6 +351,12 @@ public Long trim(@NonNull K key, long count, boolean approximateTrimming) { return execute(connection -> connection.xTrim(rawKey, count, approximateTrimming)); } + @Override + public Long trim(@NonNull K key, @NonNull XTrimOptions options) { + byte[] rawKey = rawKey(key); + return execute(connection -> connection.streamCommands().xTrim(rawKey, options)); + } + @Override public HashMapper getHashMapper(@NonNull Class targetType) { return objectMapper.getHashMapper(targetType); diff --git a/src/main/java/org/springframework/data/redis/core/ReactiveStreamOperations.java b/src/main/java/org/springframework/data/redis/core/ReactiveStreamOperations.java index df74d05b7b..a892ca1088 100644 --- a/src/main/java/org/springframework/data/redis/core/ReactiveStreamOperations.java +++ b/src/main/java/org/springframework/data/redis/core/ReactiveStreamOperations.java @@ -15,11 +15,13 @@ */ package org.springframework.data.redis.core; +import org.springframework.data.redis.connection.RedisStreamCommands; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.time.Duration; import java.util.Arrays; +import java.util.List; import java.util.Map; import org.jspecify.annotations.NonNull; @@ -28,8 +30,11 @@ import org.springframework.data.domain.Range; import org.springframework.data.redis.connection.Limit; +import org.springframework.data.redis.connection.RedisStreamCommands.StreamEntryDeletionResult; import org.springframework.data.redis.connection.RedisStreamCommands.XAddOptions; import org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions; +import org.springframework.data.redis.connection.RedisStreamCommands.XDelOptions; +import org.springframework.data.redis.connection.RedisStreamCommands.XTrimOptions; import org.springframework.data.redis.connection.stream.*; import org.springframework.data.redis.connection.stream.Record; import org.springframework.data.redis.connection.stream.StreamInfo.XInfoConsumer; @@ -256,6 +261,92 @@ default Mono delete(@NonNull Record record) { */ Mono delete(@NonNull K key, @NonNull RecordId @NonNull... recordIds); + /** + * Deletes one or multiple entries from the stream at the specified key with extended options. + * + * @param key the stream key. + * @param options the {@link XDelOptions} specifying deletion policy. + * @param recordIds stream record Id's. + * @return {@link Flux} emitting a list of {@link StreamEntryDeletionResult} for each ID. + * @see Redis Documentation: XDELEX + * @since 4.0 + */ + default Flux deleteWithOptions(@NonNull K key, @NonNull XDelOptions options, @NonNull String @NonNull... recordIds) { + return deleteWithOptions(key, options, Arrays.stream(recordIds).map(RecordId::of).toArray(RecordId[]::new)); + } + + /** + * Deletes a given {@link Record} from the stream with extended options. + * + * @param record must not be {@literal null}. + * @param options the {@link XDelOptions} specifying deletion policy. + * @return {@link Flux} emitting a list of {@link StreamEntryDeletionResult} for each ID. + */ + default Flux deleteWithOptions(@NonNull Record record, @NonNull XDelOptions options) { + Assert.notNull(record.getStream(), "Record.getStream() must not be null"); + return deleteWithOptions(record.getStream(), options, record.getId()); + } + + /** + * Deletes one or multiple entries from the stream at the specified key with extended options. + * + * @param key the stream key. + * @param options the {@link XDelOptions} specifying deletion policy. + * @param recordIds stream record Id's. + * @return {@link Flux} emitting a list of {@link StreamEntryDeletionResult} for each ID. + * @see Redis Documentation: XDELEX + * @since 4.0 + */ + Flux deleteWithOptions(@NonNull K key, @NonNull XDelOptions options, + @NonNull RecordId @NonNull... recordIds); + + /** + * Acknowledges and conditionally deletes one or multiple entries for a stream consumer group at the specified key. + * + * @param key the stream key. + * @param group name of the consumer group. + * @param options the {@link XDelOptions} specifying deletion policy. + * @param recordIds stream record Id's. + * @return {@link Flux} emitting a list of {@link StreamEntryDeletionResult} for each ID. + * @see Redis Documentation: XACKDEL + * @since 4.0 + */ + default Flux acknowledgeAndDelete(@NonNull K key, @NonNull String group, + @NonNull XDelOptions options, @NonNull String @NonNull... recordIds) { + return acknowledgeAndDelete(key, group, options, Arrays.stream(recordIds).map(RecordId::of).toArray(RecordId[]::new)); + } + + /** + * Acknowledges and conditionally deletes a given {@link Record} for a stream consumer group. + * + * @param group name of the consumer group. + * @param record must not be {@literal null}. + * @param options the {@link XDelOptions} specifying deletion policy. + * @return {@link Flux} emitting a list of {@link StreamEntryDeletionResult} for each ID. + * @see Redis Documentation: XACKDEL + * @since 4.0 + */ + default Flux acknowledgeAndDelete(@NonNull String group, @NonNull Record record, + @NonNull XDelOptions options) { + Assert.notNull(record.getStream(), "Record.getStream() must not be null"); + return acknowledgeAndDelete(record.getStream(), group, options, record.getId()); + } + + /** + * Acknowledges and conditionally deletes one or multiple entries for a stream consumer group at the specified key. + * + * @param key the stream key. + * @param group name of the consumer group. + * @param options the {@link XDelOptions} specifying deletion policy. + * @param recordIds stream record Id's. + * @return {@link Flux} emitting a list of {@link StreamEntryDeletionResult} for each ID. + * @see Redis Documentation: XACKDEL + * @since 4.0 + */ + Flux acknowledgeAndDelete(@NonNull K key, @NonNull String group, + @NonNull XDelOptions options, + @NonNull RecordId @NonNull... recordIds); + /** * Create a consumer group at the {@link ReadOffset#latest() latest offset}. This command creates the stream if it * does not already exist. @@ -660,6 +751,21 @@ default Flux> reverseRange(@NonNull Class targetType, */ Mono trim(@NonNull K key, long count, boolean approximateTrimming); + /** + * Trims the stream according to the specified {@link XTrimOptions}. + *

+ * Supports various trimming strategies including {@literal MAXLEN} (limit by count) and + * {@literal MINID} (evict entries older than a specific ID), with options for approximate + * or exact trimming. + * + * @param key the stream key. + * @param options the trimming options specifying the strategy and parameters. Must not be {@literal null}. + * @return number of removed entries. + * @since 4.0 + * @see Redis Documentation: XTRIM + */ + Mono trim(@NonNull K key, @NonNull XTrimOptions options); + /** * Get the {@link HashMapper} for a specific type. * diff --git a/src/main/java/org/springframework/data/redis/core/RedisCommand.java b/src/main/java/org/springframework/data/redis/core/RedisCommand.java index 2571a694f5..b130a73b8b 100644 --- a/src/main/java/org/springframework/data/redis/core/RedisCommand.java +++ b/src/main/java/org/springframework/data/redis/core/RedisCommand.java @@ -284,6 +284,25 @@ public enum RedisCommand { // -- W WATCH("rw", 1), // + + // -- X + XACK("rw", 3), // + XACKDEL("rw", 3), // + XADD("rw", 3), // + XAUTOCLAIM("rw", 4), // + XCLAIM("rw", 4), // + XDEL("rw", 2), // + XDELEX("rw", 2), // + XGROUP("rw", 2), // + XINFO("r", 1), // + XLEN("r", 1), // + XPENDING("r", 1), // + XRANGE("r", 2), // + XREVRANGE("r", 2), // + XREAD("r", 2), // + XREADGROUP("rw", 4), // + XTRIM("rw", 2), // + // -- Z ZADD("rw", 3), // ZCARD("r", 1), // diff --git a/src/main/java/org/springframework/data/redis/core/StreamOperations.java b/src/main/java/org/springframework/data/redis/core/StreamOperations.java index 6580347220..c914ceff58 100644 --- a/src/main/java/org/springframework/data/redis/core/StreamOperations.java +++ b/src/main/java/org/springframework/data/redis/core/StreamOperations.java @@ -29,6 +29,9 @@ import org.springframework.data.redis.connection.Limit; import org.springframework.data.redis.connection.RedisStreamCommands.XAddOptions; import org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions; +import org.springframework.data.redis.connection.RedisStreamCommands.XTrimOptions; +import org.springframework.data.redis.connection.RedisStreamCommands.XDelOptions; +import org.springframework.data.redis.connection.RedisStreamCommands.StreamEntryDeletionResult; import org.springframework.data.redis.connection.stream.*; import org.springframework.data.redis.connection.stream.Record; import org.springframework.data.redis.connection.stream.StreamInfo.XInfoConsumers; @@ -241,6 +244,99 @@ default Long delete(@NonNull Record record) { */ Long delete(@NonNull K key, @NonNull RecordId @NonNull... recordIds); + /** + * Deletes one or multiple entries from the stream at the specified key with extended options. + * + * @param key the stream key. + * @param options the {@link XDelOptions} specifying deletion policy. + * @param recordIds stream record ids as strings. + * @return list of {@link StreamEntryDeletionResult} for each ID. + * @see Redis Documentation: XDELEX + * @since 4.0 + */ + List deleteWithOptions(@NonNull K key, @NonNull XDelOptions options, @NonNull String @NonNull... recordIds); + + /** + * Deletes one or multiple entries from the stream at the specified key with extended options. + * + * @param key the stream key. + * @param options the {@link XDelOptions} specifying deletion policy. + * @param recordIds stream record ids. + * @return list of {@link StreamEntryDeletionResult} for each ID. + * @see Redis Documentation: XDELEX + * @since 4.0 + */ + default List deleteWithOptions(@NonNull K key, @NonNull XDelOptions options, + @NonNull RecordId @NonNull... recordIds) { + return deleteWithOptions(key, options, Arrays.stream(recordIds).map(RecordId::getValue).toArray(String[]::new)); + } + + /** + * Deletes a given {@link Record} from the stream with extended options. + * + * @param record must not be {@literal null}. + * @param options the {@link XDelOptions} specifying deletion policy. + * @return list of {@link StreamEntryDeletionResult} for each ID. + * @see Redis Documentation: XDELEX + * @since 4.0 + */ + default List deleteWithOptions(@NonNull Record record, @NonNull XDelOptions options) { + Assert.notNull(record.getStream(), "Record.getStream() must not be null"); + return deleteWithOptions(record.getStream(), options, record.getId().getValue()); + } + + /** + * Acknowledges and conditionally deletes one or multiple entries for a stream consumer group at the specified key. + *

+ * XACKDEL combines the functionality of XACK and XDEL in Redis Streams. It acknowledges the specified entry IDs in the + * given consumer group and simultaneously attempts to delete the corresponding entries from the stream. + * + * @param key the stream key. + * @param group name of the consumer group. + * @param options the {@link XDelOptions} specifying deletion policy. + * @param recordIds stream record ids as strings. + * @return list of {@link StreamEntryDeletionResult} for each ID. + * @see Redis Documentation: XACKDEL + * @since 4.0 + */ + List acknowledgeAndDelete(@NonNull K key, @NonNull String group, @NonNull XDelOptions options, + @NonNull String @NonNull... recordIds); + + /** + * Acknowledges and conditionally deletes one or multiple entries for a stream consumer group at the specified key. + *

+ * XACKDEL combines the functionality of XACK and XDEL in Redis Streams. It acknowledges the specified entry IDs in the + * given consumer group and simultaneously attempts to delete the corresponding entries from the stream. + * + * @param key the stream key. + * @param group name of the consumer group. + * @param options the {@link XDelOptions} specifying deletion policy. + * @param recordIds stream record ids. + * @return list of {@link StreamEntryDeletionResult} for each ID. + * @see Redis Documentation: XACKDEL + * @since 4.0 + */ + default List acknowledgeAndDelete(@NonNull K key, @NonNull String group, + @NonNull XDelOptions options, @NonNull RecordId @NonNull... recordIds) { + return acknowledgeAndDelete(key, group, options, Arrays.stream(recordIds).map(RecordId::getValue).toArray(String[]::new)); + } + + /** + * Acknowledges and conditionally deletes a given {@link Record} for a stream consumer group. + * + * @param group name of the consumer group. + * @param record must not be {@literal null}. + * @param options the {@link XDelOptions} specifying deletion policy. + * @return list of {@link StreamEntryDeletionResult} for each ID. + * @see Redis Documentation: XACKDEL + * @since 4.0 + */ + default List acknowledgeAndDelete(@NonNull String group, @NonNull Record record, + @NonNull XDelOptions options) { + Assert.notNull(record.getStream(), "Record.getStream() must not be null"); + return acknowledgeAndDelete(record.getStream(), group, options, record.getId().getValue()); + } + /** * Create a consumer group at the {@link ReadOffset#latest() latest offset}. This command creates the stream if it * does not already exist. @@ -649,6 +745,21 @@ default List> read(@NonNull Class targetType, @NonNull */ Long trim(@NonNull K key, long count, boolean approximateTrimming); + /** + * Trims the stream according to the specified {@link XTrimOptions}. + *

+ * Supports various trimming strategies including {@literal MAXLEN} (limit by count) and + * {@literal MINID} (evict entries older than a specific ID), with options for approximate + * or exact trimming. + * + * @param key the stream key. + * @param options the trimming options specifying the strategy and parameters. Must not be {@literal null}. + * @return number of removed entries. {@literal null} when used in pipeline / transaction. + * @since 2.4 + * @see Redis Documentation: XTRIM + */ + Long trim(@NonNull K key, @NonNull XTrimOptions options); + /** * Get the {@link HashMapper} for a specific type. * diff --git a/src/test/java/org/springframework/data/redis/connection/AbstractConnectionIntegrationTests.java b/src/test/java/org/springframework/data/redis/connection/AbstractConnectionIntegrationTests.java index d09bb78ddf..b364a9d0ea 100644 --- a/src/test/java/org/springframework/data/redis/connection/AbstractConnectionIntegrationTests.java +++ b/src/test/java/org/springframework/data/redis/connection/AbstractConnectionIntegrationTests.java @@ -63,6 +63,7 @@ import org.springframework.data.redis.connection.RedisGeoCommands.GeoLocation; import org.springframework.data.redis.connection.RedisListCommands.Position; import org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions; +import org.springframework.data.redis.connection.RedisStreamCommands.StreamDeletionPolicy; import org.springframework.data.redis.connection.RedisStringCommands.BitOperation; import org.springframework.data.redis.connection.RedisStringCommands.SetOption; import org.springframework.data.redis.connection.RedisZSetCommands.ZAddArgs; @@ -4060,6 +4061,243 @@ void xAddShouldTrimStreamApprox() { assertThat((Long) results.get(3)).isBetween(1L, 3L); } + @Test // GH-3232 + @EnabledOnCommand("XADD") + void xAddShouldTrimStreamWithMinId() { + + // Add initial records to get valid IDs + actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2))); + actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2))); + actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2))); + + List initialResults = getResults(); + RecordId id1 = (RecordId) initialResults.get(0); + RecordId id2 = (RecordId) initialResults.get(1); + RecordId id3 = (RecordId) initialResults.get(2); + + // Start a new pipeline/batch for the trimming test + initConnection(); + + // Trim using MINID - keep only entries with ID >= id2 + RedisStreamCommands.XAddOptions xAddOptions = RedisStreamCommands.XAddOptions.minId(id2); + actual.add( + connection.xAdd(StringRecord.of(Collections.singletonMap(KEY_2, VALUE_2)).withStreamKey(KEY_1), xAddOptions)); + actual.add(connection.xLen(KEY_1)); + + List results = getResults(); + assertThat(results).hasSize(2); + // Should have trimmed entries older than id2, so we should have 3 entries (id2, id3, and the new one) + assertThat((Long) results.get(1)).isEqualTo(3L); + } + + @Test // GH-3232 + @EnabledOnCommand("XADD") + void xAddShouldHonorLimitWithApproximateTrimming() { + + // Add multiple records + for (int i = 0; i < 100; i++) { + actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2 + i))); + } + + // Execute the initial adds + getResults(); + initConnection(); + + // Use LIMIT to control trimming effort + RedisStreamCommands.XAddOptions xAddOptions = RedisStreamCommands.XAddOptions.maxlen(50) + .approximateTrimming(true).withLimit(10); + actual.add( + connection.xAdd(StringRecord.of(Collections.singletonMap(KEY_2, VALUE_2)).withStreamKey(KEY_1), xAddOptions)); + actual.add(connection.xLen(KEY_1)); + + List results = getResults(); + assertThat(results).hasSize(2); + // With LIMIT, trimming may not be exact, but should be around 50-60 entries + assertThat((Long) results.get(1)).isGreaterThanOrEqualTo(50L).isLessThanOrEqualTo(101L); + } + + @Test // GH-3232 + @EnabledOnCommand("XADD") + void xAddShouldHonorExactTrimming() { + + RedisStreamCommands.XAddOptions xAddOptions = RedisStreamCommands.XAddOptions.maxlen(2).withExactTrimming(true); + actual.add( + connection.xAdd(StringRecord.of(Collections.singletonMap(KEY_2, VALUE_2)).withStreamKey(KEY_1), xAddOptions)); + actual.add( + connection.xAdd(StringRecord.of(Collections.singletonMap(KEY_2, VALUE_2)).withStreamKey(KEY_1), xAddOptions)); + actual.add( + connection.xAdd(StringRecord.of(Collections.singletonMap(KEY_2, VALUE_2)).withStreamKey(KEY_1), xAddOptions)); + actual.add(connection.xLen(KEY_1)); + + List results = getResults(); + assertThat(results).hasSize(4); + // With exact trimming, should have exactly 2 entries + assertThat((Long) results.get(3)).isEqualTo(2L); + } + + @Test // GH-3232 + @EnabledOnCommand("XADD") + @EnabledOnRedisVersion("8.2") // Deletion policy requires Redis 8.2+ + void xAddShouldHonorDeletionPolicy() { + + RedisStreamCommands.XAddOptions xAddOptions = RedisStreamCommands.XAddOptions.maxlen(5) + .approximateTrimming(true) + .withDeletionPolicy(StreamDeletionPolicy.DELETE_REFERENCES); + + // Add multiple entries with deletion policy + actual.add( + connection.xAdd(StringRecord.of(Collections.singletonMap(KEY_2, VALUE_2)).withStreamKey(KEY_1), xAddOptions)); + actual.add( + connection.xAdd(StringRecord.of(Collections.singletonMap(KEY_2, VALUE_2)).withStreamKey(KEY_1), xAddOptions)); + actual.add( + connection.xAdd(StringRecord.of(Collections.singletonMap(KEY_2, VALUE_2)).withStreamKey(KEY_1), xAddOptions)); + actual.add(connection.xLen(KEY_1)); + + List results = getResults(); + assertThat(results).hasSize(4); + // Verify stream was created and entries were added + assertThat((Long) results.get(3)).isGreaterThan(0L); + } + + @Test // GH-3232 + @EnabledOnCommand("XTRIM") + void xTrimShouldTrimStreamWithMaxlen() { + + // Add multiple records + for (int i = 0; i < 10; i++) { + actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2 + i))); + } + + getResults(); + initConnection(); + + // Trim to 5 entries using MAXLEN + actual.add(connection.xTrim(KEY_1, RedisStreamCommands.XTrimOptions.maxlen(5))); + actual.add(connection.xLen(KEY_1)); + + List results = getResults(); + assertThat(results).hasSize(2); + assertThat((Long) results.get(0)).isEqualTo(5L); // 5 entries removed + assertThat((Long) results.get(1)).isEqualTo(5L); // 5 entries remaining + } + + @Test // GH-3232 + @EnabledOnCommand("XTRIM") + void xTrimShouldTrimStreamWithMinId() { + + // Add initial records to get valid IDs + actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2))); + actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2))); + actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2))); + actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2))); + actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2))); + + List initialResults = getResults(); + RecordId id3 = (RecordId) initialResults.get(2); // Get the 3rd ID + + initConnection(); + + // Trim using MINID - keep only entries with ID >= id3 + actual.add(connection.xTrim(KEY_1, RedisStreamCommands.XTrimOptions.minId(id3))); + actual.add(connection.xLen(KEY_1)); + + List results = getResults(); + assertThat(results).hasSize(2); + assertThat((Long) results.get(0)).isEqualTo(2L); // 2 entries removed (id1, id2) + assertThat((Long) results.get(1)).isEqualTo(3L); // 3 entries remaining (id3, id4, id5) + } + + @Test // GH-3232 + @EnabledOnCommand("XTRIM") + void xTrimShouldHonorApproximateTrimming() { + + // Add multiple records + for (int i = 0; i < 100; i++) { + actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2 + i))); + } + + getResults(); + initConnection(); + + // Trim with approximate trimming + actual.add(connection.xTrim(KEY_1, RedisStreamCommands.XTrimOptions.maxlen(50).approximateTrimming(true))); + actual.add(connection.xLen(KEY_1)); + + List results = getResults(); + assertThat(results).hasSize(2); + // With approximate trimming, the result may not be exact but should be around 50 + assertThat((Long) results.get(1)).isGreaterThanOrEqualTo(50L).isLessThanOrEqualTo(100L); + } + + @Test // GH-3232 + @EnabledOnCommand("XTRIM") + void xTrimShouldHonorExactTrimming() { + + // Add multiple records + for (int i = 0; i < 10; i++) { + actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2 + i))); + } + + getResults(); + initConnection(); + + // Trim with exact trimming + actual.add(connection.xTrim(KEY_1, RedisStreamCommands.XTrimOptions.maxlen(5).exactTrimming(true))); + actual.add(connection.xLen(KEY_1)); + + List results = getResults(); + assertThat(results).hasSize(2); + assertThat((Long) results.get(0)).isEqualTo(5L); // 5 entries removed + assertThat((Long) results.get(1)).isEqualTo(5L); // Exactly 5 entries remaining + } + + @Test // GH-3232 + @EnabledOnCommand("XTRIM") + void xTrimShouldHonorLimit() { + + // Add multiple records + for (int i = 0; i < 100; i++) { + actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2 + i))); + } + + getResults(); + initConnection(); + + // Trim with LIMIT to control trimming effort + actual.add(connection.xTrim(KEY_1, + RedisStreamCommands.XTrimOptions.maxlen(50).approximateTrimming(true).limit(10))); + actual.add(connection.xLen(KEY_1)); + + List results = getResults(); + assertThat(results).hasSize(2); + // With LIMIT, trimming may not be exact + assertThat((Long) results.get(1)).isGreaterThanOrEqualTo(50L).isLessThanOrEqualTo(100L); + } + + @Test // GH-3232 + @EnabledOnCommand("XTRIM") + @EnabledOnRedisVersion("8.2") // Deletion policy requires Redis 8.2+ + void xTrimShouldHonorDeletionPolicy() { + + // Add multiple records + for (int i = 0; i < 10; i++) { + actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2 + i))); + } + + getResults(); + initConnection(); + + // Trim with deletion policy + actual.add(connection.xTrim(KEY_1, RedisStreamCommands.XTrimOptions.maxlen(5).approximateTrimming(true) + .deletionPolicy(StreamDeletionPolicy.DELETE_REFERENCES))); + actual.add(connection.xLen(KEY_1)); + + List results = getResults(); + assertThat(results).hasSize(2); + // Verify trimming was applied + assertThat((Long) results.get(1)).isGreaterThan(0L).isLessThanOrEqualTo(10L); + } + @Test // DATAREDIS-864 @EnabledOnCommand("XADD") void xReadShouldReadMessage() { diff --git a/src/test/java/org/springframework/data/redis/connection/jedis/JedisConvertersUnitTests.java b/src/test/java/org/springframework/data/redis/connection/jedis/JedisConvertersUnitTests.java index 60ab80a823..af0b6bafab 100644 --- a/src/test/java/org/springframework/data/redis/connection/jedis/JedisConvertersUnitTests.java +++ b/src/test/java/org/springframework/data/redis/connection/jedis/JedisConvertersUnitTests.java @@ -555,4 +555,6 @@ private void assertThatParamsHasExpiration(HSetExParams params, Protocol.Keyword assertThat(params).extracting("expiration", "expirationValue").containsExactly(expirationType, expirationValue); } } + + } diff --git a/src/test/java/org/springframework/data/redis/connection/jedis/StreamConvertersUnitTest.java b/src/test/java/org/springframework/data/redis/connection/jedis/StreamConvertersUnitTest.java index 9f7a790e01..822d49b600 100644 --- a/src/test/java/org/springframework/data/redis/connection/jedis/StreamConvertersUnitTest.java +++ b/src/test/java/org/springframework/data/redis/connection/jedis/StreamConvertersUnitTest.java @@ -17,18 +17,27 @@ import static org.assertj.core.api.Assertions.*; +import redis.clients.jedis.params.XAddParams; import redis.clients.jedis.params.XPendingParams; +import redis.clients.jedis.params.XTrimParams; import java.time.Duration; import java.time.temporal.ChronoUnit; +import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; +import org.springframework.data.redis.connection.RedisStreamCommands.StreamDeletionPolicy; +import org.springframework.data.redis.connection.RedisStreamCommands.XAddOptions; +import org.springframework.data.redis.connection.RedisStreamCommands.XDelOptions; import org.springframework.data.redis.connection.RedisStreamCommands.XPendingOptions; +import org.springframework.data.redis.connection.RedisStreamCommands.XTrimOptions; +import org.springframework.data.redis.connection.stream.RecordId; /** * @author Jeonggyu Choi * @author Christoph Strobl + * @author Viktoriya Kutsarova */ class StreamConvertersUnitTest { @@ -41,4 +50,201 @@ void shouldConvertIdle() { assertThat(xPendingParams).hasFieldOrPropertyWithValue("idle", Duration.of(1, ChronoUnit.HOURS).toMillis()); } + + @Nested // GH-3232 + class ToXAddParamsShould { + + @Test + void convertXAddOptionsWithMaxlen() { + + RecordId recordId = RecordId.autoGenerate(); + XAddOptions options = XAddOptions.maxlen(100); + + XAddParams params = StreamConverters.toXAddParams(recordId, options); + + assertThat(params).hasFieldOrPropertyWithValue("maxLen", 100L); + } + + @Test + void convertXAddOptionsWithMinId() { + + RecordId recordId = RecordId.autoGenerate(); + XAddOptions options = XAddOptions.minId(RecordId.of("1234567890-0")); + + XAddParams params = StreamConverters.toXAddParams(recordId, options); + + assertThat(params).hasFieldOrPropertyWithValue("minId", "1234567890-0"); + } + + @Test + void convertXAddOptionsWithApproximateTrimming() { + + RecordId recordId = RecordId.autoGenerate(); + XAddOptions options = XAddOptions.maxlen(100).approximateTrimming(true); + + XAddParams params = StreamConverters.toXAddParams(recordId, options); + + assertThat(params).hasFieldOrPropertyWithValue("approximateTrimming", true); + } + + @Test + void convertXAddOptionsWithExactTrimming() { + + RecordId recordId = RecordId.autoGenerate(); + XAddOptions options = XAddOptions.maxlen(100).withExactTrimming(true); + + XAddParams params = StreamConverters.toXAddParams(recordId, options); + + assertThat(params).hasFieldOrPropertyWithValue("exactTrimming", true); + } + + @Test + void convertXAddOptionsWithLimit() { + + RecordId recordId = RecordId.autoGenerate(); + XAddOptions options = XAddOptions.maxlen(100).approximateTrimming(true).withLimit(50); + + XAddParams params = StreamConverters.toXAddParams(recordId, options); + + assertThat(params).hasFieldOrPropertyWithValue("limit", 50L); + } + + @Test + void convertXAddOptionsWithDeletionPolicy() { + + RecordId recordId = RecordId.autoGenerate(); + XAddOptions options = XAddOptions.maxlen(100).withDeletionPolicy(StreamDeletionPolicy.KEEP_REFERENCES); + + XAddParams params = StreamConverters.toXAddParams(recordId, options); + + assertThat(params).hasFieldOrPropertyWithValue("trimMode", + redis.clients.jedis.args.StreamDeletionPolicy.KEEP_REFERENCES); + } + + @Test + void convertXAddOptionsWithRecordId() { + + RecordId recordId = RecordId.of("1234567890-0"); + XAddOptions options = XAddOptions.none(); + + XAddParams params = StreamConverters.toXAddParams(recordId, options); + + assertThat(params).hasFieldOrPropertyWithValue("maxLen", null); + assertThat(params).hasFieldOrPropertyWithValue("minId", null); + assertThat(params).hasFieldOrPropertyWithValue("limit", null); + assertThat(params).hasFieldOrPropertyWithValue("trimMode", null); + assertThat(params).hasFieldOrPropertyWithValue("nomkstream", false); + assertThat(params).hasFieldOrPropertyWithValue("exactTrimming", true); + assertThat(params).hasFieldOrPropertyWithValue("approximateTrimming", false); + } + } + + @Nested // GH-3232 + class ToXTrimParamsShould { + + @Test + void convertXTrimOptionsWithMaxlen() { + + XTrimOptions options = XTrimOptions.maxlen(100); + + XTrimParams params = StreamConverters.toXTrimParams(options); + + assertThat(params).hasFieldOrPropertyWithValue("maxLen", 100L); + } + + @Test + void convertXTrimOptionsWithMinId() { + + XTrimOptions options = XTrimOptions.minId(RecordId.of("1234567890-0")); + + XTrimParams params = StreamConverters.toXTrimParams(options); + + assertThat(params).hasFieldOrPropertyWithValue("minId", "1234567890-0"); + } + + @Test + void convertXTrimOptionsWithApproximateTrimming() { + + XTrimOptions options = XTrimOptions.maxlen(100).approximateTrimming(true); + + XTrimParams params = StreamConverters.toXTrimParams(options); + + assertThat(params).hasFieldOrPropertyWithValue("approximateTrimming", true); + } + + @Test + void convertXTrimOptionsWithExactTrimming() { + + XTrimOptions options = XTrimOptions.maxlen(100).exactTrimming(true); + + XTrimParams params = StreamConverters.toXTrimParams(options); + + assertThat(params).hasFieldOrPropertyWithValue("exactTrimming", true); + } + + @Test + void convertXTrimOptionsWithLimit() { + + XTrimOptions options = XTrimOptions.maxlen(100).approximateTrimming(true).limit(50); + + XTrimParams params = StreamConverters.toXTrimParams(options); + + assertThat(params).hasFieldOrPropertyWithValue("limit", 50L); + } + + @Test + void convertXTrimOptionsWithDeletionPolicy() { + + XTrimOptions options = XTrimOptions.maxlen(100).deletionPolicy(StreamDeletionPolicy.KEEP_REFERENCES); + + XTrimParams params = StreamConverters.toXTrimParams(options); + + assertThat(params).hasFieldOrPropertyWithValue("trimMode", + redis.clients.jedis.args.StreamDeletionPolicy.KEEP_REFERENCES); + } + } + + @Nested // GH-3232 + class ToStreamDeletionPolicyShould { + + @Test + void convertDefaultOptions() { + + XDelOptions options = XDelOptions.defaultOptions(); + + redis.clients.jedis.args.StreamDeletionPolicy policy = StreamConverters.toStreamDeletionPolicy(options); + + assertThat(policy).isEqualTo(redis.clients.jedis.args.StreamDeletionPolicy.KEEP_REFERENCES); + } + + @Test + void convertKeepReferencesPolicy() { + + XDelOptions options = XDelOptions.deletionPolicy(StreamDeletionPolicy.KEEP_REFERENCES); + + redis.clients.jedis.args.StreamDeletionPolicy policy = StreamConverters.toStreamDeletionPolicy(options); + + assertThat(policy).isEqualTo(redis.clients.jedis.args.StreamDeletionPolicy.KEEP_REFERENCES); + } + + @Test + void convertDeleteReferencesPolicy() { + + XDelOptions options = XDelOptions.deletionPolicy(StreamDeletionPolicy.DELETE_REFERENCES); + + redis.clients.jedis.args.StreamDeletionPolicy policy = StreamConverters.toStreamDeletionPolicy(options); + + assertThat(policy).isEqualTo(redis.clients.jedis.args.StreamDeletionPolicy.DELETE_REFERENCES); + } + + @Test + void convertAcknowledgedPolicy() { + + XDelOptions options = XDelOptions.deletionPolicy(StreamDeletionPolicy.ACKNOWLEDGED); + + redis.clients.jedis.args.StreamDeletionPolicy policy = StreamConverters.toStreamDeletionPolicy(options); + + assertThat(policy).isEqualTo(redis.clients.jedis.args.StreamDeletionPolicy.ACKNOWLEDGED); + } + } } diff --git a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionUnitTests.java b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionUnitTests.java index 334d76caa6..99e769f689 100644 --- a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionUnitTests.java +++ b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionUnitTests.java @@ -50,9 +50,11 @@ import org.springframework.dao.InvalidDataAccessResourceUsageException; import org.springframework.data.redis.connection.AbstractConnectionUnitTestBase; import org.springframework.data.redis.connection.RedisServerCommands.ShutdownOption; +import org.springframework.data.redis.connection.RedisStreamCommands; import org.springframework.data.redis.connection.RedisStreamCommands.XAddOptions; import org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions; import org.springframework.data.redis.connection.stream.MapRecord; +import org.springframework.data.redis.connection.stream.RecordId; import org.springframework.data.redis.connection.zset.Tuple; import org.springframework.data.redis.core.Cursor; import org.springframework.data.redis.core.KeyScanOptions; @@ -259,6 +261,134 @@ void xaddShouldHonorNoMkStream() { assertThat(args.getValue()).extracting("nomkstream").isEqualTo(true); } + @Test // GH-3232 + void xaddShouldHonorMinId() { + + MapRecord record = MapRecord.create("key".getBytes(), Collections.emptyMap()); + + XAddOptions options = XAddOptions.none(); + connection.streamCommands().xAdd(record, options.withMinId(RecordId.of("1234567890-0"))); + ArgumentCaptor args = ArgumentCaptor.forClass(XAddArgs.class); + verify(asyncCommandsMock).xadd(any(), args.capture(), anyMap()); + + assertThat(ReflectionTestUtils.getField(args.getValue(), "minid")).isEqualTo("1234567890-0"); + } + + @Test // GH-3232 + void xaddShouldHonorLimit() { + + MapRecord record = MapRecord.create("key".getBytes(), Collections.emptyMap()); + + connection.streamCommands().xAdd(record, XAddOptions.maxlen(100).approximateTrimming(true).withLimit(50)); + ArgumentCaptor args = ArgumentCaptor.forClass(XAddArgs.class); + verify(asyncCommandsMock).xadd(any(), args.capture(), anyMap()); + + assertThat(args.getValue()).extracting("limit").isEqualTo(50L); + } + + @Test // GH-3232 + void xaddShouldHonorExactTrimming() { + + MapRecord record = MapRecord.create("key".getBytes(), Collections.emptyMap()); + + connection.streamCommands().xAdd(record, XAddOptions.maxlen(100).withExactTrimming(true)); + ArgumentCaptor args = ArgumentCaptor.forClass(XAddArgs.class); + verify(asyncCommandsMock).xadd(any(), args.capture(), anyMap()); + + assertThat(args.getValue()).extracting("exactTrimming").isEqualTo(true); + } + + @Test // GH-3232 + void xaddShouldHonorApproximateTrimming() { + + MapRecord record = MapRecord.create("key".getBytes(), Collections.emptyMap()); + + connection.streamCommands().xAdd(record, XAddOptions.maxlen(100).approximateTrimming(true)); + ArgumentCaptor args = ArgumentCaptor.forClass(XAddArgs.class); + verify(asyncCommandsMock).xadd(any(), args.capture(), anyMap()); + + assertThat(args.getValue()).extracting("approximateTrimming").isEqualTo(true); + } + + @Test // GH-3232 + void xaddShouldHonorDeletionPolicy() { + + MapRecord record = MapRecord.create("key".getBytes(), Collections.emptyMap()); + + connection.streamCommands().xAdd(record, + XAddOptions.maxlen(100).withDeletionPolicy(RedisStreamCommands.StreamDeletionPolicy.KEEP_REFERENCES)); + ArgumentCaptor args = ArgumentCaptor.forClass(XAddArgs.class); + verify(asyncCommandsMock).xadd(any(), args.capture(), anyMap()); + + assertThat(args.getValue()).extracting("trimmingMode").isEqualTo(io.lettuce.core.StreamDeletionPolicy.KEEP_REFERENCES); + } + + @Test // GH-3232 + void xtrimShouldHonorMaxlen() { + + connection.streamCommands().xTrim("key".getBytes(), RedisStreamCommands.XTrimOptions.maxlen(100)); + ArgumentCaptor args = ArgumentCaptor.forClass(XTrimArgs.class); + verify(asyncCommandsMock).xtrim(any(), args.capture()); + + assertThat(args.getValue()).extracting("maxlen").isEqualTo(100L); + } + + @Test // GH-3232 + void xtrimShouldHonorMinId() { + + connection.streamCommands().xTrim("key".getBytes(), + RedisStreamCommands.XTrimOptions.minId(RecordId.of("1234567890-0"))); + ArgumentCaptor args = ArgumentCaptor.forClass(XTrimArgs.class); + verify(asyncCommandsMock).xtrim(any(), args.capture()); + + assertThat(ReflectionTestUtils.getField(args.getValue(), "minId")).isEqualTo("1234567890-0"); + } + + @Test // GH-3232 + void xtrimShouldHonorApproximateTrimming() { + + connection.streamCommands().xTrim("key".getBytes(), + RedisStreamCommands.XTrimOptions.maxlen(100).approximateTrimming(true)); + ArgumentCaptor args = ArgumentCaptor.forClass(XTrimArgs.class); + verify(asyncCommandsMock).xtrim(any(), args.capture()); + + assertThat(args.getValue()).extracting("approximateTrimming").isEqualTo(true); + } + + @Test // GH-3232 + void xtrimShouldHonorExactTrimming() { + + connection.streamCommands().xTrim("key".getBytes(), + RedisStreamCommands.XTrimOptions.maxlen(100).exactTrimming(true)); + ArgumentCaptor args = ArgumentCaptor.forClass(XTrimArgs.class); + verify(asyncCommandsMock).xtrim(any(), args.capture()); + + assertThat(args.getValue()).extracting("exactTrimming").isEqualTo(true); + } + + @Test // GH-3232 + void xtrimShouldHonorLimit() { + + connection.streamCommands().xTrim("key".getBytes(), + RedisStreamCommands.XTrimOptions.maxlen(100).approximateTrimming(true).limit(50)); + ArgumentCaptor args = ArgumentCaptor.forClass(XTrimArgs.class); + verify(asyncCommandsMock).xtrim(any(), args.capture()); + + assertThat(args.getValue()).extracting("limit").isEqualTo(50L); + } + + @Test // GH-3232 + void xtrimShouldHonorDeletionPolicy() { + + connection.streamCommands().xTrim("key".getBytes(), RedisStreamCommands.XTrimOptions.maxlen(100) + .deletionPolicy(RedisStreamCommands.StreamDeletionPolicy.KEEP_REFERENCES)); + ArgumentCaptor args = ArgumentCaptor.forClass(XTrimArgs.class); + verify(asyncCommandsMock).xtrim(any(), args.capture()); + + assertThat(args.getValue()).extracting("trimmingMode") + .isEqualTo(io.lettuce.core.StreamDeletionPolicy.KEEP_REFERENCES); + } + @Test // GH-2796 void scanShouldOperateUponUnsigned64BitCursorId() { diff --git a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceConvertersUnitTests.java b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceConvertersUnitTests.java index 9671797ae6..43ebcd9ff9 100644 --- a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceConvertersUnitTests.java +++ b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceConvertersUnitTests.java @@ -24,6 +24,8 @@ import io.lettuce.core.Limit; import io.lettuce.core.RedisURI; import io.lettuce.core.SetArgs; +import io.lettuce.core.XAddArgs; +import io.lettuce.core.XTrimArgs; import io.lettuce.core.cluster.models.partitions.Partitions; import io.lettuce.core.cluster.models.partitions.RedisClusterNode.NodeFlag; @@ -43,7 +45,12 @@ import org.springframework.data.redis.connection.RedisHashCommands; import org.springframework.data.redis.connection.RedisPassword; import org.springframework.data.redis.connection.RedisSentinelConfiguration; +import org.springframework.data.redis.connection.RedisStreamCommands.StreamDeletionPolicy; +import org.springframework.data.redis.connection.RedisStreamCommands.XAddOptions; +import org.springframework.data.redis.connection.RedisStreamCommands.XDelOptions; +import org.springframework.data.redis.connection.RedisStreamCommands.XTrimOptions; import org.springframework.data.redis.connection.RedisStringCommands.SetOption; +import org.springframework.data.redis.connection.stream.RecordId; import org.springframework.data.redis.core.types.Expiration; import org.springframework.data.redis.core.types.RedisClientInfo; @@ -452,4 +459,193 @@ void setExAtForExpirationWithNonMillisUnixTimestamp() { .extracting("exAt").isEqualTo(fourHoursFromNowSecs); } } + + @Nested // GH-3232 + class ToXAddArgsShould { + + @Test + void convertXAddOptionsWithMaxlen() { + + RecordId recordId = RecordId.autoGenerate(); + XAddOptions options = XAddOptions.maxlen(100); + + XAddArgs args = StreamConverters.toXAddArgs(recordId, options); + + assertThat(args).extracting("maxlen").isEqualTo(100L); + } + + @Test + void convertXAddOptionsWithMinId() { + + RecordId recordId = RecordId.autoGenerate(); + XAddOptions options = XAddOptions.minId(RecordId.of("1234567890-0")); + + XAddArgs args = StreamConverters.toXAddArgs(recordId, options); + + assertThat(getField(args, "minid")).isEqualTo("1234567890-0"); + } + + @Test + void convertXAddOptionsWithApproximateTrimming() { + + RecordId recordId = RecordId.autoGenerate(); + XAddOptions options = XAddOptions.maxlen(100).approximateTrimming(true); + + XAddArgs args = StreamConverters.toXAddArgs(recordId, options); + + assertThat(args).extracting("approximateTrimming").isEqualTo(true); + } + + @Test + void convertXAddOptionsWithExactTrimming() { + + RecordId recordId = RecordId.autoGenerate(); + XAddOptions options = XAddOptions.maxlen(100).exactTrimming(true); + + XAddArgs args = StreamConverters.toXAddArgs(recordId, options); + + assertThat(args).extracting("exactTrimming").isEqualTo(true); + } + + @Test + void convertXAddOptionsWithLimit() { + + RecordId recordId = RecordId.autoGenerate(); + XAddOptions options = XAddOptions.maxlen(100).approximateTrimming(true).withLimit(50); + + XAddArgs args = StreamConverters.toXAddArgs(recordId, options); + + assertThat(args).extracting("limit").isEqualTo(50L); + } + + @Test + void convertXAddOptionsWithDeletionPolicy() { + + RecordId recordId = RecordId.autoGenerate(); + XAddOptions options = XAddOptions.maxlen(100).withDeletionPolicy(StreamDeletionPolicy.KEEP_REFERENCES); + + XAddArgs args = StreamConverters.toXAddArgs(recordId, options); + + assertThat(args).extracting("trimmingMode").isEqualTo(io.lettuce.core.StreamDeletionPolicy.KEEP_REFERENCES); + } + + @Test + void convertXAddOptionsWithRecordId() { + + RecordId recordId = RecordId.of("1234567890-0"); + XAddOptions options = XAddOptions.none(); + + XAddArgs args = StreamConverters.toXAddArgs(recordId, options); + + assertThat(getField(args, "id")).isEqualTo("1234567890-0"); + } + } + + @Nested // GH-3232 + class ToXTrimArgsShould { + + @Test + void convertXTrimOptionsWithMaxlen() { + + XTrimOptions options = XTrimOptions.maxlen(100); + + XTrimArgs args = StreamConverters.toXTrimArgs(options); + + assertThat(args).extracting("maxlen").isEqualTo(100L); + } + + @Test + void convertXTrimOptionsWithMinId() { + + XTrimOptions options = XTrimOptions.minId(RecordId.of("1234567890-0")); + + XTrimArgs args = StreamConverters.toXTrimArgs(options); + + assertThat(getField(args, "minId")).isEqualTo("1234567890-0"); + } + + @Test + void convertXTrimOptionsWithApproximateTrimming() { + + XTrimOptions options = XTrimOptions.maxlen(100).approximateTrimming(true); + + XTrimArgs args = StreamConverters.toXTrimArgs(options); + + assertThat(args).extracting("approximateTrimming").isEqualTo(true); + } + + @Test + void convertXTrimOptionsWithExactTrimming() { + + XTrimOptions options = XTrimOptions.maxlen(100).exactTrimming(true); + + XTrimArgs args = StreamConverters.toXTrimArgs(options); + + assertThat(args).extracting("exactTrimming").isEqualTo(true); + } + + @Test + void convertXTrimOptionsWithLimit() { + + XTrimOptions options = XTrimOptions.maxlen(100).approximateTrimming(true).limit(50); + + XTrimArgs args = StreamConverters.toXTrimArgs(options); + + assertThat(args).extracting("limit").isEqualTo(50L); + } + + @Test + void convertXTrimOptionsWithDeletionPolicy() { + + XTrimOptions options = XTrimOptions.maxlen(100).deletionPolicy(StreamDeletionPolicy.KEEP_REFERENCES); + + XTrimArgs args = StreamConverters.toXTrimArgs(options); + + assertThat(args).extracting("trimmingMode").isEqualTo(io.lettuce.core.StreamDeletionPolicy.KEEP_REFERENCES); + } + } + + @Nested // GH-3232 + class ToXDelArgsShould { + + @Test + void convertDefaultOptions() { + + XDelOptions options = XDelOptions.defaultOptions(); + + io.lettuce.core.StreamDeletionPolicy policy = StreamConverters.toXDelArgs(options); + + assertThat(policy).isEqualTo(io.lettuce.core.StreamDeletionPolicy.KEEP_REFERENCES); + } + + @Test + void convertKeepReferencesPolicy() { + + XDelOptions options = XDelOptions.deletionPolicy(StreamDeletionPolicy.KEEP_REFERENCES); + + io.lettuce.core.StreamDeletionPolicy policy = StreamConverters.toXDelArgs(options); + + assertThat(policy).isEqualTo(io.lettuce.core.StreamDeletionPolicy.KEEP_REFERENCES); + } + + @Test + void convertDeleteReferencesPolicy() { + + XDelOptions options = XDelOptions.deletionPolicy(StreamDeletionPolicy.DELETE_REFERENCES); + + io.lettuce.core.StreamDeletionPolicy policy = StreamConverters.toXDelArgs(options); + + assertThat(policy).isEqualTo(io.lettuce.core.StreamDeletionPolicy.DELETE_REFERENCES); + } + + @Test + void convertAcknowledgedPolicy() { + + XDelOptions options = XDelOptions.deletionPolicy(StreamDeletionPolicy.ACKNOWLEDGED); + + io.lettuce.core.StreamDeletionPolicy policy = StreamConverters.toXDelArgs(options); + + assertThat(policy).isEqualTo(io.lettuce.core.StreamDeletionPolicy.ACKNOWLEDGED); + } + } } diff --git a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommandsIntegrationTests.java b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommandsIntegrationTests.java index d5e4a7ab79..2a52a94224 100644 --- a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommandsIntegrationTests.java +++ b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommandsIntegrationTests.java @@ -18,6 +18,7 @@ import static org.assertj.core.api.Assertions.*; import io.lettuce.core.XReadArgs; +import org.springframework.data.redis.connection.RedisStreamCommands; import reactor.test.StepVerifier; import java.time.Duration; @@ -32,7 +33,9 @@ import org.springframework.data.domain.Range; import org.springframework.data.redis.RedisSystemException; import org.springframework.data.redis.connection.Limit; +import org.springframework.data.redis.connection.RedisStreamCommands.StreamEntryDeletionResult; import org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions; +import org.springframework.data.redis.connection.RedisStreamCommands.XDelOptions; import org.springframework.data.redis.connection.stream.Consumer; import org.springframework.data.redis.connection.stream.ReadOffset; import org.springframework.data.redis.connection.stream.RecordId; @@ -659,4 +662,110 @@ void xClaimJustId() { .assertNext(it -> assertThat(it.getValue()).isEqualTo(expected)) // .verifyComplete(); } + + @Test // GH-3232 + void xDelExShouldDeleteEntries() { + + RecordId messageId1 = connection.streamCommands() + .xAdd(KEY_1_BBUFFER, Collections.singletonMap(KEY_2_BBUFFER, VALUE_2_BBUFFER)).block(); + RecordId messageId2 = connection.streamCommands() + .xAdd(KEY_1_BBUFFER, Collections.singletonMap(KEY_2_BBUFFER, VALUE_2_BBUFFER)).block(); + + connection.streamCommands().xLen(KEY_1_BBUFFER) // + .as(StepVerifier::create) // + .expectNext(2L) // + .verifyComplete(); + + XDelOptions options = XDelOptions.defaultOptions(); + + connection.streamCommands().xDelEx(KEY_1_BBUFFER, options, messageId1, messageId2) // + .as(StepVerifier::create) // + .expectNext(StreamEntryDeletionResult.DELETED) // + .expectNext(StreamEntryDeletionResult.DELETED) // + .verifyComplete(); + + connection.streamCommands().xLen(KEY_1_BBUFFER) // + .as(StepVerifier::create) // + .expectNext(0L) // + .verifyComplete(); + } + + @Test // GH-3232 + void xDelExWithStringIdsShouldDeleteEntries() { + + RecordId messageId1 = connection.streamCommands() + .xAdd(KEY_1_BBUFFER, Collections.singletonMap(KEY_2_BBUFFER, VALUE_2_BBUFFER)).block(); + RecordId messageId2 = connection.streamCommands() + .xAdd(KEY_1_BBUFFER, Collections.singletonMap(KEY_2_BBUFFER, VALUE_2_BBUFFER)).block(); + + XDelOptions options = XDelOptions.defaultOptions(); + + connection.streamCommands().xDelEx(KEY_1_BBUFFER, options, messageId1.getValue(), messageId2.getValue()) // + .as(StepVerifier::create) // + .expectNextCount(2) // + .verifyComplete(); + + connection.streamCommands().xLen(KEY_1_BBUFFER) // + .as(StepVerifier::create) // + .expectNext(0L) // + .verifyComplete(); + } + + @Test // GH-3232 + void xAckDelShouldAcknowledgeAndDeleteEntries() { + + RecordId messageId1 = connection.streamCommands() + .xAdd(KEY_1_BBUFFER, Collections.singletonMap(KEY_2_BBUFFER, VALUE_2_BBUFFER)).block(); + RecordId messageId2 = connection.streamCommands() + .xAdd(KEY_1_BBUFFER, Collections.singletonMap(KEY_2_BBUFFER, VALUE_2_BBUFFER)).block(); + + connection.streamCommands().xGroupCreate(KEY_1_BBUFFER, "my-group", ReadOffset.from("0-0"), true) // + .as(StepVerifier::create) // + .expectNext("OK") // + .verifyComplete(); + + connection.streamCommands() + .xReadGroup(Consumer.from("my-group", "my-consumer"), + StreamOffset.create(KEY_1_BBUFFER, ReadOffset.lastConsumed())) // + .as(StepVerifier::create) // + .expectNextCount(2) // + .verifyComplete(); + + XDelOptions options = XDelOptions.deletionPolicy(RedisStreamCommands.StreamDeletionPolicy.ACKNOWLEDGED); + + connection.streamCommands().xAckDel(KEY_1_BBUFFER, "my-group", options, messageId1, messageId2) // + .as(StepVerifier::create) // + .expectNext(StreamEntryDeletionResult.DELETED) // + .expectNext(StreamEntryDeletionResult.DELETED) // + .verifyComplete(); + } + + @Test // GH-3232 + void xAckDelWithStringIdsShouldWork() { + + RecordId messageId1 = connection.streamCommands() + .xAdd(KEY_1_BBUFFER, Collections.singletonMap(KEY_2_BBUFFER, VALUE_2_BBUFFER)).block(); + RecordId messageId2 = connection.streamCommands() + .xAdd(KEY_1_BBUFFER, Collections.singletonMap(KEY_2_BBUFFER, VALUE_2_BBUFFER)).block(); + + connection.streamCommands().xGroupCreate(KEY_1_BBUFFER, "my-group", ReadOffset.from("0-0"), true) // + .as(StepVerifier::create) // + .expectNext("OK") // + .verifyComplete(); + + connection.streamCommands() + .xReadGroup(Consumer.from("my-group", "my-consumer"), + StreamOffset.create(KEY_1_BBUFFER, ReadOffset.lastConsumed())) // + .as(StepVerifier::create) // + .expectNextCount(2) // + .verifyComplete(); + + XDelOptions options = XDelOptions.deletionPolicy(RedisStreamCommands.StreamDeletionPolicy.ACKNOWLEDGED); + + connection.streamCommands().xAckDel(KEY_1_BBUFFER, "my-group", options, messageId1.getValue(), + messageId2.getValue()) // + .as(StepVerifier::create) // + .expectNextCount(2) // + .verifyComplete(); + } } diff --git a/src/test/java/org/springframework/data/redis/core/DefaultReactiveStreamOperationsIntegrationTests.java b/src/test/java/org/springframework/data/redis/core/DefaultReactiveStreamOperationsIntegrationTests.java index 5d805ac251..14faa548e0 100644 --- a/src/test/java/org/springframework/data/redis/core/DefaultReactiveStreamOperationsIntegrationTests.java +++ b/src/test/java/org/springframework/data/redis/core/DefaultReactiveStreamOperationsIntegrationTests.java @@ -18,6 +18,10 @@ import static org.assertj.core.api.Assertions.*; import static org.junit.Assume.*; +import org.springframework.data.redis.connection.RedisStreamCommands; +import org.springframework.data.redis.connection.RedisStreamCommands.StreamEntryDeletionResult; +import org.springframework.data.redis.connection.RedisStreamCommands.XDelOptions; +import org.springframework.data.redis.connection.RedisStreamCommands.StreamDeletionPolicy; import reactor.test.StepVerifier; import java.time.Duration; @@ -343,6 +347,63 @@ void addMakeNoStreamShouldCreateStreamWhenStreamExists() { streamOperations.range(key, Range.unbounded()).as(StepVerifier::create).expectNextCount(2L).verifyComplete(); } + @Test // GH-3232 + void addWithLimitShouldHonorApproximateTrimming() { + + K key = keyFactory.instance(); + HK hashKey = hashKeyFactory.instance(); + HV value = valueFactory.instance(); + + XAddOptions options = XAddOptions.maxlen(100).approximateTrimming(true).withLimit(50); + + // Add multiple messages with limit + for (int i = 0; i < 5; i++) { + streamOperations.add(key, Collections.singletonMap(hashKey, value), options).block(); + } + + streamOperations.size(key).as(StepVerifier::create).assertNext(size -> { + assertThat(size).isGreaterThan(0L); + }).verifyComplete(); + } + + @Test // GH-3232 + void addWithExactTrimmingShouldTrimExactly() { + + K key = keyFactory.instance(); + HK hashKey = hashKeyFactory.instance(); + HV value = valueFactory.instance(); + + XAddOptions options = XAddOptions.maxlen(2).withExactTrimming(true); + + // Add 3 messages with exact trimming to maxlen=2 + streamOperations.add(key, Collections.singletonMap(hashKey, value), options).block(); + streamOperations.add(key, Collections.singletonMap(hashKey, value), options).block(); + streamOperations.add(key, Collections.singletonMap(hashKey, value), options).block(); + + // Should have exactly 2 entries + streamOperations.size(key).as(StepVerifier::create).expectNext(2L).verifyComplete(); + } + + @Test // GH-3232 + void addWithDeletionPolicyShouldApplyPolicy() { + + K key = keyFactory.instance(); + HK hashKey = hashKeyFactory.instance(); + HV value = valueFactory.instance(); + + XAddOptions options = XAddOptions.maxlen(5).approximateTrimming(true) + .withDeletionPolicy(RedisStreamCommands.StreamDeletionPolicy.DELETE_REFERENCES); + + // Add multiple messages with deletion policy + for (int i = 0; i < 3; i++) { + streamOperations.add(key, Collections.singletonMap(hashKey, value), options).block(); + } + + streamOperations.size(key).as(StepVerifier::create).assertNext(size -> { + assertThat(size).isGreaterThan(0L); + }).verifyComplete(); + } + @Test // DATAREDIS-864 void rangeShouldReportMessages() { @@ -536,4 +597,144 @@ void claimShouldReadMessageDetails() { assertThat(claimed.getId()).isEqualTo(messageId); }).verifyComplete(); } + + @Test // GH-3232 + void deleteWithOptionsShouldDeleteEntries() { + + K key = keyFactory.instance(); + HK hashKey = hashKeyFactory.instance(); + HV value = valueFactory.instance(); + + RecordId messageId1 = streamOperations.add(key, Collections.singletonMap(hashKey, value)).block(); + RecordId messageId2 = streamOperations.add(key, Collections.singletonMap(hashKey, value)).block(); + RecordId messageId3 = streamOperations.add(key, Collections.singletonMap(hashKey, value)).block(); + + streamOperations.size(key).as(StepVerifier::create).expectNext(3L).verifyComplete(); + + XDelOptions options = XDelOptions.defaultOptions(); + + streamOperations.deleteWithOptions(key, options, messageId1, messageId2).as(StepVerifier::create) + .expectNext(StreamEntryDeletionResult.DELETED) + .expectNext(StreamEntryDeletionResult.DELETED) + .verifyComplete(); + + streamOperations.size(key).as(StepVerifier::create).expectNext(1L).verifyComplete(); + } + + @Test // GH-3232 + void deleteWithOptionsUsingStringIdsShouldDeleteEntries() { + + K key = keyFactory.instance(); + HK hashKey = hashKeyFactory.instance(); + HV value = valueFactory.instance(); + + RecordId messageId1 = streamOperations.add(key, Collections.singletonMap(hashKey, value)).block(); + RecordId messageId2 = streamOperations.add(key, Collections.singletonMap(hashKey, value)).block(); + + streamOperations.size(key).as(StepVerifier::create).expectNext(2L).verifyComplete(); + + XDelOptions options = XDelOptions.defaultOptions(); + + streamOperations.deleteWithOptions(key, options, messageId1.getValue(), messageId2.getValue()) + .as(StepVerifier::create) + .expectNextCount(2) + .verifyComplete(); + + streamOperations.size(key).as(StepVerifier::create).expectNext(0L).verifyComplete(); + } + + @Test // GH-3232 + void deleteWithOptionsUsingRecordShouldDeleteEntry() { + + K key = keyFactory.instance(); + HK hashKey = hashKeyFactory.instance(); + HV value = valueFactory.instance(); + + Map content = Collections.singletonMap(hashKey, value); + RecordId messageId = streamOperations.add(key, content).block(); + + streamOperations.size(key).as(StepVerifier::create).expectNext(1L).verifyComplete(); + + MapRecord record = StreamRecords.newRecord().in(key).withId(messageId).ofMap(content); + XDelOptions options = XDelOptions.defaultOptions(); + + streamOperations.deleteWithOptions(record, options).as(StepVerifier::create) + .expectNext(StreamEntryDeletionResult.DELETED) + .verifyComplete(); + + streamOperations.size(key).as(StepVerifier::create).expectNext(0L).verifyComplete(); + } + + @Test // GH-3232 + void acknowledgeAndDeleteShouldAcknowledgeAndDeleteEntries() { + + K key = keyFactory.instance(); + HK hashKey = hashKeyFactory.instance(); + HV value = valueFactory.instance(); + + RecordId messageId1 = streamOperations.add(key, Collections.singletonMap(hashKey, value)).block(); + RecordId messageId2 = streamOperations.add(key, Collections.singletonMap(hashKey, value)).block(); + + streamOperations.createGroup(key, ReadOffset.from("0-0"), "my-group").then().as(StepVerifier::create) + .verifyComplete(); + + streamOperations.read(Consumer.from("my-group", "my-consumer"), StreamOffset.create(key, ReadOffset.lastConsumed())) + .then().as(StepVerifier::create).verifyComplete(); + + XDelOptions options = XDelOptions.deletionPolicy(StreamDeletionPolicy.ACKNOWLEDGED); + + streamOperations.acknowledgeAndDelete(key, "my-group", options, messageId1, messageId2) + .as(StepVerifier::create) + .expectNext(StreamEntryDeletionResult.DELETED) + .expectNext(StreamEntryDeletionResult.DELETED) + .verifyComplete(); + } + + @Test // GH-3232 + void acknowledgeAndDeleteUsingStringIdsShouldWork() { + + K key = keyFactory.instance(); + HK hashKey = hashKeyFactory.instance(); + HV value = valueFactory.instance(); + + RecordId messageId1 = streamOperations.add(key, Collections.singletonMap(hashKey, value)).block(); + RecordId messageId2 = streamOperations.add(key, Collections.singletonMap(hashKey, value)).block(); + + streamOperations.createGroup(key, ReadOffset.from("0-0"), "my-group").then().as(StepVerifier::create) + .verifyComplete(); + + streamOperations.read(Consumer.from("my-group", "my-consumer"), StreamOffset.create(key, ReadOffset.lastConsumed())) + .then().as(StepVerifier::create).verifyComplete(); + + XDelOptions options = XDelOptions.deletionPolicy(StreamDeletionPolicy.ACKNOWLEDGED); + + streamOperations.acknowledgeAndDelete(key, "my-group", options, messageId1.getValue(), messageId2.getValue()) + .as(StepVerifier::create) + .expectNextCount(2) + .verifyComplete(); + } + + @Test // GH-3232 + void acknowledgeAndDeleteUsingRecordShouldWork() { + + K key = keyFactory.instance(); + HK hashKey = hashKeyFactory.instance(); + HV value = valueFactory.instance(); + + Map content = Collections.singletonMap(hashKey, value); + RecordId messageId = streamOperations.add(key, content).block(); + + streamOperations.createGroup(key, ReadOffset.from("0-0"), "my-group").then().as(StepVerifier::create) + .verifyComplete(); + + streamOperations.read(Consumer.from("my-group", "my-consumer"), StreamOffset.create(key, ReadOffset.lastConsumed())) + .then().as(StepVerifier::create).verifyComplete(); + + MapRecord record = StreamRecords.newRecord().in(key).withId(messageId).ofMap(content); + XDelOptions options = XDelOptions.deletionPolicy(StreamDeletionPolicy.ACKNOWLEDGED); + + streamOperations.acknowledgeAndDelete("my-group", record, options).as(StepVerifier::create) + .expectNext(StreamEntryDeletionResult.DELETED) + .verifyComplete(); + } } diff --git a/src/test/java/org/springframework/data/redis/core/DefaultStreamOperationsIntegrationTests.java b/src/test/java/org/springframework/data/redis/core/DefaultStreamOperationsIntegrationTests.java index 9df8d711d3..562448bf1c 100644 --- a/src/test/java/org/springframework/data/redis/core/DefaultStreamOperationsIntegrationTests.java +++ b/src/test/java/org/springframework/data/redis/core/DefaultStreamOperationsIntegrationTests.java @@ -35,7 +35,11 @@ import org.springframework.data.redis.Person; import org.springframework.data.redis.connection.Limit; import org.springframework.data.redis.connection.RedisConnectionFactory; +import org.springframework.data.redis.connection.RedisStreamCommands; +import org.springframework.data.redis.connection.RedisStreamCommands.StreamEntryDeletionResult; import org.springframework.data.redis.connection.RedisStreamCommands.XAddOptions; +import org.springframework.data.redis.connection.RedisStreamCommands.XDelOptions; +import org.springframework.data.redis.connection.RedisStreamCommands.StreamDeletionPolicy; import org.springframework.data.redis.connection.jedis.extension.JedisConnectionFactoryExtension; import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory; import org.springframework.data.redis.connection.lettuce.extension.LettuceConnectionFactoryExtension; @@ -312,6 +316,180 @@ void addMakeNoStreamShouldCreateStreamWhenStreamExists() { assertThat(streamOps.range(key, Range.unbounded())).hasSize(2); } + @Test // GH-3232 + void addWithLimitShouldHonorApproximateTrimming() { + + K key = keyFactory.instance(); + HV value = hashValueFactory.instance(); + + XAddOptions options = XAddOptions.maxlen(100).approximateTrimming(true).withLimit(50); + + // Add multiple messages with limit + for (int i = 0; i < 5; i++) { + streamOps.add(StreamRecords.objectBacked(value).withStreamKey(key), options); + } + + assertThat(streamOps.size(key)).isGreaterThan(0L); + } + + @Test // GH-3232 + void addWithExactTrimmingShouldTrimExactly() { + + K key = keyFactory.instance(); + HV value = hashValueFactory.instance(); + + XAddOptions options = XAddOptions.maxlen(2).withExactTrimming(true); + + // Add 3 messages with exact trimming to maxlen=2 + streamOps.add(StreamRecords.objectBacked(value).withStreamKey(key), options); + streamOps.add(StreamRecords.objectBacked(value).withStreamKey(key), options); + streamOps.add(StreamRecords.objectBacked(value).withStreamKey(key), options); + + // Should have exactly 2 entries + assertThat(streamOps.size(key)).isEqualTo(2); + } + + @Test // GH-3232 + void addWithDeletionPolicyShouldApplyPolicy() { + + K key = keyFactory.instance(); + HV value = hashValueFactory.instance(); + + XAddOptions options = XAddOptions.maxlen(5).approximateTrimming(true) + .withDeletionPolicy(RedisStreamCommands.StreamDeletionPolicy.DELETE_REFERENCES); + + // Add multiple messages with deletion policy + for (int i = 0; i < 3; i++) { + streamOps.add(StreamRecords.objectBacked(value).withStreamKey(key), options); + } + + assertThat(streamOps.size(key)).isGreaterThan(0L); + } + + @Test // GH-3232 + void trimShouldTrimStreamWithMaxlen() { + + K key = keyFactory.instance(); + HV value = hashValueFactory.instance(); + + // Add 10 messages + for (int i = 0; i < 10; i++) { + streamOps.add(StreamRecords.objectBacked(value).withStreamKey(key)); + } + + assertThat(streamOps.size(key)).isEqualTo(10L); + + // Trim to 5 entries + Long trimmed = streamOps.trim(key, RedisStreamCommands.XTrimOptions.maxlen(5)); + + assertThat(trimmed).isEqualTo(5L); // 5 entries removed + assertThat(streamOps.size(key)).isEqualTo(5L); // 5 entries remaining + } + + @Test // GH-3232 + void trimShouldTrimStreamWithMinId() { + + K key = keyFactory.instance(); + HV value = hashValueFactory.instance(); + + // Add 5 messages and capture their IDs + RecordId id1 = streamOps.add(StreamRecords.objectBacked(value).withStreamKey(key)); + RecordId id2 = streamOps.add(StreamRecords.objectBacked(value).withStreamKey(key)); + RecordId id3 = streamOps.add(StreamRecords.objectBacked(value).withStreamKey(key)); + RecordId id4 = streamOps.add(StreamRecords.objectBacked(value).withStreamKey(key)); + RecordId id5 = streamOps.add(StreamRecords.objectBacked(value).withStreamKey(key)); + + assertThat(streamOps.size(key)).isEqualTo(5L); + + // Trim using MINID - keep only entries with ID >= id3 + Long trimmed = streamOps.trim(key, RedisStreamCommands.XTrimOptions.minId(id3)); + + assertThat(trimmed).isEqualTo(2L); // 2 entries removed (id1, id2) + assertThat(streamOps.size(key)).isEqualTo(3L); // 3 entries remaining (id3, id4, id5) + } + + @Test // GH-3232 + void trimShouldHonorApproximateTrimming() { + + K key = keyFactory.instance(); + HV value = hashValueFactory.instance(); + + // Add 100 messages + for (int i = 0; i < 100; i++) { + streamOps.add(StreamRecords.objectBacked(value).withStreamKey(key)); + } + + assertThat(streamOps.size(key)).isEqualTo(100L); + + // Trim with approximate trimming + streamOps.trim(key, RedisStreamCommands.XTrimOptions.maxlen(50).approximateTrimming(true)); + + // With approximate trimming, the result may not be exact but should be around 50 + assertThat(streamOps.size(key)).isGreaterThanOrEqualTo(50L).isLessThanOrEqualTo(100L); + } + + @Test // GH-3232 + void trimShouldHonorExactTrimming() { + + K key = keyFactory.instance(); + HV value = hashValueFactory.instance(); + + // Add 10 messages + for (int i = 0; i < 10; i++) { + streamOps.add(StreamRecords.objectBacked(value).withStreamKey(key)); + } + + assertThat(streamOps.size(key)).isEqualTo(10L); + + // Trim with exact trimming + Long trimmed = streamOps.trim(key, RedisStreamCommands.XTrimOptions.maxlen(5).exactTrimming(true)); + + assertThat(trimmed).isEqualTo(5L); // 5 entries removed + assertThat(streamOps.size(key)).isEqualTo(5L); // Exactly 5 entries remaining + } + + @Test // GH-3232 + void trimShouldHonorLimit() { + + K key = keyFactory.instance(); + HV value = hashValueFactory.instance(); + + // Add 100 messages + for (int i = 0; i < 100; i++) { + streamOps.add(StreamRecords.objectBacked(value).withStreamKey(key)); + } + + assertThat(streamOps.size(key)).isEqualTo(100L); + + // Trim with LIMIT to control trimming effort + streamOps.trim(key, RedisStreamCommands.XTrimOptions.maxlen(50).approximateTrimming(true).limit(10)); + + // With LIMIT, trimming may not be exact + assertThat(streamOps.size(key)).isGreaterThanOrEqualTo(50L).isLessThanOrEqualTo(100L); + } + + @Test // GH-3232 + @EnabledOnRedisVersion("8.2") // Deletion policy requires Redis 8.2+ + void trimShouldHonorDeletionPolicy() { + + K key = keyFactory.instance(); + HV value = hashValueFactory.instance(); + + // Add 10 messages + for (int i = 0; i < 10; i++) { + streamOps.add(StreamRecords.objectBacked(value).withStreamKey(key)); + } + + assertThat(streamOps.size(key)).isEqualTo(10L); + + // Trim with deletion policy + streamOps.trim(key, RedisStreamCommands.XTrimOptions.maxlen(5).approximateTrimming(true) + .deletionPolicy(RedisStreamCommands.StreamDeletionPolicy.DELETE_REFERENCES)); + + // Verify trimming was applied + assertThat(streamOps.size(key)).isGreaterThan(0L).isLessThanOrEqualTo(10L); + } + @Test // DATAREDIS-864 void simpleMessageReadWriteSymmetry() { @@ -600,4 +778,140 @@ void claimShouldReadMessageDetails() { assertThat(message.getValue()).containsEntry(hashKey, value); } } + + @Test // GH-3232 + void deleteWithOptionsShouldDeleteEntries() { + + K key = keyFactory.instance(); + HK hashKey = hashKeyFactory.instance(); + HV value = hashValueFactory.instance(); + + RecordId messageId1 = streamOps.add(key, Collections.singletonMap(hashKey, value)); + RecordId messageId2 = streamOps.add(key, Collections.singletonMap(hashKey, value)); + RecordId messageId3 = streamOps.add(key, Collections.singletonMap(hashKey, value)); + + assertThat(streamOps.size(key)).isEqualTo(3L); + + XDelOptions options = XDelOptions.defaultOptions(); + + List results = streamOps.deleteWithOptions(key, options, messageId1, messageId2); + + assertThat(results).hasSize(2); + assertThat(results.get(0)).isEqualTo(StreamEntryDeletionResult.DELETED); + assertThat(results.get(1)).isEqualTo(StreamEntryDeletionResult.DELETED); + + assertThat(streamOps.size(key)).isEqualTo(1L); + } + + @Test // GH-3232 + void deleteWithOptionsUsingStringIdsShouldDeleteEntries() { + + K key = keyFactory.instance(); + HK hashKey = hashKeyFactory.instance(); + HV value = hashValueFactory.instance(); + + RecordId messageId1 = streamOps.add(key, Collections.singletonMap(hashKey, value)); + RecordId messageId2 = streamOps.add(key, Collections.singletonMap(hashKey, value)); + + assertThat(streamOps.size(key)).isEqualTo(2L); + + XDelOptions options = XDelOptions.defaultOptions(); + + List results = streamOps.deleteWithOptions(key, options, messageId1, messageId2); + + assertThat(results).hasSize(2); + assertThat(streamOps.size(key)).isEqualTo(0L); + } + + @Test // GH-3232 + void deleteWithOptionsUsingRecordShouldDeleteEntry() { + + K key = keyFactory.instance(); + HK hashKey = hashKeyFactory.instance(); + HV value = hashValueFactory.instance(); + + RecordId messageId = streamOps.add(key, Collections.singletonMap(hashKey, value)); + + assertThat(streamOps.size(key)).isEqualTo(1L); + + MapRecord record = StreamRecords.newRecord().in(key).withId(messageId) + .ofMap(Collections.singletonMap(hashKey, value)); + XDelOptions options = XDelOptions.defaultOptions(); + + List results = streamOps.deleteWithOptions(record, options); + + assertThat(results).hasSize(1); + assertThat(results.get(0)).isEqualTo(StreamEntryDeletionResult.DELETED); + + assertThat(streamOps.size(key)).isEqualTo(0L); + } + + @Test // GH-3232 + void acknowledgeAndDeleteShouldAcknowledgeAndDeleteEntries() { + + K key = keyFactory.instance(); + HK hashKey = hashKeyFactory.instance(); + HV value = hashValueFactory.instance(); + + RecordId messageId1 = streamOps.add(key, Collections.singletonMap(hashKey, value)); + RecordId messageId2 = streamOps.add(key, Collections.singletonMap(hashKey, value)); + + streamOps.createGroup(key, ReadOffset.from("0-0"), "my-group"); + + streamOps.read(Consumer.from("my-group", "my-consumer"), StreamOffset.create(key, ReadOffset.lastConsumed())); + + XDelOptions options = XDelOptions.deletionPolicy(StreamDeletionPolicy.ACKNOWLEDGED); + + List results = streamOps.acknowledgeAndDelete(key, "my-group", options, messageId1, + messageId2); + + assertThat(results).hasSize(2); + assertThat(results.get(0)).isEqualTo(StreamEntryDeletionResult.DELETED); + assertThat(results.get(1)).isEqualTo(StreamEntryDeletionResult.DELETED); + } + + @Test // GH-3232 + void acknowledgeAndDeleteUsingStringIdsShouldWork() { + + K key = keyFactory.instance(); + HK hashKey = hashKeyFactory.instance(); + HV value = hashValueFactory.instance(); + + RecordId messageId1 = streamOps.add(key, Collections.singletonMap(hashKey, value)); + RecordId messageId2 = streamOps.add(key, Collections.singletonMap(hashKey, value)); + + streamOps.createGroup(key, ReadOffset.from("0-0"), "my-group"); + + streamOps.read(Consumer.from("my-group", "my-consumer"), StreamOffset.create(key, ReadOffset.lastConsumed())); + + XDelOptions options = XDelOptions.deletionPolicy(StreamDeletionPolicy.ACKNOWLEDGED); + + List results = streamOps.acknowledgeAndDelete(key, "my-group", options, messageId1, + messageId2); + + assertThat(results).hasSize(2); + } + + @Test // GH-3232 + void acknowledgeAndDeleteUsingRecordShouldWork() { + + K key = keyFactory.instance(); + HK hashKey = hashKeyFactory.instance(); + HV value = hashValueFactory.instance(); + + RecordId messageId = streamOps.add(key, Collections.singletonMap(hashKey, value)); + + streamOps.createGroup(key, ReadOffset.from("0-0"), "my-group"); + + streamOps.read(Consumer.from("my-group", "my-consumer"), StreamOffset.create(key, ReadOffset.lastConsumed())); + + MapRecord record = StreamRecords.newRecord().in(key).withId(messageId) + .ofMap(Collections.singletonMap(hashKey, value)); + XDelOptions options = XDelOptions.deletionPolicy(RedisStreamCommands.StreamDeletionPolicy.ACKNOWLEDGED); + + List results = streamOps.acknowledgeAndDelete("my-group", record, options); + + assertThat(results).hasSize(1); + assertThat(results.get(0)).isEqualTo(StreamEntryDeletionResult.DELETED); + } } diff --git a/src/test/kotlin/org/springframework/data/redis/core/ReactiveStreamOperationsExtensionsUnitTests.kt b/src/test/kotlin/org/springframework/data/redis/core/ReactiveStreamOperationsExtensionsUnitTests.kt index 22ea0603af..c3ecae3067 100644 --- a/src/test/kotlin/org/springframework/data/redis/core/ReactiveStreamOperationsExtensionsUnitTests.kt +++ b/src/test/kotlin/org/springframework/data/redis/core/ReactiveStreamOperationsExtensionsUnitTests.kt @@ -482,7 +482,7 @@ class ReactiveStreamOperationsExtensionsUnitTests { fun trim() { val operations = mockk>() - every { operations.trim(any(), any()) } returns Mono.just(1) + every { operations.trim(any(), any()) } returns Mono.just(1) runBlocking { assertThat(operations.trimAndAwait("foo", 1)).isEqualTo(1)