Skip to content

Commit

Permalink
Merge pull request #142 from streamkap-com/133-deadlock-when-connecti…
Browse files Browse the repository at this point in the history
…on-try-to-disconnect

DBZ-7570/#133: add workaround using SO_LINGER with 0 timeout
  • Loading branch information
Naros committed Apr 22, 2024
2 parents 0f38e43 + 217aac3 commit 3a1527f
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 2 deletions.
18 changes: 16 additions & 2 deletions src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ public X509Certificate[] getAcceptedIssuers() {
private volatile long binlogPosition = 4;
private volatile long connectionId;
private SSLMode sslMode = SSLMode.DISABLED;
private boolean useNonGracefulDisconnect = false;

protected GtidSet gtidSet;
protected final Object gtidSetAccessLock = new Object();
Expand Down Expand Up @@ -249,6 +250,10 @@ public void setSSLMode(SSLMode sslMode) {
this.sslMode = sslMode;
}

public void setUseNonGracefulDisconnect(boolean useNonGracefulDisconnect) {
this.useNonGracefulDisconnect = useNonGracefulDisconnect;
}

public long getMasterServerId() {
return this.masterServerId;
}
Expand Down Expand Up @@ -891,7 +896,7 @@ public void run() {
if (connectionLost) {
logger.info("Keepalive: Trying to restore lost connection to " + hostname + ":" + port);
try {
terminateConnect();
terminateConnect(useNonGracefulDisconnect);
connect(connectTimeout);
} catch (Exception ce) {
logger.warning("keepalive: Failed to restore connection to " + hostname + ":" + port +
Expand Down Expand Up @@ -1341,8 +1346,11 @@ private static boolean awaitTerminationInterruptibly(ExecutorService executorSer
}

private void terminateConnect() throws IOException {
terminateConnect(false);
}
private void terminateConnect(boolean force) throws IOException {
do {
disconnectChannel();
disconnectChannel(force);
} while (!tryLockInterruptibly(connectLock, 1000, TimeUnit.MILLISECONDS));
connectLock.unlock();
}
Expand All @@ -1356,8 +1364,14 @@ private static boolean tryLockInterruptibly(Lock lock, long time, TimeUnit unit)
}

private void disconnectChannel() throws IOException {
disconnectChannel(false);
}
private void disconnectChannel(boolean force) throws IOException {
connected = false;
if (channel != null && channel.isOpen()) {
if (force) {
channel.setShouldUseSoLinger0();
}
channel.close();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import javax.net.ssl.SSLSocket;
import java.io.IOException;
import java.net.Socket;
import java.net.SocketException;
import java.nio.channels.Channel;

/**
Expand All @@ -38,6 +39,7 @@ public class PacketChannel implements Channel {
private Socket socket;
private ByteArrayInputStream inputStream;
private ByteArrayOutputStream outputStream;
private boolean shouldUseSoLinger0 = false;

public PacketChannel(String hostname, int port) throws IOException {
this(new Socket(hostname, port));
Expand Down Expand Up @@ -109,6 +111,10 @@ public boolean isSSL() {
return isSSL;
}

public void setShouldUseSoLinger0() {
shouldUseSoLinger0 = true;
}

@Override
public boolean isOpen() {
return !socket.isClosed();
Expand All @@ -126,6 +132,14 @@ public void close() throws IOException {
} catch (Exception e) {
// ignore
}
if (shouldUseSoLinger0) {
try {
socket.setSoLinger(true, 0);
} catch (SocketException e) {
// ignore
}
}
socket.close();
shouldUseSoLinger0 = false;
}
}

0 comments on commit 3a1527f

Please sign in to comment.