diff --git a/clouddriver-kubernetes/src/main/java/com/netflix/spinnaker/clouddriver/kubernetes/caching/agent/KubernetesCachingAgent.java b/clouddriver-kubernetes/src/main/java/com/netflix/spinnaker/clouddriver/kubernetes/caching/agent/KubernetesCachingAgent.java index 5f24063d530..4ee2c69c2bd 100644 --- a/clouddriver-kubernetes/src/main/java/com/netflix/spinnaker/clouddriver/kubernetes/caching/agent/KubernetesCachingAgent.java +++ b/clouddriver-kubernetes/src/main/java/com/netflix/spinnaker/clouddriver/kubernetes/caching/agent/KubernetesCachingAgent.java @@ -41,6 +41,7 @@ import com.netflix.spinnaker.clouddriver.kubernetes.description.manifest.KubernetesKindProperties.ResourceScope; import com.netflix.spinnaker.clouddriver.kubernetes.description.manifest.KubernetesManifest; import com.netflix.spinnaker.clouddriver.kubernetes.description.manifest.KubernetesManifestAnnotater; +import com.netflix.spinnaker.clouddriver.kubernetes.op.job.KubectlJobExecutor; import com.netflix.spinnaker.clouddriver.kubernetes.security.KubernetesCredentials; import com.netflix.spinnaker.clouddriver.kubernetes.security.KubernetesNamedAccountCredentials; import java.util.Arrays; @@ -84,6 +85,7 @@ public abstract class KubernetesCachingAgent protected final int agentIndex; protected final int agentCount; + protected KubectlJobExecutor jobExecutor; @Getter protected String providerName = KubernetesCloudProvider.ID; diff --git a/clouddriver-kubernetes/src/main/java/com/netflix/spinnaker/clouddriver/kubernetes/caching/view/provider/KubernetesInstanceProvider.java b/clouddriver-kubernetes/src/main/java/com/netflix/spinnaker/clouddriver/kubernetes/caching/view/provider/KubernetesInstanceProvider.java index 4986169f805..f5d18c9e9af 100644 --- a/clouddriver-kubernetes/src/main/java/com/netflix/spinnaker/clouddriver/kubernetes/caching/view/provider/KubernetesInstanceProvider.java +++ b/clouddriver-kubernetes/src/main/java/com/netflix/spinnaker/clouddriver/kubernetes/caching/view/provider/KubernetesInstanceProvider.java @@ -23,7 +23,7 @@ import com.netflix.spinnaker.clouddriver.kubernetes.caching.view.model.KubernetesInstance; import com.netflix.spinnaker.clouddriver.kubernetes.description.KubernetesCoordinates; import com.netflix.spinnaker.clouddriver.kubernetes.model.ContainerLog; -import com.netflix.spinnaker.clouddriver.kubernetes.op.job.DefaultKubectlJobExecutor; +import com.netflix.spinnaker.clouddriver.kubernetes.op.job.KubectlJobExecutor; import com.netflix.spinnaker.clouddriver.kubernetes.security.KubernetesCredentials; import com.netflix.spinnaker.clouddriver.model.InstanceProvider; import io.kubernetes.client.openapi.models.V1Container; @@ -119,7 +119,7 @@ private ContainerLog getContainerLog( String containerLogs = credentials.logs(metadata.getNamespace(), metadata.getName(), containerName); return new ContainerLog(containerName, containerLogs); - } catch (DefaultKubectlJobExecutor.KubectlException e) { + } catch (KubectlJobExecutor.KubectlException e) { // Typically happens if the container/pod isn't running yet return new ContainerLog(containerName, e.getMessage()); } diff --git a/clouddriver-kubernetes/src/main/java/com/netflix/spinnaker/clouddriver/kubernetes/op/handler/CanDeploy.java b/clouddriver-kubernetes/src/main/java/com/netflix/spinnaker/clouddriver/kubernetes/op/handler/CanDeploy.java index 09a4fda5e3d..84f80769e31 100644 --- a/clouddriver-kubernetes/src/main/java/com/netflix/spinnaker/clouddriver/kubernetes/op/handler/CanDeploy.java +++ b/clouddriver-kubernetes/src/main/java/com/netflix/spinnaker/clouddriver/kubernetes/op/handler/CanDeploy.java @@ -21,7 +21,7 @@ import com.netflix.spinnaker.clouddriver.kubernetes.description.manifest.KubernetesManifest; import com.netflix.spinnaker.clouddriver.kubernetes.description.manifest.KubernetesManifestStrategy; import com.netflix.spinnaker.clouddriver.kubernetes.op.OperationResult; -import com.netflix.spinnaker.clouddriver.kubernetes.op.job.DefaultKubectlJobExecutor; +import com.netflix.spinnaker.clouddriver.kubernetes.op.job.KubectlJobExecutor; import com.netflix.spinnaker.clouddriver.kubernetes.security.KubernetesCredentials; import com.netflix.spinnaker.clouddriver.kubernetes.security.KubernetesSelectorList; import io.kubernetes.client.openapi.models.V1DeleteOptions; @@ -52,7 +52,7 @@ default OperationResult deploy( new V1DeleteOptions(), task, opName); - } catch (DefaultKubectlJobExecutor.KubectlException ignored) { + } catch (KubectlJobExecutor.KubectlException ignored) { } deployedManifest = credentials.deploy(manifest, task, opName); break; diff --git a/clouddriver-kubernetes/src/main/java/com/netflix/spinnaker/clouddriver/kubernetes/op/job/DefaultKubectlJobExecutor.java b/clouddriver-kubernetes/src/main/java/com/netflix/spinnaker/clouddriver/kubernetes/op/job/DefaultKubectlJobExecutor.java deleted file mode 100644 index a45ad5a4a59..00000000000 --- a/clouddriver-kubernetes/src/main/java/com/netflix/spinnaker/clouddriver/kubernetes/op/job/DefaultKubectlJobExecutor.java +++ /dev/null @@ -1,1236 +0,0 @@ -/* - * Copyright 2017 Google, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package com.netflix.spinnaker.clouddriver.kubernetes.op.job; - -import com.google.common.base.Strings; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSetMultimap; -import com.google.gson.Gson; -import com.google.gson.JsonSyntaxException; -import com.google.gson.stream.JsonReader; -import com.netflix.spinnaker.clouddriver.data.task.Task; -import com.netflix.spinnaker.clouddriver.jobs.JobExecutor; -import com.netflix.spinnaker.clouddriver.jobs.JobRequest; -import com.netflix.spinnaker.clouddriver.jobs.JobResult; -import com.netflix.spinnaker.clouddriver.jobs.local.ReaderConsumer; -import com.netflix.spinnaker.clouddriver.kubernetes.config.KubernetesConfigurationProperties; -import com.netflix.spinnaker.clouddriver.kubernetes.description.JsonPatch; -import com.netflix.spinnaker.clouddriver.kubernetes.description.KubernetesPatchOptions; -import com.netflix.spinnaker.clouddriver.kubernetes.description.KubernetesPodMetric; -import com.netflix.spinnaker.clouddriver.kubernetes.description.manifest.KubernetesKind; -import com.netflix.spinnaker.clouddriver.kubernetes.description.manifest.KubernetesManifest; -import com.netflix.spinnaker.clouddriver.kubernetes.security.KubernetesCredentials; -import com.netflix.spinnaker.clouddriver.kubernetes.security.KubernetesSelectorList; -import com.netflix.spinnaker.kork.annotations.VisibleForTesting; -import io.github.resilience4j.core.EventConsumer; -import io.github.resilience4j.core.IntervalFunction; -import io.github.resilience4j.micrometer.tagged.TaggedRetryMetrics; -import io.github.resilience4j.retry.Retry; -import io.github.resilience4j.retry.RetryConfig; -import io.github.resilience4j.retry.RetryRegistry; -import io.github.resilience4j.retry.event.RetryEvent; -import io.github.resilience4j.retry.event.RetryOnErrorEvent; -import io.github.resilience4j.retry.event.RetryOnIgnoredErrorEvent; -import io.github.resilience4j.retry.event.RetryOnRetryEvent; -import io.github.resilience4j.retry.event.RetryOnSuccessEvent; -import io.kubernetes.client.openapi.models.V1DeleteOptions; -import io.micrometer.core.instrument.MeterRegistry; -import java.io.BufferedReader; -import java.io.ByteArrayInputStream; -import java.io.EOFException; -import java.nio.charset.StandardCharsets; -import java.time.Duration; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Optional; -import java.util.stream.Collectors; -import javax.annotation.Nonnull; -import javax.annotation.Nullable; -import javax.annotation.WillClose; -import lombok.Getter; -import org.apache.commons.lang3.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; - -@Component -public class DefaultKubectlJobExecutor implements KubectlJobExecutor { - private static final Logger log = LoggerFactory.getLogger(DefaultKubectlJobExecutor.class); - private static final String NOT_FOUND_STRING = "(NotFound)"; - private static final String KUBECTL_COMMAND_OPTION_TOKEN = "--token="; - private static final String KUBECTL_COMMAND_OPTION_KUBECONFIG = "--kubeconfig="; - private static final String KUBECTL_COMMAND_OPTION_CONTEXT = "--context="; - - private final JobExecutor jobExecutor; - - private final Gson gson = new Gson(); - - private final KubernetesConfigurationProperties kubernetesConfigurationProperties; - - // @Getter is required so that this can be used in tests - @Getter private final Optional retryRegistry; - - private final MeterRegistry meterRegistry; - - @Autowired - public DefaultKubectlJobExecutor( - JobExecutor jobExecutor, - KubernetesConfigurationProperties kubernetesConfigurationProperties, - MeterRegistry meterRegistry) { - this.jobExecutor = jobExecutor; - this.kubernetesConfigurationProperties = kubernetesConfigurationProperties; - this.meterRegistry = meterRegistry; - - this.retryRegistry = - initializeRetryRegistry(kubernetesConfigurationProperties.getJobExecutor().getRetries()); - } - - /** - * This is used to initialize a RetryRegistry. RetryRegistry acts as a global store for all retry - * instances. The retry instances are shared for various kubectl actions. A retry instance is - * identified by the account name. - * - * @param retriesConfig - kubectl job retries configuration - * @return - If retries are enabled, it returns an Optional that contains a RetryRegistry, - * otherwise it returns an empty Optional - */ - private Optional initializeRetryRegistry( - KubernetesConfigurationProperties.KubernetesJobExecutorProperties.Retries retriesConfig) { - if (retriesConfig.isEnabled()) { - log.info("kubectl retries are enabled"); - - // this config will be applied to all retry instances created from the registry - RetryConfig.Builder retryConfig = - RetryConfig.custom().maxAttempts(retriesConfig.getMaxAttempts()); - if (retriesConfig.isExponentialBackoffEnabled()) { - retryConfig.intervalFunction( - IntervalFunction.ofExponentialBackoff( - Duration.ofMillis(retriesConfig.getExponentialBackOffIntervalMs()), - retriesConfig.getExponentialBackoffMultiplier())); - } else { - retryConfig.waitDuration(Duration.ofMillis(retriesConfig.getBackOffInMs())); - } - - // retry on all exceptions except NoRetryException - retryConfig.ignoreExceptions(NoRetryException.class); - - // create the retry registry - RetryRegistry retryRegistry = RetryRegistry.of(retryConfig.build()); - - // log whenever a new retry instance is added, removed or replaced from the registry - retryRegistry - .getEventPublisher() - .onEntryAdded( - entryAddedEvent -> { - Retry addedRetry = entryAddedEvent.getAddedEntry(); - log.info("Kubectl retries configured for: {}", addedRetry.getName()); - }) - .onEntryRemoved( - entryRemovedEvent -> { - Retry removedRetry = entryRemovedEvent.getRemovedEntry(); - log.info("Kubectl retries removed for: {}", removedRetry.getName()); - }) - .onEntryReplaced( - entryReplacedEvent -> { - Retry oldEntry = entryReplacedEvent.getOldEntry(); - Retry newEntry = entryReplacedEvent.getNewEntry(); - log.info( - "Kubectl retry: {} updated to: {}", oldEntry.getName(), newEntry.getName()); - }); - - // define an event consumer once for the entire registry as mentioned here: - // https://github.com/resilience4j/resilience4j/issues/974#issuecomment-619956673 - // If we don't do this once, but add it for each individual retry instance, and if - // that retry instance is invoked by multiple threads, then there is a lot of log duplication. - // For example, if 10 threads invoke an action to get the top pod, and it is - // configured to use the retry instance with the identifier "mock-account.topPod.test-pod", - // then we will see 10*10 log lines showing up for each retry event instead of just 10 that - // we expect. - EventConsumer eventConsumer = - retryEvent -> { - if (retryEvent instanceof RetryOnErrorEvent) { - log.error( - "Kubectl command for {} failed after {} attempts. Exception: {}", - retryEvent.getName(), - retryEvent.getNumberOfRetryAttempts(), - retryEvent.getLastThrowable().toString()); - } else if (retryEvent instanceof RetryOnSuccessEvent) { - log.info( - "Kubectl command for {} is now successful in attempt #{}. Last attempt had failed with exception: {}", - retryEvent.getName(), - retryEvent.getNumberOfRetryAttempts() + 1, - retryEvent.getLastThrowable().toString()); - } else if (retryEvent instanceof RetryOnRetryEvent) { - log.info( - "Retrying Kubectl command for {}. Attempt #{} failed with exception: {}", - retryEvent.getName(), - retryEvent.getNumberOfRetryAttempts(), - retryEvent.getLastThrowable().toString()); - } else if (!(retryEvent instanceof RetryOnIgnoredErrorEvent)) { - // don't log anything for Ignored exceptions as it just leads to noise in the logs - log.info(retryEvent.toString()); - } - }; - retryRegistry - .getAllRetries() - .forEach(retry -> retry.getEventPublisher().onEvent(eventConsumer)); - retryRegistry - .getEventPublisher() - .onEntryAdded(event -> event.getAddedEntry().getEventPublisher().onEvent(eventConsumer)); - - if (this.kubernetesConfigurationProperties - .getJobExecutor() - .getRetries() - .getMetrics() - .isEnabled()) { - TaggedRetryMetrics.ofRetryRegistry(retryRegistry).bindTo(meterRegistry); - } - - return Optional.of(retryRegistry); - } else { - log.info("kubectl retries are disabled"); - return Optional.empty(); - } - } - - public String logs( - KubernetesCredentials credentials, String namespace, String podName, String containerName) { - List command = kubectlNamespacedAuthPrefix(credentials, namespace); - command.add("logs"); - command.add(podName); - command.add("-c=" + containerName); - - JobResult status = executeKubectlCommand(credentials, command); - - if (status.getResult() != JobResult.Result.SUCCESS) { - throw new KubectlException( - "Failed to get logs from " - + podName - + "/" - + containerName - + " in " - + namespace - + ": " - + status.getError()); - } - - return status.getOutput(); - } - - public String jobLogs( - KubernetesCredentials credentials, String namespace, String jobName, String containerName) { - List command = kubectlNamespacedAuthPrefix(credentials, namespace); - String resource = "job/" + jobName; - command.add("logs"); - command.add(resource); - command.add("-c=" + containerName); - - JobResult status = executeKubectlCommand(credentials, command); - - if (status.getResult() != JobResult.Result.SUCCESS) { - throw new KubectlException( - "Failed to get logs from " + resource + " in " + namespace + ": " + status.getError()); - } - - return status.getOutput(); - } - - public List delete( - KubernetesCredentials credentials, - KubernetesKind kind, - String namespace, - String name, - KubernetesSelectorList labelSelectors, - V1DeleteOptions deleteOptions, - Task task, - String opName) { - List command = kubectlNamespacedAuthPrefix(credentials, namespace); - - command.add("delete"); - - command = kubectlLookupInfo(command, kind, name, labelSelectors); - - // spinnaker generally accepts deletes of resources that don't exist - command.add("--ignore-not-found=true"); - - if (deleteOptions.getPropagationPolicy() != null) { - command.add("--cascade=" + deleteOptions.getPropagationPolicy()); - } - - if (deleteOptions.getGracePeriodSeconds() != null) { - command.add("--grace-period=" + deleteOptions.getGracePeriodSeconds()); - } - - String id; - if (!Strings.isNullOrEmpty(name)) { - id = kind + "/" + name; - } else { - id = labelSelectors.toString(); - } - - JobResult status = executeKubectlCommand(credentials, command); - - persistKubectlJobOutput(credentials, status, id, task, opName); - - if (status.getResult() != JobResult.Result.SUCCESS) { - throw new KubectlException( - "Failed to delete " + id + " from " + namespace + ": " + status.getError()); - } - - if (Strings.isNullOrEmpty(status.getOutput()) - || status.getOutput().equals("No output from command.") - || status.getOutput().startsWith("No resources found")) { - return new ArrayList<>(); - } - - return Arrays.stream(status.getOutput().split("\n")) - .map(m -> m.substring(m.indexOf("\"") + 1)) - .map(m -> m.substring(0, m.lastIndexOf("\""))) - .collect(Collectors.toList()); - } - - public Void scale( - KubernetesCredentials credentials, - KubernetesKind kind, - String namespace, - String name, - int replicas, - Task task, - String opName) { - List command = kubectlNamespacedAuthPrefix(credentials, namespace); - - command.add("scale"); - command = kubectlLookupInfo(command, kind, name, null); - command.add("--replicas=" + replicas); - - String resource = kind + "/" + name; - JobResult status = executeKubectlCommand(credentials, command); - persistKubectlJobOutput(credentials, status, resource, task, opName); - - if (status.getResult() != JobResult.Result.SUCCESS) { - throw new KubectlException( - "Failed to scale " + resource + " from " + namespace + ": " + status.getError()); - } - - return null; - } - - public List historyRollout( - KubernetesCredentials credentials, KubernetesKind kind, String namespace, String name) { - List command = kubectlNamespacedAuthPrefix(credentials, namespace); - String resource = kind + "/" + name; - command.add("rollout"); - command.add("history"); - command.add(resource); - - JobResult status = executeKubectlCommand(credentials, command); - - if (status.getResult() != JobResult.Result.SUCCESS) { - throw new KubectlException( - "Failed to get rollout history of " - + resource - + " from " - + namespace - + ": " - + status.getError()); - } - - String stdout = status.getOutput(); - if (Strings.isNullOrEmpty(stdout)) { - return new ArrayList<>(); - } - - // "name" - // REVISION CHANGE-CAUSE - // # - // # - // # - // ... - List splitOutput = Arrays.stream(stdout.split("\n")).collect(Collectors.toList()); - - if (splitOutput.size() <= 2) { - return new ArrayList<>(); - } - - splitOutput = splitOutput.subList(2, splitOutput.size()); - - return splitOutput.stream() - .map(l -> l.split("[ \t]")) - .filter(l -> l.length > 0) - .map(l -> l[0]) - .map(Integer::valueOf) - .collect(Collectors.toList()); - } - - public Void undoRollout( - KubernetesCredentials credentials, - KubernetesKind kind, - String namespace, - String name, - int revision) { - List command = kubectlNamespacedAuthPrefix(credentials, namespace); - - String resource = kind + "/" + name; - command.add("rollout"); - command.add("undo"); - command.add(resource); - command.add("--to-revision=" + revision); - - JobResult status = executeKubectlCommand(credentials, command); - - if (status.getResult() != JobResult.Result.SUCCESS) { - throw new KubectlException( - "Failed to undo rollout " + resource + " from " + namespace + ": " + status.getError()); - } - - return null; - } - - public Void pauseRollout( - KubernetesCredentials credentials, KubernetesKind kind, String namespace, String name) { - List command = kubectlNamespacedAuthPrefix(credentials, namespace); - - String resource = kind + "/" + name; - command.add("rollout"); - command.add("pause"); - command.add(resource); - - JobResult status = executeKubectlCommand(credentials, command); - - if (status.getResult() != JobResult.Result.SUCCESS) { - throw new KubectlException( - "Failed to pause rollout " + resource + " from " + namespace + ": " + status.getError()); - } - - return null; - } - - public Void resumeRollout( - KubernetesCredentials credentials, - KubernetesKind kind, - String namespace, - String name, - Task task, - String opName) { - List command = kubectlNamespacedAuthPrefix(credentials, namespace); - - String resource = kind + "/" + name; - command.add("rollout"); - command.add("resume"); - command.add(resource); - - JobResult status = executeKubectlCommand(credentials, command); - - persistKubectlJobOutput(credentials, status, resource, task, opName); - - if (status.getResult() != JobResult.Result.SUCCESS) { - throw new KubectlException( - "Failed to resume rollout " + resource + " from " + namespace + ": " + status.getError()); - } - - return null; - } - - public Void rollingRestart( - KubernetesCredentials credentials, - KubernetesKind kind, - String namespace, - String name, - Task task, - String opName) { - List command = kubectlNamespacedAuthPrefix(credentials, namespace); - - String resource = kind + "/" + name; - command.add("rollout"); - command.add("restart"); - command.add(resource); - - JobResult status = executeKubectlCommand(credentials, command); - - persistKubectlJobOutput(credentials, status, resource, task, opName); - - if (status.getResult() != JobResult.Result.SUCCESS) { - throw new KubectlException( - "Failed to complete rolling restart of " - + resource - + " from " - + namespace - + ": " - + status.getError()); - } - - return null; - } - - @Nullable - public KubernetesManifest get( - KubernetesCredentials credentials, KubernetesKind kind, String namespace, String name) { - log.debug( - "Getting information for {} of Kind {} in namespace {}", name, kind.toString(), namespace); - List command = kubectlNamespacedGet(credentials, ImmutableList.of(kind), namespace); - command.add(name); - - JobResult status = executeKubectlCommand(credentials, command); - - if (status.getResult() != JobResult.Result.SUCCESS) { - if (status.getError().contains(NOT_FOUND_STRING)) { - return null; - } - - throw new KubectlException( - "Failed to get: " - + name - + " of kind: " - + kind - + " from namespace: " - + namespace - + ": " - + status.getError()); - } - - try { - return gson.fromJson(status.getOutput(), KubernetesManifest.class); - } catch (JsonSyntaxException e) { - throw new KubectlException( - "Failed to parse kubectl output for: " - + name - + " of kind: " - + kind - + " in namespace: " - + namespace - + ": " - + e.getMessage(), - e); - } - } - - @Nonnull - public ImmutableList eventsFor( - KubernetesCredentials credentials, KubernetesKind kind, String namespace, String name) { - log.debug("Getting events for {} of Kind {} in namespace {}", name, kind.toString(), namespace); - List command = - kubectlNamespacedGet(credentials, ImmutableList.of(KubernetesKind.EVENT), namespace); - command.add("--field-selector"); - command.add( - String.format( - "involvedObject.name=%s,involvedObject.kind=%s", - name, StringUtils.capitalize(kind.toString()))); - - JobResult> status = - executeKubectlCommand(credentials, command, parseManifestList()); - - if (status.getResult() != JobResult.Result.SUCCESS) { - throw new KubectlException( - "Failed to read events for: " - + kind - + "/" - + name - + " from " - + namespace - + ": " - + status.getError()); - } - - if (status.getError().contains("No resources found")) { - return ImmutableList.of(); - } - - return status.getOutput(); - } - - @Nonnull - public ImmutableList list( - KubernetesCredentials credentials, - List kinds, - String namespace, - KubernetesSelectorList selectors) { - log.debug("Getting list of kinds {} in namespace {}", kinds, namespace); - List command = kubectlNamespacedGet(credentials, kinds, namespace); - if (selectors.isNotEmpty()) { - log.debug("with selectors: {}", selectors.toString()); - command.add("-l=" + selectors.toString()); - } - - JobResult> status = - executeKubectlCommand(credentials, command, parseManifestList()); - - if (status.getResult() != JobResult.Result.SUCCESS) { - boolean permissionError = - org.apache.commons.lang3.StringUtils.containsIgnoreCase(status.getError(), "forbidden"); - if (permissionError) { - log.warn(status.getError()); - } else { - throw new KubectlException( - "Failed to read " + kinds + " from " + namespace + ": " + status.getError()); - } - } - - if (status.getError().contains("No resources found")) { - return ImmutableList.of(); - } - - return status.getOutput(); - } - - public KubernetesManifest deploy( - KubernetesCredentials credentials, KubernetesManifest manifest, Task task, String opName) { - log.info("Deploying manifest {}", manifest.getFullResourceName()); - List command = kubectlAuthPrefix(credentials); - - // Read from stdin - command.add("apply"); - command.add("-o"); - command.add("json"); - command.add("-f"); - command.add("-"); - - JobResult status = executeKubectlCommand(credentials, command, Optional.of(manifest)); - - persistKubectlJobOutput(credentials, status, manifest.getFullResourceName(), task, opName); - - if (status.getResult() != JobResult.Result.SUCCESS) { - throw new KubectlException( - "Deploy failed for manifest: " - + manifest.getFullResourceName() - + ". Error: " - + status.getError()); - } - - return getKubernetesManifestFromJobResult(status, manifest); - } - - public KubernetesManifest replace( - KubernetesCredentials credentials, KubernetesManifest manifest, Task task, String opName) { - log.info("Replacing manifest {}", manifest.getFullResourceName()); - List command = kubectlAuthPrefix(credentials); - - // Read from stdin - command.add("replace"); - command.add("-o"); - command.add("json"); - command.add("-f"); - command.add("-"); - - JobResult status = executeKubectlCommand(credentials, command, Optional.of(manifest)); - - persistKubectlJobOutput(credentials, status, manifest.getFullResourceName(), task, opName); - - if (status.getResult() != JobResult.Result.SUCCESS) { - if (status.getError().contains(NOT_FOUND_STRING)) { - throw new KubectlNotFoundException( - "Replace failed for manifest: " - + manifest.getFullResourceName() - + ". Error: " - + status.getError()); - } - throw new KubectlException( - "Replace failed for manifest: " - + manifest.getFullResourceName() - + ". Error: " - + status.getError()); - } - - return getKubernetesManifestFromJobResult(status, manifest); - } - - public KubernetesManifest create( - KubernetesCredentials credentials, KubernetesManifest manifest, Task task, String opName) { - log.info("Creating manifest {}", manifest.getName()); - List command = kubectlAuthPrefix(credentials); - - // Read from stdin - command.add("create"); - command.add("-o"); - command.add("json"); - command.add("-f"); - command.add("-"); - - JobResult status = executeKubectlCommand(credentials, command, Optional.of(manifest)); - - persistKubectlJobOutput(credentials, status, manifest.getFullResourceName(), task, opName); - - if (status.getResult() != JobResult.Result.SUCCESS) { - throw new KubectlException( - "Create failed for manifest: " - + manifest.getFullResourceName() - + ". Error: " - + status.getError()); - } - - return getKubernetesManifestFromJobResult(status, manifest); - } - - private KubernetesManifest getKubernetesManifestFromJobResult( - JobResult status, KubernetesManifest inputManifest) { - try { - return gson.fromJson(status.getOutput(), KubernetesManifest.class); - } catch (JsonSyntaxException e) { - throw new KubectlException( - "Failed to parse kubectl output for manifest: " - + inputManifest.getName() - + ". Error: " - + e.getMessage(), - e); - } - } - - private List kubectlAuthPrefix(KubernetesCredentials credentials) { - List command = new ArrayList<>(); - if (!Strings.isNullOrEmpty(credentials.getKubectlExecutable())) { - command.add(credentials.getKubectlExecutable()); - } else { - command.add(this.kubernetesConfigurationProperties.getKubectl().getExecutable()); - } - - if (credentials.getKubectlRequestTimeoutSeconds() != null) { - command.add("--request-timeout=" + credentials.getKubectlRequestTimeoutSeconds()); - } - - if (credentials.isDebug()) { - command.add("-v"); - command.add("9"); - } - - if (!credentials.isServiceAccount()) { - if (credentials.getOAuthServiceAccount() != null - && !credentials.getOAuthServiceAccount().isEmpty()) { - command.add(KUBECTL_COMMAND_OPTION_TOKEN + getOAuthToken(credentials)); - } - - String kubeconfigFile = credentials.getKubeconfigFile(); - if (!Strings.isNullOrEmpty(kubeconfigFile)) { - command.add(KUBECTL_COMMAND_OPTION_KUBECONFIG + kubeconfigFile); - } - - String context = credentials.getContext(); - if (!Strings.isNullOrEmpty(context)) { - command.add(KUBECTL_COMMAND_OPTION_CONTEXT + context); - } - } - - return command; - } - - private List kubectlLookupInfo( - List command, - KubernetesKind kind, - String name, - KubernetesSelectorList labelSelectors) { - if (!Strings.isNullOrEmpty(name)) { - command.add(kind + "/" + name); - } else { - command.add(kind.toString()); - } - - if (labelSelectors != null && !labelSelectors.isEmpty()) { - command.add("-l=" + labelSelectors); - } - - return command; - } - - private List kubectlNamespacedAuthPrefix( - KubernetesCredentials credentials, String namespace) { - List command = kubectlAuthPrefix(credentials); - - if (!Strings.isNullOrEmpty(namespace)) { - command.add("--namespace=" + namespace); - } - - return command; - } - - private List kubectlNamespacedGet( - KubernetesCredentials credentials, List kind, String namespace) { - List command = kubectlNamespacedAuthPrefix(credentials, namespace); - command.add("-o"); - command.add("json"); - - command.add("get"); - command.add(kind.stream().map(KubernetesKind::toString).collect(Collectors.joining(","))); - - return command; - } - - private String getOAuthToken(KubernetesCredentials credentials) { - List command = new ArrayList<>(); - command.add(this.kubernetesConfigurationProperties.getOAuth().getExecutable()); - command.add("fetch"); - command.add("--json"); - command.add(credentials.getOAuthServiceAccount()); - command.addAll(credentials.getOAuthScopes()); - - JobResult status = executeKubectlCommand(credentials, command); - - if (status.getResult() != JobResult.Result.SUCCESS) { - throw new KubectlException("Could not fetch OAuth token: " + status.getError()); - } - return status.getOutput(); - } - - public ImmutableList topPod( - KubernetesCredentials credentials, String namespace, @Nonnull String pod) { - List command = kubectlNamespacedAuthPrefix(credentials, namespace); - command.add("top"); - command.add("po"); - if (!pod.isEmpty()) { - command.add(pod); - } - command.add("--containers"); - - JobResult status = executeKubectlCommand(credentials, command); - if (status.getResult() != JobResult.Result.SUCCESS) { - if (status.getError().toLowerCase().contains("not available") - || status.getError().toLowerCase().contains("not found")) { - log.warn( - String.format( - "Error fetching metrics for account %s: %s", - credentials.getAccountName(), status.getError())); - return ImmutableList.of(); - } - throw new KubectlException("Could not read metrics: " + status.getError()); - } - - ImmutableSetMultimap metrics = - MetricParser.parseMetrics(status.getOutput()); - return metrics.asMap().entrySet().stream() - .map( - podMetrics -> - KubernetesPodMetric.builder() - .podName(podMetrics.getKey()) - .namespace(namespace) - .containerMetrics(podMetrics.getValue()) - .build()) - .collect(ImmutableList.toImmutableList()); - } - - public Void patch( - KubernetesCredentials credentials, - KubernetesKind kind, - String namespace, - String name, - KubernetesPatchOptions options, - List patches, - Task task, - String opName) { - return patch(credentials, kind, namespace, name, options, gson.toJson(patches), task, opName); - } - - public Void patch( - KubernetesCredentials credentials, - KubernetesKind kind, - String namespace, - String name, - KubernetesPatchOptions options, - KubernetesManifest manifest, - Task task, - String opName) { - return patch(credentials, kind, namespace, name, options, gson.toJson(manifest), task, opName); - } - - private Void patch( - KubernetesCredentials credentials, - KubernetesKind kind, - String namespace, - String name, - KubernetesPatchOptions options, - String patchBody, - Task task, - String opName) { - List command = kubectlNamespacedAuthPrefix(credentials, namespace); - - command.add("patch"); - command.add(kind.toString()); - command.add(name); - - if (options.isRecord()) { - command.add("--record"); - } - - String mergeStrategy = options.getMergeStrategy().toString(); - if (!Strings.isNullOrEmpty(mergeStrategy)) { - command.add("--type"); - command.add(mergeStrategy); - } - - command.add("--patch"); - command.add(patchBody); - - JobResult status = executeKubectlCommand(credentials, command); - - persistKubectlJobOutput(credentials, status, kind + "/" + name, task, opName); - - if (status.getResult() != JobResult.Result.SUCCESS) { - String errMsg = status.getError(); - if (Strings.isNullOrEmpty(errMsg)) { - errMsg = status.getOutput(); - } - if (errMsg.contains("not patched")) { - log.warn("No change occurred after patching {} {}:{}, ignoring", kind, namespace, name); - return null; - } - - throw new KubectlException( - "Patch failed for: " + name + " in namespace: " + namespace + ": " + errMsg); - } - - return null; - } - - private ReaderConsumer> parseManifestList() { - return (@WillClose BufferedReader r) -> { - try (JsonReader reader = new JsonReader(r)) { - try { - reader.beginObject(); - } catch (EOFException e) { - // If the stream we're parsing is empty, just return an empty list - return ImmutableList.of(); - } - ImmutableList.Builder manifestList = new ImmutableList.Builder<>(); - while (reader.hasNext()) { - if (reader.nextName().equals("items")) { - reader.beginArray(); - while (reader.hasNext()) { - KubernetesManifest manifest = gson.fromJson(reader, KubernetesManifest.class); - manifestList.add(manifest); - } - reader.endArray(); - } else { - reader.skipValue(); - } - } - reader.endObject(); - return manifestList.build(); - } catch (IllegalStateException | JsonSyntaxException e) { - // An IllegalStageException is thrown when we call beginObject, nextName(), etc. and the - // next token is not what we are asserting it to be. A JsonSyntaxException is thrown when - // gson.fromJson isn't able to map the next token to a KubernetesManifest. - // In both of these cases, the error is due to the output from kubectl being malformed (or - // at least malformed relative to our expectations) so we'll wrap the exception in a - // KubectlException. - throw new KubectlException("Failed to parse kubectl output: " + e.getMessage(), e); - } - }; - } - - /** - * This method executes the actual kubectl command and determines if retries are required, on - * failure. - * - * @param credentials k8s account credentials - * @param command the actual kubectl command to be performed - * @return - the result of the kubectl command - */ - private JobResult executeKubectlCommand( - KubernetesCredentials credentials, List command) { - return executeKubectlCommand(credentials, command, Optional.empty()); - } - - /** - * This method executes the actual kubectl command and determines if retries are required, on - * failure. - * - * @param credentials k8s account credentials - * @param command the actual kubectl command to be performed - * @param manifest the manifest supplied to the kubectl command - * @return - the result of the kubectl command - */ - private JobResult executeKubectlCommand( - KubernetesCredentials credentials, - List command, - Optional manifest) { - // retry registry is empty if retries are not enabled. - if (retryRegistry.isEmpty()) { - return jobExecutor.runJob(createJobRequest(command, manifest)); - } - - // capture the original result obtained from the jobExecutor.runJob(jobRequest) call. - JobResult.JobResultBuilder finalResult = JobResult.builder(); - - KubectlActionIdentifier identifier = - new KubectlActionIdentifier(credentials, command, manifest); - Retry retryContext = retryRegistry.get().retry(identifier.getRetryInstanceName()); - try { - return retryContext.executeSupplier( - () -> { - JobResult result = jobExecutor.runJob(createJobRequest(command, manifest)); - return processJobResult(identifier, result, finalResult); - }); - } catch (KubectlException | NoRetryException e) { - // the caller functions expect any failures to be defined in a JobResult object and not in - // the form of an exception. Hence, we need to translate the above exceptions back into a - // JobResult object - but we only need to do it for KubectlException and NoRetryException ( - // since these are the ones explicitly thrown above) and not for any other ones. - return finalResult.build(); - } - } - - /** - * This method executes the actual kubectl command and determines if retries are required, on - * failure. - * - * @param credentials k8s account credentials - * @param command the actual kubectl command to be performed - * @param readerConsumer A function that transforms the job's standard output - * @param return type of the JobResult output - * @return the result of the kubectl command - */ - private JobResult executeKubectlCommand( - KubernetesCredentials credentials, List command, ReaderConsumer readerConsumer) { - // retry registry is empty if retries are not enabled. - if (retryRegistry.isEmpty()) { - return jobExecutor.runJob(new JobRequest(command), readerConsumer); - } - - // capture the original result obtained from the jobExecutor.runJob(jobRequest, readerConsumer) - // call. - JobResult.JobResultBuilder finalResult = JobResult.builder(); - KubectlActionIdentifier identifier = new KubectlActionIdentifier(credentials, command); - Retry retryContext = retryRegistry.get().retry(identifier.getRetryInstanceName()); - try { - return retryContext.executeSupplier( - () -> { - JobResult result = jobExecutor.runJob(new JobRequest(command), readerConsumer); - return processJobResult(identifier, result, finalResult); - }); - } catch (KubectlException | NoRetryException e) { - // the caller functions expect any failures to be defined in a JobResult object and not in - // the form of an exception. Hence, we need to translate the above exceptions back into a - // JobResult object - but we only need to do it for KubectlException and NoRetryException - // (since these are the ones explicitly thrown above) and not for any other ones. - return finalResult.build(); - } - } - - /** - * helper function to create a JobRequest using the input parameters - * - * @param command the command to be executed in the job request - * @param manifest the manifest to be used by the command. This is optional. - * @return a job request object - */ - @VisibleForTesting - JobRequest createJobRequest(List command, Optional manifest) { - // depending on the presence of the manifest, an appropriate job request is created - if (manifest.isPresent()) { - String manifestAsJson = gson.toJson(manifest.get()); - return new JobRequest( - command, new ByteArrayInputStream(manifestAsJson.getBytes(StandardCharsets.UTF_8))); - } - - return new JobRequest(command); - } - - /** - * helper function to handle a job result obtained after performing a job request. This either - * returns the result, if successful, or throws an exception on failure. - * - * @param identifier uniquely identifies the job in the logs - * @param result the job result to be processed - * @param finalResult a buffer that keeps track of the result. This ensures on retries, the - * original is not lost - * @param the return type of the JobResult output - * @return the result of the kubectl command, in the form of a JobResult object - */ - @VisibleForTesting - JobResult processJobResult( - KubectlActionIdentifier identifier, - JobResult result, - JobResult.JobResultBuilder finalResult) { - if (result.getResult() == JobResult.Result.SUCCESS) { - return result; - } - - // save the result as it'll be needed later on when we are done with retries - finalResult - .error(result.getError()) - .killed(result.isKilled()) - .output(result.getOutput()) - .result(result.getResult()); - - // if result is not successful, that means we need to determine if we should retry - // or not. - // - // Since Kubectl binary doesn't throw any exceptions by default, we need to - // check the result to see if retries are needed. Resilience.4j needs an exception to be - // thrown to decide if retries are needed and also, to capture retry metrics correctly. - throw convertKubectlJobResultToException(identifier.getKubectlAction(), result); - } - - /** - * this method is meant to be invoked only for those JobResults which are unsuccessful. It - * determines if the error contained in the JobResult should be retried or not. If the error needs - * to be retried, then KubectlException is returned. Otherwise, NoRetryException is returned. - * - * @param identifier used to log which action's job result is being processed - * @param result the job result which needs to be checked to see if it has an error that can be - * retried - * @param job result generic type - * @return - Either KubectlException or NoRetryException - */ - private RuntimeException convertKubectlJobResultToException( - String identifier, JobResult result) { - // the error matches the configured list of retryable errors. - if (this.kubernetesConfigurationProperties - .getJobExecutor() - .getRetries() - .getRetryableErrorMessages() - .stream() - .anyMatch(errorMessage -> result.getError().contains(errorMessage))) { - return new KubectlException(identifier + " failed. Error: " + result.getError()); - } - - // even though the error is not explicitly configured to be retryable, the job was killed - - // hence, we should retry - if (result.isKilled()) { - return new KubectlException( - "retrying " + identifier + " since the job " + result + " was killed"); - } - - String message = - "Not retrying " - + identifier - + " as retries are not enabled for error: " - + result.getError(); - log.warn(message); - // we want to let the retry library know that such errors should not be retried. - // Since we have configured the global retry registry to ignore errors of type - // NoRetryException, we return this here - return new NoRetryException(message); - } - - private void persistKubectlJobOutput( - KubernetesCredentials credentials, - JobResult status, - String manifestName, - Task task, - String taskName) { - if (kubernetesConfigurationProperties.getJobExecutor().isPersistTaskOutput()) { - if (kubernetesConfigurationProperties.getJobExecutor().isEnableTaskOutputForAllAccounts() - || credentials.isDebug()) { - task.updateOutput(manifestName, taskName, status.getOutput(), status.getError()); - } - } - } - - public static class KubectlException extends RuntimeException { - public KubectlException(String message) { - super(message); - } - - public KubectlException(String message, Throwable cause) { - super(message, cause); - } - } - - public static class KubectlNotFoundException extends KubectlException { - public KubectlNotFoundException(String message) { - super(message); - } - } - - /** - * this exception is only meant to be used in cases where we want resilience4j to not retry - * kubectl calls. It should not be used anywhere else. - */ - static class NoRetryException extends RuntimeException { - NoRetryException(String message) { - super(message); - } - } - - /** helper class to identify the kubectl command in logs and metrics when retries are enabled */ - static class KubectlActionIdentifier { - KubernetesCredentials credentials; - List command; - String namespace; - String resource; - - public KubectlActionIdentifier( - KubernetesCredentials credentials, - List command, - String namespace, - String resource) { - this.credentials = credentials; - this.command = command; - this.namespace = namespace; - this.resource = resource; - } - - public KubectlActionIdentifier(KubernetesCredentials credentials, List command) { - this(credentials, command, "", ""); - } - - public KubectlActionIdentifier( - KubernetesCredentials credentials, - List command, - Optional manifest) { - this(credentials, command); - if (manifest.isPresent()) { - this.namespace = manifest.get().getNamespace(); - this.resource = manifest.get().getFullResourceName(); - } - } - - /** - * this returns the sanitized kubectl command. This can be used to log the command during retry - * attempts, among other things. - * - * @return - the sanitized kubectl command - */ - public String getKubectlAction() { - // no need to display everything in a kubectl command - List commandToLog = - command.stream() - .filter( - s -> - !(s.contains(KUBECTL_COMMAND_OPTION_TOKEN) - || s.contains(KUBECTL_COMMAND_OPTION_KUBECONFIG) - || s.contains(KUBECTL_COMMAND_OPTION_CONTEXT))) - .collect(Collectors.toList()); - - String identifier = - "command: '" - + String.join(" ", commandToLog) - + "' in account: " - + this.credentials.getAccountName(); - - if (!namespace.isEmpty()) { - identifier += " in namespace: " + namespace; - } - - if (!resource.isEmpty()) { - identifier += " for resource: " + resource; - } - return identifier; - } - - /** - * this returns a name which uniquely identifies a retry instance. This name shows up in the - * logs when each retry event is logged. Also, when capturing the retry metrics, the 'name' tag - * in the metric corresponds to this. - * - * @return - the name to be used to uniquely identify a retry instance - */ - public String getRetryInstanceName() { - return this.credentials.getAccountName(); - } - } -} diff --git a/clouddriver-kubernetes/src/main/java/com/netflix/spinnaker/clouddriver/kubernetes/op/job/KubectlJobExecutor.java b/clouddriver-kubernetes/src/main/java/com/netflix/spinnaker/clouddriver/kubernetes/op/job/KubectlJobExecutor.java index f97ca06061e..3ffe7d9a32c 100644 --- a/clouddriver-kubernetes/src/main/java/com/netflix/spinnaker/clouddriver/kubernetes/op/job/KubectlJobExecutor.java +++ b/clouddriver-kubernetes/src/main/java/com/netflix/spinnaker/clouddriver/kubernetes/op/job/KubectlJobExecutor.java @@ -1,5 +1,5 @@ /* - * Copyright 2023 Armory, Inc. + * Copyright 2017 Google, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -12,12 +12,23 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. + * */ package com.netflix.spinnaker.clouddriver.kubernetes.op.job; +import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSetMultimap; +import com.google.gson.Gson; +import com.google.gson.JsonSyntaxException; +import com.google.gson.stream.JsonReader; import com.netflix.spinnaker.clouddriver.data.task.Task; +import com.netflix.spinnaker.clouddriver.jobs.JobExecutor; +import com.netflix.spinnaker.clouddriver.jobs.JobRequest; +import com.netflix.spinnaker.clouddriver.jobs.JobResult; +import com.netflix.spinnaker.clouddriver.jobs.local.ReaderConsumer; +import com.netflix.spinnaker.clouddriver.kubernetes.config.KubernetesConfigurationProperties; import com.netflix.spinnaker.clouddriver.kubernetes.description.JsonPatch; import com.netflix.spinnaker.clouddriver.kubernetes.description.KubernetesPatchOptions; import com.netflix.spinnaker.clouddriver.kubernetes.description.KubernetesPodMetric; @@ -25,78 +36,789 @@ import com.netflix.spinnaker.clouddriver.kubernetes.description.manifest.KubernetesManifest; import com.netflix.spinnaker.clouddriver.kubernetes.security.KubernetesCredentials; import com.netflix.spinnaker.clouddriver.kubernetes.security.KubernetesSelectorList; +import com.netflix.spinnaker.kork.annotations.VisibleForTesting; +import io.github.resilience4j.core.EventConsumer; +import io.github.resilience4j.core.IntervalFunction; +import io.github.resilience4j.micrometer.tagged.TaggedRetryMetrics; +import io.github.resilience4j.retry.Retry; +import io.github.resilience4j.retry.RetryConfig; +import io.github.resilience4j.retry.RetryRegistry; +import io.github.resilience4j.retry.event.RetryEvent; +import io.github.resilience4j.retry.event.RetryOnErrorEvent; +import io.github.resilience4j.retry.event.RetryOnIgnoredErrorEvent; +import io.github.resilience4j.retry.event.RetryOnRetryEvent; +import io.github.resilience4j.retry.event.RetryOnSuccessEvent; import io.kubernetes.client.openapi.models.V1DeleteOptions; -import java.util.List; +import io.micrometer.core.instrument.MeterRegistry; +import java.io.BufferedReader; +import java.io.ByteArrayInputStream; +import java.io.EOFException; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.*; +import java.util.stream.Collectors; import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import javax.annotation.WillClose; +import lombok.Getter; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Component +public class KubectlJobExecutor { + private static final Logger log = LoggerFactory.getLogger(KubectlJobExecutor.class); + private static final String NOT_FOUND_STRING = "(NotFound)"; + private static final String KUBECTL_COMMAND_OPTION_TOKEN = "--token="; + private static final String KUBECTL_COMMAND_OPTION_KUBECONFIG = "--kubeconfig="; + private static final String KUBECTL_COMMAND_OPTION_CONTEXT = "--context="; + + private final JobExecutor jobExecutor; + + private final Gson gson = new Gson(); + + private final KubernetesConfigurationProperties kubernetesConfigurationProperties; + + // @Getter is required so that this can be used in tests + @Getter private final Optional retryRegistry; + + private final MeterRegistry meterRegistry; + + @Autowired + public KubectlJobExecutor( + JobExecutor jobExecutor, + KubernetesConfigurationProperties kubernetesConfigurationProperties, + MeterRegistry meterRegistry) { + this.jobExecutor = jobExecutor; + this.kubernetesConfigurationProperties = kubernetesConfigurationProperties; + this.meterRegistry = meterRegistry; + + this.retryRegistry = + initializeRetryRegistry(kubernetesConfigurationProperties.getJobExecutor().getRetries()); + } + + /** + * This is used to initialize a RetryRegistry. RetryRegistry acts as a global store for all retry + * instances. The retry instances are shared for various kubectl actions. A retry instance is + * identified by the account name. + * + * @param retriesConfig - kubectl job retries configuration + * @return - If retries are enabled, it returns an Optional that contains a RetryRegistry, + * otherwise it returns an empty Optional + */ + private Optional initializeRetryRegistry( + KubernetesConfigurationProperties.KubernetesJobExecutorProperties.Retries retriesConfig) { + if (retriesConfig.isEnabled()) { + log.info("kubectl retries are enabled"); + + // this config will be applied to all retry instances created from the registry + RetryConfig.Builder retryConfig = + RetryConfig.custom().maxAttempts(retriesConfig.getMaxAttempts()); + if (retriesConfig.isExponentialBackoffEnabled()) { + retryConfig.intervalFunction( + IntervalFunction.ofExponentialBackoff( + Duration.ofMillis(retriesConfig.getExponentialBackOffIntervalMs()), + retriesConfig.getExponentialBackoffMultiplier())); + } else { + retryConfig.waitDuration(Duration.ofMillis(retriesConfig.getBackOffInMs())); + } + + // retry on all exceptions except NoRetryException + retryConfig.ignoreExceptions(NoRetryException.class); + + // create the retry registry + RetryRegistry retryRegistry = RetryRegistry.of(retryConfig.build()); + + // log whenever a new retry instance is added, removed or replaced from the registry + retryRegistry + .getEventPublisher() + .onEntryAdded( + entryAddedEvent -> { + Retry addedRetry = entryAddedEvent.getAddedEntry(); + log.info("Kubectl retries configured for: {}", addedRetry.getName()); + }) + .onEntryRemoved( + entryRemovedEvent -> { + Retry removedRetry = entryRemovedEvent.getRemovedEntry(); + log.info("Kubectl retries removed for: {}", removedRetry.getName()); + }) + .onEntryReplaced( + entryReplacedEvent -> { + Retry oldEntry = entryReplacedEvent.getOldEntry(); + Retry newEntry = entryReplacedEvent.getNewEntry(); + log.info( + "Kubectl retry: {} updated to: {}", oldEntry.getName(), newEntry.getName()); + }); + + // define an event consumer once for the entire registry as mentioned here: + // https://github.com/resilience4j/resilience4j/issues/974#issuecomment-619956673 + // If we don't do this once, but add it for each individual retry instance, and if + // that retry instance is invoked by multiple threads, then there is a lot of log duplication. + // For example, if 10 threads invoke an action to get the top pod, and it is + // configured to use the retry instance with the identifier "mock-account.topPod.test-pod", + // then we will see 10*10 log lines showing up for each retry event instead of just 10 that + // we expect. + EventConsumer eventConsumer = + retryEvent -> { + if (retryEvent instanceof RetryOnErrorEvent) { + log.error( + "Kubectl command for {} failed after {} attempts. Exception: {}", + retryEvent.getName(), + retryEvent.getNumberOfRetryAttempts(), + retryEvent.getLastThrowable().toString()); + } else if (retryEvent instanceof RetryOnSuccessEvent) { + log.info( + "Kubectl command for {} is now successful in attempt #{}. Last attempt had failed with exception: {}", + retryEvent.getName(), + retryEvent.getNumberOfRetryAttempts() + 1, + retryEvent.getLastThrowable().toString()); + } else if (retryEvent instanceof RetryOnRetryEvent) { + log.info( + "Retrying Kubectl command for {}. Attempt #{} failed with exception: {}", + retryEvent.getName(), + retryEvent.getNumberOfRetryAttempts(), + retryEvent.getLastThrowable().toString()); + } else if (!(retryEvent instanceof RetryOnIgnoredErrorEvent)) { + // don't log anything for Ignored exceptions as it just leads to noise in the logs + log.info(retryEvent.toString()); + } + }; + retryRegistry + .getAllRetries() + .forEach(retry -> retry.getEventPublisher().onEvent(eventConsumer)); + retryRegistry + .getEventPublisher() + .onEntryAdded(event -> event.getAddedEntry().getEventPublisher().onEvent(eventConsumer)); + + if (this.kubernetesConfigurationProperties + .getJobExecutor() + .getRetries() + .getMetrics() + .isEnabled()) { + TaggedRetryMetrics.ofRetryRegistry(retryRegistry).bindTo(meterRegistry); + } + + return Optional.of(retryRegistry); + } else { + log.info("kubectl retries are disabled"); + return Optional.empty(); + } + } + + public String logs( + KubernetesCredentials credentials, String namespace, String podName, String containerName) { + List command = kubectlNamespacedAuthPrefix(credentials, namespace); + command.add("logs"); + command.add(podName); + command.add("-c=" + containerName); + + JobResult status = executeKubectlCommand(credentials, command); + + if (status.getResult() != JobResult.Result.SUCCESS) { + throw new KubectlException( + "Failed to get logs from " + + podName + + "/" + + containerName + + " in " + + namespace + + ": " + + status.getError()); + } + + return status.getOutput(); + } + + public String jobLogs( + KubernetesCredentials credentials, String namespace, String jobName, String containerName) { + List command = kubectlNamespacedAuthPrefix(credentials, namespace); + String resource = "job/" + jobName; + command.add("logs"); + command.add(resource); + command.add("-c=" + containerName); -public interface KubectlJobExecutor { - String logs( - KubernetesCredentials credentials, String namespace, String podName, String containerName); + JobResult status = executeKubectlCommand(credentials, command); - String jobLogs( - KubernetesCredentials credentials, String namespace, String jobName, String containerName); + if (status.getResult() != JobResult.Result.SUCCESS) { + throw new KubectlException( + "Failed to get logs from " + resource + " in " + namespace + ": " + status.getError()); + } - Void scale( + return status.getOutput(); + } + + public List delete( KubernetesCredentials credentials, KubernetesKind kind, String namespace, String name, - int replicas, + KubernetesSelectorList labelSelectors, + V1DeleteOptions deleteOptions, Task task, - String opName); + String opName) { + List command = kubectlNamespacedAuthPrefix(credentials, namespace); + + command.add("delete"); + + command = kubectlLookupInfo(command, kind, name, labelSelectors); + + // spinnaker generally accepts deletes of resources that don't exist + command.add("--ignore-not-found=true"); + + if (deleteOptions.getPropagationPolicy() != null) { + command.add("--cascade=" + deleteOptions.getPropagationPolicy()); + } + + if (deleteOptions.getGracePeriodSeconds() != null) { + command.add("--grace-period=" + deleteOptions.getGracePeriodSeconds()); + } + + String id; + if (!Strings.isNullOrEmpty(name)) { + id = kind + "/" + name; + } else { + id = labelSelectors.toString(); + } + + JobResult status = executeKubectlCommand(credentials, command); - List delete( + persistKubectlJobOutput(credentials, status, id, task, opName); + + if (status.getResult() != JobResult.Result.SUCCESS) { + throw new KubectlException( + "Failed to delete " + id + " from " + namespace + ": " + status.getError()); + } + + if (Strings.isNullOrEmpty(status.getOutput()) + || status.getOutput().equals("No output from command.") + || status.getOutput().startsWith("No resources found")) { + return new ArrayList<>(); + } + + return Arrays.stream(status.getOutput().split("\n")) + .map(m -> m.substring(m.indexOf("\"") + 1)) + .map(m -> m.substring(0, m.lastIndexOf("\""))) + .collect(Collectors.toList()); + } + + public Void scale( KubernetesCredentials credentials, KubernetesKind kind, String namespace, String name, - KubernetesSelectorList labelSelectors, - V1DeleteOptions deleteOptions, + int replicas, Task task, - String opName); + String opName) { + List command = kubectlNamespacedAuthPrefix(credentials, namespace); + + command.add("scale"); + command = kubectlLookupInfo(command, kind, name, null); + command.add("--replicas=" + replicas); + + String resource = kind + "/" + name; + JobResult status = executeKubectlCommand(credentials, command); + persistKubectlJobOutput(credentials, status, resource, task, opName); + + if (status.getResult() != JobResult.Result.SUCCESS) { + throw new KubectlException( + "Failed to scale " + resource + " from " + namespace + ": " + status.getError()); + } - ImmutableList topPod( - KubernetesCredentials credentials, String namespace, @Nonnull String pod); + return null; + } - KubernetesManifest deploy( - KubernetesCredentials credentials, KubernetesManifest manifest, Task task, String opName); + public List historyRollout( + KubernetesCredentials credentials, KubernetesKind kind, String namespace, String name) { + List command = kubectlNamespacedAuthPrefix(credentials, namespace); + String resource = kind + "/" + name; + command.add("rollout"); + command.add("history"); + command.add(resource); - KubernetesManifest replace( - KubernetesCredentials credentials, KubernetesManifest manifest, Task task, String opName); + JobResult status = executeKubectlCommand(credentials, command); - KubernetesManifest create( - KubernetesCredentials credentials, KubernetesManifest manifest, Task task, String opName); + if (status.getResult() != JobResult.Result.SUCCESS) { + throw new KubectlException( + "Failed to get rollout history of " + + resource + + " from " + + namespace + + ": " + + status.getError()); + } - List historyRollout( - KubernetesCredentials credentials, KubernetesKind kind, String namespace, String name); + String stdout = status.getOutput(); + if (Strings.isNullOrEmpty(stdout)) { + return new ArrayList<>(); + } - Void undoRollout( + // "name" + // REVISION CHANGE-CAUSE + // # + // # + // # + // ... + List splitOutput = Arrays.stream(stdout.split("\n")).collect(Collectors.toList()); + + if (splitOutput.size() <= 2) { + return new ArrayList<>(); + } + + splitOutput = splitOutput.subList(2, splitOutput.size()); + + return splitOutput.stream() + .map(l -> l.split("[ \t]")) + .filter(l -> l.length > 0) + .map(l -> l[0]) + .map(Integer::valueOf) + .collect(Collectors.toList()); + } + + public Void undoRollout( KubernetesCredentials credentials, KubernetesKind kind, String namespace, String name, - int revision); + int revision) { + List command = kubectlNamespacedAuthPrefix(credentials, namespace); + + String resource = kind + "/" + name; + command.add("rollout"); + command.add("undo"); + command.add(resource); + command.add("--to-revision=" + revision); + + JobResult status = executeKubectlCommand(credentials, command); + + if (status.getResult() != JobResult.Result.SUCCESS) { + throw new KubectlException( + "Failed to undo rollout " + resource + " from " + namespace + ": " + status.getError()); + } + + return null; + } + + public Void pauseRollout( + KubernetesCredentials credentials, KubernetesKind kind, String namespace, String name) { + List command = kubectlNamespacedAuthPrefix(credentials, namespace); + + String resource = kind + "/" + name; + command.add("rollout"); + command.add("pause"); + command.add(resource); + + JobResult status = executeKubectlCommand(credentials, command); - Void pauseRollout( - KubernetesCredentials credentials, KubernetesKind kind, String namespace, String name); + if (status.getResult() != JobResult.Result.SUCCESS) { + throw new KubectlException( + "Failed to pause rollout " + resource + " from " + namespace + ": " + status.getError()); + } - Void resumeRollout( + return null; + } + + public Void resumeRollout( KubernetesCredentials credentials, KubernetesKind kind, String namespace, String name, Task task, - String opName); + String opName) { + List command = kubectlNamespacedAuthPrefix(credentials, namespace); + + String resource = kind + "/" + name; + command.add("rollout"); + command.add("resume"); + command.add(resource); + + JobResult status = executeKubectlCommand(credentials, command); + + persistKubectlJobOutput(credentials, status, resource, task, opName); + + if (status.getResult() != JobResult.Result.SUCCESS) { + throw new KubectlException( + "Failed to resume rollout " + resource + " from " + namespace + ": " + status.getError()); + } - Void rollingRestart( + return null; + } + + public Void rollingRestart( KubernetesCredentials credentials, KubernetesKind kind, String namespace, String name, Task task, - String opName); + String opName) { + List command = kubectlNamespacedAuthPrefix(credentials, namespace); + + String resource = kind + "/" + name; + command.add("rollout"); + command.add("restart"); + command.add(resource); + + JobResult status = executeKubectlCommand(credentials, command); + + persistKubectlJobOutput(credentials, status, resource, task, opName); + + if (status.getResult() != JobResult.Result.SUCCESS) { + throw new KubectlException( + "Failed to complete rolling restart of " + + resource + + " from " + + namespace + + ": " + + status.getError()); + } + + return null; + } + + @Nullable + public KubernetesManifest get( + KubernetesCredentials credentials, KubernetesKind kind, String namespace, String name) { + log.debug( + "Getting information for {} of Kind {} in namespace {}", name, kind.toString(), namespace); + List command = kubectlNamespacedGet(credentials, ImmutableList.of(kind), namespace); + command.add(name); + + JobResult status = executeKubectlCommand(credentials, command); + + if (status.getResult() != JobResult.Result.SUCCESS) { + if (status.getError().contains(NOT_FOUND_STRING)) { + return null; + } + + throw new KubectlException( + "Failed to get: " + + name + + " of kind: " + + kind + + " from namespace: " + + namespace + + ": " + + status.getError()); + } + + try { + return gson.fromJson(status.getOutput(), KubernetesManifest.class); + } catch (JsonSyntaxException e) { + throw new KubectlException( + "Failed to parse kubectl output for: " + + name + + " of kind: " + + kind + + " in namespace: " + + namespace + + ": " + + e.getMessage(), + e); + } + } + + @Nonnull + public ImmutableList eventsFor( + KubernetesCredentials credentials, KubernetesKind kind, String namespace, String name) { + log.debug("Getting events for {} of Kind {} in namespace {}", name, kind.toString(), namespace); + List command = + kubectlNamespacedGet(credentials, ImmutableList.of(KubernetesKind.EVENT), namespace); + command.add("--field-selector"); + command.add( + String.format( + "involvedObject.name=%s,involvedObject.kind=%s", + name, StringUtils.capitalize(kind.toString()))); + + JobResult> status = + executeKubectlCommand(credentials, command, parseManifestList()); + + if (status.getResult() != JobResult.Result.SUCCESS) { + throw new KubectlException( + "Failed to read events for: " + + kind + + "/" + + name + + " from " + + namespace + + ": " + + status.getError()); + } + + if (status.getError().contains("No resources found")) { + return ImmutableList.of(); + } + + return status.getOutput(); + } + + @Nonnull + public ImmutableList list( + KubernetesCredentials credentials, + List kinds, + String namespace, + KubernetesSelectorList selectors) { + log.debug("Getting list of kinds {} in namespace {}", kinds, namespace); + List command = kubectlNamespacedGet(credentials, kinds, namespace); + if (selectors.isNotEmpty()) { + log.debug("with selectors: {}", selectors.toString()); + command.add("-l=" + selectors.toString()); + } + + JobResult> status = + executeKubectlCommand(credentials, command, parseManifestList()); + + if (status.getResult() != JobResult.Result.SUCCESS) { + boolean permissionError = + org.apache.commons.lang3.StringUtils.containsIgnoreCase(status.getError(), "forbidden"); + if (permissionError) { + log.warn(status.getError()); + } else { + throw new KubectlException( + "Failed to read " + kinds + " from " + namespace + ": " + status.getError()); + } + } + + if (status.getError().contains("No resources found")) { + return ImmutableList.of(); + } + + return status.getOutput(); + } + + public KubernetesManifest deploy( + KubernetesCredentials credentials, KubernetesManifest manifest, Task task, String opName) { + log.info("Deploying manifest {}", manifest.getFullResourceName()); + List command = kubectlAuthPrefix(credentials); + + // Read from stdin + command.add("apply"); + command.add("-o"); + command.add("json"); + command.add("-f"); + command.add("-"); - Void patch( + JobResult status = executeKubectlCommand(credentials, command, Optional.of(manifest)); + + persistKubectlJobOutput(credentials, status, manifest.getFullResourceName(), task, opName); + + if (status.getResult() != JobResult.Result.SUCCESS) { + throw new KubectlException( + "Deploy failed for manifest: " + + manifest.getFullResourceName() + + ". Error: " + + status.getError()); + } + + return getKubernetesManifestFromJobResult(status, manifest); + } + + public KubernetesManifest replace( + KubernetesCredentials credentials, KubernetesManifest manifest, Task task, String opName) { + log.info("Replacing manifest {}", manifest.getFullResourceName()); + List command = kubectlAuthPrefix(credentials); + + // Read from stdin + command.add("replace"); + command.add("-o"); + command.add("json"); + command.add("-f"); + command.add("-"); + + JobResult status = executeKubectlCommand(credentials, command, Optional.of(manifest)); + + persistKubectlJobOutput(credentials, status, manifest.getFullResourceName(), task, opName); + + if (status.getResult() != JobResult.Result.SUCCESS) { + if (status.getError().contains(NOT_FOUND_STRING)) { + throw new KubectlNotFoundException( + "Replace failed for manifest: " + + manifest.getFullResourceName() + + ". Error: " + + status.getError()); + } + throw new KubectlException( + "Replace failed for manifest: " + + manifest.getFullResourceName() + + ". Error: " + + status.getError()); + } + + return getKubernetesManifestFromJobResult(status, manifest); + } + + public KubernetesManifest create( + KubernetesCredentials credentials, KubernetesManifest manifest, Task task, String opName) { + log.info("Creating manifest {}", manifest.getName()); + List command = kubectlAuthPrefix(credentials); + + // Read from stdin + command.add("create"); + command.add("-o"); + command.add("json"); + command.add("-f"); + command.add("-"); + + JobResult status = executeKubectlCommand(credentials, command, Optional.of(manifest)); + + persistKubectlJobOutput(credentials, status, manifest.getFullResourceName(), task, opName); + + if (status.getResult() != JobResult.Result.SUCCESS) { + throw new KubectlException( + "Create failed for manifest: " + + manifest.getFullResourceName() + + ". Error: " + + status.getError()); + } + + return getKubernetesManifestFromJobResult(status, manifest); + } + + private KubernetesManifest getKubernetesManifestFromJobResult( + JobResult status, KubernetesManifest inputManifest) { + try { + return gson.fromJson(status.getOutput(), KubernetesManifest.class); + } catch (JsonSyntaxException e) { + throw new KubectlException( + "Failed to parse kubectl output for manifest: " + + inputManifest.getName() + + ". Error: " + + e.getMessage(), + e); + } + } + + private List kubectlAuthPrefix(KubernetesCredentials credentials) { + List command = new ArrayList<>(); + if (!Strings.isNullOrEmpty(credentials.getKubectlExecutable())) { + command.add(credentials.getKubectlExecutable()); + } else { + command.add(this.kubernetesConfigurationProperties.getKubectl().getExecutable()); + } + + if (credentials.getKubectlRequestTimeoutSeconds() != null) { + command.add("--request-timeout=" + credentials.getKubectlRequestTimeoutSeconds()); + } + + if (credentials.isDebug()) { + command.add("-v"); + command.add("9"); + } + + if (!credentials.isServiceAccount()) { + if (credentials.getOAuthServiceAccount() != null + && !credentials.getOAuthServiceAccount().isEmpty()) { + command.add(KUBECTL_COMMAND_OPTION_TOKEN + getOAuthToken(credentials)); + } + + String kubeconfigFile = credentials.getKubeconfigFile(); + if (!Strings.isNullOrEmpty(kubeconfigFile)) { + command.add(KUBECTL_COMMAND_OPTION_KUBECONFIG + kubeconfigFile); + } + + String context = credentials.getContext(); + if (!Strings.isNullOrEmpty(context)) { + command.add(KUBECTL_COMMAND_OPTION_CONTEXT + context); + } + } + + return command; + } + + private List kubectlLookupInfo( + List command, + KubernetesKind kind, + String name, + KubernetesSelectorList labelSelectors) { + if (!Strings.isNullOrEmpty(name)) { + command.add(kind + "/" + name); + } else { + command.add(kind.toString()); + } + + if (labelSelectors != null && !labelSelectors.isEmpty()) { + command.add("-l=" + labelSelectors); + } + + return command; + } + + private List kubectlNamespacedAuthPrefix( + KubernetesCredentials credentials, String namespace) { + List command = kubectlAuthPrefix(credentials); + + if (!Strings.isNullOrEmpty(namespace)) { + command.add("--namespace=" + namespace); + } + + return command; + } + + private List kubectlNamespacedGet( + KubernetesCredentials credentials, List kind, String namespace) { + List command = kubectlNamespacedAuthPrefix(credentials, namespace); + command.add("-o"); + command.add("json"); + + command.add("get"); + command.add(kind.stream().map(KubernetesKind::toString).collect(Collectors.joining(","))); + + return command; + } + + private String getOAuthToken(KubernetesCredentials credentials) { + List command = new ArrayList<>(); + command.add(this.kubernetesConfigurationProperties.getOAuth().getExecutable()); + command.add("fetch"); + command.add("--json"); + command.add(credentials.getOAuthServiceAccount()); + command.addAll(credentials.getOAuthScopes()); + + JobResult status = executeKubectlCommand(credentials, command); + + if (status.getResult() != JobResult.Result.SUCCESS) { + throw new KubectlException("Could not fetch OAuth token: " + status.getError()); + } + return status.getOutput(); + } + + public ImmutableList topPod( + KubernetesCredentials credentials, String namespace, @Nonnull String pod) { + List command = kubectlNamespacedAuthPrefix(credentials, namespace); + command.add("top"); + command.add("po"); + if (!pod.isEmpty()) { + command.add(pod); + } + command.add("--containers"); + + JobResult status = executeKubectlCommand(credentials, command); + if (status.getResult() != JobResult.Result.SUCCESS) { + if (status.getError().toLowerCase().contains("not available") + || status.getError().toLowerCase().contains("not found")) { + log.warn( + String.format( + "Error fetching metrics for account %s: %s", + credentials.getAccountName(), status.getError())); + return ImmutableList.of(); + } + throw new KubectlException("Could not read metrics: " + status.getError()); + } + + ImmutableSetMultimap metrics = + MetricParser.parseMetrics(status.getOutput()); + return metrics.asMap().entrySet().stream() + .map( + podMetrics -> + KubernetesPodMetric.builder() + .podName(podMetrics.getKey()) + .namespace(namespace) + .containerMetrics(podMetrics.getValue()) + .build()) + .collect(ImmutableList.toImmutableList()); + } + + public Void patch( KubernetesCredentials credentials, KubernetesKind kind, String namespace, @@ -104,9 +826,11 @@ Void patch( KubernetesPatchOptions options, List patches, Task task, - String opName); + String opName) { + return patch(credentials, kind, namespace, name, options, gson.toJson(patches), task, opName); + } - Void patch( + public Void patch( KubernetesCredentials credentials, KubernetesKind kind, String namespace, @@ -114,17 +838,396 @@ Void patch( KubernetesPatchOptions options, KubernetesManifest manifest, Task task, - String opName); + String opName) { + return patch(credentials, kind, namespace, name, options, gson.toJson(manifest), task, opName); + } - ImmutableList list( + private Void patch( KubernetesCredentials credentials, - List kinds, + KubernetesKind kind, String namespace, - KubernetesSelectorList selectors); + String name, + KubernetesPatchOptions options, + String patchBody, + Task task, + String opName) { + List command = kubectlNamespacedAuthPrefix(credentials, namespace); + + command.add("patch"); + command.add(kind.toString()); + command.add(name); + + if (options.isRecord()) { + command.add("--record"); + } + + String mergeStrategy = options.getMergeStrategy().toString(); + if (!Strings.isNullOrEmpty(mergeStrategy)) { + command.add("--type"); + command.add(mergeStrategy); + } + + command.add("--patch"); + command.add(patchBody); + + JobResult status = executeKubectlCommand(credentials, command); + + persistKubectlJobOutput(credentials, status, kind + "/" + name, task, opName); + + if (status.getResult() != JobResult.Result.SUCCESS) { + String errMsg = status.getError(); + if (Strings.isNullOrEmpty(errMsg)) { + errMsg = status.getOutput(); + } + if (errMsg.contains("not patched")) { + log.warn("No change occurred after patching {} {}:{}, ignoring", kind, namespace, name); + return null; + } + + throw new KubectlException( + "Patch failed for: " + name + " in namespace: " + namespace + ": " + errMsg); + } + + return null; + } + + private ReaderConsumer> parseManifestList() { + return (@WillClose BufferedReader r) -> { + try (JsonReader reader = new JsonReader(r)) { + try { + reader.beginObject(); + } catch (EOFException e) { + // If the stream we're parsing is empty, just return an empty list + return ImmutableList.of(); + } + ImmutableList.Builder manifestList = new ImmutableList.Builder<>(); + while (reader.hasNext()) { + if (reader.nextName().equals("items")) { + reader.beginArray(); + while (reader.hasNext()) { + KubernetesManifest manifest = gson.fromJson(reader, KubernetesManifest.class); + manifestList.add(manifest); + } + reader.endArray(); + } else { + reader.skipValue(); + } + } + reader.endObject(); + return manifestList.build(); + } catch (IllegalStateException | JsonSyntaxException e) { + // An IllegalStageException is thrown when we call beginObject, nextName(), etc. and the + // next token is not what we are asserting it to be. A JsonSyntaxException is thrown when + // gson.fromJson isn't able to map the next token to a KubernetesManifest. + // In both of these cases, the error is due to the output from kubectl being malformed (or + // at least malformed relative to our expectations) so we'll wrap the exception in a + // KubectlException. + throw new KubectlException("Failed to parse kubectl output: " + e.getMessage(), e); + } + }; + } + + /** + * This method executes the actual kubectl command and determines if retries are required, on + * failure. + * + * @param credentials k8s account credentials + * @param command the actual kubectl command to be performed + * @return - the result of the kubectl command + */ + private JobResult executeKubectlCommand( + KubernetesCredentials credentials, List command) { + return executeKubectlCommand(credentials, command, Optional.empty()); + } + + /** + * This method executes the actual kubectl command and determines if retries are required, on + * failure. + * + * @param credentials k8s account credentials + * @param command the actual kubectl command to be performed + * @param manifest the manifest supplied to the kubectl command + * @return - the result of the kubectl command + */ + private JobResult executeKubectlCommand( + KubernetesCredentials credentials, + List command, + Optional manifest) { + // retry registry is empty if retries are not enabled. + if (retryRegistry.isEmpty()) { + return jobExecutor.runJob(createJobRequest(command, manifest)); + } + + // capture the original result obtained from the jobExecutor.runJob(jobRequest) call. + JobResult.JobResultBuilder finalResult = JobResult.builder(); + + KubectlActionIdentifier identifier = + new KubectlActionIdentifier(credentials, command, manifest); + Retry retryContext = retryRegistry.get().retry(identifier.getRetryInstanceName()); + try { + return retryContext.executeSupplier( + () -> { + JobResult result = jobExecutor.runJob(createJobRequest(command, manifest)); + return processJobResult(identifier, result, finalResult); + }); + } catch (KubectlException | NoRetryException e) { + // the caller functions expect any failures to be defined in a JobResult object and not in + // the form of an exception. Hence, we need to translate the above exceptions back into a + // JobResult object - but we only need to do it for KubectlException and NoRetryException ( + // since these are the ones explicitly thrown above) and not for any other ones. + return finalResult.build(); + } + } + + /** + * This method executes the actual kubectl command and determines if retries are required, on + * failure. + * + * @param credentials k8s account credentials + * @param command the actual kubectl command to be performed + * @param readerConsumer A function that transforms the job's standard output + * @param return type of the JobResult output + * @return the result of the kubectl command + */ + private JobResult executeKubectlCommand( + KubernetesCredentials credentials, List command, ReaderConsumer readerConsumer) { + // retry registry is empty if retries are not enabled. + if (retryRegistry.isEmpty()) { + return jobExecutor.runJob(new JobRequest(command), readerConsumer); + } + + // capture the original result obtained from the jobExecutor.runJob(jobRequest, readerConsumer) + // call. + JobResult.JobResultBuilder finalResult = JobResult.builder(); + KubectlActionIdentifier identifier = new KubectlActionIdentifier(credentials, command); + Retry retryContext = retryRegistry.get().retry(identifier.getRetryInstanceName()); + try { + return retryContext.executeSupplier( + () -> { + JobResult result = jobExecutor.runJob(new JobRequest(command), readerConsumer); + return processJobResult(identifier, result, finalResult); + }); + } catch (KubectlException | NoRetryException e) { + // the caller functions expect any failures to be defined in a JobResult object and not in + // the form of an exception. Hence, we need to translate the above exceptions back into a + // JobResult object - but we only need to do it for KubectlException and NoRetryException + // (since these are the ones explicitly thrown above) and not for any other ones. + return finalResult.build(); + } + } + + /** + * helper function to create a JobRequest using the input parameters + * + * @param command the command to be executed in the job request + * @param manifest the manifest to be used by the command. This is optional. + * @return a job request object + */ + @VisibleForTesting + JobRequest createJobRequest(List command, Optional manifest) { + // depending on the presence of the manifest, an appropriate job request is created + if (manifest.isPresent()) { + String manifestAsJson = gson.toJson(manifest.get()); + return new JobRequest( + command, new ByteArrayInputStream(manifestAsJson.getBytes(StandardCharsets.UTF_8))); + } + + return new JobRequest(command); + } + + /** + * helper function to handle a job result obtained after performing a job request. This either + * returns the result, if successful, or throws an exception on failure. + * + * @param identifier uniquely identifies the job in the logs + * @param result the job result to be processed + * @param finalResult a buffer that keeps track of the result. This ensures on retries, the + * original is not lost + * @param the return type of the JobResult output + * @return the result of the kubectl command, in the form of a JobResult object + */ + @VisibleForTesting + JobResult processJobResult( + KubectlActionIdentifier identifier, + JobResult result, + JobResult.JobResultBuilder finalResult) { + if (result.getResult() == JobResult.Result.SUCCESS) { + return result; + } + + // save the result as it'll be needed later on when we are done with retries + finalResult + .error(result.getError()) + .killed(result.isKilled()) + .output(result.getOutput()) + .result(result.getResult()); + + // if result is not successful, that means we need to determine if we should retry + // or not. + // + // Since Kubectl binary doesn't throw any exceptions by default, we need to + // check the result to see if retries are needed. Resilience.4j needs an exception to be + // thrown to decide if retries are needed and also, to capture retry metrics correctly. + throw convertKubectlJobResultToException(identifier.getKubectlAction(), result); + } + + /** + * this method is meant to be invoked only for those JobResults which are unsuccessful. It + * determines if the error contained in the JobResult should be retried or not. If the error needs + * to be retried, then KubectlException is returned. Otherwise, NoRetryException is returned. + * + * @param identifier used to log which action's job result is being processed + * @param result the job result which needs to be checked to see if it has an error that can be + * retried + * @param job result generic type + * @return - Either KubectlException or NoRetryException + */ + private RuntimeException convertKubectlJobResultToException( + String identifier, JobResult result) { + // the error matches the configured list of retryable errors. + if (this.kubernetesConfigurationProperties + .getJobExecutor() + .getRetries() + .getRetryableErrorMessages() + .stream() + .anyMatch(errorMessage -> result.getError().contains(errorMessage))) { + return new KubectlException(identifier + " failed. Error: " + result.getError()); + } + + // even though the error is not explicitly configured to be retryable, the job was killed - + // hence, we should retry + if (result.isKilled()) { + return new KubectlException( + "retrying " + identifier + " since the job " + result + " was killed"); + } + + String message = + "Not retrying " + + identifier + + " as retries are not enabled for error: " + + result.getError(); + log.warn(message); + // we want to let the retry library know that such errors should not be retried. + // Since we have configured the global retry registry to ignore errors of type + // NoRetryException, we return this here + return new NoRetryException(message); + } + + private void persistKubectlJobOutput( + KubernetesCredentials credentials, + JobResult status, + String manifestName, + Task task, + String taskName) { + if (kubernetesConfigurationProperties.getJobExecutor().isPersistTaskOutput()) { + if (kubernetesConfigurationProperties.getJobExecutor().isEnableTaskOutputForAllAccounts() + || credentials.isDebug()) { + task.updateOutput(manifestName, taskName, status.getOutput(), status.getError()); + } + } + } + + public static class KubectlException extends RuntimeException { + public KubectlException(String message) { + super(message); + } + + public KubectlException(String message, Throwable cause) { + super(message, cause); + } + } + + public static class KubectlNotFoundException extends KubectlException { + public KubectlNotFoundException(String message) { + super(message); + } + } + + /** + * this exception is only meant to be used in cases where we want resilience4j to not retry + * kubectl calls. It should not be used anywhere else. + */ + static class NoRetryException extends RuntimeException { + NoRetryException(String message) { + super(message); + } + } + + /** helper class to identify the kubectl command in logs and metrics when retries are enabled */ + static class KubectlActionIdentifier { + KubernetesCredentials credentials; + List command; + String namespace; + String resource; + + public KubectlActionIdentifier( + KubernetesCredentials credentials, + List command, + String namespace, + String resource) { + this.credentials = credentials; + this.command = command; + this.namespace = namespace; + this.resource = resource; + } + + public KubectlActionIdentifier(KubernetesCredentials credentials, List command) { + this(credentials, command, "", ""); + } + + public KubectlActionIdentifier( + KubernetesCredentials credentials, + List command, + Optional manifest) { + this(credentials, command); + if (manifest.isPresent()) { + this.namespace = manifest.get().getNamespace(); + this.resource = manifest.get().getFullResourceName(); + } + } + + /** + * this returns the sanitized kubectl command. This can be used to log the command during retry + * attempts, among other things. + * + * @return - the sanitized kubectl command + */ + public String getKubectlAction() { + // no need to display everything in a kubectl command + List commandToLog = + command.stream() + .filter( + s -> + !(s.contains(KUBECTL_COMMAND_OPTION_TOKEN) + || s.contains(KUBECTL_COMMAND_OPTION_KUBECONFIG) + || s.contains(KUBECTL_COMMAND_OPTION_CONTEXT))) + .collect(Collectors.toList()); + + String identifier = + "command: '" + + String.join(" ", commandToLog) + + "' in account: " + + this.credentials.getAccountName(); + + if (!namespace.isEmpty()) { + identifier += " in namespace: " + namespace; + } - KubernetesManifest get( - KubernetesCredentials credentials, KubernetesKind kind, String namespace, String name); + if (!resource.isEmpty()) { + identifier += " for resource: " + resource; + } + return identifier; + } - ImmutableList eventsFor( - KubernetesCredentials credentials, KubernetesKind kind, String namespace, String name); + /** + * this returns a name which uniquely identifies a retry instance. This name shows up in the + * logs when each retry event is logged. Also, when capturing the retry metrics, the 'name' tag + * in the metric corresponds to this. + * + * @return - the name to be used to uniquely identify a retry instance + */ + public String getRetryInstanceName() { + return this.credentials.getAccountName(); + } + } } diff --git a/clouddriver-kubernetes/src/main/java/com/netflix/spinnaker/clouddriver/kubernetes/security/KubernetesCredentials.java b/clouddriver-kubernetes/src/main/java/com/netflix/spinnaker/clouddriver/kubernetes/security/KubernetesCredentials.java index fb086ab6d12..07e061521fa 100644 --- a/clouddriver-kubernetes/src/main/java/com/netflix/spinnaker/clouddriver/kubernetes/security/KubernetesCredentials.java +++ b/clouddriver-kubernetes/src/main/java/com/netflix/spinnaker/clouddriver/kubernetes/security/KubernetesCredentials.java @@ -55,19 +55,14 @@ import com.netflix.spinnaker.clouddriver.kubernetes.names.KubernetesNamerRegistry; import com.netflix.spinnaker.clouddriver.kubernetes.op.handler.KubernetesCustomResourceHandler; import com.netflix.spinnaker.clouddriver.kubernetes.op.handler.KubernetesHandler; -import com.netflix.spinnaker.clouddriver.kubernetes.op.job.DefaultKubectlJobExecutor.KubectlException; -import com.netflix.spinnaker.clouddriver.kubernetes.op.job.DefaultKubectlJobExecutor.KubectlNotFoundException; import com.netflix.spinnaker.clouddriver.kubernetes.op.job.KubectlJobExecutor; +import com.netflix.spinnaker.clouddriver.kubernetes.op.job.KubectlJobExecutor.KubectlException; +import com.netflix.spinnaker.clouddriver.kubernetes.op.job.KubectlJobExecutor.KubectlNotFoundException; import com.netflix.spinnaker.kork.configserver.ConfigFileService; import com.netflix.spinnaker.moniker.Namer; import io.kubernetes.client.openapi.models.V1DeleteOptions; import io.kubernetes.client.openapi.models.V1beta1CustomResourceDefinition; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Optional; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.function.Function; diff --git a/clouddriver-kubernetes/src/test/groovy/com/netflix/spinnaker/clouddriver/kubernetes/provider/view/KubernetesJobProviderSpec.groovy b/clouddriver-kubernetes/src/test/groovy/com/netflix/spinnaker/clouddriver/kubernetes/provider/view/KubernetesJobProviderSpec.groovy index 1cf4c1318da..644d12f89b6 100644 --- a/clouddriver-kubernetes/src/test/groovy/com/netflix/spinnaker/clouddriver/kubernetes/provider/view/KubernetesJobProviderSpec.groovy +++ b/clouddriver-kubernetes/src/test/groovy/com/netflix/spinnaker/clouddriver/kubernetes/provider/view/KubernetesJobProviderSpec.groovy @@ -20,7 +20,7 @@ package com.netflix.spinnaker.clouddriver.kubernetes.provider.view import com.netflix.spinnaker.clouddriver.kubernetes.caching.view.model.KubernetesManifestContainer import com.netflix.spinnaker.clouddriver.kubernetes.caching.view.provider.KubernetesManifestProvider import com.netflix.spinnaker.clouddriver.kubernetes.description.manifest.KubernetesManifest -import com.netflix.spinnaker.clouddriver.kubernetes.op.job.DefaultKubectlJobExecutor +import com.netflix.spinnaker.clouddriver.kubernetes.op.job.KubectlJobExecutor import com.netflix.spinnaker.clouddriver.kubernetes.security.KubernetesCredentials import com.netflix.spinnaker.clouddriver.security.AccountCredentials import com.netflix.spinnaker.clouddriver.security.AccountCredentialsProvider @@ -79,7 +79,7 @@ class KubernetesJobProviderSpec extends Specification { given: def mockCredentials = Mock(KubernetesCredentials) { jobLogs(*_) >> { - throw new DefaultKubectlJobExecutor.KubectlException("some exception while getting logs", new Exception()) + throw new KubectlJobExecutor.KubectlException("some exception while getting logs", new Exception()) } } @@ -150,7 +150,7 @@ class KubernetesJobProviderSpec extends Specification { given: def mockCredentials = Mock(KubernetesCredentials) { logs(*_) >> { - throw new DefaultKubectlJobExecutor.KubectlException("some exception while getting logs", new Exception()) + throw new KubectlJobExecutor.KubectlException("some exception while getting logs", new Exception()) } } diff --git a/clouddriver-kubernetes/src/test/groovy/com/netflix/spinnaker/clouddriver/kubernetes/security/KubernetesCredentialsSpec.groovy b/clouddriver-kubernetes/src/test/groovy/com/netflix/spinnaker/clouddriver/kubernetes/security/KubernetesCredentialsSpec.groovy index ade4dafcaa2..dae204ea975 100644 --- a/clouddriver-kubernetes/src/test/groovy/com/netflix/spinnaker/clouddriver/kubernetes/security/KubernetesCredentialsSpec.groovy +++ b/clouddriver-kubernetes/src/test/groovy/com/netflix/spinnaker/clouddriver/kubernetes/security/KubernetesCredentialsSpec.groovy @@ -30,14 +30,14 @@ import com.netflix.spinnaker.clouddriver.kubernetes.description.manifest.Kuberne import com.netflix.spinnaker.clouddriver.kubernetes.names.KubernetesManifestNamer import com.netflix.spinnaker.clouddriver.kubernetes.names.KubernetesNamerRegistry import com.netflix.spinnaker.clouddriver.kubernetes.op.handler.KubernetesUnregisteredCustomResourceHandler -import com.netflix.spinnaker.clouddriver.kubernetes.op.job.DefaultKubectlJobExecutor +import com.netflix.spinnaker.clouddriver.kubernetes.op.job.KubectlJobExecutor import com.netflix.spinnaker.clouddriver.kubernetes.security.KubernetesCredentials.KubernetesKindStatus import com.netflix.spinnaker.kork.configserver.ConfigFileService import spock.lang.Specification class KubernetesCredentialsSpec extends Specification { Registry registry = Stub(Registry) - DefaultKubectlJobExecutor kubectlJobExecutor = Stub(DefaultKubectlJobExecutor) + KubectlJobExecutor kubectlJobExecutor = Stub(KubectlJobExecutor) String NAMESPACE = "my-namespace" AccountResourcePropertyRegistry.Factory resourcePropertyRegistryFactory = Mock(AccountResourcePropertyRegistry.Factory) KubernetesKindRegistry.Factory kindRegistryFactory = new KubernetesKindRegistry.Factory( @@ -137,7 +137,7 @@ class KubernetesCredentialsSpec extends Specification { checkPermissionsOnStartup: true, )) kubectlJobExecutor.list(_ as KubernetesCredentials, ImmutableList.of(KubernetesKind.DEPLOYMENT), NAMESPACE, _ as KubernetesSelectorList) >> { - throw new DefaultKubectlJobExecutor.KubectlException("Error", new Exception()) + throw new KubectlJobExecutor.KubectlException("Error", new Exception()) } kubectlJobExecutor.list(_ as KubernetesCredentials, ImmutableList.of(KubernetesKind.REPLICA_SET), NAMESPACE, _ as KubernetesSelectorList) >> { return ImmutableList.of() @@ -187,7 +187,7 @@ class KubernetesCredentialsSpec extends Specification { metrics: true )) kubectlJobExecutor.topPod(_ as KubernetesCredentials, NAMESPACE, _) >> { - throw new DefaultKubectlJobExecutor.KubectlException("Error", new Exception()) + throw new KubectlJobExecutor.KubectlException("Error", new Exception()) } expect: diff --git a/clouddriver-kubernetes/src/test/groovy/com/netflix/spinnaker/clouddriver/kubernetes/security/KubernetesNamedAccountCredentialsSpec.groovy b/clouddriver-kubernetes/src/test/groovy/com/netflix/spinnaker/clouddriver/kubernetes/security/KubernetesNamedAccountCredentialsSpec.groovy index b1641a48ea9..a6a86f667e3 100644 --- a/clouddriver-kubernetes/src/test/groovy/com/netflix/spinnaker/clouddriver/kubernetes/security/KubernetesNamedAccountCredentialsSpec.groovy +++ b/clouddriver-kubernetes/src/test/groovy/com/netflix/spinnaker/clouddriver/kubernetes/security/KubernetesNamedAccountCredentialsSpec.groovy @@ -26,7 +26,7 @@ import com.netflix.spinnaker.clouddriver.kubernetes.description.KubernetesSpinna import com.netflix.spinnaker.clouddriver.kubernetes.names.KubernetesManifestNamer import com.netflix.spinnaker.clouddriver.kubernetes.names.KubernetesNamerRegistry import com.netflix.spinnaker.clouddriver.kubernetes.op.handler.KubernetesUnregisteredCustomResourceHandler -import com.netflix.spinnaker.clouddriver.kubernetes.op.job.DefaultKubectlJobExecutor +import com.netflix.spinnaker.clouddriver.kubernetes.op.job.KubectlJobExecutor import com.netflix.spinnaker.fiat.model.Authorization import com.netflix.spinnaker.kork.configserver.ConfigFileService import spock.lang.Specification @@ -41,7 +41,7 @@ class KubernetesNamedAccountCredentialsSpec extends Specification { KubernetesSpinnakerKindMap kubernetesSpinnakerKindMap = new KubernetesSpinnakerKindMap(ImmutableList.of()) GlobalResourcePropertyRegistry globalResourcePropertyRegistry = new GlobalResourcePropertyRegistry(ImmutableList.of(), new KubernetesUnregisteredCustomResourceHandler()) - DefaultKubectlJobExecutor mockKubectlJobExecutor = Mock(DefaultKubectlJobExecutor) + KubectlJobExecutor mockKubectlJobExecutor = Mock(KubectlJobExecutor) KubernetesCredentials.Factory credentialFactory = new KubernetesCredentials.Factory( new NoopRegistry(), diff --git a/clouddriver-kubernetes/src/test/java/com/netflix/spinnaker/clouddriver/kubernetes/caching/view/provider/KubernetesDataProviderIntegrationTest.java b/clouddriver-kubernetes/src/test/java/com/netflix/spinnaker/clouddriver/kubernetes/caching/view/provider/KubernetesDataProviderIntegrationTest.java index 08124decf40..53b402549c6 100644 --- a/clouddriver-kubernetes/src/test/java/com/netflix/spinnaker/clouddriver/kubernetes/caching/view/provider/KubernetesDataProviderIntegrationTest.java +++ b/clouddriver-kubernetes/src/test/java/com/netflix/spinnaker/clouddriver/kubernetes/caching/view/provider/KubernetesDataProviderIntegrationTest.java @@ -63,7 +63,7 @@ import com.netflix.spinnaker.clouddriver.kubernetes.op.handler.KubernetesServiceHandler; import com.netflix.spinnaker.clouddriver.kubernetes.op.handler.KubernetesUnregisteredCustomResourceHandler; import com.netflix.spinnaker.clouddriver.kubernetes.op.handler.ManifestFetcher; -import com.netflix.spinnaker.clouddriver.kubernetes.op.job.DefaultKubectlJobExecutor; +import com.netflix.spinnaker.clouddriver.kubernetes.op.job.KubectlJobExecutor; import com.netflix.spinnaker.clouddriver.kubernetes.security.*; import com.netflix.spinnaker.clouddriver.model.Application; import com.netflix.spinnaker.clouddriver.model.HealthState; @@ -574,9 +574,8 @@ void getClusterManifestCoordinatesEmptyCluster(SoftAssertions softly) { softly.assertThat(coordinates).isEmpty(); } - private static DefaultKubectlJobExecutor getJobExecutor() { - DefaultKubectlJobExecutor jobExecutor = - mock(DefaultKubectlJobExecutor.class, new ReturnsSmartNulls()); + private static KubectlJobExecutor getJobExecutor() { + KubectlJobExecutor jobExecutor = mock(KubectlJobExecutor.class, new ReturnsSmartNulls()); when(jobExecutor.list( any(KubernetesCredentials.class), anyList(), diff --git a/clouddriver-kubernetes/src/test/java/com/netflix/spinnaker/clouddriver/kubernetes/caching/view/provider/KubernetesInstanceProviderTest.java b/clouddriver-kubernetes/src/test/java/com/netflix/spinnaker/clouddriver/kubernetes/caching/view/provider/KubernetesInstanceProviderTest.java index d191212527b..5ddd7b6b0ac 100644 --- a/clouddriver-kubernetes/src/test/java/com/netflix/spinnaker/clouddriver/kubernetes/caching/view/provider/KubernetesInstanceProviderTest.java +++ b/clouddriver-kubernetes/src/test/java/com/netflix/spinnaker/clouddriver/kubernetes/caching/view/provider/KubernetesInstanceProviderTest.java @@ -31,7 +31,7 @@ import com.netflix.spinnaker.clouddriver.kubernetes.description.manifest.KubernetesKind; import com.netflix.spinnaker.clouddriver.kubernetes.description.manifest.KubernetesManifest; import com.netflix.spinnaker.clouddriver.kubernetes.model.ContainerLog; -import com.netflix.spinnaker.clouddriver.kubernetes.op.job.DefaultKubectlJobExecutor; +import com.netflix.spinnaker.clouddriver.kubernetes.op.job.KubectlJobExecutor; import com.netflix.spinnaker.clouddriver.kubernetes.security.KubernetesCredentials; import io.kubernetes.client.openapi.JSON; import io.kubernetes.client.openapi.models.V1Container; @@ -170,7 +170,7 @@ void getConsoleOutputKubectlException() { .build())) .thenReturn(manifest); when(credentials.logs(anyString(), anyString(), anyString())) - .thenThrow(new DefaultKubectlJobExecutor.KubectlException(LOG_OUTPUT, null)); + .thenThrow(new KubectlJobExecutor.KubectlException(LOG_OUTPUT, null)); List logs = provider.getConsoleOutput(ACCOUNT, NAMESPACE, POD_FULL_NAME); diff --git a/clouddriver-kubernetes/src/test/java/com/netflix/spinnaker/clouddriver/kubernetes/op/job/DefaultKubectlJobExecutorTest.java b/clouddriver-kubernetes/src/test/java/com/netflix/spinnaker/clouddriver/kubernetes/op/job/KubectlJobExecutorTest.java similarity index 86% rename from clouddriver-kubernetes/src/test/java/com/netflix/spinnaker/clouddriver/kubernetes/op/job/DefaultKubectlJobExecutorTest.java rename to clouddriver-kubernetes/src/test/java/com/netflix/spinnaker/clouddriver/kubernetes/op/job/KubectlJobExecutorTest.java index 61b57ef4b08..223a65f9166 100644 --- a/clouddriver-kubernetes/src/test/java/com/netflix/spinnaker/clouddriver/kubernetes/op/job/DefaultKubectlJobExecutorTest.java +++ b/clouddriver-kubernetes/src/test/java/com/netflix/spinnaker/clouddriver/kubernetes/op/job/KubectlJobExecutorTest.java @@ -75,7 +75,7 @@ import org.junit.jupiter.params.provider.ValueSource; import org.slf4j.LoggerFactory; -final class DefaultKubectlJobExecutorTest { +final class KubectlJobExecutorTest { private static final String NAMESPACE = "test-namespace"; JobExecutor jobExecutor; KubernetesConfigurationProperties kubernetesConfigurationProperties; @@ -98,11 +98,11 @@ void topPodEmptyOutput(boolean retriesEnabled) { new KubernetesConfigurationProperties(); kubernetesConfigurationProperties.getJobExecutor().getRetries().setEnabled(retriesEnabled); - DefaultKubectlJobExecutor defaultKubectlJobExecutor = - new DefaultKubectlJobExecutor( + KubectlJobExecutor kubectlJobExecutor = + new KubectlJobExecutor( jobExecutor, kubernetesConfigurationProperties, new SimpleMeterRegistry()); Collection podMetrics = - defaultKubectlJobExecutor.topPod(mockKubernetesCredentials(), "test", ""); + kubectlJobExecutor.topPod(mockKubernetesCredentials(), "test", ""); assertThat(podMetrics).isEmpty(); // should only be called once as no retries are performed @@ -110,8 +110,8 @@ void topPodEmptyOutput(boolean retriesEnabled) { if (retriesEnabled) { // verify retry registry - assertTrue(defaultKubectlJobExecutor.getRetryRegistry().isPresent()); - RetryRegistry retryRegistry = defaultKubectlJobExecutor.getRetryRegistry().get(); + assertTrue(kubectlJobExecutor.getRetryRegistry().isPresent()); + RetryRegistry retryRegistry = kubectlJobExecutor.getRetryRegistry().get(); assertThat(retryRegistry.getAllRetries().size()).isEqualTo(1); assertThat(retryRegistry.getAllRetries().get(0).getName()).isEqualTo("mock-account"); @@ -131,16 +131,15 @@ void topPodMultipleContainers() { .thenReturn( JobResult.builder() .result(Result.SUCCESS) - .output( - ManifestFetcher.getResource(DefaultKubectlJobExecutorTest.class, "top-pod.txt")) + .output(ManifestFetcher.getResource(KubectlJobExecutorTest.class, "top-pod.txt")) .error("") .build()); - DefaultKubectlJobExecutor defaultKubectlJobExecutor = - new DefaultKubectlJobExecutor( + KubectlJobExecutor kubectlJobExecutor = + new KubectlJobExecutor( jobExecutor, new KubernetesConfigurationProperties(), new SimpleMeterRegistry()); Collection podMetrics = - defaultKubectlJobExecutor.topPod(mockKubernetesCredentials(), NAMESPACE, ""); + kubectlJobExecutor.topPod(mockKubernetesCredentials(), NAMESPACE, ""); assertThat(podMetrics).hasSize(2); ImmutableSetMultimap expectedMetrics = @@ -193,15 +192,15 @@ void kubectlJobExecutorErrorHandlingWhenRetriesAreDisabled() { .error("some error") .build()); - DefaultKubectlJobExecutor defaultKubectlJobExecutor = - new DefaultKubectlJobExecutor( + KubectlJobExecutor kubectlJobExecutor = + new KubectlJobExecutor( jobExecutor, kubernetesConfigurationProperties, new SimpleMeterRegistry()); // then - DefaultKubectlJobExecutor.KubectlException thrown = + KubectlJobExecutor.KubectlException thrown = assertThrows( - DefaultKubectlJobExecutor.KubectlException.class, - () -> defaultKubectlJobExecutor.topPod(mockKubernetesCredentials(), "test", "")); + KubectlJobExecutor.KubectlException.class, + () -> kubectlJobExecutor.topPod(mockKubernetesCredentials(), "test", "")); assertTrue(thrown.getMessage().contains("some error")); // should only be called once as no retries are performed for this error @@ -220,13 +219,13 @@ void kubectlRetryHandlingForConfiguredErrorsThatContinueFailingAfterMaxRetryAtte kubernetesConfigurationProperties.getJobExecutor().getRetries().setEnabled(true); // to test log messages - MemoryAppender memoryAppender = new MemoryAppender(DefaultKubectlJobExecutor.class); + MemoryAppender memoryAppender = new MemoryAppender(KubectlJobExecutor.class); final ExecutorService executor = Executors.newFixedThreadPool( numberOfThreads, new ThreadFactoryBuilder() - .setNameFormat(DefaultKubectlJobExecutorTest.class.getSimpleName() + "-%d") + .setNameFormat(KubectlJobExecutorTest.class.getSimpleName() + "-%d") .build()); final ArrayList>> futures = @@ -241,16 +240,14 @@ void kubectlRetryHandlingForConfiguredErrorsThatContinueFailingAfterMaxRetryAtte .error("Unable to connect to the server: net/http: TLS handshake timeout") .build()); - DefaultKubectlJobExecutor defaultKubectlJobExecutor = - new DefaultKubectlJobExecutor( + KubectlJobExecutor kubectlJobExecutor = + new KubectlJobExecutor( jobExecutor, kubernetesConfigurationProperties, new SimpleMeterRegistry()); for (int i = 1; i <= numberOfThreads; i++) { futures.add( executor.submit( - () -> - defaultKubectlJobExecutor.topPod( - mockKubernetesCredentials(), NAMESPACE, "test-pod"))); + () -> kubectlJobExecutor.topPod(mockKubernetesCredentials(), NAMESPACE, "test-pod"))); } // then @@ -258,7 +255,7 @@ void kubectlRetryHandlingForConfiguredErrorsThatContinueFailingAfterMaxRetryAtte try { future.get(); } catch (final ExecutionException e) { - assertTrue(e.getCause() instanceof DefaultKubectlJobExecutor.KubectlException); + assertTrue(e.getCause() instanceof KubectlJobExecutor.KubectlException); assertTrue( e.getMessage() .contains("Unable to connect to the server: net/http: TLS handshake timeout")); @@ -278,8 +275,8 @@ void kubectlRetryHandlingForConfiguredErrorsThatContinueFailingAfterMaxRetryAtte .runJob(any(JobRequest.class)); // verify retry registry - assertTrue(defaultKubectlJobExecutor.getRetryRegistry().isPresent()); - RetryRegistry retryRegistry = defaultKubectlJobExecutor.getRetryRegistry().get(); + assertTrue(kubectlJobExecutor.getRetryRegistry().isPresent()); + RetryRegistry retryRegistry = kubectlJobExecutor.getRetryRegistry().get(); assertThat(retryRegistry.getAllRetries().size()).isEqualTo(1); assertThat(retryRegistry.getAllRetries().get(0).getName()).isEqualTo("mock-account"); @@ -297,7 +294,7 @@ void kubectlRetryHandlingForConfiguredErrorsThatContinueFailingAfterMaxRetryAtte "Kubectl command for mock-account failed after " + kubernetesConfigurationProperties.getJobExecutor().getRetries().getMaxAttempts() + " attempts. Exception: com.netflix.spinnaker.clouddriver.kubernetes.op." - + "job.DefaultKubectlJobExecutor$KubectlException: command: 'kubectl " + + "job.KubectlJobExecutor$KubectlException: command: 'kubectl " + "--request-timeout=0 --namespace=test-namespace top po test-pod " + "--containers' in account: mock-account failed. Error: Unable to " + "connect to the server: net/http: TLS handshake timeout", @@ -318,7 +315,7 @@ void kubectlMultiThreadedRetryHandlingForErrorsThatAreNotConfiguredToBeRetryable kubernetesConfigurationProperties.getJobExecutor().getRetries().setEnabled(true); // to test log messages - Logger logger = (Logger) LoggerFactory.getLogger(DefaultKubectlJobExecutor.class); + Logger logger = (Logger) LoggerFactory.getLogger(KubectlJobExecutor.class); ListAppender listAppender = new ListAppender<>(); listAppender.setContext((LoggerContext) LoggerFactory.getILoggerFactory()); logger.addAppender(listAppender); @@ -328,7 +325,7 @@ void kubectlMultiThreadedRetryHandlingForErrorsThatAreNotConfiguredToBeRetryable Executors.newFixedThreadPool( numberOfThreads, new ThreadFactoryBuilder() - .setNameFormat(DefaultKubectlJobExecutorTest.class.getSimpleName() + "-%d") + .setNameFormat(KubectlJobExecutorTest.class.getSimpleName() + "-%d") .build()); final ArrayList>> futures = @@ -343,16 +340,14 @@ void kubectlMultiThreadedRetryHandlingForErrorsThatAreNotConfiguredToBeRetryable .error("un-retryable error") .build()); - DefaultKubectlJobExecutor defaultKubectlJobExecutor = - new DefaultKubectlJobExecutor( + KubectlJobExecutor kubectlJobExecutor = + new KubectlJobExecutor( jobExecutor, kubernetesConfigurationProperties, new SimpleMeterRegistry()); for (int i = 1; i <= numberOfThreads; i++) { futures.add( executor.submit( - () -> - defaultKubectlJobExecutor.topPod( - mockKubernetesCredentials(), NAMESPACE, "test-pod"))); + () -> kubectlJobExecutor.topPod(mockKubernetesCredentials(), NAMESPACE, "test-pod"))); } // then @@ -360,7 +355,7 @@ void kubectlMultiThreadedRetryHandlingForErrorsThatAreNotConfiguredToBeRetryable try { future.get(); } catch (final ExecutionException e) { - assertTrue(e.getCause() instanceof DefaultKubectlJobExecutor.KubectlException); + assertTrue(e.getCause() instanceof KubectlJobExecutor.KubectlException); assertTrue(e.getMessage().contains("un-retryable error")); } catch (final InterruptedException ignored) { } @@ -372,8 +367,8 @@ void kubectlMultiThreadedRetryHandlingForErrorsThatAreNotConfiguredToBeRetryable verify(jobExecutor, times(numberOfThreads)).runJob(any(JobRequest.class)); // verify retry registry - assertTrue(defaultKubectlJobExecutor.getRetryRegistry().isPresent()); - RetryRegistry retryRegistry = defaultKubectlJobExecutor.getRetryRegistry().get(); + assertTrue(kubectlJobExecutor.getRetryRegistry().isPresent()); + RetryRegistry retryRegistry = kubectlJobExecutor.getRetryRegistry().get(); assertThat(retryRegistry.getAllRetries().size()).isEqualTo(1); assertThat(retryRegistry.getAllRetries().get(0).getName()).isEqualTo("mock-account"); @@ -410,7 +405,7 @@ void kubectlRetryHandlingForConfiguredErrorsThatSucceedAfterAFewRetries() { kubernetesConfigurationProperties.getJobExecutor().getRetries().setEnabled(true); // to test log messages - Logger logger = (Logger) LoggerFactory.getLogger(DefaultKubectlJobExecutor.class); + Logger logger = (Logger) LoggerFactory.getLogger(KubectlJobExecutor.class); ListAppender listAppender = new ListAppender<>(); listAppender.setContext((LoggerContext) LoggerFactory.getILoggerFactory()); logger.addAppender(listAppender); @@ -427,17 +422,16 @@ void kubectlRetryHandlingForConfiguredErrorsThatSucceedAfterAFewRetries() { .thenReturn( JobResult.builder() .result(Result.SUCCESS) - .output( - ManifestFetcher.getResource(DefaultKubectlJobExecutorTest.class, "top-pod.txt")) + .output(ManifestFetcher.getResource(KubectlJobExecutorTest.class, "top-pod.txt")) .error("") .build()); - DefaultKubectlJobExecutor defaultKubectlJobExecutor = - new DefaultKubectlJobExecutor( + KubectlJobExecutor kubectlJobExecutor = + new KubectlJobExecutor( jobExecutor, kubernetesConfigurationProperties, new SimpleMeterRegistry()); Collection podMetrics = - defaultKubectlJobExecutor.topPod(mockKubernetesCredentials(), NAMESPACE, "test-pod"); + kubectlJobExecutor.topPod(mockKubernetesCredentials(), NAMESPACE, "test-pod"); // then @@ -446,8 +440,8 @@ void kubectlRetryHandlingForConfiguredErrorsThatSucceedAfterAFewRetries() { verify(jobExecutor, times(2)).runJob(any(JobRequest.class)); // verify retry registry - assertTrue(defaultKubectlJobExecutor.getRetryRegistry().isPresent()); - RetryRegistry retryRegistry = defaultKubectlJobExecutor.getRetryRegistry().get(); + assertTrue(kubectlJobExecutor.getRetryRegistry().isPresent()); + RetryRegistry retryRegistry = kubectlJobExecutor.getRetryRegistry().get(); assertThat(retryRegistry.getAllRetries().size()).isEqualTo(1); assertThat(retryRegistry.getAllRetries().get(0).getName()).isEqualTo("mock-account"); @@ -470,7 +464,7 @@ void kubectlRetryHandlingForConfiguredErrorsThatSucceedAfterAFewRetries() { .contains( "Kubectl command for mock-account is now successful in attempt #2. Last " + "attempt had failed with exception: com.netflix.spinnaker.clouddriver" - + ".kubernetes.op.job.DefaultKubectlJobExecutor$KubectlException: command: " + + ".kubernetes.op.job.KubectlJobExecutor$KubectlException: command: " + "'kubectl --request-timeout=0 --namespace=test-namespace top po test-pod" + " --containers' in account: mock-account failed. Error: Unable to connect to" + " the server: net/http: TLS handshake timeout")) @@ -529,15 +523,14 @@ void kubectlJobExecutorRaisesException(boolean retriesEnabled) { kubernetesConfigurationProperties.getJobExecutor().getRetries().setEnabled(true); } - DefaultKubectlJobExecutor defaultKubectlJobExecutor = - new DefaultKubectlJobExecutor( + KubectlJobExecutor kubectlJobExecutor = + new KubectlJobExecutor( jobExecutor, kubernetesConfigurationProperties, new SimpleMeterRegistry()); JobExecutionException thrown = assertThrows( JobExecutionException.class, - () -> - defaultKubectlJobExecutor.topPod(mockKubernetesCredentials(), "test", "test-pod")); + () -> kubectlJobExecutor.topPod(mockKubernetesCredentials(), "test", "test-pod")); if (retriesEnabled) { // should be called 3 times as there were max 3 attempts made @@ -559,19 +552,19 @@ void kubectlRetryHandlingForKubectlCallsThatUseStdinWhichSucceedAfterAFewRetries // fetch a test manifest KubernetesManifest inputManifest = - ManifestFetcher.getManifest(DefaultKubectlJobExecutorTest.class, "job.yml").get(0); + ManifestFetcher.getManifest(KubectlJobExecutorTest.class, "job.yml").get(0); - DefaultKubectlJobExecutor defaultKubectlJobExecutor = - new TestScriptJobExecutorDefault( + KubectlJobExecutor kubectlJobExecutor = + new TestScriptJobExecutor( new JobExecutorLocal(/* timeoutMinutes */ 1), kubernetesConfigurationProperties, new SimpleMeterRegistry(), - TestScriptJobExecutorDefault.RetryBehavior.SUCCESS_AFTER_INITIAL_FAILURE); + TestScriptJobExecutor.RetryBehavior.SUCCESS_AFTER_INITIAL_FAILURE); // We are using a real job executor. Therefore, we can simulate the call `kubectl apply -f -` // by substituting kubectl with a test script that accepts stdin KubernetesManifest returnedManifest = - defaultKubectlJobExecutor.deploy( + kubectlJobExecutor.deploy( mockKubernetesCredentials( "src/test/resources/com/netflix/spinnaker/clouddriver/kubernetes/op/job/mock-kubectl-stdin-command.sh"), inputManifest, @@ -594,22 +587,22 @@ void kubectlRetryHandlingForKubectlCallsThatUseStdinWhichContinueFailingAfterAll // fetch a test manifest KubernetesManifest inputManifest = - ManifestFetcher.getManifest(DefaultKubectlJobExecutorTest.class, "job.yml").get(0); + ManifestFetcher.getManifest(KubectlJobExecutorTest.class, "job.yml").get(0); - DefaultKubectlJobExecutor defaultKubectlJobExecutor = - new TestScriptJobExecutorDefault( + KubectlJobExecutor kubectlJobExecutor = + new TestScriptJobExecutor( new JobExecutorLocal(/* timeoutMinutes */ 1), kubernetesConfigurationProperties, new SimpleMeterRegistry(), - TestScriptJobExecutorDefault.RetryBehavior.FAILED); + TestScriptJobExecutor.RetryBehavior.FAILED); // We are using a real job executor. Therefore, we can simulate the call `kubectl apply -f -` // by substituting kubectl with a test script that accepts stdin - DefaultKubectlJobExecutor.KubectlException thrown = + KubectlJobExecutor.KubectlException thrown = assertThrows( - DefaultKubectlJobExecutor.KubectlException.class, + KubectlJobExecutor.KubectlException.class, () -> - defaultKubectlJobExecutor.deploy( + kubectlJobExecutor.deploy( mockKubernetesCredentials( "src/test/resources/com/netflix/spinnaker/clouddriver/kubernetes/op/job/mock-kubectl-stdin-command.sh"), inputManifest, @@ -642,7 +635,7 @@ private static KubernetesCredentials mockKubernetesCredentials(String pathToExec * Only meant to be used in tests where mocking certain kubectl calls prove to be tricky. This is * currently used in tests that verify retry behavior for such calls. */ - private static class TestScriptJobExecutorDefault extends DefaultKubectlJobExecutor { + private static class TestScriptJobExecutor extends KubectlJobExecutor { /** * depending on the custom script provided, to simulate retry attempts, we need to let the * script know when to emit an error message vs when to emit a success message. These enums help @@ -659,7 +652,7 @@ private enum RetryBehavior { private int createJobRequestInvokedCounter; - TestScriptJobExecutorDefault( + TestScriptJobExecutor( JobExecutor jobExecutor, KubernetesConfigurationProperties kubernetesConfigurationProperties, MeterRegistry meterRegistry, diff --git a/clouddriver-kubernetes/src/test/java/com/netflix/spinnaker/clouddriver/kubernetes/op/job/KubernetesRunJobOperationTest.java b/clouddriver-kubernetes/src/test/java/com/netflix/spinnaker/clouddriver/kubernetes/op/job/KubernetesRunJobOperationTest.java index 49a3862d05e..8a5ed24e731 100644 --- a/clouddriver-kubernetes/src/test/java/com/netflix/spinnaker/clouddriver/kubernetes/op/job/KubernetesRunJobOperationTest.java +++ b/clouddriver-kubernetes/src/test/java/com/netflix/spinnaker/clouddriver/kubernetes/op/job/KubernetesRunJobOperationTest.java @@ -146,7 +146,7 @@ private static KubernetesCredentials getMockKubernetesCredential() { invocation.getArgument(0, KubernetesManifest.class).clone(); if (Strings.isNullOrEmpty(result.getName())) { // We can't apply if there is no name; throw an exception here - throw new DefaultKubectlJobExecutor.KubectlException( + throw new KubectlJobExecutor.KubectlException( "error: error when retrieving current configuration"); } return result; diff --git a/clouddriver-kubernetes/src/test/java/com/netflix/spinnaker/clouddriver/kubernetes/security/KubernetesCredentialsTest.java b/clouddriver-kubernetes/src/test/java/com/netflix/spinnaker/clouddriver/kubernetes/security/KubernetesCredentialsTest.java index e5d93c23045..82323a771f7 100644 --- a/clouddriver-kubernetes/src/test/java/com/netflix/spinnaker/clouddriver/kubernetes/security/KubernetesCredentialsTest.java +++ b/clouddriver-kubernetes/src/test/java/com/netflix/spinnaker/clouddriver/kubernetes/security/KubernetesCredentialsTest.java @@ -44,9 +44,9 @@ import com.netflix.spinnaker.clouddriver.kubernetes.names.KubernetesManifestNamer; import com.netflix.spinnaker.clouddriver.kubernetes.names.KubernetesNamerRegistry; import com.netflix.spinnaker.clouddriver.kubernetes.op.handler.KubernetesUnregisteredCustomResourceHandler; -import com.netflix.spinnaker.clouddriver.kubernetes.op.job.DefaultKubectlJobExecutor; -import com.netflix.spinnaker.clouddriver.kubernetes.op.job.DefaultKubectlJobExecutor.KubectlException; -import com.netflix.spinnaker.clouddriver.kubernetes.op.job.DefaultKubectlJobExecutor.KubectlNotFoundException; +import com.netflix.spinnaker.clouddriver.kubernetes.op.job.KubectlJobExecutor; +import com.netflix.spinnaker.clouddriver.kubernetes.op.job.KubectlJobExecutor.KubectlException; +import com.netflix.spinnaker.clouddriver.kubernetes.op.job.KubectlJobExecutor.KubectlNotFoundException; import com.netflix.spinnaker.kork.configserver.CloudConfigResourceService; import com.netflix.spinnaker.kork.configserver.ConfigFileService; import java.util.HashMap; @@ -59,8 +59,7 @@ final class KubernetesCredentialsTest { private final String OP_NAME = "KubernetesCredentialsTest"; private final Task task = new DefaultTask("task-id"); - private KubernetesCredentials getCredentials( - Registry registry, DefaultKubectlJobExecutor jobExecutor) { + private KubernetesCredentials getCredentials(Registry registry, KubectlJobExecutor jobExecutor) { KubernetesCredentials.Factory factory = new KubernetesCredentials.Factory( registry, @@ -91,7 +90,7 @@ private KubernetesManifest getManifest() { @Test void metricTagsForSuccessfulDeploy() { - DefaultKubectlJobExecutor jobExecutor = mock(DefaultKubectlJobExecutor.class); + KubectlJobExecutor jobExecutor = mock(KubectlJobExecutor.class); Registry registry = new DefaultRegistry(); KubernetesCredentials credentials = getCredentials(registry, jobExecutor); credentials.deploy(getManifest(), task, OP_NAME); @@ -112,7 +111,7 @@ void metricTagsForSuccessfulDeploy() { @Test void metricTagsForSuccessfulList() { - DefaultKubectlJobExecutor jobExecutor = mock(DefaultKubectlJobExecutor.class); + KubectlJobExecutor jobExecutor = mock(KubectlJobExecutor.class); Registry registry = new DefaultRegistry(); KubernetesCredentials credentials = getCredentials(registry, jobExecutor); credentials.list( @@ -134,7 +133,7 @@ void metricTagsForSuccessfulList() { @Test void metricTagsForSuccessfulListNoNamespace() { - DefaultKubectlJobExecutor jobExecutor = mock(DefaultKubectlJobExecutor.class); + KubectlJobExecutor jobExecutor = mock(KubectlJobExecutor.class); Registry registry = new DefaultRegistry(); KubernetesCredentials credentials = getCredentials(registry, jobExecutor); credentials.list(ImmutableList.of(KubernetesKind.DEPLOYMENT, KubernetesKind.REPLICA_SET), null); @@ -149,7 +148,7 @@ void metricTagsForSuccessfulListNoNamespace() { @Test void metricTagsForSuccessfulListEmptyNamespace() { - DefaultKubectlJobExecutor jobExecutor = mock(DefaultKubectlJobExecutor.class); + KubectlJobExecutor jobExecutor = mock(KubectlJobExecutor.class); Registry registry = new DefaultRegistry(); KubernetesCredentials credentials = getCredentials(registry, jobExecutor); credentials.list(ImmutableList.of(KubernetesKind.DEPLOYMENT, KubernetesKind.REPLICA_SET), ""); @@ -164,7 +163,7 @@ void metricTagsForSuccessfulListEmptyNamespace() { @Test void returnValueForSuccessfulList() { - DefaultKubectlJobExecutor jobExecutor = mock(DefaultKubectlJobExecutor.class); + KubectlJobExecutor jobExecutor = mock(KubectlJobExecutor.class); Registry registry = new DefaultRegistry(); KubernetesCredentials credentials = getCredentials(registry, jobExecutor); @@ -179,7 +178,7 @@ void returnValueForSuccessfulList() { @Test void timeRecordedForSuccessfulList() { - DefaultKubectlJobExecutor jobExecutor = mock(DefaultKubectlJobExecutor.class); + KubectlJobExecutor jobExecutor = mock(KubectlJobExecutor.class); ManualClock clock = new ManualClock(); Registry registry = new DefaultRegistry(clock); @@ -205,7 +204,7 @@ void timeRecordedForSuccessfulList() { @Test void metricTagsForListThrowingKubectlException() { - DefaultKubectlJobExecutor jobExecutor = mock(DefaultKubectlJobExecutor.class); + KubectlJobExecutor jobExecutor = mock(KubectlJobExecutor.class); Registry registry = new DefaultRegistry(); KubernetesCredentials credentials = getCredentials(registry, jobExecutor); @@ -238,7 +237,7 @@ void metricTagsForListThrowingKubectlException() { @Test void propagatedExceptionForListThrowingKubectlException() { - DefaultKubectlJobExecutor jobExecutor = mock(DefaultKubectlJobExecutor.class); + KubectlJobExecutor jobExecutor = mock(KubectlJobExecutor.class); Registry registry = new DefaultRegistry(); KubernetesCredentials credentials = getCredentials(registry, jobExecutor); @@ -258,7 +257,7 @@ void propagatedExceptionForListThrowingKubectlException() { @Test void timeRecordedForListThrowingKubectlException() { - DefaultKubectlJobExecutor jobExecutor = mock(DefaultKubectlJobExecutor.class); + KubectlJobExecutor jobExecutor = mock(KubectlJobExecutor.class); ManualClock clock = new ManualClock(); Registry registry = new DefaultRegistry(clock); @@ -289,7 +288,7 @@ void timeRecordedForListThrowingKubectlException() { @Test void metricTagsForListThrowingOtherException() { - DefaultKubectlJobExecutor jobExecutor = mock(DefaultKubectlJobExecutor.class); + KubectlJobExecutor jobExecutor = mock(KubectlJobExecutor.class); Registry registry = new DefaultRegistry(); KubernetesCredentials credentials = getCredentials(registry, jobExecutor); @@ -320,7 +319,7 @@ void metricTagsForListThrowingOtherException() { @Test void timeRecordedForListThrowingOtherException() { - DefaultKubectlJobExecutor jobExecutor = mock(DefaultKubectlJobExecutor.class); + KubectlJobExecutor jobExecutor = mock(KubectlJobExecutor.class); ManualClock clock = new ManualClock(); Registry registry = new DefaultRegistry(clock); @@ -350,7 +349,7 @@ void timeRecordedForListThrowingOtherException() { @Test void propagatedExceptionForListThrowingOtherException() { - DefaultKubectlJobExecutor jobExecutor = mock(DefaultKubectlJobExecutor.class); + KubectlJobExecutor jobExecutor = mock(KubectlJobExecutor.class); Registry registry = new DefaultRegistry(); KubernetesCredentials credentials = getCredentials(registry, jobExecutor); @@ -370,7 +369,7 @@ void propagatedExceptionForListThrowingOtherException() { @Test void replaceWhenResourceExists() { KubernetesManifest manifest = getManifest(); - DefaultKubectlJobExecutor jobExecutor = mock(DefaultKubectlJobExecutor.class); + KubectlJobExecutor jobExecutor = mock(KubectlJobExecutor.class); KubernetesCredentials credentials = getCredentials(new NoopRegistry(), jobExecutor); when(jobExecutor.create(credentials, manifest, task, OP_NAME)) .thenThrow(new KubectlException("Create failed: Error from server (AlreadyExists)")); @@ -383,7 +382,7 @@ void replaceWhenResourceExists() { @Test void replaceWhenResourceDoesNotExist() { KubernetesManifest manifest = getManifest(); - DefaultKubectlJobExecutor jobExecutor = mock(DefaultKubectlJobExecutor.class); + KubectlJobExecutor jobExecutor = mock(KubectlJobExecutor.class); KubernetesCredentials credentials = getCredentials(new NoopRegistry(), jobExecutor); when(jobExecutor.replace(credentials, manifest, task, OP_NAME)) .thenThrow(new KubectlNotFoundException("Not found"));