Skip to content

Commit

Permalink
[CONJ-325] implementation simplification
Browse files Browse the repository at this point in the history
  • Loading branch information
rusher committed Aug 2, 2016
1 parent 52a25f6 commit 415aa37
Show file tree
Hide file tree
Showing 6 changed files with 24 additions and 40 deletions.
Expand Up @@ -473,9 +473,7 @@ public boolean canRetryFailLoop() {
public static void clearBlacklist() {
blacklist.clear();
}
public static Set<HostAddress> getBlacklist() {
return blacklist.keySet();
}

public long getLastQueryNanos() {
return lastQueryNanos;
}
Expand Down
Expand Up @@ -180,9 +180,7 @@ public void reconnectFailedConnection(SearchFilter searchFilter) throws QueryExc
}
}

if (urlParser.getHostAddresses().size() < 2
|| (urlParser.getHostAddresses().size() == 2
&& urlParser.getHostAddresses().contains(getClusterHostAddress()))) {
if (urlParser.getHostAddresses().size() <= 1) {
searchFilter = new SearchFilter(true, false);
}
if ((isMasterHostFail() || isSecondaryHostFail())
Expand Down Expand Up @@ -279,39 +277,20 @@ private List<String> getCurrentEndpointIdentifiers(Protocol protocol) throws Que
}

/**
* Sets urlParser if there are any changes in the instance endpoints available.
* Sets urlParser accordingly to discovered hosts.
*
* @param endpoints instance identifiers
* @param port port that is common to all endpoints
*/
private void setUrlParserFromEndpoints(List<String> endpoints, int port) {
List<HostAddress> addresses = Collections.synchronizedList(urlParser.getHostAddresses());

List<String> currentHosts = new ArrayList<>();
synchronized (addresses) {
for (HostAddress address : addresses) {
currentHosts.add(address.host);
}
}

synchronized (addresses) {
Iterator<HostAddress> iterator = addresses.iterator();
while (iterator.hasNext() && endpoints.size() > 0) {
HostAddress address = iterator.next();
if (!endpoints.contains(address.host)) {
removeFromBlacklist(address);
iterator.remove();
}
}
List<HostAddress> addresses = new ArrayList<>();
for (String endpoint : endpoints) {
HostAddress newHostAddress = new HostAddress(endpoint, port, null);
addresses.add(newHostAddress);
}

synchronized (addresses) {
for (String endpoint : endpoints) {
if (!currentHosts.contains(endpoint)) {
HostAddress newHostAddress = new HostAddress(endpoint, port, null);
addresses.add(newHostAddress);
}
}
synchronized (urlParser) {
urlParser.setHostAddresses(addresses);
}
}

Expand Down
Expand Up @@ -122,12 +122,6 @@ public static void loop(AuroraListener listener, final List<HostAddress> address
QueryException lastQueryException = null;
HostAddress probableMasterHost = null;

// Only one address means cluster so only possible connection
if (listener.getClusterHostAddress() != null && loopAddresses.size() < 2
&& !loopAddresses.contains(listener.getClusterHostAddress())) {
loopAddresses.add(listener.getClusterHostAddress());
}

while (!loopAddresses.isEmpty() || (!searchFilter.isFailoverLoop() && maxConnectionTry > 0)) {
protocol = getNewProtocol(listener.getProxy(), listener.getUrlParser());

Expand Down Expand Up @@ -166,11 +160,15 @@ public static void loop(AuroraListener listener, final List<HostAddress> address
if (searchFilter.isFineIfFoundOnlyMaster() && listener.getUrlParser().getHostAddresses().size() <= 1
&& protocol.getHostAddress().equals(listener.getClusterHostAddress())) {
listener.retrieveAllEndpointsAndSet(protocol);

if (listener.getUrlParser().getHostAddresses().size() > 1) {
searchFilter = new SearchFilter(false);
//add newly discovered end-point to loop
loopAddresses.addAll(listener.getUrlParser().getHostAddresses());
//since there is more than one end point, reactivate connection to a read-only host
searchFilter = new SearchFilter(false);
}
}

if (foundMaster(listener, protocol, searchFilter)) {
return;
}
Expand Down
Expand Up @@ -127,7 +127,6 @@ public static void beforeClass() throws SQLException, IOException {
public void afterBaseTest() throws SQLException {
assureProxy();
assureBlackList();
System.out.println("BLACKLIST : " + AbstractMastersListener.getBlacklist().size());
}

/**
Expand Down
5 changes: 5 additions & 0 deletions src/test/java/org/mariadb/jdbc/failover/TcpProxy.java
@@ -1,11 +1,15 @@
package org.mariadb.jdbc.failover;

import org.mariadb.jdbc.internal.logging.Logger;
import org.mariadb.jdbc.internal.logging.LoggerFactory;

import java.io.IOException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;


public class TcpProxy {
private static Logger logger = LoggerFactory.getLogger(TcpProxy.class);

String host;
int remoteport;
Expand All @@ -30,6 +34,7 @@ public void stop() {

public void restart(long sleepTime) {
socket.kill();
logger.trace("host proxy port " + socket.localport + " for " + host + " started");
Executors.newSingleThreadScheduledExecutor().schedule(socket, sleepTime, TimeUnit.MILLISECONDS);
}

Expand Down
5 changes: 5 additions & 0 deletions src/test/java/org/mariadb/jdbc/failover/TcpProxySocket.java
@@ -1,11 +1,15 @@
package org.mariadb.jdbc.failover;

import org.mariadb.jdbc.internal.logging.Logger;
import org.mariadb.jdbc.internal.logging.LoggerFactory;

import java.io.*;
import java.net.BindException;
import java.net.ServerSocket;
import java.net.Socket;

public class TcpProxySocket implements Runnable {
private static Logger logger = LoggerFactory.getLogger(TcpProxy.class);

String host;
int remoteport;
Expand Down Expand Up @@ -65,6 +69,7 @@ public void kill() {
@Override
public void run() {

logger.trace("host proxy port " + this.localport + " for " + host + " started");
stop = false;
try {
try {
Expand Down

0 comments on commit 415aa37

Please sign in to comment.