Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TimeoutException instead of CancellationException on timeout #1031

Merged
merged 4 commits into from
Nov 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions src/main/java/io/nats/client/Options.java
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,11 @@ public class Options {
*/
@Deprecated
public static final String PROP_UTF8_SUBJECTS = "allow.utf8.subjects";
/**
* Property used to throw {@link java.util.concurrent.TimeoutException} on timeout instead of {@link java.util.concurrent.CancellationException}.
* {@link Builder#useTimeoutException()}.
*/
public static final String PROP_USE_TIMEOUT_EXCEPTION = PFX + "use.timeout.exception";

// ----------------------------------------------------------------------------------------------------
// PROTOCOL CONNECT OPTION CONSTANTS
Expand Down Expand Up @@ -571,6 +576,7 @@ public class Options {
private final boolean discardMessagesWhenOutgoingQueueFull;
private final boolean ignoreDiscoveredServers;
private final boolean tlsFirst;
private final boolean useTimeoutException;

private final AuthHandler authHandler;
private final ReconnectDelayHandler reconnectDelayHandler;
Expand Down Expand Up @@ -679,6 +685,7 @@ public static class Builder {
private boolean discardMessagesWhenOutgoingQueueFull = DEFAULT_DISCARD_MESSAGES_WHEN_OUTGOING_QUEUE_FULL;
private boolean ignoreDiscoveredServers = false;
private boolean tlsFirst = false;
private boolean useTimeoutException = false;
private ServerPool serverPool = null;
private DispatcherFactory dispatcherFactory = null;

Expand Down Expand Up @@ -804,6 +811,7 @@ public Builder properties(Properties props) {

booleanProperty(props, PROP_IGNORE_DISCOVERED_SERVERS, b -> this.ignoreDiscoveredServers = b);
booleanProperty(props, PROP_TLS_FIRST, b -> this.tlsFirst = b);
booleanProperty(props, PROP_USE_TIMEOUT_EXCEPTION, b -> this.useTimeoutException = b);

classnameProperty(props, PROP_SERVERS_POOL_IMPLEMENTATION_CLASS, o -> this.serverPool = (ServerPool) o);
classnameProperty(props, PROP_DISPATCHER_FACTORY_CLASS, o -> this.dispatcherFactory = (DispatcherFactory) o);
Expand Down Expand Up @@ -1489,6 +1497,15 @@ public Builder tlsFirst() {
return this;
}

/**
* Throw {@link java.util.concurrent.TimeoutException} on timeout instead of {@link java.util.concurrent.CancellationException}?
* @return the Builder for chaining
*/
public Builder useTimeoutException() {
this.useTimeoutException = true;
return this;
}

/**
* Set the ServerPool implementation for connections to use instead of the default implementation
* @param serverPool the implementation
Expand Down Expand Up @@ -1667,6 +1684,7 @@ public Builder(Options o) {

this.ignoreDiscoveredServers = o.ignoreDiscoveredServers;
this.tlsFirst = o.tlsFirst;
this.useTimeoutException = o.useTimeoutException;

this.serverPool = o.serverPool;
this.dispatcherFactory = o.dispatcherFactory;
Expand Down Expand Up @@ -1729,6 +1747,7 @@ private Options(Builder b) {

this.ignoreDiscoveredServers = b.ignoreDiscoveredServers;
this.tlsFirst = b.tlsFirst;
this.useTimeoutException = b.useTimeoutException;

this.serverPool = b.serverPool;
this.dispatcherFactory = b.dispatcherFactory;
Expand Down Expand Up @@ -2117,6 +2136,14 @@ public boolean isTlsFirst() {
return tlsFirst;
}

/**
* Get whether to throw {@link java.util.concurrent.TimeoutException} on timeout instead of {@link java.util.concurrent.CancellationException}.
* @return the flag
*/
public boolean useTimeoutException() {
return useTimeoutException;
}

/**
* Get the ServerPool implementation. If null, a default implementation is used.
* @return the ServerPool implementation
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/nats/client/impl/NatsConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -1169,7 +1169,7 @@ CompletableFuture<Message> requestFutureInternal(String subject, Headers headers
String responseToken = getResponseToken(responseInbox);
NatsRequestCompletableFuture future =
new NatsRequestCompletableFuture(cancelAction,
futureTimeout == null ? options.getRequestCleanupInterval() : futureTimeout);
futureTimeout == null ? options.getRequestCleanupInterval() : futureTimeout, options.useTimeoutException());

if (!oldStyle) {
responsesAwaiting.put(responseToken, future);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.time.Duration;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

/**
* This is an internal class and is only public for access.
Expand All @@ -18,11 +19,13 @@ public enum CancelAction { CANCEL, REPORT, COMPLETE }
private final long timeOutAfter;
private boolean wasCancelledClosing;
private boolean wasCancelledTimedOut;
private final boolean useTimeoutException;

public NatsRequestCompletableFuture(CancelAction cancelAction, Duration timeout) {
public NatsRequestCompletableFuture(CancelAction cancelAction, Duration timeout, boolean useTimeoutException) {
this.cancelAction = cancelAction;
timeOutAfter = System.currentTimeMillis() + 10 + (timeout == null ? DEFAULT_TIMEOUT : timeout.toMillis());
// 10 extra millis allows for communication time, probably more than needed but...
this.useTimeoutException = useTimeoutException;
}

public void cancelClosing() {
Expand All @@ -32,7 +35,8 @@ public void cancelClosing() {

public void cancelTimedOut() {
wasCancelledTimedOut = true;
completeExceptionally(new CancellationException("Future cancelled, response not registered in time, likely due to server disconnect."));
final String message = "Future cancelled, response not registered in time, likely due to server disconnect.";
completeExceptionally(useTimeoutException ? new TimeoutException(message) : new CancellationException(message));
}

public CancelAction getCancelAction() {
Expand Down
8 changes: 4 additions & 4 deletions src/test/java/io/nats/client/impl/RequestTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -680,7 +680,7 @@ public void testThrowsEmptySubject() {

@Test
public void testNatsRequestCompletableFuture() throws InterruptedException {
NatsRequestCompletableFuture f = new NatsRequestCompletableFuture(CancelAction.CANCEL, Duration.ofHours(-1));
NatsRequestCompletableFuture f = new NatsRequestCompletableFuture(CancelAction.CANCEL, Duration.ofHours(-1), true);
assertEquals(CancelAction.CANCEL, f.getCancelAction());
assertTrue(f.hasExceededTimeout());
assertFalse(f.wasCancelledClosing());
Expand All @@ -690,14 +690,14 @@ public void testNatsRequestCompletableFuture() throws InterruptedException {
assertTrue(f.wasCancelledClosing());
assertTrue(f.wasCancelledTimedOut());

f = new NatsRequestCompletableFuture(CancelAction.COMPLETE, Duration.ofHours(-1));
f = new NatsRequestCompletableFuture(CancelAction.COMPLETE, Duration.ofHours(-1), true);
assertEquals(CancelAction.COMPLETE, f.getCancelAction());

f = new NatsRequestCompletableFuture(CancelAction.REPORT, Duration.ofHours(-1));
f = new NatsRequestCompletableFuture(CancelAction.REPORT, Duration.ofHours(-1), true);
assertEquals(CancelAction.REPORT, f.getCancelAction());

// coverage for null timeout
f = new NatsRequestCompletableFuture(CancelAction.CANCEL, null);
f = new NatsRequestCompletableFuture(CancelAction.CANCEL, null, true);
Thread.sleep(Options.DEFAULT_REQUEST_CLEANUP_INTERVAL.toMillis() + 100);
assertTrue(f.hasExceededTimeout());
}
Expand Down