Skip to content

Commit

Permalink
Fix JmxConnectionFactory.connectAny(..) to loop a second time trying …
Browse files Browse the repository at this point in the history
…all hosts regardless of past successes/failures.

ref: #239
  • Loading branch information
michaelsembwever authored and adejanovski committed Oct 19, 2017
1 parent e899534 commit 22afbdb
Showing 1 changed file with 29 additions and 28 deletions.
Expand Up @@ -21,7 +21,6 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand All @@ -32,6 +31,7 @@
import com.datastax.driver.core.policies.EC2MultiRegionAddressTranslator;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -60,6 +60,7 @@ public JmxConnectionFactory(MetricRegistry metricRegistry) {

public JmxProxy connect(Optional<RepairStatusHandler> handler, String host, int connectionTimeout)
throws ReaperException {

// use configured jmx port for host if provided
if (localMode) {
host = "127.0.0.1";
Expand All @@ -74,7 +75,14 @@ public JmxProxy connect(Optional<RepairStatusHandler> handler, String host, int
username = jmxAuth.getUsername();
password = jmxAuth.getPassword();
}
return JmxProxyImpl.connect(handler, host, username, password, addressTranslator, connectionTimeout);
try {
JmxProxy jmxProxy = JmxProxyImpl.connect(handler, host, username, password, addressTranslator, connectionTimeout);
hostConnectionCounters.incrementSuccessfulConnections(host);
return jmxProxy;
} catch (ReaperException | RuntimeException e) {
hostConnectionCounters.decrementSuccessfulConnections(host);
throw e;
}
}

public final JmxProxy connect(String host, int connectionTimeout) throws ReaperException {
Expand All @@ -86,31 +94,24 @@ public final JmxProxy connectAny(
Collection<String> hosts,
int connectionTimeout) throws ReaperException {

if (hosts == null || hosts.isEmpty()) {
throw new ReaperException("no hosts given for connectAny");
}
Preconditions.checkArgument(null != hosts && !hosts.isEmpty(), "no hosts provided to connectAny");

List<String> hostList = new ArrayList<>(hosts);
Collections.shuffle(hostList);
Iterator<String> hostIterator = hostList.iterator();

for (int i = 0; i < 2; i++) {
while (hostIterator.hasNext()) {
String host = hostIterator.next();

for (String host : hostList) {
assert null != host; // @todo remove the null check in the following if condition
// First loop, we try the most accessible nodes, then second loop we try all nodes
if (null != host && (hostConnectionCounters.getSuccessfulConnections(host) >= 0 || 1 == i)) {
try {
JmxProxy jmxProxy = connect(handler, host, connectionTimeout);
hostConnectionCounters.incrementSuccessfulConnections(host);
return jmxProxy;
return connect(handler, host, connectionTimeout);
} catch (ReaperException | RuntimeException e) {
hostConnectionCounters.decrementSuccessfulConnections(host);
LOG.info("Unreachable host", e);
}
}
}
}

throw new ReaperException("no host could be reached through JMX");
}

Expand Down Expand Up @@ -152,15 +153,15 @@ void incrementSuccessfulConnections(String host) {
AtomicInteger successes = successfulConnections.putIfAbsent(host, new AtomicInteger(1));
if (null != successes && successes.get() <= 20) {
successes.incrementAndGet();
if (null != metricRegistry) {
metricRegistry
.counter(
MetricRegistry.name(
JmxConnectionFactory.class, "connections-health", host.replace('.', '-')))
.inc();
}
}
LOG.debug("Host {} has {} successfull connections", host, successes);
if (null != metricRegistry) {
metricRegistry
.counter(
MetricRegistry.name(
JmxConnectionFactory.class, "connections", host.replace('.', '-')))
.inc();
}
} catch (RuntimeException e) {
LOG.warn("Could not increment JMX successfull connections counter for host {}", host, e);
}
Expand All @@ -171,15 +172,15 @@ void decrementSuccessfulConnections(String host) {
AtomicInteger successes = successfulConnections.putIfAbsent(host, new AtomicInteger(-1));
if (null != successes && successes.get() >= -5) {
successes.decrementAndGet();
if (null != metricRegistry) {
metricRegistry
.counter(
MetricRegistry.name(
JmxConnectionFactory.class, "connections-health", host.replace('.', '-')))
.dec();
}
}
LOG.debug("Host {} has {} successfull connections", host, successes);
if (null != metricRegistry) {
metricRegistry
.counter(
MetricRegistry.name(
JmxConnectionFactory.class, "connections", host.replace('.', '-')))
.dec();
}
} catch (RuntimeException e) {
LOG.warn("Could not decrement JMX successfull connections counter for host {}", host, e);
}
Expand Down

0 comments on commit 22afbdb

Please sign in to comment.