Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
[CONJ-502] isolation correction on failover when using multiple pools…
… on same VM
  • Loading branch information
rusher committed Jul 24, 2017
1 parent b0ecfc2 commit 430583b
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 13 deletions.
Expand Up @@ -783,4 +783,25 @@ public void rePrepareOnSlave(ServerPrepareResult oldServerPrepareResult, boolean
oldServerPrepareResult.failover(serverPrepareResult.getStatementId(), secondaryProtocol);
}
}

/**
* List current connected HostAddress.
*
* @return hostAddress List.
*/
public List<HostAddress> connectedHosts() {
List<HostAddress> usedHost = new ArrayList<>();

if (isMasterHostFail()) {
Protocol masterProtocol = waitNewMasterProtocol.get();
if (masterProtocol != null) usedHost.add(masterProtocol.getHostAddress());
} else usedHost.add(masterProtocol.getHostAddress());

if (isSecondaryHostFail()) {
Protocol secondProtocol = waitNewSecondaryProtocol.get();
if (secondProtocol != null) usedHost.add(secondProtocol.getHostAddress());
} else usedHost.add(secondaryProtocol.getHostAddress());

return usedHost;
}
}
Expand Up @@ -62,8 +62,7 @@
import java.io.IOException;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayDeque;
import java.util.List;
import java.util.*;
import java.util.concurrent.locks.ReentrantLock;

import static org.mariadb.jdbc.internal.util.SqlStates.CONNECTION_EXCEPTION;
Expand Down Expand Up @@ -117,7 +116,7 @@ public static void loop(AuroraListener listener, final List<HostAddress> address
throws SQLException {

AuroraProtocol protocol;
ArrayDeque<HostAddress> loopAddresses = new ArrayDeque<>((!addresses.isEmpty()) ? addresses : listener.getBlacklistKeys());
Deque<HostAddress> loopAddresses = new ArrayDeque<>((!addresses.isEmpty()) ? addresses : listener.getBlacklistKeys());
if (loopAddresses.isEmpty()) {
loopAddresses.addAll(listener.getUrlParser().getHostAddresses());
}
Expand Down Expand Up @@ -221,10 +220,10 @@ public static void loop(AuroraListener listener, final List<HostAddress> address
return;
}

//loop is set so
// if server has try to connect to all host, and there is remaining master or slave that fail
// add all servers back to continue looping until maxConnectionTry is reached
if (loopAddresses.isEmpty() && !searchFilter.isFailoverLoop() && maxConnectionTry > 0) {
//use blacklist if all server has been connected and no result
loopAddresses = new ArrayDeque<>(listener.getBlacklistKeys());
resetHostList(listener, loopAddresses);
}

// Try to connect to the cluster if no other connection is good
Expand All @@ -246,6 +245,33 @@ public static void loop(AuroraListener listener, final List<HostAddress> address
}
}

/**
* Reinitialize loopAddresses with all hosts : all servers in randomize order with cluster address.
* If there is an active connection, connected host are remove from list.
*
* @param listener current listener
* @param loopAddresses the list to reinitialize
*/
private static void resetHostList(AuroraListener listener, Deque<HostAddress> loopAddresses) {
//if all servers have been connected without result
//add back all servers
List<HostAddress> servers = new ArrayList<>();
servers.addAll(listener.getUrlParser().getHostAddresses());

Collections.shuffle(servers);

//if cluster host is set, add it to the end of the list
if (listener.getClusterHostAddress() != null && listener.getUrlParser().getHostAddresses().size() < 2) {
servers.add(listener.getClusterHostAddress());
}

//remove current connected hosts to avoid reconnect them
servers.removeAll(listener.connectedHosts());

loopAddresses.clear();
loopAddresses.addAll(loopAddresses);
}

/**
* Initialize new protocol instance.
*
Expand Down
Expand Up @@ -60,8 +60,7 @@

import java.io.Closeable;
import java.sql.SQLException;
import java.util.ArrayDeque;
import java.util.List;
import java.util.*;
import java.util.concurrent.locks.ReentrantLock;


Expand Down Expand Up @@ -138,9 +137,12 @@ public static void loop(Listener listener, final List<HostAddress> addresses,
lastQueryException = e;
}

// if server has try to connect to all host, and master still fail
// add all servers back to continue looping until maxConnectionTry is reached
if (loopAddresses.isEmpty() && !searchFilter.isFailoverLoop() && maxConnectionTry > 0) {
loopAddresses = new ArrayDeque<>(listener.getBlacklistKeys());
resetHostList(listener, loopAddresses);
}

}
if (lastQueryException != null) {
throw new SQLException("No active connection found for master : " + lastQueryException.getMessage(),
Expand All @@ -149,4 +151,20 @@ public static void loop(Listener listener, final List<HostAddress> addresses,
throw new SQLException("No active connection found for master");
}

/**
* Reinitialize loopAddresses with all hosts : all servers in randomize order without connected host.
*
* @param listener current listener
* @param loopAddresses the list to reinitialize
*/
private static void resetHostList(Listener listener, Deque<HostAddress> loopAddresses) {
//if all servers have been connected without result
//add back all servers
List<HostAddress> servers = new ArrayList<>();
servers.addAll(listener.getUrlParser().getHostAddresses());
Collections.shuffle(servers);

loopAddresses.clear();
loopAddresses.addAll(loopAddresses);
}
}
Expand Up @@ -59,8 +59,7 @@
import org.mariadb.jdbc.internal.failover.tools.SearchFilter;

import java.sql.SQLException;
import java.util.ArrayDeque;
import java.util.List;
import java.util.*;
import java.util.concurrent.locks.ReentrantLock;

public class MastersSlavesProtocol extends MasterProtocol {
Expand Down Expand Up @@ -137,9 +136,10 @@ public static void loop(MastersSlavesListener listener, final List<HostAddress>
return;
}

//loop is set so
// if server has try to connect to all host, and there is remaining master or slave that fail
// add all servers back to continue looping until maxConnectionTry is reached
if (loopAddresses.isEmpty() && !searchFilter.isFailoverLoop() && maxConnectionTry > 0) {
loopAddresses = new ArrayDeque<>(listener.getBlacklistKeys());
resetHostList(listener, loopAddresses);
}
}

Expand All @@ -157,6 +157,26 @@ public static void loop(MastersSlavesListener listener, final List<HostAddress>

}

/**
* Reinitialize loopAddresses with all servers in randomize order.
*
* @param listener current listener
* @param loopAddresses the list to reinitialize
*/
private static void resetHostList(MastersSlavesListener listener, Deque<HostAddress> loopAddresses) {
//if all servers have been connected without result
//add back all servers
List<HostAddress> servers = new ArrayList<>();
servers.addAll(listener.getUrlParser().getHostAddresses());
Collections.shuffle(servers);

//remove current connected hosts to avoid reconnect them
servers.removeAll(listener.connectedHosts());

loopAddresses.clear();
loopAddresses.addAll(loopAddresses);
}

protected static boolean foundMaster(MastersSlavesListener listener, MastersSlavesProtocol protocol,
SearchFilter searchFilter) {
protocol.setMustBeMasterConnection(true);
Expand Down

0 comments on commit 430583b

Please sign in to comment.