Skip to content

Commit

Permalink
stream/consumer info timestamps, stream configuration (#962)
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf committed Sep 4, 2023
1 parent 9c93c28 commit 15cb79d
Show file tree
Hide file tree
Showing 14 changed files with 142 additions and 35 deletions.
15 changes: 15 additions & 0 deletions src/main/java/io/nats/client/api/ConsumerInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public class ConsumerInfo extends ApiResponse<ConsumerInfo> {
private final long numRedelivered;
private final ClusterInfo clusterInfo;
private final boolean pushBound;
private final ZonedDateTime timestamp;

public ConsumerInfo(Message msg) {
this(parseMessage(msg));
Expand All @@ -61,6 +62,8 @@ public ConsumerInfo(JsonValue vConsumerInfo) {

clusterInfo = ClusterInfo.optionalInstance(readValue(jv, CLUSTER));
pushBound = readBoolean(jv, PUSH_BOUND);

timestamp = readDate(jv, TIMESTAMP);
}

public ConsumerConfiguration getConsumerConfiguration() {
Expand All @@ -75,6 +78,10 @@ public String getStreamName() {
return stream;
}

/**
* Gets the creation time of the consumer.
* @return the creation date and time.
*/
public ZonedDateTime getCreationTime() {
return created;
}
Expand Down Expand Up @@ -111,6 +118,14 @@ public boolean isPushBound() {
return pushBound;
}

/**
* Gets the server time the info was gathered
* @return the server gathered timed
*/
public ZonedDateTime getTimestamp() {
return timestamp;
}

public long getCalculatedPending() {
return numPending + delivered.getConsumerSequence();
}
Expand Down
26 changes: 25 additions & 1 deletion src/main/java/io/nats/client/api/StreamConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public class StreamConfiguration implements JsonSerializable {
private final boolean denyPurge;
private final boolean discardNewPerSubject;
private final Map<String, String> metadata;
private final long firstSequence;

static StreamConfiguration instance(JsonValue v) {
Builder builder = new Builder();
Expand Down Expand Up @@ -96,7 +97,7 @@ static StreamConfiguration instance(JsonValue v) {
builder.denyPurge(readBoolean(v, DENY_PURGE));
builder.discardNewPerSubject(readBoolean(v, DISCARD_NEW_PER_SUBJECT));
builder.metadata(readStringStringMap(v, METADATA));

builder.firstSequence(readLong(v, FIRST_SEQ, 1));
return builder.build();
}

Expand Down Expand Up @@ -130,6 +131,7 @@ static StreamConfiguration instance(JsonValue v) {
this.denyPurge = b.denyPurge;
this.discardNewPerSubject = b.discardNewPerSubject;
this.metadata = b.metadata;
this.firstSequence = b.firstSequence;
}

/**
Expand Down Expand Up @@ -176,6 +178,7 @@ public String toJson() {
addFldWhenTrue(sb, DENY_PURGE, denyPurge);
addFldWhenTrue(sb, DISCARD_NEW_PER_SUBJECT, discardNewPerSubject);
addField(sb, METADATA, metadata);
addFieldWhenGreaterThan(sb, FIRST_SEQ, firstSequence, 1);

return endJson(sb).toString();
}
Expand Down Expand Up @@ -407,6 +410,14 @@ public Map<String, String> getMetadata() {
return metadata;
}

/**
* The first sequence used in the stream.
* @return the first sequence
*/
public long getFirstSequence() {
return firstSequence;
}

@Override
public String toString() {
return "StreamConfiguration{" +
Expand All @@ -432,6 +443,7 @@ public String toString() {
", denyDelete=" + denyDelete +
", denyPurge=" + denyPurge +
", discardNewPerSubject=" + discardNewPerSubject +
", firstSequence=" + firstSequence +
", " + mirror +
", " + placement +
", sources=" + sources +
Expand Down Expand Up @@ -493,6 +505,7 @@ public static class Builder {
private boolean denyPurge = false;
private boolean discardNewPerSubject = false;
private Map<String, String> metadata;
private long firstSequence = 1;

/**
* Default Builder
Expand Down Expand Up @@ -535,6 +548,7 @@ public Builder(StreamConfiguration sc) {
if (sc.metadata != null) {
this.metadata = new HashMap<>(sc.metadata);
}
this.firstSequence = sc.firstSequence;
}
}

Expand Down Expand Up @@ -937,6 +951,16 @@ public Builder metadata(Map<String, String> metadata) {
return this;
}

/**
* Sets the first sequence to be used. 1 is the default. All values less than 2 are treated as 1.
* @param firstSeq specify the first_seq in the stream config when creating the stream.
* @return Builder
*/
public Builder firstSequence(long firstSeq) {
this.firstSequence = firstSeq > 1 ? firstSeq : 1;
return this;
}

/**
* Builds the StreamConfiguration
* @return a stream configuration.
Expand Down
28 changes: 21 additions & 7 deletions src/main/java/io/nats/client/api/StreamInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,27 +29,29 @@
*/
public class StreamInfo extends ApiResponse<StreamInfo> {

private final ZonedDateTime created;
private final ZonedDateTime createTime;
private final StreamConfiguration config;
private final StreamState state;
private final StreamState streamState;
private final ClusterInfo clusterInfo;
private final MirrorInfo mirrorInfo;
private final List<SourceInfo> sourceInfos;
private final ZonedDateTime timestamp;

public StreamInfo(Message msg) {
this(parseUnchecked(msg.getData()));
}

public StreamInfo(JsonValue vStreamInfo) {
super(vStreamInfo);
created = readDate(jv, CREATED);
createTime = readDate(jv, CREATED);
config = StreamConfiguration.instance(readValue(jv, CONFIG));
state = new StreamState(readValue(jv, STATE));
streamState = new StreamState(readValue(jv, STATE));
clusterInfo = ClusterInfo.optionalInstance(readValue(jv, CLUSTER));
mirrorInfo = MirrorInfo.optionalInstance(readValue(jv, MIRROR));
sourceInfos = SourceInfo.optionalListOf(readValue(jv, SOURCES));
timestamp = readDate(jv, TIMESTAMP);
}

/**
* Gets the stream configuration.
* @return the stream configuration.
Expand All @@ -63,15 +65,15 @@ public StreamConfiguration getConfiguration() {
* @return the stream state
*/
public StreamState getStreamState() {
return state;
return streamState;
}

/**
* Gets the creation time of the stream.
* @return the creation date and time.
*/
public ZonedDateTime getCreateTime() {
return created;
return createTime;
}

public MirrorInfo getMirrorInfo() {
Expand All @@ -85,4 +87,16 @@ public List<SourceInfo> getSourceInfos() {
public ClusterInfo getClusterInfo() {
return clusterInfo;
}

public StreamConfiguration getConfig() {
return config;
}

/**
* Gets the server time the info was gathered
* @return the server gathered timed
*/
public ZonedDateTime getTimestamp() {
return timestamp;
}
}
3 changes: 1 addition & 2 deletions src/main/java/io/nats/client/api/StreamState.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,7 @@ public long getByteCount() {
}

/**
* Gets the first sequence number of the stream.
*
* Gets the first sequence number of the stream. May be 0 if there are no messages.
* @return a sequence number
*/
public long getFirstSequence() {
Expand Down
1 change: 1 addition & 0 deletions src/main/java/io/nats/client/support/ApiConstants.java
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ public interface ApiConstants {
String TEMPLATE_OWNER = "template_owner";
String TIERS = "tiers";
String TIME = "time";
String TIMESTAMP = "ts";
String TLS = "tls_required";
String TOTAL = "total";
String TYPE = "type";
Expand Down
14 changes: 14 additions & 0 deletions src/main/java/io/nats/client/support/JsonUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,20 @@ public static void addFieldWhenGteMinusOne(StringBuilder sb, String fname, Long
}
}

/**
* Appends a json field to a string builder.
* @param sb string builder
* @param fname fieldname
* @param value field value
*/
public static void addFieldWhenGreaterThan(StringBuilder sb, String fname, Long value, long gt) {
if (value != null && value > gt) {
sb.append(Q);
jsonEncode(sb, fname);
sb.append(QCOLON).append(value).append(COMMA);
}
}

/**
* Appends a json field to a string builder.
* @param sb string builder
Expand Down
11 changes: 2 additions & 9 deletions src/test/java/io/nats/client/api/ConsumerInfoTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,20 +28,13 @@ public class ConsumerInfoTests {

static JsonValue vConsumerInfo = JsonParser.parseUnchecked(dataAsString("ConsumerInfo.json"));

@Test
public void testTime() {
long start = System.currentTimeMillis();
for (int x = 0; x < 1_000_000; x++) {
new ConsumerInfo(vConsumerInfo);
}
System.out.println(System.currentTimeMillis() - start);
}

@Test
public void testConsumerInfo() {
ConsumerInfo ci = new ConsumerInfo(vConsumerInfo);
assertEquals("foo-stream", ci.getStreamName());
assertEquals("foo-consumer", ci.getName());
assertEquals(DateTimeUtils.parseDateTime("2020-11-05T19:33:21.163377Z"), ci.getCreationTime());
assertEquals(DateTimeUtils.parseDateTime("2023-08-29T19:33:21.163377Z"), ci.getTimestamp());

SequencePair sp = ci.getDelivered();
assertEquals(1, sp.getConsumerSequence());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ public void testConstruction() {
.denyDelete(testSc.getDenyDelete())
.denyPurge(testSc.getDenyPurge())
.discardNewPerSubject(testSc.isDiscardNewPerSubject())
.metadata(metaData);
.metadata(metaData)
.firstSequence(82942);
validate(builder.build(), false);
validate(builder.addSources((Source)null).build(), false);

Expand Down Expand Up @@ -426,6 +427,7 @@ private void validate(StreamConfiguration sc, boolean serverTest) {

assertEquals(1, sc.getMetadata().size());
assertEquals("meta-bar", sc.getMetadata().get("meta-foo"));
assertEquals(82942, sc.getFirstSequence());
}
}

Expand Down
6 changes: 3 additions & 3 deletions src/test/java/io/nats/client/api/StreamInfoTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import org.junit.jupiter.api.Test;

import java.time.Duration;
import java.time.ZonedDateTime;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand All @@ -42,15 +41,16 @@ public void testStreamInfo() {
}

private void validateStreamInfo(StreamInfo si) {
ZonedDateTime zdt = DateTimeUtils.parseDateTime("2021-01-25T20:09:10.6225191Z");
assertEquals(zdt, si.getCreateTime());
assertEquals(DateTimeUtils.parseDateTime("2021-01-25T20:09:10.6225191Z"), si.getCreateTime());
assertEquals(DateTimeUtils.parseDateTime("2023-08-29T19:33:21.163377Z"), si.getTimestamp());

StreamConfiguration sc = si.getConfiguration();
assertEquals("streamName", sc.getName());
assertEquals(3, sc.getSubjects().size());
assertEquals("sub0", sc.getSubjects().get(0));
assertEquals("sub1", sc.getSubjects().get(1));
assertEquals("x.>", sc.getSubjects().get(2));
assertEquals(82942, sc.getFirstSequence());

assertEquals(RetentionPolicy.Limits, sc.getRetentionPolicy());
assertEquals(DiscardPolicy.Old, sc.getDiscardPolicy());
Expand Down

0 comments on commit 15cb79d

Please sign in to comment.