diff --git a/src/main/java/io/nats/client/api/KeyValueOperation.java b/src/main/java/io/nats/client/api/KeyValueOperation.java index 197019f64..3ca728e02 100644 --- a/src/main/java/io/nats/client/api/KeyValueOperation.java +++ b/src/main/java/io/nats/client/api/KeyValueOperation.java @@ -12,9 +12,6 @@ // limitations under the License. package io.nats.client.api; -import java.util.HashMap; -import java.util.Map; - /** * Key Value Operations Enum */ @@ -27,20 +24,29 @@ public enum KeyValueOperation { this.headerValue = headerValue; } - private static final Map strEnumHash = new HashMap<>(); - public String getHeaderValue() { return headerValue; } - static { - for (KeyValueOperation kvo : KeyValueOperation.values()) { - strEnumHash.put(kvo.headerValue, kvo); - } + public static KeyValueOperation instance(String s) { + if (PUT.headerValue.equals(s)) return PUT; + if (DELETE.headerValue.equals(s)) return DELETE; + if (PURGE.headerValue.equals(s)) return PURGE; + return null; } public static KeyValueOperation getOrDefault(String s, KeyValueOperation dflt) { - KeyValueOperation kvo = s == null ? null : strEnumHash.get(s.toUpperCase()); + KeyValueOperation kvo = instance(s); return kvo == null ? dflt : kvo; } + + public static KeyValueOperation instanceByMarkerReason(String markerReason) { + if ("Remove".equals(markerReason)) { + return DELETE; + } + if ("MaxAge".equals(markerReason) || "Purge".equals(markerReason)) { + return PURGE; + } + return null; + } } diff --git a/src/main/java/io/nats/client/support/NatsKeyValueUtil.java b/src/main/java/io/nats/client/support/NatsKeyValueUtil.java index 858b8baac..ea6b3be78 100644 --- a/src/main/java/io/nats/client/support/NatsKeyValueUtil.java +++ b/src/main/java/io/nats/client/support/NatsKeyValueUtil.java @@ -20,8 +20,7 @@ import io.nats.client.impl.Headers; import static io.nats.client.support.NatsConstants.DOT; -import static io.nats.client.support.NatsJetStreamConstants.ROLLUP_HDR; -import static io.nats.client.support.NatsJetStreamConstants.ROLLUP_HDR_SUBJECT; +import static io.nats.client.support.NatsJetStreamConstants.*; public abstract class NatsKeyValueUtil { @@ -64,8 +63,23 @@ public static String getOperationHeader(Headers h) { return h == null ? null : h.getFirst(KV_OPERATION_HEADER_KEY); } + public static String getNatsMarkerReasonHeader(Headers h) { + return h == null ? null : h.getFirst(NATS_MARKER_REASON_HDR); + } + public static KeyValueOperation getOperation(Headers h) { - return KeyValueOperation.getOrDefault(getOperationHeader(h), KeyValueOperation.PUT); + KeyValueOperation kvo = null; + String hs = getOperationHeader(h); + if (hs != null) { + kvo = KeyValueOperation.instance(hs); + } + if (kvo == null) { + hs = getNatsMarkerReasonHeader(h); + if (hs != null) { + kvo = KeyValueOperation.instanceByMarkerReason(hs); + } + } + return kvo == null ? KeyValueOperation.PUT : kvo; } public static Headers getDeleteHeaders() { diff --git a/src/test/java/io/nats/client/impl/KeyValueTests.java b/src/test/java/io/nats/client/impl/KeyValueTests.java index 61a0d9be1..b368907e8 100644 --- a/src/test/java/io/nats/client/impl/KeyValueTests.java +++ b/src/test/java/io/nats/client/impl/KeyValueTests.java @@ -25,6 +25,7 @@ import java.util.*; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import static io.nats.client.JetStreamOptions.DEFAULT_JS_OPTIONS; import static io.nats.client.api.KeyValuePurgeOptions.DEFAULT_THRESHOLD_MILLIS; @@ -764,7 +765,7 @@ public void testCreateAndUpdate() throws Exception { kv.delete(key); kv.create(key, "abc".getBytes()); - // 7. allowed to update a key that is deleted, as long as you have it's revision + // 7. allowed to update a key that is deleted, as long as you have its revision kv.delete(key); nc.flush(Duration.ofSeconds(1)); @@ -1841,5 +1842,65 @@ public void testLimitMarker() throws Exception { .build()); }); } + + @Test + public void testLimitMarkerAlso() throws Exception { + jsServer.run(TestBase::atLeast2_11, nc -> { + String bucket = bucket(); + String key = key(); + + KeyValueManagement kvm = nc.keyValueManagement(); + KeyValueConfiguration config = KeyValueConfiguration.builder() + .name(bucket) + .storageType(StorageType.Memory) + .limitMarker(Duration.ofSeconds(6)) + .build(); + kvm.create(config); + + KeyValue kv = nc.keyValue(bucket); + + AtomicInteger puts = new AtomicInteger(); + AtomicInteger dels = new AtomicInteger(); + AtomicInteger purge = new AtomicInteger(); + AtomicInteger eod = new AtomicInteger(); + + KeyValueWatcher watcher = new KeyValueWatcher() { + @Override + public void watch(KeyValueEntry keyValueEntry) { + if (keyValueEntry.getOperation() == KeyValueOperation.PUT) { + puts.incrementAndGet(); + } + else if (keyValueEntry.getOperation() == KeyValueOperation.DELETE) { + dels.incrementAndGet(); + } + else if (keyValueEntry.getOperation() == KeyValueOperation.PURGE) { + purge.incrementAndGet(); + } + } + + @Override + public void endOfData() { + eod.incrementAndGet(); + } + }; + + NatsKeyValueWatchSubscription watch = kv.watchAll(watcher); + + kv.create(key, dataBytes(), MessageTtl.seconds(2)); + + KeyValueEntry kve = kv.get(key); + assertNotNull(kve); + + sleep(2100); // longer than the message ttl + + kve = kv.get(key); + assertNull(kve); + + assertEquals(1, puts.get()); + assertEquals(0, dels.get()); + assertEquals(1, purge.get()); + assertEquals(1, eod.get()); + }); + } }