Skip to content

Commit

Permalink
handle better cases where a pull status message comes from a previous…
Browse files Browse the repository at this point in the history
… pull (#899)
  • Loading branch information
scottf committed May 3, 2023
1 parent 56fbf47 commit b452b42
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 43 deletions.
8 changes: 5 additions & 3 deletions src/examples/java/io/nats/examples/jetstream/NatsJsUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -268,8 +268,8 @@ public static long extractId(Message m) {
return extractId(m.getData());
}

public static void publishInBackground(JetStream js, String subject, String prefix, int count) {
new Thread(() -> {
public static Thread publishInBackground(JetStream js, String subject, String prefix, int count) {
Thread t = new Thread(() -> {
try {
for (int x = 1; x <= count; x++) {
String data = prefix + "-" + x;
Expand All @@ -283,7 +283,9 @@ public static void publishInBackground(JetStream js, String subject, String pref
e.printStackTrace();
System.exit(-1);
}
}).start();
});
t.start();
return t;
}

// ----------------------------------------------------------------------------------------------------
Expand Down
11 changes: 10 additions & 1 deletion src/main/java/io/nats/client/impl/NatsJetStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,16 @@ else if (so.isBind()) {
}

// 4. If no deliver subject (inbox) provided or found, make an inbox.
final String fnlInboxDeliver = inboxDeliver == null ? conn.createInbox() : inboxDeliver;
final String fnlInboxDeliver;
if (isPullMode) {
fnlInboxDeliver = conn.createInbox() + ".*";
}
else if (inboxDeliver == null) {
fnlInboxDeliver = conn.createInbox();
}
else {
fnlInboxDeliver = inboxDeliver;
}

// 5. If consumer does not exist, create and settle on the config. Name will have to wait
// If the consumer exists, I know what the settled info is
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,12 @@
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

public class NatsJetStreamPullSubscription extends NatsJetStreamSubscription {

AtomicLong pullId = new AtomicLong();

NatsJetStreamPullSubscription(String sid, String subject,
NatsConnection connection, NatsDispatcher dispatcher,
NatsJetStream js,
Expand All @@ -42,26 +45,32 @@ boolean isPullMode() {
*/
@Override
public void pull(int batchSize) {
pull(PullRequestOptions.builder(batchSize).build());
_pull(PullRequestOptions.builder(batchSize).build());
}

/**
* {@inheritDoc}
*/
@Override
public void pull(PullRequestOptions pullRequestOptions) {
_pull(pullRequestOptions);
}

private String _pull(PullRequestOptions pullRequestOptions) {
String publishSubject = js.prependPrefix(String.format(JSAPI_CONSUMER_MSG_NEXT, stream, consumerName));
manager.startPullRequest(pullRequestOptions);
connection.publish(publishSubject, getSubject(), pullRequestOptions.serialize());
String pullId = getSubject().replace("*", Long.toString(this.pullId.incrementAndGet()));
connection.publish(publishSubject, pullId, pullRequestOptions.serialize());
connection.lenientFlushBuffer();
return pullId;
}

/**
* {@inheritDoc}
*/
@Override
public void pullNoWait(int batchSize) {
pull(PullRequestOptions.noWait(batchSize).build());
_pull(PullRequestOptions.noWait(batchSize).build());
}

/**
Expand All @@ -70,7 +79,7 @@ public void pullNoWait(int batchSize) {
@Override
public void pullNoWait(int batchSize, Duration expiresIn) {
durationGtZeroRequired(expiresIn, "NoWait Expires In");
pull(PullRequestOptions.noWait(batchSize).expiresIn(expiresIn).build());
_pull(PullRequestOptions.noWait(batchSize).expiresIn(expiresIn).build());
}

/**
Expand All @@ -79,7 +88,7 @@ public void pullNoWait(int batchSize, Duration expiresIn) {
@Override
public void pullNoWait(int batchSize, long expiresInMillis) {
durationGtZeroRequired(expiresInMillis, "NoWait Expires In");
pull(PullRequestOptions.noWait(batchSize).expiresIn(expiresInMillis).build());
_pull(PullRequestOptions.noWait(batchSize).expiresIn(expiresInMillis).build());
}

/**
Expand All @@ -88,7 +97,7 @@ public void pullNoWait(int batchSize, long expiresInMillis) {
@Override
public void pullExpiresIn(int batchSize, Duration expiresIn) {
durationGtZeroRequired(expiresIn, "Expires In");
pull(PullRequestOptions.builder(batchSize).expiresIn(expiresIn).build());
_pull(PullRequestOptions.builder(batchSize).expiresIn(expiresIn).build());
}

/**
Expand All @@ -97,7 +106,7 @@ public void pullExpiresIn(int batchSize, Duration expiresIn) {
@Override
public void pullExpiresIn(int batchSize, long expiresInMillis) {
durationGtZeroRequired(expiresInMillis, "Expires In");
pull(PullRequestOptions.builder(batchSize).expiresIn(expiresInMillis).build());
_pull(PullRequestOptions.builder(batchSize).expiresIn(expiresInMillis).build());
}

/**
Expand Down Expand Up @@ -133,7 +142,7 @@ private List<Message> _fetch(int batchSize, long maxWaitMillis) {
maxWaitMillis > MIN_MILLIS
? maxWaitMillis - EXPIRE_LESS_MILLIS
: maxWaitMillis);
pull(PullRequestOptions.builder(batchLeft).expiresIn(expires).build());
String pullId = _pull(PullRequestOptions.builder(batchLeft).expiresIn(expires).build());

// timeout > 0 process as many messages we can in that time period
// If we get a message that either manager handles, we try again, but
Expand All @@ -152,9 +161,13 @@ private List<Message> _fetch(int batchSize, long maxWaitMillis) {
break;
case TERMINUS:
case ERROR:
return messages;
// reply match will be null on pushes, all status are "managed" anyway, so ignored in this loop
// otherwise (pull) if there is a match, the status applies
if (pullId.equals(msg.getSubject())) {
return messages;
}
}
// case STATUS, try again while we have time
// case pull not match / other ManageResult (i.e. STATUS), try again while we have time
timeLeftNanos = maxWaitNanos - (System.nanoTime() - start);
}
}
Expand All @@ -172,18 +185,13 @@ private List<Message> drainAlreadyBuffered(int batchSize) {
if (msg == null) {
return messages; // no more message currently queued
}
switch (manager.manage(msg)) {
case MESSAGE:
messages.add(msg);
if (messages.size() == batchSize) {
return messages;
}
break;
case TERMINUS:
case ERROR:
if (manager.manage(msg) == MessageManager.ManageResult.MESSAGE) {
messages.add(msg);
if (messages.size() == batchSize) {
return messages;
}
}
// case STATUS, we need to try again
// since this is buffered, no non-message applies, try again
}
}
catch (InterruptedException ignore) {
Expand Down Expand Up @@ -242,7 +250,7 @@ public Message next() {
}

// if there were some messages buffered, reduce the raw pull batch size
pull(PullRequestOptions.builder(batchLeft).expiresIn(maxWaitMillis).build());
String pullId = _pull(PullRequestOptions.builder(batchLeft).expiresIn(maxWaitMillis).build());

final long timeout = maxWaitMillis;

Expand All @@ -264,7 +272,7 @@ public boolean hasNext() {
}

if (buffered.size() == 0) {
msg = _nextUnmanaged(timeout);
msg = _nextUnmanaged(timeout, pullId);
if (msg == null) {
done = true;
return false;
Expand Down
35 changes: 18 additions & 17 deletions src/main/java/io/nats/client/impl/NatsJetStreamSubscription.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import io.nats.client.*;
import io.nats.client.api.ConsumerInfo;
import io.nats.client.impl.MessageManager.ManageResult;
import io.nats.client.support.NatsJetStreamConstants;

import java.io.IOException;
Expand Down Expand Up @@ -83,7 +84,7 @@ public Message nextMessage(Duration timeout) throws InterruptedException, Illega
return _nextUnmanagedNullOrLteZero(timeout);
}

return _nextUnmanaged(timeout.toMillis());
return _nextUnmanaged(timeout.toMillis(), null);
}

@Override
Expand All @@ -92,7 +93,7 @@ public Message nextMessage(long timeoutMillis) throws InterruptedException, Ille
return _nextUnmanagedNullOrLteZero(Duration.ZERO);
}

return _nextUnmanaged(timeoutMillis);
return _nextUnmanaged(timeoutMillis, null);
}

protected Message _nextUnmanagedNullOrLteZero(Duration timeout) throws InterruptedException {
Expand All @@ -104,29 +105,25 @@ protected Message _nextUnmanagedNullOrLteZero(Duration timeout) throws Interrupt
if (msg == null) {
return null; // no message currently queued
}
switch (manager.manage(msg)) {
case MESSAGE:
if (manager.manage(msg) == ManageResult.MESSAGE) {
return msg;
case TERMINUS:
case ERROR:
return null;
}
// case STATUS, we need to try again
// since this is strictly called from user calls of nextMessage, non-messages are considered managed
}
}

protected static final long MIN_MILLIS = 20;
protected static final long EXPIRE_LESS_MILLIS = 10;

protected Message _nextUnmanaged(long timeout) throws InterruptedException {

protected Message _nextUnmanaged(long timeout, String replyMatch) throws InterruptedException {
// timeout > 0 process as many messages we can in that time period
// If we get a message that either manager handles, we try again, but
// with a shorter timeout based on what we already used up
long elapsed = 0;
long start = System.currentTimeMillis();
while (elapsed < timeout) {
Message msg = nextMessageInternal( Duration.ofMillis(Math.max(MIN_MILLIS, timeout - elapsed)) );
long start = System.nanoTime();
long timeoutNanos = timeout * 1_000_000;
long timeLeftNanos = timeoutNanos;
while (timeLeftNanos > 0) {
Message msg = nextMessageInternal( Duration.ofNanos(timeLeftNanos) );
if (msg == null) {
return null; // normal timeout
}
Expand All @@ -135,10 +132,14 @@ protected Message _nextUnmanaged(long timeout) throws InterruptedException {
return msg;
case TERMINUS:
case ERROR:
return null;
// reply match will be null on pushes and all status are "managed" so ignored in this loop
// otherwise (pull) if there is a match, the status applies
if (replyMatch != null && replyMatch.equals(msg.getSubject())) {
return null;
}
}
// case STATUS, try again while we have time
elapsed = System.currentTimeMillis() - start;
// case push managed / pull not match / other ManageResult (i.e. STATUS), try again while we have time
timeLeftNanos = timeoutNanos - (System.nanoTime() - start);
}
return null;
}
Expand Down

0 comments on commit b452b42

Please sign in to comment.