Skip to content

Commit

Permalink
feat(bakery): Clean up cached data created by Rosco. (#4323) (#4349)
Browse files Browse the repository at this point in the history
Co-authored-by: Fernando Freire <dogonthehorizon@gmail.com>
(cherry picked from commit e4973b0)

Co-authored-by: armory-abedonik <106548537+armory-abedonik@users.noreply.github.com>
  • Loading branch information
mergify[bot] and armory-abedonik committed Nov 23, 2022
1 parent 9709ee0 commit 91f6247
Show file tree
Hide file tree
Showing 13 changed files with 194 additions and 42 deletions.
Expand Up @@ -38,6 +38,9 @@ interface BakeryService {
@GET("/api/v1/{region}/bake/{bakeId}")
Bake lookupBake(@Path("region") String region, @Path("bakeId") String bakeId)

@POST("/api/v1/bakes/delete-requests")
Void createDeleteBakesRequest(@Body DeleteBakesRequest deleteBakesRequest)

//
// Methods below this line are not supported by the Netflix Bakery, and are only available
// iff bakery.roscoApisEnabled is true.
Expand Down
@@ -0,0 +1,29 @@
/*
* Copyright 2022 Armory, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netflix.spinnaker.orca.bakery.api;

import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.ArrayList;
import java.util.List;
import lombok.Data;

@Data
public class DeleteBakesRequest {

@JsonProperty("pipelineExecutionIds")
private List<String> pipelineExecutionIds = new ArrayList<>();
}
@@ -0,0 +1,41 @@
/*
* Copyright 2022 Armory, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netflix.spinnaker.orca.bakery.pipeline;

import com.netflix.spinnaker.orca.bakery.api.BakeryService;
import com.netflix.spinnaker.orca.bakery.api.DeleteBakesRequest;
import com.netflix.spinnaker.orca.notifications.scheduling.PipelineDependencyCleanupOperator;
import java.util.List;
import lombok.RequiredArgsConstructor;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Component;

@Component
@ConditionalOnExpression("${bakery-pipeline-dependency-cleanup.enabled:false}")
@RequiredArgsConstructor
public class BakeryPipelineDependencyCleanupOperator implements PipelineDependencyCleanupOperator {

private final BakeryService bakeryService;

@Override
public void cleanup(List<String> pipelineExecutionIds) {
DeleteBakesRequest deleteBakesRequest = new DeleteBakesRequest();
deleteBakesRequest.setPipelineExecutionIds(pipelineExecutionIds);

bakeryService.createDeleteBakesRequest(deleteBakesRequest);
}
}
Expand Up @@ -22,6 +22,7 @@ import com.netflix.spinnaker.orca.bakery.api.Bake
import com.netflix.spinnaker.orca.bakery.api.BakeRequest
import com.netflix.spinnaker.orca.bakery.api.BakeStatus
import com.netflix.spinnaker.orca.bakery.api.BakeryService
import com.netflix.spinnaker.orca.bakery.api.DeleteBakesRequest
import com.netflix.spinnaker.orca.bakery.api.BaseImage
import com.netflix.spinnaker.orca.bakery.api.manifests.BakeManifestRequest
import com.netflix.spinnaker.orca.bakery.config.BakeryConfigurationProperties
Expand Down Expand Up @@ -145,6 +146,11 @@ class BakerySelectorSpec extends Specification {
return null
}

@Override
Void createDeleteBakesRequest(@Body DeleteBakesRequest deleteBakesRequest) {
return null
}

@Override
BaseImage getBaseImage(@Path("cloudProvider") String cloudProvider, @Path("imageId") String imageId) {
return null
Expand Down
Expand Up @@ -102,6 +102,7 @@ public Boolean call(PipelineExecution execution) {
private final long pollingIntervalMs;
private final int thresholdDays;
private final int minimumPipelineExecutions;
private final List<PipelineDependencyCleanupOperator> pipelineDependencyCleanupOperators;

private final Id deletedId;
private final Id timerId;
Expand All @@ -115,14 +116,16 @@ public OldPipelineCleanupPollingNotificationAgent(
@Value("${pollers.old-pipeline-cleanup.interval-ms:3600000}") long pollingIntervalMs,
@Value("${pollers.old-pipeline-cleanup.threshold-days:30}") int thresholdDays,
@Value("${pollers.old-pipeline-cleanup.minimum-pipeline-executions:5}")
int minimumPipelineExecutions) {
int minimumPipelineExecutions,
List<PipelineDependencyCleanupOperator> pipelineDependencyCleanupOperators) {
super(clusterLock);
this.executionRepository = executionRepository;
this.clock = clock;
this.registry = registry;
this.pollingIntervalMs = pollingIntervalMs;
this.thresholdDays = thresholdDays;
this.minimumPipelineExecutions = minimumPipelineExecutions;
this.pipelineDependencyCleanupOperators = pipelineDependencyCleanupOperators;

deletedId = registry.createId("pollers.oldPipelineCleanup.deleted");
timerId = registry.createId("pollers.oldPipelineCleanup.timing");
Expand Down Expand Up @@ -179,14 +182,25 @@ private void cleanup(List<PipelineExecutionDetails> executions) {
}

executions.sort(sorter);
executions
.subList(0, (executions.size() - minimumPipelineExecutions))
.forEach(
p -> {
log.info("Deleting pipeline execution " + p.id + ": " + p.toString());
executionRepository.delete(PIPELINE, p.id);
registry.counter(deletedId.withTag("application", p.application)).increment();
});

List<PipelineExecutionDetails> removingPipelineExecutions =
executions.subList(0, (executions.size() - minimumPipelineExecutions));

List<String> removingPipelineExecutionIds =
removingPipelineExecutions.stream()
.map(pipelineExecutionDetails -> pipelineExecutionDetails.id)
.collect(Collectors.toList());

pipelineDependencyCleanupOperators.forEach(
pipelineDependencyCleanupOperator ->
pipelineDependencyCleanupOperator.cleanup(removingPipelineExecutionIds));

removingPipelineExecutions.forEach(
p -> {
log.info("Deleting pipeline execution " + p.id + ": " + p.toString());
executionRepository.delete(PIPELINE, p.id);
registry.counter(deletedId.withTag("application", p.application)).increment();
});
}

private static class PipelineExecutionDetails {
Expand Down
@@ -0,0 +1,25 @@
/*
* Copyright 2022 Armory, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netflix.spinnaker.orca.notifications.scheduling;

import java.util.List;

/** This interface is responsible for the pipeline execution dependencies cleanup. */
public interface PipelineDependencyCleanupOperator {

void cleanup(List<String> pipelineExecutionIds);
}
Expand Up @@ -35,6 +35,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
Expand Down Expand Up @@ -71,6 +72,7 @@ public class TopApplicationExecutionCleanupPollingNotificationAgent
private final Registry registry;
private final long pollingIntervalMs;
private final int threshold;
private final List<PipelineDependencyCleanupOperator> pipelineDependencyCleanupOperators;

private final Id deleteCountId;
private final Id timerId;
Expand All @@ -82,12 +84,14 @@ public TopApplicationExecutionCleanupPollingNotificationAgent(
Registry registry,
@Value("${pollers.top-application-execution-cleanup.interval-ms:3600000}")
long pollingIntervalMs,
@Value("${pollers.top-application-execution-cleanup.threshold:2500}") int threshold) {
@Value("${pollers.top-application-execution-cleanup.threshold:2500}") int threshold,
List<PipelineDependencyCleanupOperator> pipelineDependencyCleanupOperators) {
super(clusterLock);
this.executionRepository = executionRepository;
this.registry = registry;
this.pollingIntervalMs = pollingIntervalMs;
this.threshold = threshold;
this.pipelineDependencyCleanupOperators = pipelineDependencyCleanupOperators;

deleteCountId = registry.createId("pollers.topApplicationExecutionCleanup.deleted");
timerId = registry.createId("pollers.topApplicationExecutionCleanup.timing");
Expand Down Expand Up @@ -139,28 +143,38 @@ private void cleanup(Observable<PipelineExecution> observable, String applicatio
observable.filter(filter).map(mapper).toList().toBlocking().single();
executions.sort(comparing(a -> (Long) Optional.ofNullable(a.get("startTime")).orElse(0L)));
if (executions.size() > threshold) {
executions
.subList(0, (executions.size() - threshold))
.forEach(
it -> {
Long startTime =
Optional.ofNullable((Long) it.get("startTime"))
.orElseGet(() -> (Long) it.get("buildTime"));
log.info(
"Deleting {} execution {} (startTime: {}, application: {}, pipelineConfigId: {}, status: {})",
type,
it.get("id"),
startTime != null ? Instant.ofEpochMilli(startTime) : null,
application,
it.get("pipelineConfigId"),
it.get("status"));
if (type.equals("orchestration")) {
executionRepository.delete(ORCHESTRATION, (String) it.get("id"));
registry.counter(deleteCountId.withTag("application", application)).increment();
} else {
throw new IllegalArgumentException(format("Unsupported type '%s'", type));
}
});
List<? extends Map> removingPipelineExecutions =
executions.subList(0, (executions.size() - threshold));

List<String> removingPipelineExecutionIds =
removingPipelineExecutions.stream()
.map(pipelineExecution -> (String) pipelineExecution.get("id"))
.collect(Collectors.toList());

pipelineDependencyCleanupOperators.forEach(
pipelineDependencyCleanupOperator ->
pipelineDependencyCleanupOperator.cleanup(removingPipelineExecutionIds));

removingPipelineExecutions.forEach(
it -> {
Long startTime =
Optional.ofNullable((Long) it.get("startTime"))
.orElseGet(() -> (Long) it.get("buildTime"));
log.info(
"Deleting {} execution {} (startTime: {}, application: {}, pipelineConfigId: {}, status: {})",
type,
it.get("id"),
startTime != null ? Instant.ofEpochMilli(startTime) : null,
application,
it.get("pipelineConfigId"),
it.get("status"));
if (type.equals("orchestration")) {
executionRepository.delete(ORCHESTRATION, (String) it.get("id"));
registry.counter(deleteCountId.withTag("application", application)).increment();
} else {
throw new IllegalArgumentException(format("Unsupported type '%s'", type));
}
});
}
}
}
Expand Up @@ -45,7 +45,8 @@ class OldPipelineCleanupPollingNotificationAgentSpec extends Specification {
new NoopRegistry(),
5000,
1,
5
5,
[]
).filter

expect:
Expand Down Expand Up @@ -82,7 +83,8 @@ class OldPipelineCleanupPollingNotificationAgentSpec extends Specification {
new NoopRegistry(),
5000,
1,
5
5,
[]
).mapper

expect:
Expand Down Expand Up @@ -122,14 +124,16 @@ class OldPipelineCleanupPollingNotificationAgentSpec extends Specification {
1 * retrieveAllApplicationNames(PIPELINE) >> ["orca"]
1 * retrievePipelinesForApplication("orca") >> rx.Observable.from(pipelines)
}
def pipelineDependencyCleanupOperator = Mock(PipelineDependencyCleanupOperator)
def agent = new OldPipelineCleanupPollingNotificationAgent(
Mock(NotificationClusterLock),
executionRepository,
clock,
new NoopRegistry(),
5000,
thresholdDays,
retain
retain,
[pipelineDependencyCleanupOperator]
)

when:
Expand All @@ -140,6 +144,7 @@ class OldPipelineCleanupPollingNotificationAgentSpec extends Specification {
// expect D1-5 to be too old, but for the most recent 3 to be retained
1 * executionRepository.delete(PIPELINE, '1')
1 * executionRepository.delete(PIPELINE, '2')
1 * pipelineDependencyCleanupOperator.cleanup(['1', '2'])
}

private
Expand Down
Expand Up @@ -40,7 +40,8 @@ class TopApplicationPipelineExecutionCleanupPollingNotificationAgentSpec extends
Mock(ExecutionRepository),
new NoopRegistry(),
5000,
2500
2500,
[]
).filter

expect:
Expand Down Expand Up @@ -70,7 +71,8 @@ class TopApplicationPipelineExecutionCleanupPollingNotificationAgentSpec extends
Mock(ExecutionRepository),
new NoopRegistry(),
5000,
2500
2500,
[]
).mapper

