Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
refactor(core): Polling agents are now persistence agnostic (#2214)
  • Loading branch information
robzienert committed May 14, 2018
1 parent da5a734 commit 47fc28e
Show file tree
Hide file tree
Showing 11 changed files with 367 additions and 193 deletions.
@@ -0,0 +1,76 @@
/*
* Copyright 2018 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.spinnaker.orca.pipeline.persistence

import com.netflix.spinnaker.orca.notifications.scheduling.PollingAgentExecutionRepository
import spock.lang.Specification
import spock.lang.Subject
import spock.lang.Unroll

import static com.netflix.spinnaker.orca.pipeline.model.Execution.ExecutionType.ORCHESTRATION
import static com.netflix.spinnaker.orca.pipeline.model.Execution.ExecutionType.PIPELINE
import static com.netflix.spinnaker.orca.test.model.ExecutionBuilder.orchestration
import static com.netflix.spinnaker.orca.test.model.ExecutionBuilder.pipeline

@Subject(PollingAgentExecutionRepository)
@Unroll
abstract class PollingAgentExecutionRepositoryTck<T extends PollingAgentExecutionRepository> extends Specification {

@Subject
PollingAgentExecutionRepository repository

@Subject
PollingAgentExecutionRepository previousRepository

void setup() {
repository = createExecutionRepository()
previousRepository = createExecutionRepositoryPrevious()
}

abstract T createExecutionRepository()

abstract T createExecutionRepositoryPrevious()

def "can retrieve all application names in database"() {
given:
def execution1 = pipeline {
application = "spindemo"
}
def execution2 = pipeline {
application = "orca"
}
def execution3 = orchestration {
application = "spindemo"
}

when:
repository.store(execution1)
repository.store(execution2)
repository.store(execution3)
def apps = repository.retrieveAllApplicationNames(executionType, minExecutions)

then:
apps.sort() == expectedApps.sort()

where:
executionType | minExecutions || expectedApps
ORCHESTRATION | 0 || ["spindemo"]
PIPELINE | 0 || ["spindemo", "orca"]
null | 0 || ["spindemo", "orca"]
null | 2 || ["spindemo"]
PIPELINE | 2 || []
}
}
Expand Up @@ -65,7 +65,8 @@
"com.netflix.spinnaker.orca.pipeline",
"com.netflix.spinnaker.orca.deprecation",
"com.netflix.spinnaker.orca.pipeline.util",
"com.netflix.spinnaker.orca.telemetry"
"com.netflix.spinnaker.orca.telemetry",
"com.netflix.spinnaker.orca.notifications.scheduling"
})
@EnableConfigurationProperties
public class OrcaConfiguration {
Expand Down
Expand Up @@ -15,26 +15,21 @@
*/
package com.netflix.spinnaker.orca.notifications.scheduling;

import com.netflix.appinfo.InstanceInfo.InstanceStatus;
import com.netflix.spinnaker.kork.eureka.RemoteStatusChangedEvent;
import com.netflix.spinnaker.kork.jedis.RedisClientDelegate;
import com.netflix.spinnaker.orca.ExecutionStatus;
import com.netflix.spinnaker.orca.notifications.AbstractPollingNotificationAgent;
import com.netflix.spinnaker.orca.notifications.NotificationClusterLock;
import com.netflix.spinnaker.orca.pipeline.model.Execution;
import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
import rx.Observable;
import rx.Scheduler;
import rx.Subscription;
import rx.functions.Func1;
import rx.schedulers.Schedulers;

import javax.annotation.PreDestroy;
import java.time.Clock;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
Expand All @@ -44,17 +39,15 @@

import static com.netflix.spinnaker.orca.pipeline.model.Execution.ExecutionType.PIPELINE;

// TODO rz - Remove redis know-how and move back into orca-core
@Component
@ConditionalOnExpression("${pollers.oldPipelineCleanup.enabled:false}")
public class OldPipelineCleanupPollingNotificationAgent implements ApplicationListener<RemoteStatusChangedEvent> {
public class OldPipelineCleanupPollingNotificationAgent extends AbstractPollingNotificationAgent {

private static final List<String> COMPLETED_STATUSES = ExecutionStatus.COMPLETED.stream().map(Enum::toString).collect(Collectors.toList());

private final Logger log = LoggerFactory.getLogger(OldPipelineCleanupPollingNotificationAgent.class);

private Scheduler scheduler = Schedulers.io();
private Subscription subscription;

private Func1<Execution, Boolean> filter = new Func1<Execution, Boolean>() {
@Override
Expand Down Expand Up @@ -86,61 +79,52 @@ public Boolean call(Execution execution) {
return 0;
};

private Clock clock = Clock.systemDefaultZone();
private final Clock clock;
private final PollingAgentExecutionRepository executionRepository;

@Autowired
private ExecutionRepository executionRepository;
private final long pollingIntervalMs;
private final int thresholdDays;
private final int minimumPipelineExecutions;

@Autowired
private RedisClientDelegate redisClientDelegate;

@Value("${pollers.oldPipelineCleanup.intervalMs:3600000}")
private long pollingIntervalMs;

@Value("${pollers.oldPipelineCleanup.thresholdDays:30}")
private int thresholdDays;

@Value("${pollers.oldPipelineCleanup.minimumPipelineExecutions:5}")
private int minimumPipelineExecutions;
public OldPipelineCleanupPollingNotificationAgent(NotificationClusterLock clusterLock,
PollingAgentExecutionRepository executionRepository,
Clock clock,
@Value("${pollers.oldPipelineCleanup.intervalMs:3600000}") long pollingIntervalMs,
@Value("${pollers.oldPipelineCleanup.thresholdDays:30}") int thresholdDays,
@Value("${pollers.oldPipelineCleanup.minimumPipelineExecutions:5}") int minimumPipelineExecutions) {
super(clusterLock);
this.executionRepository = executionRepository;
this.clock = clock;
this.pollingIntervalMs = pollingIntervalMs;
this.thresholdDays = thresholdDays;
this.minimumPipelineExecutions = minimumPipelineExecutions;
}

@PreDestroy
private void stopPolling() {
if (subscription != null) {
subscription.unsubscribe();
}
@Override
protected long getPollingInterval() {
return pollingIntervalMs;
}

@Override
public void onApplicationEvent(RemoteStatusChangedEvent event) {
if (event.getSource().isUp()) {
log.info("Instance is " + event.getSource().getStatus() + "... starting old pipeline cleanup");
startPolling();
} else if (event.getSource().getPreviousStatus() == InstanceStatus.UP) {
log.warn("Instance is " + event.getSource().getStatus() + "... stopping old pipeline cleanup");
stopPolling();
}
protected String getNotificationType() {
return OldPipelineCleanupPollingNotificationAgent.class.getSimpleName();
}

private void startPolling() {
@Override
protected void startPolling() {
subscription = Observable
.timer(pollingIntervalMs, TimeUnit.MILLISECONDS, scheduler)
.repeat()
.subscribe(aLong -> tick());
}

private void tick() {
List<String> applications = new ArrayList<>();

try {
redisClientDelegate.withKeyScan("pipeline:app:*", 200, r -> {
applications.addAll(r.getResults().stream().map(k -> k.split(":")[2]).collect(Collectors.toList()));
});

applications.forEach(app -> {
executionRepository.retrieveAllApplicationNames(PIPELINE).forEach(app -> {
log.debug("Cleaning up " + app);
cleanupApp(executionRepository.retrievePipelinesForApplication(app));
});

} catch (Exception e) {
log.error("Cleanup failed", e);
}
Expand Down Expand Up @@ -169,20 +153,13 @@ private void cleanup(List<PipelineExecutionDetails> executions) {
executions.subList(0, (executions.size() - minimumPipelineExecutions)).forEach(p -> {
long startTime = p.startTime == null ? p.buildTime : p.startTime;
long days = ChronoUnit.DAYS.between(Instant.ofEpochMilli(startTime), Instant.ofEpochMilli(clock.millis()));
if (days > thresholdDays && !hasEntityTags(p.id)) {
if (days > thresholdDays && !executionRepository.hasEntityTags(PIPELINE, p.id)) {
log.info("Deleting pipeline execution " + p.id + ": " + p.toString());
executionRepository.delete(PIPELINE, p.id);
}
});
}

private boolean hasEntityTags(String pipelineId) {
// TODO rz - This index exists only in Netflix-land. Should be added to OSS eventually
return redisClientDelegate.withCommandsClient(c -> {
return c.sismember("existingServerGroups:pipeline", "pipeline:" + pipelineId);
});
}

private static class PipelineExecutionDetails {
String id;
String application;
Expand Down
@@ -0,0 +1,26 @@
/*
* Copyright 2018 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.spinnaker.orca.notifications.scheduling

import com.netflix.spinnaker.orca.pipeline.model.Execution.ExecutionType
import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository

interface PollingAgentExecutionRepository : ExecutionRepository {

fun retrieveAllApplicationNames(type: ExecutionType?): List<String>
fun retrieveAllApplicationNames(type: ExecutionType?, minExecutions: Int): List<String>
fun hasEntityTags(type: ExecutionType, id: String): Boolean
}

0 comments on commit 47fc28e

Please sign in to comment.