From 477c7df8d12a121c59fb75a2f4225d639bc4c78d Mon Sep 17 00:00:00 2001 From: scottf Date: Tue, 29 Aug 2023 15:24:32 -0400 Subject: [PATCH 1/5] Stream and Consumer Info gathered timestamps --- .../java/io/nats/client/api/ConsumerInfo.java | 15 ++++++++++ .../java/io/nats/client/api/StreamInfo.java | 28 ++++++++++++++----- .../io/nats/client/support/ApiConstants.java | 1 + .../io/nats/client/api/ConsumerInfoTests.java | 11 ++------ .../io/nats/client/api/StreamInfoTests.java | 5 ++-- .../client/impl/JetStreamManagementTests.java | 12 ++++++++ src/test/resources/data/ConsumerInfo.json | 1 + src/test/resources/data/StreamInfo.json | 1 + 8 files changed, 55 insertions(+), 19 deletions(-) diff --git a/src/main/java/io/nats/client/api/ConsumerInfo.java b/src/main/java/io/nats/client/api/ConsumerInfo.java index 4838fcfc5..4db77aaf0 100644 --- a/src/main/java/io/nats/client/api/ConsumerInfo.java +++ b/src/main/java/io/nats/client/api/ConsumerInfo.java @@ -38,6 +38,7 @@ public class ConsumerInfo extends ApiResponse { private final long numRedelivered; private final ClusterInfo clusterInfo; private final boolean pushBound; + private final ZonedDateTime timestamp; public ConsumerInfo(Message msg) { this(parseMessage(msg)); @@ -61,6 +62,8 @@ public ConsumerInfo(JsonValue vConsumerInfo) { clusterInfo = ClusterInfo.optionalInstance(readValue(jv, CLUSTER)); pushBound = readBoolean(jv, PUSH_BOUND); + + timestamp = readDate(jv, TS); } public ConsumerConfiguration getConsumerConfiguration() { @@ -75,6 +78,10 @@ public String getStreamName() { return stream; } + /** + * Gets the creation time of the consumer. + * @return the creation date and time. + */ public ZonedDateTime getCreationTime() { return created; } @@ -111,6 +118,14 @@ public boolean isPushBound() { return pushBound; } + /** + * Gets the server time the info was gathered + * @return the server gathered timed + */ + public ZonedDateTime getTimestamp() { + return timestamp; + } + public long getCalculatedPending() { return numPending + delivered.getConsumerSequence(); } diff --git a/src/main/java/io/nats/client/api/StreamInfo.java b/src/main/java/io/nats/client/api/StreamInfo.java index 58d81b4fd..6a5bbe74f 100644 --- a/src/main/java/io/nats/client/api/StreamInfo.java +++ b/src/main/java/io/nats/client/api/StreamInfo.java @@ -29,12 +29,13 @@ */ public class StreamInfo extends ApiResponse { - private final ZonedDateTime created; + private final ZonedDateTime createTime; private final StreamConfiguration config; - private final StreamState state; + private final StreamState streamState; private final ClusterInfo clusterInfo; private final MirrorInfo mirrorInfo; private final List sourceInfos; + private final ZonedDateTime timestamp; public StreamInfo(Message msg) { this(parseUnchecked(msg.getData())); @@ -42,14 +43,15 @@ public StreamInfo(Message msg) { public StreamInfo(JsonValue vStreamInfo) { super(vStreamInfo); - created = readDate(jv, CREATED); + createTime = readDate(jv, CREATED); config = StreamConfiguration.instance(readValue(jv, CONFIG)); - state = new StreamState(readValue(jv, STATE)); + streamState = new StreamState(readValue(jv, STATE)); clusterInfo = ClusterInfo.optionalInstance(readValue(jv, CLUSTER)); mirrorInfo = MirrorInfo.optionalInstance(readValue(jv, MIRROR)); sourceInfos = SourceInfo.optionalListOf(readValue(jv, SOURCES)); + timestamp = readDate(jv, TS); } - + /** * Gets the stream configuration. * @return the stream configuration. @@ -63,7 +65,7 @@ public StreamConfiguration getConfiguration() { * @return the stream state */ public StreamState getStreamState() { - return state; + return streamState; } /** @@ -71,7 +73,7 @@ public StreamState getStreamState() { * @return the creation date and time. */ public ZonedDateTime getCreateTime() { - return created; + return createTime; } public MirrorInfo getMirrorInfo() { @@ -85,4 +87,16 @@ public List getSourceInfos() { public ClusterInfo getClusterInfo() { return clusterInfo; } + + public StreamConfiguration getConfig() { + return config; + } + + /** + * Gets the server time the info was gathered + * @return the server gathered timed + */ + public ZonedDateTime getTimestamp() { + return timestamp; + } } diff --git a/src/main/java/io/nats/client/support/ApiConstants.java b/src/main/java/io/nats/client/support/ApiConstants.java index b8c10db3b..8bfca1ef1 100644 --- a/src/main/java/io/nats/client/support/ApiConstants.java +++ b/src/main/java/io/nats/client/support/ApiConstants.java @@ -183,6 +183,7 @@ public interface ApiConstants { String TEMPLATE_OWNER = "template_owner"; String TIERS = "tiers"; String TIME = "time"; + String TS = "ts"; String TLS = "tls_required"; String TOTAL = "total"; String TYPE = "type"; diff --git a/src/test/java/io/nats/client/api/ConsumerInfoTests.java b/src/test/java/io/nats/client/api/ConsumerInfoTests.java index 289318cd0..76a9f27b8 100644 --- a/src/test/java/io/nats/client/api/ConsumerInfoTests.java +++ b/src/test/java/io/nats/client/api/ConsumerInfoTests.java @@ -28,20 +28,13 @@ public class ConsumerInfoTests { static JsonValue vConsumerInfo = JsonParser.parseUnchecked(dataAsString("ConsumerInfo.json")); - @Test - public void testTime() { - long start = System.currentTimeMillis(); - for (int x = 0; x < 1_000_000; x++) { - new ConsumerInfo(vConsumerInfo); - } - System.out.println(System.currentTimeMillis() - start); - } - @Test public void testConsumerInfo() { ConsumerInfo ci = new ConsumerInfo(vConsumerInfo); assertEquals("foo-stream", ci.getStreamName()); assertEquals("foo-consumer", ci.getName()); + assertEquals(DateTimeUtils.parseDateTime("2020-11-05T19:33:21.163377Z"), ci.getCreationTime()); + assertEquals(DateTimeUtils.parseDateTime("2023-08-29T19:33:21.163377Z"), ci.getTimestamp()); SequencePair sp = ci.getDelivered(); assertEquals(1, sp.getConsumerSequence()); diff --git a/src/test/java/io/nats/client/api/StreamInfoTests.java b/src/test/java/io/nats/client/api/StreamInfoTests.java index 0f948f006..f9def5c14 100644 --- a/src/test/java/io/nats/client/api/StreamInfoTests.java +++ b/src/test/java/io/nats/client/api/StreamInfoTests.java @@ -19,7 +19,6 @@ import org.junit.jupiter.api.Test; import java.time.Duration; -import java.time.ZonedDateTime; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -42,8 +41,8 @@ public void testStreamInfo() { } private void validateStreamInfo(StreamInfo si) { - ZonedDateTime zdt = DateTimeUtils.parseDateTime("2021-01-25T20:09:10.6225191Z"); - assertEquals(zdt, si.getCreateTime()); + assertEquals(DateTimeUtils.parseDateTime("2021-01-25T20:09:10.6225191Z"), si.getCreateTime()); + assertEquals(DateTimeUtils.parseDateTime("2023-08-29T19:33:21.163377Z"), si.getTimestamp()); StreamConfiguration sc = si.getConfiguration(); assertEquals("streamName", sc.getName()); diff --git a/src/test/java/io/nats/client/impl/JetStreamManagementTests.java b/src/test/java/io/nats/client/impl/JetStreamManagementTests.java index 22c202a30..b204cece3 100644 --- a/src/test/java/io/nats/client/impl/JetStreamManagementTests.java +++ b/src/test/java/io/nats/client/impl/JetStreamManagementTests.java @@ -315,6 +315,12 @@ public void testGetStreamInfo() throws Exception { assertEquals(0, si.getStreamState().getSubjects().size()); assertEquals(0, si.getStreamState().getDeletedCount()); assertEquals(0, si.getStreamState().getDeleted().size()); + if (nc.getServerInfo().isSameOrNewerThanVersion("2.10")) { + assertNotNull(si.getTimestamp()); + } + else { + assertNull(si.getTimestamp()); + } List packs = new ArrayList<>(); JetStream js = nc.jetStream(); @@ -910,6 +916,12 @@ public void testGetConsumerInfo() throws Exception { assertEquals(STREAM, ci.getStreamName()); assertEquals(DURABLE, ci.getName()); assertThrows(JetStreamApiException.class, () -> jsm.getConsumerInfo(STREAM, durable(999))); + if (nc.getServerInfo().isSameOrNewerThanVersion("2.10")) { + assertNotNull(ci.getTimestamp()); + } + else { + assertNull(ci.getTimestamp()); + } }); } diff --git a/src/test/resources/data/ConsumerInfo.json b/src/test/resources/data/ConsumerInfo.json index 0a540d2c3..0abc6702e 100644 --- a/src/test/resources/data/ConsumerInfo.json +++ b/src/test/resources/data/ConsumerInfo.json @@ -3,6 +3,7 @@ "stream_name": "foo-stream", "name": "foo-consumer", "created": "2020-11-05T19:33:21.163377Z", + "ts": "2023-08-29T19:33:21.163377Z", "config": { "durable_name": "foo-consumer", "deliver_subject": "bar", diff --git a/src/test/resources/data/StreamInfo.json b/src/test/resources/data/StreamInfo.json index 0d84120d9..ae7c1152a 100644 --- a/src/test/resources/data/StreamInfo.json +++ b/src/test/resources/data/StreamInfo.json @@ -26,6 +26,7 @@ } }, "created": "2021-01-25T20:09:10.6225191Z", + "ts": "2023-08-29T19:33:21.163377Z", "state": { "messages": 11, "bytes": 12, From fb086c21634f87f9044d2635d0e283a643f6c7c6 Mon Sep 17 00:00:00 2001 From: scottf Date: Tue, 29 Aug 2023 17:34:59 -0400 Subject: [PATCH 2/5] Stream and Consumer Info gathered timestamps --- src/main/java/io/nats/client/api/ConsumerInfo.java | 2 +- src/main/java/io/nats/client/api/StreamInfo.java | 2 +- src/main/java/io/nats/client/support/ApiConstants.java | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/java/io/nats/client/api/ConsumerInfo.java b/src/main/java/io/nats/client/api/ConsumerInfo.java index 4db77aaf0..1a1276f69 100644 --- a/src/main/java/io/nats/client/api/ConsumerInfo.java +++ b/src/main/java/io/nats/client/api/ConsumerInfo.java @@ -63,7 +63,7 @@ public ConsumerInfo(JsonValue vConsumerInfo) { clusterInfo = ClusterInfo.optionalInstance(readValue(jv, CLUSTER)); pushBound = readBoolean(jv, PUSH_BOUND); - timestamp = readDate(jv, TS); + timestamp = readDate(jv, TIMESTAMP); } public ConsumerConfiguration getConsumerConfiguration() { diff --git a/src/main/java/io/nats/client/api/StreamInfo.java b/src/main/java/io/nats/client/api/StreamInfo.java index 6a5bbe74f..56ecec54a 100644 --- a/src/main/java/io/nats/client/api/StreamInfo.java +++ b/src/main/java/io/nats/client/api/StreamInfo.java @@ -49,7 +49,7 @@ public StreamInfo(JsonValue vStreamInfo) { clusterInfo = ClusterInfo.optionalInstance(readValue(jv, CLUSTER)); mirrorInfo = MirrorInfo.optionalInstance(readValue(jv, MIRROR)); sourceInfos = SourceInfo.optionalListOf(readValue(jv, SOURCES)); - timestamp = readDate(jv, TS); + timestamp = readDate(jv, TIMESTAMP); } /** diff --git a/src/main/java/io/nats/client/support/ApiConstants.java b/src/main/java/io/nats/client/support/ApiConstants.java index 8bfca1ef1..03dffe1e8 100644 --- a/src/main/java/io/nats/client/support/ApiConstants.java +++ b/src/main/java/io/nats/client/support/ApiConstants.java @@ -183,7 +183,7 @@ public interface ApiConstants { String TEMPLATE_OWNER = "template_owner"; String TIERS = "tiers"; String TIME = "time"; - String TS = "ts"; + String TIMESTAMP = "ts"; String TLS = "tls_required"; String TOTAL = "total"; String TYPE = "type"; From b8b046242fc833fd6744044aaec120c37b64e392 Mon Sep 17 00:00:00 2001 From: scottf Date: Tue, 29 Aug 2023 21:54:29 -0400 Subject: [PATCH 3/5] StreamInfo first_seq --- .../nats/client/api/StreamConfiguration.java | 27 ++++++++++- .../io/nats/client/support/JsonUtils.java | 14 ++++++ .../io/nats/client/api/StreamInfoTests.java | 1 + .../client/impl/JetStreamManagementTests.java | 45 ++++++++++++------- .../nats/client/support/JsonUtilsTests.java | 9 ++++ src/test/resources/data/StreamInfo.json | 3 +- 6 files changed, 82 insertions(+), 17 deletions(-) diff --git a/src/main/java/io/nats/client/api/StreamConfiguration.java b/src/main/java/io/nats/client/api/StreamConfiguration.java index b060eb058..c21cccf71 100644 --- a/src/main/java/io/nats/client/api/StreamConfiguration.java +++ b/src/main/java/io/nats/client/api/StreamConfiguration.java @@ -65,6 +65,7 @@ public class StreamConfiguration implements JsonSerializable { private final boolean denyPurge; private final boolean discardNewPerSubject; private final Map metadata; + private final long firstSequence; static StreamConfiguration instance(JsonValue v) { Builder builder = new Builder(); @@ -96,7 +97,7 @@ static StreamConfiguration instance(JsonValue v) { builder.denyPurge(readBoolean(v, DENY_PURGE)); builder.discardNewPerSubject(readBoolean(v, DISCARD_NEW_PER_SUBJECT)); builder.metadata(readStringStringMap(v, METADATA)); - + builder.firstSeq(readLong(v, FIRST_SEQ, 1)); return builder.build(); } @@ -130,6 +131,7 @@ static StreamConfiguration instance(JsonValue v) { this.denyPurge = b.denyPurge; this.discardNewPerSubject = b.discardNewPerSubject; this.metadata = b.metadata; + this.firstSequence = b.firstSeq; } /** @@ -177,6 +179,8 @@ public String toJson() { addFldWhenTrue(sb, DISCARD_NEW_PER_SUBJECT, discardNewPerSubject); addField(sb, METADATA, metadata); + addFieldWhenGreaterThan(sb, FIRST_SEQ, firstSequence, 1); + return endJson(sb).toString(); } @@ -407,6 +411,14 @@ public Map getMetadata() { return metadata; } + /** + * The first sequence used in the stream. + * @return the first sequence + */ + public long getFirstSequence() { + return firstSequence; + } + @Override public String toString() { return "StreamConfiguration{" + @@ -431,6 +443,7 @@ public String toString() { ", mirrorDirect=" + mirrorDirect + ", denyDelete=" + denyDelete + ", denyPurge=" + denyPurge + + ", firstSequence=" + firstSequence + ", discardNewPerSubject=" + discardNewPerSubject + ", " + mirror + ", " + placement + @@ -493,6 +506,7 @@ public static class Builder { private boolean denyPurge = false; private boolean discardNewPerSubject = false; private Map metadata; + private long firstSeq = -1; /** * Default Builder @@ -535,6 +549,7 @@ public Builder(StreamConfiguration sc) { if (sc.metadata != null) { this.metadata = new HashMap<>(sc.metadata); } + this.firstSeq = sc.firstSequence; } } @@ -937,6 +952,16 @@ public Builder metadata(Map metadata) { return this; } + /** + * Sets the first sequence to be used. Must be greater than 0. 1 is the default. + * @param firstSeq specify the first_seq in the stream config when creating. + * @return Builder + */ + public Builder firstSeq(long firstSeq) { + this.firstSeq = validateGtZeroOrMinus1(firstSeq, "First Sequence"); + return this; + } + /** * Builds the StreamConfiguration * @return a stream configuration. diff --git a/src/main/java/io/nats/client/support/JsonUtils.java b/src/main/java/io/nats/client/support/JsonUtils.java index ff37ce44d..a1275a0b2 100644 --- a/src/main/java/io/nats/client/support/JsonUtils.java +++ b/src/main/java/io/nats/client/support/JsonUtils.java @@ -247,6 +247,20 @@ public static void addFieldWhenGteMinusOne(StringBuilder sb, String fname, Long } } + /** + * Appends a json field to a string builder. + * @param sb string builder + * @param fname fieldname + * @param value field value + */ + public static void addFieldWhenGreaterThan(StringBuilder sb, String fname, Long value, long gt) { + if (value != null && value > gt) { + sb.append(Q); + jsonEncode(sb, fname); + sb.append(QCOLON).append(value).append(COMMA); + } + } + /** * Appends a json field to a string builder. * @param sb string builder diff --git a/src/test/java/io/nats/client/api/StreamInfoTests.java b/src/test/java/io/nats/client/api/StreamInfoTests.java index f9def5c14..9f3f2b2f7 100644 --- a/src/test/java/io/nats/client/api/StreamInfoTests.java +++ b/src/test/java/io/nats/client/api/StreamInfoTests.java @@ -50,6 +50,7 @@ private void validateStreamInfo(StreamInfo si) { assertEquals("sub0", sc.getSubjects().get(0)); assertEquals("sub1", sc.getSubjects().get(1)); assertEquals("x.>", sc.getSubjects().get(2)); + assertEquals(82966, sc.getFirstSequence()); assertEquals(RetentionPolicy.Limits, sc.getRetentionPolicy()); assertEquals(DiscardPolicy.Old, sc.getDiscardPolicy()); diff --git a/src/test/java/io/nats/client/impl/JetStreamManagementTests.java b/src/test/java/io/nats/client/impl/JetStreamManagementTests.java index b204cece3..b911f9342 100644 --- a/src/test/java/io/nats/client/impl/JetStreamManagementTests.java +++ b/src/test/java/io/nats/client/impl/JetStreamManagementTests.java @@ -302,45 +302,60 @@ public void testGetStreamInfo() throws Exception { JetStreamManagement jsm = nc.jetStreamManagement(); assertThrows(JetStreamApiException.class, () -> jsm.getStreamInfo(STREAM)); + JetStream js = nc.jetStream(); + if (nc.getServerInfo().isSameOrNewerThanVersion("2.10")) { + String stream = stream(); + StreamConfiguration sc = StreamConfiguration.builder() + .name(stream) + .storageType(StorageType.Memory) + .firstSeq(42) + .subjects("test-first-seq").build(); + StreamInfo si = jsm.addStream(sc); + assertNotNull(si.getTimestamp()); + assertEquals(42, si.getConfiguration().getFirstSequence()); + PublishAck pa = js.publish("test-first-seq", null); + assertEquals(42, pa.getSeqno()); + } + String[] subjects = new String[6]; for (int x = 0; x < 5; x++) { subjects[x] = subject(x + 1); } subjects[5] = "foo.>"; - createMemoryStream(jsm, STREAM, subjects); - StreamInfo si = jsm.getStreamInfo(STREAM); - assertEquals(STREAM, si.getConfiguration().getName()); + String stream = stream(); + createMemoryStream(jsm, stream, subjects); + + StreamInfo si = jsm.getStreamInfo(stream); + assertEquals(stream, si.getConfiguration().getName()); assertEquals(0, si.getStreamState().getSubjectCount()); assertEquals(0, si.getStreamState().getSubjects().size()); assertEquals(0, si.getStreamState().getDeletedCount()); assertEquals(0, si.getStreamState().getDeleted().size()); - if (nc.getServerInfo().isSameOrNewerThanVersion("2.10")) { - assertNotNull(si.getTimestamp()); - } - else { + + if (nc.getServerInfo().isOlderThanVersion("2.10")) { assertNull(si.getTimestamp()); } + assertEquals(1, si.getConfiguration().getFirstSequence()); List packs = new ArrayList<>(); - JetStream js = nc.jetStream(); for (int x = 0; x < 5; x++) { jsPublish(js, subject(x + 1), x + 1); PublishAck pa = jsPublish(js, subject(x + 1), data(x + 2)); packs.add(pa); - jsm.deleteMessage(STREAM, pa.getSeqno()); + jsm.deleteMessage(stream, pa.getSeqno()); } jsPublish(js, "foo.bar", 6); - si = jsm.getStreamInfo(STREAM); - assertEquals(STREAM, si.getConfiguration().getName()); + si = jsm.getStreamInfo(stream); + assertEquals(stream, si.getConfiguration().getName()); assertEquals(6, si.getStreamState().getSubjectCount()); assertEquals(0, si.getStreamState().getSubjects().size()); assertEquals(5, si.getStreamState().getDeletedCount()); assertEquals(0, si.getStreamState().getDeleted().size()); - si = jsm.getStreamInfo(STREAM, StreamInfoOptions.builder().allSubjects().deletedDetails().build()); - assertEquals(STREAM, si.getConfiguration().getName()); + si = jsm.getStreamInfo(stream, StreamInfoOptions.builder().allSubjects().deletedDetails().build()); + assertEquals(stream, si.getConfiguration().getName()); assertEquals(6, si.getStreamState().getSubjectCount()); List list = si.getStreamState().getSubjects(); assertNotNull(list); @@ -367,7 +382,7 @@ public void testGetStreamInfo() throws Exception { jsPublish(js, "foo.baz", 2); sleep(100); - si = jsm.getStreamInfo(STREAM, StreamInfoOptions.builder().filterSubjects("foo.>").deletedDetails().build()); + si = jsm.getStreamInfo(stream, StreamInfoOptions.builder().filterSubjects("foo.>").deletedDetails().build()); assertEquals(7, si.getStreamState().getSubjectCount()); list = si.getStreamState().getSubjects(); assertNotNull(list); @@ -383,7 +398,7 @@ public void testGetStreamInfo() throws Exception { assertNotNull(s); assertEquals(2, s.getCount()); - si = jsm.getStreamInfo(STREAM, StreamInfoOptions.builder().filterSubjects(subject(5)).build()); + si = jsm.getStreamInfo(stream, StreamInfoOptions.builder().filterSubjects(subject(5)).build()); list = si.getStreamState().getSubjects(); assertNotNull(list); assertEquals(1, list.size()); diff --git a/src/test/java/io/nats/client/support/JsonUtilsTests.java b/src/test/java/io/nats/client/support/JsonUtilsTests.java index 1ea381627..f64f40eed 100644 --- a/src/test/java/io/nats/client/support/JsonUtilsTests.java +++ b/src/test/java/io/nats/client/support/JsonUtilsTests.java @@ -241,6 +241,15 @@ public void testAddFields() { addField(sb, "zdt", DateTimeUtils.gmtNow()); assertEquals(180, sb.length()); + + addFieldWhenGreaterThan(sb, "xgt", 0L, 1); + assertEquals(180, sb.length()); + + addFieldWhenGreaterThan(sb, "xgt", 1L, 1); + assertEquals(180, sb.length()); + + addFieldWhenGreaterThan(sb, "xgt", 2L, 1); + assertEquals(188, sb.length()); } static final String EXPECTED_LIST_JSON = "{\"a1\":[\"one\"],\"a2\":[\"two\",\"too\"],\"l1\":[\"one\"],\"l2\":[\"two\",\"too\"],\"j1\":[{\"filter\":\"sub1\",\"keep\":421}],\"j2\":[{\"filter\":\"sub2\",\"seq\":732},{\"filter\":\"sub3\"}],\"d1\":[1000000],\"d2\":[2000000,3000000]}"; diff --git a/src/test/resources/data/StreamInfo.json b/src/test/resources/data/StreamInfo.json index ae7c1152a..657c6013d 100644 --- a/src/test/resources/data/StreamInfo.json +++ b/src/test/resources/data/StreamInfo.json @@ -23,7 +23,8 @@ "ptag1", "ptag2" ] - } + }, + "first_seq": 82966 }, "created": "2021-01-25T20:09:10.6225191Z", "ts": "2023-08-29T19:33:21.163377Z", From a6db81c10f094b6c5173e11386c1089338d0246a Mon Sep 17 00:00:00 2001 From: scottf Date: Wed, 30 Aug 2023 11:54:39 -0400 Subject: [PATCH 4/5] StreamInfo first_seq better tests --- .../nats/client/api/StreamConfiguration.java | 19 ++++++------- .../java/io/nats/client/api/StreamState.java | 16 +++++------ .../client/api/StreamConfigurationTests.java | 4 ++- .../io/nats/client/api/StreamInfoTests.java | 2 +- .../client/impl/JetStreamManagementTests.java | 28 ++++++++++--------- .../resources/data/StreamConfiguration.json | 1 + src/test/resources/data/StreamInfo.json | 2 +- 7 files changed, 38 insertions(+), 34 deletions(-) diff --git a/src/main/java/io/nats/client/api/StreamConfiguration.java b/src/main/java/io/nats/client/api/StreamConfiguration.java index c21cccf71..f25a861fe 100644 --- a/src/main/java/io/nats/client/api/StreamConfiguration.java +++ b/src/main/java/io/nats/client/api/StreamConfiguration.java @@ -97,7 +97,7 @@ static StreamConfiguration instance(JsonValue v) { builder.denyPurge(readBoolean(v, DENY_PURGE)); builder.discardNewPerSubject(readBoolean(v, DISCARD_NEW_PER_SUBJECT)); builder.metadata(readStringStringMap(v, METADATA)); - builder.firstSeq(readLong(v, FIRST_SEQ, 1)); + builder.firstSequence(readLong(v, FIRST_SEQ, 1)); return builder.build(); } @@ -131,7 +131,7 @@ static StreamConfiguration instance(JsonValue v) { this.denyPurge = b.denyPurge; this.discardNewPerSubject = b.discardNewPerSubject; this.metadata = b.metadata; - this.firstSequence = b.firstSeq; + this.firstSequence = b.firstSequence; } /** @@ -178,7 +178,6 @@ public String toJson() { addFldWhenTrue(sb, DENY_PURGE, denyPurge); addFldWhenTrue(sb, DISCARD_NEW_PER_SUBJECT, discardNewPerSubject); addField(sb, METADATA, metadata); - addFieldWhenGreaterThan(sb, FIRST_SEQ, firstSequence, 1); return endJson(sb).toString(); @@ -443,8 +442,8 @@ public String toString() { ", mirrorDirect=" + mirrorDirect + ", denyDelete=" + denyDelete + ", denyPurge=" + denyPurge + - ", firstSequence=" + firstSequence + ", discardNewPerSubject=" + discardNewPerSubject + + ", firstSequence=" + firstSequence + ", " + mirror + ", " + placement + ", sources=" + sources + @@ -506,7 +505,7 @@ public static class Builder { private boolean denyPurge = false; private boolean discardNewPerSubject = false; private Map metadata; - private long firstSeq = -1; + private long firstSequence = 1; /** * Default Builder @@ -549,7 +548,7 @@ public Builder(StreamConfiguration sc) { if (sc.metadata != null) { this.metadata = new HashMap<>(sc.metadata); } - this.firstSeq = sc.firstSequence; + this.firstSequence = sc.firstSequence; } } @@ -953,12 +952,12 @@ public Builder metadata(Map metadata) { } /** - * Sets the first sequence to be used. Must be greater than 0. 1 is the default. - * @param firstSeq specify the first_seq in the stream config when creating. + * Sets the first sequence to be used. 1 is the default. All values less than 2 are treated as 1. + * @param firstSeq specify the first_seq in the stream config when creating the stream. * @return Builder */ - public Builder firstSeq(long firstSeq) { - this.firstSeq = validateGtZeroOrMinus1(firstSeq, "First Sequence"); + public Builder firstSequence(long firstSeq) { + this.firstSequence = firstSeq > 1 ? firstSeq : 1; return this; } diff --git a/src/main/java/io/nats/client/api/StreamState.java b/src/main/java/io/nats/client/api/StreamState.java index be1230dfb..6a5aba05b 100644 --- a/src/main/java/io/nats/client/api/StreamState.java +++ b/src/main/java/io/nats/client/api/StreamState.java @@ -24,8 +24,8 @@ public class StreamState { private final long msgs; private final long bytes; - private final long firstSeq; - private final long lastSeq; + private final long firstSequence; + private final long lastSequence; private final long consumerCount; private final long subjectCount; private final long deletedCount; @@ -38,8 +38,8 @@ public class StreamState { StreamState(JsonValue vStreamState) { msgs = readLong(vStreamState, MESSAGES, 0); bytes = readLong(vStreamState, BYTES, 0); - firstSeq = readLong(vStreamState, FIRST_SEQ, 0); - lastSeq = readLong(vStreamState, LAST_SEQ, 0); + firstSequence = readLong(vStreamState, FIRST_SEQ, 0); + lastSequence = readLong(vStreamState, LAST_SEQ, 0); consumerCount = readLong(vStreamState, CONSUMER_COUNT, 0); firstTime = readDate(vStreamState, FIRST_TS); lastTime = readDate(vStreamState, LAST_TS); @@ -74,7 +74,7 @@ public long getByteCount() { * @return a sequence number */ public long getFirstSequence() { - return firstSeq; + return firstSequence; } /** @@ -92,7 +92,7 @@ public ZonedDateTime getFirstTime() { * @return a sequence number */ public long getLastSequence() { - return lastSeq; + return lastSequence; } /** @@ -162,8 +162,8 @@ public String toString() { return "StreamState{" + "msgs=" + msgs + ", bytes=" + bytes + - ", firstSeq=" + firstSeq + - ", lastSeq=" + lastSeq + + ", firstSeq=" + firstSequence + + ", lastSeq=" + lastSequence + ", consumerCount=" + consumerCount + ", firstTime=" + firstTime + ", lastTime=" + lastTime + diff --git a/src/test/java/io/nats/client/api/StreamConfigurationTests.java b/src/test/java/io/nats/client/api/StreamConfigurationTests.java index 218c02bca..5a2c55e7c 100644 --- a/src/test/java/io/nats/client/api/StreamConfigurationTests.java +++ b/src/test/java/io/nats/client/api/StreamConfigurationTests.java @@ -98,7 +98,8 @@ public void testConstruction() { .denyDelete(testSc.getDenyDelete()) .denyPurge(testSc.getDenyPurge()) .discardNewPerSubject(testSc.isDiscardNewPerSubject()) - .metadata(metaData); + .metadata(metaData) + .firstSequence(82942); validate(builder.build(), false); validate(builder.addSources((Source)null).build(), false); @@ -426,6 +427,7 @@ private void validate(StreamConfiguration sc, boolean serverTest) { assertEquals(1, sc.getMetadata().size()); assertEquals("meta-bar", sc.getMetadata().get("meta-foo")); + assertEquals(82942, sc.getFirstSequence()); } } diff --git a/src/test/java/io/nats/client/api/StreamInfoTests.java b/src/test/java/io/nats/client/api/StreamInfoTests.java index 9f3f2b2f7..8a2e6dd4f 100644 --- a/src/test/java/io/nats/client/api/StreamInfoTests.java +++ b/src/test/java/io/nats/client/api/StreamInfoTests.java @@ -50,7 +50,7 @@ private void validateStreamInfo(StreamInfo si) { assertEquals("sub0", sc.getSubjects().get(0)); assertEquals("sub1", sc.getSubjects().get(1)); assertEquals("x.>", sc.getSubjects().get(2)); - assertEquals(82966, sc.getFirstSequence()); + assertEquals(82942, sc.getFirstSequence()); assertEquals(RetentionPolicy.Limits, sc.getRetentionPolicy()); assertEquals(DiscardPolicy.Old, sc.getDiscardPolicy()); diff --git a/src/test/java/io/nats/client/impl/JetStreamManagementTests.java b/src/test/java/io/nats/client/impl/JetStreamManagementTests.java index b911f9342..ac4a1ae75 100644 --- a/src/test/java/io/nats/client/impl/JetStreamManagementTests.java +++ b/src/test/java/io/nats/client/impl/JetStreamManagementTests.java @@ -81,6 +81,21 @@ public void testStreamCreate() throws Exception { assertEquals(0, ss.getFirstSequence()); assertEquals(0, ss.getLastSequence()); assertEquals(0, ss.getConsumerCount()); + + if (nc.getServerInfo().isSameOrNewerThanVersion("2.10")) { + JetStream js = nc.jetStream(); + String stream = stream(); + sc = StreamConfiguration.builder() + .name(stream) + .storageType(StorageType.Memory) + .firstSequence(42) + .subjects("test-first-seq").build(); + si = jsm.addStream(sc); + assertNotNull(si.getTimestamp()); + assertEquals(42, si.getConfiguration().getFirstSequence()); + PublishAck pa = js.publish("test-first-seq", null); + assertEquals(42, pa.getSeqno()); + } }); } @@ -303,19 +318,6 @@ public void testGetStreamInfo() throws Exception { assertThrows(JetStreamApiException.class, () -> jsm.getStreamInfo(STREAM)); JetStream js = nc.jetStream(); - if (nc.getServerInfo().isSameOrNewerThanVersion("2.10")) { - String stream = stream(); - StreamConfiguration sc = StreamConfiguration.builder() - .name(stream) - .storageType(StorageType.Memory) - .firstSeq(42) - .subjects("test-first-seq").build(); - StreamInfo si = jsm.addStream(sc); - assertNotNull(si.getTimestamp()); - assertEquals(42, si.getConfiguration().getFirstSequence()); - PublishAck pa = js.publish("test-first-seq", null); - assertEquals(42, pa.getSeqno()); - } String[] subjects = new String[6]; for (int x = 0; x < 5; x++) { diff --git a/src/test/resources/data/StreamConfiguration.json b/src/test/resources/data/StreamConfiguration.json index 331dcdafb..c034e6da4 100644 --- a/src/test/resources/data/StreamConfiguration.json +++ b/src/test/resources/data/StreamConfiguration.json @@ -23,6 +23,7 @@ "allow_direct": true, "mirror_direct": true, "metadata":{"meta-foo":"meta-bar"}, + "first_seq": 82942, "placement": { "cluster": "clstr", "tags": ["tag1", "tag2"] diff --git a/src/test/resources/data/StreamInfo.json b/src/test/resources/data/StreamInfo.json index 657c6013d..2ea6ec4a1 100644 --- a/src/test/resources/data/StreamInfo.json +++ b/src/test/resources/data/StreamInfo.json @@ -24,7 +24,7 @@ "ptag2" ] }, - "first_seq": 82966 + "first_seq": 82942 }, "created": "2021-01-25T20:09:10.6225191Z", "ts": "2023-08-29T19:33:21.163377Z", From 1d1e8d867124e370ff492f5f694bd32969361748 Mon Sep 17 00:00:00 2001 From: scottf Date: Wed, 30 Aug 2023 12:26:27 -0400 Subject: [PATCH 5/5] StreamConfiguration first_seq better tests --- .../java/io/nats/client/api/StreamState.java | 19 +++++++++---------- .../client/impl/JetStreamManagementTests.java | 3 +++ 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/src/main/java/io/nats/client/api/StreamState.java b/src/main/java/io/nats/client/api/StreamState.java index 6a5aba05b..37027e549 100644 --- a/src/main/java/io/nats/client/api/StreamState.java +++ b/src/main/java/io/nats/client/api/StreamState.java @@ -24,8 +24,8 @@ public class StreamState { private final long msgs; private final long bytes; - private final long firstSequence; - private final long lastSequence; + private final long firstSeq; + private final long lastSeq; private final long consumerCount; private final long subjectCount; private final long deletedCount; @@ -38,8 +38,8 @@ public class StreamState { StreamState(JsonValue vStreamState) { msgs = readLong(vStreamState, MESSAGES, 0); bytes = readLong(vStreamState, BYTES, 0); - firstSequence = readLong(vStreamState, FIRST_SEQ, 0); - lastSequence = readLong(vStreamState, LAST_SEQ, 0); + firstSeq = readLong(vStreamState, FIRST_SEQ, 0); + lastSeq = readLong(vStreamState, LAST_SEQ, 0); consumerCount = readLong(vStreamState, CONSUMER_COUNT, 0); firstTime = readDate(vStreamState, FIRST_TS); lastTime = readDate(vStreamState, LAST_TS); @@ -69,12 +69,11 @@ public long getByteCount() { } /** - * Gets the first sequence number of the stream. - * + * Gets the first sequence number of the stream. May be 0 if there are no messages. * @return a sequence number */ public long getFirstSequence() { - return firstSequence; + return firstSeq; } /** @@ -92,7 +91,7 @@ public ZonedDateTime getFirstTime() { * @return a sequence number */ public long getLastSequence() { - return lastSequence; + return lastSeq; } /** @@ -162,8 +161,8 @@ public String toString() { return "StreamState{" + "msgs=" + msgs + ", bytes=" + bytes + - ", firstSeq=" + firstSequence + - ", lastSeq=" + lastSequence + + ", firstSeq=" + firstSeq + + ", lastSeq=" + lastSeq + ", consumerCount=" + consumerCount + ", firstTime=" + firstTime + ", lastTime=" + lastTime + diff --git a/src/test/java/io/nats/client/impl/JetStreamManagementTests.java b/src/test/java/io/nats/client/impl/JetStreamManagementTests.java index ac4a1ae75..4343b7487 100644 --- a/src/test/java/io/nats/client/impl/JetStreamManagementTests.java +++ b/src/test/java/io/nats/client/impl/JetStreamManagementTests.java @@ -338,6 +338,9 @@ public void testGetStreamInfo() throws Exception { if (nc.getServerInfo().isOlderThanVersion("2.10")) { assertNull(si.getTimestamp()); } + else { + assertNotNull(si.getTimestamp()); + } assertEquals(1, si.getConfiguration().getFirstSequence()); List packs = new ArrayList<>();