Skip to content

Commit

Permalink
Fix/suppress ClientPoolImpl stacktrace spam (#2432)
Browse files Browse the repository at this point in the history
* Reapply changes from #2373

* Copy-paste those changes
* Fix logic: numTries == 1 instead of numTries > 1
* Add SafeArg and remove exception from the fast failover message

* newHost -> randomHost

* Fix miscellaneous nits

* Fix or suppress warnings
  • Loading branch information
gsheasby committed Oct 6, 2017
1 parent a09628b commit 5f828fa
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@
import com.palantir.common.base.FunctionCheckedException;
import com.palantir.common.base.Throwables;
import com.palantir.common.concurrent.PTExecutors;
import com.palantir.logsafe.SafeArg;
import com.palantir.logsafe.UnsafeArg;
import com.palantir.processors.AutoDelegate;
import com.palantir.remoting3.tracing.Tracers;

Expand Down Expand Up @@ -276,7 +278,9 @@ private synchronized void refreshPool() {
}
}

log.debug("Cassandra pool refresh added hosts {}, removed hosts {}.", serversToAdd, serversToRemove);
log.debug("Cassandra pool refresh added hosts {}, removed hosts {}.",
SafeArg.of("serversToAdd", serversToAdd),
SafeArg.of("serversToRemove", serversToRemove));
debugLogStateOfPool();
}

Expand All @@ -298,7 +302,8 @@ void removePool(InetSocketAddress removedServerAddress) {
currentPools.get(removedServerAddress).shutdownPooling();
} catch (Exception e) {
log.warn("While removing a host ({}) from the pool, we were unable to gently cleanup resources.",
removedServerAddress, e);
SafeArg.of("removedServerAddress", removedServerAddress),
e);
}
currentPools.remove(removedServerAddress);
}
Expand Down Expand Up @@ -331,16 +336,17 @@ private void checkAndUpdateBlacklist() {
InetSocketAddress host = blacklistedEntry.getKey();
if (isHostHealthy(host)) {
blacklistedHosts.remove(host);
log.error("Added host {} back into the pool after a waiting period and successful health check.",
host);
log.info("Added host {} back into the pool after a waiting period and successful health check.",
SafeArg.of("host", host));
}
}
}
}

// TODO (gsheasby): Why did we blacklist this host?
private void addToBlacklist(InetSocketAddress badHost) {
blacklistedHosts.put(badHost, System.currentTimeMillis());
log.info("Blacklisted host '{}'", badHost);
log.warn("Blacklisted host '{}'", SafeArg.of("badHost", badHost));
}

