Skip to content

Commit

Permalink
refactor(kubernetes): Move pod metrics to use KubernetesCacheData (#3830)
Browse files Browse the repository at this point in the history

* refactor(kubernetes): Process pod metrics outside of parallel loop

We currently fetch pod metrics in a parallel stream because most
of the time is spent in IO waiting for kubectl to return the metrics.
We're also processing the metrics in parallel, which blocks moving
to the new KubernetesCacheData object as it is not threadsafe.

Rather than make it threadsafe (which would add some overhead and
complexity) just move the processing of the received metrics out
of the parallel stream and into a sequential stream. We'll still
fetch the metrics in a parallel stream, it's just the relatively
fast post-processing that will now be sequential.

* refactor(kubernetes): Move pod metrics to use KubernetesCacheData

Remove invertRelationships and dedupCacheData from the
KubernetesMetricCachingAgent by having it use KubernetesCacheData.
This removes the last two uses of these functions, which can now
be deleted.

* perf(kubernetes): Replace stratify function

After generating a Collection<CacheData> of kubernetes cache data,
we always then group this data by the CacheData's group and perform
some filtering of the data.

The grouping and filtering requires parsing the cache key again, so
it would be more efficient if we did this as part of KubernetesCacheData
before writing out the CacheData entries.
  • Loading branch information
ezimanyi committed Jul 1, 2019
1 parent bc4a064 commit 5f4ee56
Show file tree
Hide file tree
Showing 9 changed files with 114 additions and 196 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,7 @@ public String getGroup() {

@EqualsAndHashCode(callSuper = true)
@Getter
@RequiredArgsConstructor
public static class MetricCacheKey extends CacheKey {
@Getter private static final Kind kind = KUBERNETES_METRIC;
private final KubernetesKind kubernetesKind;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,7 @@
import com.netflix.spinnaker.cats.cache.DefaultCacheData;
import com.netflix.spinnaker.clouddriver.kubernetes.v2.caching.Keys;
import com.netflix.spinnaker.clouddriver.kubernetes.v2.description.manifest.KubernetesKind;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.*;
import java.util.stream.Collectors;
import lombok.Value;

Expand Down Expand Up @@ -73,7 +68,24 @@ public void addRelationships(Keys.CacheKey a, Set<Keys.CacheKey> b) {

/** Return a List of CacheData entries representing the current items in the cache. */
public List<CacheData> toCacheData() {
  // Skip entries that exist only as dangling relationship targets (see omitItem) so that
  // deleted infrastructure resources are not resurrected in the cache.
  return items.values().stream()
      .filter(item -> !item.omitItem())
      .map(CacheItem::toCacheData)
      .collect(Collectors.toList());
}

/**
 * Return the current items in the cache as CacheData entries, grouped by each item's group.
 * Items flagged by omitItem (dangling references to deleted resources) are excluded.
 */
public Map<String, Collection<CacheData>> toStratifiedCacheData() {
  Map<String, Collection<CacheData>> stratified = new HashMap<>();
  for (CacheItem item : items.values()) {
    if (item.omitItem()) {
      continue;
    }
    stratified
        .computeIfAbsent(item.key.getGroup(), group -> new ArrayList<>())
        .add(item.toCacheData());
  }
  return stratified;
}

/**
Expand Down Expand Up @@ -105,6 +117,16 @@ private Map<String, Collection<String>> groupedRelationships() {
return groups;
}

/**
 * Large authoritative caching agents can leave relationship entries (e.g. from events) that
 * point at resources which have since been deleted. Such entries have no attributes; writing
 * them to the cache would keep the deleted resource alive, so they are omitted.
 */
public boolean omitItem() {
  boolean isInfrastructureEntry = key instanceof Keys.InfrastructureCacheKey;
  return isInfrastructureEntry && attributes.isEmpty();
}

/** Convert this CacheItem to its corresponding CacheData object */
public CacheData toCacheData() {
int ttlSeconds;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import lombok.Builder;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -109,21 +108,6 @@ public static Optional<Keys.CacheKey> convertAsArtifact(
return Optional.of(key);
}

/**
 * Collapses CacheData entries sharing the same id into a single entry, merging duplicates
 * via mergeCacheData. The returned collection contains one entry per distinct id.
 */
public static Collection<CacheData> dedupCacheData(Collection<CacheData> input) {
  Map<String, CacheData> byId = new HashMap<>();
  for (CacheData entry : input) {
    // When an id repeats, merge the new entry on top of the previously recorded one.
    byId.merge(entry.getId(), entry, (existing, incoming) -> mergeCacheData(incoming, existing));
  }
  return byId.values();
}

public static CacheData mergeCacheData(CacheData current, CacheData added) {
String id = current.getId();
Map<String, Object> attributes = new HashMap<>(current.getAttributes());
Expand All @@ -150,28 +134,21 @@ public static CacheData mergeCacheData(CacheData current, CacheData added) {
return defaultCacheData(id, ttl, attributes, relationships);
}

public static CacheData convertPodMetric(
String account, String namespace, KubernetesPodMetric podMetric) {
public static void convertPodMetric(
KubernetesCacheData kubernetesCacheData, String account, KubernetesPodMetric podMetric) {
String podName = podMetric.getPodName();
String namespace = podMetric.getNamespace();
Map<String, Object> attributes =
new ImmutableMap.Builder<String, Object>()
.put("name", podName)
.put("namespace", namespace)
.put("metrics", podMetric.getContainerMetrics())
.build();

Map<String, Collection<String>> relationships =
new HashMap<>(
new ImmutableMap.Builder<String, Collection<String>>()
.put(
POD.toString(),
Collections.singletonList(
Keys.InfrastructureCacheKey.createKey(POD, account, namespace, podName)))
.build());

String id = Keys.MetricCacheKey.createKey(POD, account, namespace, podName);

return defaultCacheData(id, infrastructureTtlSeconds, attributes, relationships);
Keys.CacheKey key = new Keys.MetricCacheKey(POD, account, namespace, podName);
kubernetesCacheData.addItem(key, attributes);
kubernetesCacheData.addRelationship(
key, new Keys.InfrastructureCacheKey(POD, account, namespace, podName));
}

public static void convertAsResource(
Expand Down Expand Up @@ -253,10 +230,6 @@ public static KubernetesManifest getManifest(CacheData cacheData) {
return mapper.convertValue(cacheData.getAttributes().get("manifest"), KubernetesManifest.class);
}

/** Deserializes the "moniker" attribute of the given cache entry into a Moniker. */
public static Moniker getMoniker(CacheData cacheData) {
  Object rawMoniker = cacheData.getAttributes().get("moniker");
  return mapper.convertValue(rawMoniker, Moniker.class);
}

/** Converts an arbitrary object (typically a parsed Map) into a KubernetesManifest. */
public static KubernetesManifest convertToManifest(Object o) {
  KubernetesManifest manifest = mapper.convertValue(o, KubernetesManifest.class);
  return manifest;
}
Expand Down Expand Up @@ -316,64 +289,6 @@ static Set<Keys.CacheKey> ownerReferenceRelationships(
.collect(Collectors.toSet());
}

/**
 * To ensure the entire relationship graph is bidirectional, produce for every relationship
 * target referenced by the given cache data an inverse CacheData entry pointing back at the
 * resources that reference it.
 */
static List<CacheData> invertRelationships(List<CacheData> resourceData) {
  // Map each relationship target key to the ids of the entries that reference it.
  Map<String, Set<String>> sourcesByTarget = new HashMap<>();
  for (CacheData source : resourceData) {
    for (Collection<String> targets : source.getRelationships().values()) {
      for (String target : targets) {
        sourcesByTarget.computeIfAbsent(target, k -> new HashSet<>()).add(source.getId());
      }
    }
  }

  List<CacheData> inverted = new ArrayList<>();
  for (Map.Entry<String, Set<String>> entry : sourcesByTarget.entrySet()) {
    KubernetesCacheDataConverter.buildInverseRelationship(entry.getKey(), entry.getValue())
        .ifPresent(inverted::add);
  }
  return inverted;
}

/**
 * Builds the inverse CacheData entry for {@code key}: an entry whose relationships are the
 * referencing keys, grouped by their parsed group. Returns empty if {@code key} cannot be
 * parsed.
 *
 * @throws IllegalStateException if any referencing key is not a parseable cache key
 */
private static Optional<CacheData> buildInverseRelationship(
    String key, Set<String> relationshipKeys) {
  Map<String, Collection<String>> relationships = new HashMap<>();
  for (String relationshipKey : relationshipKeys) {
    Keys.CacheKey parsedKey =
        Keys.parseKey(relationshipKey)
            .orElseThrow(
                () ->
                    new IllegalStateException(
                        "Cache data produced with illegal key format " + relationshipKey));
    relationships
        .computeIfAbsent(parsedKey.getGroup(), k -> new HashSet<>())
        .add(relationshipKey);
  }

  // Note the asymmetry here: inverse entries for logical groups (clusters/applications) need
  // attributes so the cache data actually gets written; for anything else attributes are left
  // blank so competing agents don't overwrite each other's attributes.
  return Keys.parseKey(key)
      .map(
          k -> {
            if (Keys.LogicalKind.isLogicalGroup(k.getGroup())) {
              Map<String, Object> attributes =
                  new ImmutableMap.Builder<String, Object>().put("name", k.getName()).build();
              return defaultCacheData(key, logicalTtlSeconds, attributes, relationships);
            }
            return defaultCacheData(key, infrastructureTtlSeconds, new HashMap<>(), relationships);
          });
}

static void logStratifiedCacheData(
String agentType, Map<String, Collection<CacheData>> stratifiedCacheData) {
for (Map.Entry<String, Collection<CacheData>> entry : stratifiedCacheData.entrySet()) {
Expand Down Expand Up @@ -417,46 +332,4 @@ static int relationshipCount(CacheData data) {
.map(Collection::size)
.reduce(0, (a, b) -> a + b);
}

/** Pairs a CacheData entry with its parsed cache key, so the key is only parsed once. */
@Builder
private static class CacheDataKeyPair {
  Keys.CacheKey key;
  CacheData cacheData;
}

/**
 * Groups cache data entries by the group of their parsed key, dropping infrastructure entries
 * that have no attributes.
 *
 * @throws IllegalStateException if any entry's id is not a parseable cache key
 */
static Map<String, Collection<CacheData>> stratifyCacheDataByGroup(
    Collection<CacheData> ungroupedCacheData) {
  Map<String, Collection<CacheData>> stratified = new HashMap<>();
  for (CacheData cacheData : ungroupedCacheData) {
    Keys.CacheKey key =
        Keys.parseKey(cacheData.getId())
            .orElseThrow(
                () ->
                    new IllegalStateException(
                        "Cache data produced with illegal key format " + cacheData.getId()));
    // Given that we now have large caching agents that are authoritative for huge chunks of
    // the cache, it's possible that some resources (like events) still point to deleted
    // resources. These won't have any attributes, but if we add a cache entry here, the
    // deleted item will still be cached — so skip them.
    if (key instanceof Keys.InfrastructureCacheKey
        && (cacheData.getAttributes() == null || cacheData.getAttributes().isEmpty())) {
      continue;
    }
    stratified.computeIfAbsent(key.getGroup(), group -> new ArrayList<>()).add(cacheData);
  }
  return stratified;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.netflix.spinnaker.clouddriver.kubernetes.KubernetesCloudProvider;
import com.netflix.spinnaker.clouddriver.kubernetes.caching.KubernetesCachingAgent;
import com.netflix.spinnaker.clouddriver.kubernetes.security.KubernetesNamedAccountCredentials;
import com.netflix.spinnaker.clouddriver.kubernetes.v2.description.KubernetesPodMetric;
import com.netflix.spinnaker.clouddriver.kubernetes.v2.op.job.KubectlJobExecutor;
import com.netflix.spinnaker.clouddriver.kubernetes.v2.security.KubernetesV2Credentials;
import java.util.Collection;
Expand Down Expand Up @@ -69,15 +70,13 @@ public CacheResult loadData(ProviderCache providerCache) {
log.info(getAgentType() + ": agent is starting");
reloadNamespaces();

KubernetesCacheData kubernetesCacheData = new KubernetesCacheData();
List<CacheData> cacheData =
List<KubernetesPodMetric> podMetrics =
namespaces
.parallelStream()
.map(
n -> {
try {
return credentials.topPod(n, null).stream()
.map(m -> KubernetesCacheDataConverter.convertPodMetric(accountName, n, m));
return credentials.topPod(n, null);
} catch (KubectlJobExecutor.KubectlException e) {
if (e.getMessage().contains("not available")) {
log.warn(
Expand All @@ -87,24 +86,23 @@ public CacheResult loadData(ProviderCache providerCache) {
+ accountName
+ "' have not been recorded yet.",
getAgentType());
return null;
return Collections.<KubernetesPodMetric>emptyList();
} else {
throw e;
}
}
})
.flatMap(Collection::stream)
.filter(Objects::nonNull)
.flatMap(x -> x)
.collect(Collectors.toList());

List<CacheData> invertedRelationships =
KubernetesCacheDataConverter.invertRelationships(cacheData);

cacheData.addAll(invertedRelationships);
KubernetesCacheData kubernetesCacheData = new KubernetesCacheData();
podMetrics.forEach(
metric ->
KubernetesCacheDataConverter.convertPodMetric(
kubernetesCacheData, accountName, metric));

Map<String, Collection<CacheData>> entries =
KubernetesCacheDataConverter.stratifyCacheDataByGroup(
KubernetesCacheDataConverter.dedupCacheData(cacheData));
Map<String, Collection<CacheData>> entries = kubernetesCacheData.toStratifiedCacheData();
KubernetesCacheDataConverter.logStratifiedCacheData(getAgentType(), entries);

return new DefaultCacheResult(entries);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,10 +175,7 @@ protected CacheResult buildCacheResult(Map<KubernetesKind, List<KubernetesManife
}
});

List<CacheData> resourceData = kubernetesCacheData.toCacheData();

Map<String, Collection<CacheData>> entries =
KubernetesCacheDataConverter.stratifyCacheDataByGroup(resourceData);
Map<String, Collection<CacheData>> entries = kubernetesCacheData.toStratifiedCacheData();
KubernetesCacheDataConverter.logStratifiedCacheData(getAgentType(), entries);

return new DefaultCacheResult(entries);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,14 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.*;

@Data
@Getter
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class KubernetesPodMetric {
private String podName;
@Builder.Default private List<ContainerMetric> containerMetrics = new ArrayList<>();
private final String podName;
private final String namespace;
@Builder.Default private final List<ContainerMetric> containerMetrics = new ArrayList<>();

@Data
@Builder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -640,6 +640,7 @@ public Collection<KubernetesPodMetric> topPod(
podName,
KubernetesPodMetric.builder()
.podName(podName)
.namespace(namespace)
.containerMetrics(new ArrayList<>())
.build());

Expand Down
Loading

0 comments on commit 5f4ee56

Please sign in to comment.