Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

better handle pull status message comes from a previous pull #899

Merged
merged 1 commit into from
May 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions src/examples/java/io/nats/examples/jetstream/NatsJsUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -268,8 +268,8 @@ public static long extractId(Message m) {
return extractId(m.getData());
}

public static void publishInBackground(JetStream js, String subject, String prefix, int count) {
new Thread(() -> {
public static Thread publishInBackground(JetStream js, String subject, String prefix, int count) {
Thread t = new Thread(() -> {
try {
for (int x = 1; x <= count; x++) {
String data = prefix + "-" + x;
Expand All @@ -283,7 +283,9 @@ public static void publishInBackground(JetStream js, String subject, String pref
e.printStackTrace();
System.exit(-1);
}
}).start();
});
t.start();
return t;
}

// ----------------------------------------------------------------------------------------------------
Expand Down
11 changes: 10 additions & 1 deletion src/main/java/io/nats/client/impl/NatsJetStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,16 @@ else if (so.isBind()) {
}

// 4. If no deliver subject (inbox) provided or found, make an inbox.
final String fnlInboxDeliver = inboxDeliver == null ? conn.createInbox() : inboxDeliver;
final String fnlInboxDeliver;
if (isPullMode) {
fnlInboxDeliver = conn.createInbox() + ".*";
}
else if (inboxDeliver == null) {
fnlInboxDeliver = conn.createInbox();
}
else {
fnlInboxDeliver = inboxDeliver;
}

// 5. If consumer does not exist, create and settle on the config. Name will have to wait
// If the consumer exists, I know what the settled info is
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,12 @@
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

public class NatsJetStreamPullSubscription extends NatsJetStreamSubscription {

AtomicLong pullId = new AtomicLong();

NatsJetStreamPullSubscription(String sid, String subject,
NatsConnection connection, NatsDispatcher dispatcher,
NatsJetStream js,
Expand All @@ -42,26 +45,32 @@ boolean isPullMode() {
*/
@Override
public void pull(int batchSize) {
pull(PullRequestOptions.builder(batchSize).build());
_pull(PullRequestOptions.builder(batchSize).build());
}

/**
* {@inheritDoc}
*/
@Override
public void pull(PullRequestOptions pullRequestOptions) {
_pull(pullRequestOptions);
}

private String _pull(PullRequestOptions pullRequestOptions) {
String publishSubject = js.prependPrefix(String.format(JSAPI_CONSUMER_MSG_NEXT, stream, consumerName));
manager.startPullRequest(pullRequestOptions);
connection.publish(publishSubject, getSubject(), pullRequestOptions.serialize());
String pullId = getSubject().replace("*", Long.toString(this.pullId.incrementAndGet()));
connection.publish(publishSubject, pullId, pullRequestOptions.serialize());
connection.lenientFlushBuffer();
return pullId;
}

/**
* {@inheritDoc}
*/
@Override
public void pullNoWait(int batchSize) {
pull(PullRequestOptions.noWait(batchSize).build());
_pull(PullRequestOptions.noWait(batchSize).build());
}

/**
Expand All @@ -70,7 +79,7 @@ public void pullNoWait(int batchSize) {
@Override
public void pullNoWait(int batchSize, Duration expiresIn) {
durationGtZeroRequired(expiresIn, "NoWait Expires In");
pull(PullRequestOptions.noWait(batchSize).expiresIn(expiresIn).build());
_pull(PullRequestOptions.noWait(batchSize).expiresIn(expiresIn).build());
}

/**
Expand All @@ -79,7 +88,7 @@ public void pullNoWait(int batchSize, Duration expiresIn) {
@Override
public void pullNoWait(int batchSize, long expiresInMillis) {
durationGtZeroRequired(expiresInMillis, "NoWait Expires In");
pull(PullRequestOptions.noWait(batchSize).expiresIn(expiresInMillis).build());
_pull(PullRequestOptions.noWait(batchSize).expiresIn(expiresInMillis).build());
}

/**
Expand All @@ -88,7 +97,7 @@ public void pullNoWait(int batchSize, long expiresInMillis) {
@Override
public void pullExpiresIn(int batchSize, Duration expiresIn) {
durationGtZeroRequired(expiresIn, "Expires In");
pull(PullRequestOptions.builder(batchSize).expiresIn(expiresIn).build());
_pull(PullRequestOptions.builder(batchSize).expiresIn(expiresIn).build());
}

/**
Expand All @@ -97,7 +106,7 @@ public void pullExpiresIn(int batchSize, Duration expiresIn) {
@Override
public void pullExpiresIn(int batchSize, long expiresInMillis) {
durationGtZeroRequired(expiresInMillis, "Expires In");
pull(PullRequestOptions.builder(batchSize).expiresIn(expiresInMillis).build());
_pull(PullRequestOptions.builder(batchSize).expiresIn(expiresInMillis).build());
}

/**
Expand Down Expand Up @@ -133,7 +142,7 @@ private List<Message> _fetch(int batchSize, long maxWaitMillis) {
maxWaitMillis > MIN_MILLIS
? maxWaitMillis - EXPIRE_LESS_MILLIS
: maxWaitMillis);
pull(PullRequestOptions.builder(batchLeft).expiresIn(expires).build());
String pullId = _pull(PullRequestOptions.builder(batchLeft).expiresIn(expires).build());

// timeout > 0 process as many messages we can in that time period
// If we get a message that either manager handles, we try again, but
Expand All @@ -152,9 +161,13 @@ private List<Message> _fetch(int batchSize, long maxWaitMillis) {
break;
case TERMINUS:
case ERROR:
return messages;
// reply match will be null on pushes, all status are "managed" anyway, so ignored in this loop
// otherwise (pull) if there is a match, the status applies
if (pullId.equals(msg.getSubject())) {
return messages;
}
}
// case STATUS, try again while we have time
// case pull not match / other ManageResult (i.e. STATUS), try again while we have time
timeLeftNanos = maxWaitNanos - (System.nanoTime() - start);
}
}
Expand All @@ -172,18 +185,13 @@ private List<Message> drainAlreadyBuffered(int batchSize) {
if (msg == null) {
return messages; // no more message currently queued
}
switch (manager.manage(msg)) {
case MESSAGE:
messages.add(msg);
if (messages.size() == batchSize) {
return messages;
}
break;
case TERMINUS:
case ERROR:
if (manager.manage(msg) == MessageManager.ManageResult.MESSAGE) {
messages.add(msg);
if (messages.size() == batchSize) {
return messages;
}
}
// case STATUS, we need to try again
// since this is buffered, no non-message applies, try again
}
}
catch (InterruptedException ignore) {
Expand Down Expand Up @@ -242,7 +250,7 @@ public Message next() {
}

// if there were some messages buffered, reduce the raw pull batch size
pull(PullRequestOptions.builder(batchLeft).expiresIn(maxWaitMillis).build());
String pullId = _pull(PullRequestOptions.builder(batchLeft).expiresIn(maxWaitMillis).build());

final long timeout = maxWaitMillis;

Expand All @@ -264,7 +272,7 @@ public boolean hasNext() {
}

if (buffered.size() == 0) {
msg = _nextUnmanaged(timeout);
msg = _nextUnmanaged(timeout, pullId);
if (msg == null) {
done = true;
return false;
Expand Down
35 changes: 18 additions & 17 deletions src/main/java/io/nats/client/impl/NatsJetStreamSubscription.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import io.nats.client.*;
import io.nats.client.api.ConsumerInfo;
import io.nats.client.impl.MessageManager.ManageResult;
import io.nats.client.support.NatsJetStreamConstants;

import java.io.IOException;
Expand Down Expand Up @@ -83,7 +84,7 @@ public Message nextMessage(Duration timeout) throws InterruptedException, Illega
return _nextUnmanagedNullOrLteZero(timeout);
}

return _nextUnmanaged(timeout.toMillis());
return _nextUnmanaged(timeout.toMillis(), null);
}

@Override
Expand All @@ -92,7 +93,7 @@ public Message nextMessage(long timeoutMillis) throws InterruptedException, Ille
return _nextUnmanagedNullOrLteZero(Duration.ZERO);
}

return _nextUnmanaged(timeoutMillis);
return _nextUnmanaged(timeoutMillis, null);
}

protected Message _nextUnmanagedNullOrLteZero(Duration timeout) throws InterruptedException {
Expand All @@ -104,29 +105,25 @@ protected Message _nextUnmanagedNullOrLteZero(Duration timeout) throws Interrupt
if (msg == null) {
return null; // no message currently queued
}
switch (manager.manage(msg)) {
case MESSAGE:
if (manager.manage(msg) == ManageResult.MESSAGE) {
return msg;
case TERMINUS:
case ERROR:
return null;
}
// case STATUS, we need to try again
// since this is strictly called from user calls of nextMessage, non-messages are considered managed
}
}

protected static final long MIN_MILLIS = 20;
protected static final long EXPIRE_LESS_MILLIS = 10;

protected Message _nextUnmanaged(long timeout) throws InterruptedException {

protected Message _nextUnmanaged(long timeout, String replyMatch) throws InterruptedException {
// timeout > 0 process as many messages we can in that time period
// If we get a message that either manager handles, we try again, but
// with a shorter timeout based on what we already used up
long elapsed = 0;
long start = System.currentTimeMillis();
while (elapsed < timeout) {
Message msg = nextMessageInternal( Duration.ofMillis(Math.max(MIN_MILLIS, timeout - elapsed)) );
long start = System.nanoTime();
long timeoutNanos = timeout * 1_000_000;
long timeLeftNanos = timeoutNanos;
while (timeLeftNanos > 0) {
Message msg = nextMessageInternal( Duration.ofNanos(timeLeftNanos) );
if (msg == null) {
return null; // normal timeout
}
Expand All @@ -135,10 +132,14 @@ protected Message _nextUnmanaged(long timeout) throws InterruptedException {
return msg;
case TERMINUS:
case ERROR:
return null;
// reply match will be null on pushes and all status are "managed" so ignored in this loop
// otherwise (pull) if there is a match, the status applies
if (replyMatch != null && replyMatch.equals(msg.getSubject())) {
return null;
}
}
// case STATUS, try again while we have time
elapsed = System.currentTimeMillis() - start;
// case push managed / pull not match / other ManageResult (i.e. STATUS), try again while we have time
timeLeftNanos = timeoutNanos - (System.nanoTime() - start);
}
return null;
}
Expand Down
Loading