Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/main/java/io/nats/client/impl/NatsConsumerContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ public Message next(long maxWaitMillis) throws IOException, InterruptedException
try {
long inactiveThreshold = maxWaitMillis * 110 / 100; // 10% longer than the wait
nmcb = new NatsMessageConsumerBase(cachedConsumerInfo.get());
nmcb.initSub(subscribe(null, null, null, inactiveThreshold));
nmcb.initSub(subscribe(null, null, null, inactiveThreshold), false);
nmcb.setConsumerName(consumerName.get()); // the call to subscribe sets this
trackConsume(nmcb); // this has to be done after the nmcb is fully set up
nmcb.sub._pull(PullRequestOptions.builder(1)
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/nats/client/impl/NatsFetchConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class NatsFetchConsumer extends NatsMessageConsumerBase implements FetchConsumer
.minPending(fetchConsumeOptions.getMinPending())
.minAckPending(fetchConsumeOptions.getMinAckPending())
.build();
initSub(subscriptionMaker.subscribe(null, null, null, inactiveThreshold));
initSub(subscriptionMaker.subscribe(null, null, null, inactiveThreshold), false);
pullSubject = sub._pull(pro, fetchConsumeOptions.raiseStatusWarnings(), this);
startNanos = -1;
}
Expand Down
2 changes: 0 additions & 2 deletions src/main/java/io/nats/client/impl/NatsJetStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,6 @@ JetStreamSubscription createSubscription(String userSubscribeSubject,
SubscribeOptions so;
String stream;
ConsumerConfiguration userCC;
boolean ordered;
String settledDeliverGroup = null; // push might set this

if (isPullMode) {
Expand Down Expand Up @@ -459,7 +458,6 @@ else if (inboxDeliver == null) {
else {
settledConsumerName = null; // the server will give us a name if the user's was null
}

settledCC = ccBuilder.build();
}

Expand Down
8 changes: 4 additions & 4 deletions src/main/java/io/nats/client/impl/NatsMessageConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class NatsMessageConsumer extends NatsMessageConsumerBase implements PullManager
thresholdMessages = bm - rePullMessages;
thresholdBytes = bb == 0 ? Integer.MIN_VALUE : bb - rePullBytes;

doSub();
doSub(true);
}

@Override
Expand All @@ -57,15 +57,15 @@ public void heartbeatError() {
}
else {
shutdownSub();
doSub();
doSub(false);
}
}
catch (JetStreamApiException | IOException e) {
setupHbAlarmToTrigger();
}
}

