Skip to content

Commit

Permalink
issue 502
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf committed Nov 2, 2021
1 parent fa93ff7 commit 6604da3
Show file tree
Hide file tree
Showing 8 changed files with 447 additions and 245 deletions.
410 changes: 238 additions & 172 deletions src/main/java/io/nats/client/api/ConsumerConfiguration.java

Large diffs are not rendered by default.

28 changes: 3 additions & 25 deletions src/main/java/io/nats/client/impl/NatsJetStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,13 @@
import io.nats.client.*;
import io.nats.client.api.AckPolicy;
import io.nats.client.api.ConsumerConfiguration;
import io.nats.client.api.ConsumerConfiguration.CcNumeric;
import io.nats.client.api.ConsumerInfo;
import io.nats.client.api.PublishAck;
import io.nats.client.support.JsonUtils;
import io.nats.client.support.Validator;

import java.io.IOException;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;

import static io.nats.client.support.ApiConstants.SUBJECT;
Expand Down Expand Up @@ -246,7 +244,7 @@ NatsJetStreamSubscription createSubscription(String subject,
}

// 2A. Flow Control / heartbeat not always valid
if (userCC.isFlowControl() || userCC.getIdleHeartbeat().toMillis() > 0) {
if (userCC.isFlowControl() || (userCC.getIdleHeartbeat() != null && userCC.getIdleHeartbeat().toMillis() > 0)) {
if (isPullMode) {
throw JsSubFcHbNotValidPull.instance();
}
Expand Down Expand Up @@ -316,7 +314,7 @@ else if (!serverCc.getDeliverGroup().equals(qgroup)) {
// modifications are not allowed
// previous checks for deliver subject and filter subject matching are now
// in the changes function
if (userIsModifiedVsServer(userCC, serverCc)) {
if (userCC.wouldBeChangeTo(serverCc)) {
throw JsSubExistingConsumerCannotBeModified.instance();
}

Expand Down Expand Up @@ -361,7 +359,7 @@ else if (so.isBind()) {

NatsSubscriptionFactory factory = (sid, lSubject, lQgroup, lConn, lDispatcher)
-> NatsJetStreamSubscription.getInstance(sid, lSubject, lQgroup, lConn, lDispatcher,
asm, this, isPullMode, fnlStream, fnlConsumerName, fnlInboxDeliver);
asm, this, isPullMode, fnlStream, fnlConsumerName, fnlInboxDeliver);

NatsJetStreamSubscription sub;
if (dispatcher == null) {
Expand Down Expand Up @@ -391,26 +389,6 @@ else if (so.isBind()) {
return sub;
}

static boolean userIsModifiedVsServer(ConsumerConfiguration user, ConsumerConfiguration server) {

return user.isFlowControl() != server.isFlowControl()
|| user.getDeliverPolicy() != server.getDeliverPolicy()
|| user.getAckPolicy() != server.getAckPolicy()
|| user.getReplayPolicy() != server.getReplayPolicy()

|| CcNumeric.START_SEQ.notEqual(user.getStartSequence(), server.getStartSequence())
|| CcNumeric.MAX_DELIVER.notEqual(user.getMaxDeliver(), server.getMaxDeliver())
|| CcNumeric.RATE_LIMIT.notEqual(user.getRateLimit(), server.getRateLimit())
|| CcNumeric.MAX_ACK_PENDING.notEqual(user.getMaxAckPending(), server.getMaxAckPending())
|| CcNumeric.MAX_PULL_WAITING.notEqual(user.getMaxPullWaiting(), server.getMaxPullWaiting())

|| !Objects.equals(user.getStartTime(), server.getStartTime())
|| !Objects.equals(user.getAckWait(), server.getAckWait())
|| !Objects.equals(user.getIdleHeartbeat(), server.getIdleHeartbeat())
|| !Objects.equals(user.getDescription(), server.getDescription())
|| !Objects.equals(user.getSampleFrequency(), server.getSampleFrequency());
}

/**
* {@inheritDoc}
*/
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/io/nats/client/impl/PushAutoStatusManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ public class PushAutoStatusManager implements AutoStatusManager {
alarmPeriodSetting = 0;
}
else {
idleHeartbeatSetting = cc.getIdleHeartbeat().toMillis();
if (idleHeartbeatSetting == 0) {
idleHeartbeatSetting = cc.getIdleHeartbeat() == null ? 0 : cc.getIdleHeartbeat().toMillis();
if (idleHeartbeatSetting <= 0) {
alarmPeriodSetting = 0;
hb = false;
}
Expand Down
28 changes: 22 additions & 6 deletions src/main/java/io/nats/client/support/JsonUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -284,10 +284,12 @@ public static void addFieldEvenEmpty(StringBuilder sb, String fname, String valu
* @param fname fieldname
* @param value field value
*/
public static void addField(StringBuilder sb, String fname, boolean value) {
sb.append(Q);
jsonEncode(sb, fname);
sb.append(QCOLON).append(value ? "true" : "false").append(COMMA);
public static void addField(StringBuilder sb, String fname, Boolean value) {
if (value != null) {
sb.append(Q);
jsonEncode(sb, fname);
sb.append(QCOLON).append(value ? "true" : "false").append(COMMA);
}
}

/**
Expand All @@ -308,8 +310,22 @@ public static void addFldWhenTrue(StringBuilder sb, String fname, Boolean value)
* @param fname fieldname
* @param value field value
*/
public static void addField(StringBuilder sb, String fname, long value) {
if (value >= 0) {
public static void addField(StringBuilder sb, String fname, Integer value) {
if (value != null && value >= 0) {
sb.append(Q);
jsonEncode(sb, fname);
sb.append(QCOLON).append(value).append(COMMA);
}
}

/**
* Appends a json field to a string builder.
* @param sb string builder
* @param fname fieldname
* @param value field value
*/
public static void addField(StringBuilder sb, String fname, Long value) {
if (value != null && value >= 0) {
sb.append(Q);
jsonEncode(sb, fname);
sb.append(QCOLON).append(value).append(COMMA);
Expand Down
15 changes: 15 additions & 0 deletions src/main/java/io/nats/client/support/Validator.java
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,21 @@ public static Duration validateDurationNotRequiredGtOrEqZero(long millis) {
return Duration.ofMillis(millis);
}


public static Duration validateDurationNotRequiredNotLessThanMin(Duration provided, Duration minimum) {
if (provided != null && provided.toNanos() < minimum.toNanos())
{
throw new IllegalArgumentException("Duration must be greater than or equal to " + minimum.toNanos() + " nanos.");
}

return provided;
}

public static Duration validateDurationNotRequiredNotLessThanMin(long millis, Duration minimum)
{
return validateDurationNotRequiredNotLessThanMin(Duration.ofMillis(millis), minimum);
}

public static void validateNotNull(Object o, String fieldName) {
if (o == null) {
throw new IllegalArgumentException(fieldName + " cannot be null");
Expand Down
89 changes: 55 additions & 34 deletions src/test/java/io/nats/client/api/ConsumerConfigurationTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,46 +49,15 @@ public void testBuilder() {
.headersOnly(true)
.build();

assertEquals(AckPolicy.Explicit, c.getAckPolicy());
assertEquals(Duration.ofSeconds(99), c.getAckWait());
assertEquals(Duration.ofSeconds(66), c.getIdleHeartbeat());
assertEquals(DeliverPolicy.ByStartSequence, c.getDeliverPolicy());
assertEquals(DELIVER, c.getDeliverSubject());
assertEquals("blah", c.getDescription());
assertEquals(DURABLE, c.getDurable());
assertEquals("fs", c.getFilterSubject());
assertEquals(5555, c.getMaxDeliver());
assertEquals(6666, c.getMaxAckPending());
assertEquals(4242, c.getRateLimit());
assertEquals(ReplayPolicy.Original, c.getReplayPolicy());
assertEquals(2001, c.getStartSequence());
assertEquals(zdt, c.getStartTime());
assertEquals(73, c.getMaxPullWaiting());
assertTrue(c.isFlowControl());
assertTrue(c.getHeadersOnly());
assertAsBuilt(c, zdt);

ConsumerCreateRequest ccr = new ConsumerCreateRequest(STREAM, c);
assertEquals(STREAM, ccr.getStreamName());
assertNotNull(ccr.getConfig());

String json = ccr.toJson();
c = new ConsumerConfiguration(json);
assertEquals(AckPolicy.Explicit, c.getAckPolicy());
assertEquals(Duration.ofSeconds(99), c.getAckWait());
assertEquals(Duration.ofSeconds(66), c.getIdleHeartbeat());
assertEquals(DeliverPolicy.ByStartSequence, c.getDeliverPolicy());
assertEquals(DELIVER, c.getDeliverSubject());
assertEquals("blah", c.getDescription());
assertEquals(DURABLE, c.getDurable());
assertEquals("fs", c.getFilterSubject());
assertEquals(5555, c.getMaxDeliver());
assertEquals(4242, c.getRateLimit());
assertEquals(ReplayPolicy.Original, c.getReplayPolicy());
assertEquals(2001, c.getStartSequence());
assertEquals(zdt.toEpochSecond(), c.getStartTime().toEpochSecond());
assertEquals(73, c.getMaxPullWaiting());
assertTrue(c.isFlowControl());
assertTrue(c.getHeadersOnly());
assertAsBuilt(c, zdt);

assertNotNull(ccr.toString()); // COVERAGE
assertNotNull(c.toString()); // COVERAGE
Expand Down Expand Up @@ -119,12 +88,35 @@ public void testBuilder() {
assertEquals(ReplayPolicy.Instant, c.getReplayPolicy());
assertEquals(Duration.ofSeconds(9), c.getAckWait());
assertEquals(Duration.ofSeconds(6), c.getIdleHeartbeat());

assertDefaultCc(ConsumerConfiguration.builder().build());
}

private void assertAsBuilt(ConsumerConfiguration c, ZonedDateTime zdt) {
assertEquals(AckPolicy.Explicit, c.getAckPolicy());
assertEquals(Duration.ofSeconds(99), c.getAckWait());
assertEquals(Duration.ofSeconds(66), c.getIdleHeartbeat());
assertEquals(DeliverPolicy.ByStartSequence, c.getDeliverPolicy());
assertEquals(DELIVER, c.getDeliverSubject());
assertEquals("blah", c.getDescription());
assertEquals(DURABLE, c.getDurable());
assertEquals("fs", c.getFilterSubject());
assertEquals(5555, c.getMaxDeliver());
assertEquals(6666, c.getMaxAckPending());
assertEquals(4242, c.getRateLimit());
assertEquals(ReplayPolicy.Original, c.getReplayPolicy());
assertEquals(2001, c.getStartSequence());
assertEquals(zdt, c.getStartTime());
assertEquals(73, c.getMaxPullWaiting());
assertTrue(c.isFlowControl());
assertTrue(c.getHeadersOnly());
}

@Test
public void testParsingAndSetters() {
String configJSON = dataAsString("ConsumerConfiguration.json");
ConsumerConfiguration c = new ConsumerConfiguration(configJSON);
assertEquals("foo-desc", c.getDescription());
assertEquals(DeliverPolicy.All, c.getDeliverPolicy());
assertEquals(AckPolicy.All, c.getAckPolicy());
assertEquals(Duration.ofSeconds(30), c.getAckWait());
Expand All @@ -134,7 +126,6 @@ public void testParsingAndSetters() {
assertEquals(ReplayPolicy.Original, c.getReplayPolicy());
assertEquals(2020, c.getStartTime().getYear(), 2020);
assertEquals(21, c.getStartTime().getSecond(), 21);
assertEquals("blah blah", c.getDescription());
assertEquals("foo-durable", c.getDurable());
assertEquals("bar", c.getDeliverSubject());
assertEquals("foo-filter", c.getFilterSubject());
Expand All @@ -143,5 +134,35 @@ public void testParsingAndSetters() {
assertTrue(c.isFlowControl());
assertEquals(128, c.getMaxPullWaiting());
assertTrue(c.getHeadersOnly());
assertEquals(99, c.getStartSequence());

assertDefaultCc(new ConsumerConfiguration("{}"));
}

private static void assertDefaultCc(ConsumerConfiguration c)
{
assertEquals(DeliverPolicy.All, c.getDeliverPolicy());
assertEquals(AckPolicy.Explicit, c.getAckPolicy());
assertEquals(ReplayPolicy.Instant, c.getReplayPolicy());
assertNull(c.getDurable());
assertNull(c.getDeliverGroup());
assertNull(c.getDeliverSubject());
assertNull(c.getFilterSubject());
assertNull(c.getDescription());
assertNull(c.getSampleFrequency());

assertNull(c.getAckWait());
assertNull(c.getIdleHeartbeat());

assertNull(c.getStartTime());

assertFalse(c.isFlowControl());
assertFalse(c.getHeadersOnly());

assertEquals(ConsumerConfiguration.CcNumeric.START_SEQ.initial(), c.getStartSequence());
assertEquals(ConsumerConfiguration.CcNumeric.MAX_DELIVER.initial(), c.getMaxDeliver());
assertEquals(ConsumerConfiguration.CcNumeric.RATE_LIMIT.initial(), c.getRateLimit());
assertEquals(ConsumerConfiguration.CcNumeric.MAX_ACK_PENDING.initial(), c.getMaxAckPending());
assertEquals(ConsumerConfiguration.CcNumeric.MAX_PULL_WAITING.initial(), c.getMaxPullWaiting());
}
}

0 comments on commit 6604da3

Please sign in to comment.