expect:
Expand All @@ -92,19 +94,22 @@ class TopApplicationPipelineExecutionCleanupPollingNotificationAgentSpec extends
1 * retrieveAllApplicationNames(_, _) >> ["app1"]
1 * retrieveOrchestrationsForApplication("app1", _) >> rx.Observable.from(orchestrations)
}
def pipelineDependencyCleanupOperator = Mock(PipelineDependencyCleanupOperator)
def agent = new TopApplicationExecutionCleanupPollingNotificationAgent(
Mock(NotificationClusterLock),
executionRepository,
new NoopRegistry(),
5000,
2
2,
[pipelineDependencyCleanupOperator]
)

when:
agent.tick()

then:
1 * executionRepository.delete(ORCHESTRATION, orchestrations[0].id)
1 * pipelineDependencyCleanupOperator.cleanup([orchestrations[0].id])
}

private static Collection<PipelineExecutionImpl> buildExecutions(AtomicInteger stageStartTime,
Expand Down
Expand Up @@ -21,6 +21,7 @@ import com.netflix.spinnaker.config.OldPipelineCleanupAgentConfigurationProperti
import com.netflix.spinnaker.config.OrcaSqlProperties
import com.netflix.spinnaker.orca.api.pipeline.models.ExecutionType
import com.netflix.spinnaker.orca.notifications.NotificationClusterLock
import com.netflix.spinnaker.orca.notifications.scheduling.PipelineDependencyCleanupOperator
import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository
import java.time.Clock
import java.time.Instant
Expand All @@ -45,7 +46,8 @@ class OldPipelineCleanupPollingNotificationAgent(
registry: Registry,
private val executionRepository: ExecutionRepository,
private val configurationProperties: OldPipelineCleanupAgentConfigurationProperties,
private val orcaSqlProperties: OrcaSqlProperties
private val orcaSqlProperties: OrcaSqlProperties,
private val pipelineDependencyCleanupOperators: List<PipelineDependencyCleanupOperator>
) : AbstractCleanupPollingAgent(
clusterLock,
configurationProperties.intervalMs,
Expand Down Expand Up @@ -161,6 +163,8 @@ class OldPipelineCleanupPollingNotificationAgent(
.limit(configurationProperties.minimumPipelineExecutions, Int.MAX_VALUE)
.fetch(field("id"), String::class.java)

pipelineDependencyCleanupOperators.forEach { it.cleanup(executionsToRemove) }

executionsToRemove.chunked(configurationProperties.chunkSize).forEach { ids ->
deletedExecutionCount.addAndGet(ids.size)
executionRepository.delete(ExecutionType.PIPELINE, ids)
Expand Down

0 comments on commit 91f6247

Please sign in to comment.