Skip to content

Commit

Permalink
fix(kubernetes): Fix all agents caching cluster-scoped resources (#4128)
Browse files Browse the repository at this point in the history
* refactor(kubernetes): Minor refactor of core caching agent test

Pull some common logic out of ProcessOnDemandResult so that it can
be reused when we need to process the results of a loadData request
in an upcoming commit.

* test(kubernetes): Add tests to loadData in core caching agent

There are no tests of loadData() in the KubernetesCoreCachingAgent;
add a simple test that validates a namespaced and cluster-scoped
kind can be succesfully cached (ie, is returned from loadData and
is persisted to the cache).

* refactor(kubernetes): Immutable collections and nonnull annotations

Make a few collections returned by the kubernetes caching agent
immutable, and add some nonnull annotations that simplify the
work of calling code.

* refactor(kubernetes): Split caching of resources by scope

This commit splits the work to cache kubernetes objects into two
functions: the first caches namespace-scoped objects for all
namespaces relevant for the caching agent, and the second caches
all cluster-scoped resources.

This means that we'll no longer try to read the cluster-scoped
resources once per namespace and will read them once per caching
agent.

* fix(kubernetes): Fix all agents caching cluster-scoped resources

Currently all caching agents are caching cluster-scoped resources;
this should be delegated to a single agent so that we don't duplicate
work.

* fix(kubernetes): Remove duplicate test assertions

I had intended the second set of assertions to be testing the
contents of the cache. But it loadData doesn't actually
update the cache (it just returns results to an outer function that
stores the result) so we can't test that the cache is updated after
calling loadData. Just remove the extra assertions.
  • Loading branch information
ezimanyi committed Oct 29, 2019
1 parent 58d3b68 commit 6ab4288
Show file tree
Hide file tree
Showing 7 changed files with 237 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.io.BufferedReader;
import java.io.IOException;
import javax.annotation.Nonnull;

/**
* Transforms a stream into an object of arbitrary type using a supplied BufferReader for the
Expand All @@ -26,5 +27,6 @@
* <p>Implementations are responsible for closing the supplied BufferReader.
*/
public interface ReaderConsumer<T> {
@Nonnull
T consume(BufferedReader r) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSetMultimap;
import com.netflix.spectator.api.Registry;
import com.netflix.spinnaker.cats.agent.AgentIntervalAware;
import com.netflix.spinnaker.cats.agent.CacheResult;
Expand All @@ -31,6 +32,8 @@
import com.netflix.spinnaker.clouddriver.kubernetes.security.KubernetesNamedAccountCredentials;
import com.netflix.spinnaker.clouddriver.kubernetes.v2.description.RegistryUtils;
import com.netflix.spinnaker.clouddriver.kubernetes.v2.description.manifest.KubernetesKind;
import com.netflix.spinnaker.clouddriver.kubernetes.v2.description.manifest.KubernetesKindProperties;
import com.netflix.spinnaker.clouddriver.kubernetes.v2.description.manifest.KubernetesKindProperties.ResourceScope;
import com.netflix.spinnaker.clouddriver.kubernetes.v2.description.manifest.KubernetesManifest;
import com.netflix.spinnaker.clouddriver.kubernetes.v2.op.job.KubectlJobExecutor;
import com.netflix.spinnaker.clouddriver.kubernetes.v2.op.job.KubectlJobExecutor.KubectlException;
Expand All @@ -40,8 +43,11 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

Expand Down Expand Up @@ -74,27 +80,61 @@ protected Map<String, Object> defaultIntrospectionDetails() {

protected abstract List<KubernetesKind> primaryKinds();

private ImmutableList<KubernetesManifest> loadResources(
@Nonnull Iterable<KubernetesKind> kubernetesKinds, Optional<String> optionalNamespace) {
String namespace = optionalNamespace.orElse(null);
try {
return credentials.list(ImmutableList.copyOf(kubernetesKinds), namespace);
} catch (KubectlException e) {
log.warn(
"{}: Failed to read kind {} from namespace {}: {}",
getAgentType(),
kubernetesKinds,
namespace,
e.getMessage());
throw e;
}
}

@Nonnull
private ImmutableList<KubernetesManifest> loadNamespaceScopedResources(
@Nonnull Iterable<KubernetesKind> kubernetesKinds) {
return getNamespaces()
.parallelStream()
.map(n -> loadResources(kubernetesKinds, Optional.of(n)))
.flatMap(Collection::stream)
.collect(ImmutableList.toImmutableList());
}

@Nonnull
private ImmutableList<KubernetesManifest> loadClusterScopedResources(
@Nonnull Iterable<KubernetesKind> kubernetesKinds) {
if (handleClusterScopedResources()) {
return loadResources(kubernetesKinds, Optional.empty());
} else {
return ImmutableList.of();
}
}

private ImmutableSetMultimap<ResourceScope, KubernetesKind> primaryKindsByScope() {
return primaryKinds().stream()
.collect(
ImmutableSetMultimap.toImmutableSetMultimap(
k -> credentials.getKindRegistry().getKindProperties(k).getResourceScope(),
Function.identity()));
}

protected Map<KubernetesKind, List<KubernetesManifest>> loadPrimaryResourceList() {
List<KubernetesKind> primaryKinds = primaryKinds();
ImmutableSetMultimap<ResourceScope, KubernetesKind> kindsByScope = primaryKindsByScope();

Map<KubernetesKind, List<KubernetesManifest>> result =
getNamespaces()
.parallelStream()
.map(
n -> {
try {
return credentials.list(primaryKinds, n);
} catch (KubectlException e) {
log.warn(
"{}: Failed to read kind {} from namespace {}: {}",
getAgentType(),
primaryKinds,
n,
e.getMessage());
throw e;
}
})
.filter(Objects::nonNull)
.flatMap(Collection::stream)
Stream.concat(
loadClusterScopedResources(
kindsByScope.get(KubernetesKindProperties.ResourceScope.CLUSTER))
.stream(),
loadNamespaceScopedResources(
kindsByScope.get(KubernetesKindProperties.ResourceScope.NAMESPACE))
.stream())
.collect(Collectors.groupingBy(KubernetesManifest::getKind));

for (KubernetesCachingPolicy policy : credentials.getCachingPolicies()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,4 +99,13 @@ public static KubernetesKindProperties fromCustomResourceDefinition(
public boolean hasClusterRelationship() {
return this.hasClusterRelationship;
}

public ResourceScope getResourceScope() {
return isNamespaced ? ResourceScope.NAMESPACE : ResourceScope.CLUSTER;
}

public enum ResourceScope {
CLUSTER,
NAMESPACE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package com.netflix.spinnaker.clouddriver.kubernetes.v2.op.job;

import com.google.common.collect.ImmutableList;
import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException;
import com.google.gson.stream.JsonReader;
Expand All @@ -39,6 +40,7 @@
import java.io.EOFException;
import java.util.*;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
Expand Down Expand Up @@ -397,7 +399,8 @@ public KubernetesManifest get(
}
}

public List<KubernetesManifest> eventsFor(
@Nonnull
public ImmutableList<KubernetesManifest> eventsFor(
KubernetesV2Credentials credentials, KubernetesKind kind, String namespace, String name) {
List<String> command =
kubectlNamespacedGet(
Expand All @@ -408,7 +411,7 @@ public List<KubernetesManifest> eventsFor(
"involvedObject.name=%s,involvedObject.kind=%s",
name, StringUtils.capitalize(kind.toString())));

JobResult<List<KubernetesManifest>> status =
JobResult<ImmutableList<KubernetesManifest>> status =
jobExecutor.runJob(new JobRequest(command), parseManifestList());

if (status.getResult() != JobResult.Result.SUCCESS) {
Expand All @@ -421,13 +424,14 @@ public List<KubernetesManifest> eventsFor(
}

if (status.getError().contains("No resources found")) {
return new ArrayList<>();
return ImmutableList.of();
}

return status.getOutput();
}

public List<KubernetesManifest> list(
@Nonnull
public ImmutableList<KubernetesManifest> list(
KubernetesV2Credentials credentials,
List<KubernetesKind> kinds,
String namespace,
Expand All @@ -437,7 +441,7 @@ public List<KubernetesManifest> list(
command.add("-l=" + selectors.toString());
}

JobResult<List<KubernetesManifest>> status =
JobResult<ImmutableList<KubernetesManifest>> status =
jobExecutor.runJob(new JobRequest(command), parseManifestList());

if (status.getResult() != JobResult.Result.SUCCESS) {
Expand All @@ -450,7 +454,7 @@ public List<KubernetesManifest> list(
}

if (status.getError().contains("No resources found")) {
return new ArrayList<>();
return ImmutableList.of();
}

return status.getOutput();
Expand Down Expand Up @@ -779,16 +783,16 @@ private Void patch(
return null;
}

private ReaderConsumer<List<KubernetesManifest>> parseManifestList() {
private ReaderConsumer<ImmutableList<KubernetesManifest>> parseManifestList() {
return (BufferedReader r) -> {
try (JsonReader reader = new JsonReader(r)) {
List<KubernetesManifest> manifestList = new ArrayList<>();
try {
reader.beginObject();
} catch (EOFException e) {
// If the stream we're parsing is empty, just return an empty list
return manifestList;
return ImmutableList.of();
}
ImmutableList.Builder<KubernetesManifest> manifestList = new ImmutableList.Builder<>();
while (reader.hasNext()) {
if (reader.nextName().equals("items")) {
reader.beginArray();
Expand All @@ -802,7 +806,7 @@ private ReaderConsumer<List<KubernetesManifest>> parseManifestList() {
}
}
reader.endObject();
return manifestList;
return manifestList.build();
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,8 @@ public KubernetesManifest get(KubernetesKind kind, String namespace, String name
"get", kind, namespace, () -> jobExecutor.get(this, kind, namespace, name));
}

public List<KubernetesManifest> list(KubernetesKind kind, String namespace) {
@Nonnull
public ImmutableList<KubernetesManifest> list(KubernetesKind kind, String namespace) {
return runAndRecordMetrics(
"list",
kind,
Expand All @@ -415,7 +416,8 @@ public List<KubernetesManifest> list(KubernetesKind kind, String namespace) {
this, Collections.singletonList(kind), namespace, new KubernetesSelectorList()));
}

public List<KubernetesManifest> list(
@Nonnull
public ImmutableList<KubernetesManifest> list(
KubernetesKind kind, String namespace, KubernetesSelectorList selectors) {
return runAndRecordMetrics(
"list",
Expand All @@ -424,9 +426,10 @@ public List<KubernetesManifest> list(
() -> jobExecutor.list(this, Collections.singletonList(kind), namespace, selectors));
}

public List<KubernetesManifest> list(List<KubernetesKind> kinds, String namespace) {
@Nonnull
public ImmutableList<KubernetesManifest> list(List<KubernetesKind> kinds, String namespace) {
if (kinds.isEmpty()) {
return new ArrayList<>();
return ImmutableList.of();
} else {
return runAndRecordMetrics(
"list",
Expand All @@ -436,7 +439,9 @@ public List<KubernetesManifest> list(List<KubernetesKind> kinds, String namespac
}
}

public List<KubernetesManifest> eventsFor(KubernetesKind kind, String namespace, String name) {
@Nonnull
public ImmutableList<KubernetesManifest> eventsFor(
KubernetesKind kind, String namespace, String name) {
return runAndRecordMetrics(
"list",
KubernetesKind.EVENT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.netflix.spinnaker.clouddriver.kubernetes.v2.security

import com.google.common.collect.ImmutableList
import com.netflix.spectator.api.NoopRegistry
import com.netflix.spectator.api.Registry
import com.netflix.spinnaker.clouddriver.kubernetes.config.KubernetesConfigurationProperties
Expand Down Expand Up @@ -117,11 +118,11 @@ class KubernetesV2CredentialsSpec extends Specification {
namespaces: [NAMESPACE],
checkPermissionsOnStartup: true,
))
kubectlJobExecutor.list(_ as KubernetesV2Credentials, [KubernetesKind.DEPLOYMENT], NAMESPACE, _ as KubernetesSelectorList) >> {
kubectlJobExecutor.list(_ as KubernetesV2Credentials, ImmutableList.of(KubernetesKind.DEPLOYMENT), NAMESPACE, _ as KubernetesSelectorList) >> {
throw new KubectlJobExecutor.KubectlException("Error", new Exception())
}
kubectlJobExecutor.list(_ as KubernetesV2Credentials, [KubernetesKind.REPLICA_SET], NAMESPACE, _ as KubernetesSelectorList) >> {
return Collections.emptyList()
kubectlJobExecutor.list(_ as KubernetesV2Credentials, ImmutableList.of(KubernetesKind.REPLICA_SET), NAMESPACE, _ as KubernetesSelectorList) >> {
return ImmutableList.of()
}

expect:
Expand Down
Loading

0 comments on commit 6ab4288

Please sign in to comment.