Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
[CONJ-278] Improve prepared statement on failover
  • Loading branch information
rusher committed Apr 18, 2016
1 parent 09af757 commit d46dedc
Show file tree
Hide file tree
Showing 9 changed files with 96 additions and 11 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Expand Up @@ -6,7 +6,7 @@
<artifactId>mariadb-java-client</artifactId>
<packaging>jar</packaging>
<name>mariadb-java-client</name>
<version>1.4.2</version>
<version>1.4.3-SNAPSHOT</version>
<description>JDBC driver for MariaDB and MySQL</description>
<url>https://mariadb.com/kb/en/mariadb/about-mariadb-connector-j/</url>

Expand Down
Expand Up @@ -303,7 +303,13 @@ public boolean isQueryRelaunchable(Method method, Object[] args) {
if (method != null) {
if ("executeQuery".equals(method.getName()) && args[1] instanceof String) {
return ((String) args[1]).toUpperCase().startsWith("SELECT");
} else if ("executePreparedQuery".equals(method.getName()) && args[2] instanceof String) {
} else if (("executePreparedQuery".equals(method.getName()) || "executePreparedQueryAfterFailover".equals(method.getName()))
&& args[2] instanceof String) {
PrepareResult prepareResult = (PrepareResult) args[1];
if (!prepareResult.isExecuteOnMaster()) {
//query must normally be launched on slave.
return true;
}
return ((String) args[2]).toUpperCase().startsWith("SELECT");
}
}
Expand Down
Expand Up @@ -50,6 +50,7 @@ WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWIS
package org.mariadb.jdbc.internal.failover;

import org.mariadb.jdbc.HostAddress;
import org.mariadb.jdbc.internal.MariaDbType;
import org.mariadb.jdbc.internal.protocol.Protocol;
import org.mariadb.jdbc.internal.util.ExceptionMapper;
import org.mariadb.jdbc.internal.util.dao.PrepareResult;
Expand Down Expand Up @@ -132,8 +133,18 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl
case METHOD_CLOSED_EXPLICIT:
this.listener.preClose();
return null;
case METHOD_PROLOG_PROXY:
case METHOD_EXECUTE_PREPARED_QUERY:
if (((PrepareResult) args[0]).mustRePrepareOnSlave() && !this.listener.hasHostFail()) {
//PrepareStatement was to be executed on slave, but since a failover was running on master connection. Slave connection is up
// again, so has to be reprepared on slave
try {
this.listener.rePrepareOnSlave(((PrepareResult) args[0]), (String) args[2], (MariaDbType[]) args[4]);
} catch (QueryException q) {
//error during reprepare, will do executed on master.
}
}
//No break, must continue
case METHOD_PROLOG_PROXY:
try {
return listener.invoke(method, args, ((PrepareResult) args[0]).getUnProxiedProtocol());
} catch (InvocationTargetException e) {
Expand Down
Expand Up @@ -51,15 +51,15 @@ WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWIS

import org.mariadb.jdbc.HostAddress;
import org.mariadb.jdbc.UrlParser;
import org.mariadb.jdbc.internal.MariaDbType;
import org.mariadb.jdbc.internal.util.dao.PrepareResult;
import org.mariadb.jdbc.internal.util.dao.QueryException;
import org.mariadb.jdbc.internal.protocol.Protocol;
import org.mariadb.jdbc.internal.failover.tools.SearchFilter;

import java.lang.reflect.Method;
import java.sql.SQLException;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;

public interface Listener {
FailoverProxy getProxy();
Expand Down Expand Up @@ -129,4 +129,5 @@ void throwFailoverMessage(HostAddress failHostAddress, boolean wasMaster, QueryE

boolean checkMasterStatus(SearchFilter searchFilter);

void rePrepareOnSlave(PrepareResult oldPrepareResult, String sql, MariaDbType[] parameterTypeHeader) throws QueryException;
}
Expand Up @@ -51,10 +51,12 @@ WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWIS

import org.mariadb.jdbc.HostAddress;
import org.mariadb.jdbc.UrlParser;
import org.mariadb.jdbc.internal.MariaDbType;
import org.mariadb.jdbc.internal.failover.AbstractMastersListener;
import org.mariadb.jdbc.internal.failover.HandleErrorResult;
import org.mariadb.jdbc.internal.failover.thread.FailoverLoop;
import org.mariadb.jdbc.internal.failover.tools.SearchFilter;
import org.mariadb.jdbc.internal.util.dao.PrepareResult;
import org.mariadb.jdbc.internal.util.dao.QueryException;
import org.mariadb.jdbc.internal.util.constant.HaMode;
import org.mariadb.jdbc.internal.protocol.MasterProtocol;
Expand Down Expand Up @@ -316,4 +318,7 @@ public boolean checkMasterStatus(SearchFilter searchFilter) {
return false;
}

public void rePrepareOnSlave(PrepareResult oldPrepareResult, String sql, MariaDbType[] parameterTypeHeader) {
//no slave
}
}
Expand Up @@ -51,8 +51,10 @@ WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWIS

import org.mariadb.jdbc.HostAddress;
import org.mariadb.jdbc.UrlParser;
import org.mariadb.jdbc.internal.MariaDbType;
import org.mariadb.jdbc.internal.failover.AbstractMastersSlavesListener;
import org.mariadb.jdbc.internal.failover.thread.FailoverLoop;
import org.mariadb.jdbc.internal.util.dao.PrepareResult;
import org.mariadb.jdbc.internal.util.dao.ReconnectDuringTransactionException;
import org.mariadb.jdbc.internal.util.scheduler.DynamicSizedSchedulerInterface;
import org.mariadb.jdbc.internal.util.scheduler.SchedulerServiceProviderHolder;
Expand Down Expand Up @@ -740,4 +742,26 @@ public boolean checkMasterStatus(SearchFilter searchFilter) {
return false;
}

@Override
public void rePrepareOnSlave(PrepareResult oldPrepareResult, String sql, MariaDbType[] parameterTypeHeader) throws QueryException {
if (secondaryProtocol != null && !isSecondaryHostFail()) {
//prepare on slave
PrepareResult prepareResult = secondaryProtocol.prepare(sql, true);

//reset header status
for (int i = 0; i < parameterTypeHeader.length; i++) {
parameterTypeHeader[i] = null;
}

//release prepare on master
try {
prepareResult.getUnProxiedProtocol().releasePrepareStatement(prepareResult, sql);
} catch (QueryException exception) {
//released failed.
}

//replace prepare data
oldPrepareResult.failover(prepareResult.getStatementId(), secondaryProtocol);
}
}
}
Expand Up @@ -51,10 +51,10 @@ WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWIS
package org.mariadb.jdbc.internal.util.constant;

public final class Version {
public static final String version = "1.4.2";
public static final String version = "1.4.3-SNAPSHOT";
public static final int majorVersion = 1;
public static final int minorVersion = 4;
public static final int patchVersion = 2;
public static final String qualifier = "";
public static final int patchVersion = 3;
public static final String qualifier = "SNAPSHOT";

}
Expand Up @@ -59,6 +59,8 @@ public class PrepareResult {
private ColumnInformation[] columns;
private ColumnInformation[] parameters;
private Protocol unProxiedProtocol;
private final boolean executeOnMaster;


//share indicator
private volatile int shareCounter = 1;
Expand All @@ -77,6 +79,7 @@ public PrepareResult(int statementId, ColumnInformation[] columns, ColumnInforma
this.columns = columns;
this.parameters = parameters;
this.unProxiedProtocol = unProxiedProtocol;
this.executeOnMaster = unProxiedProtocol.isMasterConnection();
}

/**
Expand All @@ -87,6 +90,8 @@ public PrepareResult(int statementId, ColumnInformation[] columns, ColumnInforma
public void failover(int statementId, Protocol unProxiedProtocol) {
this.statementId = statementId;
this.unProxiedProtocol = unProxiedProtocol;
this.shareCounter = 1;
this.isBeingDeallocate = false;

}

Expand Down Expand Up @@ -151,4 +156,18 @@ public ColumnInformation[] getParameters() {
public Protocol getUnProxiedProtocol() {
return unProxiedProtocol;
}

/**
* If PrepareStatement was executed initially on slave connection, but after a failover, was using temporary the master connection,
* indicate if has to be reprepare on Slave connection when reconnected.
*
* @return true if has to be reprepared on Slave connection
*/
public boolean mustRePrepareOnSlave() {
return executeOnMaster == unProxiedProtocol.isMasterConnection();
}

public boolean isExecuteOnMaster() {
return executeOnMaster;
}
}
25 changes: 22 additions & 3 deletions src/test/java/org/mariadb/jdbc/failover/BaseReplication.java
Expand Up @@ -4,17 +4,16 @@
import org.junit.Assume;
import org.junit.Test;
import org.mariadb.jdbc.MariaDbServerPreparedStatement;
import org.mariadb.jdbc.internal.util.dao.PrepareResult;

import java.sql.*;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

public abstract class BaseReplication extends BaseMonoServer {

@Test
public void failoverSlaveToMasterPrepareStatement() throws Throwable {
Connection connection = null;
Expand All @@ -37,7 +36,8 @@ public void failoverSlaveToMasterPrepareStatement() throws Throwable {
final int currentPrepareId = getPrepareResult((MariaDbServerPreparedStatement) preparedStatement).getStatementId();
int slaveServerId = getServerId(connection);
Assert.assertFalse(masterServerId == slaveServerId);
stopProxy(slaveServerId);
//stop slave for a few seconds
stopProxy(slaveServerId, 2000);

//test failover
preparedStatement.setInt(1, 1);
Expand All @@ -50,6 +50,25 @@ public void failoverSlaveToMasterPrepareStatement() throws Throwable {

Assert.assertTrue(masterServerId == currentServerId);
Assert.assertFalse(connection.isReadOnly());
Thread.sleep(2000);
boolean hasReturnOnSlave = false;

for (int i = 0; i < 10; i++) {
Thread.sleep(1000);
preparedStatement.setInt(1, 1);
rs = preparedStatement.executeQuery();
rs.next();
Assert.assertEquals("Harriba !", rs.getString(1));

currentServerId = getServerId(connection);
if (currentServerId != masterServerId) {
hasReturnOnSlave = true;
Assert.assertTrue(connection.isReadOnly());
break;
}
}
Assert.assertTrue("Prepare statement has not return on Slave",hasReturnOnSlave);

} finally {
if (connection != null) {
connection.close();
Expand Down

0 comments on commit d46dedc

Please sign in to comment.