diff --git a/src/main/java/org/mariadb/jdbc/HostAddress.java b/src/main/java/org/mariadb/jdbc/HostAddress.java index b80625466..0d68e4ce3 100644 --- a/src/main/java/org/mariadb/jdbc/HostAddress.java +++ b/src/main/java/org/mariadb/jdbc/HostAddress.java @@ -50,8 +50,8 @@ WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWIS package org.mariadb.jdbc; -import org.mariadb.jdbc.internal.util.constant.ParameterConstant; import org.mariadb.jdbc.internal.util.constant.HaMode; +import org.mariadb.jdbc.internal.util.constant.ParameterConstant; import java.util.ArrayList; import java.util.List; @@ -107,11 +107,11 @@ public static List parse(String spec, HaMode haMode) { String[] tokens = spec.trim().split(","); List arr = new ArrayList<>(tokens.length); - for (int i = 0; i < tokens.length; i++) { - if (tokens[i].startsWith("address=")) { - arr.add(parseParameterHostAddress(tokens[i])); + for (String token : tokens) { + if (token.startsWith("address=")) { + arr.add(parseParameterHostAddress(token)); } else { - arr.add(parseSimpleHostAddress(tokens[i])); + arr.add(parseSimpleHostAddress(token)); } } @@ -242,13 +242,7 @@ public boolean equals(Object obj) { HostAddress that = (HostAddress) obj; - if (port != that.port) { - return false; - } - if (host != null ? !host.equals(that.host) : that.host != null) { - return false; - } - return !(type != null ? !type.equals(that.type) : that.type != null); + return port == that.port && (host != null ? host.equals(that.host) : that.host == null && !(type != null ? !type.equals(that.type) : that.type != null)); } @@ -261,4 +255,3 @@ public int hashCode() { } - diff --git a/src/main/java/org/mariadb/jdbc/MariaDbResultSetMetaData.java b/src/main/java/org/mariadb/jdbc/MariaDbResultSetMetaData.java index 90392c3f4..0f28a38dd 100644 --- a/src/main/java/org/mariadb/jdbc/MariaDbResultSetMetaData.java +++ b/src/main/java/org/mariadb/jdbc/MariaDbResultSetMetaData.java @@ -249,7 +249,7 @@ public int getScale(final int column) throws SQLException { * @throws SQLException if a database access error occurs */ public String getTableName(final int column) throws SQLException { - if (returnTableAlias == true) { + if (returnTableAlias) { return getColumnInformation(column).getTable(); } else { return getColumnInformation(column).getOriginalTable(); diff --git a/src/main/java/org/mariadb/jdbc/internal/failover/FailoverProxy.java b/src/main/java/org/mariadb/jdbc/internal/failover/FailoverProxy.java index 1b0bf1302..b987e8582 100644 --- a/src/main/java/org/mariadb/jdbc/internal/failover/FailoverProxy.java +++ b/src/main/java/org/mariadb/jdbc/internal/failover/FailoverProxy.java @@ -85,7 +85,7 @@ public class FailoverProxy implements InvocationHandler { private Listener listener; /** - * Procy constructor. + * Proxy constructor. * @param listener failover implementation. * @param lock synchronisation lock * @throws QueryException if connection error occur @@ -245,10 +245,7 @@ private Object handleFailOver(QueryException qe, Method method, Object[] args, P * @return true if there has been a connection error that must be handled by failover */ public boolean hasToHandleFailover(QueryException exception) { - if (exception.getSqlState() != null && exception.getSqlState().startsWith("08")) { - return true; - } - return false; + return exception.getSqlState() != null && exception.getSqlState().startsWith("08"); } /** diff --git a/src/main/java/org/mariadb/jdbc/internal/failover/impl/AuroraListener.java b/src/main/java/org/mariadb/jdbc/internal/failover/impl/AuroraListener.java index 99803f87d..d207280be 100644 --- a/src/main/java/org/mariadb/jdbc/internal/failover/impl/AuroraListener.java +++ b/src/main/java/org/mariadb/jdbc/internal/failover/impl/AuroraListener.java @@ -52,20 +52,38 @@ WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWIS import org.mariadb.jdbc.HostAddress; import org.mariadb.jdbc.UrlParser; import org.mariadb.jdbc.internal.failover.tools.SearchFilter; +import org.mariadb.jdbc.internal.protocol.AuroraProtocol; +import org.mariadb.jdbc.internal.protocol.Protocol; import org.mariadb.jdbc.internal.queryresults.SingleExecutionResult; import org.mariadb.jdbc.internal.queryresults.resultset.MariaSelectResultSet; import org.mariadb.jdbc.internal.util.dao.QueryException; -import org.mariadb.jdbc.internal.protocol.AuroraProtocol; -import org.mariadb.jdbc.internal.protocol.Protocol; import org.mariadb.jdbc.internal.util.dao.ReconnectDuringTransactionException; import java.sql.ResultSet; import java.sql.SQLException; +import java.sql.Timestamp; +import java.text.SimpleDateFormat; +import java.util.ArrayList; import java.util.Collections; +import java.util.Date; +import java.util.Iterator; import java.util.LinkedList; import java.util.List; +import java.util.TimeZone; +import java.util.logging.Level; +import java.util.logging.Logger; +import java.util.regex.Matcher; +import java.util.regex.Pattern; public class AuroraListener extends MastersSlavesListener { + + private final Logger log = Logger.getLogger(AuroraListener.class.getName()); + private final Pattern clusterPattern = Pattern.compile("(.+)\\.cluster-([a-z0-9]+\\.[a-z0-9\\-]+\\.rds\\.amazonaws\\.com)"); + private final HostAddress clusterHostAddress; + private String urlEndStr = ""; + private final SimpleDateFormat sqlDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); + private String dbName = "information_schema"; + /** * Constructor for Aurora. * This differ from standard failover because : @@ -78,6 +96,30 @@ public AuroraListener(UrlParser urlParser) { super(urlParser); masterProtocol = null; secondaryProtocol = null; + clusterHostAddress = findClusterHostAddress(urlParser); + } + + /** + * Retrieves the cluster host address from the UrlParser instance. + * + * @param urlParser object that holds the connection information + * @return cluster host address + */ + private HostAddress findClusterHostAddress(UrlParser urlParser) { + List hostAddresses = urlParser.getHostAddresses(); + Matcher matcher; + for (HostAddress hostAddress: hostAddresses) { + matcher = clusterPattern.matcher(hostAddress.host); + if (matcher.find()) { + urlEndStr = "." + matcher.group(2); + return hostAddress; + } + } + return null; + } + + public HostAddress getClusterHostAddress() { + return clusterHostAddress; } /** @@ -138,10 +180,15 @@ public void reconnectFailedConnection(SearchFilter searchFilter) throws QueryExc } } + if (urlParser.getHostAddresses().size() < 2 + || (urlParser.getHostAddresses().size() == 2 + && urlParser.getHostAddresses().contains(getClusterHostAddress()))) { + searchFilter = new SearchFilter(true, false); + } if ((isMasterHostFail() || isSecondaryHostFail()) || searchFilter.isInitialConnection()) { //while permit to avoid case when succeeded creating a new Master connection - //and ping master connection fail a few millissecond after, + //and ping master connection fail a few milliseconds after, //resulting a masterConnection not initialized. do { AuroraProtocol.loop(this, loopAddress, searchFilter); @@ -154,13 +201,128 @@ public void reconnectFailedConnection(SearchFilter searchFilter) throws QueryExc } } while (searchFilter.isInitialConnection() && masterProtocol == null); } + + if (getCurrentProtocol() != null && !getCurrentProtocol().isClosed()) { + retrieveAllEndpointsAndSet(getCurrentProtocol()); + } + } + /** + * Retrieves the information necessary to add a new endpoint. + * Calls the methods that retrieves the instance identifiers and sets urlParser accordingly. + * + * @param protocol current protocol connected to + * @throws QueryException + */ + public void retrieveAllEndpointsAndSet(Protocol protocol) throws QueryException { + // For a given cluster, same port for all endpoints and same end host address + int port = protocol.getPort(); + if (urlEndStr.equals("") && protocol.getHost().indexOf(".") > -1){ + urlEndStr = protocol.getHost().substring(protocol.getHost().indexOf(".")); + } + + List endpoints = getCurrentEndpointIdentifiers(protocol); + if (System.getProperty("auroraFailoverTesting") != null) { + if (urlParser.getHostAddresses().size() != endpoints.size()+1) { + setUrlParserFromEndpoints(endpoints, port); + } + } else { + setUrlParserFromEndpoints(endpoints, port); + } + + } /** - * Aurora replica doesn't have the master endpoint but the master instance name. - * since the end point normally use the instance name like "instancename.some_ugly_string.region.rds.amazonaws.com", - * if an endpoint start with this instance name, it will be checked first. + * Retrieves all endpoints of a cluster from the appropriate database table. + * + * @param protocol current protocol connected to + * @return instance endpoints of the cluster + * @throws QueryException + */ + private List getCurrentEndpointIdentifiers(Protocol protocol) throws QueryException { + List endpoints = new ArrayList<>(); + try { + proxy.lock.lock(); + try { + // Deleted instance may remain in db for 24 hours so ignoring instances that have had no change for IGNORE_TIME_IN_MINUTES + Date date = new Date(); + int IGNORE_TIME_IN_MINUTES = 3; + Timestamp currentTime = new Timestamp(date.getTime() - IGNORE_TIME_IN_MINUTES*60*1000); + sqlDateFormat.setTimeZone(TimeZone.getTimeZone(protocol.getServerData("system_time_zone"))); + + SingleExecutionResult queryResult = new SingleExecutionResult(null, 0, true, false); + protocol.executeQuery(queryResult, + "select server_id, session_id from " + dbName + ".replica_host_status " + + "where last_update_timestamp > '" + sqlDateFormat.format(currentTime) + "'", + ResultSet.TYPE_FORWARD_ONLY); + MariaSelectResultSet resultSet = queryResult.getResult(); + + while (resultSet.next()) { + endpoints.add(resultSet.getString(1) + urlEndStr); + } + + } finally { + proxy.lock.unlock(); + } + } catch (SQLException se) { + log.log(Level.WARNING, "SQL exception occurred: " + se); + } catch (QueryException qe) { + if (protocol.getProxy().hasToHandleFailover(qe)) { + if (masterProtocol.equals(protocol)) { + setMasterHostFail(); + } else if (secondaryProtocol.equals(protocol)) { + setSecondaryHostFail(); + } + addToBlacklist(protocol.getHostAddress()); + reconnectFailedConnection(new SearchFilter(isMasterHostFail(), isSecondaryHostFail())); + } + } + + return endpoints; + } + + /** + * Sets urlParser if there are any changes in the instance endpoints available. + * + * @param endpoints instance identifiers + * @param port port that is common to all endpoints + */ + private void setUrlParserFromEndpoints(List endpoints, int port) { + List addresses = Collections.synchronizedList(urlParser.getHostAddresses()); + + List currentHosts = new ArrayList<>(); + synchronized (addresses) { + for (HostAddress address : addresses) { + currentHosts.add(address.host); + } + } + + synchronized (addresses) { + Iterator iterator = addresses.iterator(); + while (iterator.hasNext() && endpoints.size() > 0) { + HostAddress address = iterator.next(); + if (!endpoints.contains(address.host)) { + removeFromBlacklist(address); + iterator.remove(); + } + } + } + + synchronized (addresses) { + for (String endpoint : endpoints) { + if (!currentHosts.contains(endpoint)) { + HostAddress newHostAddress = new HostAddress(endpoint, port, null); + addresses.add(newHostAddress); + } + } + } + } + + /** + * Looks for the current master/writer instance via the secondary protocol if it is found within 3 attempts. + * Should it not be able to connect, the host is blacklisted and null is returned. + * Otherwise, it will open a new connection to the cluster endpoint and retrieve the data from there. * * @param secondaryProtocol the current secondary protocol * @param loopAddress list of possible hosts @@ -168,33 +330,107 @@ public void reconnectFailedConnection(SearchFilter searchFilter) throws QueryExc */ public HostAddress searchByStartName(Protocol secondaryProtocol, List loopAddress) { if (!isSecondaryHostFail()) { - MariaSelectResultSet queryResult = null; - try { - proxy.lock.lock(); + int checkWriterAttempts = 3; + HostAddress currentWriter = null; + + do { try { - SingleExecutionResult executionResult = new SingleExecutionResult(null, 0, true, false); - secondaryProtocol.executeQuery(false, executionResult, - "select server_id from information_schema.replica_host_status where session_id = 'MASTER_SESSION_ID'", - ResultSet.TYPE_FORWARD_ONLY); - queryResult = executionResult.getResultSet(); - queryResult.next(); - } finally { - proxy.lock.unlock(); + currentWriter = searchForMasterHostAddress(false, secondaryProtocol, loopAddress); + } catch (QueryException qe) { + if (proxy.hasToHandleFailover(qe) && setSecondaryHostFail()) { + addToBlacklist(secondaryProtocol.getHostAddress()); + return null; + } } - String masterHostName = queryResult.getString(1); - for (int i = 0; i < loopAddress.size(); i++) { - if (loopAddress.get(i).host.startsWith(masterHostName)) { - return loopAddress.get(i); + checkWriterAttempts--; + } while (currentWriter == null && checkWriterAttempts > 0); + + // Handling special case where no writer is found from secondaryProtocol + if (currentWriter == null && getClusterHostAddress() != null) { + AuroraProtocol possibleMasterProtocol = AuroraProtocol.getNewProtocol(getProxy(), getUrlParser()); + possibleMasterProtocol.setHostAddress(getClusterHostAddress()); + try { + possibleMasterProtocol.connect(); + possibleMasterProtocol.setMustBeMasterConnection(true); + foundActiveMaster(possibleMasterProtocol); + } catch (QueryException qe) { + if (proxy.hasToHandleFailover(qe)) { + addToBlacklist(possibleMasterProtocol.getHostAddress()); } } - } catch (SQLException exception) { - //eat exception because cannot happen in this getString() - } catch (QueryException qe) { - if (proxy.hasToHandleFailover(qe) && setSecondaryHostFail()) { - addToBlacklist(secondaryProtocol.getHostAddress()); + } + + return currentWriter; + } + return null; + } + + /** + * Aurora replica doesn't have the master endpoint but the master instance name. + * since the end point normally use the instance name like "instance-name.some_unique_string.region.rds.amazonaws.com", + * if an endpoint start with this instance name, it will be checked first. + * Otherwise, the endpoint ending string is extracted and used since the writer was newly created. + * + * @param protocol current protocol + * @param loopAddress list of possible hosts + * @return the probable host address or null if no valid endpoint found + * @throws QueryException + */ + private HostAddress searchForMasterHostAddress(Protocol protocol, List loopAddress) throws QueryException { + String masterHostName = null; + proxy.lock.lock(); + try { + Date date = new Date(); + int IGNORE_TIME_IN_MINUTES = 3; + Timestamp currentTime = new Timestamp(date.getTime() - IGNORE_TIME_IN_MINUTES*60*1000); + sqlDateFormat.setTimeZone(TimeZone.getTimeZone(protocol.getServerData("system_time_zone"))); + + SingleExecutionResult executionResult = new SingleExecutionResult(null, 0, true, false); + protocol.executeQuery(executionResult, + "select server_id from " + dbName + ".replica_host_status " + + "where session_id = 'MASTER_SESSION_ID' " + + "and last_update_timestamp = (" + + "select max(last_update_timestamp) from " + dbName + ".replica_host_status " + + "where session_id = 'MASTER_SESSION_ID' " + + "and last_update_timestamp > '" + currentTime + "')", + ResultSet.TYPE_FORWARD_ONLY); + MariaSelectResultSet queryResult = executionResult.getResult(); + + if (!queryResult.isBeforeFirst()) { + return null; + } else { + queryResult.next(); + masterHostName = queryResult.getString(1); + } + + } catch (SQLException sqle) { + //eat exception because cannot happen in this getString() + } finally { + proxy.lock.unlock(); + } + + Matcher matcher; + if (masterHostName != null) { + for (HostAddress hostAddress: loopAddress) { + matcher = clusterPattern.matcher(hostAddress.host); + if (hostAddress.host.startsWith(masterHostName) && !matcher.find()) { + return hostAddress; } } + + HostAddress masterHostAddress; + if (urlEndStr.equals("") && protocol.getHost().indexOf(".") > -1) { + urlEndStr = protocol.getHost().substring(protocol.getHost().indexOf(".")); + } else { + return null; + } + + masterHostAddress = new HostAddress(masterHostName + urlEndStr, protocol.getPort(), null); + loopAddress.add(masterHostAddress); + urlParser.setHostAddresses(loopAddress); + return masterHostAddress; } + return null; } diff --git a/src/main/java/org/mariadb/jdbc/internal/failover/impl/MastersFailoverListener.java b/src/main/java/org/mariadb/jdbc/internal/failover/impl/MastersFailoverListener.java index 447fd0941..5b670992b 100644 --- a/src/main/java/org/mariadb/jdbc/internal/failover/impl/MastersFailoverListener.java +++ b/src/main/java/org/mariadb/jdbc/internal/failover/impl/MastersFailoverListener.java @@ -282,7 +282,7 @@ public void foundActiveMaster(Protocol protocol) throws QueryException { * @throws QueryException if reconnect a new connection but there was an active transaction. */ public void reconnect() throws QueryException { - boolean inTransaction = currentProtocol != null && currentProtocol.inTransaction();; + boolean inTransaction = currentProtocol != null && currentProtocol.inTransaction(); reconnectFailedConnection(new SearchFilter(true, false)); handleFailLoop(); if (inTransaction) { diff --git a/src/main/java/org/mariadb/jdbc/internal/protocol/AuroraProtocol.java b/src/main/java/org/mariadb/jdbc/internal/protocol/AuroraProtocol.java index 1ffa0f891..b6d85e148 100644 --- a/src/main/java/org/mariadb/jdbc/internal/protocol/AuroraProtocol.java +++ b/src/main/java/org/mariadb/jdbc/internal/protocol/AuroraProtocol.java @@ -66,6 +66,7 @@ WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWIS import java.util.concurrent.locks.ReentrantLock; public class AuroraProtocol extends MastersSlavesProtocol { + public AuroraProtocol(final UrlParser url, final ReentrantLock lock) { super(url, lock); } @@ -76,11 +77,8 @@ public AuroraProtocol(final UrlParser url, final ReentrantLock lock) { * * @param listener aurora failover to call back if master is found * @param probableMaster probable master host - * @param searchFilter search filter - * @throws QueryException exception */ - public static void searchProbableMaster(AuroraListener listener, HostAddress probableMaster, - SearchFilter searchFilter) throws QueryException { + public static void searchProbableMaster(AuroraListener listener, HostAddress probableMaster) { AuroraProtocol protocol = getNewProtocol(listener.getProxy(), listener.getUrlParser()); try { @@ -122,6 +120,13 @@ public static void loop(AuroraListener listener, final List address } int maxConnectionTry = listener.getRetriesAllDown(); QueryException lastQueryException = null; + HostAddress probableMasterHost = null; + + // Only one address means cluster so only possible connection + if (listener.getClusterHostAddress() != null && loopAddresses.size() < 2 + && !loopAddresses.contains(listener.getClusterHostAddress())) { + loopAddresses.add(listener.getClusterHostAddress()); + } while (!loopAddresses.isEmpty() || (!searchFilter.isFailoverLoop() && maxConnectionTry > 0)) { protocol = getNewProtocol(listener.getProxy(), listener.getUrlParser()); @@ -134,7 +139,16 @@ public static void loop(AuroraListener listener, final List address try { HostAddress host = loopAddresses.pollFirst(); if (host == null) { - loopAddresses.addAll(listener.getUrlParser().getHostAddresses()); + for (HostAddress hostAddress : listener.getUrlParser().getHostAddresses()) { + if (!hostAddress.equals(listener.getClusterHostAddress())) { + loopAddresses.add(hostAddress); + } + } + // Use cluster last as backup + if (listener.getClusterHostAddress() != null && listener.getUrlParser().getHostAddresses().size() < 2) { + loopAddresses.add(listener.getClusterHostAddress()); + } + host = loopAddresses.pollFirst(); } protocol.setHostAddress(host); @@ -148,24 +162,41 @@ public static void loop(AuroraListener listener, final List address listener.removeFromBlacklist(protocol.getHostAddress()); if (listener.isMasterHostFailReconnect() && protocol.isMasterConnection()) { - if (foundMaster(listener, protocol, searchFilter)) { - return; + // Look for secondary when only known endpoint is the cluster endpoint + if (searchFilter.isFineIfFoundOnlyMaster() && listener.getUrlParser().getHostAddresses().size() <= 1 + && protocol.getHostAddress().equals(listener.getClusterHostAddress())) { + listener.retrieveAllEndpointsAndSet(protocol); + if (listener.getUrlParser().getHostAddresses().size() > 1) { + searchFilter = new SearchFilter(false); + loopAddresses.addAll(listener.getUrlParser().getHostAddresses()); + } } - } else if (listener.isSecondaryHostFailReconnect() && !protocol.isMasterConnection()) { - if (foundSecondary(listener, protocol, searchFilter)) { + if (foundMaster(listener, protocol, searchFilter)) { return; } - HostAddress probableMasterHost = listener.searchByStartName(protocol, listener.getUrlParser().getHostAddresses()); - if (probableMasterHost != null) { - loopAddresses.remove(probableMasterHost); - AuroraProtocol.searchProbableMaster(listener, probableMasterHost, searchFilter); - if (listener.isMasterHostFailReconnect() && searchFilter.isFineIfFoundOnlySlave()) { + + } else if (!protocol.isMasterConnection()) { + if (listener.isSecondaryHostFailReconnect()) { + if (foundSecondary(listener, protocol, searchFilter)) { return; } } + + if (listener.isSecondaryHostFailReconnect() + || (listener.isMasterHostFailReconnect() && probableMasterHost == null)) { + probableMasterHost = listener.searchByStartName(protocol, listener.getUrlParser().getHostAddresses()); + if (probableMasterHost != null) { + loopAddresses.remove(probableMasterHost); + AuroraProtocol.searchProbableMaster(listener, probableMasterHost); + if (listener.isMasterHostFailReconnect() && searchFilter.isFineIfFoundOnlySlave()) { + return; + } + } + } } else { protocol.close(); } + } catch (QueryException e) { lastQueryException = e; listener.addToBlacklist(protocol.getHostAddress()); @@ -181,9 +212,14 @@ public static void loop(AuroraListener listener, final List address loopAddresses = new ArrayDeque<>(listener.getBlacklistKeys()); } + // Try to connect to the cluster if no other connection is good + if (maxConnectionTry == 0 && !loopAddresses.contains(listener.getClusterHostAddress()) && listener.getClusterHostAddress() != null) { + loopAddresses.add(listener.getClusterHostAddress()); + } + } - if (listener.isMasterHostFailReconnect() || listener.isMasterHostFailReconnect()) { + if (listener.isMasterHostFailReconnect() || listener.isSecondaryHostFailReconnect()) { String error = "No active connection found for replica"; if (listener.isMasterHostFailReconnect()) { error = "No active connection found for master"; @@ -196,7 +232,7 @@ public static void loop(AuroraListener listener, final List address } /** - * Initilize new protocol instance. + * Initialize new protocol instance. * @param proxy proxy * @param urlParser connection string data's * @return new AuroraProtocol diff --git a/src/main/java/org/mariadb/jdbc/internal/queryresults/SingleExecutionResult.java b/src/main/java/org/mariadb/jdbc/internal/queryresults/SingleExecutionResult.java index e1593c401..6505c7690 100644 --- a/src/main/java/org/mariadb/jdbc/internal/queryresults/SingleExecutionResult.java +++ b/src/main/java/org/mariadb/jdbc/internal/queryresults/SingleExecutionResult.java @@ -138,9 +138,7 @@ public int getFirstAffectedRows() { return (int) affectedRows; } - public void addStatsError() { - ; - } + public void addStatsError() { } public MariaSelectResultSet getResultSet() { return result; diff --git a/src/main/java/org/mariadb/jdbc/internal/queryresults/resultset/MariaSelectResultSet.java b/src/main/java/org/mariadb/jdbc/internal/queryresults/resultset/MariaSelectResultSet.java index 31e925927..1673687b7 100644 --- a/src/main/java/org/mariadb/jdbc/internal/queryresults/resultset/MariaSelectResultSet.java +++ b/src/main/java/org/mariadb/jdbc/internal/queryresults/resultset/MariaSelectResultSet.java @@ -595,10 +595,7 @@ public boolean isBeforeFirst() throws SQLException { @Override public boolean isAfterLast() throws SQLException { checkClose(); - if (dataFetchTime > 0) { - return rowPointer >= resultSetSize && resultSetSize > 0; - } - return false; + return dataFetchTime > 0 && rowPointer >= resultSetSize && resultSetSize > 0; } @Override @@ -715,11 +712,7 @@ public boolean previous() throws SQLException { } else { if (rowPointer > -1) { rowPointer--; - if (rowPointer == -1) { - return false; - } else { - return true; - } + return rowPointer != -1; } return false; } diff --git a/src/main/java/org/mariadb/jdbc/internal/stream/PacketOutputStream.java b/src/main/java/org/mariadb/jdbc/internal/stream/PacketOutputStream.java index d7bbce105..0e853c8c8 100644 --- a/src/main/java/org/mariadb/jdbc/internal/stream/PacketOutputStream.java +++ b/src/main/java/org/mariadb/jdbc/internal/stream/PacketOutputStream.java @@ -427,10 +427,7 @@ public void writeUnsafe(byte[] bytes) { * @return true if with this additional length stream can be send in the same stream */ public boolean checkRewritableLength(int length) { - if (checkPacketLength && (buffer.position() + length > maxAllowedPacket - 1)) { - return false; - } - return true; + return !(checkPacketLength && (buffer.position() + length > maxAllowedPacket - 1)); } private void checkPacketMaxSize(int limit) throws MaxAllowedPacketException { diff --git a/src/main/java/org/mariadb/jdbc/internal/util/Utils.java b/src/main/java/org/mariadb/jdbc/internal/util/Utils.java index 0d1b4752c..9bb68b694 100644 --- a/src/main/java/org/mariadb/jdbc/internal/util/Utils.java +++ b/src/main/java/org/mariadb/jdbc/internal/util/Utils.java @@ -301,7 +301,7 @@ private static String resolveEscapes(String escaped, boolean noBackslashEscapes) /** * Escape sql String - * @param sql intial sql + * @param sql initial sql * @param noBackslashEscapes must backslash be escape * @return escaped sql string * @throws SQLException if escape sequence is incorrect. @@ -433,7 +433,7 @@ public static String nativeSql(String sql, boolean noBackslashEscapes) throws SQ } /** - * Retreive protocol corresponding to the failover options. + * Retrieve protocol corresponding to the failover options. * if no failover option, protocol will not be proxied. * if a failover option is precised, protocol will be proxied so that any connection error will be handle directly. * @@ -447,13 +447,6 @@ public static Protocol retrieveProxy(final UrlParser urlParser, final ReentrantL Protocol protocol; switch (urlParser.getHaMode()) { case AURORA: - if (urlParser.getHostAddresses().size() == 1) { - //single node cluster consider like "FAILOVER" - return getProxyLoggingIfNeeded(urlParser, (Protocol) Proxy.newProxyInstance( - MasterProtocol.class.getClassLoader(), - new Class[]{Protocol.class}, - new FailoverProxy(new MastersFailoverListener(urlParser), lock))); - } return getProxyLoggingIfNeeded(urlParser, (Protocol) Proxy.newProxyInstance( AuroraProtocol.class.getClassLoader(), new Class[]{Protocol.class}, diff --git a/src/main/java/org/mariadb/jdbc/internal/util/dao/CallableStatementCacheKey.java b/src/main/java/org/mariadb/jdbc/internal/util/dao/CallableStatementCacheKey.java index 52f7099d1..08004fa59 100644 --- a/src/main/java/org/mariadb/jdbc/internal/util/dao/CallableStatementCacheKey.java +++ b/src/main/java/org/mariadb/jdbc/internal/util/dao/CallableStatementCacheKey.java @@ -68,10 +68,7 @@ public boolean equals(Object object) { return false; } CallableStatementCacheKey that = (CallableStatementCacheKey) object; - if (!database.equals(that.database)) { - return false; - } - return query.equals(that.query); + return database.equals(that.database) && query.equals(that.query); } diff --git a/src/main/java/org/mariadb/jdbc/internal/util/dao/PrepareStatementCacheKey.java b/src/main/java/org/mariadb/jdbc/internal/util/dao/PrepareStatementCacheKey.java index 1cee13e29..2ca248674 100644 --- a/src/main/java/org/mariadb/jdbc/internal/util/dao/PrepareStatementCacheKey.java +++ b/src/main/java/org/mariadb/jdbc/internal/util/dao/PrepareStatementCacheKey.java @@ -67,10 +67,7 @@ public boolean equals(Object object) { return false; } PrepareStatementCacheKey that = (PrepareStatementCacheKey) object; - if (!database.equals(that.database)) { - return false; - } - return query.equals(that.query); + return database.equals(that.database) && query.equals(that.query); } diff --git a/src/test/java/org/mariadb/jdbc/BaseTest.java b/src/test/java/org/mariadb/jdbc/BaseTest.java index c1324811c..7fa80eac5 100644 --- a/src/test/java/org/mariadb/jdbc/BaseTest.java +++ b/src/test/java/org/mariadb/jdbc/BaseTest.java @@ -471,12 +471,12 @@ boolean hasSuperPrivilege(String testName) throws SQLException { // first test for specific user and host combination ResultSet rs = st.executeQuery("SELECT Super_Priv FROM mysql.user WHERE user = '" + username + "' AND host = '" + hostname + "'"); if (rs.next()) { - superPrivilege = (rs.getString(1).equals("Y") ? true : false); + superPrivilege = (rs.getString(1).equals("Y")); } else { // then check for user on whatever (%) host rs = st.executeQuery("SELECT Super_Priv FROM mysql.user WHERE user = '" + username + "' AND host = '%'"); if (rs.next()) { - superPrivilege = (rs.getString(1).equals("Y") ? true : false); + superPrivilege = (rs.getString(1).equals("Y")); } } @@ -502,7 +502,7 @@ boolean isLocalConnection(String testName) { // do nothing } - if (isLocal == false) { + if (!isLocal) { System.out.println("test '" + testName + "' skipped because connection is not local"); } diff --git a/src/test/java/org/mariadb/jdbc/BigQueryTest.java b/src/test/java/org/mariadb/jdbc/BigQueryTest.java index 2b19fb5cd..d4c07a17a 100644 --- a/src/test/java/org/mariadb/jdbc/BigQueryTest.java +++ b/src/test/java/org/mariadb/jdbc/BigQueryTest.java @@ -92,7 +92,7 @@ public void testError() throws SQLException { rs.next(); assertEquals(arr.length, rs.getString(1).length()); } finally { - connection.close(); + if (connection != null) { connection.close(); } } } diff --git a/src/test/java/org/mariadb/jdbc/DatabaseMetadataTest.java b/src/test/java/org/mariadb/jdbc/DatabaseMetadataTest.java index a11af649a..6a8622f8b 100644 --- a/src/test/java/org/mariadb/jdbc/DatabaseMetadataTest.java +++ b/src/test/java/org/mariadb/jdbc/DatabaseMetadataTest.java @@ -362,12 +362,12 @@ public void testGetTables3() throws SQLException { String tableName = tableSet.getString("TABLE_NAME"); assertEquals("table_type_test", tableName); - + String tableType = tableSet.getString("TABLE_TYPE"); assertEquals("TABLE", tableType); // see for possible values https://docs.oracle.com/javase/7/docs/api/java/sql/DatabaseMetaData.html#getTableTypes%28%29 } - + @Test public void testGetColumns() throws SQLException { DatabaseMetaData dbmd = sharedConnection.getMetaData(); @@ -390,34 +390,42 @@ void testResultSetColumns(ResultSet rs, String spec) throws SQLException { int col = i + 1; assertEquals(label, rsmd.getColumnLabel(col)); int columnType = rsmd.getColumnType(col); - if (type.equals("String")) { - assertTrue("invalid type " + columnType + " for " + rsmd.getColumnLabel(col) + ",expected String", - columnType == Types.VARCHAR - || columnType == Types.NULL - || columnType == Types.LONGVARCHAR); - } else if ("decimal".equals(type)) { - assertTrue("invalid type " + columnType + "( " + rsmd.getColumnTypeName(col) + " ) for " - + rsmd.getColumnLabel(col) + ",expected decimal", - columnType == Types.DECIMAL); - - } else if ("int".equals(type) || "short".equals(type)) { - assertTrue("invalid type " + columnType + "( " + rsmd.getColumnTypeName(col) + " ) for " - + rsmd.getColumnLabel(col) + ",expected numeric", - columnType == Types.BIGINT - || columnType == Types.INTEGER - || columnType == Types.SMALLINT - || columnType == Types.TINYINT); - - } else if (type.equals("boolean")) { - assertTrue("invalid type " + columnType + "( " + rsmd.getColumnTypeName(col) + " ) for " - + rsmd.getColumnLabel(col) + ",expected boolean", - columnType == Types.BOOLEAN || columnType == Types.BIT); - - } else if (type.equals("null")) { - assertTrue("invalid type " + columnType + " for " + rsmd.getColumnLabel(col) + ",expected null", - columnType == Types.NULL); - } else { - assertTrue("invalid type '" + type + "'", false); + switch (type) { + case "String": + assertTrue("invalid type " + columnType + " for " + rsmd.getColumnLabel(col) + ",expected String", + columnType == Types.VARCHAR + || columnType == Types.NULL + || columnType == Types.LONGVARCHAR); + break; + case "decimal": + assertTrue("invalid type " + columnType + "( " + rsmd.getColumnTypeName(col) + " ) for " + + rsmd.getColumnLabel(col) + ",expected decimal", + columnType == Types.DECIMAL); + break; + case "int": + case "short": + + assertTrue("invalid type " + columnType + "( " + rsmd.getColumnTypeName(col) + " ) for " + + rsmd.getColumnLabel(col) + ",expected numeric", + columnType == Types.BIGINT + || columnType == Types.INTEGER + || columnType == Types.SMALLINT + || columnType == Types.TINYINT); + + break; + case "boolean": + assertTrue("invalid type " + columnType + "( " + rsmd.getColumnTypeName(col) + " ) for " + + rsmd.getColumnLabel(col) + ",expected boolean", + columnType == Types.BOOLEAN || columnType == Types.BIT); + + break; + case "null": + assertTrue("invalid type " + columnType + " for " + rsmd.getColumnLabel(col) + ",expected null", + columnType == Types.NULL); + break; + default: + assertTrue("invalid type '" + type + "'", false); + break; } } } diff --git a/src/test/java/org/mariadb/jdbc/JdbcParserTest.java b/src/test/java/org/mariadb/jdbc/JdbcParserTest.java index 3de56f35c..94a43525a 100644 --- a/src/test/java/org/mariadb/jdbc/JdbcParserTest.java +++ b/src/test/java/org/mariadb/jdbc/JdbcParserTest.java @@ -8,6 +8,8 @@ import java.sql.SQLException; import java.util.Properties; +import static org.junit.Assert.fail; + public class JdbcParserTest { @Test @@ -60,7 +62,7 @@ public void testOptionTakeDefault() throws Throwable { @Test public void testOptionTakeDefaultAurora() throws Throwable { - UrlParser jdbc = UrlParser.parse("jdbc:mysql:aurora://localhost/test"); + UrlParser jdbc = UrlParser.parse("jdbc:mysql:aurora://cluster-identifier.cluster-customerID.region.rds.amazonaws.com/test"); Assert.assertNull(jdbc.getOptions().connectTimeout); Assert.assertTrue(jdbc.getOptions().validConnectionTimeout == 120); Assert.assertFalse(jdbc.getOptions().autoReconnect); @@ -112,7 +114,7 @@ public void testOptionParseIntegerMinimum() throws Throwable { public void testOptionParseIntegerNotPossible() throws Throwable { UrlParser.parse("jdbc:mysql://localhost/test?user=root&autoReconnect=true&validConnectionTimeout=-2" + "&connectTimeout=5"); - Assert.fail(); + fail(); } @Test() @@ -140,6 +142,20 @@ public void testJdbcParserSimpleIpv4basicwithoutDatabase() throws SQLException { Assert.assertTrue(new HostAddress("slave2", 3308).equals(urlParser.getHostAddresses().get(2))); } + @Test + public void testJdbcParserWithoutDatabaseWithProperties() throws SQLException { + String url = "jdbc:mysql://master:3306,slave1:3307,slave2:3308?autoReconnect=true"; + UrlParser urlParser = UrlParser.parse(url); + Assert.assertNull(urlParser.getDatabase()); + Assert.assertNull(urlParser.getUsername()); + Assert.assertNull(urlParser.getPassword()); + Assert.assertTrue(urlParser.getOptions().autoReconnect); + Assert.assertTrue(urlParser.getHostAddresses().size() == 3); + Assert.assertTrue(new HostAddress("master", 3306).equals(urlParser.getHostAddresses().get(0))); + Assert.assertTrue(new HostAddress("slave1", 3307).equals(urlParser.getHostAddresses().get(1))); + Assert.assertTrue(new HostAddress("slave2", 3308).equals(urlParser.getHostAddresses().get(2))); + } + @Test public void testJdbcParserSimpleIpv4Properties() throws SQLException { String url = "jdbc:mysql://master:3306,slave1:3307,slave2:3308/database?autoReconnect=true"; @@ -158,6 +174,24 @@ public void testJdbcParserSimpleIpv4Properties() throws SQLException { Assert.assertTrue(new HostAddress("slave2", 3308).equals(urlParser.getHostAddresses().get(2))); } + @Test + public void testJdbcParserSimpleIpv4PropertiesReversedOrder() throws SQLException { + String url = "jdbc:mysql://master:3306,slave1:3307,slave2:3308?autoReconnect=true/database"; + Properties prop = new Properties(); + prop.setProperty("user", "greg"); + prop.setProperty("password", "pass"); + + UrlParser urlParser = UrlParser.parse(url, prop); + Assert.assertTrue("database".equals(urlParser.getDatabase())); + Assert.assertTrue("greg".equals(urlParser.getUsername())); + Assert.assertTrue("pass".equals(urlParser.getPassword())); + Assert.assertTrue(urlParser.getOptions().autoReconnect); + Assert.assertTrue(urlParser.getHostAddresses().size() == 3); + Assert.assertTrue(new HostAddress("master", 3306).equals(urlParser.getHostAddresses().get(0))); + Assert.assertTrue(new HostAddress("slave1", 3307).equals(urlParser.getHostAddresses().get(1))); + Assert.assertTrue(new HostAddress("slave2", 3308).equals(urlParser.getHostAddresses().get(2))); + } + @Test public void testJdbcParserSimpleIpv4() throws SQLException { String url = "jdbc:mysql://master:3306,slave1:3307,slave2:3308/database?user=greg&password=pass"; @@ -208,7 +242,7 @@ public void testJdbcParserParameterErrorEqual() { + "(host=master2),address=(type=slave)(host=slave1)(port=3308)/database?user=greg&password=pass"; try { UrlParser.parse(url); - Assert.fail(); + fail(); } catch (SQLException e) { Assert.assertTrue(true); } @@ -256,7 +290,7 @@ public void testJdbcParserReplicationParameterWithoutType() throws SQLException @Test public void testJdbcParserHaModeLoadAurora() throws SQLException { - String url = "jdbc:mysql:aurora://localhost/database"; + String url = "jdbc:mysql:aurora://cluster-identifier.cluster-customerID.region.rds.amazonaws.com/database"; UrlParser jdbc = UrlParser.parse(url); Assert.assertTrue(jdbc.getHaMode().equals(HaMode.AURORA)); } diff --git a/src/test/java/org/mariadb/jdbc/failover/AuroraAutoDiscoveryTest.java b/src/test/java/org/mariadb/jdbc/failover/AuroraAutoDiscoveryTest.java new file mode 100644 index 000000000..b2704e131 --- /dev/null +++ b/src/test/java/org/mariadb/jdbc/failover/AuroraAutoDiscoveryTest.java @@ -0,0 +1,328 @@ +package org.mariadb.jdbc.failover; + +import com.amazonaws.services.rds.model.DBInstanceNotFoundException; +import com.amazonaws.services.rds.model.InvalidDBClusterStateException; +import com.amazonaws.services.rds.model.InvalidDBInstanceStateException; +import com.amazonaws.services.rds.model.ModifyDBInstanceRequest; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Assume; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.mariadb.jdbc.HostAddress; +import org.mariadb.jdbc.internal.protocol.Protocol; +import org.mariadb.jdbc.internal.util.constant.HaMode; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; + +import java.util.Collection; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class AuroraAutoDiscoveryTest extends BaseMultiHostTest { + + /** + * Initialisation. + * + * @throws SQLException exception + */ + @BeforeClass() + public static void beforeClass2() throws SQLException { + System.setProperty("auroraFailoverTesting", "true"); + proxyUrl = proxyAuroraUrl; + System.out.println("environment variable \"AURORA\" value : " + System.getenv("AURORA")); + Assume.assumeTrue(initialAuroraUrl != null && System.getenv("AURORA") != null && amazonRDSClient != null); + } + + /** + * Initialisation. + * + * @throws SQLException exception + */ + @Before + public void init() throws SQLException { + defaultUrl = initialAuroraUrl; + currentType = HaMode.AURORA; + } + + /** + * Creates a mock replica_host_status table to imitate the database used to retrieve information about the endpoints. + * @param insertEntryQuery - Query to insert a new entry into the table before running the tests + * @throws SQLException + */ + private Connection tableSetup(String insertEntryQuery) throws Throwable { + Connection connection = null; + Statement statement = null; + + try { + connection = getNewConnection(true); + statement = connection.createStatement(); + statement.executeQuery("DROP TABLE IF EXISTS replica_host_status"); + statement.executeQuery("CREATE TABLE replica_host_status (SERVER_ID VARCHAR(255), SESSION_ID VARCHAR(255), LAST_UPDATE_TIMESTAMP TIMESTAMP DEFAULT NOW())"); + + ResultSet resultSet = statement.executeQuery("SELECT SERVER_ID, SESSION_ID, LAST_UPDATE_TIMESTAMP " + + "FROM information_schema.replica_host_status " + + "WHERE LAST_UPDATE_TIMESTAMP = (" + + "SELECT MAX(LAST_UPDATE_TIMESTAMP) " + + "FROM information_schema.replica_host_status)"); + + while (resultSet.next()) { + String values = ""; + for (int i = 1; i < 4; i++) { + values += (i == 1) ? "'localhost'" : ",'" + resultSet.getString(i) + "'"; + } + statement.executeQuery("INSERT INTO replica_host_status (SERVER_ID, SESSION_ID, LAST_UPDATE_TIMESTAMP) " + + "VALUES (" + values + ")"); + } + + if (insertEntryQuery != null) { + statement.executeQuery(insertEntryQuery); + } + + try { + setDbName(connection, "testj"); + } catch (Throwable t) { + fail("Unable to set database for testing"); + } + + int serverId = getServerId(connection); + stopProxy(serverId, 1); + statement = connection.createStatement(); + statement.executeQuery("select 1"); + + } catch (SQLException se) { + fail("Unable to execute queries to set up table: " + se); + } finally { + if (statement != null) { statement.close(); } + } + + return connection; + } + + /** + * Takes down the table created solely for these tests. + * @throws SQLException + */ + @After + public void after() throws SQLException { + Connection connection = null; + Statement statement = null; + try { + connection = getNewConnection(true); + statement = connection.createStatement(); + statement.executeQuery("DROP TABLE IF EXISTS replica_host_status"); + } finally { + if (statement != null) { statement.close(); } + if (connection != null) { connection.close(); } + } + } + + /** + * Test verifies that the driver discovers new instances as soon as they are available. + * @throws Throwable + */ + @Test + public void testDiscoverCreatedInstanceOnFailover() throws Throwable { + Connection connection = null; + Statement statement = null; + + try { + connection = tableSetup(null); + int masterServerId = getServerId(connection); + int initialSize = getProtocolFromConnection(connection).getUrlParser().getHostAddresses().size(); + + statement = connection.createStatement(); + statement.executeQuery("INSERT INTO replica_host_status (SERVER_ID, SESSION_ID) " + + "VALUES ('test-discovery-on-creation', 'mock-new-endpoint')"); + + stopProxy(masterServerId, 1); + statement.executeQuery("select 1"); + + List finalEndpoints = getProtocolFromConnection(connection).getUrlParser().getHostAddresses(); + boolean newEndpointFound = foundHostInList(finalEndpoints, "test-discovery-on-creation"); + + assertTrue("Discovered new endpoint on failover", newEndpointFound); + Assert.assertEquals(initialSize+1, finalEndpoints.size()); + + } catch (SQLException se) { + fail("Unable to execute query:" + se); + } finally { + if (statement != null) { statement.close(); } + if (connection != null) { connection.close(); } + } + } + + /** + * Test verifies that deleted instances are removed from the possible connections. + * @throws Throwable + */ + @Test + public void testRemoveDeletedInstanceOnFailover() throws Throwable { + Connection connection = null; + Statement statement = null; + try { + connection = tableSetup("INSERT INTO replica_host_status (SERVER_ID, SESSION_ID) " + + "VALUES ('test-instance-deleted-detection', 'mock-delete-endpoint')"); + Protocol protocol = getProtocolFromConnection(connection); + int initialSize = protocol.getUrlParser().getHostAddresses().size(); + int serverId = getServerId(connection); + + statement = connection.createStatement(); + statement.executeQuery("UPDATE replica_host_status " + + "SET LAST_UPDATE_TIMESTAMP = DATE_SUB(LAST_UPDATE_TIMESTAMP, INTERVAL 4 MINUTE) " + + "WHERE SERVER_ID = 'test-instance-deleted-detection'"); + stopProxy(serverId, 1); + statement.executeQuery("select 1"); + + List finalEndpoints = protocol.getUrlParser().getHostAddresses(); + boolean deletedInstanceGone = !foundHostInList(finalEndpoints, "test-instance-deleted-detection"); + + assertTrue("Removed deleted endpoint from urlParser", deletedInstanceGone); + Assert.assertEquals(initialSize - 1, finalEndpoints.size()); + + } catch (SQLException se) { + fail("Unable to execute query: " + se); + } finally { + if (statement != null) { statement.close(); } + if (connection != null) { connection.close(); } + } + } + + /** + * Must set newlyCreatedInstance system property in which the instance is not the current writer. + * The best way to test is to create a new instance as the test is started. + * All other instances should have a promotion tier greater than zero. + * Test checks if a newly created instance that is promoted as the writer is found and connected to right away. + * @throws Throwable + */ + @Test + public void testNewInstanceAsWriterDetection() throws Throwable { + Assume.assumeTrue("System property newlyCreatedInstance is set", System.getProperty("newlyCreatedInstance") != null); + + Connection connection = null; + Statement statement = null; + try { + connection = getNewConnection(false); + String initialHost = getProtocolFromConnection(connection).getHost(); + + ModifyDBInstanceRequest request1 = new ModifyDBInstanceRequest(); + request1.setDBInstanceIdentifier(System.getProperty("newlyCreatedInstance")); + request1.setPromotionTier(0); + + boolean promotionTierChanged; + do { + try { + amazonRDSClient.modifyDBInstance(request1); + promotionTierChanged = true; + } catch (InvalidDBInstanceStateException | DBInstanceNotFoundException e) { + promotionTierChanged = false; + } + } while (!promotionTierChanged); + + try { + Thread.sleep(10*1000); // Should have completed modification + } catch (InterruptedException e) { + fail("Thread sleep was interrupted"); + } + + launchAuroraFailover(); + try { + Thread.sleep(30*1000); // Should have failed over + } catch (InterruptedException e) { + fail("Thread sleep was interrupted"); + } + + statement = connection.createStatement(); + statement.executeQuery("select 1"); + + String newHost = getProtocolFromConnection(connection).getHost(); + assertTrue("Connected to new writer", !initialHost.equals(newHost)); + Assert.assertEquals(System.getProperty("newlyCreatedInstance"), newHost.substring(0, newHost.indexOf("."))); + + } catch (SQLException se) { + fail("Unable to execute query: " + se); + } finally { + if (statement != null) { statement.close(); } + if (connection != null) { connection.close(); } + } + } + + @Test + public void testExceptionHandlingWhenDataFromTable() throws Throwable { + Connection connection = null; + try { + connection = getNewConnection(false); + String initialHost = getProtocolFromConnection(connection).getHost(); + + final Statement statement = connection.createStatement(); + Thread queryThread = new Thread() { + public void run() { + long startTime = System.nanoTime(); + long stopTime = System.nanoTime(); + try { + while (Math.abs(TimeUnit.NANOSECONDS.toMillis(stopTime-startTime)) < 1000) { + stopTime = System.nanoTime(); + statement.executeQuery("SELECT 1"); + startTime = System.nanoTime(); + } + } catch (SQLException se) { + se.printStackTrace(); + } + } + }; + + Thread failoverThread = new Thread() { + public void run() { + do { + try { + launchAuroraFailover(); + } catch (InvalidDBClusterStateException e) { + try { + Thread.sleep(1000); + } catch (InterruptedException ie) { + // Expected because may occur due to queryThread + } + } + } while (!isInterrupted()); + } + }; + + queryThread.start(); + failoverThread.start(); + queryThread.join(); + if (!queryThread.isAlive()) { + failoverThread.interrupt(); + } + + if (statement != null) { statement.close(); } + + Set hostAddresses = getProtocolFromConnection(connection).getProxy().getListener().getBlacklistKeys(); + boolean connectionBlacklisted = foundHostInList(hostAddresses, initialHost); + assertTrue("Connection has been blacklisted", connectionBlacklisted); + + } catch (SQLException se) { + fail("Unable to execute query: " + se); + } finally { + if (connection != null) { connection.close(); } + } + } + + private boolean foundHostInList(Collection hostAddresses, String hostIdentifier) { + for (HostAddress hostAddress: hostAddresses) { + if (hostAddress.host.indexOf(hostIdentifier) > -1) { + return true; + } + } + return false; + } +} diff --git a/src/test/java/org/mariadb/jdbc/failover/AuroraFailoverTest.java b/src/test/java/org/mariadb/jdbc/failover/AuroraFailoverTest.java index 839441575..a09df4f9b 100644 --- a/src/test/java/org/mariadb/jdbc/failover/AuroraFailoverTest.java +++ b/src/test/java/org/mariadb/jdbc/failover/AuroraFailoverTest.java @@ -1,5 +1,6 @@ package org.mariadb.jdbc.failover; +import com.amazonaws.services.rds.model.InvalidDBClusterStateException; import org.junit.*; import org.mariadb.jdbc.MariaDbServerPreparedStatement; import org.mariadb.jdbc.internal.protocol.Protocol; @@ -9,7 +10,6 @@ import java.util.concurrent.TimeUnit; import static org.junit.Assert.*; -import static org.junit.Assert.assertTrue; /** * Aurora test suite. @@ -30,6 +30,7 @@ public class AuroraFailoverTest extends BaseReplication { */ @BeforeClass() public static void beforeClass2() throws SQLException { + System.setProperty("auroraFailoverTesting", "true"); proxyUrl = proxyAuroraUrl; System.out.println("environment variable \"AURORA\" value : " + System.getenv("AURORA")); Assume.assumeTrue(initialAuroraUrl != null && System.getenv("AURORA") != null && amazonRDSClient != null); @@ -101,19 +102,23 @@ public void testFailMaster() throws Throwable { Connection connection = null; try { connection = getNewConnection("&retriesAllDown=3&connectTimeout=1000", true); + int previousPort = getProtocolFromConnection(connection).getPort(); Statement stmt = connection.createStatement(); int masterServerId = getServerId(connection); stopProxy(masterServerId); long stopTime = System.nanoTime(); try { + // Handles failover so may connect to another and is still able to execute stmt.execute("SELECT 1"); - Assert.fail(); + if (getProtocolFromConnection(connection).getPort() == previousPort) { + Assert.fail(); + } } catch (SQLException e) { //normal error } Assert.assertTrue(!connection.isReadOnly()); long duration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - stopTime); - Assert.assertTrue(duration < 20 * 1000); + Assert.assertTrue(duration < 25 * 1000); } finally { if (connection != null) { connection.close(); @@ -268,8 +273,18 @@ public void failoverPrepareStatementOnSlave() throws Throwable { nbExecutionOnMasterFirstFailover++; } } - launchAuroraFailover(); - while (nbExecutionOnSlave + nbExecutionOnMasterSecondFailover < 5000) { + + boolean invalidClusterState; + do { + try { + launchAuroraFailover(); + invalidClusterState = false; + } catch (InvalidDBClusterStateException e) { + invalidClusterState = true; + } + } while (invalidClusterState); + + while (nbExecutionOnSlave + nbExecutionOnMasterSecondFailover < 1000) { ResultSet rs = preparedStatement.executeQuery(); rs.next(); if (rs.getInt(1) == 1) { diff --git a/src/test/java/org/mariadb/jdbc/failover/BaseMonoServer.java b/src/test/java/org/mariadb/jdbc/failover/BaseMonoServer.java index 3b7a0f7b7..e5506cab2 100644 --- a/src/test/java/org/mariadb/jdbc/failover/BaseMonoServer.java +++ b/src/test/java/org/mariadb/jdbc/failover/BaseMonoServer.java @@ -1,14 +1,11 @@ package org.mariadb.jdbc.failover; import org.junit.Assert; -import org.junit.Assume; import org.junit.Test; import java.sql.Connection; import java.sql.SQLException; import java.sql.Statement; -import java.util.HashMap; -import java.util.Map; import java.util.concurrent.TimeUnit; public abstract class BaseMonoServer extends BaseMultiHostTest { @@ -66,7 +63,7 @@ public void relaunchWithErrorWhenInTransaction() throws Throwable { st.execute("INSERT INTO baseReplicationTransaction" + jobId + "(test) VALUES ('test')"); int masterServerId = getServerId(connection); st.execute("SELECT 1"); - long startTime = System.currentTimeMillis();; + long startTime = System.currentTimeMillis(); stopProxy(masterServerId, 2000); try { st.execute("SELECT 1"); diff --git a/src/test/java/org/mariadb/jdbc/failover/BaseMultiHostTest.java b/src/test/java/org/mariadb/jdbc/failover/BaseMultiHostTest.java index b8ec05334..b53b30722 100644 --- a/src/test/java/org/mariadb/jdbc/failover/BaseMultiHostTest.java +++ b/src/test/java/org/mariadb/jdbc/failover/BaseMultiHostTest.java @@ -12,6 +12,7 @@ import org.mariadb.jdbc.MariaDbServerPreparedStatement; import org.mariadb.jdbc.UrlParser; import org.mariadb.jdbc.internal.failover.AbstractMastersListener; +import org.mariadb.jdbc.internal.failover.impl.AuroraListener; import org.mariadb.jdbc.internal.protocol.Protocol; import org.mariadb.jdbc.internal.util.constant.HaMode; import org.mariadb.jdbc.internal.util.dao.ServerPrepareResult; @@ -29,6 +30,7 @@ * example mvn test -DdbUrl=jdbc:mysql://localhost:3306,localhost:3307/test?user=root -DlogLevel=FINEST * specific parameters : * defaultMultiHostUrl : + * If testing Aurora, set the region. Default is US_EAST_1. */ @Ignore public class BaseMultiHostTest { @@ -144,7 +146,13 @@ public static boolean requireMinimumVersion(Connection connection, int major, in } private static String createProxies(String tmpUrl, HaMode proxyType) throws SQLException { - UrlParser tmpUrlParser = UrlParser.parse(tmpUrl); + UrlParser tmpUrlParser; + if (proxyType == HaMode.AURORA) { + tmpUrlParser = retrieveEndpointsForProxies(tmpUrl); + } else { + tmpUrlParser = UrlParser.parse(tmpUrl); + } + TcpProxy[] tcpProxies = new TcpProxy[tmpUrlParser.getHostAddresses().size()]; username = tmpUrlParser.getUsername(); hostname = tmpUrlParser.getHostAddresses().get(0).host; @@ -170,6 +178,26 @@ private static String createProxies(String tmpUrl, HaMode proxyType) throws SQLE } + private static UrlParser retrieveEndpointsForProxies(String tmpUrl) throws SQLException{ + try { + Connection connection = DriverManager.getConnection(tmpUrl); + connection.setReadOnly(true); + try { + Protocol protocol = (new BaseMultiHostTest().getProtocolFromConnection(connection)); + UrlParser urlParser = protocol.getUrlParser(); + List hostAddresses = urlParser.getHostAddresses(); + hostAddresses.add(((AuroraListener) protocol.getProxy().getListener()).getClusterHostAddress()); + urlParser.setHostAddresses(hostAddresses); + return urlParser; + } catch (Throwable throwable) { + connection.close(); + return UrlParser.parse(tmpUrl); + } + } catch (SQLException se) { + return UrlParser.parse(tmpUrl); + } + } + /** * Clean proxies. * @throws SQLException exception @@ -208,11 +236,9 @@ protected Connection getNewConnection(String additionnalConnectionData, boolean if (forceNewProxy) { tmpProxyUrl = createProxies(defaultUrl, currentType); } - if (additionnalConnectionData == null) { - return DriverManager.getConnection(tmpProxyUrl); - } else { - return DriverManager.getConnection(tmpProxyUrl + additionnalConnectionData); - } + tmpProxyUrl += (additionnalConnectionData == null) ? "" : additionnalConnectionData; + return DriverManager.getConnection(tmpProxyUrl); + } else { if (additionnalConnectionData == null) { return DriverManager.getConnection(defaultUrl); @@ -301,12 +327,12 @@ public boolean hasSuperPrivilege(Connection connection, String testName) throws ResultSet rs = st.executeQuery("SELECT Super_Priv FROM mysql.user WHERE user = '" + username + "' AND host = '" + hostname + "'"); if (rs.next()) { - superPrivilege = (rs.getString(1).equals("Y") ? true : false); + superPrivilege = (rs.getString(1).equals("Y")); } else { // then check for user on whatever (%) host rs = st.executeQuery("SELECT Super_Priv FROM mysql.user WHERE user = '" + username + "' AND host = '%'"); if (rs.next()) { - superPrivilege = (rs.getString(1).equals("Y") ? true : false); + superPrivilege = (rs.getString(1).equals("Y")); } } @@ -326,8 +352,15 @@ protected Protocol getProtocolFromConnection(Connection conn) throws Throwable { return (Protocol) getProtocol.invoke(conn); } + void setDbName(Connection connection, String newDbName) throws Throwable { + AuroraListener auroraListener = (AuroraListener) getProtocolFromConnection(connection).getProxy().getListener(); + Field dbName = auroraListener.getClass().getDeclaredField("dbName"); + dbName.setAccessible(true); + dbName.set(auroraListener, newDbName); + } + /** - * Retreive server Id. + * Retrieve server Id. * @param connection connection * @return server index * @throws Throwable exception @@ -354,4 +387,4 @@ ServerPrepareResult getPrepareResult(MariaDbServerPreparedStatement preparedStat prepareResultField.setAccessible(true); return (ServerPrepareResult) prepareResultField.get(preparedStatement); //IllegalAccessException } -} \ No newline at end of file +} diff --git a/src/test/java/org/mariadb/jdbc/failover/BaseReplication.java b/src/test/java/org/mariadb/jdbc/failover/BaseReplication.java index 2053c4780..e78354a25 100644 --- a/src/test/java/org/mariadb/jdbc/failover/BaseReplication.java +++ b/src/test/java/org/mariadb/jdbc/failover/BaseReplication.java @@ -281,9 +281,7 @@ public void randomConnection() throws Throwable { } } finally { if (connection != null) { - if (connection != null) { - connection.close(); - } + connection.close(); } } } diff --git a/src/test/java/org/mariadb/jdbc/failover/TcpProxy.java b/src/test/java/org/mariadb/jdbc/failover/TcpProxy.java index be97c448e..011a7a205 100644 --- a/src/test/java/org/mariadb/jdbc/failover/TcpProxy.java +++ b/src/test/java/org/mariadb/jdbc/failover/TcpProxy.java @@ -12,7 +12,7 @@ public class TcpProxy { TcpProxySocket socket; /** - * Initialise procy. + * Initialise proxy. * @param host host (ip / dns) * @param remoteport port * @throws IOException exception