feat(redis): Retry logic on failed connections in cache & task repository (#2389)
robzienert authored Feb 23, 2018
1 parent bebfa2a commit c7eb87c
Showing 27 changed files with 193 additions and 437 deletions.
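The change wraps each Redis/Dyno round trip in a Failsafe 1.x retry. As a reading aid, here is a minimal self-contained sketch of that pattern using the same net.jodah.failsafe calls the diff relies on; the policy values mirror the diff, while the class and the wrapped call are illustrative:

import java.util.concurrent.TimeUnit;

import net.jodah.failsafe.Failsafe;
import net.jodah.failsafe.RetryPolicy;
import redis.clients.jedis.exceptions.JedisException;

class RetryPatternSketch {
  // Retry transient connection failures: initial attempt plus up to 3 retries, 500 ms apart.
  private static final RetryPolicy RETRY_POLICY = new RetryPolicy()
      .retryOn(JedisException.class)
      .withDelay(500, TimeUnit.MILLISECONDS)
      .withMaxRetries(3);

  String read(String key) {
    return Failsafe
        .with(RETRY_POLICY)
        .onRetriesExceeded(failure -> {
          // Fires once, after the final attempt has failed.
          throw new RuntimeException("Redis read failed after retries", failure);
        })
        .get(() -> fetch(key)); // use .run(...) instead for void actions
  }

  // Stand-in for a real Jedis/Dyno call.
  private String fetch(String key) {
    return "value-for-" + key;
  }
}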
build.gradle (2 changes: 1 addition & 1 deletion)
@@ -39,7 +39,7 @@ allprojects {
apply plugin: 'groovy'

ext {
- spinnakerDependenciesVersion = project.hasProperty('spinnakerDependenciesVersion') ? project.property('spinnakerDependenciesVersion') : '0.142.1'
+ spinnakerDependenciesVersion = project.hasProperty('spinnakerDependenciesVersion') ? project.property('spinnakerDependenciesVersion') : '0.142.2'
}

def checkLocalVersions = [spinnakerDependenciesVersion: spinnakerDependenciesVersion]
cats/cats-dynomite/cats-dynomite.gradle (6 changes: 1 addition & 5 deletions)
@@ -1,10 +1,6 @@
dependencies {
compile project(':cats:cats-redis')
- compile('com.netflix.dyno:dyno-jedis:1.6.0', {
-   exclude group: 'joda-time'
-   exclude group: 'org.apache.httpcomponents'
-   exclude group: 'org.slf4j'
- })
+ compile("com.netflix.spinnaker.kork:kork-dynomite:${spinnaker.version("kork")}")
compile('net.jodah:failsafe:1.0.4')

testCompile project(':cats:cats-test')
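The dyno-jedis client and its exclusions are no longer declared directly here; the Dyno client presumably arrives transitively via the new kork-dynomite dependency, which matches the imports moving from cats.dynomite to kork.dynomite in the Java changes below.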

This file was deleted.

cats/cats-dynomite/src/main/java/com/netflix/spinnaker/cats/dynomite/cache/DynomiteCache.java
@@ -25,10 +25,10 @@
import com.netflix.spinnaker.cats.cache.DefaultCacheData;
import com.netflix.spinnaker.cats.compression.CompressionStrategy;
import com.netflix.spinnaker.cats.compression.NoopCompression;
- import com.netflix.spinnaker.cats.dynomite.DynomiteClientDelegate;
- import com.netflix.spinnaker.cats.dynomite.DynomiteClientDelegate.ClientDelegateException;
import com.netflix.spinnaker.cats.redis.cache.AbstractRedisCache;
import com.netflix.spinnaker.cats.redis.cache.RedisCacheOptions;
+ import com.netflix.spinnaker.kork.dynomite.DynomiteClientDelegate;
+ import com.netflix.spinnaker.kork.dynomite.DynomiteClientDelegate.ClientDelegateException;
import net.jodah.failsafe.Failsafe;
import net.jodah.failsafe.RetryPolicy;
import org.slf4j.Logger;
@@ -46,12 +46,14 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

+ import static java.lang.String.format;
import static java.nio.charset.StandardCharsets.UTF_8;

public class DynomiteCache extends AbstractRedisCache {
@@ -93,7 +95,7 @@ class NOOP implements CacheMetrics {}
private final Logger log = LoggerFactory.getLogger(getClass());

// TODO rz - Make retry policy configurable
- private static final RetryPolicy redisRetryPolicy = new RetryPolicy()
+ private static final RetryPolicy REDIS_RETRY_POLICY = new RetryPolicy()
.retryOn(Arrays.asList(JedisException.class, DynoException.class, ClientDelegateException.class))
.withDelay(500, TimeUnit.MILLISECONDS)
.withMaxRetries(3);
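With this policy a failing command is attempted at most four times in total (the initial call plus three retries) with a fixed 500 ms pause between attempts, so a persistently failing operation adds roughly 1.5 s of retry delay before the onRetriesExceeded handlers below fire.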
@@ -129,9 +131,9 @@ public void mergeItems(String type, Collection<CacheData> items) {

Map<CacheData, Map<String, String>> allHashes = getAllHashes(type, items);
Failsafe
- .with(redisRetryPolicy)
- .onFailure(failure -> {
-   log.error("Encountered repeated failures while caching " + prefix + ":" + type, failure);
+ .with(REDIS_RETRY_POLICY)
+ .onRetriesExceeded(failure -> {
+   log.error("Encountered repeated failures while caching {}:{}, attempting cleanup", prefix, type, failure);
try {
redisClientDelegate.withPipeline(pipeline -> {
DynoJedisPipeline p = (DynoJedisPipeline) pipeline;
@@ -142,9 +144,9 @@ public void mergeItems(String type, Collection<CacheData> items) {
p.sync();
});
} catch (JedisException|DynoException e) {
log.error("Failed cleaning up hashes in failure handler in " + prefix + ":" + type, e);
log.error("Failed cleaning up hashes in failure handler in {}:{}", prefix, type, e);
}
- throw new RuntimeException("Failed running caching agents", failure);
+ throw new ExcessiveDynoFailureRetries(format("Running cache agent %s:%s", prefix, type), failure);
})
.run(() -> redisClientDelegate.withPipeline(pipeline -> {
DynoJedisPipeline p = (DynoJedisPipeline) pipeline;
@@ -213,7 +215,10 @@ protected void evictItems(String type, List<String> identifiers, Collection<Stri
AtomicInteger sremOperations = new AtomicInteger();

Failsafe
- .with(redisRetryPolicy)
+ .with(REDIS_RETRY_POLICY)
+ .onRetriesExceeded(failure -> {
+   throw new ExcessiveDynoFailureRetries(format("Evicting items for %s:%s", prefix, type), failure);
+ })
.run(() -> redisClientDelegate.withPipeline(pipeline -> {
DynoJedisPipeline p = (DynoJedisPipeline) pipeline;

@@ -253,21 +258,25 @@ protected Collection<CacheData> getItems(String type, List<String> ids, List<Str
}

AtomicInteger hmgetAllOperations = new AtomicInteger();
- Map<String, Map<String, String>> rawItems = redisClientDelegate.withPipeline(pipeline -> {
-   DynoJedisPipeline p = (DynoJedisPipeline) pipeline;
-
-   Map<String, Response<Map<String, String>>> responses = new HashMap<>();
-   for (String id : ids) {
-     responses.put(id, pipeline.hgetAll(itemId(type, id)));
-     hmgetAllOperations.incrementAndGet();
-   }
-   p.sync();
+ Map<String, Map<String, String>> rawItems = Failsafe
+   .with(REDIS_RETRY_POLICY)
+   .onRetriesExceeded(failure -> {
+     throw new ExcessiveDynoFailureRetries(format("Getting items for %s:%s", prefix, type), failure);
+   })
+   .get(() -> redisClientDelegate.withPipeline(pipeline -> {
+     DynoJedisPipeline p = (DynoJedisPipeline) pipeline;
+
-   return responses.entrySet().stream()
-     .filter(e -> !e.getValue().get().isEmpty())
-     .collect(Collectors.toMap(Entry::getKey, it -> it.getValue().get()));
- });
+     Map<String, Response<Map<String, String>>> responses = new HashMap<>();
+     for (String id : ids) {
+       responses.put(id, pipeline.hgetAll(itemId(type, id)));
+       hmgetAllOperations.incrementAndGet();
+     }
+     p.sync();
+
+     return responses.entrySet().stream()
+       .filter(e -> !e.getValue().get().isEmpty())
+       .collect(Collectors.toMap(Entry::getKey, it -> it.getValue().get()));
+   }));

Collection<CacheData> results = new ArrayList<>(ids.size());
for (Map.Entry<String, Map<String, String>> rawItem : rawItems.entrySet()) {
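Note that the whole withPipeline block sits inside the retried supplier, so each Failsafe retry replays the entire pipeline: every hgetAll is re-queued, the sync is re-issued, and hmgetAllOperations counts attempts across retries rather than unique reads. The read paths use .get(...), which returns the supplier's value, while the write paths above use .run(...) for void actions.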
@@ -314,6 +323,13 @@ private CacheData extractHashedItem(String type, String id, Map<String, String>
}
}

+ @Override
+ protected Set<String> scanMembers(String setKey, Optional<String> glob) {
+   return Failsafe
+     .with(REDIS_RETRY_POLICY)
+     .get(() -> super.scanMembers(setKey, glob));
+ }

private static class MergeOp {
final Set<String> relNames;
final Map<String, String> valuesToSet;
@@ -386,17 +402,32 @@ private Map<CacheData, Map<String, String>> getAllHashes(String type, Collection
return new HashMap<>();
}

- return redisClientDelegate.withPipeline(pipeline -> {
-   DynoJedisPipeline p = (DynoJedisPipeline) pipeline;
+ return Failsafe
+   .with(REDIS_RETRY_POLICY)
+   .onRetriesExceeded(failure -> {
+     throw new ExcessiveDynoFailureRetries(format("Getting all requested hashes for %s:%s", prefix, type), failure);
+   })
+   .get(() -> redisClientDelegate.withPipeline(pipeline -> {
+     DynoJedisPipeline p = (DynoJedisPipeline) pipeline;

-   Map<CacheData, Response<Map<String, String>>> responses = new HashMap<>();
-   for (CacheData item : items) {
-     responses.put(item, p.hgetAll(itemHashesId(type, item.getId())));
-   }
-   p.sync();
+     Map<CacheData, Response<Map<String, String>>> responses = new HashMap<>();
+     for (CacheData item : items) {
+       responses.put(item, p.hgetAll(itemHashesId(type, item.getId())));
+     }
+     p.sync();

-   return responses.entrySet().stream().collect(Collectors.toMap(Entry::getKey, it -> it.getValue().get()));
- });
+     return responses.entrySet().stream().collect(Collectors.toMap(Entry::getKey, it -> it.getValue().get()));
+   }));
}

+ @Override
+ protected boolean isHashingDisabled(String type) {
+   return Failsafe
+     .with(REDIS_RETRY_POLICY)
+     .onRetriesExceeded(failure -> {
+       throw new ExcessiveDynoFailureRetries(format("Getting hashing flag for %s:%s", prefix, type), failure);
+     })
+     .get(() -> super.isHashingDisabled(type));
+ }

private int getHashExpiry() {
@@ -405,35 +436,41 @@ private int getHashExpiry() {
}

private String itemId(String type, String id) {
return String.format("{%s:%s}:%s", prefix, type, id);
return format("{%s:%s}:%s", prefix, type, id);
}

private String itemHashesId(String type, String id) {
return String.format("{%s:%s}:hashes:%s", prefix, type, id);
return format("{%s:%s}:hashes:%s", prefix, type, id);
}

@Override
protected String attributesId(String type, String id) {
return String.format("{%s:%s}:attributes:%s", prefix, type, id);
return format("{%s:%s}:attributes:%s", prefix, type, id);
}

@Override
protected String relationshipId(String type, String id, String relationship) {
return String.format("{%s:%s}:relationships:%s:%s", prefix, type, id, relationship);
return format("{%s:%s}:relationships:%s:%s", prefix, type, id, relationship);
}

@Override
protected String allRelationshipsId(String type) {
return String.format("{%s:%s}:relationships", prefix, type);
return format("{%s:%s}:relationships", prefix, type);
}

@Override
protected String allOfTypeId(String type) {
return String.format("{%s:%s}:members", prefix, type);
return format("{%s:%s}:members", prefix, type);
}

@Override
protected String allOfTypeReindex(String type) {
return String.format("{%s:%s}:members.2", prefix, type);
return format("{%s:%s}:members.2", prefix, type);
}

+ private static class ExcessiveDynoFailureRetries extends RuntimeException {
+   ExcessiveDynoFailureRetries(String message, Throwable cause) {
+     super(message, cause);
+   }
+ }
}
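To see the retry behavior end to end, here is a small hypothetical demonstration of the same Failsafe 1.x API (class name, exception type, and failure counts are all illustrative): a supplier that fails twice succeeds on the third attempt, so onRetriesExceeded never fires.

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import net.jodah.failsafe.Failsafe;
import net.jodah.failsafe.RetryPolicy;

class RetryDemo {
  public static void main(String[] args) {
    RetryPolicy policy = new RetryPolicy()
        .retryOn(IllegalStateException.class)
        .withDelay(500, TimeUnit.MILLISECONDS)
        .withMaxRetries(3);

    AtomicInteger attempts = new AtomicInteger();
    String result = Failsafe
        .with(policy)
        .onRetriesExceeded(failure -> System.err.println("gave up: " + failure))
        .get(() -> {
          // Fail the first two attempts to exercise the retry path.
          if (attempts.incrementAndGet() < 3) {
            throw new IllegalStateException("transient failure " + attempts.get());
          }
          return "ok after " + attempts.get() + " attempts";
        });

    System.out.println(result); // prints: ok after 3 attempts
  }
}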
cats/cats-dynomite/src/main/java/com/netflix/spinnaker/cats/dynomite/cache/DynomiteNamedCacheFactory.java
@@ -19,9 +19,9 @@
import com.netflix.spinnaker.cats.cache.NamedCacheFactory;
import com.netflix.spinnaker.cats.cache.WriteableCache;
import com.netflix.spinnaker.cats.compression.CompressionStrategy;
- import com.netflix.spinnaker.cats.dynomite.DynomiteClientDelegate;
import com.netflix.spinnaker.cats.dynomite.cache.DynomiteCache.CacheMetrics;
import com.netflix.spinnaker.cats.redis.cache.RedisCacheOptions;
+ import com.netflix.spinnaker.kork.dynomite.DynomiteClientDelegate;

import java.util.Optional;

[Diff for the remaining changed files was not loaded.]
