Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 16 additions & 10 deletions src/main/java/io/nats/client/api/KeyValueOperation.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,6 @@
// limitations under the License.
package io.nats.client.api;

import java.util.HashMap;
import java.util.Map;

/**
* Key Value Operations Enum
*/
Expand All @@ -27,20 +24,29 @@ public enum KeyValueOperation {
this.headerValue = headerValue;
}

private static final Map<String, KeyValueOperation> strEnumHash = new HashMap<>();

public String getHeaderValue() {
return headerValue;
}

static {
for (KeyValueOperation kvo : KeyValueOperation.values()) {
strEnumHash.put(kvo.headerValue, kvo);
}
public static KeyValueOperation instance(String s) {
if (PUT.headerValue.equals(s)) return PUT;
if (DELETE.headerValue.equals(s)) return DELETE;
if (PURGE.headerValue.equals(s)) return PURGE;
return null;
}

public static KeyValueOperation getOrDefault(String s, KeyValueOperation dflt) {
KeyValueOperation kvo = s == null ? null : strEnumHash.get(s.toUpperCase());
KeyValueOperation kvo = instance(s);
return kvo == null ? dflt : kvo;
}

public static KeyValueOperation instanceByMarkerReason(String markerReason) {
if ("Remove".equals(markerReason)) {
return DELETE;
}
if ("MaxAge".equals(markerReason) || "Purge".equals(markerReason)) {
return PURGE;
}
return null;
}
}
20 changes: 17 additions & 3 deletions src/main/java/io/nats/client/support/NatsKeyValueUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@
import io.nats.client.impl.Headers;

import static io.nats.client.support.NatsConstants.DOT;
import static io.nats.client.support.NatsJetStreamConstants.ROLLUP_HDR;
import static io.nats.client.support.NatsJetStreamConstants.ROLLUP_HDR_SUBJECT;
import static io.nats.client.support.NatsJetStreamConstants.*;

public abstract class NatsKeyValueUtil {

Expand Down Expand Up @@ -64,8 +63,23 @@ public static String getOperationHeader(Headers h) {
return h == null ? null : h.getFirst(KV_OPERATION_HEADER_KEY);
}

public static String getNatsMarkerReasonHeader(Headers h) {
return h == null ? null : h.getFirst(NATS_MARKER_REASON_HDR);
}

public static KeyValueOperation getOperation(Headers h) {
return KeyValueOperation.getOrDefault(getOperationHeader(h), KeyValueOperation.PUT);
KeyValueOperation kvo = null;
String hs = getOperationHeader(h);
if (hs != null) {
kvo = KeyValueOperation.instance(hs);
}
if (kvo == null) {
hs = getNatsMarkerReasonHeader(h);
if (hs != null) {
kvo = KeyValueOperation.instanceByMarkerReason(hs);
}
}
return kvo == null ? KeyValueOperation.PUT : kvo;
}

public static Headers getDeleteHeaders() {
Expand Down
63 changes: 62 additions & 1 deletion src/test/java/io/nats/client/impl/KeyValueTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.*;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static io.nats.client.JetStreamOptions.DEFAULT_JS_OPTIONS;
import static io.nats.client.api.KeyValuePurgeOptions.DEFAULT_THRESHOLD_MILLIS;
Expand Down Expand Up @@ -764,7 +765,7 @@ public void testCreateAndUpdate() throws Exception {
kv.delete(key);
kv.create(key, "abc".getBytes());

// 7. allowed to update a key that is deleted, as long as you have it's revision
// 7. allowed to update a key that is deleted, as long as you have its revision
kv.delete(key);
nc.flush(Duration.ofSeconds(1));

Expand Down Expand Up @@ -1841,5 +1842,65 @@ public void testLimitMarker() throws Exception {
.build());
});
}

@Test
public void testLimitMarkerAlso() throws Exception {
jsServer.run(TestBase::atLeast2_11, nc -> {
String bucket = bucket();
String key = key();

KeyValueManagement kvm = nc.keyValueManagement();
KeyValueConfiguration config = KeyValueConfiguration.builder()
.name(bucket)
.storageType(StorageType.Memory)
.limitMarker(Duration.ofSeconds(6))
.build();
kvm.create(config);

KeyValue kv = nc.keyValue(bucket);

AtomicInteger puts = new AtomicInteger();
AtomicInteger dels = new AtomicInteger();
AtomicInteger purge = new AtomicInteger();
AtomicInteger eod = new AtomicInteger();

KeyValueWatcher watcher = new KeyValueWatcher() {
@Override
public void watch(KeyValueEntry keyValueEntry) {
if (keyValueEntry.getOperation() == KeyValueOperation.PUT) {
puts.incrementAndGet();
}
else if (keyValueEntry.getOperation() == KeyValueOperation.DELETE) {
dels.incrementAndGet();
}
else if (keyValueEntry.getOperation() == KeyValueOperation.PURGE) {
purge.incrementAndGet();
}
}

@Override
public void endOfData() {
eod.incrementAndGet();
}
};

NatsKeyValueWatchSubscription watch = kv.watchAll(watcher);

kv.create(key, dataBytes(), MessageTtl.seconds(2));

KeyValueEntry kve = kv.get(key);
assertNotNull(kve);

sleep(2100); // longer than the message ttl

kve = kv.get(key);
assertNull(kve);

assertEquals(1, puts.get());
assertEquals(0, dels.get());
assertEquals(1, purge.get());
assertEquals(1, eod.get());
});
}
}

Loading