Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream/consumer info timestamps, stream config first_seq #962

Merged
merged 5 commits into from
Sep 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
15 changes: 15 additions & 0 deletions src/main/java/io/nats/client/api/ConsumerInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public class ConsumerInfo extends ApiResponse<ConsumerInfo> {
private final long numRedelivered;
private final ClusterInfo clusterInfo;
private final boolean pushBound;
private final ZonedDateTime timestamp;

public ConsumerInfo(Message msg) {
this(parseMessage(msg));
Expand All @@ -61,6 +62,8 @@ public ConsumerInfo(JsonValue vConsumerInfo) {

clusterInfo = ClusterInfo.optionalInstance(readValue(jv, CLUSTER));
pushBound = readBoolean(jv, PUSH_BOUND);

timestamp = readDate(jv, TIMESTAMP);
}

public ConsumerConfiguration getConsumerConfiguration() {
Expand All @@ -75,6 +78,10 @@ public String getStreamName() {
return stream;
}

/**
* Gets the creation time of the consumer.
* @return the creation date and time.
*/
public ZonedDateTime getCreationTime() {
return created;
}
Expand Down Expand Up @@ -111,6 +118,14 @@ public boolean isPushBound() {
return pushBound;
}

/**
* Gets the server time the info was gathered
* @return the server gathered timed
*/
public ZonedDateTime getTimestamp() {
return timestamp;
}

public long getCalculatedPending() {
return numPending + delivered.getConsumerSequence();
}
Expand Down
26 changes: 25 additions & 1 deletion src/main/java/io/nats/client/api/StreamConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public class StreamConfiguration implements JsonSerializable {
private final boolean denyPurge;
private final boolean discardNewPerSubject;
private final Map<String, String> metadata;
private final long firstSequence;

static StreamConfiguration instance(JsonValue v) {
Builder builder = new Builder();
Expand Down Expand Up @@ -96,7 +97,7 @@ static StreamConfiguration instance(JsonValue v) {
builder.denyPurge(readBoolean(v, DENY_PURGE));
builder.discardNewPerSubject(readBoolean(v, DISCARD_NEW_PER_SUBJECT));
builder.metadata(readStringStringMap(v, METADATA));

builder.firstSequence(readLong(v, FIRST_SEQ, 1));
return builder.build();
}

Expand Down Expand Up @@ -130,6 +131,7 @@ static StreamConfiguration instance(JsonValue v) {
this.denyPurge = b.denyPurge;
this.discardNewPerSubject = b.discardNewPerSubject;
this.metadata = b.metadata;
this.firstSequence = b.firstSequence;
}

/**
Expand Down Expand Up @@ -176,6 +178,7 @@ public String toJson() {
addFldWhenTrue(sb, DENY_PURGE, denyPurge);
addFldWhenTrue(sb, DISCARD_NEW_PER_SUBJECT, discardNewPerSubject);
addField(sb, METADATA, metadata);
addFieldWhenGreaterThan(sb, FIRST_SEQ, firstSequence, 1);

return endJson(sb).toString();
}
Expand Down Expand Up @@ -407,6 +410,14 @@ public Map<String, String> getMetadata() {
return metadata;
}

/**
* The first sequence used in the stream.
* @return the first sequence
*/
public long getFirstSequence() {
return firstSequence;
}

@Override
public String toString() {
return "StreamConfiguration{" +
Expand All @@ -432,6 +443,7 @@ public String toString() {
", denyDelete=" + denyDelete +
", denyPurge=" + denyPurge +
", discardNewPerSubject=" + discardNewPerSubject +
", firstSequence=" + firstSequence +
", " + mirror +
", " + placement +
", sources=" + sources +
Expand Down Expand Up @@ -493,6 +505,7 @@ public static class Builder {
private boolean denyPurge = false;
private boolean discardNewPerSubject = false;
private Map<String, String> metadata;
private long firstSequence = 1;

/**
* Default Builder
Expand Down Expand Up @@ -535,6 +548,7 @@ public Builder(StreamConfiguration sc) {
if (sc.metadata != null) {
this.metadata = new HashMap<>(sc.metadata);
}
this.firstSequence = sc.firstSequence;
}
}

Expand Down Expand Up @@ -937,6 +951,16 @@ public Builder metadata(Map<String, String> metadata) {
return this;
}

/**
* Sets the first sequence to be used. 1 is the default. All values less than 2 are treated as 1.
* @param firstSeq specify the first_seq in the stream config when creating the stream.
* @return Builder
*/
public Builder firstSequence(long firstSeq) {
this.firstSequence = firstSeq > 1 ? firstSeq : 1;
return this;
}

/**
* Builds the StreamConfiguration
* @return a stream configuration.
Expand Down
28 changes: 21 additions & 7 deletions src/main/java/io/nats/client/api/StreamInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,27 +29,29 @@
*/
public class StreamInfo extends ApiResponse<StreamInfo> {

private final ZonedDateTime created;
private final ZonedDateTime createTime;
private final StreamConfiguration config;
private final StreamState state;
private final StreamState streamState;
private final ClusterInfo clusterInfo;
private final MirrorInfo mirrorInfo;
private final List<SourceInfo> sourceInfos;
private final ZonedDateTime timestamp;

public StreamInfo(Message msg) {
this(parseUnchecked(msg.getData()));
}

public StreamInfo(JsonValue vStreamInfo) {
super(vStreamInfo);
created = readDate(jv, CREATED);
createTime = readDate(jv, CREATED);
config = StreamConfiguration.instance(readValue(jv, CONFIG));
state = new StreamState(readValue(jv, STATE));
streamState = new StreamState(readValue(jv, STATE));
clusterInfo = ClusterInfo.optionalInstance(readValue(jv, CLUSTER));
mirrorInfo = MirrorInfo.optionalInstance(readValue(jv, MIRROR));
sourceInfos = SourceInfo.optionalListOf(readValue(jv, SOURCES));
timestamp = readDate(jv, TIMESTAMP);
}

/**
* Gets the stream configuration.
* @return the stream configuration.
Expand All @@ -63,15 +65,15 @@ public StreamConfiguration getConfiguration() {
* @return the stream state
*/
public StreamState getStreamState() {
return state;
return streamState;
}

/**
* Gets the creation time of the stream.
* @return the creation date and time.
*/
public ZonedDateTime getCreateTime() {
return created;
return createTime;
}

public MirrorInfo getMirrorInfo() {
Expand All @@ -85,4 +87,16 @@ public List<SourceInfo> getSourceInfos() {
public ClusterInfo getClusterInfo() {
return clusterInfo;
}

public StreamConfiguration getConfig() {
return config;
}

/**
* Gets the server time the info was gathered
* @return the server gathered timed
*/
public ZonedDateTime getTimestamp() {
return timestamp;
}
}
3 changes: 1 addition & 2 deletions src/main/java/io/nats/client/api/StreamState.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,7 @@ public long getByteCount() {
}

/**
* Gets the first sequence number of the stream.
*
* Gets the first sequence number of the stream. May be 0 if there are no messages.
* @return a sequence number
*/
public long getFirstSequence() {
Expand Down
1 change: 1 addition & 0 deletions src/main/java/io/nats/client/support/ApiConstants.java
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ public interface ApiConstants {
String TEMPLATE_OWNER = "template_owner";
String TIERS = "tiers";
String TIME = "time";
String TIMESTAMP = "ts";
String TLS = "tls_required";
String TOTAL = "total";
String TYPE = "type";
Expand Down
14 changes: 14 additions & 0 deletions src/main/java/io/nats/client/support/JsonUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,20 @@ public static void addFieldWhenGteMinusOne(StringBuilder sb, String fname, Long
}
}

/**
* Appends a json field to a string builder.
* @param sb string builder
* @param fname fieldname
* @param value field value
*/
public static void addFieldWhenGreaterThan(StringBuilder sb, String fname, Long value, long gt) {
if (value != null && value > gt) {
sb.append(Q);
jsonEncode(sb, fname);
sb.append(QCOLON).append(value).append(COMMA);
}
}

/**
* Appends a json field to a string builder.
* @param sb string builder
Expand Down
11 changes: 2 additions & 9 deletions src/test/java/io/nats/client/api/ConsumerInfoTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,20 +28,13 @@ public class ConsumerInfoTests {

static JsonValue vConsumerInfo = JsonParser.parseUnchecked(dataAsString("ConsumerInfo.json"));

@Test
public void testTime() {
long start = System.currentTimeMillis();
for (int x = 0; x < 1_000_000; x++) {
new ConsumerInfo(vConsumerInfo);
}
System.out.println(System.currentTimeMillis() - start);
}

@Test
public void testConsumerInfo() {
ConsumerInfo ci = new ConsumerInfo(vConsumerInfo);
assertEquals("foo-stream", ci.getStreamName());
assertEquals("foo-consumer", ci.getName());
assertEquals(DateTimeUtils.parseDateTime("2020-11-05T19:33:21.163377Z"), ci.getCreationTime());
assertEquals(DateTimeUtils.parseDateTime("2023-08-29T19:33:21.163377Z"), ci.getTimestamp());

SequencePair sp = ci.getDelivered();
assertEquals(1, sp.getConsumerSequence());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ public void testConstruction() {
.denyDelete(testSc.getDenyDelete())
.denyPurge(testSc.getDenyPurge())
.discardNewPerSubject(testSc.isDiscardNewPerSubject())
.metadata(metaData);
.metadata(metaData)
.firstSequence(82942);
validate(builder.build(), false);
validate(builder.addSources((Source)null).build(), false);

Expand Down Expand Up @@ -426,6 +427,7 @@ private void validate(StreamConfiguration sc, boolean serverTest) {

assertEquals(1, sc.getMetadata().size());
assertEquals("meta-bar", sc.getMetadata().get("meta-foo"));
assertEquals(82942, sc.getFirstSequence());
}
}

Expand Down
6 changes: 3 additions & 3 deletions src/test/java/io/nats/client/api/StreamInfoTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import org.junit.jupiter.api.Test;

import java.time.Duration;
import java.time.ZonedDateTime;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand All @@ -42,15 +41,16 @@ public void testStreamInfo() {
}

private void validateStreamInfo(StreamInfo si) {
ZonedDateTime zdt = DateTimeUtils.parseDateTime("2021-01-25T20:09:10.6225191Z");
assertEquals(zdt, si.getCreateTime());
assertEquals(DateTimeUtils.parseDateTime("2021-01-25T20:09:10.6225191Z"), si.getCreateTime());
assertEquals(DateTimeUtils.parseDateTime("2023-08-29T19:33:21.163377Z"), si.getTimestamp());

StreamConfiguration sc = si.getConfiguration();
assertEquals("streamName", sc.getName());
assertEquals(3, sc.getSubjects().size());
assertEquals("sub0", sc.getSubjects().get(0));
assertEquals("sub1", sc.getSubjects().get(1));
assertEquals("x.>", sc.getSubjects().get(2));
assertEquals(82942, sc.getFirstSequence());

assertEquals(RetentionPolicy.Limits, sc.getRetentionPolicy());
assertEquals(DiscardPolicy.Old, sc.getDiscardPolicy());
Expand Down