Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(kubernetes): Fix all agents caching cluster-scoped resources #4128

Merged
merged 6 commits into from
Oct 29, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.io.BufferedReader;
import java.io.IOException;
import javax.annotation.Nonnull;

/**
* Transforms a stream into an object of arbitrary type using a supplied BufferReader for the
Expand All @@ -26,5 +27,6 @@
* <p>Implementations are responsible for closing the supplied BufferReader.
*/
public interface ReaderConsumer<T> {
@Nonnull
T consume(BufferedReader r) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSetMultimap;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whoah, new favorite Pokemon!

import com.netflix.spectator.api.Registry;
import com.netflix.spinnaker.cats.agent.AgentIntervalAware;
import com.netflix.spinnaker.cats.agent.CacheResult;
Expand All @@ -31,6 +32,8 @@
import com.netflix.spinnaker.clouddriver.kubernetes.security.KubernetesNamedAccountCredentials;
import com.netflix.spinnaker.clouddriver.kubernetes.v2.description.RegistryUtils;
import com.netflix.spinnaker.clouddriver.kubernetes.v2.description.manifest.KubernetesKind;
import com.netflix.spinnaker.clouddriver.kubernetes.v2.description.manifest.KubernetesKindProperties;
import com.netflix.spinnaker.clouddriver.kubernetes.v2.description.manifest.KubernetesKindProperties.ResourceScope;
import com.netflix.spinnaker.clouddriver.kubernetes.v2.description.manifest.KubernetesManifest;
import com.netflix.spinnaker.clouddriver.kubernetes.v2.op.job.KubectlJobExecutor;
import com.netflix.spinnaker.clouddriver.kubernetes.v2.op.job.KubectlJobExecutor.KubectlException;
Expand All @@ -40,8 +43,11 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

Expand Down Expand Up @@ -74,27 +80,61 @@ protected Map<String, Object> defaultIntrospectionDetails() {

protected abstract List<KubernetesKind> primaryKinds();

private ImmutableList<KubernetesManifest> loadResources(
@Nonnull Iterable<KubernetesKind> kubernetesKinds, Optional<String> optionalNamespace) {
String namespace = optionalNamespace.orElse(null);
try {
return credentials.list(ImmutableList.copyOf(kubernetesKinds), namespace);
} catch (KubectlException e) {
log.warn(
"{}: Failed to read kind {} from namespace {}: {}",
getAgentType(),
kubernetesKinds,
namespace,
e.getMessage());
throw e;
}
}

@Nonnull
private ImmutableList<KubernetesManifest> loadNamespaceScopedResources(
@Nonnull Iterable<KubernetesKind> kubernetesKinds) {
return getNamespaces()
.parallelStream()
.map(n -> loadResources(kubernetesKinds, Optional.of(n)))
.flatMap(Collection::stream)
.collect(ImmutableList.toImmutableList());
}

@Nonnull
private ImmutableList<KubernetesManifest> loadClusterScopedResources(
@Nonnull Iterable<KubernetesKind> kubernetesKinds) {
if (handleClusterScopedResources()) {
return loadResources(kubernetesKinds, Optional.empty());
} else {
return ImmutableList.of();
}
}

private ImmutableSetMultimap<ResourceScope, KubernetesKind> primaryKindsByScope() {
return primaryKinds().stream()
.collect(
ImmutableSetMultimap.toImmutableSetMultimap(
k -> credentials.getKindRegistry().getKindProperties(k).getResourceScope(),
Function.identity()));
}

protected Map<KubernetesKind, List<KubernetesManifest>> loadPrimaryResourceList() {
List<KubernetesKind> primaryKinds = primaryKinds();
ImmutableSetMultimap<ResourceScope, KubernetesKind> kindsByScope = primaryKindsByScope();

Map<KubernetesKind, List<KubernetesManifest>> result =
getNamespaces()
.parallelStream()
.map(
n -> {
try {
return credentials.list(primaryKinds, n);
} catch (KubectlException e) {
log.warn(
"{}: Failed to read kind {} from namespace {}: {}",
getAgentType(),
primaryKinds,
n,
e.getMessage());
throw e;
}
})
.filter(Objects::nonNull)
.flatMap(Collection::stream)
Stream.concat(
loadClusterScopedResources(
kindsByScope.get(KubernetesKindProperties.ResourceScope.CLUSTER))
.stream(),
loadNamespaceScopedResources(
kindsByScope.get(KubernetesKindProperties.ResourceScope.NAMESPACE))
.stream())
.collect(Collectors.groupingBy(KubernetesManifest::getKind));

for (KubernetesCachingPolicy policy : credentials.getCachingPolicies()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,4 +99,13 @@ public static KubernetesKindProperties fromCustomResourceDefinition(
public boolean hasClusterRelationship() {
return this.hasClusterRelationship;
}

public ResourceScope getResourceScope() {
return isNamespaced ? ResourceScope.NAMESPACE : ResourceScope.CLUSTER;
}

public enum ResourceScope {
CLUSTER,
NAMESPACE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package com.netflix.spinnaker.clouddriver.kubernetes.v2.op.job;

import com.google.common.collect.ImmutableList;
import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException;
import com.google.gson.stream.JsonReader;
Expand All @@ -39,6 +40,7 @@
import java.io.EOFException;
import java.util.*;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
Expand Down Expand Up @@ -397,7 +399,8 @@ public KubernetesManifest get(
}
}

public List<KubernetesManifest> eventsFor(
@Nonnull
public ImmutableList<KubernetesManifest> eventsFor(
KubernetesV2Credentials credentials, KubernetesKind kind, String namespace, String name) {
List<String> command =
kubectlNamespacedGet(
Expand All @@ -408,7 +411,7 @@ public List<KubernetesManifest> eventsFor(
"involvedObject.name=%s,involvedObject.kind=%s",
name, StringUtils.capitalize(kind.toString())));

JobResult<List<KubernetesManifest>> status =
JobResult<ImmutableList<KubernetesManifest>> status =
jobExecutor.runJob(new JobRequest(command), parseManifestList());

if (status.getResult() != JobResult.Result.SUCCESS) {
Expand All @@ -421,13 +424,14 @@ public List<KubernetesManifest> eventsFor(
}

if (status.getError().contains("No resources found")) {
return new ArrayList<>();
return ImmutableList.of();
}

return status.getOutput();
}

public List<KubernetesManifest> list(
@Nonnull
public ImmutableList<KubernetesManifest> list(
KubernetesV2Credentials credentials,
List<KubernetesKind> kinds,
String namespace,
Expand All @@ -437,7 +441,7 @@ public List<KubernetesManifest> list(
command.add("-l=" + selectors.toString());
}

JobResult<List<KubernetesManifest>> status =
JobResult<ImmutableList<KubernetesManifest>> status =
jobExecutor.runJob(new JobRequest(command), parseManifestList());

if (status.getResult() != JobResult.Result.SUCCESS) {
Expand All @@ -450,7 +454,7 @@ public List<KubernetesManifest> list(
}

if (status.getError().contains("No resources found")) {
return new ArrayList<>();
return ImmutableList.of();
}

return status.getOutput();
Expand Down Expand Up @@ -779,16 +783,16 @@ private Void patch(
return null;
}

private ReaderConsumer<List<KubernetesManifest>> parseManifestList() {
private ReaderConsumer<ImmutableList<KubernetesManifest>> parseManifestList() {
return (BufferedReader r) -> {
try (JsonReader reader = new JsonReader(r)) {
List<KubernetesManifest> manifestList = new ArrayList<>();
try {
reader.beginObject();
} catch (EOFException e) {
// If the stream we're parsing is empty, just return an empty list
return manifestList;
return ImmutableList.of();
}
ImmutableList.Builder<KubernetesManifest> manifestList = new ImmutableList.Builder<>();
while (reader.hasNext()) {
if (reader.nextName().equals("items")) {
reader.beginArray();
Expand All @@ -802,7 +806,7 @@ private ReaderConsumer<List<KubernetesManifest>> parseManifestList() {
}
}
reader.endObject();
return manifestList;
return manifestList.build();
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,8 @@ public KubernetesManifest get(KubernetesKind kind, String namespace, String name
"get", kind, namespace, () -> jobExecutor.get(this, kind, namespace, name));
}

public List<KubernetesManifest> list(KubernetesKind kind, String namespace) {
@Nonnull
public ImmutableList<KubernetesManifest> list(KubernetesKind kind, String namespace) {
return runAndRecordMetrics(
"list",
kind,
Expand All @@ -415,7 +416,8 @@ public List<KubernetesManifest> list(KubernetesKind kind, String namespace) {
this, Collections.singletonList(kind), namespace, new KubernetesSelectorList()));
}

public List<KubernetesManifest> list(
@Nonnull
public ImmutableList<KubernetesManifest> list(
KubernetesKind kind, String namespace, KubernetesSelectorList selectors) {
return runAndRecordMetrics(
"list",
Expand All @@ -424,9 +426,10 @@ public List<KubernetesManifest> list(
() -> jobExecutor.list(this, Collections.singletonList(kind), namespace, selectors));
}

public List<KubernetesManifest> list(List<KubernetesKind> kinds, String namespace) {
@Nonnull
public ImmutableList<KubernetesManifest> list(List<KubernetesKind> kinds, String namespace) {
if (kinds.isEmpty()) {
return new ArrayList<>();
return ImmutableList.of();
} else {
return runAndRecordMetrics(
"list",
Expand All @@ -436,7 +439,9 @@ public List<KubernetesManifest> list(List<KubernetesKind> kinds, String namespac
}
}

public List<KubernetesManifest> eventsFor(KubernetesKind kind, String namespace, String name) {
@Nonnull
public ImmutableList<KubernetesManifest> eventsFor(
KubernetesKind kind, String namespace, String name) {
return runAndRecordMetrics(
"list",
KubernetesKind.EVENT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.netflix.spinnaker.clouddriver.kubernetes.v2.security

import com.google.common.collect.ImmutableList
import com.netflix.spectator.api.NoopRegistry
import com.netflix.spectator.api.Registry
import com.netflix.spinnaker.clouddriver.kubernetes.config.KubernetesConfigurationProperties
Expand Down Expand Up @@ -117,11 +118,11 @@ class KubernetesV2CredentialsSpec extends Specification {
namespaces: [NAMESPACE],
checkPermissionsOnStartup: true,
))
kubectlJobExecutor.list(_ as KubernetesV2Credentials, [KubernetesKind.DEPLOYMENT], NAMESPACE, _ as KubernetesSelectorList) >> {
kubectlJobExecutor.list(_ as KubernetesV2Credentials, ImmutableList.of(KubernetesKind.DEPLOYMENT), NAMESPACE, _ as KubernetesSelectorList) >> {
throw new KubectlJobExecutor.KubectlException("Error", new Exception())
}
kubectlJobExecutor.list(_ as KubernetesV2Credentials, [KubernetesKind.REPLICA_SET], NAMESPACE, _ as KubernetesSelectorList) >> {
return Collections.emptyList()
kubectlJobExecutor.list(_ as KubernetesV2Credentials, ImmutableList.of(KubernetesKind.REPLICA_SET), NAMESPACE, _ as KubernetesSelectorList) >> {
return ImmutableList.of()
}

expect:
Expand Down
Loading