Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions src/examples/java/io/nats/examples/jetstream/NatsJsUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public static StreamInfo createOrReplaceStream(JetStreamManagement jsm, String s

public static StreamInfo createOrReplaceStream(JetStreamManagement jsm, String stream, StorageType storageType, String... subjects) {
// in case the stream was here before, we want a completely new one
try { jsm.deleteStream(stream); } catch (Exception ignore) {}
safeDeleteStream(jsm, stream);

try {
return jsm.addStream(StreamConfiguration.builder()
Expand Down Expand Up @@ -467,10 +467,7 @@ public static void createCleanMemStream(Connection nc, String stream, String...
}

public static void createCleanMemStream(JetStreamManagement jsm, String stream, String... subs) throws IOException, JetStreamApiException {
try {
jsm.deleteStream(stream);
}
catch (Exception ignore) {}
safeDeleteStream(jsm, stream);

StreamConfiguration sc = StreamConfiguration.builder()
.name(stream)
Expand All @@ -479,4 +476,11 @@ public static void createCleanMemStream(JetStreamManagement jsm, String stream,
.build();
jsm.addStream(sc);
}

public static void safeDeleteStream(JetStreamManagement jsm, String stream) {
try {
jsm.deleteStream(stream);
}
catch (Exception ignore) {}
}
}
29 changes: 29 additions & 0 deletions src/main/java/io/nats/client/BaseConsumeOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public class BaseConsumeOptions implements JsonSerializable {
protected final long idleHeartbeat;
protected final int thresholdPercent;
protected final boolean noWait;
protected final boolean raiseStatusWarnings;

@SuppressWarnings("rawtypes") // Don't need the type of the builder to get its vars
protected BaseConsumeOptions(Builder b) {
Expand All @@ -58,6 +59,7 @@ protected BaseConsumeOptions(Builder b) {
// validation handled in builder
thresholdPercent = b.thresholdPercent;
noWait = b.noWait;
raiseStatusWarnings = b.raiseStatusWarnings;

// if it's not noWait, it must have an expiresIn
// we can't check this in the builder because we can't guarantee order
Expand All @@ -81,6 +83,7 @@ public String toJson() {
addField(sb, EXPIRES_IN, expiresIn);
addField(sb, IDLE_HEARTBEAT, idleHeartbeat);
addField(sb, THRESHOLD_PERCENT, thresholdPercent);
addFldWhenTrue(sb, RAISE_STATUS_WARNINGS, raiseStatusWarnings);
addFldWhenTrue(sb, NO_WAIT, noWait);
return endJson(sb).toString();
}
Expand All @@ -101,12 +104,17 @@ public boolean isNoWait() {
return noWait;
}

public boolean raiseStatusWarnings() {
return raiseStatusWarnings;
}

protected static abstract class Builder<B, CO> {
protected int messages = -1;
protected long bytes = 0;
protected int thresholdPercent = DEFAULT_THRESHOLD_PERCENT;
protected long expiresIn = DEFAULT_EXPIRES_IN_MILLIS;
protected boolean noWait = false;
protected boolean raiseStatusWarnings = false;

protected abstract B getThis();

Expand Down Expand Up @@ -134,6 +142,7 @@ public B jsonValue(JsonValue jsonValue) {
bytes(readLong(jsonValue, BYTES, -1));
expiresIn(readLong(jsonValue, EXPIRES_IN, MIN_EXPIRES_MILLS));
thresholdPercent(readInteger(jsonValue, THRESHOLD_PERCENT, -1));
raiseStatusWarnings(readBoolean(jsonValue, RAISE_STATUS_WARNINGS, false));
if (readBoolean(jsonValue, NO_WAIT, false)) {
noWait();
}
Expand Down Expand Up @@ -190,6 +199,26 @@ public B thresholdPercent(int thresholdPercent) {
return getThis();
}

/**
* Raise status warning turns on sending status messages to the error listener.
* The default of to not raise status warning
* @return the builder
*/
public B raiseStatusWarnings() {
this.raiseStatusWarnings = true;
return getThis();
}

/**
* Turn on or off raise status warning turns. When on, status messages are sent to the error listener.
* The default of to not raise status warning
* @return the builder
*/
public B raiseStatusWarnings(boolean raiseStatusWarnings) {
this.raiseStatusWarnings = raiseStatusWarnings;
return getThis();
}

/**
* Build the options.
* @return the built options
Expand Down
5 changes: 2 additions & 3 deletions src/main/java/io/nats/client/impl/NatsFetchConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import static io.nats.client.BaseConsumeOptions.MIN_EXPIRES_MILLS;

class NatsFetchConsumer extends NatsMessageConsumerBase implements FetchConsumer, PullManagerObserver {
private final boolean isNoWait;
private final boolean isNoWaitNoExpires;
private final long maxWaitNanos;
private final String pullSubject;
Expand All @@ -34,7 +33,7 @@ class NatsFetchConsumer extends NatsMessageConsumerBase implements FetchConsumer
{
super(cachedConsumerInfo);

isNoWait = fetchConsumeOptions.isNoWait();
boolean isNoWait = fetchConsumeOptions.isNoWait();
long expiresInMillis = fetchConsumeOptions.getExpiresInMillis();
isNoWaitNoExpires = isNoWait && expiresInMillis == ConsumerConfiguration.LONG_UNSET;

Expand All @@ -55,7 +54,7 @@ class NatsFetchConsumer extends NatsMessageConsumerBase implements FetchConsumer
.noWait(isNoWait)
.build();
initSub(subscriptionMaker.subscribe(null, null, null, inactiveThreshold));
pullSubject = sub._pull(pro, false, this);
pullSubject = sub._pull(pro, fetchConsumeOptions.raiseStatusWarnings(), this);
startNanos = -1;
}

Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/nats/client/impl/NatsMessageConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,6 @@ private void repull() {
.expiresIn(opts.getExpiresInMillis())
.idleHeartbeat(opts.getIdleHeartbeat())
.build();
sub._pull(pro, false, this);
sub._pull(pro, opts.raiseStatusWarnings(), this);
}
}
1 change: 1 addition & 0 deletions src/main/java/io/nats/client/impl/PullMessageManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ protected ManageResult manageStatus(Message msg) {
switch (status.getCode()) {
case NOT_FOUND_CODE:
case REQUEST_TIMEOUT_CODE:
case NO_RESPONDERS_CODE:
if (raiseStatusWarnings) {
conn.executeCallback((c, el) -> el.pullStatusWarning(c, sub, status));
}
Expand Down
1 change: 1 addition & 0 deletions src/main/java/io/nats/client/support/ApiConstants.java
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ public interface ApiConstants {
String PURGED = "purged";
String PUSH_BOUND = "push_bound";
String QUEUE_GROUP = "queue_group";
String RAISE_STATUS_WARNINGS = "raise_status_warnings";
String RATE_LIMIT_BPS = "rate_limit_bps";
String REPLAY_POLICY = "replay_policy";
String REPLICA = "replica";
Expand Down
28 changes: 16 additions & 12 deletions src/test/java/io/nats/client/api/StreamConfigurationTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@

public class StreamConfigurationTests extends JetStreamTestBase {

public static final String DEFAULT_STREAM_NAME = "sname";

private StreamConfiguration getTestConfiguration() {
String json = ResourceUtils.dataAsString("StreamConfiguration.json");
StreamConfiguration sc = StreamConfiguration.instance(JsonParser.parseUnchecked(json));
Expand All @@ -46,7 +48,9 @@ private StreamConfiguration getTestConfiguration() {
public void testRoundTrip() throws Exception {
runInJsServer(si -> si.isNewerVersionThan("2.8.4"), nc -> {
CompressionOption compressionOption = atLeast2_10(ensureRunServerInfo()) ? S2 : None;
String stream = stream();
StreamConfiguration sc = StreamConfiguration.builder(getTestConfiguration())
.name(stream)
.mirror(null)
.sources()
.replicas(1)
Expand All @@ -58,17 +62,17 @@ public void testRoundTrip() throws Exception {
.compressionOption(compressionOption)
.build();
JetStreamManagement jsm = nc.jetStreamManagement();
validate(jsm.addStream(sc).getConfiguration(), true);
validate(jsm.addStream(sc).getConfiguration(), true, stream);
});
}

@Test
public void testSerializationDeserialization() throws Exception {
String originalJson = ResourceUtils.dataAsString("StreamConfiguration.json");
StreamConfiguration sc = StreamConfiguration.instance(originalJson);
validate(sc, false);
validate(sc, false, DEFAULT_STREAM_NAME);
String serializedJson = sc.toJson();
validate(StreamConfiguration.instance(serializedJson), false);
validate(StreamConfiguration.instance(serializedJson), false, DEFAULT_STREAM_NAME);
}

@Test
Expand Down Expand Up @@ -142,13 +146,13 @@ public void testInvalidNameInJson() throws Exception{
public void testConstruction() {
StreamConfiguration testSc = getTestConfiguration();
// from json
validate(testSc, false);
validate(testSc, false, DEFAULT_STREAM_NAME);

// test toJson
validate(StreamConfiguration.instance(JsonParser.parseUnchecked(testSc.toJson())), false);
validate(StreamConfiguration.instance(JsonParser.parseUnchecked(testSc.toJson())), false, DEFAULT_STREAM_NAME);

// copy constructor
validate(StreamConfiguration.builder(testSc).build(), false);
validate(StreamConfiguration.builder(testSc).build(), false, DEFAULT_STREAM_NAME);

// builder
StreamConfiguration.Builder builder = StreamConfiguration.builder()
Expand Down Expand Up @@ -184,15 +188,15 @@ public void testConstruction() {
.metadata(testSc.getMetadata())
.firstSequence(testSc.getFirstSequence())
.consumerLimits(testSc.getConsumerLimits());
validate(builder.build(), false);
validate(builder.addSources((Source)null).build(), false);
validate(builder.build(), false, DEFAULT_STREAM_NAME);
validate(builder.addSources((Source)null).build(), false, DEFAULT_STREAM_NAME);

List<Source> sources = new ArrayList<>(testSc.getSources());
sources.add(null);
Source copy = new Source(JsonParser.parseUnchecked(sources.get(0).toJson()));
assertEquals(sources.get(0).toString(), copy.toString());
sources.add(copy);
validate(builder.addSources(sources).build(), false);
validate(builder.addSources(sources).build(), false, DEFAULT_STREAM_NAME);

// covering add a single source
sources = new ArrayList<>(testSc.getSources());
Expand All @@ -202,7 +206,7 @@ public void testConstruction() {
builder.addSource(source);
}
builder.addSource(sources.get(0));
validate(builder.build(), false);
validate(builder.build(), false, DEFAULT_STREAM_NAME);

// equals and hashcode coverage
External external = copy.getExternal();
Expand Down Expand Up @@ -485,8 +489,8 @@ public void testDiscardPolicy() {
assertEquals(DiscardPolicy.Old, builder.build().getDiscardPolicy());
}

private void validate(StreamConfiguration sc, boolean serverTest) {
assertEquals("sname", sc.getName());
private void validate(StreamConfiguration sc, boolean serverTest, String name) {
assertEquals(name, sc.getName());
assertEquals("blah blah", sc.getDescription());
assertEquals(4, sc.getSubjects().size());
assertEquals("foo", sc.getSubjects().get(0));
Expand Down
89 changes: 89 additions & 0 deletions src/test/java/io/nats/client/impl/JetStreamManagementTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import static io.nats.client.support.DateTimeUtils.DEFAULT_TIME;
import static io.nats.client.support.DateTimeUtils.ZONE_ID_GMT;
Expand Down Expand Up @@ -1545,4 +1546,92 @@ public void testCreateConsumerUpdateConsumer() throws Exception {
assertEquals(fs1, ci.getConsumerConfiguration().getFilterSubject());
});
}

@Test
public void testNoRespondersWhenConsumerDeleted() throws Exception {
ListenerForTesting listener = new ListenerForTesting();
jsServer.run(new Options.Builder().errorListener(listener), TestBase::atLeast2_10_26, nc -> {
JetStreamManagement jsm = nc.jetStreamManagement();
JetStream js = nc.jetStream();

String stream = stream();
String subject = subject();

assertThrows(JetStreamApiException.class, () -> jsm.getMessage(stream, 1));

createMemoryStream(jsm, stream, subject);

for (int x = 0; x < 5; x++) {
js.publish(subject, null);
}

String consumer = create1026Consumer(jsm, stream, subject);
PullSubscribeOptions so = PullSubscribeOptions.fastBind(stream, consumer);
JetStreamSubscription sub = js.subscribe(null, so);
jsm.deleteConsumer(stream, consumer);
sub.pull(5);
validate1026(sub.nextMessage(500), listener, false);

ConsumerContext context = setupFor1026Simplification(nc, jsm, listener, stream, subject);
validate1026(context.next(1000), listener, true); // simplification next never raises warnings, so empty = true

context = setupFor1026Simplification(nc, jsm, listener, stream, subject);
//noinspection resource
FetchConsumer fc = context.fetch(FetchConsumeOptions.builder().maxMessages(1).raiseStatusWarnings(false).build());
validate1026(fc.nextMessage(), listener, true); // we said not to raise status warnings in the FetchConsumeOptions

context = setupFor1026Simplification(nc, jsm, listener, stream, subject);
//noinspection resource
fc = context.fetch(FetchConsumeOptions.builder().maxMessages(1).raiseStatusWarnings().build());
validate1026(fc.nextMessage(), listener, false); // we said raise status warnings in the FetchConsumeOptions

context = setupFor1026Simplification(nc, jsm, listener, stream, subject);
IterableConsumer ic = context.iterate(ConsumeOptions.builder().raiseStatusWarnings(false).build());
validate1026(ic.nextMessage(1000), listener, true); // we said not to raise status warnings in the ConsumeOptions

context = setupFor1026Simplification(nc, jsm, listener, stream, subject);
ic = context.iterate(ConsumeOptions.builder().raiseStatusWarnings().build());
validate1026(ic.nextMessage(1000), listener, false); // we said raise status warnings in the ConsumeOptions

AtomicInteger count = new AtomicInteger();
MessageHandler handler = m -> count.incrementAndGet();

context = setupFor1026Simplification(nc, jsm, listener, stream, subject);
//noinspection resource
context.consume(ConsumeOptions.builder().raiseStatusWarnings(false).build(), handler);
Thread.sleep(100); // give time to get a message
assertEquals(0, count.get());
validate1026(null, listener, true);

context = setupFor1026Simplification(nc, jsm, listener, stream, subject);
//noinspection resource
context.consume(ConsumeOptions.builder().raiseStatusWarnings().build(), handler);
Thread.sleep(100); // give time to get a message
assertEquals(0, count.get());
validate1026(null, listener, false);
});
}

private static void validate1026(Message m, ListenerForTesting listener, boolean empty) {
assertNull(m);
sleep(100); // give time for the message to get there
assertEquals(empty, listener.getPullStatusWarnings().isEmpty());
}

private static ConsumerContext setupFor1026Simplification(Connection nc, JetStreamManagement jsm, ListenerForTesting listener, String stream, String subject) throws IOException, JetStreamApiException {
listener.reset();
String consumer = create1026Consumer(jsm, stream, subject);
ConsumerContext cCtx = nc.getConsumerContext(stream, consumer);
jsm.deleteConsumer(stream, consumer);
return cCtx;
}

private static String create1026Consumer(JetStreamManagement jsm, String stream, String subject) throws IOException, JetStreamApiException {
String consumer = name();
jsm.addOrUpdateConsumer(stream, ConsumerConfiguration.builder()
.durable(consumer)
.filterSubject(subject)
.build());
return consumer;
}
}
4 changes: 4 additions & 0 deletions src/test/java/io/nats/client/utils/TestBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,10 @@ public static boolean atLeast2_9_0(ServerInfo si) {
return si.isSameOrNewerThanVersion("2.9.0");
}

public static boolean atLeast2_10_26(ServerInfo si) {
return si.isSameOrNewerThanVersion("2.10.26");
}

public static boolean atLeast2_9_1(ServerInfo si) {
return si.isSameOrNewerThanVersion("2.9.1");
}
Expand Down
1 change: 1 addition & 0 deletions src/test/java/io/nats/service/ServiceTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ public void testServiceWorkflow() throws Exception {
.build();

service1.addServiceEndpoints(seRev1);
sleep(100); // give the service some time to get running. remember it's got to subscribe on the server

for (int x = 0; x < requestCount; x++) {
verifyServiceExecution(clientNc, REVERSE_ENDPOINT_NAME, REVERSE_ENDPOINT_SUBJECT, null);
Expand Down
Loading