diff --git a/clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/jobs/local/ReaderConsumer.java b/clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/jobs/local/ReaderConsumer.java index 70fc6951674..38d11bf009f 100644 --- a/clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/jobs/local/ReaderConsumer.java +++ b/clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/jobs/local/ReaderConsumer.java @@ -18,6 +18,7 @@ import java.io.BufferedReader; import java.io.IOException; +import javax.annotation.Nonnull; /** * Transforms a stream into an object of arbitrary type using a supplied BufferReader for the @@ -26,5 +27,6 @@ *

Implementations are responsible for closing the supplied BufferReader. */ public interface ReaderConsumer { + @Nonnull T consume(BufferedReader r) throws IOException; } diff --git a/clouddriver-kubernetes-v2/src/main/java/com/netflix/spinnaker/clouddriver/kubernetes/v2/caching/agent/KubernetesV2CachingAgent.java b/clouddriver-kubernetes-v2/src/main/java/com/netflix/spinnaker/clouddriver/kubernetes/v2/caching/agent/KubernetesV2CachingAgent.java index f22b5c44eed..c77ab56d664 100644 --- a/clouddriver-kubernetes-v2/src/main/java/com/netflix/spinnaker/clouddriver/kubernetes/v2/caching/agent/KubernetesV2CachingAgent.java +++ b/clouddriver-kubernetes-v2/src/main/java/com/netflix/spinnaker/clouddriver/kubernetes/v2/caching/agent/KubernetesV2CachingAgent.java @@ -19,6 +19,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSetMultimap; import com.netflix.spectator.api.Registry; import com.netflix.spinnaker.cats.agent.AgentIntervalAware; import com.netflix.spinnaker.cats.agent.CacheResult; @@ -31,6 +32,8 @@ import com.netflix.spinnaker.clouddriver.kubernetes.security.KubernetesNamedAccountCredentials; import com.netflix.spinnaker.clouddriver.kubernetes.v2.description.RegistryUtils; import com.netflix.spinnaker.clouddriver.kubernetes.v2.description.manifest.KubernetesKind; +import com.netflix.spinnaker.clouddriver.kubernetes.v2.description.manifest.KubernetesKindProperties; +import com.netflix.spinnaker.clouddriver.kubernetes.v2.description.manifest.KubernetesKindProperties.ResourceScope; import com.netflix.spinnaker.clouddriver.kubernetes.v2.description.manifest.KubernetesManifest; import com.netflix.spinnaker.clouddriver.kubernetes.v2.op.job.KubectlJobExecutor; import com.netflix.spinnaker.clouddriver.kubernetes.v2.op.job.KubectlJobExecutor.KubectlException; @@ -40,8 +43,11 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Objects; +import java.util.Optional; +import java.util.function.Function; import java.util.stream.Collectors; +import java.util.stream.Stream; +import javax.annotation.Nonnull; import lombok.Getter; import lombok.extern.slf4j.Slf4j; @@ -74,27 +80,61 @@ protected Map defaultIntrospectionDetails() { protected abstract List primaryKinds(); + private ImmutableList loadResources( + @Nonnull Iterable kubernetesKinds, Optional optionalNamespace) { + String namespace = optionalNamespace.orElse(null); + try { + return credentials.list(ImmutableList.copyOf(kubernetesKinds), namespace); + } catch (KubectlException e) { + log.warn( + "{}: Failed to read kind {} from namespace {}: {}", + getAgentType(), + kubernetesKinds, + namespace, + e.getMessage()); + throw e; + } + } + + @Nonnull + private ImmutableList loadNamespaceScopedResources( + @Nonnull Iterable kubernetesKinds) { + return getNamespaces() + .parallelStream() + .map(n -> loadResources(kubernetesKinds, Optional.of(n))) + .flatMap(Collection::stream) + .collect(ImmutableList.toImmutableList()); + } + + @Nonnull + private ImmutableList loadClusterScopedResources( + @Nonnull Iterable kubernetesKinds) { + if (handleClusterScopedResources()) { + return loadResources(kubernetesKinds, Optional.empty()); + } else { + return ImmutableList.of(); + } + } + + private ImmutableSetMultimap primaryKindsByScope() { + return primaryKinds().stream() + .collect( + ImmutableSetMultimap.toImmutableSetMultimap( + k -> credentials.getKindRegistry().getKindProperties(k).getResourceScope(), + Function.identity())); + } + protected Map> loadPrimaryResourceList() { - List primaryKinds = primaryKinds(); + ImmutableSetMultimap kindsByScope = primaryKindsByScope(); + Map> result = - getNamespaces() - .parallelStream() - .map( - n -> { - try { - return credentials.list(primaryKinds, n); - } catch (KubectlException e) { - log.warn( - "{}: Failed to read kind {} from namespace {}: {}", - getAgentType(), - primaryKinds, - n, - e.getMessage()); - throw e; - } - }) - .filter(Objects::nonNull) - .flatMap(Collection::stream) + Stream.concat( + loadClusterScopedResources( + kindsByScope.get(KubernetesKindProperties.ResourceScope.CLUSTER)) + .stream(), + loadNamespaceScopedResources( + kindsByScope.get(KubernetesKindProperties.ResourceScope.NAMESPACE)) + .stream()) .collect(Collectors.groupingBy(KubernetesManifest::getKind)); for (KubernetesCachingPolicy policy : credentials.getCachingPolicies()) { diff --git a/clouddriver-kubernetes-v2/src/main/java/com/netflix/spinnaker/clouddriver/kubernetes/v2/description/manifest/KubernetesKindProperties.java b/clouddriver-kubernetes-v2/src/main/java/com/netflix/spinnaker/clouddriver/kubernetes/v2/description/manifest/KubernetesKindProperties.java index 840a97833ad..a309a66e590 100644 --- a/clouddriver-kubernetes-v2/src/main/java/com/netflix/spinnaker/clouddriver/kubernetes/v2/description/manifest/KubernetesKindProperties.java +++ b/clouddriver-kubernetes-v2/src/main/java/com/netflix/spinnaker/clouddriver/kubernetes/v2/description/manifest/KubernetesKindProperties.java @@ -99,4 +99,13 @@ public static KubernetesKindProperties fromCustomResourceDefinition( public boolean hasClusterRelationship() { return this.hasClusterRelationship; } + + public ResourceScope getResourceScope() { + return isNamespaced ? ResourceScope.NAMESPACE : ResourceScope.CLUSTER; + } + + public enum ResourceScope { + CLUSTER, + NAMESPACE; + } } diff --git a/clouddriver-kubernetes-v2/src/main/java/com/netflix/spinnaker/clouddriver/kubernetes/v2/op/job/KubectlJobExecutor.java b/clouddriver-kubernetes-v2/src/main/java/com/netflix/spinnaker/clouddriver/kubernetes/v2/op/job/KubectlJobExecutor.java index 64375e1824c..1c24dc13f9e 100644 --- a/clouddriver-kubernetes-v2/src/main/java/com/netflix/spinnaker/clouddriver/kubernetes/v2/op/job/KubectlJobExecutor.java +++ b/clouddriver-kubernetes-v2/src/main/java/com/netflix/spinnaker/clouddriver/kubernetes/v2/op/job/KubectlJobExecutor.java @@ -17,6 +17,7 @@ package com.netflix.spinnaker.clouddriver.kubernetes.v2.op.job; +import com.google.common.collect.ImmutableList; import com.google.gson.Gson; import com.google.gson.JsonSyntaxException; import com.google.gson.stream.JsonReader; @@ -39,6 +40,7 @@ import java.io.EOFException; import java.util.*; import java.util.stream.Collectors; +import javax.annotation.Nonnull; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; @@ -397,7 +399,8 @@ public KubernetesManifest get( } } - public List eventsFor( + @Nonnull + public ImmutableList eventsFor( KubernetesV2Credentials credentials, KubernetesKind kind, String namespace, String name) { List command = kubectlNamespacedGet( @@ -408,7 +411,7 @@ public List eventsFor( "involvedObject.name=%s,involvedObject.kind=%s", name, StringUtils.capitalize(kind.toString()))); - JobResult> status = + JobResult> status = jobExecutor.runJob(new JobRequest(command), parseManifestList()); if (status.getResult() != JobResult.Result.SUCCESS) { @@ -421,13 +424,14 @@ public List eventsFor( } if (status.getError().contains("No resources found")) { - return new ArrayList<>(); + return ImmutableList.of(); } return status.getOutput(); } - public List list( + @Nonnull + public ImmutableList list( KubernetesV2Credentials credentials, List kinds, String namespace, @@ -437,7 +441,7 @@ public List list( command.add("-l=" + selectors.toString()); } - JobResult> status = + JobResult> status = jobExecutor.runJob(new JobRequest(command), parseManifestList()); if (status.getResult() != JobResult.Result.SUCCESS) { @@ -450,7 +454,7 @@ public List list( } if (status.getError().contains("No resources found")) { - return new ArrayList<>(); + return ImmutableList.of(); } return status.getOutput(); @@ -779,16 +783,16 @@ private Void patch( return null; } - private ReaderConsumer> parseManifestList() { + private ReaderConsumer> parseManifestList() { return (BufferedReader r) -> { try (JsonReader reader = new JsonReader(r)) { - List manifestList = new ArrayList<>(); try { reader.beginObject(); } catch (EOFException e) { // If the stream we're parsing is empty, just return an empty list - return manifestList; + return ImmutableList.of(); } + ImmutableList.Builder manifestList = new ImmutableList.Builder<>(); while (reader.hasNext()) { if (reader.nextName().equals("items")) { reader.beginArray(); @@ -802,7 +806,7 @@ private ReaderConsumer> parseManifestList() { } } reader.endObject(); - return manifestList; + return manifestList.build(); } }; } diff --git a/clouddriver-kubernetes-v2/src/main/java/com/netflix/spinnaker/clouddriver/kubernetes/v2/security/KubernetesV2Credentials.java b/clouddriver-kubernetes-v2/src/main/java/com/netflix/spinnaker/clouddriver/kubernetes/v2/security/KubernetesV2Credentials.java index 08d21e467e3..c177886f064 100644 --- a/clouddriver-kubernetes-v2/src/main/java/com/netflix/spinnaker/clouddriver/kubernetes/v2/security/KubernetesV2Credentials.java +++ b/clouddriver-kubernetes-v2/src/main/java/com/netflix/spinnaker/clouddriver/kubernetes/v2/security/KubernetesV2Credentials.java @@ -405,7 +405,8 @@ public KubernetesManifest get(KubernetesKind kind, String namespace, String name "get", kind, namespace, () -> jobExecutor.get(this, kind, namespace, name)); } - public List list(KubernetesKind kind, String namespace) { + @Nonnull + public ImmutableList list(KubernetesKind kind, String namespace) { return runAndRecordMetrics( "list", kind, @@ -415,7 +416,8 @@ public List list(KubernetesKind kind, String namespace) { this, Collections.singletonList(kind), namespace, new KubernetesSelectorList())); } - public List list( + @Nonnull + public ImmutableList list( KubernetesKind kind, String namespace, KubernetesSelectorList selectors) { return runAndRecordMetrics( "list", @@ -424,9 +426,10 @@ public List list( () -> jobExecutor.list(this, Collections.singletonList(kind), namespace, selectors)); } - public List list(List kinds, String namespace) { + @Nonnull + public ImmutableList list(List kinds, String namespace) { if (kinds.isEmpty()) { - return new ArrayList<>(); + return ImmutableList.of(); } else { return runAndRecordMetrics( "list", @@ -436,7 +439,9 @@ public List list(List kinds, String namespac } } - public List eventsFor(KubernetesKind kind, String namespace, String name) { + @Nonnull + public ImmutableList eventsFor( + KubernetesKind kind, String namespace, String name) { return runAndRecordMetrics( "list", KubernetesKind.EVENT, diff --git a/clouddriver-kubernetes-v2/src/test/groovy/com/netflix/spinnaker/clouddriver/kubernetes/v2/security/KubernetesV2CredentialsSpec.groovy b/clouddriver-kubernetes-v2/src/test/groovy/com/netflix/spinnaker/clouddriver/kubernetes/v2/security/KubernetesV2CredentialsSpec.groovy index 315da1148b6..7acd948f6ec 100644 --- a/clouddriver-kubernetes-v2/src/test/groovy/com/netflix/spinnaker/clouddriver/kubernetes/v2/security/KubernetesV2CredentialsSpec.groovy +++ b/clouddriver-kubernetes-v2/src/test/groovy/com/netflix/spinnaker/clouddriver/kubernetes/v2/security/KubernetesV2CredentialsSpec.groovy @@ -16,6 +16,7 @@ package com.netflix.spinnaker.clouddriver.kubernetes.v2.security +import com.google.common.collect.ImmutableList import com.netflix.spectator.api.NoopRegistry import com.netflix.spectator.api.Registry import com.netflix.spinnaker.clouddriver.kubernetes.config.KubernetesConfigurationProperties @@ -117,11 +118,11 @@ class KubernetesV2CredentialsSpec extends Specification { namespaces: [NAMESPACE], checkPermissionsOnStartup: true, )) - kubectlJobExecutor.list(_ as KubernetesV2Credentials, [KubernetesKind.DEPLOYMENT], NAMESPACE, _ as KubernetesSelectorList) >> { + kubectlJobExecutor.list(_ as KubernetesV2Credentials, ImmutableList.of(KubernetesKind.DEPLOYMENT), NAMESPACE, _ as KubernetesSelectorList) >> { throw new KubectlJobExecutor.KubectlException("Error", new Exception()) } - kubectlJobExecutor.list(_ as KubernetesV2Credentials, [KubernetesKind.REPLICA_SET], NAMESPACE, _ as KubernetesSelectorList) >> { - return Collections.emptyList() + kubectlJobExecutor.list(_ as KubernetesV2Credentials, ImmutableList.of(KubernetesKind.REPLICA_SET), NAMESPACE, _ as KubernetesSelectorList) >> { + return ImmutableList.of() } expect: diff --git a/clouddriver-kubernetes-v2/src/test/java/com/netflix/spinnaker/clouddriver/kubernetes/v2/caching/agent/KubernetesCoreCachingAgentTest.java b/clouddriver-kubernetes-v2/src/test/java/com/netflix/spinnaker/clouddriver/kubernetes/v2/caching/agent/KubernetesCoreCachingAgentTest.java index c0e204fd3fd..a4383e63e58 100644 --- a/clouddriver-kubernetes-v2/src/test/java/com/netflix/spinnaker/clouddriver/kubernetes/v2/caching/agent/KubernetesCoreCachingAgentTest.java +++ b/clouddriver-kubernetes-v2/src/test/java/com/netflix/spinnaker/clouddriver/kubernetes/v2/caching/agent/KubernetesCoreCachingAgentTest.java @@ -27,14 +27,17 @@ import com.google.common.collect.ImmutableCollection; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSetMultimap; import com.netflix.spectator.api.NoopRegistry; +import com.netflix.spinnaker.cats.agent.CacheResult; import com.netflix.spinnaker.cats.cache.CacheData; import com.netflix.spinnaker.cats.cache.DefaultCacheData; import com.netflix.spinnaker.cats.mem.InMemoryCache; import com.netflix.spinnaker.cats.provider.DefaultProviderCache; import com.netflix.spinnaker.cats.provider.ProviderCache; import com.netflix.spinnaker.clouddriver.cache.OnDemandAgent; +import com.netflix.spinnaker.clouddriver.cache.OnDemandAgent.OnDemandResult; import com.netflix.spinnaker.clouddriver.kubernetes.config.KubernetesConfigurationProperties; import com.netflix.spinnaker.clouddriver.kubernetes.security.KubernetesNamedAccountCredentials; import com.netflix.spinnaker.clouddriver.kubernetes.v2.caching.Keys; @@ -52,6 +55,7 @@ import org.junit.jupiter.params.provider.ValueSource; import org.junit.platform.runner.JUnitPlatform; import org.junit.runner.RunWith; +import org.mockito.stubbing.Answer; @RunWith(JUnitPlatform.class) final class KubernetesCoreCachingAgentTest { @@ -108,6 +112,23 @@ private static KubernetesV2Credentials mockKubernetesV2Credentials() { .thenReturn(deploymentManifest()); when(v2Credentials.get(KubernetesKind.STORAGE_CLASS, "", STORAGE_CLASS_NAME)) .thenReturn(storageClassManifest()); + when(v2Credentials.list(any(List.class), any())) + .thenAnswer( + (Answer>) + invocation -> { + Object[] args = invocation.getArguments(); + ImmutableSet kinds = + ImmutableSet.copyOf((List) args[0]); + String namespace = (String) args[1]; + ImmutableList.Builder result = new ImmutableList.Builder<>(); + if (kinds.contains(KubernetesKind.DEPLOYMENT) && NAMESPACE1.equals(namespace)) { + result.add(deploymentManifest()); + } + if (kinds.contains(KubernetesKind.STORAGE_CLASS)) { + result.add(storageClassManifest()); + } + return result.build(); + }); return v2Credentials; } @@ -386,56 +407,26 @@ private static class ProcessOnDemandResult { ProcessOnDemandResult( Collection onDemandResults, Collection providerCaches) { - this.onDemandResults = extractCacheResults(onDemandResults); - this.onDemandEvictions = extractCacheEvictions(onDemandResults); - this.onDemandEntries = extractOnDemandEntries(providerCaches); - } - - /** Given a collection of ProviderCache, return all on-demand entries in these caches. */ - private static ImmutableMap> extractOnDemandEntries( - Collection providerCaches) { - return providerCaches.stream() - .map(providerCache -> providerCache.getAll("onDemand")) - .flatMap(Collection::stream) - .filter(Objects::nonNull) - .map( - cacheData -> { - try { - return objectMapper.>>readValue( - (String) cacheData.getAttributes().get("cacheResults"), - new TypeReference>>() {}); - } catch (IOException e) { - throw new RuntimeException(e); - } - }) - .map(Map::entrySet) - .flatMap(Collection::stream) - .collect( - ImmutableSetMultimap.flatteningToImmutableSetMultimap( - Map.Entry::getKey, e -> e.getValue().stream())) - .asMap(); + this.onDemandResults = extractOnDemandResults(onDemandResults); + this.onDemandEvictions = extractOnDemandEvictions(onDemandResults); + this.onDemandEntries = extractCacheEntries(providerCaches); } /** * Given a collection of OnDemandAgent.OnDemandResult, return all cache results in these * on-demand results. */ - private static ImmutableMap> extractCacheResults( + private static ImmutableMap> extractOnDemandResults( Collection onDemandResults) { - return onDemandResults.stream() - .map(result -> result.getCacheResult().getCacheResults().entrySet()) - .flatMap(Collection::stream) - .collect( - ImmutableSetMultimap.flatteningToImmutableSetMultimap( - Map.Entry::getKey, e -> e.getValue().stream())) - .asMap(); + return extractCacheResults( + onDemandResults.stream().map(OnDemandResult::getCacheResult).collect(toImmutableList())); } /** * Given a collection of OnDemandAgent.OnDemandResult, return all evictions in these on-demand * results. */ - private static ImmutableMap> extractCacheEvictions( + private static ImmutableMap> extractOnDemandEvictions( Collection onDemandResults) { return onDemandResults.stream() .map(result -> result.getEvictions().entrySet()) @@ -446,4 +437,114 @@ private static ImmutableMap> extractCacheEvictions( .asMap(); } } + + /** + * Given a collection of OnDemandAgent.OnDemandResult, return all cache results in these on-demand + * results. + */ + private static ImmutableMap> extractCacheResults( + Collection onDemandResults) { + return onDemandResults.stream() + .map(result -> result.getCacheResults().entrySet()) + .flatMap(Collection::stream) + .collect( + ImmutableSetMultimap.flatteningToImmutableSetMultimap( + Map.Entry::getKey, e -> e.getValue().stream())) + .asMap(); + } + + /** Given a collection of ProviderCache, return all on-demand entries in these caches. */ + private static ImmutableMap> extractCacheEntries( + Collection providerCaches) { + return providerCaches.stream() + .map(providerCache -> providerCache.getAll("onDemand")) + .flatMap(Collection::stream) + .filter(Objects::nonNull) + .map( + cacheData -> { + try { + return objectMapper.>>readValue( + (String) cacheData.getAttributes().get("cacheResults"), + new TypeReference>>() {}); + } catch (IOException e) { + throw new RuntimeException(e); + } + }) + .map(Map::entrySet) + .flatMap(Collection::stream) + .collect( + ImmutableSetMultimap.flatteningToImmutableSetMultimap( + Map.Entry::getKey, e -> e.getValue().stream())) + .asMap(); + } + + @ParameterizedTest + @ValueSource(ints = {1, 2, 10}) + public void loadData(int numAgents) { + String deploymentKey = + Keys.InfrastructureCacheKey.createKey( + KubernetesKind.DEPLOYMENT, ACCOUNT, NAMESPACE1, DEPLOYMENT_NAME); + + String storageClassKey = + Keys.InfrastructureCacheKey.createKey( + KubernetesKind.STORAGE_CLASS, ACCOUNT, "", STORAGE_CLASS_NAME); + + ImmutableCollection cachingAgents = + createCachingAgents(getNamedAccountCredentials(), numAgents); + LoadDataResult loadDataResult = processLoadData(cachingAgents, ImmutableMap.of()); + + assertThat(loadDataResult.getResults()).containsKey(DEPLOYMENT_KIND); + Collection deployments = loadDataResult.getResults().get(DEPLOYMENT_KIND); + assertThat(deployments).extracting(CacheData::getId).containsExactly(deploymentKey); + assertThat(deployments) + .extracting(deployment -> deployment.getAttributes().get("name")) + .containsExactly(DEPLOYMENT_NAME); + + assertThat(loadDataResult.getResults()).containsKey(STORAGE_CLASS_KIND); + Collection storageClasses = loadDataResult.getResults().get(STORAGE_CLASS_KIND); + assertThat(storageClasses).extracting(CacheData::getId).contains(storageClassKey); + assertThat(storageClasses) + .extracting(storageClass -> storageClass.getAttributes().get("name")) + .containsExactly(STORAGE_CLASS_NAME); + } + + /** + * Given an on-demand cache request, constructs a set of caching agents and sends the on-demand + * request to those agents, returning a collection of all non-null results of handing those + * requests. Any cache entries in primeCacheData will be added to each agent's backing cache + * before processing the request. + */ + private static LoadDataResult processLoadData( + Collection cachingAgents, + Map> primeCacheData) { + ImmutableList.Builder resultBuilder = new ImmutableList.Builder<>(); + ImmutableList.Builder providerCacheBuilder = new ImmutableList.Builder<>(); + cachingAgents.forEach( + cachingAgent -> { + ProviderCache providerCache = new DefaultProviderCache(new InMemoryCache()); + providerCacheBuilder.add(providerCache); + for (String type : primeCacheData.keySet()) { + for (CacheData cacheData : primeCacheData.get(type)) { + providerCache.putCacheData(type, cacheData); + } + } + CacheResult result = cachingAgent.loadData(providerCache); + if (result != null) { + resultBuilder.add(result); + } + }); + return new LoadDataResult(resultBuilder.build(), providerCacheBuilder.build()); + } + + @Value + private static class LoadDataResult { + Map> results; + Map> cacheEntries; + + LoadDataResult( + Collection loadDataResults, Collection providerCaches) { + this.results = extractCacheResults(loadDataResults); + this.cacheEntries = extractCacheEntries(providerCaches); + } + } }