Skip to content

Commit

Permalink
fix(tagging): Cleaning up tags on Rolling Push
Browse files Browse the repository at this point in the history
- Deleting tags using the imageId to filter tags containing a non matching imageId
  • Loading branch information
jeyrschabu committed Mar 26, 2017
1 parent 2416178 commit 062e968
Show file tree
Hide file tree
Showing 4 changed files with 252 additions and 4 deletions.
Expand Up @@ -3,6 +3,7 @@
import java.util.Map;
import com.netflix.spinnaker.orca.DefaultTaskResult;
import com.netflix.spinnaker.orca.TaskResult;
import com.netflix.spinnaker.orca.clouddriver.FeaturesService;
import com.netflix.spinnaker.orca.clouddriver.tasks.MonitorKatoTask;
import com.netflix.spinnaker.orca.clouddriver.tasks.instance.TerminateInstancesTask;
import com.netflix.spinnaker.orca.clouddriver.tasks.instance.WaitForDownInstanceHealthTask;
Expand All @@ -15,23 +16,28 @@
import com.netflix.spinnaker.orca.kato.tasks.rollingpush.DetermineTerminationCandidatesTask;
import com.netflix.spinnaker.orca.kato.tasks.rollingpush.DetermineTerminationPhaseInstancesTask;
import com.netflix.spinnaker.orca.kato.tasks.rollingpush.WaitForNewInstanceLaunchTask;
import com.netflix.spinnaker.orca.kato.tasks.rollingpush.CleanUpTagsTask;
import com.netflix.spinnaker.orca.pipeline.StageDefinitionBuilder;
import com.netflix.spinnaker.orca.pipeline.TaskNode;
import com.netflix.spinnaker.orca.pipeline.model.Execution;
import com.netflix.spinnaker.orca.pipeline.model.Stage;
import com.netflix.spinnaker.orca.pipeline.tasks.WaitTask;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import static java.lang.String.format;

@Component
@Slf4j
public class RollingPushStage implements StageDefinitionBuilder {

public static final String PIPELINE_CONFIG_TYPE = "rollingPush";

@Autowired
private FeaturesService featuresService;

@Override
public <T extends Execution<T>> void taskGraph(Stage<T> stage, TaskNode.Builder builder) {
boolean taggingEnabled = featuresService.isStageAvailable("upsertEntityTags");
builder
.withTask("ensureInterestingHealthProviderNames", EnsureInterestingHealthProviderNamesTask.class)
.withTask("determineTerminationCandidates", DetermineTerminationCandidatesTask.class)
Expand All @@ -54,9 +60,15 @@ public <T extends Execution<T>> void taskGraph(Stage<T> stage, TaskNode.Builder
.withTask("waitForNewInstances", WaitForNewInstanceLaunchTask.class)
.withTask("waitForUpInstances", WaitForUpInstanceHealthTask.class)
.withTask("checkForRemainingTerminations", CheckForRemainingTerminationsTask.class);
}
)
.withTask("pushComplete", PushCompleteTask.class);
});

if (taggingEnabled) {
builder
.withTask("cleanUpTags", CleanUpTagsTask.class)
.withTask("monitorTagCleanUp", MonitorKatoTask.class);
}

builder.withTask("pushComplete", PushCompleteTask.class);
}

