diff --git a/README.md b/README.md index 295549fb5..f9f294c87 100644 --- a/README.md +++ b/README.md @@ -859,54 +859,55 @@ You can however set the deliver policy which will be used to start the subscript * Consumer creation * Object Store operations -| Name | Group | Code | Description | -|----------------------------------------------|-------|-------|-----------------------------------------------------------------------------------------------------| -| JsSoDurableMismatch | SO | 90101 | Builder durable must match the consumer configuration durable if both are provided. | -| JsSoDeliverGroupMismatch | SO | 90102 | Builder deliver group must match the consumer configuration deliver group if both are provided. | -| JsSoDeliverSubjectMismatch | SO | 90103 | Builder deliver subject must match the consumer configuration deliver subject if both are provided. | -| JsSoOrderedNotAllowedWithBind | SO | 90104 | Bind is not allowed with an ordered consumer. | -| JsSoOrderedNotAllowedWithDeliverGroup | SO | 90105 | Deliver group is not allowed with an ordered consumer. | -| JsSoOrderedNotAllowedWithDurable | SO | 90106 | Durable is not allowed with an ordered consumer. | -| JsSoOrderedNotAllowedWithDeliverSubject | SO | 90107 | Deliver subject is not allowed with an ordered consumer. | -| JsSoOrderedRequiresAckPolicyNone | SO | 90108 | Ordered consumer requires Ack Policy None. | -| JsSoOrderedRequiresMaxDeliverOfOne | SO | 90109 | Max deliver is limited to 1 with an ordered consumer. | -| JsSoNameMismatch | SO | 90110 | Builder name must match the consumer configuration name if both are provided. | -| JsSoOrderedMemStorageNotSuppliedOrTrue | SO | 90111 | Mem Storage must be true if supplied. | -| JsSoOrderedReplicasNotSuppliedOrOne | SO | 90112 | Replicas must be 1 if supplied. | -| JsSoNameOrDurableRequiredForBind | SO | 90113 | Name or Durable required for Bind. | -| JsSubPullCantHaveDeliverGroup | SUB | 90001 | Pull subscriptions can't have a deliver group. | -| JsSubPullCantHaveDeliverSubject | SUB | 90002 | Pull subscriptions can't have a deliver subject. | -| JsSubPushCantHaveMaxPullWaiting | SUB | 90003 | Push subscriptions cannot supply max pull waiting. | -| JsSubQueueDeliverGroupMismatch | SUB | 90004 | Queue / deliver group mismatch. | -| JsSubFcHbNotValidPull | SUB | 90005 | Flow Control and/or heartbeat is not valid with a pull subscription. | -| JsSubFcHbNotValidQueue | SUB | 90006 | Flow Control and/or heartbeat is not valid in queue mode. | -| JsSubNoMatchingStreamForSubject | SUB | 90007 | No matching streams for subject. | -| JsSubConsumerAlreadyConfiguredAsPush | SUB | 90008 | Consumer is already configured as a push consumer. | -| JsSubConsumerAlreadyConfiguredAsPull | SUB | 90009 | Consumer is already configured as a pull consumer. | -| _removed_ | SUB | 90010 | | -| JsSubSubjectDoesNotMatchFilter | SUB | 90011 | Subject does not match consumer configuration filter. | -| JsSubConsumerAlreadyBound | SUB | 90012 | Consumer is already bound to a subscription. | -| JsSubExistingConsumerNotQueue | SUB | 90013 | Existing consumer is not configured as a queue / deliver group. | -| JsSubExistingConsumerIsQueue | SUB | 90014 | Existing consumer is configured as a queue / deliver group. | -| JsSubExistingQueueDoesNotMatchRequestedQueue | SUB | 90015 | Existing consumer deliver group does not match requested queue / deliver group. | -| JsSubExistingConsumerCannotBeModified | SUB | 90016 | Existing consumer cannot be modified. | -| JsSubConsumerNotFoundRequiredInBind | SUB | 90017 | Consumer not found, required in bind mode. | -| JsSubOrderedNotAllowOnQueues | SUB | 90018 | Ordered consumer not allowed on queues. | -| JsSubPushCantHaveMaxBatch | SUB | 90019 | Push subscriptions cannot supply max batch. | -| JsSubPushCantHaveMaxBytes | SUB | 90020 | Push subscriptions cannot supply max bytes. | -| JsSubPushAsyncCantSetPending | SUB | 90021 | Pending limits must be set directly on the dispatcher. | -| JsConsumerCreate290NotAvailable | CON | 90301 | Name field not valid when v2.9.0 consumer create api is not available. | -| JsConsumerNameDurableMismatch | CON | 90302 | Name must match durable if both are supplied. | -| JsMultipleFilterSubjects210NotAvailable | CON | 90303 | Multiple filter subjects not available until server version 2.10.0. | -| OsObjectNotFound | OS | 90201 | The object was not found. | -| OsObjectIsDeleted | OS | 90202 | The object is deleted. | -| OsObjectAlreadyExists | OS | 90203 | An object with that name already exists. | -| OsCantLinkToLink | OS | 90204 | A link cannot link to another link. | -| OsGetDigestMismatch | OS | 90205 | Digest does not match meta data. | -| OsGetChunksMismatch | OS | 90206 | Number of chunks does not match meta data. | -| OsGetSizeMismatch | OS | 90207 | Total size does not match meta data. | -| OsGetLinkToBucket | OS | 90208 | Cannot get object, it is a link to a bucket. | -| OsLinkNotAllowOnPut | OS | 90209 | Link not allowed in metadata when putting an object. | +| Name | Group | Code | Description | +|----------------------------------------------|-------|-------|----------------------------------------------------------------------------------------------------------------| +| JsSoDurableMismatch | SO | 90101 | Builder durable must match the consumer configuration durable if both are provided. | +| JsSoDeliverGroupMismatch | SO | 90102 | Builder deliver group must match the consumer configuration deliver group if both are provided. | +| JsSoDeliverSubjectMismatch | SO | 90103 | Builder deliver subject must match the consumer configuration deliver subject if both are provided. | +| JsSoOrderedNotAllowedWithBind | SO | 90104 | Bind is not allowed with an ordered consumer. | +| JsSoOrderedNotAllowedWithDeliverGroup | SO | 90105 | Deliver group is not allowed with an ordered consumer. | +| JsSoOrderedNotAllowedWithDurable | SO | 90106 | Durable is not allowed with an ordered consumer. | +| JsSoOrderedNotAllowedWithDeliverSubject | SO | 90107 | Deliver subject is not allowed with an ordered consumer. | +| JsSoOrderedRequiresAckPolicyNone | SO | 90108 | Ordered consumer requires Ack Policy None. | +| JsSoOrderedRequiresMaxDeliverOfOne | SO | 90109 | Max deliver is limited to 1 with an ordered consumer. | +| JsSoNameMismatch | SO | 90110 | Builder name must match the consumer configuration name if both are provided. | +| JsSoOrderedMemStorageNotSuppliedOrTrue | SO | 90111 | Mem Storage must be true if supplied. | +| JsSoOrderedReplicasNotSuppliedOrOne | SO | 90112 | Replicas must be 1 if supplied. | +| JsSoNameOrDurableRequiredForBind | SO | 90113 | Name or Durable required for Bind. | +| JsSubPullCantHaveDeliverGroup | SUB | 90001 | Pull subscriptions can't have a deliver group. | +| JsSubPullCantHaveDeliverSubject | SUB | 90002 | Pull subscriptions can't have a deliver subject. | +| JsSubPushCantHaveMaxPullWaiting | SUB | 90003 | Push subscriptions cannot supply max pull waiting. | +| JsSubQueueDeliverGroupMismatch | SUB | 90004 | Queue / deliver group mismatch. | +| JsSubFcHbNotValidPull | SUB | 90005 | Flow Control and/or heartbeat is not valid with a pull subscription. | +| JsSubFcHbNotValidQueue | SUB | 90006 | Flow Control and/or heartbeat is not valid in queue mode. | +| JsSubNoMatchingStreamForSubject | SUB | 90007 | No matching streams for subject. | +| JsSubConsumerAlreadyConfiguredAsPush | SUB | 90008 | Consumer is already configured as a push consumer. | +| JsSubConsumerAlreadyConfiguredAsPull | SUB | 90009 | Consumer is already configured as a pull consumer. | +| _removed_ | SUB | 90010 | | +| JsSubSubjectDoesNotMatchFilter | SUB | 90011 | Subject does not match consumer configuration filter. | +| JsSubConsumerAlreadyBound | SUB | 90012 | Consumer is already bound to a subscription. | +| JsSubExistingConsumerNotQueue | SUB | 90013 | Existing consumer is not configured as a queue / deliver group. | +| JsSubExistingConsumerIsQueue | SUB | 90014 | Existing consumer is configured as a queue / deliver group. | +| JsSubExistingQueueDoesNotMatchRequestedQueue | SUB | 90015 | Existing consumer deliver group does not match requested queue / deliver group. | +| JsSubExistingConsumerCannotBeModified | SUB | 90016 | Existing consumer cannot be modified. | +| JsSubConsumerNotFoundRequiredInBind | SUB | 90017 | Consumer not found, required in bind mode. | +| JsSubOrderedNotAllowOnQueues | SUB | 90018 | Ordered consumer not allowed on queues. | +| JsSubPushCantHaveMaxBatch | SUB | 90019 | Push subscriptions cannot supply max batch. | +| JsSubPushCantHaveMaxBytes | SUB | 90020 | Push subscriptions cannot supply max bytes. | +| JsSubPushAsyncCantSetPending | SUB | 90021 | Pending limits must be set directly on the dispatcher. | +| JsSubSubjectNeededToLookupStream | SUB | 90022 | Subject needed to lookup stream. Provide either a subscribe subject or a ConsumerConfiguration filter subject. | +| JsConsumerCreate290NotAvailable | CON | 90301 | Name field not valid when v2.9.0 consumer create api is not available. | +| JsConsumerNameDurableMismatch | CON | 90302 | Name must match durable if both are supplied. | +| JsMultipleFilterSubjects210NotAvailable | CON | 90303 | Multiple filter subjects not available until server version 2.10.0. | +| OsObjectNotFound | OS | 90201 | The object was not found. | +| OsObjectIsDeleted | OS | 90202 | The object is deleted. | +| OsObjectAlreadyExists | OS | 90203 | An object with that name already exists. | +| OsCantLinkToLink | OS | 90204 | A link cannot link to another link. | +| OsGetDigestMismatch | OS | 90205 | Digest does not match meta data. | +| OsGetChunksMismatch | OS | 90206 | Number of chunks does not match meta data. | +| OsGetSizeMismatch | OS | 90207 | Total size does not match meta data. | +| OsGetLinkToBucket | OS | 90208 | Cannot get object, it is a link to a bucket. | +| OsLinkNotAllowOnPut | OS | 90209 | Link not allowed in metadata when putting an object. | ### Message Acknowledgements diff --git a/src/main/java/io/nats/client/SubscribeOptions.java b/src/main/java/io/nats/client/SubscribeOptions.java index 4bfd419a6..f334f2056 100644 --- a/src/main/java/io/nats/client/SubscribeOptions.java +++ b/src/main/java/io/nats/client/SubscribeOptions.java @@ -55,13 +55,25 @@ protected SubscribeOptions(Builder builder, boolean isPull, stream = validateStreamName(builder.stream, bind); // required when bind mode - // read the names and do basic validation - String temp = validateConsumerName( - validateMustMatchIfBothSupplied(builder.name, builder.cc == null ? null : builder.cc.getName(), JsSoNameMismatch), - false); - String durable = validateDurable( - validateMustMatchIfBothSupplied(builder.durable, builder.cc == null ? null : builder.cc.getDurable(), JsSoDurableMismatch), - false); + // read the consumer names and do basic validation + // A1. validate name input + String temp = validateMustMatchIfBothSupplied( + builder.name, + builder.cc == null ? null : builder.cc.getName(), + JsSoNameMismatch); + // B1. Must be a valid consumer name if supplied + temp = validateConsumerName(temp, false); + + // A2. validate durable input + String durable = validateMustMatchIfBothSupplied( + builder.durable, + builder.cc == null ? null : builder.cc.getDurable(), + JsSoDurableMismatch); + + // B2. Must be a valid consumer name if supplied + durable = validateDurable(durable, false); + + // C. name must match durable if both supplied name = validateMustMatchIfBothSupplied(temp, durable, JsConsumerNameDurableMismatch); if (bind && name == null) { @@ -105,7 +117,7 @@ protected SubscribeOptions(Builder builder, boolean isPull, .ackPolicy(AckPolicy.None) .maxDeliver(1) .ackWait(Duration.ofHours(22)) - .name(temp) + .name(name) .memStorage(true) .numReplicas(1); @@ -119,7 +131,7 @@ protected SubscribeOptions(Builder builder, boolean isPull, .durable(durable) .deliverSubject(deliverSubject) .deliverGroup(deliverGroup) - .name(temp) + .name(name) .build(); } } diff --git a/src/main/java/io/nats/client/api/ConsumerConfiguration.java b/src/main/java/io/nats/client/api/ConsumerConfiguration.java index 8a16c76d1..e3c093e7e 100644 --- a/src/main/java/io/nats/client/api/ConsumerConfiguration.java +++ b/src/main/java/io/nats/client/api/ConsumerConfiguration.java @@ -66,7 +66,6 @@ public class ConsumerConfiguration implements JsonSerializable { protected final String name; protected final String deliverSubject; protected final String deliverGroup; - protected final List filterSubjects; protected final String sampleFrequency; protected final ZonedDateTime startTime; protected final Duration ackWait; @@ -86,6 +85,7 @@ public class ConsumerConfiguration implements JsonSerializable { protected final Boolean memStorage; protected final List backoff; protected final Map metadata; + protected final List filterSubjects; protected ConsumerConfiguration(ConsumerConfiguration cc) { this.deliverPolicy = cc.deliverPolicy; @@ -96,7 +96,6 @@ protected ConsumerConfiguration(ConsumerConfiguration cc) { this.name = cc.name; this.deliverSubject = cc.deliverSubject; this.deliverGroup = cc.deliverGroup; - this.filterSubjects = cc.filterSubjects; this.sampleFrequency = cc.sampleFrequency; this.startTime = cc.startTime; this.ackWait = cc.ackWait; @@ -116,6 +115,7 @@ protected ConsumerConfiguration(ConsumerConfiguration cc) { this.memStorage = cc.memStorage; this.backoff = cc.backoff == null ? null : new ArrayList<>(cc.backoff); this.metadata = cc.metadata == null ? null : new HashMap<>(cc.metadata); + this.filterSubjects = cc.filterSubjects == null ? null : new ArrayList<>(cc.filterSubjects); } ConsumerConfiguration(JsonValue v) { @@ -128,15 +128,7 @@ protected ConsumerConfiguration(ConsumerConfiguration cc) { name = readString(v, NAME); deliverSubject = readString(v, DELIVER_SUBJECT); deliverGroup = readString(v, DELIVER_GROUP); - String tempFs = readString(v, FILTER_SUBJECT); - if (tempFs != null) { - filterSubjects = Collections.singletonList(tempFs); - } - else { - filterSubjects = readStringList(v, FILTER_SUBJECTS); - } sampleFrequency = readString(v, SAMPLE_FREQ); - startTime = readDate(v, OPT_START_TIME); ackWait = readNanos(v, ACK_WAIT); idleHeartbeat = readNanos(v, IDLE_HEARTBEAT); @@ -158,6 +150,14 @@ protected ConsumerConfiguration(ConsumerConfiguration cc) { backoff = readNanosList(v, BACKOFF, true); metadata = readStringStringMap(v, METADATA); + + String tempFs = emptyAsNull(readString(v, FILTER_SUBJECT)); + if (tempFs == null) { + filterSubjects = readOptionalStringList(v, FILTER_SUBJECTS); + } + else { + filterSubjects = Collections.singletonList(tempFs); + } } // For the builder @@ -172,7 +172,6 @@ protected ConsumerConfiguration(Builder b) this.name = b.name; this.startTime = b.startTime; this.ackWait = b.ackWait; - this.filterSubjects = b.filterSubjects; this.sampleFrequency = b.sampleFrequency; this.deliverSubject = b.deliverSubject; this.deliverGroup = b.deliverGroup; @@ -195,6 +194,7 @@ protected ConsumerConfiguration(Builder b) this.backoff = b.backoff; this.metadata = b.metadata; + this.filterSubjects = b.filterSubjects; } /** @@ -216,12 +216,6 @@ public String toJson() { JsonUtils.addFieldAsNanos(sb, ACK_WAIT, ackWait); JsonUtils.addFieldWhenGtZero(sb, MAX_DELIVER, maxDeliver); JsonUtils.addField(sb, MAX_ACK_PENDING, maxAckPending); - if (filterSubjects.size() > 1) { - JsonUtils.addStrings(sb, FILTER_SUBJECTS, filterSubjects); - } - else if (filterSubjects.size() == 1) { - JsonUtils.addField(sb, FILTER_SUBJECT, filterSubjects.get(0)); - } JsonUtils.addField(sb, REPLAY_POLICY, GetOrDefault(replayPolicy).toString()); JsonUtils.addField(sb, SAMPLE_FREQ, sampleFrequency); JsonUtils.addFieldWhenGtZero(sb, RATE_LIMIT_BPS, rateLimit); @@ -237,6 +231,14 @@ else if (filterSubjects.size() == 1) { JsonUtils.addField(sb, NUM_REPLICAS, numReplicas); JsonUtils.addField(sb, MEM_STORAGE, memStorage); JsonUtils.addField(sb, METADATA, metadata); + if (filterSubjects != null) { + if (filterSubjects.size() > 1) { + JsonUtils.addStrings(sb, FILTER_SUBJECTS, filterSubjects); + } + else if (filterSubjects.size() == 1) { + JsonUtils.addField(sb, FILTER_SUBJECT, filterSubjects.get(0)); + } + } return endJson(sb).toString(); } @@ -329,21 +331,31 @@ public long getMaxDeliver() { } /** - * Gets the first filter subject of this consumer configuration. Will be null if there are none. + * Gets the filter subject of this consumer configuration. + * With the introduction of multiple filter subjects, this method will + * return null if there are not exactly one filter subjects * @return the first filter subject. */ public String getFilterSubject() { - return filterSubjects.isEmpty() ? null : filterSubjects.get(0); + return filterSubjects == null || filterSubjects.size() != 1 ? null : filterSubjects.get(0); } /** - * Gets the filter subjects as a list. May be empty, but won't be null + * Gets the filter subjects as a list. May be null, otherwise won't be empty * @return the list */ public List getFilterSubjects() { return filterSubjects; } + /** + * Whether there are multiple filter subjects for this consumer configuration. + * @return true if there are multiple filter subjects + */ + public boolean hasMultipleFilterSubjects() { + return filterSubjects != null && filterSubjects.size() > 1; + } + /** * Gets the replay policy of this consumer configuration. * @return the replay policy. @@ -635,7 +647,6 @@ public static class Builder { private String name; private String deliverSubject; private String deliverGroup; - private List filterSubjects = new ArrayList<>(); private String sampleFrequency; private ZonedDateTime startTime; @@ -659,6 +670,7 @@ public static class Builder { private List backoff; private Map metadata; + private List filterSubjects; public Builder() {} @@ -673,7 +685,6 @@ public Builder(ConsumerConfiguration cc) { this.name = cc.name; this.deliverSubject = cc.deliverSubject; this.deliverGroup = cc.deliverGroup; - this.filterSubjects = new ArrayList<>(cc.filterSubjects); this.sampleFrequency = cc.sampleFrequency; this.startTime = cc.startTime; @@ -701,6 +712,9 @@ public Builder(ConsumerConfiguration cc) { if (cc.metadata != null) { this.metadata = new HashMap<>(cc.metadata); } + if (cc.filterSubjects != null) { + this.filterSubjects = new ArrayList<>(cc.filterSubjects); + } } } @@ -848,13 +862,16 @@ public Builder maxDeliver(long maxDeliver) { /** * Sets the filter subject of the ConsumerConfiguration. + * Replaces any other filter subjects set in the builder * @param filterSubject the filter subject * @return Builder */ public Builder filterSubject(String filterSubject) { - this.filterSubjects.clear(); - if (!nullOrEmpty(filterSubject)) { - this.filterSubjects.add(filterSubject); + if (nullOrEmpty(filterSubject)) { + this.filterSubjects = null; + } + else { + this.filterSubjects = Collections.singletonList(filterSubject); } return this; } @@ -862,7 +879,8 @@ public Builder filterSubject(String filterSubject) { /** * Sets the filter subjects of the ConsumerConfiguration. - * @param filterSubjects the array of filter subjects + * Replaces any other filter subjects set in the builder + * @param filterSubjects one or more filter subjects * @return Builder */ public Builder filterSubjects(String... filterSubjects) { @@ -871,11 +889,12 @@ public Builder filterSubjects(String... filterSubjects) { /** * Sets the filter subjects of the ConsumerConfiguration. + * Replaces any other filter subjects set in the builder * @param filterSubjects the list of filter subjects * @return Builder */ public Builder filterSubjects(List filterSubjects) { - this.filterSubjects.clear(); + this.filterSubjects = new ArrayList<>(); if (filterSubjects != null) { for (String fs : filterSubjects) { if (!nullOrEmpty(fs)) { @@ -883,6 +902,9 @@ public Builder filterSubjects(List filterSubjects) { } } } + if (this.filterSubjects.isEmpty()) { + this.filterSubjects = null; + } return this; } @@ -1255,36 +1277,7 @@ public PullSubscribeOptions buildPullSubscribeOptions(String stream) { @Override public String toString() { - return "ConsumerConfiguration{" + - "description='" + description + '\'' + - ", durable='" + durable + '\'' + - ", name='" + name + '\'' + - ", deliverPolicy=" + deliverPolicy + - ", deliverSubject='" + deliverSubject + '\'' + - ", deliverGroup='" + deliverGroup + '\'' + - ", startSeq=" + startSeq + - ", startTime=" + startTime + - ", ackPolicy=" + ackPolicy + - ", ackWait=" + ackWait + - ", maxDeliver=" + maxDeliver + - ", filterSubjects='" + String.join(",", filterSubjects) + '\'' + - ", replayPolicy=" + replayPolicy + - ", sampleFrequency='" + sampleFrequency + '\'' + - ", rateLimit=" + rateLimit + - ", maxAckPending=" + maxAckPending + - ", idleHeartbeat=" + idleHeartbeat + - ", flowControl=" + flowControl + - ", maxPullWaiting=" + maxPullWaiting + - ", maxBatch=" + maxBatch + - ", maxBytes=" + maxBytes + - ", maxExpires=" + maxExpires + - ", numReplicas=" + numReplicas + - ", headersOnly=" + headersOnly + - ", memStorage=" + memStorage + - ", inactiveThreshold=" + inactiveThreshold + - ", backoff=" + backoff + - ", metadata=" + metadata + - '}'; + return "ConsumerConfiguration " + toJson(); } protected static int getOrUnset(Integer val) diff --git a/src/main/java/io/nats/client/api/OrderedConsumerConfiguration.java b/src/main/java/io/nats/client/api/OrderedConsumerConfiguration.java index 9cdc06567..5799a48a6 100644 --- a/src/main/java/io/nats/client/api/OrderedConsumerConfiguration.java +++ b/src/main/java/io/nats/client/api/OrderedConsumerConfiguration.java @@ -132,13 +132,17 @@ public OrderedConsumerConfiguration headersOnly(Boolean headersOnly) { } public String getFilterSubject() { - return filterSubjects.get(0); + return filterSubjects == null || filterSubjects.size() != 1 ? null : filterSubjects.get(0); } public List getFilterSubjects() { return filterSubjects; } + public boolean hasMultipleFilterSubjects() { + return filterSubjects != null && filterSubjects.size() > 1; + } + public DeliverPolicy getDeliverPolicy() { return deliverPolicy; } diff --git a/src/main/java/io/nats/client/api/SourceBase.java b/src/main/java/io/nats/client/api/SourceBase.java index 3f89384d0..6102e5e01 100644 --- a/src/main/java/io/nats/client/api/SourceBase.java +++ b/src/main/java/io/nats/client/api/SourceBase.java @@ -29,7 +29,7 @@ import static io.nats.client.support.JsonUtils.beginJson; import static io.nats.client.support.JsonUtils.endJson; import static io.nats.client.support.JsonValueUtils.readValue; -import static io.nats.client.support.Validator.listsAreEquivalent; +import static io.nats.client.support.Validator.consumerFilterSubjectsAreEquivalent; public abstract class SourceBase implements JsonSerializable { private final String name; @@ -194,7 +194,7 @@ public boolean equals(Object o) { if (!Objects.equals(filterSubject, that.filterSubject)) return false; if (!Objects.equals(external, that.external)) return false; - return listsAreEquivalent(subjectTransforms, that.subjectTransforms); + return consumerFilterSubjectsAreEquivalent(subjectTransforms, that.subjectTransforms); } @Override diff --git a/src/main/java/io/nats/client/impl/NatsJetStream.java b/src/main/java/io/nats/client/impl/NatsJetStream.java index 72d56b9cc..69f4e08d7 100644 --- a/src/main/java/io/nats/client/impl/NatsJetStream.java +++ b/src/main/java/io/nats/client/impl/NatsJetStream.java @@ -254,7 +254,7 @@ JetStreamSubscription createSubscription(String userSubscribeSubject, SubscribeOptions so; String stream; ConsumerConfiguration userCC; - String deliverGroup = null; // push might set this + String settledDeliverGroup = null; // push might set this if (isPullMode) { so = pullSubscribeOptions; // options must have already been checked to be non-null @@ -276,8 +276,8 @@ JetStreamSubscription createSubscription(String userSubscribeSubject, if (userCC.maxBytesWasSet()) { throw JsSubPushCantHaveMaxBytes.instance(); } // figure out the queue name - deliverGroup = validateMustMatchIfBothSupplied(userCC.getDeliverGroup(), queueName, JsSubQueueDeliverGroupMismatch); - if (so.isOrdered() && deliverGroup != null) { + settledDeliverGroup = validateMustMatchIfBothSupplied(userCC.getDeliverGroup(), queueName, JsSubQueueDeliverGroupMismatch); + if (so.isOrdered() && settledDeliverGroup != null) { throw JsSubOrderedNotAllowOnQueues.instance(); } @@ -294,25 +294,25 @@ JetStreamSubscription createSubscription(String userSubscribeSubject, if (isPullMode) { throw JsSubFcHbNotValidPull.instance(); } - if (deliverGroup != null) { + if (settledDeliverGroup != null) { throw JsSubFcHbNotValidQueue.instance(); } } - // 2. figure out user provided subjects and prepare the userCcFilterSubjects + // 2. figure out user provided subjects and prepare the settledFilterSubjects userSubscribeSubject = emptyAsNull(userSubscribeSubject); - List userCcFilterSubjects = new ArrayList<>(); - if (userCC.getFilterSubject() == null) { // empty filterSubjects gives null - // userCC.filterSubjects empty, populate userCcFilterSubjects w/userSubscribeSubject if possible + List settledFilterSubjects = new ArrayList<>(); + if (userCC.getFilterSubjects() == null) { // empty filterSubjects gives null + // userCC.filterSubjects empty, populate settledFilterSubjects w/userSubscribeSubject if possible if (userSubscribeSubject != null) { - userCcFilterSubjects.add(userSubscribeSubject); + settledFilterSubjects.add(userSubscribeSubject); } } else { // userCC.filterSubjects not empty, validate them - userCcFilterSubjects.addAll(userCC.getFilterSubjects()); + settledFilterSubjects.addAll(userCC.getFilterSubjects()); // If userSubscribeSubject is provided it must be one of the filter subjects. - if (userSubscribeSubject != null && !userCcFilterSubjects.contains(userSubscribeSubject)) { + if (userSubscribeSubject != null && !settledFilterSubjects.contains(userSubscribeSubject)) { throw JsSubSubjectDoesNotMatchFilter.instance(); } } @@ -320,10 +320,10 @@ JetStreamSubscription createSubscription(String userSubscribeSubject, // 3. Did they tell me what stream? No? look it up. final String settledStream; if (stream == null) { - if (userCcFilterSubjects.isEmpty()) { - throw new IllegalArgumentException("Subject needed to lookup stream. Provide either a subscribe subject or a ConsumerConfiguration filter subject."); + if (settledFilterSubjects.isEmpty()) { + throw JsSubSubjectNeededToLookupStream.instance(); } - settledStream = lookupStreamBySubject(userCcFilterSubjects.get(0)); + settledStream = lookupStreamBySubject(settledFilterSubjects.get(0)); if (settledStream == null) { throw JsSubNoMatchingStreamForSubject.instance(); } @@ -367,7 +367,7 @@ else if (nullOrEmpty(serverCC.getDeliverSubject())) { if (serverCC.getDeliverGroup() == null) { // lookedUp was null, means existing consumer is not a queue consumer - if (deliverGroup == null) { + if (settledDeliverGroup == null) { // ok fine, no queue requested and the existing consumer is also not a queue consumer // we must check if the consumer is in use though if (serverInfo.isPushBound()) { @@ -378,21 +378,23 @@ else if (nullOrEmpty(serverCC.getDeliverSubject())) { throw JsSubExistingConsumerNotQueue.instance(); } } - else if (deliverGroup == null) { + else if (settledDeliverGroup == null) { throw JsSubExistingConsumerIsQueue.instance(); } - else if (!serverCC.getDeliverGroup().equals(deliverGroup)) { + else if (!serverCC.getDeliverGroup().equals(settledDeliverGroup)) { throw JsSubExistingQueueDoesNotMatchRequestedQueue.instance(); } // consumer already exists, make sure the filter subject matches // subscribeSubject, if supplied came from the user directly // or in the userCC or might not have been in either place - if (userCcFilterSubjects.isEmpty()) { + if (settledFilterSubjects.isEmpty()) { // still also might be null, which the server treats as > - userCcFilterSubjects = serverCC.getFilterSubjects(); + if (serverCC.getFilterSubjects() != null) { + settledFilterSubjects = serverCC.getFilterSubjects(); + } } - else if (!listsAreEquivalent(userCcFilterSubjects, serverCC.getFilterSubjects())) { + else if (!consumerFilterSubjectsAreEquivalent(settledFilterSubjects, serverCC.getFilterSubjects())) { throw JsSubSubjectDoesNotMatchFilter.instance(); } @@ -417,8 +419,8 @@ else if (inboxDeliver == null) { // 6. If consumer does not exist, create and settle on the config. Name will have to wait // If the consumer exists, I know what the settled info is - final String settledConsumerName; final ConsumerConfiguration settledCC; + final String settledConsumerName; if (so.isFastBind() || serverCC != null) { settledCC = serverCC; settledConsumerName = so.getName(); // will never be null in this case @@ -434,9 +436,9 @@ else if (inboxDeliver == null) { // userCC.filterSubjects might have originally been empty // but there might have been a userSubscribeSubject, // so this makes sure it's resolved either way - ccBuilder.filterSubjects(userCcFilterSubjects); + ccBuilder.filterSubjects(settledFilterSubjects); - ccBuilder.deliverGroup(deliverGroup); + ccBuilder.deliverGroup(settledDeliverGroup); settledCC = ccBuilder.build(); settledConsumerName = null; // the server will give us a name @@ -453,7 +455,7 @@ else if (inboxDeliver == null) { } else { MessageManagerFactory mmFactory = so.isOrdered() ? _pushOrderedMessageManagerFactory : _pushMessageManagerFactory; - mm = mmFactory.createMessageManager(conn, this, settledStream, so, settledCC, false, dispatcher == null); + mm = mmFactory.createMessageManager(conn, this, settledStream, so, settledCC, settledDeliverGroup != null, dispatcher == null); subFactory = (sid, lSubject, lQgroup, lConn, lDispatcher) -> { NatsJetStreamSubscription nsub = new NatsJetStreamSubscription(sid, lSubject, lQgroup, lConn, lDispatcher, this, settledStream, settledConsumerName, mm); @@ -465,11 +467,11 @@ else if (inboxDeliver == null) { } NatsJetStreamSubscription sub; if (dispatcher == null) { - sub = (NatsJetStreamSubscription) conn.createSubscription(settledInboxDeliver, deliverGroup, null, subFactory); + sub = (NatsJetStreamSubscription) conn.createSubscription(settledInboxDeliver, settledDeliverGroup, null, subFactory); } else { AsyncMessageHandler handler = new AsyncMessageHandler(mm, userHandler, isAutoAck, settledCC); - sub = (NatsJetStreamSubscription) dispatcher.subscribeImplJetStream(settledInboxDeliver, deliverGroup, handler, subFactory); + sub = (NatsJetStreamSubscription) dispatcher.subscribeImplJetStream(settledInboxDeliver, settledDeliverGroup, handler, subFactory); } // 8. The consumer might need to be created, do it here @@ -514,14 +516,14 @@ public List getChanges(ConsumerConfiguration serverCc) { if (startTime != null && !startTime.equals(serverCcc.startTime)) { changes.add("startTime"); } - if (!filterSubjects.isEmpty() && !listsAreEquivalent(filterSubjects, serverCcc.filterSubjects)) { changes.add("filterSubjects"); } if (description != null && !description.equals(serverCcc.description)) { changes.add("description"); } if (sampleFrequency != null && !sampleFrequency.equals(serverCcc.sampleFrequency)) { changes.add("sampleFrequency"); } if (deliverSubject != null && !deliverSubject.equals(serverCcc.deliverSubject)) { changes.add("deliverSubject"); } if (deliverGroup != null && !deliverGroup.equals(serverCcc.deliverGroup)) { changes.add("deliverGroup"); } - if (backoff != null && !listsAreEquivalent(backoff, serverCcc.backoff)) { changes.add("backoff"); } + if (backoff != null && !consumerFilterSubjectsAreEquivalent(backoff, serverCcc.backoff)) { changes.add("backoff"); } if (metadata != null && !mapsAreEquivalent(metadata, serverCcc.metadata)) { changes.add("metadata"); } + if (filterSubjects != null && !consumerFilterSubjectsAreEquivalent(filterSubjects, serverCcc.filterSubjects)) { changes.add("filterSubjects"); } // do not need to check Durable because the original is retrieved by the durable name diff --git a/src/main/java/io/nats/client/impl/NatsJetStreamImpl.java b/src/main/java/io/nats/client/impl/NatsJetStreamImpl.java index 64e011d82..26ecaa371 100644 --- a/src/main/java/io/nats/client/impl/NatsJetStreamImpl.java +++ b/src/main/java/io/nats/client/impl/NatsJetStreamImpl.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.concurrent.ConcurrentHashMap; +import static io.nats.client.support.NatsConstants.GT; import static io.nats.client.support.NatsJetStreamClientError.JsConsumerCreate290NotAvailable; import static io.nats.client.support.NatsJetStreamClientError.JsMultipleFilterSubjects210NotAvailable; import static io.nats.client.support.NatsRequestCompletableFuture.CancelAction; @@ -80,21 +81,23 @@ ConsumerInfo _createConsumer(String streamName, ConsumerConfiguration config) th throw JsConsumerCreate290NotAvailable.instance(); } - boolean multipleFilterSubject = config.getFilterSubjects().size() > 1; - if (multipleFilterSubject && !multipleSubjectFilter210Available) { + boolean hasMultipleFilterSubjects = config.hasMultipleFilterSubjects(); + + // seems strange that this could happen, but checking anyway... + if (hasMultipleFilterSubjects && !multipleSubjectFilter210Available) { throw JsMultipleFilterSubjects210NotAvailable.instance(); } String durable = config.getDurable(); String subj; // new consumer create not available before 290 and can't be used with multiple filter subjects - if (consumerCreate290Available && !multipleFilterSubject) { + if (consumerCreate290Available && !hasMultipleFilterSubjects) { if (consumerName == null) { // if both consumerName and durable are null, generate a name consumerName = durable == null ? generateConsumerName() : durable; } - String fs = config.getFilterSubject(); - if (fs == null || fs.equals(">")) { + String fs = config.getFilterSubject(); // we've already determined not multiple so this gives us 1 or null + if (fs == null || fs.equals(GT)) { subj = String.format(JSAPI_CONSUMER_CREATE_V290, streamName, consumerName); } else { diff --git a/src/main/java/io/nats/client/support/NatsConstants.java b/src/main/java/io/nats/client/support/NatsConstants.java index 44e96262c..07b753096 100644 --- a/src/main/java/io/nats/client/support/NatsConstants.java +++ b/src/main/java/io/nats/client/support/NatsConstants.java @@ -38,6 +38,8 @@ public interface NatsConstants { String EMPTY = ""; String CRLF = "\r\n"; String DOT = "."; + String GT = ">"; + String STAR = "*"; byte TAB = '\t'; byte SP = ' '; diff --git a/src/main/java/io/nats/client/support/NatsJetStreamClientError.java b/src/main/java/io/nats/client/support/NatsJetStreamClientError.java index 4250635e8..6bda9e345 100644 --- a/src/main/java/io/nats/client/support/NatsJetStreamClientError.java +++ b/src/main/java/io/nats/client/support/NatsJetStreamClientError.java @@ -41,6 +41,7 @@ public class NatsJetStreamClientError { public static final NatsJetStreamClientError JsSubPushCantHaveMaxBatch = new NatsJetStreamClientError(SUB, 90019, "Push subscriptions cannot supply max batch."); public static final NatsJetStreamClientError JsSubPushCantHaveMaxBytes = new NatsJetStreamClientError(SUB, 90020, "Push subscriptions cannot supply max bytes."); public static final NatsJetStreamClientError JsSubPushAsyncCantSetPending = new NatsJetStreamClientError(SUB, 90021, "Pending limits must be set directly on the dispatcher."); + public static final NatsJetStreamClientError JsSubSubjectNeededToLookupStream = new NatsJetStreamClientError(SUB, 90022, "Subject needed to lookup stream. Provide either a subscribe subject or a ConsumerConfiguration filter subject."); public static final NatsJetStreamClientError JsSoDurableMismatch = new NatsJetStreamClientError(SO, 90101, "Builder durable must match the consumer configuration durable if both are provided."); public static final NatsJetStreamClientError JsSoDeliverGroupMismatch = new NatsJetStreamClientError(SO, 90102, "Builder deliver group must match the consumer configuration deliver group if both are provided."); diff --git a/src/main/java/io/nats/client/support/Validator.java b/src/main/java/io/nats/client/support/Validator.java index 2c70f9b1c..c36659fa7 100644 --- a/src/main/java/io/nats/client/support/Validator.java +++ b/src/main/java/io/nats/client/support/Validator.java @@ -64,11 +64,15 @@ public static String validateSubjectTerm(String subject, String label, boolean r case '\t': throw new IllegalArgumentException(label + " cannot contain space, tab, carriage return or linefeed character"); case '*': - case '>': if (sl != 1) { throw new IllegalArgumentException(label + " wildcard improperly placed."); } break; + case '>': + if (sl != 1 || (seg + 1 != segments.length)) { + throw new IllegalArgumentException(label + " wildcard improperly placed."); + } + break; } } } @@ -593,20 +597,22 @@ public static boolean isSemVer(String s) { return SEMVER_PATTERN.matcher(s).find(); } - public static boolean listsAreEquivalent(List l1, List l2) + // This function tests filter subject equivalency + // It does not care what order and also assumes that there are no duplicates. + // From the server: consumer subject filters cannot overlap [10138] + public static boolean consumerFilterSubjectsAreEquivalent(List l1, List l2) { - int s1 = l1 == null ? 0 : l1.size(); - int s2 = l2 == null ? 0 : l2.size(); + if (l1 == null || l1.isEmpty()) { + return l2 == null || l2.isEmpty(); + } - if (s1 != s2) { + if (l2 == null || l1.size() != l2.size()) { return false; } - if (s1 > 0) { - for (T t : l1) { - if (!l2.contains(t)) { - return false; - } + for (T t : l1) { + if (!l2.contains(t)) { + return false; } } return true; diff --git a/src/test/java/io/nats/client/RequestTests.java b/src/test/java/io/nats/client/RequestTests.java index 86973207a..9137a377a 100644 --- a/src/test/java/io/nats/client/RequestTests.java +++ b/src/test/java/io/nats/client/RequestTests.java @@ -13,9 +13,12 @@ package io.nats.client; +import io.nats.client.api.StorageType; +import io.nats.client.api.StreamConfiguration; import io.nats.client.impl.TestHandler; import org.junit.jupiter.api.Test; +import java.io.IOException; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; @@ -28,7 +31,7 @@ public class RequestTests { @Test public void testRequestNoResponder() throws Exception { - try (NatsTestServer ts = new NatsTestServer(false)) { + try (NatsTestServer ts = new NatsTestServer(false, true)) { Options optCancel = Options.builder().server(ts.getURI()).errorListener(new TestHandler()).build(); Options optReport = Options.builder().server(ts.getURI()).reportNoResponders().errorListener(new TestHandler()).build(); try (Connection ncCancel = standardConnection(optCancel); @@ -39,6 +42,19 @@ public void testRequestNoResponder() throws Exception { ExecutionException ee = assertThrows(ExecutionException.class, () -> ncReport.request(subject(999), null).get()); assertTrue(ee.getCause() instanceof JetStreamStatusException); assertTrue(ee.getMessage().contains("503 No Responders Available For Request")); + + ncCancel.jetStreamManagement().addStream( + StreamConfiguration.builder() + .name("testRequestNoResponder").subjects("trnrExists").storageType(StorageType.Memory) + .build()); + + JetStream jsCancel = ncCancel.jetStream(); + JetStream jsReport = ncReport.jetStream(); + + IOException ioe = assertThrows(IOException.class, () -> jsCancel.publish("not-exist", null)); + assertTrue(ioe.getMessage().contains("503")); + ioe = assertThrows(IOException.class, () -> jsReport.publish("trnrNotExist", null)); + assertTrue(ioe.getMessage().contains("503")); } } } diff --git a/src/test/java/io/nats/client/SubscriberTests.java b/src/test/java/io/nats/client/SubscriberTests.java index 968c8c003..79a7d7d06 100644 --- a/src/test/java/io/nats/client/SubscriberTests.java +++ b/src/test/java/io/nats/client/SubscriberTests.java @@ -487,4 +487,4 @@ public void testInvalidSubjectsAndQueueNames() throws Exception { } } } -} \ No newline at end of file +} diff --git a/src/test/java/io/nats/client/impl/ErrorListenerTests.java b/src/test/java/io/nats/client/impl/ErrorListenerTests.java index 845a91d63..ddb5b7704 100644 --- a/src/test/java/io/nats/client/impl/ErrorListenerTests.java +++ b/src/test/java/io/nats/client/impl/ErrorListenerTests.java @@ -144,7 +144,7 @@ public void testErrorOnNoAuth() throws Exception { fail(); } catch (Exception ignore) {} - assertTrue(handler.errorsEventually("Authorization Violation", 5000)); + assertTrue(handler.errorsEventually("Authorization Violation", 10000)); } } diff --git a/src/test/java/io/nats/client/impl/JetStreamGeneralTests.java b/src/test/java/io/nats/client/impl/JetStreamGeneralTests.java index 704665a30..43fe57d4d 100644 --- a/src/test/java/io/nats/client/impl/JetStreamGeneralTests.java +++ b/src/test/java/io/nats/client/impl/JetStreamGeneralTests.java @@ -277,18 +277,23 @@ public void testJetStreamSubscribeErrors() throws Exception { Dispatcher d = nc.createDispatcher(); for (String bad : BAD_SUBJECTS_OR_QUEUES) { - // subject - IllegalArgumentException iae = assertThrows(IllegalArgumentException.class, () -> js.subscribe(bad)); - assertTrue(iae.getMessage().startsWith("Subject")); - iae = assertThrows(IllegalArgumentException.class, () -> js.subscribe(bad, (PushSubscribeOptions)null)); - assertTrue(iae.getMessage().startsWith("Subject")); - - // queue - if (bad != null && !bad.isEmpty()) { + if (bad == null || bad.isEmpty()) { + // subject + IllegalArgumentException iae = assertThrows(IllegalArgumentException.class, () -> js.subscribe(bad)); + assertTrue(iae.getMessage().startsWith("Subject")); + assertClientError(JsSubSubjectNeededToLookupStream, () -> js.subscribe(bad, (PushSubscribeOptions)null)); + } + else { + // subject + IllegalArgumentException iae = assertThrows(IllegalArgumentException.class, () -> js.subscribe(bad)); + assertTrue(iae.getMessage().startsWith("Subject")); + iae = assertThrows(IllegalArgumentException.class, () -> js.subscribe(bad, (PushSubscribeOptions)null)); + assertTrue(iae.getMessage().startsWith("Subject")); + + // queue iae = assertThrows(IllegalArgumentException.class, () -> js.subscribe(SUBJECT, bad, null)); assertTrue(iae.getMessage().startsWith("Queue")); - iae = assertThrows(IllegalArgumentException.class, () -> js.subscribe(SUBJECT, bad, d, m -> { - }, false, null)); + iae = assertThrows(IllegalArgumentException.class, () -> js.subscribe(SUBJECT, bad, d, m -> {}, false, null)); assertTrue(iae.getMessage().startsWith("Queue")); } } @@ -801,67 +806,72 @@ public void testSubscribeDurableConsumerMustMatch() throws Exception { JetStream js = nc.jetStream(); JetStreamManagement jsm = nc.jetStreamManagement(); - createDefaultTestStream(jsm); + String stream = stream(); + String subject = subject(); + createMemoryStream(nc, stream, subject); // push - nc.jetStreamManagement().addOrUpdateConsumer(STREAM, pushDurableBuilder().build()); - - changeExPush(js, pushDurableBuilder().deliverPolicy(DeliverPolicy.Last), "deliverPolicy"); - changeExPush(js, pushDurableBuilder().deliverPolicy(DeliverPolicy.New), "deliverPolicy"); - changeExPush(js, pushDurableBuilder().ackPolicy(AckPolicy.None), "ackPolicy"); - changeExPush(js, pushDurableBuilder().ackPolicy(AckPolicy.All), "ackPolicy"); - changeExPush(js, pushDurableBuilder().replayPolicy(ReplayPolicy.Original), "replayPolicy"); - - changeExPush(js, pushDurableBuilder().flowControl(10000), "flowControl"); - changeExPush(js, pushDurableBuilder().headersOnly(true), "headersOnly"); - - changeExPush(js, pushDurableBuilder().startTime(ZonedDateTime.now()), "startTime"); - changeExPush(js, pushDurableBuilder().ackWait(Duration.ofMillis(1)), "ackWait"); - changeExPush(js, pushDurableBuilder().description("x"), "description"); - changeExPush(js, pushDurableBuilder().sampleFrequency("x"), "sampleFrequency"); - changeExPush(js, pushDurableBuilder().idleHeartbeat(Duration.ofMillis(1000)), "idleHeartbeat"); - changeExPush(js, pushDurableBuilder().maxExpires(Duration.ofMillis(1000)), "maxExpires"); - changeExPush(js, pushDurableBuilder().inactiveThreshold(Duration.ofMillis(1000)), "inactiveThreshold"); + String uname = durable(); + String deliver = deliver(); + nc.jetStreamManagement().addOrUpdateConsumer(stream, pushDurableBuilder(subject, uname, deliver).build()); + + changeExPush(js, subject, pushDurableBuilder(subject, uname, deliver).deliverPolicy(DeliverPolicy.Last), "deliverPolicy"); + changeExPush(js, subject, pushDurableBuilder(subject, uname, deliver).deliverPolicy(DeliverPolicy.New), "deliverPolicy"); + changeExPush(js, subject, pushDurableBuilder(subject, uname, deliver).ackPolicy(AckPolicy.None), "ackPolicy"); + changeExPush(js, subject, pushDurableBuilder(subject, uname, deliver).ackPolicy(AckPolicy.All), "ackPolicy"); + changeExPush(js, subject, pushDurableBuilder(subject, uname, deliver).replayPolicy(ReplayPolicy.Original), "replayPolicy"); + + changeExPush(js, subject, pushDurableBuilder(subject, uname, deliver).flowControl(10000), "flowControl"); + changeExPush(js, subject, pushDurableBuilder(subject, uname, deliver).headersOnly(true), "headersOnly"); + + changeExPush(js, subject, pushDurableBuilder(subject, uname, deliver).startTime(ZonedDateTime.now()), "startTime"); + changeExPush(js, subject, pushDurableBuilder(subject, uname, deliver).ackWait(Duration.ofMillis(1)), "ackWait"); + changeExPush(js, subject, pushDurableBuilder(subject, uname, deliver).description("x"), "description"); + changeExPush(js, subject, pushDurableBuilder(subject, uname, deliver).sampleFrequency("x"), "sampleFrequency"); + changeExPush(js, subject, pushDurableBuilder(subject, uname, deliver).idleHeartbeat(Duration.ofMillis(1000)), "idleHeartbeat"); + changeExPush(js, subject, pushDurableBuilder(subject, uname, deliver).maxExpires(Duration.ofMillis(1000)), "maxExpires"); + changeExPush(js, subject, pushDurableBuilder(subject, uname, deliver).inactiveThreshold(Duration.ofMillis(1000)), "inactiveThreshold"); // value - changeExPush(js, pushDurableBuilder().maxDeliver(MAX_DELIVER_MIN), "maxDeliver"); - changeExPush(js, pushDurableBuilder().maxAckPending(0), "maxAckPending"); - changeExPush(js, pushDurableBuilder().ackWait(0), "ackWait"); + changeExPush(js, subject, pushDurableBuilder(subject, uname, deliver).maxDeliver(MAX_DELIVER_MIN), "maxDeliver"); + changeExPush(js, subject, pushDurableBuilder(subject, uname, deliver).maxAckPending(0), "maxAckPending"); + changeExPush(js, subject, pushDurableBuilder(subject, uname, deliver).ackWait(0), "ackWait"); // value unsigned - changeExPush(js, pushDurableBuilder().startSequence(1), "startSequence"); - changeExPush(js, pushDurableBuilder().rateLimit(1), "rateLimit"); + changeExPush(js, subject, pushDurableBuilder(subject, uname, deliver).startSequence(1), "startSequence"); + changeExPush(js, subject, pushDurableBuilder(subject, uname, deliver).rateLimit(1), "rateLimit"); // unset doesn't fail because the server provides a value equal to the unset - changeOkPush(js, pushDurableBuilder().maxDeliver(INTEGER_UNSET)); + changeOkPush(js, subject, pushDurableBuilder(subject, uname, deliver).maxDeliver(INTEGER_UNSET)); // unset doesn't fail because the server does not provide a value // negatives are considered the unset - changeOkPush(js, pushDurableBuilder().startSequence(ULONG_UNSET)); - changeOkPush(js, pushDurableBuilder().startSequence(-1)); - changeOkPush(js, pushDurableBuilder().rateLimit(ULONG_UNSET)); - changeOkPush(js, pushDurableBuilder().rateLimit(-1)); + changeOkPush(js, subject, pushDurableBuilder(subject, uname, deliver).startSequence(ULONG_UNSET)); + changeOkPush(js, subject, pushDurableBuilder(subject, uname, deliver).startSequence(-1)); + changeOkPush(js, subject, pushDurableBuilder(subject, uname, deliver).rateLimit(ULONG_UNSET)); + changeOkPush(js, subject, pushDurableBuilder(subject, uname, deliver).rateLimit(-1)); // unset fail b/c the server does set a value that is not equal to the unset or the minimum - changeExPush(js, pushDurableBuilder().maxAckPending(LONG_UNSET), "maxAckPending"); - changeExPush(js, pushDurableBuilder().maxAckPending(0), "maxAckPending"); - changeExPush(js, pushDurableBuilder().ackWait(LONG_UNSET), "ackWait"); - changeExPush(js, pushDurableBuilder().ackWait(0), "ackWait"); + changeExPush(js, subject, pushDurableBuilder(subject, uname, deliver).maxAckPending(LONG_UNSET), "maxAckPending"); + changeExPush(js, subject, pushDurableBuilder(subject, uname, deliver).maxAckPending(0), "maxAckPending"); + changeExPush(js, subject, pushDurableBuilder(subject, uname, deliver).ackWait(LONG_UNSET), "ackWait"); + changeExPush(js, subject, pushDurableBuilder(subject, uname, deliver).ackWait(0), "ackWait"); // pull - nc.jetStreamManagement().addOrUpdateConsumer(STREAM, pullDurableBuilder().build()); + String lname = durable(); + nc.jetStreamManagement().addOrUpdateConsumer(stream, pullDurableBuilder(subject, lname).build()); // value - changeExPull(js, pullDurableBuilder().maxPullWaiting(0), "maxPullWaiting"); - changeExPull(js, pullDurableBuilder().maxBatch(0), "maxBatch"); - changeExPull(js, pullDurableBuilder().maxBytes(0), "maxBytes"); + changeExPull(js, subject, pullDurableBuilder(subject, lname).maxPullWaiting(0), "maxPullWaiting"); + changeExPull(js, subject, pullDurableBuilder(subject, lname).maxBatch(0), "maxBatch"); + changeExPull(js, subject, pullDurableBuilder(subject, lname).maxBytes(0), "maxBytes"); // unsets fail b/c the server does set a value - changeExPull(js, pullDurableBuilder().maxPullWaiting(-1), "maxPullWaiting"); + changeExPull(js, subject, pullDurableBuilder(subject, lname).maxPullWaiting(-1), "maxPullWaiting"); // unset - changeOkPull(js, pullDurableBuilder().maxBatch(-1)); - changeOkPull(js, pullDurableBuilder().maxBytes(-1)); + changeOkPull(js, subject, pullDurableBuilder(subject, lname).maxBatch(-1)); + changeOkPull(js, subject, pullDurableBuilder(subject, lname).maxBytes(-1)); // metadata Map metadataA = new HashMap<>(); metadataA.put("a", "A"); @@ -869,39 +879,39 @@ public void testSubscribeDurableConsumerMustMatch() throws Exception { if (nc.getServerInfo().isNewerVersionThan("2.9.99")) { // metadata server null versus new not null - nc.jetStreamManagement().addOrUpdateConsumer(STREAM, pushDurableBuilder().build()); - changeExPush(js, pushDurableBuilder().metadata(metadataA), "metadata"); + nc.jetStreamManagement().addOrUpdateConsumer(stream, pushDurableBuilder(subject, uname, deliver).build()); + changeExPush(js, subject, pushDurableBuilder(subject, uname, deliver).metadata(metadataA), "metadata"); // metadata server not null versus new null - nc.jetStreamManagement().addOrUpdateConsumer(STREAM, pushDurableBuilder().metadata(metadataA).build()); - changeOkPush(js, pushDurableBuilder()); + nc.jetStreamManagement().addOrUpdateConsumer(stream, pushDurableBuilder(subject, uname, deliver).metadata(metadataA).build()); + changeOkPush(js, subject, pushDurableBuilder(subject, uname, deliver)); // metadata server not null versus new not null but different - changeExPush(js, pushDurableBuilder().metadata(metadataB), "metadata"); + changeExPush(js, subject, pushDurableBuilder(subject, uname, deliver).metadata(metadataB), "metadata"); // metadata server not null versus new not null and same - changeOkPush(js, pushDurableBuilder().metadata(metadataA)); + changeOkPush(js, subject, pushDurableBuilder(subject, uname, deliver).metadata(metadataA)); } }); } - private void changeOkPush(JetStream js, Builder builder) throws IOException, JetStreamApiException { - unsubscribeEnsureNotBound(js.subscribe(SUBJECT, builder.buildPushSubscribeOptions())); + private void changeOkPush(JetStream js, String subject, Builder builder) throws IOException, JetStreamApiException { + unsubscribeEnsureNotBound(js.subscribe(subject, builder.buildPushSubscribeOptions())); } - private void changeOkPull(JetStream js, Builder builder) throws IOException, JetStreamApiException { - unsubscribeEnsureNotBound(js.subscribe(SUBJECT, builder.buildPullSubscribeOptions())); + private void changeOkPull(JetStream js, String subject, Builder builder) throws IOException, JetStreamApiException { + unsubscribeEnsureNotBound(js.subscribe(subject, builder.buildPullSubscribeOptions())); } - private void changeExPush(JetStream js, Builder builder, String changedField) { + private void changeExPush(JetStream js, String subject, Builder builder, String changedField) { IllegalArgumentException iae = assertThrows(IllegalArgumentException.class, - () -> js.subscribe(SUBJECT, PushSubscribeOptions.builder().configuration(builder.build()).build())); + () -> js.subscribe(subject, PushSubscribeOptions.builder().configuration(builder.build()).build())); _changeEx(iae, changedField); } - private void changeExPull(JetStream js, Builder builder, String changedField) { + private void changeExPull(JetStream js, String subject, Builder builder, String changedField) { IllegalArgumentException iae = assertThrows(IllegalArgumentException.class, - () -> js.subscribe(SUBJECT, PullSubscribeOptions.builder().configuration(builder.build()).build())); + () -> js.subscribe(subject, PullSubscribeOptions.builder().configuration(builder.build()).build())); _changeEx(iae, changedField); } @@ -911,12 +921,12 @@ private void _changeEx(IllegalArgumentException iae, String changedField) { assertTrue(iaeMessage.contains(changedField)); } - private Builder pushDurableBuilder() { - return builder().durable(PUSH_DURABLE).deliverSubject(DELIVER).filterSubject(SUBJECT); + private Builder pushDurableBuilder(String subject, String durable, String deliver) { + return builder().durable(durable).deliverSubject(deliver).filterSubject(subject); } - private Builder pullDurableBuilder() { - return builder().durable(PULL_DURABLE).filterSubject(SUBJECT); + private Builder pullDurableBuilder(String subject, String durable) { + return builder().durable(durable).filterSubject(subject); } @Test diff --git a/src/test/java/io/nats/client/impl/JetStreamPullTests.java b/src/test/java/io/nats/client/impl/JetStreamPullTests.java index 4408b77b4..1e765fd31 100644 --- a/src/test/java/io/nats/client/impl/JetStreamPullTests.java +++ b/src/test/java/io/nats/client/impl/JetStreamPullTests.java @@ -16,6 +16,7 @@ import io.nats.client.*; import io.nats.client.api.AckPolicy; import io.nats.client.api.ConsumerConfiguration; +import io.nats.client.support.Status; import org.junit.jupiter.api.Test; import java.io.IOException; @@ -32,6 +33,15 @@ public class JetStreamPullTests extends JetStreamTestBase { + class ErrorListenerPullImpl extends ErrorListenerLoggerImpl { + @Override + public void pullStatusWarning(Connection conn, JetStreamSubscription sub, Status status) {} + } + + private Options.Builder noPullWarnings() { + return Options.builder().errorListener(new ErrorListenerPullImpl()); + } + @Test public void testFetch() throws Exception { runInJsServer(nc -> { @@ -293,7 +303,7 @@ public void testBasic() throws Exception { @Test public void testNoWait() throws Exception { - runInJsServer(nc -> { + runInJsServer(noPullWarnings(), nc -> { // Create our JetStream context. JetStream js = nc.jetStream(); @@ -367,7 +377,7 @@ public void testNoWait() throws Exception { @Test public void testPullExpires() throws Exception { - runInJsServer(nc -> { + runInJsServer(noPullWarnings(), nc -> { // Create our JetStream context. JetStream js = nc.jetStream(); @@ -598,7 +608,7 @@ public void testAckWaitTimeout() throws Exception { @Test public void testDurable() throws Exception { - runInJsServer(nc -> { + runInJsServer(noPullWarnings(), nc -> { // create the stream. CreateStreamResult csr = createMemoryStream(nc); String durable = durable(); @@ -638,7 +648,7 @@ public void testDurable() throws Exception { @Test public void testNamed() throws Exception { - runInJsServer(this::atLeast290, nc -> { + runInJsServer(noPullWarnings(), this::atLeast290, nc -> { JetStream js = nc.jetStream(); JetStreamManagement jsm = nc.jetStreamManagement(); @@ -1007,12 +1017,7 @@ public void testExceedsMaxRequestBytes1stMessageAsyncSub() throws Exception { @Test public void testExceedsMaxRequestBytesNthMessageSyncSub() throws Exception { TestHandler handler = new TestHandler(); - AtomicBoolean skip = new AtomicBoolean(false); - runInJsServer(handler, nc -> { - skip.set(versionIsBefore(nc, "2.9.1")); - if (skip.get()) { - return; - } + runInJsServer(this::atLeast291, handler, nc -> { createDefaultTestStream(nc); JetStreamManagement jsm = nc.jetStreamManagement(); JetStream js = nc.jetStream(); @@ -1040,12 +1045,7 @@ public void testExceedsMaxRequestBytesNthMessageSyncSub() throws Exception { @Test public void testExceedsMaxRequestBytesExactBytes() throws Exception { TestHandler handler = new TestHandler(); - AtomicBoolean skip = new AtomicBoolean(false); - runInJsServer(handler, nc -> { - skip.set(versionIsBefore(nc, "2.9.1")); - if (skip.get()) { - return; - } + runInJsServer(this::atLeast291, handler, nc -> { createDefaultTestStream(nc); JetStreamManagement jsm = nc.jetStreamManagement(); JetStream js = nc.jetStream(); diff --git a/src/test/java/io/nats/client/impl/JetStreamPushAsyncTests.java b/src/test/java/io/nats/client/impl/JetStreamPushAsyncTests.java index f17be60e6..5ec396bed 100644 --- a/src/test/java/io/nats/client/impl/JetStreamPushAsyncTests.java +++ b/src/test/java/io/nats/client/impl/JetStreamPushAsyncTests.java @@ -164,7 +164,9 @@ public void testPushAsyncFlowControl() throws Exception { JetStream js = nc.jetStream(); // create the stream. - createDefaultTestStream(nc); + String stream = stream(); + String subject = subject(); + createMemoryStream(nc, stream, subject); byte[] data = new byte[8192]; @@ -174,7 +176,7 @@ public void testPushAsyncFlowControl() throws Exception { for (int x = 100_000; x < MSG_COUNT + 100_000; x++) { byte[] fill = (""+ x).getBytes(); System.arraycopy(fill, 0, data, 0, 6); - js.publish(NatsMessage.builder().subject(SUBJECT).data(data).build()); + js.publish(NatsMessage.builder().subject(subject).data(data).build()); } // create a dispatcher without a default handler. @@ -190,13 +192,14 @@ public void testPushAsyncFlowControl() throws Exception { if (set.get().add(id)) { count.incrementAndGet(); } + sleep(5); // slow the process down to hopefully get flow control more often msg.ack(); msgLatch.countDown(); }; ConsumerConfiguration cc = ConsumerConfiguration.builder().flowControl(1000).build(); PushSubscribeOptions pso = PushSubscribeOptions.builder().configuration(cc).build(); - js.subscribe(SUBJECT, dispatcher, handler, false, pso); + js.subscribe(subject, dispatcher, handler, false, pso); // Wait for messages to arrive using the countdown latch. // make sure we don't wait forever diff --git a/src/test/java/io/nats/client/support/ValidatorTests.java b/src/test/java/io/nats/client/support/ValidatorTests.java index 9a9e3cfd9..a060f6863 100644 --- a/src/test/java/io/nats/client/support/ValidatorTests.java +++ b/src/test/java/io/nats/client/support/ValidatorTests.java @@ -40,8 +40,8 @@ public void testValidateSubject() { allowedRequired(Validator::validateSubject, UTF_ONLY_STRINGS); allowedRequired(Validator::validateSubject, Arrays.asList(STAR_SEGMENT, GT_LAST_SEGMENT)); notAllowedRequired(Validator::validateSubject, Arrays.asList(null, EMPTY, HAS_SPACE, HAS_CR, HAS_LF)); - notAllowedRequired(Validator::validateSubject, Arrays.asList(STARTS_WITH_DOT, STAR_NOT_SEGMENT, GT_NOT_SEGMENT, EMPTY_SEGMENT)); - notAllowedRequired(Validator::validateSubject, Arrays.asList(ENDS_WITH_DOT, ENDS_WITH_SPACE, ENDS_WITH_CR, ENDS_WITH_LF, ENDS_WITH_TAB)); + notAllowedRequired(Validator::validateSubject, Arrays.asList(STARTS_WITH_DOT, STAR_NOT_SEGMENT, GT_NOT_SEGMENT, EMPTY_SEGMENT, GT_NOT_LAST_SEGMENT)); + notAllowedRequired(Validator::validateSubject, Arrays.asList(ENDS_WITH_DOT, ENDS_WITH_DOT_SPACE, ENDS_WITH_CR, ENDS_WITH_LF, ENDS_WITH_TAB)); // subject not required, null and empty both mean not supplied allowedNotRequiredEmptyAsNull(Validator::validateSubject, Arrays.asList(null, EMPTY)); @@ -49,8 +49,8 @@ public void testValidateSubject() { allowedNotRequired(Validator::validateSubject, UTF_ONLY_STRINGS); allowedNotRequired(Validator::validateSubject, Arrays.asList(STAR_SEGMENT, GT_LAST_SEGMENT)); notAllowedNotRequired(Validator::validateSubject, Arrays.asList(HAS_SPACE, HAS_CR, HAS_LF)); - notAllowedNotRequired(Validator::validateSubject, Arrays.asList(STARTS_WITH_DOT, STAR_NOT_SEGMENT, GT_NOT_SEGMENT, EMPTY_SEGMENT)); - notAllowedNotRequired(Validator::validateSubject, Arrays.asList(ENDS_WITH_DOT, ENDS_WITH_SPACE, ENDS_WITH_CR, ENDS_WITH_LF, ENDS_WITH_TAB)); + notAllowedNotRequired(Validator::validateSubject, Arrays.asList(STARTS_WITH_DOT, STAR_NOT_SEGMENT, GT_NOT_SEGMENT, EMPTY_SEGMENT, GT_NOT_LAST_SEGMENT)); + notAllowedNotRequired(Validator::validateSubject, Arrays.asList(ENDS_WITH_DOT, ENDS_WITH_DOT_SPACE, ENDS_WITH_CR, ENDS_WITH_LF, ENDS_WITH_TAB)); } @Test @@ -545,7 +545,7 @@ public void testSemver() { } @Test - public void testListsAreEqual() { + public void testConsumerFilterSubjectsAreEquivalent() { List l1 = Arrays.asList("one", "two"); List l2 = Arrays.asList("two", "one"); List l3 = Arrays.asList("one", "not"); @@ -553,47 +553,47 @@ public void testListsAreEqual() { List l5 = null; List l6 = new ArrayList<>(); - assertTrue(listsAreEquivalent(l1, l1)); - assertTrue(listsAreEquivalent(l1, l2)); - assertFalse(listsAreEquivalent(l1, l3)); - assertFalse(listsAreEquivalent(l1, l4)); - assertFalse(listsAreEquivalent(l1, l5)); - assertFalse(listsAreEquivalent(l1, l6)); - - assertTrue(listsAreEquivalent(l2, l1)); - assertTrue(listsAreEquivalent(l2, l2)); - assertFalse(listsAreEquivalent(l2, l3)); - assertFalse(listsAreEquivalent(l2, l4)); - assertFalse(listsAreEquivalent(l2, l5)); - assertFalse(listsAreEquivalent(l2, l6)); - - assertFalse(listsAreEquivalent(l3, l1)); - assertFalse(listsAreEquivalent(l3, l2)); - assertTrue(listsAreEquivalent(l3, l3)); - assertFalse(listsAreEquivalent(l3, l4)); - assertFalse(listsAreEquivalent(l3, l5)); - assertFalse(listsAreEquivalent(l3, l6)); - - assertFalse(listsAreEquivalent(l4, l1)); - assertFalse(listsAreEquivalent(l4, l2)); - assertFalse(listsAreEquivalent(l4, l3)); - assertTrue(listsAreEquivalent(l4, l4)); - assertFalse(listsAreEquivalent(l4, l5)); - assertFalse(listsAreEquivalent(l4, l6)); - - assertFalse(listsAreEquivalent(l5, l1)); - assertFalse(listsAreEquivalent(l5, l2)); - assertFalse(listsAreEquivalent(l5, l3)); - assertFalse(listsAreEquivalent(l5, l4)); - assertTrue(listsAreEquivalent(l5, l5)); - assertTrue(listsAreEquivalent(l5, l6)); - - assertFalse(listsAreEquivalent(l6, l1)); - assertFalse(listsAreEquivalent(l6, l2)); - assertFalse(listsAreEquivalent(l6, l3)); - assertFalse(listsAreEquivalent(l6, l4)); - assertTrue(listsAreEquivalent(l6, l5)); - assertTrue(listsAreEquivalent(l6, l6)); + assertTrue(consumerFilterSubjectsAreEquivalent(l1, l1)); + assertTrue(consumerFilterSubjectsAreEquivalent(l1, l2)); + assertFalse(consumerFilterSubjectsAreEquivalent(l1, l3)); + assertFalse(consumerFilterSubjectsAreEquivalent(l1, l4)); + assertFalse(consumerFilterSubjectsAreEquivalent(l1, l5)); + assertFalse(consumerFilterSubjectsAreEquivalent(l1, l6)); + + assertTrue(consumerFilterSubjectsAreEquivalent(l2, l1)); + assertTrue(consumerFilterSubjectsAreEquivalent(l2, l2)); + assertFalse(consumerFilterSubjectsAreEquivalent(l2, l3)); + assertFalse(consumerFilterSubjectsAreEquivalent(l2, l4)); + assertFalse(consumerFilterSubjectsAreEquivalent(l2, l5)); + assertFalse(consumerFilterSubjectsAreEquivalent(l2, l6)); + + assertFalse(consumerFilterSubjectsAreEquivalent(l3, l1)); + assertFalse(consumerFilterSubjectsAreEquivalent(l3, l2)); + assertTrue(consumerFilterSubjectsAreEquivalent(l3, l3)); + assertFalse(consumerFilterSubjectsAreEquivalent(l3, l4)); + assertFalse(consumerFilterSubjectsAreEquivalent(l3, l5)); + assertFalse(consumerFilterSubjectsAreEquivalent(l3, l6)); + + assertFalse(consumerFilterSubjectsAreEquivalent(l4, l1)); + assertFalse(consumerFilterSubjectsAreEquivalent(l4, l2)); + assertFalse(consumerFilterSubjectsAreEquivalent(l4, l3)); + assertTrue(consumerFilterSubjectsAreEquivalent(l4, l4)); + assertFalse(consumerFilterSubjectsAreEquivalent(l4, l5)); + assertFalse(consumerFilterSubjectsAreEquivalent(l4, l6)); + + assertFalse(consumerFilterSubjectsAreEquivalent(l5, l1)); + assertFalse(consumerFilterSubjectsAreEquivalent(l5, l2)); + assertFalse(consumerFilterSubjectsAreEquivalent(l5, l3)); + assertFalse(consumerFilterSubjectsAreEquivalent(l5, l4)); + assertTrue(consumerFilterSubjectsAreEquivalent(l5, l5)); + assertTrue(consumerFilterSubjectsAreEquivalent(l5, l6)); + + assertFalse(consumerFilterSubjectsAreEquivalent(l6, l1)); + assertFalse(consumerFilterSubjectsAreEquivalent(l6, l2)); + assertFalse(consumerFilterSubjectsAreEquivalent(l6, l3)); + assertFalse(consumerFilterSubjectsAreEquivalent(l6, l4)); + assertTrue(consumerFilterSubjectsAreEquivalent(l6, l5)); + assertTrue(consumerFilterSubjectsAreEquivalent(l6, l6)); } @Test diff --git a/src/test/java/io/nats/client/utils/TestBase.java b/src/test/java/io/nats/client/utils/TestBase.java index 33b204e8a..0470c8ee5 100644 --- a/src/test/java/io/nats/client/utils/TestBase.java +++ b/src/test/java/io/nats/client/utils/TestBase.java @@ -40,10 +40,11 @@ public class TestBase { public static final String STAR_SEGMENT = "*.star.*.segment.*"; + public static final String GT_NOT_LAST_SEGMENT = "gt.>.notlast"; public static final String GT_LAST_SEGMENT = "gt.last.>"; public static final String STARTS_WITH_DOT = ".starts-with-dot"; public static final String ENDS_WITH_DOT = "ends-with-dot."; - public static final String ENDS_WITH_SPACE = "ends-with-space. "; + public static final String ENDS_WITH_DOT_SPACE = "ends-with-space. "; public static final String ENDS_WITH_CR = "ends-with-space.\r"; public static final String ENDS_WITH_LF = "ends-with-space.\n"; public static final String ENDS_WITH_TAB = "ends-with-space.\t"; @@ -54,7 +55,7 @@ public class TestBase { public static final String PLAIN = "plain"; public static final String HAS_SPACE = "has space"; public static final String STARTS_SPACE = " startsspace"; - public static final String ENDS_SPACE = " endssspace "; + public static final String ENDS_SPACE = "endsspace "; public static final String HAS_PRINTABLE = "has-print!able"; public static final String HAS_DOT = "has.dot"; public static final String HAS_DASH = "has-dash"; @@ -120,10 +121,18 @@ public static void runInJsServer(ErrorListener el, InServerTest inServerTest) th runInServer(false, true, new Options.Builder().errorListener(el), null, inServerTest); } + public static void runInJsServer(VersionCheck vc, ErrorListener el, InServerTest inServerTest) throws Exception { + runInServer(false, true, new Options.Builder().errorListener(el), vc, inServerTest); + } + public static void runInJsServer(Options.Builder builder, InServerTest inServerTest) throws Exception { runInServer(false, true, builder, null, inServerTest); } + public static void runInJsServer(Options.Builder builder, VersionCheck vc, InServerTest inServerTest) throws Exception { + runInServer(false, true, builder, vc, inServerTest); + } + public static void runInJsServer(VersionCheck vc, InServerTest inServerTest) throws Exception { runInServer(false, true, null, vc, inServerTest); }