diff --git a/src/main/java/io/nats/client/BaseConsumerContext.java b/src/main/java/io/nats/client/BaseConsumerContext.java index bdf961e83..efe0d0f63 100644 --- a/src/main/java/io/nats/client/BaseConsumerContext.java +++ b/src/main/java/io/nats/client/BaseConsumerContext.java @@ -183,14 +183,13 @@ public interface BaseConsumerContext { @NonNull MessageConsumer consume(@NonNull ConsumeOptions consumeOptions, @Nullable Dispatcher dispatcher, @NonNull MessageHandler handler) throws IOException, JetStreamApiException; -// TODO - PINNED CONSUMER SUPPORT -// /** -// * Unpins this consumer -// * @param group the group name of the consumer's group -// * @throws IOException covers various communication issues with the NATS -// * server such as timeout or interruption -// * @throws JetStreamApiException the request had an error related to the data -// * @return true if the delete succeeded -// */ -// boolean unpin(String group) throws IOException, JetStreamApiException; + /** + * Unpins this consumer + * @param group the group name of the consumer's group + * @throws IOException covers various communication issues with the NATS + * server such as timeout or interruption + * @throws JetStreamApiException the request had an error related to the data + * @return true if the delete succeeded + */ + boolean unpin(String group) throws IOException, JetStreamApiException; } diff --git a/src/main/java/io/nats/client/JetStreamManagement.java b/src/main/java/io/nats/client/JetStreamManagement.java index e7e582d04..283522cfb 100644 --- a/src/main/java/io/nats/client/JetStreamManagement.java +++ b/src/main/java/io/nats/client/JetStreamManagement.java @@ -357,18 +357,17 @@ public interface JetStreamManagement { */ boolean deleteMessage(String streamName, long seq, boolean erase) throws IOException, JetStreamApiException; -// TODO - PINNED CONSUMER SUPPORT -// /** -// * Unpins a consumer -// * @param streamName name of the stream -// * @param consumerName name of consumer -// * @param consumerGroup name of the consumer's group -// * @throws IOException covers various communication issues with the NATS -// * server such as timeout or interruption -// * @throws JetStreamApiException the request had an error related to the data -// * @return true if the delete succeeded -// */ -// boolean unpinConsumer(String streamName, String consumerName, String consumerGroup) throws IOException, JetStreamApiException; + /** + * Unpins a consumer + * @param streamName name of the stream + * @param consumerName name of consumer + * @param consumerGroup name of the consumer's group + * @throws IOException covers various communication issues with the NATS + * server such as timeout or interruption + * @throws JetStreamApiException the request had an error related to the data + * @return true if the delete succeeded + */ + boolean unpinConsumer(String streamName, String consumerName, String consumerGroup) throws IOException, JetStreamApiException; /** * Gets a context for publishing and subscribing to subjects backed by Jetstream streams diff --git a/src/main/java/io/nats/client/PullRequestOptions.java b/src/main/java/io/nats/client/PullRequestOptions.java index cc9bbe595..f3beb0c1d 100644 --- a/src/main/java/io/nats/client/PullRequestOptions.java +++ b/src/main/java/io/nats/client/PullRequestOptions.java @@ -60,17 +60,15 @@ public String toJson() { JsonUtils.addFieldAsNanos(sb, IDLE_HEARTBEAT, idleHeartbeat); JsonUtils.addField(sb, GROUP, group); JsonUtils.addFieldWhenGtZero(sb, PRIORITY, priority); -// TODO - PINNED CONSUMER SUPPORT -// JsonUtils.addField(sb, ID, getPinId()); + JsonUtils.addField(sb, ID, getPinId()); JsonUtils.addField(sb, MIN_PENDING, minPending); JsonUtils.addField(sb, MIN_ACK_PENDING, minAckPending); return JsonUtils.endJson(sb).toString(); } -// TODO - PINNED CONSUMER SUPPORT -// protected String getPinId() { -// return null; -// } + protected String getPinId() { + return null; + } /** * Get the batch size option value diff --git a/src/main/java/io/nats/client/api/PriorityGroupState.java b/src/main/java/io/nats/client/api/PriorityGroupState.java index 86e6a5a42..5db7aa328 100644 --- a/src/main/java/io/nats/client/api/PriorityGroupState.java +++ b/src/main/java/io/nats/client/api/PriorityGroupState.java @@ -16,20 +16,21 @@ import io.nats.client.support.JsonValue; import io.nats.client.support.JsonValueUtils; import org.jspecify.annotations.NonNull; +import org.jspecify.annotations.Nullable; +import java.time.ZonedDateTime; import java.util.List; -import static io.nats.client.support.ApiConstants.GROUP; +import static io.nats.client.support.ApiConstants.*; +import static io.nats.client.support.JsonValueUtils.readDate; /** * Status of a specific consumer priority group */ public class PriorityGroupState { private final String group; - -// TODO - PINNED CONSUMER SUPPORT -// private final String pinnedClientId; -// private final ZonedDateTime pinnedTime; + private final String pinnedClientId; + private final ZonedDateTime pinnedTime; static List optionalListOf(JsonValue vpgStates) { return JsonValueUtils.optionalListOf(vpgStates, PriorityGroupState::new); @@ -37,9 +38,8 @@ static List optionalListOf(JsonValue vpgStates) { PriorityGroupState(JsonValue vpgState) { group = JsonValueUtils.readString(vpgState, GROUP); -// TODO - PINNED CONSUMER SUPPORT -// pinnedClientId = JsonValueUtils.readString(vpgState, PINNED_CLIENT_ID); -// pinnedTime = readDate(vpgState, PINNED_TS); + pinnedClientId = JsonValueUtils.readString(vpgState, PINNED_CLIENT_ID); + pinnedTime = readDate(vpgState, PINNED_TS); } /** @@ -51,33 +51,30 @@ public String getGroup() { return group; } -// TODO - PINNED CONSUMER SUPPORT -// /** -// * The generated ID of the pinned client -// * @return the id -// */ -// @Nullable -// public String getPinnedClientId() { -// return pinnedClientId; -// } + /** + * The generated ID of the pinned client + * @return the id + */ + @Nullable + public String getPinnedClientId() { + return pinnedClientId; + } -// TODO - PINNED CONSUMER SUPPORT -// /** -// * The timestamp when the client was pinned -// * @return the timestamp -// */ -// @Nullable -// public ZonedDateTime getPinnedTime() { -// return pinnedTime; -// } + /** + * The timestamp when the client was pinned + * @return the timestamp + */ + @Nullable + public ZonedDateTime getPinnedTime() { + return pinnedTime; + } @Override public String toString() { return "PriorityGroupState{" + "group='" + group + '\'' + -// TODO - PINNED CONSUMER SUPPORT -// ", pinnedClientId='" + pinnedClientId + '\'' + -// ", pinnedTime=" + pinnedTime + + ", pinnedClientId='" + pinnedClientId + '\'' + + ", pinnedTime=" + pinnedTime + '}'; } } diff --git a/src/main/java/io/nats/client/api/PriorityPolicy.java b/src/main/java/io/nats/client/api/PriorityPolicy.java index 9b905f564..b76dccad8 100644 --- a/src/main/java/io/nats/client/api/PriorityPolicy.java +++ b/src/main/java/io/nats/client/api/PriorityPolicy.java @@ -24,10 +24,8 @@ public enum PriorityPolicy { None("none"), Overflow("overflow"), - Prioritized("prioritized"); - -// TODO - PINNED CONSUMER SUPPORT -// PinnedClient("pinned_client") + Prioritized("prioritized"), + PinnedClient("pinned_client"); private final String policy; diff --git a/src/main/java/io/nats/client/impl/MessageManager.java b/src/main/java/io/nats/client/impl/MessageManager.java index 11221b607..79ef4baee 100644 --- a/src/main/java/io/nats/client/impl/MessageManager.java +++ b/src/main/java/io/nats/client/impl/MessageManager.java @@ -97,16 +97,14 @@ protected void trackJsMessage(Message msg) { NatsJetStreamMetaData meta = msg.metaData(); lastStreamSeq = meta.streamSequence(); lastConsumerSeq++; -// TODO - PINNED CONSUMER SUPPORT -// subTrackJsMessage(msg); // for subclasses so they don't have to acquire the lock + subTrackJsMessage(msg); // for subclasses so they don't have to acquire the lock } finally { stateChangeLock.unlock(); } } -// TODO - PINNED CONSUMER SUPPORT -// protected void subTrackJsMessage(Message msg) {} + protected void subTrackJsMessage(Message msg) {} protected void handleHeartbeatError() { conn.executeCallback((c, el) -> el.heartbeatAlarm(c, sub, lastStreamSeq, lastConsumerSeq)); diff --git a/src/main/java/io/nats/client/impl/NatsConsumerContext.java b/src/main/java/io/nats/client/impl/NatsConsumerContext.java index e0a91f526..d47c6cacf 100644 --- a/src/main/java/io/nats/client/impl/NatsConsumerContext.java +++ b/src/main/java/io/nats/client/impl/NatsConsumerContext.java @@ -17,6 +17,7 @@ import io.nats.client.api.ConsumerConfiguration; import io.nats.client.api.ConsumerInfo; import io.nats.client.api.OrderedConsumerConfiguration; +import io.nats.client.api.PriorityPolicy; import io.nats.client.support.Validator; import org.jspecify.annotations.NonNull; import org.jspecify.annotations.Nullable; @@ -144,15 +145,14 @@ private void checkState() throws IOException { } } -// TODO - PINNED CONSUMER SUPPORT -// private void checkNotPinned(String label) throws IOException { -// ConsumerInfo ci = cachedConsumerInfo.get(); -// if (ci != null) { -// if (ci.getConsumerConfiguration().getPriorityPolicy() == PriorityPolicy.PinnedClient) { -// throw new IOException("Pinned not allowed with " + label); -// } -// } -// } + private void checkNotPinned(String label) throws IOException { + ConsumerInfo ci = cachedConsumerInfo.get(); + if (ci != null) { + if (ci.getConsumerConfiguration().getPriorityPolicy() == PriorityPolicy.PinnedClient) { + throw new IOException("Pinned not allowed with " + label); + } + } + } private NatsMessageConsumerBase trackConsume(NatsMessageConsumerBase con) { lastConsumer.set(con); @@ -222,8 +222,7 @@ public Message next(long maxWaitMillis) throws IOException, InterruptedException try { stateLock.lock(); checkState(); -// TODO - PINNED CONSUMER SUPPORT -// checkNotPinned("Next"); + checkNotPinned("Next"); try { long inactiveThreshold = maxWaitMillis * 110 / 100; // 10% longer than the wait @@ -292,8 +291,7 @@ public FetchConsumer fetch(@NonNull FetchConsumeOptions fetchConsumeOptions) thr try { stateLock.lock(); checkState(); -// TODO - PINNED CONSUMER SUPPORT -// checkNotPinned("Fetch"); + checkNotPinned("Fetch"); return (FetchConsumer)trackConsume(new NatsFetchConsumer(this, cachedConsumerInfo.get(), fetchConsumeOptions)); } finally { @@ -378,17 +376,16 @@ public MessageConsumer consume(@NonNull ConsumeOptions consumeOptions, } } -// TODO - PINNED CONSUMER SUPPORT -// @Override -// public boolean unpin(String group) throws IOException, JetStreamApiException { -// String name = consumerName.get(); -// if (name == null) { -// ConsumerInfo ci = cachedConsumerInfo.get(); -// if (ci == null) { -// ci = getConsumerInfo(); -// } -// name = ci.getName(); -// } -// return streamCtx.jsm.unpinConsumer(streamCtx.streamName, name, group); -// } + @Override + public boolean unpin(String group) throws IOException, JetStreamApiException { + String name = consumerName.get(); + if (name == null) { + ConsumerInfo ci = cachedConsumerInfo.get(); + if (ci == null) { + ci = getConsumerInfo(); + } + name = ci.getName(); + } + return streamCtx.jsm.unpinConsumer(streamCtx.streamName, name, group); + } } diff --git a/src/main/java/io/nats/client/impl/NatsFetchConsumer.java b/src/main/java/io/nats/client/impl/NatsFetchConsumer.java index e5045e8f3..898b28dc6 100644 --- a/src/main/java/io/nats/client/impl/NatsFetchConsumer.java +++ b/src/main/java/io/nats/client/impl/NatsFetchConsumer.java @@ -48,18 +48,16 @@ class NatsFetchConsumer extends NatsMessageConsumerBase implements FetchConsumer inactiveThreshold = expiresInMillis * 110 / 100; // 10% longer than the wait } -// TODO - PINNED CONSUMER SUPPORT -// PinnablePullRequestOptions pro = new PinnablePullRequestOptions(pmm.currentPinId, - PullRequestOptions pro = PullRequestOptions.builder(fetchConsumeOptions.getMaxMessages()) - .maxBytes(fetchConsumeOptions.getMaxBytes()) - .expiresIn(expiresInMillis) - .idleHeartbeat(fetchConsumeOptions.getIdleHeartbeat()) - .noWait(isNoWait) - .group(fetchConsumeOptions.getGroup()) - .priority(fetchConsumeOptions.getPriority()) - .minPending(fetchConsumeOptions.getMinPending()) - .minAckPending(fetchConsumeOptions.getMinAckPending()) - .build(); + PinnablePullRequestOptions pro = new PinnablePullRequestOptions(pmm == null ? null : pmm.currentPinId, + PullRequestOptions.builder(fetchConsumeOptions.getMaxMessages()) + .maxBytes(fetchConsumeOptions.getMaxBytes()) + .expiresIn(expiresInMillis) + .idleHeartbeat(fetchConsumeOptions.getIdleHeartbeat()) + .noWait(isNoWait) + .group(fetchConsumeOptions.getGroup()) + .priority(fetchConsumeOptions.getPriority()) + .minPending(fetchConsumeOptions.getMinPending()) + .minAckPending(fetchConsumeOptions.getMinAckPending())); initSub(subscriptionMaker.subscribe(null, null, null, inactiveThreshold), false); pullSubject = sub._pull(pro, fetchConsumeOptions.raiseStatusWarnings(), this); startNanos = -1; diff --git a/src/main/java/io/nats/client/impl/NatsJetStreamManagement.java b/src/main/java/io/nats/client/impl/NatsJetStreamManagement.java index cd3a6ee43..b21793185 100644 --- a/src/main/java/io/nats/client/impl/NatsJetStreamManagement.java +++ b/src/main/java/io/nats/client/impl/NatsJetStreamManagement.java @@ -362,20 +362,19 @@ public boolean deleteMessage(String streamName, long seq, boolean erase) throws return new SuccessApiResponse(resp).throwOnHasError().getSuccess(); } -// TODO - PINNED CONSUMER SUPPORT -// /** -// * {@inheritDoc} -// */ -// @Override -// public boolean unpinConsumer(String streamName, String consumerName, String consumerGroup) throws IOException, JetStreamApiException { -// validateNotNull(streamName, "Stream Name"); -// validateNotNull(consumerName, "Consumer Name"); -// validateNotNull(consumerGroup, "Consumer Group"); -// String subj = String.format(JSAPI_CONSUMER_UNPIN, streamName, consumerName); -// byte[] payload = String.format("{\"group\": \"%s\"}", consumerGroup).getBytes(); -// Message resp = makeRequestResponseRequired(subj, payload, getTimeout()); -// return new SuccessApiResponse(resp).throwOnHasError().getSuccess(); -// } + /** + * {@inheritDoc} + */ + @Override + public boolean unpinConsumer(String streamName, String consumerName, String consumerGroup) throws IOException, JetStreamApiException { + validateNotNull(streamName, "Stream Name"); + validateNotNull(consumerName, "Consumer Name"); + validateNotNull(consumerGroup, "Consumer Group"); + String subj = String.format(JSAPI_CONSUMER_UNPIN, streamName, consumerName); + byte[] payload = String.format("{\"group\": \"%s\"}", consumerGroup).getBytes(); + Message resp = makeRequestResponseRequired(subj, payload, getTimeout()); + return new SuccessApiResponse(resp).throwOnHasError().getSuccess(); + } /** * {@inheritDoc} diff --git a/src/main/java/io/nats/client/impl/NatsMessageConsumer.java b/src/main/java/io/nats/client/impl/NatsMessageConsumer.java index 2c1acbcaa..16dcb7508 100644 --- a/src/main/java/io/nats/client/impl/NatsMessageConsumer.java +++ b/src/main/java/io/nats/client/impl/NatsMessageConsumer.java @@ -104,17 +104,15 @@ else if (pmm.pendingMessages <= thresholdMessages || (pmm.trackingBytes && pmm.p private void repull() { int rePullMessages = Math.max(1, consumeOpts.getBatchSize() - pmm.pendingMessages); long rePullBytes = consumeOpts.getBatchBytes() == 0 ? 0 : consumeOpts.getBatchBytes() - pmm.pendingBytes; -// TODO - PINNED CONSUMER SUPPORT -// PinnablePullRequestOptions pro = new PinnablePullRequestOptions(pmm.currentPinId, - PullRequestOptions pro = PullRequestOptions.builder(rePullMessages) - .maxBytes(rePullBytes) - .expiresIn(consumeOpts.getExpiresInMillis()) - .idleHeartbeat(consumeOpts.getIdleHeartbeat()) - .group(consumeOpts.getGroup()) - .priority(consumeOpts.getPriority()) - .minPending(consumeOpts.getMinPending()) - .minAckPending(consumeOpts.getMinAckPending()) - .build(); + PinnablePullRequestOptions pro = new PinnablePullRequestOptions(pmm.currentPinId, + PullRequestOptions.builder(rePullMessages) + .maxBytes(rePullBytes) + .expiresIn(consumeOpts.getExpiresInMillis()) + .idleHeartbeat(consumeOpts.getIdleHeartbeat()) + .group(consumeOpts.getGroup()) + .priority(consumeOpts.getPriority()) + .minPending(consumeOpts.getMinPending()) + .minAckPending(consumeOpts.getMinAckPending())); sub._pull(pro, consumeOpts.raiseStatusWarnings(), this); } } diff --git a/src/main/java/io/nats/client/impl/NatsMessageConsumerBase.java b/src/main/java/io/nats/client/impl/NatsMessageConsumerBase.java index 46f1972c7..2ab1cb18b 100644 --- a/src/main/java/io/nats/client/impl/NatsMessageConsumerBase.java +++ b/src/main/java/io/nats/client/impl/NatsMessageConsumerBase.java @@ -15,6 +15,7 @@ import io.nats.client.JetStreamApiException; import io.nats.client.MessageConsumer; +import io.nats.client.PullRequestOptions; import io.nats.client.api.ConsumerInfo; import java.io.IOException; @@ -139,18 +140,17 @@ protected void shutdownSub() { } } -// TODO - PINNED CONSUMER SUPPORT -// static class PinnablePullRequestOptions extends PullRequestOptions { -// final String pinId; -// -// public PinnablePullRequestOptions(String pinId, Builder b) { -// super(b); -// this.pinId = pinId; -// } -// -// @Override -// protected String getPinId() { -// return pinId; -// } -// } + static class PinnablePullRequestOptions extends PullRequestOptions { + final String pinId; + + public PinnablePullRequestOptions(String pinId, Builder b) { + super(b); + this.pinId = pinId; + } + + @Override + protected String getPinId() { + return pinId; + } + } } diff --git a/src/main/java/io/nats/client/impl/NatsOrderedConsumerContext.java b/src/main/java/io/nats/client/impl/NatsOrderedConsumerContext.java index 7eddea73d..91a0dbd7a 100644 --- a/src/main/java/io/nats/client/impl/NatsOrderedConsumerContext.java +++ b/src/main/java/io/nats/client/impl/NatsOrderedConsumerContext.java @@ -160,9 +160,8 @@ public MessageConsumer consume(@NonNull ConsumeOptions consumeOptions, @Nullable return impl.consume(consumeOptions, dispatcher, handler); } -// TODO - PINNED CONSUMER SUPPORT -// @Override -// public boolean unpin(String group) throws IOException, JetStreamApiException { -// return impl.unpin(group); -// } + @Override + public boolean unpin(String group) throws IOException, JetStreamApiException { + return impl.unpin(group); + } } diff --git a/src/main/java/io/nats/client/impl/PullMessageManager.java b/src/main/java/io/nats/client/impl/PullMessageManager.java index 5ecb060a6..c1538803e 100644 --- a/src/main/java/io/nats/client/impl/PullMessageManager.java +++ b/src/main/java/io/nats/client/impl/PullMessageManager.java @@ -19,8 +19,7 @@ import io.nats.client.support.Status; import static io.nats.client.impl.MessageManager.ManageResult.*; -import static io.nats.client.support.NatsJetStreamConstants.NATS_PENDING_BYTES; -import static io.nats.client.support.NatsJetStreamConstants.NATS_PENDING_MESSAGES; +import static io.nats.client.support.NatsJetStreamConstants.*; import static io.nats.client.support.Status.*; class PullMessageManager extends MessageManager { @@ -30,8 +29,7 @@ class PullMessageManager extends MessageManager { protected boolean trackingBytes; protected boolean raiseStatusWarnings; protected PullManagerObserver pullManagerObserver; -// TODO - PINNED CONSUMER SUPPORT -// protected String currentPinId; + protected String currentPinId; protected PullMessageManager(NatsConnection conn, SubscribeOptions so, boolean syncMode) { super(conn, so, syncMode); @@ -152,13 +150,12 @@ protected ManageResult manage(Message msg) { return manageStatus(msg); } -// TODO - PINNED CONSUMER SUPPORT -// @Override -// protected void subTrackJsMessage(Message msg) { -// if (msg.hasHeaders()) { -// currentPinId = msg.getHeaders().getFirst(NATS_PIN_ID_HDR); -// } -// } + @Override + protected void subTrackJsMessage(Message msg) { + if (msg.hasHeaders()) { + currentPinId = msg.getHeaders().getFirst(NATS_PIN_ID_HDR); + } + } protected ManageResult manageStatus(Message msg) { Status status = msg.getStatus(); @@ -193,10 +190,9 @@ protected ManageResult manageStatus(Message msg) { } break; -// TODO - PINNED CONSUMER SUPPORT -// case PIN_ERROR_CODE: -// currentPinId = null; -// return STATUS_TERMINUS; + case PIN_ERROR_CODE: + currentPinId = null; + return STATUS_TERMINUS; } // All unknown 409s are errors, since that basically means the client is not aware of them. diff --git a/src/main/java/io/nats/client/support/ApiConstants.java b/src/main/java/io/nats/client/support/ApiConstants.java index b58f6f237..4cf05bf4b 100644 --- a/src/main/java/io/nats/client/support/ApiConstants.java +++ b/src/main/java/io/nats/client/support/ApiConstants.java @@ -164,9 +164,8 @@ public interface ApiConstants { String PAUSE_REMAINING = "pause_remaining"; String PAUSE_UNTIL = "pause_until"; String PERSIST_MODE = "persist_mode"; -// TODO - PINNED CONSUMER SUPPORT -// String PINNED_CLIENT_ID = "pinned_client_id"; -// String PINNED_TS = "pinned_ts"; + String PINNED_CLIENT_ID = "pinned_client_id"; + String PINNED_TS = "pinned_ts"; String PLACEMENT = "placement"; String PORT = "port"; String PRIORITY = "priority"; diff --git a/src/main/java/io/nats/client/support/NatsJetStreamConstants.java b/src/main/java/io/nats/client/support/NatsJetStreamConstants.java index 84aa52bb9..20ce8c234 100644 --- a/src/main/java/io/nats/client/support/NatsJetStreamConstants.java +++ b/src/main/java/io/nats/client/support/NatsJetStreamConstants.java @@ -86,9 +86,8 @@ public interface NatsJetStreamConstants { // JSAPI_MSG_DELETE is the endpoint to remove a message. String JSAPI_MSG_DELETE = "STREAM.MSG.DELETE.%s"; -// TODO - PINNED CONSUMER SUPPORT -// // JSAPI_CONSUMER_UNPIN is the endpoint to unpin a consumer -// String JSAPI_CONSUMER_UNPIN = "CONSUMER.UNPIN.%s.%s"; + // JSAPI_CONSUMER_UNPIN is the endpoint to unpin a consumer + String JSAPI_CONSUMER_UNPIN = "CONSUMER.UNPIN.%s.%s"; String MSG_ID_HDR = "Nats-Msg-Id"; String EXPECTED_STREAM_HDR = "Nats-Expected-Stream"; @@ -144,8 +143,7 @@ public interface NatsJetStreamConstants { String NATS_BATCH_SEQUENCE_HDR = "Nats-Batch-Sequence"; String NATS_BATCH_COMMIT_HDR = "Nats-Batch-Commit"; -// TODO - PINNED CONSUMER SUPPORT -// String NATS_PIN_ID_HDR = "Nats-Pin-Id"; + String NATS_PIN_ID_HDR = "Nats-Pin-Id"; int JS_CONSUMER_NOT_FOUND_ERR = 10014; int JS_NO_MESSAGE_FOUND_ERR = 10037; diff --git a/src/main/java/io/nats/client/support/Status.java b/src/main/java/io/nats/client/support/Status.java index d939aeef8..7c543ff53 100644 --- a/src/main/java/io/nats/client/support/Status.java +++ b/src/main/java/io/nats/client/support/Status.java @@ -38,9 +38,8 @@ public class Status { public static final int CONFLICT_CODE = 409; public static final int EOB_CODE = 204; -// TODO - PINNED CONSUMER SUPPORT -// public static final int PIN_ERROR_CODE = 423; -// public static final byte[] PIN_ERROR_CODE_BYTES = ("" + PIN_ERROR_CODE).getBytes(ISO_8859_1); + public static final int PIN_ERROR_CODE = 423; + public static final byte[] PIN_ERROR_CODE_BYTES = ("" + PIN_ERROR_CODE).getBytes(ISO_8859_1); public static final String BAD_REQUEST = "Bad Request"; // 400 public static final String NO_MESSAGES = "No Messages"; // 404 diff --git a/src/test/java/io/nats/client/api/ConsumerInfoTests.java b/src/test/java/io/nats/client/api/ConsumerInfoTests.java index 2f7bda5c0..5e03496a9 100644 --- a/src/test/java/io/nats/client/api/ConsumerInfoTests.java +++ b/src/test/java/io/nats/client/api/ConsumerInfoTests.java @@ -90,14 +90,12 @@ public void testConsumerInfo() { PriorityGroupState pgs = ci.getPriorityGroupStates().get(0); assertEquals("group1", pgs.getGroup()); -// TODO - PINNED CONSUMER SUPPORT -// assertEquals("pci1", pgs.getPinnedClientId()); -// assertEquals(DateTimeUtils.parseDateTime("2025-09-24T16:01:01.163377Z"), pgs.getPinnedTime()); + assertEquals("pci1", pgs.getPinnedClientId()); + assertEquals(DateTimeUtils.parseDateTime("2025-09-24T16:01:01.163377Z"), pgs.getPinnedTime()); pgs = ci.getPriorityGroupStates().get(1); assertEquals("group2", pgs.getGroup()); -// TODO - PINNED CONSUMER SUPPORT -// assertEquals("pci2", pgs.getPinnedClientId()); -// assertEquals(DateTimeUtils.parseDateTime("2025-09-24T16:02:02.163377Z"), pgs.getPinnedTime()); + assertEquals("pci2", pgs.getPinnedClientId()); + assertEquals(DateTimeUtils.parseDateTime("2025-09-24T16:02:02.163377Z"), pgs.getPinnedTime()); assertNotNull(pgs.toString()); // COVERAGE diff --git a/src/test/java/io/nats/client/impl/JetStreamConsumerTests.java b/src/test/java/io/nats/client/impl/JetStreamConsumerTests.java index 395e04d82..cb49fd537 100644 --- a/src/test/java/io/nats/client/impl/JetStreamConsumerTests.java +++ b/src/test/java/io/nats/client/impl/JetStreamConsumerTests.java @@ -412,7 +412,7 @@ private static void validateMultipleSubjectFilterSub(JetStreamSubscription sub, @Test public void testRaiseStatusWarnings1194() throws Exception { - ListenerForTesting listenerForTesting = new ListenerForTesting(true, true); + ListenerForTesting listenerForTesting = new ListenerForTesting(false, false); runInJsServer(listenerForTesting, nc -> { // Setup JetStreamManagement jsm = nc.jetStreamManagement(); diff --git a/src/test/java/io/nats/client/impl/JetStreamPullTests.java b/src/test/java/io/nats/client/impl/JetStreamPullTests.java index 13c706102..e26d6255d 100644 --- a/src/test/java/io/nats/client/impl/JetStreamPullTests.java +++ b/src/test/java/io/nats/client/impl/JetStreamPullTests.java @@ -26,9 +26,12 @@ import java.io.IOException; import java.time.Duration; +import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Set; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; @@ -36,6 +39,7 @@ import static io.nats.client.api.ConsumerConfiguration.builder; import static io.nats.client.support.ApiConstants.*; +import static io.nats.client.support.NatsJetStreamConstants.NATS_PIN_ID_HDR; import static io.nats.client.support.Status.*; import static org.junit.jupiter.api.Assertions.*; @@ -1408,133 +1412,132 @@ public void testPrioritized() throws Exception { }); } -// TODO - PINNED CONSUMER SUPPORT -// @Test -// public void testPinnedClient() throws Exception { -// // have 3 consumers in the same group all PriorityPolicy.PinnedClient -// // start consuming, tracking pin ids and counts -// // unpin 10 times and make sure that new pins are made -// ListenerForTesting l = new ListenerForTesting(); -// Options.Builder b = Options.builder().errorListener(l); -// jsServer.run(b, TestBase::atLeast2_12, nc -> { -// JetStreamManagement jsm = nc.jetStreamManagement(); -// TestingStreamContainer tsc = new TestingStreamContainer(jsm); -// JetStream js = nc.jetStream(); -// -// String consumer = name(); -// String group = variant(); -// -// ConsumerConfiguration cc = ConsumerConfiguration.builder() -// .filterSubject(tsc.subject()) -// .name(consumer) -// .priorityGroups(group) -// .priorityPolicy(PriorityPolicy.PinnedClient) -// .build(); -// -// StreamContext streamContext = nc.getStreamContext(tsc.stream); -// ConsumerContext consumerContext1 = streamContext.createOrUpdateConsumer(cc); -// ConsumerContext consumerContext2 = streamContext.getConsumerContext(consumer); -// ConsumerContext consumerContext3 = streamContext.getConsumerContext(consumer); -// -// //noinspection resource -// assertThrows(IOException.class, () -> consumerContext1.fetchMessages(10)); -// -// Set pinIds = new HashSet<>(); -// AtomicInteger count1 = new AtomicInteger(); -// AtomicInteger count2 = new AtomicInteger(); -// AtomicInteger count3 = new AtomicInteger(); -// MessageHandler handler1 = msg -> { -// msg.ack(); -// assertNotNull(msg.getHeaders()); -// String natsPinId = msg.getHeaders().getFirst(NATS_PIN_ID_HDR); -// assertNotNull(natsPinId); -// pinIds.add(natsPinId); -// count1.incrementAndGet(); -// }; -// MessageHandler handler2 = msg -> { -// msg.ack(); -// assertNotNull(msg.getHeaders()); -// String natsPinId = msg.getHeaders().getFirst(NATS_PIN_ID_HDR); -// assertNotNull(natsPinId); -// pinIds.add(natsPinId); -// count2.incrementAndGet(); -// }; -// MessageHandler handler3 = msg -> { -// msg.ack(); -// assertNotNull(msg.getHeaders()); -// String natsPinId = msg.getHeaders().getFirst(NATS_PIN_ID_HDR); -// assertNotNull(natsPinId); -// pinIds.add(natsPinId); -// count3.incrementAndGet(); -// }; -// -// ConsumeOptions co = ConsumeOptions.builder() -// .batchSize(10) -// .expiresIn(1000) -// .group(group) -// .build(); -// -// MessageConsumer mc1 = consumerContext1.consume(co, handler1); -// MessageConsumer mc2 = consumerContext2.consume(co, handler2); -// MessageConsumer mc3 = consumerContext3.consume(co, handler3); -// -// AtomicBoolean pub = new AtomicBoolean(true); -// Thread t = new Thread(() -> { -// int count = 0; -// while (pub.get()) { -// ++count; -// try { -// js.publish(tsc.subject(), ("x" + count).getBytes()); -// sleep(20); -// } -// catch (Exception e) { -// fail(e); -// return; -// } -// } -// }); -// t.start(); -// -// int unpins = 0; -// while (unpins++ < 10) { -// sleep(650); -// switch (ThreadLocalRandom.current().nextInt(0, 4)) { -// case 0: -// assertTrue(consumerContext1.unpin(group)); -// break; -// case 1: -// assertTrue(consumerContext2.unpin(group)); -// break; -// case 2: -// assertTrue(consumerContext3.unpin(group)); -// break; -// case 3: -// assertTrue(jsm.unpinConsumer(tsc.stream, consumer, group)); -// break; -// } -// assertTrue(consumerContext1.unpin(group)); -// } -// sleep(650); -// -// pub.set(false); -// t.join(); -// mc1.close(); -// mc2.close(); -// mc3.close(); -// -// assertTrue(pinIds.size() > 3); -// int c1 = count1.get(); -// int c2 = count2.get(); -// int c3 = count3.get(); -// if (c1 > 0) { -// assertTrue(c2 > 0 || c3 > 0); -// } -// else if (c2 > 0) { -// assertTrue(c3 > 0); -// } -// else { -// fail("At least 2 consumers should have gotten messages"); -// } -// }); -// } + @Test + public void testPinnedClient() throws Exception { + // have 3 consumers in the same group all PriorityPolicy.PinnedClient + // start consuming, tracking pin ids and counts + // unpin 10 times and make sure that new pins are made + ListenerForTesting l = new ListenerForTesting(); + Options.Builder b = Options.builder().errorListener(l); + jsServer.run(b, TestBase::atLeast2_12, nc -> { + JetStreamManagement jsm = nc.jetStreamManagement(); + TestingStreamContainer tsc = new TestingStreamContainer(jsm); + JetStream js = nc.jetStream(); + + String consumer = name(); + String group = variant(); + + ConsumerConfiguration cc = ConsumerConfiguration.builder() + .filterSubject(tsc.subject()) + .name(consumer) + .priorityGroups(group) + .priorityPolicy(PriorityPolicy.PinnedClient) + .build(); + + StreamContext streamContext = nc.getStreamContext(tsc.stream); + ConsumerContext consumerContext1 = streamContext.createOrUpdateConsumer(cc); + ConsumerContext consumerContext2 = streamContext.getConsumerContext(consumer); + ConsumerContext consumerContext3 = streamContext.getConsumerContext(consumer); + + //noinspection resource + assertThrows(IOException.class, () -> consumerContext1.fetchMessages(10)); + + Set pinIds = new HashSet<>(); + AtomicInteger count1 = new AtomicInteger(); + AtomicInteger count2 = new AtomicInteger(); + AtomicInteger count3 = new AtomicInteger(); + MessageHandler handler1 = msg -> { + msg.ack(); + assertNotNull(msg.getHeaders()); + String natsPinId = msg.getHeaders().getFirst(NATS_PIN_ID_HDR); + assertNotNull(natsPinId); + pinIds.add(natsPinId); + count1.incrementAndGet(); + }; + MessageHandler handler2 = msg -> { + msg.ack(); + assertNotNull(msg.getHeaders()); + String natsPinId = msg.getHeaders().getFirst(NATS_PIN_ID_HDR); + assertNotNull(natsPinId); + pinIds.add(natsPinId); + count2.incrementAndGet(); + }; + MessageHandler handler3 = msg -> { + msg.ack(); + assertNotNull(msg.getHeaders()); + String natsPinId = msg.getHeaders().getFirst(NATS_PIN_ID_HDR); + assertNotNull(natsPinId); + pinIds.add(natsPinId); + count3.incrementAndGet(); + }; + + ConsumeOptions co = ConsumeOptions.builder() + .batchSize(10) + .expiresIn(1000) + .group(group) + .build(); + + MessageConsumer mc1 = consumerContext1.consume(co, handler1); + MessageConsumer mc2 = consumerContext2.consume(co, handler2); + MessageConsumer mc3 = consumerContext3.consume(co, handler3); + + AtomicBoolean pub = new AtomicBoolean(true); + Thread t = new Thread(() -> { + int count = 0; + while (pub.get()) { + ++count; + try { + js.publish(tsc.subject(), ("x" + count).getBytes()); + sleep(20); + } + catch (Exception e) { + fail(e); + return; + } + } + }); + t.start(); + + int unpins = 0; + while (unpins++ < 10) { + sleep(650); + switch (ThreadLocalRandom.current().nextInt(0, 4)) { + case 0: + assertTrue(consumerContext1.unpin(group)); + break; + case 1: + assertTrue(consumerContext2.unpin(group)); + break; + case 2: + assertTrue(consumerContext3.unpin(group)); + break; + case 3: + assertTrue(jsm.unpinConsumer(tsc.stream, consumer, group)); + break; + } + assertTrue(consumerContext1.unpin(group)); + } + sleep(650); + + pub.set(false); + t.join(); + mc1.close(); + mc2.close(); + mc3.close(); + + assertTrue(pinIds.size() > 3); + int c1 = count1.get(); + int c2 = count2.get(); + int c3 = count3.get(); + if (c1 > 0) { + assertTrue(c2 > 0 || c3 > 0); + } + else if (c2 > 0) { + assertTrue(c3 > 0); + } + else { + fail("At least 2 consumers should have gotten messages"); + } + }); + } }