void doSub() throws JetStreamApiException, IOException {
void doSub(boolean first) throws JetStreamApiException, IOException {
MessageHandler mh = userMessageHandler == null ? null : msg -> {
userMessageHandler.onMessage(msg);
if (stopped.get() && pmm.noMorePending()) {
Expand All @@ -75,7 +75,7 @@ void doSub() throws JetStreamApiException, IOException {
try {
stopped.set(false);
finished.set(false);
super.initSub(subscriptionMaker.subscribe(mh, userDispatcher, pmm, null));
super.initSub(subscriptionMaker.subscribe(mh, userDispatcher, pmm, null), !first);
repull();
}
catch (JetStreamApiException | IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,12 @@ void setConsumerName(String consumerName) {
this.consumerName = consumerName;
}

void initSub(NatsJetStreamPullSubscription sub) {
void initSub(NatsJetStreamPullSubscription sub, boolean clearCachedConsumerInfo) {
this.sub = sub;
this.consumerName = sub.getConsumerName();
if (clearCachedConsumerInfo) {
cachedConsumerInfo = null;
}
pmm = (PullMessageManager)sub.manager;
}

Expand Down
13 changes: 7 additions & 6 deletions src/main/java/io/nats/client/impl/OrderedMessageManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.nats.client.api.ConsumerCreateRequest;
import io.nats.client.api.ConsumerInfo;

import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

import static io.nats.client.impl.MessageManager.ManageResult.MESSAGE;
Expand All @@ -27,7 +28,7 @@

class OrderedMessageManager extends PushMessageManager {

protected long expectedExternalConsumerSeq;
protected final AtomicLong expectedExternalConsumerSeq;
protected final AtomicReference<String> targetSid;

protected OrderedMessageManager(
Expand All @@ -40,13 +41,13 @@ protected OrderedMessageManager(
boolean syncMode)
{
super(conn, js, stream, so, originalCc, queueMode, syncMode);
expectedExternalConsumerSeq = 1; // always starts at 1
expectedExternalConsumerSeq = new AtomicLong(1); // always starts at 1
targetSid = new AtomicReference<>();
}

@Override
protected void startup(NatsJetStreamSubscription sub) {
expectedExternalConsumerSeq = 1; // consumer always starts with consumer sequence 1
expectedExternalConsumerSeq.set(1); // consumer always starts with consumer sequence 1
super.startup(sub);
targetSid.set(sub.getSID());
}
Expand All @@ -59,12 +60,12 @@ protected ManageResult manage(Message msg) {

if (msg.isJetStream()) {
long receivedConsumerSeq = msg.metaData().consumerSequence();
if (expectedExternalConsumerSeq != receivedConsumerSeq) {
if (expectedExternalConsumerSeq.get() != receivedConsumerSeq) {
handleErrorCondition();
return STATUS_HANDLED;
}
trackJsMessage(msg);
expectedExternalConsumerSeq++;
expectedExternalConsumerSeq.incrementAndGet();
return MESSAGE;
}

Expand All @@ -80,7 +81,7 @@ protected void handleHeartbeatError() {
private void handleErrorCondition() {
try {
targetSid.set(null);
expectedExternalConsumerSeq = 1; // consumer always starts with consumer sequence 1
expectedExternalConsumerSeq.set(1); // consumer always starts with consumer sequence 1

// 1. re-subscribe. This means killing the sub then making a new one.
// New sub needs a new deliverSubject
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/io/nats/client/impl/PullMessageManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,9 @@ protected Boolean beforeQueueProcessorImpl(NatsMessage msg) {
Headers h = msg.getHeaders();
if (h != null) {
try {
//noinspection DataFlowIssue WE ALREADY CATCH THE EXCEPTION
m = Integer.parseInt(h.getFirst(NATS_PENDING_MESSAGES));
//noinspection DataFlowIssue WE ALREADY CATCH THE EXCEPTION
b = Long.parseLong(h.getFirst(NATS_PENDING_BYTES));
}
catch (NumberFormatException ignore) {
Expand Down
13 changes: 7 additions & 6 deletions src/main/java/io/nats/client/impl/PullOrderedMessageManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.nats.client.SubscribeOptions;
import io.nats.client.api.ConsumerConfiguration;

import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

import static io.nats.client.impl.MessageManager.ManageResult.MESSAGE;
Expand All @@ -27,7 +28,7 @@ class PullOrderedMessageManager extends PullMessageManager {
protected final ConsumerConfiguration originalCc;
protected final NatsJetStream js;
protected final String stream;
protected long expectedExternalConsumerSeq;
protected final AtomicLong expectedExternalConsumerSeq;
protected final AtomicReference<String> targetSid;

protected PullOrderedMessageManager(NatsConnection conn,
Expand All @@ -38,13 +39,13 @@ protected PullOrderedMessageManager(NatsConnection conn,
this.js = js;
this.stream = stream;
this.originalCc = originalCc;
expectedExternalConsumerSeq = 1; // always starts at 1
expectedExternalConsumerSeq = new AtomicLong(1); // always starts at 1
targetSid = new AtomicReference<>();
}

@Override
protected void startup(NatsJetStreamSubscription sub) {
expectedExternalConsumerSeq = 1; // consumer always starts with consumer sequence 1
expectedExternalConsumerSeq.set(1); // consumer always starts with consumer sequence 1
super.startup(sub);
targetSid.set(sub.getSID());
}
Expand All @@ -57,17 +58,17 @@ protected ManageResult manage(Message msg) {

if (msg.isJetStream()) {
long receivedConsumerSeq = msg.metaData().consumerSequence();
if (expectedExternalConsumerSeq != receivedConsumerSeq) {
if (expectedExternalConsumerSeq.get() != receivedConsumerSeq) {
targetSid.set(null);
expectedExternalConsumerSeq = 1; // consumer always starts with consumer sequence 1
expectedExternalConsumerSeq.set(1); // consumer always starts with consumer sequence 1
resetTracking();
if (pullManagerObserver != null) {
pullManagerObserver.heartbeatError();
}
return STATUS_HANDLED;
}
trackJsMessage(msg);
expectedExternalConsumerSeq++;
expectedExternalConsumerSeq.incrementAndGet();
return MESSAGE;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public void testOrderedConsumerSync() throws Exception {
// Setup sync subscription
_testOrderedConsumerSync(js, tsc, null, PushSubscribeOptions.builder().ordered(true).build());

String consumerName = "prefix"; // prefix();
String consumerName = prefix();
_testOrderedConsumerSync(js, tsc, consumerName, PushSubscribeOptions.builder().name(consumerName).ordered(true).build());
});
}
Expand Down
20 changes: 16 additions & 4 deletions src/test/java/io/nats/client/impl/JetStreamPubTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -376,15 +376,27 @@ public void testPublishExpectations() throws Exception {
poLsss = PublishOptions.builder()
.expectedLastSubjectSequenceSubject("not-even-a-subject")
.build();
pa = js.publish(sub3, dataBytes(508), poLsss);
assertPublishAck(pa, stream1, 14);
if (atLeast2_12()) {
PublishOptions fpoLsss = poLsss;
assertThrows(JetStreamApiException.class, () -> js.publish(sub3, dataBytes(508), fpoLsss));
}
else {
pa = js.publish(sub3, dataBytes(508), poLsss);
assertPublishAck(pa, stream1, 14);
}

poLsss = PublishOptions.builder()
.expectedLastSequence(14)
.expectedLastSubjectSequenceSubject("not-even-a-subject")
.build();
pa = js.publish(sub3, dataBytes(509), poLsss);
assertPublishAck(pa, stream1, 15);
if (atLeast2_12()) {
PublishOptions fpoLsss = poLsss;
assertThrows(JetStreamApiException.class, () -> js.publish(sub3, dataBytes(509), fpoLsss));
}
else {
pa = js.publish(sub3, dataBytes(509), poLsss);
assertPublishAck(pa, stream1, 15);
}

poLsss = PublishOptions.builder()
.expectedLastSubjectSequence(15)
Expand Down
18 changes: 8 additions & 10 deletions src/test/java/io/nats/client/impl/SimplificationTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -137,10 +137,8 @@ private void validateConsumerName(BaseConsumerContext bcc, MessageConsumer consu

private String validateConsumerNameForOrdered(BaseConsumerContext bcc, MessageConsumer consumer, String prefix) throws IOException, JetStreamApiException {
String bccConsumerName = bcc.getConsumerName();
if (prefix == null) {
assertNotNull(bccConsumerName);
}
else {
assertNotNull(bccConsumerName);
if (prefix != null) {
assertTrue(bccConsumerName.startsWith(prefix));
}

Expand Down Expand Up @@ -240,8 +238,6 @@ private void _testFetch(String label, Connection nc, TestingStreamContainer tsc,
cc = builder.name(consumerName).inactiveThreshold(10_000).build();
}
jsm.addOrUpdateConsumer(tsc.stream, cc);

// Consumer[Context]
consumerContext = ctx.getConsumerContext(consumerName);
assertEquals(consumerName, consumerContext.getConsumerName());
}
Expand Down Expand Up @@ -1037,9 +1033,10 @@ private void _testOrderedFetch(StreamContext sctx, int expectedStreamSeq, Ordere
OrderedConsumerContext occtx = sctx.createOrderedConsumer(occ);
assertNull(occtx.getConsumerName());
FetchConsumeOptions fco = FetchConsumeOptions.builder().maxMessages(6).expiresIn(1000).build();
String prefix = occ.getConsumerNamePrefix();
String firstConsumerName;
try (FetchConsumer fcon = occtx.fetch(fco)) {
firstConsumerName = validateConsumerNameForOrdered(occtx, null, occ.getConsumerNamePrefix());
firstConsumerName = validateConsumerNameForOrdered(occtx, null, prefix);
Message m = fcon.nextMessage();
while (m != null) {
assertEquals(expectedStreamSeq++, m.metaData().streamSequence());
Expand All @@ -1055,8 +1052,10 @@ private void _testOrderedFetch(StreamContext sctx, int expectedStreamSeq, Ordere
}
// this should finish without error
try (FetchConsumer fcon = occtx.fetch(fco)) {
validateConsumerNameForOrdered(occtx, null, occ.getConsumerNamePrefix());
assertNotEquals(firstConsumerName, occtx.getConsumerName());
validateConsumerNameForOrdered(occtx, null, prefix);
if (prefix != null) {
assertNotEquals(firstConsumerName, occtx.getConsumerName());
}
Message m = fcon.nextMessage();
while (expectedStreamSeq <= 6) {
assertEquals(expectedStreamSeq++, m.metaData().streamSequence());
Expand Down Expand Up @@ -1751,7 +1750,6 @@ public void testReconnectOverOrdered() throws Exception {
standardConnectionWait(nc);
sleep(6000); // long enough to get messages and for the hb alarm to have tripped
}
validateConsumerNameForOrdered(orderedConsumerContext, mcon, null);
assertNotEquals(firstConsumerName, orderedConsumerContext.getConsumerName());

assertTrue(allInOrder.get());
Expand Down
14 changes: 11 additions & 3 deletions src/test/java/io/nats/client/utils/TestBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,8 @@ public static boolean before2_11(ServerInfo si) {
return si.isOlderThanVersion("2.11");
}

public static boolean atLeast2_11_2(ServerInfo si) {
return si.isSameOrNewerThanVersion("2.11.2");
public static boolean atLeast2_12() {
return atLeast2_12(RUN_SERVER_INFO);
}

public static boolean atLeast2_12(ServerInfo si) {
Expand Down Expand Up @@ -534,8 +534,16 @@ public static String variant() {
return NUID.nextGlobalSequence();
}

private static int pi0 = -1;
private static int pi1 = 0;
public static String prefix() {
return PREFIX + "-"+ variant();
if (++pi0 == 26) {
pi0 = 0;
if (++pi1 == 26) {
pi1 = 0;
}
}
return PREFIX + (char)('A' + pi1) + (char)('A' + pi0);
}

public static String stream() {
Expand Down
Loading