Skip to content

Commit

Permalink
Remove doOnNext(…) hook to close the sink if the connection is closed…
Browse files Browse the repository at this point in the history
… while emitting requests.

Once the conversation is accepted, we no longer need to check on a new backend message whether the connection is closed as a channelInactive()/connection.close() signal terminates conversations anyway.

[#492]

Signed-off-by: Mark Paluch <mpaluch@vmware.com>
  • Loading branch information
mp911de committed Feb 16, 2022
1 parent 6c98f43 commit dcf2e23
Showing 1 changed file with 17 additions and 9 deletions.
26 changes: 17 additions & 9 deletions src/main/java/io/r2dbc/postgresql/client/ReactorNettyClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import reactor.core.publisher.Operators;
import reactor.core.scheduler.Schedulers;
import reactor.netty.Connection;
import reactor.netty.channel.AbortedException;
import reactor.netty.resources.ConnectionProvider;
import reactor.netty.resources.LoopResources;
import reactor.netty.tcp.TcpClient;
Expand Down Expand Up @@ -513,6 +514,11 @@ private void handleClose() {
}

private void handleConnectionError(Throwable error) {

if (AbortedException.isConnectionReset(error) && !isConnected()) {
drainError(() -> this.messageSubscriber.createClientClosedException(error));
}

drainError(() -> new PostgresConnectionException(error));
}

Expand Down Expand Up @@ -554,6 +560,10 @@ public PostgresConnectionClosedException(String reason) {
super(reason);
}

public PostgresConnectionClosedException(String reason, @Nullable Throwable cause) {
super(reason, cause);
}

}

static class PostgresConnectionException extends R2dbcNonTransientResourceException {
Expand Down Expand Up @@ -689,7 +699,7 @@ private class BackendMessageSubscriber implements CoreSubscriber<BackendMessage>

private Subscription upstream;

public Flux<BackendMessage> addConversation(Predicate<BackendMessage> takeUntil, Publisher<FrontendMessage> requests, Consumer<Flux<FrontendMessage>> sender,
public Flux<BackendMessage> addConversation(Predicate<BackendMessage> takeUntil, Publisher<FrontendMessage> requests, Consumer<Publisher<FrontendMessage>> sender,
Supplier<Boolean> isConnected) {

return Flux.create(sink -> {
Expand All @@ -707,13 +717,7 @@ public Flux<BackendMessage> addConversation(Predicate<BackendMessage> takeUntil,
return;
}

Flux<FrontendMessage> requestMessages = Flux.from(requests).doOnNext(m -> {
if (!isConnected.get()) {
sink.error(new PostgresConnectionClosedException("Cannot exchange messages because the connection is closed"));
}
});

sender.accept(requestMessages);
sender.accept(requests);
} else {
sink.error(new RequestQueueException("Cannot exchange messages because the request queue limit is exceeded"));
}
Expand All @@ -722,7 +726,11 @@ public Flux<BackendMessage> addConversation(Predicate<BackendMessage> takeUntil,
}

PostgresConnectionClosedException createClientClosedException() {
return new PostgresConnectionClosedException("Cannot exchange messages because the connection is closed");
return createClientClosedException(null);
}

PostgresConnectionClosedException createClientClosedException(@Nullable Throwable cause) {
return new PostgresConnectionClosedException("Cannot exchange messages because the connection is closed", cause);
}

/**
Expand Down

0 comments on commit dcf2e23

Please sign in to comment.