Skip to content

Commit

Permalink
feat(orca): Start orca queue processing after subsystems report healt…
Browse files Browse the repository at this point in the history
…hy. (#143)
  • Loading branch information
Matt Duftler committed Dec 6, 2017
1 parent 9821a37 commit a42741c
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import com.netflix.kayenta.storage.StorageServiceRepository;
import com.netflix.spinnaker.kork.web.exceptions.NotFoundException;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.actuate.health.AbstractHealthIndicator;
import org.springframework.boot.actuate.health.Health;
import org.springframework.scheduling.annotation.Scheduled;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
Expand All @@ -38,7 +40,7 @@
import java.util.*;

@Slf4j
public class CanaryConfigIndexingAgent {
public class CanaryConfigIndexingAgent extends AbstractHealthIndicator {

public static final String INDEXING_INSTANCE_KEY = "kayenta:indexing-instance";
public static final String HEARTBEAT_KEY_PREFIX = "kayenta:heartbeat:";
Expand All @@ -54,6 +56,9 @@ public class CanaryConfigIndexingAgent {
private final CanaryConfigIndex canaryConfigIndex;
private final IndexConfigurationProperties indexConfigurationProperties;

private int cyclesInitiated = 0;
private int cyclesCompleted = 0;

public CanaryConfigIndexingAgent(String currentInstanceId,
JedisPool jedisPool,
AccountCredentialsRepository accountCredentialsRepository,
Expand Down Expand Up @@ -81,6 +86,8 @@ public void heartbeat() {

@Scheduled(initialDelayString = "#{@indexConfigurationProperties.indexingInitialDelayMS}", fixedDelayString = "#{@indexConfigurationProperties.indexingIntervalMS}")
public void indexCanaryConfigs() {
cyclesInitiated++;

int indexingLockTTLSec = indexConfigurationProperties.getIndexingLockTTLSec();
long staleThresholdMS = indexConfigurationProperties.getPendingUpdateStaleEntryThresholdMS();

Expand Down Expand Up @@ -238,6 +245,38 @@ public void indexCanaryConfigs() {
} else {
log.debug("Failed to acquire indexing lock.");
}

cyclesCompleted++;
}
}

@Override
protected void doHealthCheck(Health.Builder builder) throws Exception {
Set<AccountCredentials> configurationStoreAccountCredentialsSet =
CredentialsHelper.getAllAccountsOfType(AccountCredentials.Type.CONFIGURATION_STORE, accountCredentialsRepository);
int existingByApplicationIndexCount = 0;

try (Jedis jedis = jedisPool.getResource()) {
for (AccountCredentials credentials : configurationStoreAccountCredentialsSet) {
String accountName = credentials.getName();
String mapByApplicationKey = "kayenta:" + credentials.getType() + ":" + accountName + MAP_BY_APPLICATION_KEY_SUFFIX;

if (jedis.exists(mapByApplicationKey)) {
existingByApplicationIndexCount++;
}
}
}

// So long as this instance has performed an indexing, or failed to acquire the lock since another instance was in
// the process of indexing, the index should be available. We also verify that the number of by-application index
// keys matches the number of configured configuration store accounts.
if (cyclesCompleted > 0 && existingByApplicationIndexCount == configurationStoreAccountCredentialsSet.size()) {
builder.up();
} else {
builder.down();
}

builder.withDetail("cyclesInitiated", cyclesInitiated);
builder.withDetail("cyclesCompleted", cyclesCompleted);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,13 @@ IndexConfigurationProperties indexConfigurationProperties() {
}

@Bean
CanaryConfigIndexingAgent canaryConfigCachingAgent(String currentInstanceId,
JedisPool jedisPool,
AccountCredentialsRepository accountCredentialsRepository,
StorageServiceRepository storageServiceRepository,
ObjectMapper kayentaObjectMapper,
CanaryConfigIndex canaryConfigIndex,
IndexConfigurationProperties indexConfigurationProperties) {
CanaryConfigIndexingAgent canaryConfigIndexingAgent(String currentInstanceId,
JedisPool jedisPool,
AccountCredentialsRepository accountCredentialsRepository,
StorageServiceRepository storageServiceRepository,
ObjectMapper kayentaObjectMapper,
CanaryConfigIndex canaryConfigIndex,
IndexConfigurationProperties indexConfigurationProperties) {
return new CanaryConfigIndexingAgent(currentInstanceId,
jedisPool,
accountCredentialsRepository,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,20 @@
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.actuate.endpoint.HealthEndpoint;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.Status;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.http.HttpStatus;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.annotation.ScheduledAnnotationBeanPostProcessor;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.PostConstruct;
import java.util.List;
import java.util.Map;

Expand All @@ -63,10 +67,25 @@ public class PipelineController {
@Autowired
ConfigurableApplicationContext context;

@Autowired
HealthEndpoint healthEndpoint;

@Autowired
ScheduledAnnotationBeanPostProcessor postProcessor;

// TODO(duftler): Expose /inservice and /outofservice endpoints.
@PostConstruct
public void setup() {
context.publishEvent(new RemoteStatusChangedEvent(new StatusChangeEvent(STARTING, UP)));
@Scheduled(initialDelay = 10000, fixedDelay = 5000)
void startOrcaQueueProcessing() {
Health health = healthEndpoint.invoke();

if (health.getStatus() == Status.UP) {
context.publishEvent(new RemoteStatusChangedEvent(new StatusChangeEvent(STARTING, UP)));

// Cancel the scheduled task.
postProcessor.postProcessBeforeDestruction(this, null);
} else {
log.info("Health indicators are still reporting DOWN; not starting orca queue processing yet: {}", health);
}
}

@ApiOperation(value = "Initiate a pipeline execution")
Expand Down

0 comments on commit a42741c

Please sign in to comment.