Skip to content

Commit

Permalink
chore(core) old pipeline cleanup via redis delegate (#2065)
Browse files Browse the repository at this point in the history
  • Loading branch information
asher committed Mar 19, 2018
1 parent 9034cae commit b510335
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 30 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ allprojects {
group = "com.netflix.spinnaker.orca"

ext {
spinnakerDependenciesVersion = project.hasProperty('spinnakerDependenciesVersion') ? project.property('spinnakerDependenciesVersion') : '0.146.3'
spinnakerDependenciesVersion = project.hasProperty('spinnakerDependenciesVersion') ? project.property('spinnakerDependenciesVersion') : '0.148.0'
}

def checkLocalVersions = [spinnakerDependenciesVersion: spinnakerDependenciesVersion]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@
import java.time.temporal.ChronoUnit;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.PreDestroy;

import com.netflix.appinfo.InstanceInfo.InstanceStatus;
import com.netflix.spinnaker.kork.eureka.RemoteStatusChangedEvent;
import com.netflix.spinnaker.kork.jedis.RedisClientDelegate;
import com.netflix.spinnaker.orca.ExecutionStatus;
import com.netflix.spinnaker.orca.pipeline.model.Execution;
import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository;
Expand All @@ -35,15 +36,12 @@
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.ScanParams;
import redis.clients.jedis.ScanResult;
import redis.clients.util.Pool;
import rx.Observable;
import rx.Scheduler;
import rx.Subscription;
import rx.functions.Func1;
import rx.schedulers.Schedulers;

import static com.netflix.spinnaker.orca.pipeline.model.Execution.ExecutionType.PIPELINE;

@Component
Expand Down Expand Up @@ -93,7 +91,7 @@ public Boolean call(Execution execution) {
private ExecutionRepository executionRepository;

@Autowired
private Pool<Jedis> jedisPool;
private RedisClientDelegate redisClientDelegate;

@Value("${pollers.oldPipelineCleanup.intervalMs:3600000}")
private long pollingIntervalMs;
Expand Down Expand Up @@ -130,22 +128,12 @@ private void startPolling() {
}

private void tick() {
ScanParams sp = new ScanParams().match("pipeline:app:*").count(2000);
String cursor = "0";
List<String> applications = new ArrayList<>();

try {
List<String> applications = new ArrayList<>();
while (true) {
String tmpCursor = cursor;
ScanResult<String> result = withJedis(jedis -> jedis.scan(tmpCursor, sp));
cursor = result.getStringCursor();

// pipeline:app:foo -> foo
applications.addAll(result.getResult().stream().map(k -> k.split(":")[2]).collect(Collectors.toList()));

if ("0".equals(cursor)) {
break;
}
}
redisClientDelegate.withKeyScan("pipeline:app:*", 200, r -> {
applications.addAll(r.getResults().stream().map(k -> k.split(":")[2]).collect(Collectors.toList()));
});

applications.forEach(app -> {
log.debug("Cleaning up " + app);
Expand Down Expand Up @@ -189,13 +177,9 @@ private void cleanup(List<PipelineExecutionDetails> executions) {

private boolean hasEntityTags(String pipelineId) {
// TODO rz - This index exists only in Netflix-land. Should be added to OSS eventually
return withJedis(jedis -> jedis.sismember("existingServerGroups:pipeline", "pipeline:" + pipelineId));
}

private <T> T withJedis(Function<Jedis, T> f) {
try(Jedis jedis = jedisPool.getResource()) {
return f.apply(jedis);
}
return redisClientDelegate.withCommandsClient(c -> {
return c.sismember("existingServerGroups:pipeline", "pipeline:" + pipelineId);
});
}

private static class PipelineExecutionDetails {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
*/
package com.netflix.spinnaker.orca.notifications.scheduling

import com.netflix.spinnaker.kork.jedis.JedisClientDelegate
import com.netflix.spinnaker.kork.jedis.RedisClientDelegate

import java.time.Clock
import java.time.Duration
import java.util.concurrent.atomic.AtomicInteger
Expand Down Expand Up @@ -99,10 +102,11 @@ class OldPipelineCleanupPollingNotificationAgentSpec extends Specification {
}
}
}
def redisClientDelegate = (RedisClientDelegate) new JedisClientDelegate(jedisPool)

def agent = new OldPipelineCleanupPollingNotificationAgent(
executionRepository: executionRepository,
jedisPool: jedisPool,
redisClientDelegate: redisClientDelegate,
clock: clock,
thresholdDays: 5,
minimumPipelineExecutions: 3
Expand Down

0 comments on commit b510335

Please sign in to comment.