Skip to content

Commit

Permalink
better pull error message and async tests (#888)
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf authored Apr 13, 2023
1 parent f9a23f3 commit e18f13e
Show file tree
Hide file tree
Showing 10 changed files with 366 additions and 164 deletions.
30 changes: 13 additions & 17 deletions src/main/java/io/nats/client/JetStreamStatusException.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,48 +19,43 @@
* JetStreamStatusException is used to indicate an unknown status message was received.
*/
public class JetStreamStatusException extends IllegalStateException {
@Deprecated
public static final String DEFAULT_DESCRIPTION = "Unknown or unprocessed status message";

private final JetStreamSubscription sub;
private final String description;
private final Status status;

/**
* Construct an exception with a status message
* Construct JetStreamStatusException for a subscription and a status
* @param sub the subscription
* @param status the status
*/
public JetStreamStatusException(JetStreamSubscription sub, Status status) {
this(sub, DEFAULT_DESCRIPTION, status);
this(sub, status, status.getMessageWithCode());
}

/**
* Construct an exception with a status message
* @param sub the subscription
* @param description custom description
* @param status the status
* Construct JetStreamStatusException for a subscription and a status and a custom message
* @param sub the subscription
* @param status the status
* @param message the exception message
*/
public JetStreamStatusException(JetStreamSubscription sub, String description, Status status) {
super(description + ": " + status.getMessage());
public JetStreamStatusException(JetStreamSubscription sub, Status status, String message) {
super(message);
this.sub = sub;
this.description = description;
this.status = status;
}

/**
* Construct an exception with a status message
* Construct JetStreamStatusException for a status
* @param status the status
*/
public JetStreamStatusException(Status status) {
super(status.getMessageWithCode());
this.sub = null;
this.description = status.toString();
this.status = status;
this(null, status, status.getMessageWithCode());
}

/**
* Get the subscription this issue occurred on
*
* @return the subscription
*/
public JetStreamSubscription getSubscription() {
Expand All @@ -71,8 +66,9 @@ public JetStreamSubscription getSubscription() {
* Get the description
* @return the description
*/
@Deprecated
public String getDescription() {
return description;
return getMessage();
}

/**
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/io/nats/client/impl/PushMessageManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,8 @@ protected ManageResult manageStatus(Message msg) {

conn.executeCallback((c, el) -> el.unhandledStatus(c, sub, status));
if (syncMode) {
throw new JetStreamStatusException(sub, status);
throw new JetStreamStatusException(sub, status,
"Unknown or unprocessed status message: " + status.getMessageWithCode());
}
return ERROR;
}
Expand Down
25 changes: 15 additions & 10 deletions src/test/java/io/nats/client/RequestTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,28 +13,33 @@

package io.nats.client;

import io.nats.client.impl.TestHandler;
import org.junit.jupiter.api.Test;

import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;

import static io.nats.client.utils.TestBase.*;
import static io.nats.client.utils.TestBase.standardConnection;
import static io.nats.client.utils.TestBase.subject;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class RequestTests {

@Test
public void testRequestNoResponder() throws Exception {
try (NatsTestServer ts = new NatsTestServer(false);
Connection ncCancel = standardConnection(ts.getURI());
Connection ncReport = standardConnection(standardOptionsBuilder(ts.getURI()).reportNoResponders().build());
)
{
assertThrows(CancellationException.class, () -> ncCancel.request(subject(999), null).get());
ExecutionException ee = assertThrows(ExecutionException.class, () -> ncReport.request(subject(999), null).get());
assertTrue(ee.getCause() instanceof JetStreamStatusException);
assertTrue(ee.getMessage().contains("503 No Responders Available For Request"));
try (NatsTestServer ts = new NatsTestServer(false)) {
Options optCancel = Options.builder().server(ts.getURI()).errorListener(new TestHandler()).build();
Options optReport = Options.builder().server(ts.getURI()).reportNoResponders().errorListener(new TestHandler()).build();
try (Connection ncCancel = standardConnection(optCancel);
Connection ncReport = standardConnection(optReport);
)
{
assertThrows(CancellationException.class, () -> ncCancel.request(subject(999), null).get());
ExecutionException ee = assertThrows(ExecutionException.class, () -> ncReport.request(subject(999), null).get());
assertTrue(ee.getCause() instanceof JetStreamStatusException);
assertTrue(ee.getMessage().contains("503 No Responders Available For Request"));
}
}
}
}
49 changes: 24 additions & 25 deletions src/test/java/io/nats/client/impl/ConsumeTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,13 @@ public void testFetch() throws Exception {
// 1. Different fetch sizes demonstrate expiration behavior

// 1A. equal number of messages than the fetch size
_testFetch(nc, 20, 0);
_testFetch("1A", nc, 20, 0);

// 1B. more messages than the fetch size
_testFetch(nc, 10, 0);
_testFetch("1B", nc, 10, 0);

// 1C. fewer messages than the fetch size
_testFetch(nc, 40, 0);
_testFetch("1C", nc, 40, 0);

// don't test bytes before 2.9.1
if (nc.getServerInfo().isOlderThanVersion("2.9.1")) {
Expand All @@ -56,17 +56,20 @@ public void testFetch() throws Exception {
// - each test message is approximately 100 bytes

// 2A. max bytes is reached before message count
_testFetch(nc, 20, 750);
_testFetch("2A", nc, 20, 750);

// 2B. fetch size is reached before byte count
_testFetch(nc, 10, 1500);
_testFetch("2B", nc, 10, 1500);

// 2C. fewer bytes than the byte count
_testFetch(nc, 40, 3000);
_testFetch("2C", nc, 40, 3000);

// 3. simple-consumer-40msgs was created in 1C and has no messages available
_testFetch("3", nc, 40, 0);
});
}

private static void _testFetch(Connection nc, int maxMessages, int maxBytes) throws IOException, JetStreamApiException, InterruptedException {
private static void _testFetch(String label, Connection nc, int maxMessages, int maxBytes) throws IOException, JetStreamApiException, InterruptedException {
JetStreamManagement jsm = nc.jetStreamManagement();
JetStream js = nc.jetStream();

Expand All @@ -83,7 +86,7 @@ private static void _testFetch(Connection nc, int maxMessages, int maxBytes) thr
FetchConsumeOptions fetchConsumeOptions = FetchConsumeOptions.builder()
.maxMessages(maxMessages) // usually you would use only one or the other
.maxBytes(maxBytes, maxMessages) // /\ /\
.expiresIn(1500)
.expiresIn(2000)
.build();

long start = System.currentTimeMillis();
Expand All @@ -99,27 +102,23 @@ private static void _testFetch(Connection nc, int maxMessages, int maxBytes) thr
}
long elapsed = System.currentTimeMillis() - start;

if (maxBytes > 0) {
if (maxMessages > 20) {
switch (label) {
case "1A":
case "1B":
case "2B":
assertEquals(maxMessages, rcvd);
assertTrue(elapsed < 250);
break;
case "1C":
case "2C":
case "3":
assertTrue(rcvd < maxMessages);
assertTrue(elapsed >= 1500);
}
else if (maxMessages * 100 > maxBytes) {
break;
case "2A":
assertTrue(rcvd < maxMessages);
assertTrue(elapsed < 250);
}
else {
assertEquals(maxMessages, rcvd);
assertTrue(elapsed < 250);
}
}
else if (maxMessages > 20) {
assertTrue(rcvd < maxMessages);
assertTrue(elapsed >= 1500);
}
else {
assertEquals(maxMessages, rcvd);
assertTrue(elapsed < 250);
break;
}
}

Expand Down
Loading

0 comments on commit e18f13e

Please sign in to comment.