Skip to content

Commit

Permalink
[CONJ-492] Failover handle reconnect on KILL command
Browse files Browse the repository at this point in the history
  • Loading branch information
rusher committed Jul 24, 2017
1 parent 7cb369f commit 4202e8d
Show file tree
Hide file tree
Showing 9 changed files with 91 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,14 @@ public HandleErrorResult handleFailover(SQLException qe, Method method, Object[]
+ " ] connection fail. Reason : " + qe.getMessage());
addToBlacklist(currentProtocol.getHostAddress());
}
return primaryFail(method, args);

//check that failover is due to kill command
boolean killCmd = qe != null
&& qe.getSQLState() != null
&& qe.getSQLState().equals("70100")
&& 1927 == qe.getErrorCode();

return primaryFail(method, args, killCmd);
}

/**
Expand Down Expand Up @@ -421,7 +428,7 @@ public UrlParser getUrlParser() {

public abstract void switchReadOnlyConnection(Boolean readonly) throws SQLException;

public abstract HandleErrorResult primaryFail(Method method, Object[] args) throws Throwable;
public abstract HandleErrorResult primaryFail(Method method, Object[] args, boolean killCmd) throws Throwable;

/**
* Throw a human readable message after a failoverException.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,13 @@ public HandleErrorResult handleFailover(SQLException qe, Method method, Object[]
if (isExplicitClosed()) {
throw new SQLException("Connection has been closed !");
}

//check that failover is due to kill command
boolean killCmd = qe != null
&& qe.getSQLState() != null
&& qe.getSQLState().equals("70100")
&& 1927 == qe.getErrorCode();

if (protocol.mustBeMasterConnection()) {
if (!protocol.isMasterConnection()) {
logger.warn("SQL Primary node [" + this.currentProtocol.getHostAddress().toString()
Expand All @@ -108,15 +115,15 @@ public HandleErrorResult handleFailover(SQLException qe, Method method, Object[]

addToBlacklist(protocol.getHostAddress());
}
return primaryFail(method, args);
return primaryFail(method, args, killCmd);
} else {
if (setSecondaryHostFail()) {
logger.warn("SQL secondary node [" + this.currentProtocol.getHostAddress().toString()
+ ", conn " + this.currentProtocol.getServerThreadId()
+ " ] connection fail. Reason : " + qe.getMessage());
addToBlacklist(protocol.getHostAddress());
}
return secondaryFail(method, args);
return secondaryFail(method, args, killCmd);
}
}

Expand Down Expand Up @@ -182,7 +189,7 @@ public SearchFilter getFilterForFailedHost() {
return new SearchFilter(isMasterHostFail(), isSecondaryHostFail());
}

public abstract HandleErrorResult secondaryFail(Method method, Object[] args) throws Throwable;
public abstract HandleErrorResult secondaryFail(Method method, Object[] args, boolean killCmd) throws Throwable;

public abstract void foundActiveSecondary(Protocol newSecondaryProtocol) throws SQLException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,9 +229,21 @@ private Object executeInvocation(Method method, Object[] args, boolean isSecondE

queryException = addHostInformationToException(queryException, protocol);

//check that failover is due to kill command
boolean killCmd = queryException != null
&& queryException.getSQLState() != null
&& queryException.getSQLState().equals("70100")
&& 1927 == queryException.getErrorCode();

if (killCmd) {
handleFailOver(queryException, method, args, protocol);
return null;
}

if (hasToHandleFailover(queryException)) {
return handleFailOver(queryException, method, args, protocol);
}

//error is "The MySQL server is running with the %s option so it cannot execute this statement"
//checking that server was master has not been demote to slave without resetting connections
if (queryException.getErrorCode() == 1290
Expand All @@ -251,7 +263,7 @@ private Object executeInvocation(Method method, Object[] args, boolean isSecondE
lock.lock();
try {
protocol.close();
isReconnected = listener.primaryFail(null, null).isReconnected;
isReconnected = listener.primaryFail(null, null, false).isReconnected;
} finally {
lock.unlock();
}
Expand Down Expand Up @@ -288,6 +300,7 @@ private Object handleFailOver(SQLException qe, Method method, Object[] args, Pro
failHostAddress = protocol.getHostAddress();
failIsMaster = protocol.isMasterConnection();
}

HandleErrorResult handleErrorResult = listener.handleFailover(qe, method, args, protocol);
if (handleErrorResult.mustThrowError) {
listener.throwFailoverMessage(failHostAddress, failIsMaster, qe, handleErrorResult.isReconnected);
Expand All @@ -306,13 +319,15 @@ private Object handleFailOver(SQLException qe, Method method, Object[] args, Pro
* 08004 : SQL server rejected SQL connection
* 08006 : connection failure
* 08007 : transaction resolution unknown
* 70100 : connection was killed
* 70100 : connection was killed if error code is "1927"
*
* @param exception the Exception
* @return true if there has been a connection error that must be handled by failover
*/
public boolean hasToHandleFailover(SQLException exception) {
return exception.getSQLState() != null && exception.getSQLState().startsWith("08");
return exception.getSQLState() != null
&& (exception.getSQLState().startsWith("08")
|| (exception.getSQLState().equals("70100") && 1927 == exception.getErrorCode())) ;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ public HandleErrorResult(boolean isReconnected) {
this.isReconnected = isReconnected;
}

public HandleErrorResult(boolean isReconnected, boolean mustThrowError) {
this.isReconnected = isReconnected;
this.mustThrowError = mustThrowError;
}

@Override
public String toString() {
return "HandleErrorResult{"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public interface Listener {

void switchReadOnlyConnection(Boolean readonly) throws SQLException;

HandleErrorResult primaryFail(Method method, Object[] args) throws Throwable;
HandleErrorResult primaryFail(Method method, Object[] args, boolean killCmd) throws Throwable;

Object invoke(Method method, Object[] args, Protocol specificProtocol) throws Throwable;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ public void preClose() throws SQLException {
}

@Override
public HandleErrorResult primaryFail(Method method, Object[] args) throws Throwable {
public HandleErrorResult primaryFail(Method method, Object[] args, boolean killCmd) throws Throwable {
boolean alreadyClosed = !currentProtocol.isConnected();
boolean inTransaction = currentProtocol != null && currentProtocol.inTransaction();

Expand All @@ -156,6 +156,9 @@ public HandleErrorResult primaryFail(Method method, Object[] args) throws Throwa
try {
reconnectFailedConnection(new SearchFilter(true, false));
handleFailLoop();

if (killCmd) return new HandleErrorResult(true, false);

if (alreadyClosed || (!alreadyClosed && !inTransaction && isQueryRelaunchable(method, args))) {
logger.info("Connection to master lost, new master " + currentProtocol.getHostAddress() + " found"
+ ", query type permit to be re-execute on new server without throwing exception");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -521,12 +521,13 @@ public void switchReadOnlyConnection(Boolean mustBeReadOnly) throws SQLException
/**
* To handle the newly detected failover on the master connection.
*
* @param method the initial called method
* @param args the initial args
* @param method the initial called method
* @param args the initial args
* @param killCmd is the fail due to a KILL cmd
* @return an object to indicate if the previous Exception must be thrown, or the object resulting if a failover worked
* @throws Throwable if failover has not been catch
*/
public HandleErrorResult primaryFail(Method method, Object[] args) throws Throwable {
public HandleErrorResult primaryFail(Method method, Object[] args, boolean killCmd) throws Throwable {
boolean alreadyClosed = !masterProtocol.isConnected();
boolean inTransaction = masterProtocol != null && masterProtocol.inTransaction();

Expand Down Expand Up @@ -580,6 +581,9 @@ public HandleErrorResult primaryFail(Method method, Object[] args) throws Throwa
try {
reconnectFailedConnection(new SearchFilter(true, urlParser.getOptions().failOnReadOnly));
handleFailLoop();

if (killCmd) return new HandleErrorResult(true, false);

if (currentReadOnlyAsked //use master connection temporary in replacement of slave
|| alreadyClosed //connection was already close
|| (!alreadyClosed && !inTransaction && isQueryRelaunchable(method, args))) { //connection was not in transaction
Expand Down Expand Up @@ -658,12 +662,13 @@ private boolean pingSecondaryProtocol(Protocol protocol) {
/**
* To handle the newly detected failover on the secondary connection.
*
* @param method the initial called method
* @param args the initial args
* @param method the initial called method
* @param args the initial args
* @param killCmd is fail due to a KILL command
* @return an object to indicate if the previous Exception must be thrown, or the object resulting if a failover worked
* @throws Throwable if failover has not catch error
*/
public HandleErrorResult secondaryFail(Method method, Object[] args) throws Throwable {
public HandleErrorResult secondaryFail(Method method, Object[] args, boolean killCmd) throws Throwable {
proxy.lock.lock();
try {
if (pingSecondaryProtocol(this.secondaryProtocol)) {
Expand Down Expand Up @@ -711,6 +716,9 @@ public HandleErrorResult secondaryFail(Method method, Object[] args) throws Thro
proxy.lock.unlock();
}
}

if (killCmd) return new HandleErrorResult(true, false);

logger.info("Connection to slave lost, new slave " + currentProtocol.getHostAddress() + ", conn:"
+ currentProtocol.getServerThreadId() + " found"
+ ", query is re-execute on new server without throwing exception");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ private void doRun() {

if (masterFail && listener.setMasterHostFail()) {
try {
listener.primaryFail(null, null);
listener.primaryFail(null, null, false);
} catch (Throwable t) {
//do nothing
}
Expand Down
34 changes: 29 additions & 5 deletions src/test/java/org/mariadb/jdbc/ErrorMessageTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,12 @@
import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mariadb.jdbc.internal.protocol.Protocol;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import javax.sql.DataSource;
import java.sql.*;

import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.Assert.*;

public class ErrorMessageTest extends BaseTest {

Expand Down Expand Up @@ -252,4 +251,29 @@ private void executeBigBatchWithException(Connection connection) throws SQLExcep
}
}

@Test
public void testFailOverKillCmd() throws Throwable {
DataSource ds = new MariaDbDataSource("jdbc:mariadb:failover//"
+ ((hostname != null) ? hostname : "localhost") + ":" + port + ","
+ ((hostname != null) ? hostname : "localhost") + ":" + port
+ "/" + database + "?user=" + username + (password != null ? "&password=" + password : ""));

try (Connection connection = ds.getConnection()) {
Protocol protocol = getProtocolFromConnection(connection);
Statement stmt = connection.createStatement();
long threadId = protocol.getServerThreadId();
stmt.executeQuery("KILL " + threadId);
stmt.executeQuery("SELECT 1");
long newThreadId = protocol.getServerThreadId();
assertNotEquals(threadId, newThreadId);
PreparedStatement preparedStatement = connection.prepareStatement("KILL ?");
preparedStatement.setLong(1, newThreadId);
preparedStatement.execute();

stmt.executeQuery("SELECT 1");
long anotherNewThreadId = protocol.getServerThreadId();
assertNotEquals(anotherNewThreadId, newThreadId);

}
}
}

0 comments on commit 4202e8d

Please sign in to comment.