Skip to content

Commit

Permalink
fix(kubernetes): force cache refresh after deploy stage artifact cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
maggieneterval committed Jun 3, 2019
1 parent 2eb7998 commit 3e66d2a
Show file tree
Hide file tree
Showing 5 changed files with 93 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ public void taskGraph(@Nonnull Stage stage, @Nonnull TaskNode.Builder builder) {
.withTask(ManifestForceCacheRefreshTask.TASK_NAME, ManifestForceCacheRefreshTask.class)
.withTask(WaitForManifestStableTask.TASK_NAME, WaitForManifestStableTask.class)
.withTask(CleanupArtifactsTask.TASK_NAME, CleanupArtifactsTask.class)
.withTask("monitorCleanup", MonitorKatoTask.class)
.withTask(PromoteManifestKatoOutputsTask.TASK_NAME, PromoteManifestKatoOutputsTask.class)
.withTask(ManifestForceCacheRefreshTask.TASK_NAME, ManifestForceCacheRefreshTask.class)
.withTask(BindProducedArtifactsTask.TASK_NAME, BindProducedArtifactsTask.class);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,12 @@
public interface ManifestAware {
default Map<String, List<String>> manifestNamesByNamespace(Stage stage) {
Map<String, List<String>> result =
(Map<String, List<String>>) stage.getContext().get("outputs.manifestNamesByNamespace");
(Map<String, List<String>>)
stage
.getContext()
.get(
PromoteManifestKatoOutputsTask.outputKey(
PromoteManifestKatoOutputsTask.MANIFESTS_BY_NAMESPACE_KEY));
if (result != null) {
return result;
}
Expand All @@ -44,4 +49,22 @@ default Map<String, List<String>> manifestNamesByNamespace(Stage stage) {

return result;
}

default Map<String, List<String>> manifestsToRefresh(Stage stage) {
Map<String, List<String>> result =
(Map<String, List<String>>)
stage
.getContext()
.get(PromoteManifestKatoOutputsTask.MANIFESTS_BY_NAMESPACE_TO_REFRESH_KEY);
if (result != null
&& (boolean)
stage
.getContext()
.get(
PromoteManifestKatoOutputsTask
.SHOULD_REFRESH_MANIFESTS_BY_NAMESPACE_TO_REFRESH_KEY)) {
return result;
}
return manifestNamesByNamespace(stage);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,9 @@ public TaskResult execute(@Nonnull Stage stage) {
StageData stageData = fromStage(stage);
stageData.deployedManifests = getDeployedManifests(stage);

// Clear manifests to be refreshed
stageData.shouldRefreshManifestNamesByNamespaceToRefresh = false;

checkPendingRefreshes(cloudProvider, stageData, startTime);

refreshManifests(cloudProvider, stageData);
Expand Down Expand Up @@ -220,7 +223,7 @@ private List<ScopedManifest> manifestsNeedingRefresh(StageData stageData) {

private List<ScopedManifest> getDeployedManifests(Stage stage) {
String account = getCredentials(stage);
Map<String, List<String>> deployedManifests = manifestNamesByNamespace(stage);
Map<String, List<String>> deployedManifests = manifestsToRefresh(stage);
return deployedManifests.entrySet().stream()
.flatMap(e -> e.getValue().stream().map(v -> new ScopedManifest(account, e.getKey(), v)))
.collect(Collectors.toList());
Expand Down Expand Up @@ -288,6 +291,8 @@ private static class StageData {
Set<ScopedManifest> processedManifests = new HashSet<>();

Set<String> errors = new HashSet<>();

boolean shouldRefreshManifestNamesByNamespaceToRefresh;
}

@Value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,13 @@ public class PromoteManifestKatoOutputsTask implements Task {
private static final TypeReference<List<Artifact>> artifactListType =
new TypeReference<List<Artifact>>() {};
private static final String MANIFESTS_KEY = "manifests";
private static final String MANIFESTS_BY_NAMESPACE_KEY = "manifestNamesByNamespace";
static final String MANIFESTS_BY_NAMESPACE_KEY = "manifestNamesByNamespace";
private static final String BOUND_ARTIFACTS_KEY = "boundArtifacts";
private static final String CREATED_ARTIFACTS_KEY = "createdArtifacts";
private static final String ARTIFACTS_KEY = "artifacts";
static final String MANIFESTS_BY_NAMESPACE_TO_REFRESH_KEY = "manifestNamesByNamespaceToRefresh";
static final String SHOULD_REFRESH_MANIFESTS_BY_NAMESPACE_TO_REFRESH_KEY =
"shouldRefreshManifestNamesByNamespaceToRefresh";

@Autowired ObjectMapper objectMapper;

Expand Down Expand Up @@ -77,14 +80,22 @@ public TaskResult execute(@Nonnull Stage stage) {
addToOutputs(outputs, allResults, CREATED_ARTIFACTS_KEY, ARTIFACTS_KEY);
convertKey(outputs, ARTIFACTS_KEY, artifactListType);

// Surface the most recently operated-on manifests for subsequent ManifestForceCacheRefreshTasks
addMostRecentValueToOutputs(
outputs, allResults, MANIFESTS_BY_NAMESPACE_KEY, MANIFESTS_BY_NAMESPACE_TO_REFRESH_KEY);

outputs.put(
SHOULD_REFRESH_MANIFESTS_BY_NAMESPACE_TO_REFRESH_KEY,
outputs.get(MANIFESTS_BY_NAMESPACE_TO_REFRESH_KEY) != null);

return TaskResult.builder(ExecutionStatus.SUCCEEDED).context(outputs).outputs(outputs).build();
}

private void convertKey(Map<String, Object> outputs, String key, TypeReference tr) {
outputs.computeIfPresent(key, (k, v) -> objectMapper.convertValue(v, tr));
}

private String outputKey(String input) {
static String outputKey(String input) {
return "outputs." + input;
}

Expand All @@ -98,4 +109,15 @@ private void addToOutputs(

value.ifPresent(m -> outputs.put(targetKey, m));
}

private void addMostRecentValueToOutputs(
Map<String, Object> outputs, List<Map> allResults, String key, String targetKey) {
Optional value =
allResults.stream()
.map(m -> m.get(key))
.filter(Objects::nonNull)
.reduce((first, second) -> second);

value.ifPresent(m -> outputs.put(targetKey, m));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -659,4 +659,40 @@ class ManifestForceCacheRefreshTaskSpec extends Specification {
]
}

def "reads manifests from `manifestNamesByNamespaceToRefresh` key if available"() {
given:
def namespace = "my-namespace"
def manifestA = "replicaSet my-replicaset-v014"
def manifestB = "replicaSet my-replicaset-v015"
def context = [
account: ACCOUNT,
cloudProvider: PROVIDER,
"outputs.manifestNamesByNamespace": [
(namespace): [
manifestA
]
],
"manifestNamesByNamespaceToRefresh": [
(namespace): [
manifestB
]
],
"shouldRefreshManifestNamesByNamespaceToRefresh": true
]
def refreshDetails = [
account: ACCOUNT,
location: namespace,
name: manifestB
]
def stage = mockStage(context)
stage.setStartTime(now.toEpochMilli())

when:
def taskResult = task.execute(stage)

then:
1 * cacheService.forceCacheUpdate(PROVIDER, REFRESH_TYPE, refreshDetails) >> mockResponse(HTTP_OK)
0 * cacheService._
taskResult.getStatus() == ExecutionStatus.SUCCEEDED
}
}

0 comments on commit 3e66d2a

Please sign in to comment.