private <T extends Execution<T>> boolean shouldWaitForTermination(Stage<T> stage) {
Expand Down
@@ -0,0 +1,128 @@
/*
* Copyright 2017 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netflix.spinnaker.orca.kato.tasks.rollingpush;

import com.netflix.spinnaker.orca.RetryableTask;
import com.netflix.spinnaker.orca.TaskResult;
import com.netflix.spinnaker.orca.DefaultTaskResult;
import com.netflix.spinnaker.orca.ExecutionStatus;
import com.netflix.spinnaker.orca.clouddriver.KatoService;
import com.netflix.spinnaker.orca.clouddriver.OortService;
import com.netflix.spinnaker.orca.clouddriver.model.TaskId;
import com.netflix.spinnaker.orca.clouddriver.tasks.AbstractCloudProviderAwareTask;
import com.netflix.spinnaker.orca.kato.pipeline.support.SourceResolver;
import com.netflix.spinnaker.orca.kato.pipeline.support.StageData;
import com.netflix.spinnaker.orca.pipeline.model.Stage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Map;
import java.util.HashMap;
import java.util.List;
import java.util.ArrayList;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.stream.Collectors;

@Slf4j
@Component
public class CleanUpTagsTask extends AbstractCloudProviderAwareTask implements RetryableTask {
@Autowired
KatoService katoService;

@Autowired
OortService oortService;

@Autowired
SourceResolver sourceResolver;

@Override
public TaskResult execute(Stage stage) {
try {
StageData.Source source = sourceResolver.getSource(stage);
String serverGroupName = source.getServerGroupName() != null ? source.getServerGroupName() : source.getAsgName();
String imageId = (String) stage.getContext().getOrDefault("imageId", stage.getContext().get("amiName"));

String cloudProvider = getCloudProvider(stage);

List<Map> tags = oortService.getEntityTags(
cloudProvider,
"servergroup",
serverGroupName,
source.getAccount(),
source.getRegion()
);

List<String> tagsToDelete = new ArrayList<>();
tags.forEach( s -> tagsToDelete.addAll(
((List<Map>) s.get("tags"))
.stream()
.filter(hasNonMatchingImageId(imageId))
.map(t -> (String) t.get("name"))
.collect(Collectors.toList())
));

log.info("found tags to delete {}", tagsToDelete);
if (tagsToDelete.isEmpty()) {
return new DefaultTaskResult(ExecutionStatus.SUCCEEDED);
}

TaskId taskId = katoService.requestOperations(
cloudProvider,
operations(serverGroupName, tagsToDelete)
).toBlocking().first();

return new DefaultTaskResult(ExecutionStatus.SUCCEEDED, new HashMap<String, Object>() {{
put("notification.type", "deleteentitytags");
put("kato.last.task.id", taskId);
}});

} catch (Exception e) {
log.error("Failed to clean up tags for stage {} ",stage, e);
return new DefaultTaskResult(ExecutionStatus.FAILED_CONTINUE);
}

}

private Predicate<Map> hasNonMatchingImageId(String imageId) {
return tag -> {
Map value = ((Map) tag.getOrDefault("value", Collections.EMPTY_MAP));
return value.containsKey("imageId") && !value.get("imageId").equals(imageId);
};
}

private List<Map<String, Map>> operations(String serverGroupName, List<String> tags) {
return Collections.singletonList(Collections.singletonMap("deleteEntityTags", new HashMap<String, Object>() {
{
put("id", serverGroupName);
put("tags", tags);
}
}));
}

@Override
public long getBackoffPeriod() {
return TimeUnit.SECONDS.toMillis(5);
}

@Override
public long getTimeout() {
return TimeUnit.MINUTES.toMillis(5);
}
}
Expand Up @@ -16,6 +16,8 @@

package com.netflix.spinnaker.orca.kato.pipeline

import com.netflix.spinnaker.orca.clouddriver.FeaturesService
import com.netflix.spinnaker.orca.kato.tasks.rollingpush.CleanUpTagsTask
import com.netflix.spinnaker.config.SpringBatchConfiguration
import com.netflix.spinnaker.orca.DefaultTaskResult
import com.netflix.spinnaker.orca.RetryableTask
Expand Down Expand Up @@ -93,12 +95,15 @@ class RollingPushStageSpec extends Specification {
* Task that is after the end of the loop
*/
@Autowired @Qualifier("postLoop") Task postLoopTask
@Autowired @Qualifier("cleanup") Task cleanupTask

/**
* Task in the stage downstream from rolling push
*/
@Autowired @Qualifier("downstream") Task downstreamTask

@Autowired @Qualifier("featuresService") FeaturesService featuresService

private static final SUCCESS = new DefaultTaskResult(SUCCEEDED)
private static final REDIR = new DefaultTaskResult(REDIRECT)

Expand All @@ -118,6 +123,13 @@ class RollingPushStageSpec extends Specification {
and: "the loop will repeat a couple of times"
endOfLoopTask.execute(_) >> REDIR >> REDIR >> SUCCESS

1 * featuresService.isStageAvailable('upsertEntityTags') >> true
with(cleanupTask) {
1 * execute(_) >> SUCCESS
1 * getBackoffPeriod() >> 5000L
1 * getTimeout() >> 3600000L
}

when:
pipelineLauncher.start(configJson)

Expand Down Expand Up @@ -174,6 +186,12 @@ class RollingPushStageSpec extends Specification {
new SpockMockFactoryBean<>(PushCompleteTask)
}

@Bean
@Qualifier("cleanup")
FactoryBean<CleanUpTagsTask> cleanUpTagsTask() {
new SpockMockFactoryBean<>(CleanUpTagsTask)
}

@Bean
@Qualifier("downstream")
FactoryBean<DownstreamTask> endTask() {
Expand Down Expand Up @@ -227,6 +245,12 @@ class RollingPushStageSpec extends Specification {
FactoryBean<WaitForUpInstanceHealthTask> waitForUpInstanceHealthTask() {
new SpockMockFactoryBean<>(WaitForUpInstanceHealthTask)
}

@Bean
@Qualifier("featuresService")
FactoryBean<FeaturesService> featuresService() {
new SpockMockFactoryBean<>(FeaturesService)
}
}

@Component
Expand Down
@@ -0,0 +1,84 @@
package com.netflix.spinnaker.orca.kato.tasks.rollingpush

import com.netflix.spinnaker.orca.clouddriver.KatoService
import com.netflix.spinnaker.orca.clouddriver.OortService
import com.netflix.spinnaker.orca.clouddriver.model.TaskId
import com.netflix.spinnaker.orca.kato.pipeline.support.SourceResolver
import com.netflix.spinnaker.orca.pipeline.model.Pipeline
import com.netflix.spinnaker.orca.pipeline.model.Stage
import spock.lang.Specification

class CleanUpTagsTaskSpec extends Specification {
def "should create deleteEntityTags operations "() {
given:
def task = new CleanUpTagsTask()
def stage = new Stage<>(new Pipeline(), "")
stage.context = [
application: "app",
cloudProvider: "aws",
source: [
account: "test",
asgName: "app-v00",
region: "us-east-1"
],
imageId: "imageId"
]

and:
def tags = [
[
tags: [
[
name: "tagName",
value: [
imageId: "imageId"
]
],
[
name: "tagName2",
value: [
imageId: "imageId1"
]
]
]
],
[
tags: [
[
name: "tagName3",
value: [
imageId: "imageId1"
]
],
[
name: "tagName3"
]
]
]
]

List<Map> operations = []
task.oortService = Mock(OortService) {
1* getEntityTags("aws", "servergroup", "app-v00", "test", "us-east-1") >> {
tags
}
}

task.katoService = Mock(KatoService) {
1 * requestOperations('aws', _) >> {
operations += it[1]
rx.Observable.from(new TaskId(UUID.randomUUID().toString()))
}
}

task.sourceResolver = new SourceResolver()

when:
task.execute(stage)

then: "should only delete tags that have an imageId & if it doesn't match the imageId in the stage context"
operations.size() == 1
operations[0].deleteEntityTags.tags.size() == 2
operations[0].deleteEntityTags.tags == ["tagName2", "tagName3"]
}
}

0 comments on commit 062e968

Please sign in to comment.