Skip to content

Commit

Permalink
Polishing.
Browse files Browse the repository at this point in the history
Reuse connection-closed exception factory method.

[#492]

Signed-off-by: Mark Paluch <mpaluch@vmware.com>
  • Loading branch information
mp911de committed Feb 16, 2022
1 parent aacf383 commit 5e9d030
Showing 1 changed file with 15 additions and 4 deletions.
19 changes: 15 additions & 4 deletions src/main/java/io/r2dbc/postgresql/client/ReactorNettyClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -210,14 +210,22 @@ public Flux<BackendMessage> exchange(Predicate<BackendMessage> takeUntil, Publis
Assert.requireNonNull(takeUntil, "takeUntil must not be null");
Assert.requireNonNull(requests, "requests must not be null");

return this.messageSubscriber.addConversation(takeUntil, requests, it -> this.requestSink.emitNext(it, Sinks.EmitFailureHandler.FAIL_FAST), this::isConnected);
if (!isConnected()) {
return Flux.error(this.messageSubscriber.createClientClosedException());
}

return this.messageSubscriber.addConversation(takeUntil, requests, this::doSendRequest, this::isConnected);
}

@Override
public void send(FrontendMessage message) {
Assert.requireNonNull(message, "requests must not be null");

this.requestSink.emitNext(Mono.just(message), Sinks.EmitFailureHandler.FAIL_FAST);
doSendRequest(Mono.just(message));
}

private void doSendRequest(Publisher<FrontendMessage> it) {
this.requestSink.emitNext(it, Sinks.EmitFailureHandler.FAIL_FAST);
}

private Mono<Void> resumeError(Throwable throwable) {
Expand Down Expand Up @@ -676,7 +684,7 @@ public Flux<BackendMessage> addConversation(Predicate<BackendMessage> takeUntil,
sink.onRequest(value -> onRequest(conversation, value));

if (!isConnected.get()) {
sink.error(new PostgresConnectionClosedException("Cannot exchange messages because the connection is closed"));
sink.error(createClientClosedException());
return;
}

Expand All @@ -689,12 +697,15 @@ public Flux<BackendMessage> addConversation(Predicate<BackendMessage> takeUntil,
sender.accept(requestMessages);
} else {
sink.error(new RequestQueueException("Cannot exchange messages because the request queue limit is exceeded"));

}
}
});
}

PostgresConnectionClosedException createClientClosedException() {
return new PostgresConnectionClosedException("Cannot exchange messages because the connection is closed");
}

/**
* {@link Subscription#request(long)} callback. Request more for a {@link Conversation}. Potentially, demands also more upstream elements.
*
Expand Down

0 comments on commit 5e9d030

Please sign in to comment.