private boolean isHostHealthy(InetSocketAddress host) {
Expand All @@ -350,8 +356,11 @@ private boolean isHostHealthy(InetSocketAddress host) {
testingContainer.runWithPooledResource(validatePartitioner);
return true;
} catch (Exception e) {
log.error("We tried to add {} back into the pool, but got an exception"
+ " that caused us to distrust this host further.", host, e);
log.warn("We tried to add {} back into the pool, but got an exception"
+ " that caused us to distrust this host further. Exception message was: {} : {}",
SafeArg.of("host", host),
SafeArg.of("exceptionClass", e.getClass().getCanonicalName()),
UnsafeArg.of("exceptionMessage", e.getMessage()));
return false;
}
}
Expand Down Expand Up @@ -571,7 +580,7 @@ public <V, K extends Exception> V runWithRetryOnHost(
Set<InetSocketAddress> triedHosts = Sets.newHashSet();
while (true) {
if (log.isTraceEnabled()) {
log.trace("Running function on host {}.", specifiedHost.getHostString());
log.trace("Running function on host {}.", SafeArg.of("specifiedHost", specifiedHost.getHostString()));
}
CassandraClientPoolingContainer hostPool = currentPools.get(specifiedHost);

Expand All @@ -581,8 +590,8 @@ public <V, K extends Exception> V runWithRetryOnHost(
= getRandomGoodHostForPredicate(address -> !triedHosts.contains(address));
hostPool = hostPoolCandidate.orElseGet(this::getRandomGoodHost);
log.warn("Randomly redirected a query intended for host {} to {}.",
previousHostPool,
hostPool.getHost());
SafeArg.of("previousHost", previousHostPool),
SafeArg.of("randomHost", hostPool.getHost()));
}

try {
Expand All @@ -592,18 +601,24 @@ public <V, K extends Exception> V runWithRetryOnHost(
triedHosts.add(hostPool.getHost());
this.<K>handleException(numTries, hostPool.getHost(), e);
if (isRetriableWithBackoffException(e)) {
log.warn("Retrying with backoff a query intended for host {}.", hostPool.getHost(), e);
// And value between -500 and +500ms to backoff to better spread load on failover
int sleepDuration = numTries * 1000 + (ThreadLocalRandom.current().nextInt(1000) - 500);
log.warn("Retrying a query, {}, with backoff of {}ms, intended for host {}.",
UnsafeArg.of("queryString", fn.toString()),
SafeArg.of("sleepDuration", sleepDuration),
SafeArg.of("hostName", hostPool.getHost()));

try {
// And value between -500 and +500ms to backoff to better spread load on failover
Thread.sleep(numTries * 1000 + (ThreadLocalRandom.current().nextInt(1000) - 500));
Thread.sleep(sleepDuration);
} catch (InterruptedException i) {
throw new RuntimeException(i);
}
if (numTries >= MAX_TRIES_SAME_HOST) {
shouldRetryOnDifferentHost = true;
}
} else if (isFastFailoverException(e)) {
log.info("Retrying with fast failover a query intended for host {}.", hostPool.getHost(), e);
log.info("Retrying with fast failover a query intended for host {}.",
SafeArg.of("hostName", hostPool.getHost()));
shouldRetryOnDifferentHost = true;
}
}
Expand Down Expand Up @@ -671,11 +686,23 @@ private <K extends Exception> void handleException(int numTries, InetSocketAddre
String errorMsg = MessageFormatter.format(CONNECTION_FAILURE_MSG, numTries).getMessage();
throw (K) new TTransportException(((TTransportException) ex).getType(), errorMsg, ex);
} else {
log.error("Tried to connect to cassandra {} times.", numTries, ex);
log.error("Tried to connect to cassandra {} times.", SafeArg.of("numTries", numTries), ex);
throw (K) ex;
}
} else {
log.warn("Error occurred talking to cassandra. Attempt {} of {}.", numTries, MAX_TRIES_TOTAL, ex);
// Only log the actual exception the first time
if (numTries > 1) {
log.warn("Error occurred talking to cassandra. Attempt {} of {}. Exception message was: {} : {}",
SafeArg.of("numTries", numTries),
SafeArg.of("maxTotalTries", MAX_TRIES_TOTAL),
SafeArg.of("exceptionClass", ex.getClass().getTypeName()),
UnsafeArg.of("exceptionMessage", ex.getMessage()));
} else {
log.warn("Error occurred talking to cassandra. Attempt {} of {}.",
SafeArg.of("numTries", numTries),
SafeArg.of("maxTotalTries", MAX_TRIES_TOTAL),
ex);
}
if (isConnectionException(ex) && numTries >= MAX_TRIES_SAME_HOST) {
addToBlacklist(host);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.List;
import java.util.stream.Collectors;

import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.Compression;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.thrift.CqlResult;
Expand All @@ -32,6 +33,7 @@
import com.palantir.atlasdb.encoding.PtBytes;
import com.palantir.atlasdb.keyvalue.api.Cell;
import com.palantir.atlasdb.keyvalue.api.TableReference;
import com.palantir.common.base.FunctionCheckedException;
import com.palantir.common.base.Throwables;

public class CqlExecutor {
Expand Down Expand Up @@ -114,19 +116,34 @@ private InetSocketAddress getHostForRow(byte[] row) {
}

private CqlResult executeQueryOnHost(String query, InetSocketAddress host) {
ByteBuffer queryBytes = ByteBuffer.wrap(query.getBytes(StandardCharsets.UTF_8));
return executeQueryOnHost(queryBytes, host);
return executeCqlFunctionOnHost(getCqlFunction(query), host);
}

private CqlResult executeQueryOnHost(ByteBuffer queryBytes, InetSocketAddress host) {
private CqlResult executeCqlFunctionOnHost(FunctionCheckedException<Cassandra.Client, CqlResult, TException> fn,
InetSocketAddress host) {
try {
return clientPool.runWithRetryOnHost(host, client ->
client.execute_cql3_query(queryBytes, Compression.NONE, consistency));
return clientPool.runWithRetryOnHost(host, fn);
} catch (TException e) {
throw Throwables.throwUncheckedException(e);
}
}

private FunctionCheckedException<Cassandra.Client, CqlResult, TException> getCqlFunction(String query) {
ByteBuffer queryBytes = ByteBuffer.wrap(query.getBytes(StandardCharsets.UTF_8));

return new FunctionCheckedException<Cassandra.Client, CqlResult, TException>() {
@Override
public CqlResult apply(Cassandra.Client client) throws TException {
return client.execute_cql3_query(queryBytes, Compression.NONE, consistency);
}

@Override
public String toString() {
return query;
}
};
}

private String getQuotedTableName(TableReference tableRef) {
return "\"" + CassandraKeyValueServiceImpl.internalTableName(tableRef) + "\"";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,7 @@ public void shouldOnlyReturnHostsMatchingPredicate() {
for (int i = 0; i < numTrials; i++) {
Optional<CassandraClientPoolingContainer> container
= cassandraClientPool.getRandomGoodHostForPredicate(address -> address.equals(HOST_1));
assertThat(container.isPresent(), is(true));
assertThat(container.get().getHost(), equalTo(HOST_1));
assertContainerHasHostOne(container);
}
}

Expand All @@ -135,6 +134,11 @@ public void shouldNotReturnHostsNotMatchingPredicateEvenWithNodeFailure() {
cassandraClientPool.blacklistedHosts.put(HOST_1, System.currentTimeMillis());
Optional<CassandraClientPoolingContainer> container
= cassandraClientPool.getRandomGoodHostForPredicate(address -> address.equals(HOST_1));
assertContainerHasHostOne(container);
}

@SuppressWarnings({"OptionalUsedAsFieldOrParameterType", "ConstantConditions"})
private void assertContainerHasHostOne(Optional<CassandraClientPoolingContainer> container) {
assertThat(container.isPresent(), is(true));
assertThat(container.get().getHost(), equalTo(HOST_1));
}
Expand Down Expand Up @@ -179,7 +183,7 @@ private String getPoolMetricName(String poolName) {
@Test
public void shouldNotAttemptMoreThanOneConnectionOnSuccess() {
CassandraClientPool cassandraClientPool = clientPoolWithServersInCurrentPool(ImmutableSet.of(HOST_1));
cassandraClientPool.runWithRetryOnHost(HOST_1, input -> null);
cassandraClientPool.runWithRetryOnHost(HOST_1, noOp());
verifyNumberOfAttemptsOnHost(HOST_1, cassandraClientPool, 1);
}

Expand Down Expand Up @@ -303,6 +307,7 @@ private CassandraClientPoolImpl throwingClientPoolWithServersInCurrentPool(Immut
return clientPoolWith(ImmutableSet.of(), servers, Optional.of(exception));
}

@SuppressWarnings("OptionalUsedAsFieldOrParameterType") // Unpacking it seems less readable
private CassandraClientPoolImpl clientPoolWith(
ImmutableSet<InetSocketAddress> servers,
ImmutableSet<InetSocketAddress> serversInPool,
Expand All @@ -321,13 +326,12 @@ private CassandraClientPoolImpl clientPoolWith(
return cassandraClientPool;
}

@SuppressWarnings("OptionalUsedAsFieldOrParameterType") // Unpacking it seems less readable
private CassandraClientPoolingContainer getMockPoolingContainerForHost(InetSocketAddress address,
Optional<Exception> maybeFailureMode) {
CassandraClientPoolingContainer poolingContainer = mock(CassandraClientPoolingContainer.class);
when(poolingContainer.getHost()).thenReturn(address);
if (maybeFailureMode.isPresent()) {
setFailureModeForHost(poolingContainer, maybeFailureMode.get());
}
maybeFailureMode.ifPresent(e -> setFailureModeForHost(poolingContainer, e));
return poolingContainer;
}

Expand All @@ -342,16 +346,16 @@ private void setFailureModeForHost(CassandraClientPoolingContainer poolingContai
}

private void runNoopOnHost(InetSocketAddress host, CassandraClientPool pool) {
pool.runOnHost(host, input -> null);
pool.runOnHost(host, noOp());
}

private void runNoopWithRetryOnHost(InetSocketAddress host, CassandraClientPool pool) {
pool.runWithRetryOnHost(host, input -> null);
pool.runWithRetryOnHost(host, noOp());
}

private void runNoopOnHostWithException(InetSocketAddress host, CassandraClientPool pool) {
try {
pool.runOnHost(host, input -> null);
pool.runOnHost(host, noOp());
fail();
} catch (Exception e) {
// expected
Expand All @@ -360,13 +364,27 @@ private void runNoopOnHostWithException(InetSocketAddress host, CassandraClientP

private void runNoopOnHostWithRetryWithException(InetSocketAddress host, CassandraClientPool pool) {
try {
pool.runWithRetryOnHost(host, input -> null);
pool.runWithRetryOnHost(host, noOp());
fail();
} catch (Exception e) {
// expected
}
}

private FunctionCheckedException<Cassandra.Client, Void, RuntimeException> noOp() {
return new FunctionCheckedException<Cassandra.Client, Void, RuntimeException>() {
@Override
public Void apply(Cassandra.Client input) throws RuntimeException {
return null;
}

@Override
public String toString() {
return "no-op";
}
};
}

private void verifyAggregateFailureMetrics(
double requestFailureProportion,
double requestConnectionExceptionProportion) {
Expand All @@ -378,18 +396,6 @@ private void verifyAggregateFailureMetrics(
requestConnectionExceptionProportion);
}

private void verifyFailureMetricsOnHost(
InetSocketAddress host,
double requestFailureProportion,
double requestConnectionExceptionProportion) {
assertEquals(
getMetricValueFromHostAndMetricName(host.getHostString(), "requestFailureProportion"),
requestFailureProportion);
assertEquals(
getMetricValueFromHostAndMetricName(host.getHostString(), "requestConnectionExceptionProportion"),
requestConnectionExceptionProportion);
}

private void verifyBlacklistMetric(Integer expectedSize) {
assertEquals(getAggregateMetricValueForMetricName("numBlacklistedHosts"), expectedSize);
}
Expand All @@ -398,10 +404,4 @@ private Object getAggregateMetricValueForMetricName(String metricName) {
String fullyQualifiedMetricName = MetricRegistry.name(CassandraClientPool.class, metricName);
return metricRegistry.getGauges().get(fullyQualifiedMetricName).getValue();
}

private Object getMetricValueFromHostAndMetricName(String hostname, String metricName) {
String fullyQualifiedMetricName = MetricRegistry.name(CassandraClientPool.class, hostname, metricName);
return metricRegistry.getGauges().get(fullyQualifiedMetricName).getValue();
}

}
6 changes: 6 additions & 0 deletions docs/source/release_notes/release-notes.rst
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,12 @@ develop
This will help support large internal product's usage of the Timelock server.
(`Pull Request <https://github.com/palantir/atlasdb/pull/2364>`__)

* - |fixed|
- ``CassandraClientPool`` no longer logs stack traces twice for every failed attempt to connect to Cassandra.
Instead, the exception is logged once only, when we run out of retries.
(`Pull Request <https://github.com/palantir/atlasdb/pull/2432>`__)


.. <<<<------------------------------------------------------------------------------------------------------------->>>>
=======
Expand Down

0 comments on commit 5f828fa

Please sign in to comment.