diff --git a/orca-clouddriver/src/main/groovy/com/netflix/spinnaker/orca/kato/pipeline/RollingPushStage.java b/orca-clouddriver/src/main/groovy/com/netflix/spinnaker/orca/kato/pipeline/RollingPushStage.java index f20a799f85..6d4a042bfb 100644 --- a/orca-clouddriver/src/main/groovy/com/netflix/spinnaker/orca/kato/pipeline/RollingPushStage.java +++ b/orca-clouddriver/src/main/groovy/com/netflix/spinnaker/orca/kato/pipeline/RollingPushStage.java @@ -3,6 +3,7 @@ import java.util.Map; import com.netflix.spinnaker.orca.DefaultTaskResult; import com.netflix.spinnaker.orca.TaskResult; +import com.netflix.spinnaker.orca.clouddriver.FeaturesService; import com.netflix.spinnaker.orca.clouddriver.tasks.MonitorKatoTask; import com.netflix.spinnaker.orca.clouddriver.tasks.instance.TerminateInstancesTask; import com.netflix.spinnaker.orca.clouddriver.tasks.instance.WaitForDownInstanceHealthTask; @@ -15,23 +16,28 @@ import com.netflix.spinnaker.orca.kato.tasks.rollingpush.DetermineTerminationCandidatesTask; import com.netflix.spinnaker.orca.kato.tasks.rollingpush.DetermineTerminationPhaseInstancesTask; import com.netflix.spinnaker.orca.kato.tasks.rollingpush.WaitForNewInstanceLaunchTask; +import com.netflix.spinnaker.orca.kato.tasks.rollingpush.CleanUpTagsTask; import com.netflix.spinnaker.orca.pipeline.StageDefinitionBuilder; import com.netflix.spinnaker.orca.pipeline.TaskNode; import com.netflix.spinnaker.orca.pipeline.model.Execution; import com.netflix.spinnaker.orca.pipeline.model.Stage; import com.netflix.spinnaker.orca.pipeline.tasks.WaitTask; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import static java.lang.String.format; @Component @Slf4j public class RollingPushStage implements StageDefinitionBuilder { - public static final String PIPELINE_CONFIG_TYPE = "rollingPush"; + @Autowired + private FeaturesService featuresService; + @Override public > void taskGraph(Stage stage, TaskNode.Builder builder) { + boolean taggingEnabled = featuresService.isStageAvailable("upsertEntityTags"); builder .withTask("ensureInterestingHealthProviderNames", EnsureInterestingHealthProviderNamesTask.class) .withTask("determineTerminationCandidates", DetermineTerminationCandidatesTask.class) @@ -54,9 +60,15 @@ public > void taskGraph(Stage stage, TaskNode.Builder .withTask("waitForNewInstances", WaitForNewInstanceLaunchTask.class) .withTask("waitForUpInstances", WaitForUpInstanceHealthTask.class) .withTask("checkForRemainingTerminations", CheckForRemainingTerminationsTask.class); - } - ) - .withTask("pushComplete", PushCompleteTask.class); + }); + + if (taggingEnabled) { + builder + .withTask("cleanUpTags", CleanUpTagsTask.class) + .withTask("monitorTagCleanUp", MonitorKatoTask.class); + } + + builder.withTask("pushComplete", PushCompleteTask.class); } private > boolean shouldWaitForTermination(Stage stage) { diff --git a/orca-clouddriver/src/main/groovy/com/netflix/spinnaker/orca/kato/tasks/rollingpush/CleanUpTagsTask.java b/orca-clouddriver/src/main/groovy/com/netflix/spinnaker/orca/kato/tasks/rollingpush/CleanUpTagsTask.java new file mode 100644 index 0000000000..bb82a805a6 --- /dev/null +++ b/orca-clouddriver/src/main/groovy/com/netflix/spinnaker/orca/kato/tasks/rollingpush/CleanUpTagsTask.java @@ -0,0 +1,128 @@ +/* + * Copyright 2017 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.netflix.spinnaker.orca.kato.tasks.rollingpush; + +import com.netflix.spinnaker.orca.RetryableTask; +import com.netflix.spinnaker.orca.TaskResult; +import com.netflix.spinnaker.orca.DefaultTaskResult; +import com.netflix.spinnaker.orca.ExecutionStatus; +import com.netflix.spinnaker.orca.clouddriver.KatoService; +import com.netflix.spinnaker.orca.clouddriver.OortService; +import com.netflix.spinnaker.orca.clouddriver.model.TaskId; +import com.netflix.spinnaker.orca.clouddriver.tasks.AbstractCloudProviderAwareTask; +import com.netflix.spinnaker.orca.kato.pipeline.support.SourceResolver; +import com.netflix.spinnaker.orca.kato.pipeline.support.StageData; +import com.netflix.spinnaker.orca.pipeline.model.Stage; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.util.Map; +import java.util.HashMap; +import java.util.List; +import java.util.ArrayList; +import java.util.Collections; +import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; +import java.util.stream.Collectors; + +@Slf4j +@Component +public class CleanUpTagsTask extends AbstractCloudProviderAwareTask implements RetryableTask { + @Autowired + KatoService katoService; + + @Autowired + OortService oortService; + + @Autowired + SourceResolver sourceResolver; + + @Override + public TaskResult execute(Stage stage) { + try { + StageData.Source source = sourceResolver.getSource(stage); + String serverGroupName = source.getServerGroupName() != null ? source.getServerGroupName() : source.getAsgName(); + String imageId = (String) stage.getContext().getOrDefault("imageId", stage.getContext().get("amiName")); + + String cloudProvider = getCloudProvider(stage); + + List tags = oortService.getEntityTags( + cloudProvider, + "servergroup", + serverGroupName, + source.getAccount(), + source.getRegion() + ); + + List tagsToDelete = new ArrayList<>(); + tags.forEach( s -> tagsToDelete.addAll( + ((List) s.get("tags")) + .stream() + .filter(hasNonMatchingImageId(imageId)) + .map(t -> (String) t.get("name")) + .collect(Collectors.toList()) + )); + + log.info("found tags to delete {}", tagsToDelete); + if (tagsToDelete.isEmpty()) { + return new DefaultTaskResult(ExecutionStatus.SUCCEEDED); + } + + TaskId taskId = katoService.requestOperations( + cloudProvider, + operations(serverGroupName, tagsToDelete) + ).toBlocking().first(); + + return new DefaultTaskResult(ExecutionStatus.SUCCEEDED, new HashMap() {{ + put("notification.type", "deleteentitytags"); + put("kato.last.task.id", taskId); + }}); + + } catch (Exception e) { + log.error("Failed to clean up tags for stage {} ",stage, e); + return new DefaultTaskResult(ExecutionStatus.FAILED_CONTINUE); + } + + } + + private Predicate hasNonMatchingImageId(String imageId) { + return tag -> { + Map value = ((Map) tag.getOrDefault("value", Collections.EMPTY_MAP)); + return value.containsKey("imageId") && !value.get("imageId").equals(imageId); + }; + } + + private List> operations(String serverGroupName, List tags) { + return Collections.singletonList(Collections.singletonMap("deleteEntityTags", new HashMap() { + { + put("id", serverGroupName); + put("tags", tags); + } + })); + } + + @Override + public long getBackoffPeriod() { + return TimeUnit.SECONDS.toMillis(5); + } + + @Override + public long getTimeout() { + return TimeUnit.MINUTES.toMillis(5); + } +} diff --git a/orca-clouddriver/src/test/groovy/com/netflix/spinnaker/orca/kato/pipeline/RollingPushStageSpec.groovy b/orca-clouddriver/src/test/groovy/com/netflix/spinnaker/orca/kato/pipeline/RollingPushStageSpec.groovy index 23b89b329d..eca12ed044 100644 --- a/orca-clouddriver/src/test/groovy/com/netflix/spinnaker/orca/kato/pipeline/RollingPushStageSpec.groovy +++ b/orca-clouddriver/src/test/groovy/com/netflix/spinnaker/orca/kato/pipeline/RollingPushStageSpec.groovy @@ -16,6 +16,8 @@ package com.netflix.spinnaker.orca.kato.pipeline +import com.netflix.spinnaker.orca.clouddriver.FeaturesService +import com.netflix.spinnaker.orca.kato.tasks.rollingpush.CleanUpTagsTask import com.netflix.spinnaker.config.SpringBatchConfiguration import com.netflix.spinnaker.orca.DefaultTaskResult import com.netflix.spinnaker.orca.RetryableTask @@ -93,12 +95,15 @@ class RollingPushStageSpec extends Specification { * Task that is after the end of the loop */ @Autowired @Qualifier("postLoop") Task postLoopTask + @Autowired @Qualifier("cleanup") Task cleanupTask /** * Task in the stage downstream from rolling push */ @Autowired @Qualifier("downstream") Task downstreamTask + @Autowired @Qualifier("featuresService") FeaturesService featuresService + private static final SUCCESS = new DefaultTaskResult(SUCCEEDED) private static final REDIR = new DefaultTaskResult(REDIRECT) @@ -118,6 +123,13 @@ class RollingPushStageSpec extends Specification { and: "the loop will repeat a couple of times" endOfLoopTask.execute(_) >> REDIR >> REDIR >> SUCCESS + 1 * featuresService.isStageAvailable('upsertEntityTags') >> true + with(cleanupTask) { + 1 * execute(_) >> SUCCESS + 1 * getBackoffPeriod() >> 5000L + 1 * getTimeout() >> 3600000L + } + when: pipelineLauncher.start(configJson) @@ -174,6 +186,12 @@ class RollingPushStageSpec extends Specification { new SpockMockFactoryBean<>(PushCompleteTask) } + @Bean + @Qualifier("cleanup") + FactoryBean cleanUpTagsTask() { + new SpockMockFactoryBean<>(CleanUpTagsTask) + } + @Bean @Qualifier("downstream") FactoryBean endTask() { @@ -227,6 +245,12 @@ class RollingPushStageSpec extends Specification { FactoryBean waitForUpInstanceHealthTask() { new SpockMockFactoryBean<>(WaitForUpInstanceHealthTask) } + + @Bean + @Qualifier("featuresService") + FactoryBean featuresService() { + new SpockMockFactoryBean<>(FeaturesService) + } } @Component diff --git a/orca-clouddriver/src/test/groovy/com/netflix/spinnaker/orca/kato/tasks/rollingpush/CleanUpTagsTaskSpec.groovy b/orca-clouddriver/src/test/groovy/com/netflix/spinnaker/orca/kato/tasks/rollingpush/CleanUpTagsTaskSpec.groovy new file mode 100644 index 0000000000..461d8a24a9 --- /dev/null +++ b/orca-clouddriver/src/test/groovy/com/netflix/spinnaker/orca/kato/tasks/rollingpush/CleanUpTagsTaskSpec.groovy @@ -0,0 +1,84 @@ +package com.netflix.spinnaker.orca.kato.tasks.rollingpush + +import com.netflix.spinnaker.orca.clouddriver.KatoService +import com.netflix.spinnaker.orca.clouddriver.OortService +import com.netflix.spinnaker.orca.clouddriver.model.TaskId +import com.netflix.spinnaker.orca.kato.pipeline.support.SourceResolver +import com.netflix.spinnaker.orca.pipeline.model.Pipeline +import com.netflix.spinnaker.orca.pipeline.model.Stage +import spock.lang.Specification + +class CleanUpTagsTaskSpec extends Specification { + def "should create deleteEntityTags operations "() { + given: + def task = new CleanUpTagsTask() + def stage = new Stage<>(new Pipeline(), "") + stage.context = [ + application: "app", + cloudProvider: "aws", + source: [ + account: "test", + asgName: "app-v00", + region: "us-east-1" + ], + imageId: "imageId" + ] + + and: + def tags = [ + [ + tags: [ + [ + name: "tagName", + value: [ + imageId: "imageId" + ] + ], + [ + name: "tagName2", + value: [ + imageId: "imageId1" + ] + ] + ] + ], + [ + tags: [ + [ + name: "tagName3", + value: [ + imageId: "imageId1" + ] + ], + [ + name: "tagName3" + ] + ] + ] + ] + + List operations = [] + task.oortService = Mock(OortService) { + 1* getEntityTags("aws", "servergroup", "app-v00", "test", "us-east-1") >> { + tags + } + } + + task.katoService = Mock(KatoService) { + 1 * requestOperations('aws', _) >> { + operations += it[1] + rx.Observable.from(new TaskId(UUID.randomUUID().toString())) + } + } + + task.sourceResolver = new SourceResolver() + + when: + task.execute(stage) + + then: "should only delete tags that have an imageId & if it doesn't match the imageId in the stage context" + operations.size() == 1 + operations[0].deleteEntityTags.tags.size() == 2 + operations[0].deleteEntityTags.tags == ["tagName2", "tagName3"] + } +}