Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 9 additions & 10 deletions src/main/java/io/nats/client/BaseConsumerContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -183,14 +183,13 @@ public interface BaseConsumerContext {
@NonNull
MessageConsumer consume(@NonNull ConsumeOptions consumeOptions, @Nullable Dispatcher dispatcher, @NonNull MessageHandler handler) throws IOException, JetStreamApiException;

// TODO - PINNED CONSUMER SUPPORT
// /**
// * Unpins this consumer
// * @param group the group name of the consumer's group
// * @throws IOException covers various communication issues with the NATS
// * server such as timeout or interruption
// * @throws JetStreamApiException the request had an error related to the data
// * @return true if the delete succeeded
// */
// boolean unpin(String group) throws IOException, JetStreamApiException;
/**
* Unpins this consumer
* @param group the group name of the consumer's group
* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
* @return true if the delete succeeded
*/
boolean unpin(String group) throws IOException, JetStreamApiException;
}
23 changes: 11 additions & 12 deletions src/main/java/io/nats/client/JetStreamManagement.java
Original file line number Diff line number Diff line change
Expand Up @@ -357,18 +357,17 @@ public interface JetStreamManagement {
*/
boolean deleteMessage(String streamName, long seq, boolean erase) throws IOException, JetStreamApiException;

// TODO - PINNED CONSUMER SUPPORT
// /**
// * Unpins a consumer
// * @param streamName name of the stream
// * @param consumerName name of consumer
// * @param consumerGroup name of the consumer's group
// * @throws IOException covers various communication issues with the NATS
// * server such as timeout or interruption
// * @throws JetStreamApiException the request had an error related to the data
// * @return true if the delete succeeded
// */
// boolean unpinConsumer(String streamName, String consumerName, String consumerGroup) throws IOException, JetStreamApiException;
/**
* Unpins a consumer
* @param streamName name of the stream
* @param consumerName name of consumer
* @param consumerGroup name of the consumer's group
* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
* @return true if the delete succeeded
*/
boolean unpinConsumer(String streamName, String consumerName, String consumerGroup) throws IOException, JetStreamApiException;

/**
* Gets a context for publishing and subscribing to subjects backed by Jetstream streams
Expand Down
10 changes: 4 additions & 6 deletions src/main/java/io/nats/client/PullRequestOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,17 +60,15 @@ public String toJson() {
JsonUtils.addFieldAsNanos(sb, IDLE_HEARTBEAT, idleHeartbeat);
JsonUtils.addField(sb, GROUP, group);
JsonUtils.addFieldWhenGtZero(sb, PRIORITY, priority);
// TODO - PINNED CONSUMER SUPPORT
// JsonUtils.addField(sb, ID, getPinId());
JsonUtils.addField(sb, ID, getPinId());
JsonUtils.addField(sb, MIN_PENDING, minPending);
JsonUtils.addField(sb, MIN_ACK_PENDING, minAckPending);
return JsonUtils.endJson(sb).toString();
}

// TODO - PINNED CONSUMER SUPPORT
// protected String getPinId() {
// return null;
// }
protected String getPinId() {
return null;
}

/**
* Get the batch size option value
Expand Down
55 changes: 26 additions & 29 deletions src/main/java/io/nats/client/api/PriorityGroupState.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,30 +16,30 @@
import io.nats.client.support.JsonValue;
import io.nats.client.support.JsonValueUtils;
import org.jspecify.annotations.NonNull;
import org.jspecify.annotations.Nullable;

import java.time.ZonedDateTime;
import java.util.List;

import static io.nats.client.support.ApiConstants.GROUP;
import static io.nats.client.support.ApiConstants.*;
import static io.nats.client.support.JsonValueUtils.readDate;

/**
* Status of a specific consumer priority group
*/
public class PriorityGroupState {
private final String group;

// TODO - PINNED CONSUMER SUPPORT
// private final String pinnedClientId;
// private final ZonedDateTime pinnedTime;
private final String pinnedClientId;
private final ZonedDateTime pinnedTime;

static List<PriorityGroupState> optionalListOf(JsonValue vpgStates) {
return JsonValueUtils.optionalListOf(vpgStates, PriorityGroupState::new);
}

PriorityGroupState(JsonValue vpgState) {
group = JsonValueUtils.readString(vpgState, GROUP);
// TODO - PINNED CONSUMER SUPPORT
// pinnedClientId = JsonValueUtils.readString(vpgState, PINNED_CLIENT_ID);
// pinnedTime = readDate(vpgState, PINNED_TS);
pinnedClientId = JsonValueUtils.readString(vpgState, PINNED_CLIENT_ID);
pinnedTime = readDate(vpgState, PINNED_TS);
}

/**
Expand All @@ -51,33 +51,30 @@ public String getGroup() {
return group;
}

// TODO - PINNED CONSUMER SUPPORT
// /**
// * The generated ID of the pinned client
// * @return the id
// */
// @Nullable
// public String getPinnedClientId() {
// return pinnedClientId;
// }
/**
* The generated ID of the pinned client
* @return the id
*/
@Nullable
public String getPinnedClientId() {
return pinnedClientId;
}

// TODO - PINNED CONSUMER SUPPORT
// /**
// * The timestamp when the client was pinned
// * @return the timestamp
// */
// @Nullable
// public ZonedDateTime getPinnedTime() {
// return pinnedTime;
// }
/**
* The timestamp when the client was pinned
* @return the timestamp
*/
@Nullable
public ZonedDateTime getPinnedTime() {
return pinnedTime;
}

@Override
public String toString() {
return "PriorityGroupState{" +
"group='" + group + '\'' +
// TODO - PINNED CONSUMER SUPPORT
// ", pinnedClientId='" + pinnedClientId + '\'' +
// ", pinnedTime=" + pinnedTime +
", pinnedClientId='" + pinnedClientId + '\'' +
", pinnedTime=" + pinnedTime +
'}';
}
}
6 changes: 2 additions & 4 deletions src/main/java/io/nats/client/api/PriorityPolicy.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,8 @@
public enum PriorityPolicy {
None("none"),
Overflow("overflow"),
Prioritized("prioritized");

// TODO - PINNED CONSUMER SUPPORT
// PinnedClient("pinned_client")
Prioritized("prioritized"),
PinnedClient("pinned_client");

private final String policy;

Expand Down
6 changes: 2 additions & 4 deletions src/main/java/io/nats/client/impl/MessageManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,16 +97,14 @@ protected void trackJsMessage(Message msg) {
NatsJetStreamMetaData meta = msg.metaData();
lastStreamSeq = meta.streamSequence();
lastConsumerSeq++;
// TODO - PINNED CONSUMER SUPPORT
// subTrackJsMessage(msg); // for subclasses so they don't have to acquire the lock
subTrackJsMessage(msg); // for subclasses so they don't have to acquire the lock
}
finally {
stateChangeLock.unlock();
}
}

// TODO - PINNED CONSUMER SUPPORT
// protected void subTrackJsMessage(Message msg) {}
protected void subTrackJsMessage(Message msg) {}

protected void handleHeartbeatError() {
conn.executeCallback((c, el) -> el.heartbeatAlarm(c, sub, lastStreamSeq, lastConsumerSeq));
Expand Down
49 changes: 23 additions & 26 deletions src/main/java/io/nats/client/impl/NatsConsumerContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.nats.client.api.ConsumerConfiguration;
import io.nats.client.api.ConsumerInfo;
import io.nats.client.api.OrderedConsumerConfiguration;
import io.nats.client.api.PriorityPolicy;
import io.nats.client.support.Validator;
import org.jspecify.annotations.NonNull;
import org.jspecify.annotations.Nullable;
Expand Down Expand Up @@ -144,15 +145,14 @@ private void checkState() throws IOException {
}
}

// TODO - PINNED CONSUMER SUPPORT
// private void checkNotPinned(String label) throws IOException {
// ConsumerInfo ci = cachedConsumerInfo.get();
// if (ci != null) {
// if (ci.getConsumerConfiguration().getPriorityPolicy() == PriorityPolicy.PinnedClient) {
// throw new IOException("Pinned not allowed with " + label);
// }
// }
// }
private void checkNotPinned(String label) throws IOException {
ConsumerInfo ci = cachedConsumerInfo.get();
if (ci != null) {
if (ci.getConsumerConfiguration().getPriorityPolicy() == PriorityPolicy.PinnedClient) {
throw new IOException("Pinned not allowed with " + label);
}
}
}

private NatsMessageConsumerBase trackConsume(NatsMessageConsumerBase con) {
lastConsumer.set(con);
Expand Down Expand Up @@ -222,8 +222,7 @@ public Message next(long maxWaitMillis) throws IOException, InterruptedException
try {
stateLock.lock();
checkState();
// TODO - PINNED CONSUMER SUPPORT
// checkNotPinned("Next");
checkNotPinned("Next");

try {
long inactiveThreshold = maxWaitMillis * 110 / 100; // 10% longer than the wait
Expand Down Expand Up @@ -292,8 +291,7 @@ public FetchConsumer fetch(@NonNull FetchConsumeOptions fetchConsumeOptions) thr
try {
stateLock.lock();
checkState();
// TODO - PINNED CONSUMER SUPPORT
// checkNotPinned("Fetch");
checkNotPinned("Fetch");
return (FetchConsumer)trackConsume(new NatsFetchConsumer(this, cachedConsumerInfo.get(), fetchConsumeOptions));
}
finally {
Expand Down Expand Up @@ -378,17 +376,16 @@ public MessageConsumer consume(@NonNull ConsumeOptions consumeOptions,
}
}

// TODO - PINNED CONSUMER SUPPORT
// @Override
// public boolean unpin(String group) throws IOException, JetStreamApiException {
// String name = consumerName.get();
// if (name == null) {
// ConsumerInfo ci = cachedConsumerInfo.get();
// if (ci == null) {
// ci = getConsumerInfo();
// }
// name = ci.getName();
// }
// return streamCtx.jsm.unpinConsumer(streamCtx.streamName, name, group);
// }
@Override
public boolean unpin(String group) throws IOException, JetStreamApiException {
String name = consumerName.get();
if (name == null) {
ConsumerInfo ci = cachedConsumerInfo.get();
if (ci == null) {
ci = getConsumerInfo();
}
name = ci.getName();
}
return streamCtx.jsm.unpinConsumer(streamCtx.streamName, name, group);
}
}
22 changes: 10 additions & 12 deletions src/main/java/io/nats/client/impl/NatsFetchConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,18 +48,16 @@ class NatsFetchConsumer extends NatsMessageConsumerBase implements FetchConsumer
inactiveThreshold = expiresInMillis * 110 / 100; // 10% longer than the wait
}

// TODO - PINNED CONSUMER SUPPORT
// PinnablePullRequestOptions pro = new PinnablePullRequestOptions(pmm.currentPinId,
PullRequestOptions pro = PullRequestOptions.builder(fetchConsumeOptions.getMaxMessages())
.maxBytes(fetchConsumeOptions.getMaxBytes())
.expiresIn(expiresInMillis)
.idleHeartbeat(fetchConsumeOptions.getIdleHeartbeat())
.noWait(isNoWait)
.group(fetchConsumeOptions.getGroup())
.priority(fetchConsumeOptions.getPriority())
.minPending(fetchConsumeOptions.getMinPending())
.minAckPending(fetchConsumeOptions.getMinAckPending())
.build();
PinnablePullRequestOptions pro = new PinnablePullRequestOptions(pmm == null ? null : pmm.currentPinId,
PullRequestOptions.builder(fetchConsumeOptions.getMaxMessages())
.maxBytes(fetchConsumeOptions.getMaxBytes())
.expiresIn(expiresInMillis)
.idleHeartbeat(fetchConsumeOptions.getIdleHeartbeat())
.noWait(isNoWait)
.group(fetchConsumeOptions.getGroup())
.priority(fetchConsumeOptions.getPriority())
.minPending(fetchConsumeOptions.getMinPending())
.minAckPending(fetchConsumeOptions.getMinAckPending()));
initSub(subscriptionMaker.subscribe(null, null, null, inactiveThreshold), false);
pullSubject = sub._pull(pro, fetchConsumeOptions.raiseStatusWarnings(), this);
startNanos = -1;
Expand Down
27 changes: 13 additions & 14 deletions src/main/java/io/nats/client/impl/NatsJetStreamManagement.java
Original file line number Diff line number Diff line change
Expand Up @@ -362,20 +362,19 @@ public boolean deleteMessage(String streamName, long seq, boolean erase) throws
return new SuccessApiResponse(resp).throwOnHasError().getSuccess();
}

// TODO - PINNED CONSUMER SUPPORT
// /**
// * {@inheritDoc}
// */
// @Override
// public boolean unpinConsumer(String streamName, String consumerName, String consumerGroup) throws IOException, JetStreamApiException {
// validateNotNull(streamName, "Stream Name");
// validateNotNull(consumerName, "Consumer Name");
// validateNotNull(consumerGroup, "Consumer Group");
// String subj = String.format(JSAPI_CONSUMER_UNPIN, streamName, consumerName);
// byte[] payload = String.format("{\"group\": \"%s\"}", consumerGroup).getBytes();
// Message resp = makeRequestResponseRequired(subj, payload, getTimeout());
// return new SuccessApiResponse(resp).throwOnHasError().getSuccess();
// }
/**
* {@inheritDoc}
*/
@Override
public boolean unpinConsumer(String streamName, String consumerName, String consumerGroup) throws IOException, JetStreamApiException {
validateNotNull(streamName, "Stream Name");
validateNotNull(consumerName, "Consumer Name");
validateNotNull(consumerGroup, "Consumer Group");
String subj = String.format(JSAPI_CONSUMER_UNPIN, streamName, consumerName);
byte[] payload = String.format("{\"group\": \"%s\"}", consumerGroup).getBytes();
Message resp = makeRequestResponseRequired(subj, payload, getTimeout());
return new SuccessApiResponse(resp).throwOnHasError().getSuccess();
}

/**
* {@inheritDoc}
Expand Down
20 changes: 9 additions & 11 deletions src/main/java/io/nats/client/impl/NatsMessageConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -104,17 +104,15 @@ else if (pmm.pendingMessages <= thresholdMessages || (pmm.trackingBytes && pmm.p
private void repull() {
int rePullMessages = Math.max(1, consumeOpts.getBatchSize() - pmm.pendingMessages);
long rePullBytes = consumeOpts.getBatchBytes() == 0 ? 0 : consumeOpts.getBatchBytes() - pmm.pendingBytes;
// TODO - PINNED CONSUMER SUPPORT
// PinnablePullRequestOptions pro = new PinnablePullRequestOptions(pmm.currentPinId,
PullRequestOptions pro = PullRequestOptions.builder(rePullMessages)
.maxBytes(rePullBytes)
.expiresIn(consumeOpts.getExpiresInMillis())
.idleHeartbeat(consumeOpts.getIdleHeartbeat())
.group(consumeOpts.getGroup())
.priority(consumeOpts.getPriority())
.minPending(consumeOpts.getMinPending())
.minAckPending(consumeOpts.getMinAckPending())
.build();
PinnablePullRequestOptions pro = new PinnablePullRequestOptions(pmm.currentPinId,
PullRequestOptions.builder(rePullMessages)
.maxBytes(rePullBytes)
.expiresIn(consumeOpts.getExpiresInMillis())
.idleHeartbeat(consumeOpts.getIdleHeartbeat())
.group(consumeOpts.getGroup())
.priority(consumeOpts.getPriority())
.minPending(consumeOpts.getMinPending())
.minAckPending(consumeOpts.getMinAckPending()));
sub._pull(pro, consumeOpts.raiseStatusWarnings(), this);
}
}
Loading
Loading