Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ plugins {
id("com.jfrog.bintray") version "1.8.4"
}

version = "2.4.3"
version = "2.4.4"
group = "com.rethinkdb"

java.sourceCompatibility = JavaVersion.VERSION_1_8
Expand Down
7 changes: 5 additions & 2 deletions src/main/java/com/rethinkdb/net/DefaultConnectionFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;

/**
* The default {@link ConnectionSocket.Factory} and {@link ResponsePump.Factory} for any default connections.
Expand Down Expand Up @@ -177,6 +178,7 @@ public String toString() {
}

private static class ThreadResponsePump implements ResponsePump {
private final AtomicReference<Throwable> shutdownReason = new AtomicReference<>();
private final Thread thread;
private Map<Long, CompletableFuture<Response>> awaiting = new ConcurrentHashMap<>();

Expand Down Expand Up @@ -220,7 +222,7 @@ public ThreadResponsePump(ConnectionSocket socket, boolean daemon) {
@Override
public @NotNull CompletableFuture<Response> await(long token) {
if (awaiting == null) {
throw new ReqlDriverError("Response pump closed.");
throw new ReqlDriverError("Response pump closed.", shutdownReason.get());
}
CompletableFuture<Response> future = new CompletableFuture<>();
awaiting.put(token, future);
Expand All @@ -234,6 +236,7 @@ public boolean isAlive() {

private void shutdown(Throwable t) {
Map<Long, CompletableFuture<Response>> awaiting = this.awaiting;
this.shutdownReason.compareAndSet(null, t);
this.awaiting = null;
thread.interrupt();
if (awaiting != null) {
Expand All @@ -243,7 +246,7 @@ private void shutdown(Throwable t) {

@Override
public void shutdownPump() {
shutdown(new ReqlDriverError("Response pump closed."));
shutdown(new Throwable("Shutdown was requested."));
}

@Override
Expand Down
Loading