-
Notifications
You must be signed in to change notification settings - Fork 6
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix/suppress ClientPoolImpl stacktrace spam #2432
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -76,6 +76,8 @@ | |
import com.palantir.common.base.FunctionCheckedException; | ||
import com.palantir.common.base.Throwables; | ||
import com.palantir.common.concurrent.PTExecutors; | ||
import com.palantir.logsafe.SafeArg; | ||
import com.palantir.logsafe.UnsafeArg; | ||
import com.palantir.processors.AutoDelegate; | ||
import com.palantir.remoting3.tracing.Tracers; | ||
|
||
|
@@ -276,7 +278,9 @@ private synchronized void refreshPool() { | |
} | ||
} | ||
|
||
log.debug("Cassandra pool refresh added hosts {}, removed hosts {}.", serversToAdd, serversToRemove); | ||
log.debug("Cassandra pool refresh added hosts {}, removed hosts {}.", | ||
SafeArg.of("serversToAdd", serversToAdd), | ||
SafeArg.of("serversToRemove", serversToRemove)); | ||
debugLogStateOfPool(); | ||
} | ||
|
||
|
@@ -298,7 +302,8 @@ void removePool(InetSocketAddress removedServerAddress) { | |
currentPools.get(removedServerAddress).shutdownPooling(); | ||
} catch (Exception e) { | ||
log.warn("While removing a host ({}) from the pool, we were unable to gently cleanup resources.", | ||
removedServerAddress, e); | ||
SafeArg.of("removedServerAddress", removedServerAddress), | ||
e); | ||
} | ||
currentPools.remove(removedServerAddress); | ||
} | ||
|
@@ -331,16 +336,17 @@ private void checkAndUpdateBlacklist() { | |
InetSocketAddress host = blacklistedEntry.getKey(); | ||
if (isHostHealthy(host)) { | ||
blacklistedHosts.remove(host); | ||
log.error("Added host {} back into the pool after a waiting period and successful health check.", | ||
host); | ||
log.info("Added host {} back into the pool after a waiting period and successful health check.", | ||
SafeArg.of("host", host)); | ||
} | ||
} | ||
} | ||
} | ||
|
||
// TODO (gsheasby): Why did we blacklist this host? | ||
private void addToBlacklist(InetSocketAddress badHost) { | ||
blacklistedHosts.put(badHost, System.currentTimeMillis()); | ||
log.info("Blacklisted host '{}'", badHost); | ||
log.warn("Blacklisted host '{}'", SafeArg.of("badHost", badHost)); | ||
} | ||
|
||
private boolean isHostHealthy(InetSocketAddress host) { | ||
|
@@ -350,8 +356,11 @@ private boolean isHostHealthy(InetSocketAddress host) { | |
testingContainer.runWithPooledResource(validatePartitioner); | ||
return true; | ||
} catch (Exception e) { | ||
log.error("We tried to add {} back into the pool, but got an exception" | ||
+ " that caused us to distrust this host further.", host, e); | ||
log.warn("We tried to add {} back into the pool, but got an exception" | ||
+ " that caused us to distrust this host further. Exception message was: {} : {}", | ||
SafeArg.of("host", host), | ||
SafeArg.of("exceptionClass", e.getClass().getTypeName()), | ||
UnsafeArg.of("exceptionMessage", e.getMessage())); | ||
return false; | ||
} | ||
} | ||
|
@@ -571,7 +580,7 @@ public <V, K extends Exception> V runWithRetryOnHost( | |
Set<InetSocketAddress> triedHosts = Sets.newHashSet(); | ||
while (true) { | ||
if (log.isTraceEnabled()) { | ||
log.trace("Running function on host {}.", specifiedHost.getHostString()); | ||
log.trace("Running function on host {}.", SafeArg.of("specifiedHost", specifiedHost.getHostString())); | ||
} | ||
CassandraClientPoolingContainer hostPool = currentPools.get(specifiedHost); | ||
|
||
|
@@ -581,8 +590,8 @@ public <V, K extends Exception> V runWithRetryOnHost( | |
= getRandomGoodHostForPredicate(address -> !triedHosts.contains(address)); | ||
hostPool = hostPoolCandidate.orElseGet(this::getRandomGoodHost); | ||
log.warn("Randomly redirected a query intended for host {} to {}.", | ||
previousHostPool, | ||
hostPool.getHost()); | ||
SafeArg.of("previousHost", previousHostPool), | ||
SafeArg.of("newHost", hostPool.getHost())); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe: newHost -> randomHost |
||
} | ||
|
||
try { | ||
|
@@ -592,18 +601,24 @@ public <V, K extends Exception> V runWithRetryOnHost( | |
triedHosts.add(hostPool.getHost()); | ||
this.<K>handleException(numTries, hostPool.getHost(), e); | ||
if (isRetriableWithBackoffException(e)) { | ||
log.warn("Retrying with backoff a query intended for host {}.", hostPool.getHost(), e); | ||
// And value between -500 and +500ms to backoff to better spread load on failover | ||
int sleepDuration = numTries * 1000 + (ThreadLocalRandom.current().nextInt(1000) - 500); | ||
log.warn("Retrying with backoff ({}ms} a query, {}, intended for host {}.", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yeah, there's a stray |
||
SafeArg.of("sleepDuration", sleepDuration), | ||
UnsafeArg.of("queryString", fn.toString()), | ||
SafeArg.of("hostName", hostPool.getHost())); | ||
|
||
try { | ||
// And value between -500 and +500ms to backoff to better spread load on failover | ||
Thread.sleep(numTries * 1000 + (ThreadLocalRandom.current().nextInt(1000) - 500)); | ||
Thread.sleep(sleepDuration); | ||
} catch (InterruptedException i) { | ||
throw new RuntimeException(i); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we need to reset interrupt here? Could fix in separate PR |
||
} | ||
if (numTries >= MAX_TRIES_SAME_HOST) { | ||
shouldRetryOnDifferentHost = true; | ||
} | ||
} else if (isFastFailoverException(e)) { | ||
log.info("Retrying with fast failover a query intended for host {}.", hostPool.getHost(), e); | ||
log.info("Retrying with fast failover a query intended for host {}.", | ||
SafeArg.of("hostName", hostPool.getHost())); | ||
shouldRetryOnDifferentHost = true; | ||
} | ||
} | ||
|
@@ -671,11 +686,23 @@ private <K extends Exception> void handleException(int numTries, InetSocketAddre | |
String errorMsg = MessageFormatter.format(CONNECTION_FAILURE_MSG, numTries).getMessage(); | ||
throw (K) new TTransportException(((TTransportException) ex).getType(), errorMsg, ex); | ||
} else { | ||
log.error("Tried to connect to cassandra {} times.", numTries, ex); | ||
log.error("Tried to connect to cassandra {} times.", SafeArg.of("numTries", numTries), ex); | ||
throw (K) ex; | ||
} | ||
} else { | ||
log.warn("Error occurred talking to cassandra. Attempt {} of {}.", numTries, MAX_TRIES_TOTAL, ex); | ||
// Only log the actual exception the first time | ||
if (numTries == 1) { | ||
log.warn("Error occurred talking to cassandra. Attempt {} of {}. Exception message was: {} : {}", | ||
SafeArg.of("numTries", numTries), | ||
SafeArg.of("maxTotalTries", MAX_TRIES_TOTAL), | ||
SafeArg.of("exceptionClass", ex.getClass().getTypeName()), | ||
UnsafeArg.of("exceptionMessage", ex.getMessage())); | ||
} else { | ||
log.warn("Error occurred talking to cassandra. Attempt {} of {}.", | ||
SafeArg.of("numTries", numTries), | ||
SafeArg.of("maxTotalTries", MAX_TRIES_TOTAL), | ||
ex); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. shouldn't the exception be the last arg in the if block, not else? I think we could actually do the following to cover all cases so that we include the exception class & message all the time, but only include stacktrace on first occurrence.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ....d'oh, I was right the first time! There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @gsheasby isn’t your logic here backwards if you’re trying to only log the exception on first failed attempt? I am pretty sure the whole of/else should be replaced with my comment above There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Looks correct on develop. We don't need to log exceptionClass or message on the first try, since we log the whole exception. |
||
} | ||
if (isConnectionException(ex) && numTries >= MAX_TRIES_SAME_HOST) { | ||
addToBlacklist(host); | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,6 +23,7 @@ | |
import java.util.List; | ||
import java.util.stream.Collectors; | ||
|
||
import org.apache.cassandra.thrift.Cassandra; | ||
import org.apache.cassandra.thrift.Compression; | ||
import org.apache.cassandra.thrift.ConsistencyLevel; | ||
import org.apache.cassandra.thrift.CqlResult; | ||
|
@@ -32,6 +33,7 @@ | |
import com.palantir.atlasdb.encoding.PtBytes; | ||
import com.palantir.atlasdb.keyvalue.api.Cell; | ||
import com.palantir.atlasdb.keyvalue.api.TableReference; | ||
import com.palantir.common.base.FunctionCheckedException; | ||
import com.palantir.common.base.Throwables; | ||
|
||
public class CqlExecutor { | ||
|
@@ -114,19 +116,34 @@ private InetSocketAddress getHostForRow(byte[] row) { | |
} | ||
|
||
private CqlResult executeQueryOnHost(String query, InetSocketAddress host) { | ||
ByteBuffer queryBytes = ByteBuffer.wrap(query.getBytes(StandardCharsets.UTF_8)); | ||
return executeQueryOnHost(queryBytes, host); | ||
return executeCqlFunctionOnHost(getCqlFunction(query), host); | ||
} | ||
|
||
private CqlResult executeQueryOnHost(ByteBuffer queryBytes, InetSocketAddress host) { | ||
private CqlResult executeCqlFunctionOnHost(FunctionCheckedException<Cassandra.Client, CqlResult, TException> fn, | ||
InetSocketAddress host) { | ||
try { | ||
return clientPool.runWithRetryOnHost(host, client -> | ||
client.execute_cql3_query(queryBytes, Compression.NONE, consistency)); | ||
return clientPool.runWithRetryOnHost(host, fn); | ||
} catch (TException e) { | ||
throw Throwables.throwUncheckedException(e); | ||
} | ||
} | ||
|
||
private FunctionCheckedException<Cassandra.Client, CqlResult, TException> getCqlFunction(String query) { | ||
ByteBuffer queryBytes = ByteBuffer.wrap(query.getBytes(StandardCharsets.UTF_8)); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. do we want to eagerly capture and retain both the query string & converted bytes & byte buffer or should this be inlined into
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We need to override There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You can still override toStringas you have it, this just avoids keeping the byte buffer around after executing the query. If we are going to re-execute the query many times, then your original version of eagerly converting to bytes might be better tradeoff There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would expect the query to execute one time in 99.9% of cases. Relatedly - I've been biasing towards keeping the stuff inside That way, it's more transparent which code might throw the exception we're passing, and it generally produces cleaner and more testable code - if we do that for long enough, eventually all of the clientPool-calling logic will live inside a small class which we can just mock for testing (and replace with CQL for production). Does that practice lead to sub-optimal perf somehow? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think this is actually a huge issue as I don't expect the queries to be that large, but my comment was more intended as context around potential issues with capturing references via lambda & anonymous inner classes. By "keeping the byte buffer around", what I mean is that in the current PR, the
|
||
|
||
return new FunctionCheckedException<Cassandra.Client, CqlResult, TException>() { | ||
@Override | ||
public CqlResult apply(Cassandra.Client client) throws TException { | ||
return client.execute_cql3_query(queryBytes, Compression.NONE, consistency); | ||
} | ||
|
||
@Override | ||
public String toString() { | ||
return query; | ||
} | ||
}; | ||
} | ||
|
||
private String getQuotedTableName(TableReference tableRef) { | ||
return "\"" + CassandraKeyValueServiceImpl.internalTableName(tableRef) + "\""; | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -179,7 +179,7 @@ private String getPoolMetricName(String poolName) { | |
@Test | ||
public void shouldNotAttemptMoreThanOneConnectionOnSuccess() { | ||
CassandraClientPool cassandraClientPool = clientPoolWithServersInCurrentPool(ImmutableSet.of(HOST_1)); | ||
cassandraClientPool.runWithRetryOnHost(HOST_1, input -> null); | ||
cassandraClientPool.runWithRetryOnHost(HOST_1, noOp()); | ||
verifyNumberOfAttemptsOnHost(HOST_1, cassandraClientPool, 1); | ||
} | ||
|
||
|
@@ -342,16 +342,16 @@ private void setFailureModeForHost(CassandraClientPoolingContainer poolingContai | |
} | ||
|
||
private void runNoopOnHost(InetSocketAddress host, CassandraClientPool pool) { | ||
pool.runOnHost(host, input -> null); | ||
pool.runOnHost(host, noOp()); | ||
} | ||
|
||
private void runNoopWithRetryOnHost(InetSocketAddress host, CassandraClientPool pool) { | ||
pool.runWithRetryOnHost(host, input -> null); | ||
pool.runWithRetryOnHost(host, noOp()); | ||
} | ||
|
||
private void runNoopOnHostWithException(InetSocketAddress host, CassandraClientPool pool) { | ||
try { | ||
pool.runOnHost(host, input -> null); | ||
pool.runOnHost(host, noOp()); | ||
fail(); | ||
} catch (Exception e) { | ||
// expected | ||
|
@@ -360,13 +360,27 @@ private void runNoopOnHostWithException(InetSocketAddress host, CassandraClientP | |
|
||
private void runNoopOnHostWithRetryWithException(InetSocketAddress host, CassandraClientPool pool) { | ||
try { | ||
pool.runWithRetryOnHost(host, input -> null); | ||
pool.runWithRetryOnHost(host, noOp()); | ||
fail(); | ||
} catch (Exception e) { | ||
// expected | ||
} | ||
} | ||
|
||
private FunctionCheckedException<Cassandra.Client, Object, RuntimeException> noOp() { | ||
return new FunctionCheckedException<Cassandra.Client, Object, RuntimeException>() { | ||
@Override | ||
public Object apply(Cassandra.Client input) throws RuntimeException { | ||
return null; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: this should probably return
|
||
} | ||
|
||
@Override | ||
public String toString() { | ||
return "no-op"; | ||
} | ||
}; | ||
} | ||
|
||
private void verifyAggregateFailureMetrics( | ||
double requestFailureProportion, | ||
double requestConnectionExceptionProportion) { | ||
|
@@ -378,18 +392,6 @@ private void verifyAggregateFailureMetrics( | |
requestConnectionExceptionProportion); | ||
} | ||
|
||
private void verifyFailureMetricsOnHost( | ||
InetSocketAddress host, | ||
double requestFailureProportion, | ||
double requestConnectionExceptionProportion) { | ||
assertEquals( | ||
getMetricValueFromHostAndMetricName(host.getHostString(), "requestFailureProportion"), | ||
requestFailureProportion); | ||
assertEquals( | ||
getMetricValueFromHostAndMetricName(host.getHostString(), "requestConnectionExceptionProportion"), | ||
requestConnectionExceptionProportion); | ||
} | ||
|
||
private void verifyBlacklistMetric(Integer expectedSize) { | ||
assertEquals(getAggregateMetricValueForMetricName("numBlacklistedHosts"), expectedSize); | ||
} | ||
|
@@ -398,10 +400,4 @@ private Object getAggregateMetricValueForMetricName(String metricName) { | |
String fullyQualifiedMetricName = MetricRegistry.name(CassandraClientPool.class, metricName); | ||
return metricRegistry.getGauges().get(fullyQualifiedMetricName).getValue(); | ||
} | ||
|
||
private Object getMetricValueFromHostAndMetricName(String hostname, String metricName) { | ||
String fullyQualifiedMetricName = MetricRegistry.name(CassandraClientPool.class, hostname, metricName); | ||
return metricRegistry.getGauges().get(fullyQualifiedMetricName).getValue(); | ||
} | ||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit:
getTypeName
is a new one, should be fine thoughe.getClass().getName()
ore.getClass().getCanonicalName()
might be simpler