Skip to content

Commit

Permalink
handleCommunicationIssue uses forceClose
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf committed Jun 5, 2024
1 parent aa60105 commit 17c4779
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 21 deletions.
35 changes: 22 additions & 13 deletions src/main/java/io/nats/client/impl/NatsConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -628,7 +628,9 @@ public void run() {
} catch (Exception exp) {
processException(exp);
try {
this.closeSocket(false);
// allow force reconnect since this is pretty exceptional,
// a connection failure while trying to connect
this.closeSocket(false, true);
} catch (InterruptedException e) {
processException(e);
}
Expand Down Expand Up @@ -691,7 +693,9 @@ void handleCommunicationIssue(Exception io) {
// waiting on read/write threads
executor.submit(() -> {
try {
this.closeSocket(true);
// any issue that brings us here is pretty serious
// so we are comfortable forcing the close
this.closeSocket(true, true);
} catch (InterruptedException e) {
processException(e);
Thread.currentThread().interrupt();
Expand All @@ -701,7 +705,7 @@ void handleCommunicationIssue(Exception io) {

// Close socket is called when another connect attempt is possible
// Close is called when the connection should shut down, period
void closeSocket(boolean tryReconnectIfConnected) throws InterruptedException {
void closeSocket(boolean tryReconnectIfConnected, boolean forceClose) throws InterruptedException {
// Ensure we close the socket exclusively within one thread.
closeSocketLock.lock();
try {
Expand All @@ -720,7 +724,7 @@ void closeSocket(boolean tryReconnectIfConnected) throws InterruptedException {
statusLock.unlock();
}

closeSocketImpl();
closeSocketImpl(forceClose);

statusLock.lock();
try {
Expand Down Expand Up @@ -749,10 +753,10 @@ void closeSocket(boolean tryReconnectIfConnected) throws InterruptedException {
*/
@Override
public void close() throws InterruptedException {
this.close(true);
this.close(true, false);
}

void close(boolean checkDrainStatus) throws InterruptedException {
void close(boolean checkDrainStatus, boolean forceClose) throws InterruptedException {
statusLock.lock();
try {
if (checkDrainStatus && this.isDraining()) {
Expand All @@ -779,7 +783,7 @@ void close(boolean checkDrainStatus) throws InterruptedException {
this.reconnectWaiter.cancel(true);
}

closeSocketImpl();
closeSocketImpl(forceClose);

this.dispatchers.forEach((nuid, d) -> d.stop(false));

Expand Down Expand Up @@ -831,7 +835,7 @@ void close(boolean checkDrainStatus) throws InterruptedException {
}

// Should only be called from closeSocket or close
void closeSocketImpl() {
void closeSocketImpl(boolean forceClose) {
this.currentServer = null;

// Signal both to stop.
Expand All @@ -854,8 +858,13 @@ void closeSocketImpl() {

// Close the current socket and cancel anyone waiting for it
try {
if (this.dataPort != null) {
this.dataPort.close();
if (dataPort != null) {
if (forceClose) {
dataPort.forceClose();
}
else {
dataPort.close();
}
}

} catch (IOException ex) {
Expand Down Expand Up @@ -2121,7 +2130,7 @@ public CompletableFuture<Boolean> drain(Duration timeout) throws TimeoutExceptio
try {
this.flush(timeout); // Flush and wait up to the timeout, if this fails, let the caller know
} catch (Exception e) {
this.close(false);
this.close(false, false);
throw e;
}

Expand Down Expand Up @@ -2163,13 +2172,13 @@ public CompletableFuture<Boolean> drain(Duration timeout) throws TimeoutExceptio
}
}

this.close(false); // close the connection after the last flush
this.close(false, false); // close the connection after the last flush
tracker.complete(consumers.isEmpty());
} catch (TimeoutException | InterruptedException e) {
this.processException(e);
} finally {
try {
this.close(false);// close the connection after the last flush
this.close(false, false);// close the connection after the last flush
} catch (InterruptedException e) {
processException(e);
Thread.currentThread().interrupt();
Expand Down
12 changes: 4 additions & 8 deletions src/main/java/io/nats/client/impl/SocketDataPort.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketException;
import java.net.URISyntaxException;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -166,15 +167,10 @@ public void close() throws IOException {
@Override
public void forceClose() throws IOException {
try {
// If we are here, and are being asked to force close,
// there is no need to linger. The dev might have set
// their own linger, in which case use theirs,
// otherwise set it to 0 for the quickest close
if (soLinger < 0) {
socket.setSoLinger(true, 0);
}
// If we are being asked to force close, there is no need to linger.
socket.setSoLinger(true, 0);
}
catch (IOException e) {
catch (SocketException e) {
// don't want to fail if I couldn't set linger
}
close();
Expand Down

0 comments on commit 17c4779

Please sign in to comment.