Skip to content

Commit

Permalink
feat: move canaryExecutionRequest, application, and config to top-level
Browse files Browse the repository at this point in the history
This moves the canaryExecutionRequest, application, and config to be top-level objects.
  • Loading branch information
Michael Graff committed Mar 23, 2018
1 parent 8fce738 commit 6784319
Show file tree
Hide file tree
Showing 5 changed files with 337 additions and 306 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@
@Data
@Builder
public class CanaryExecutionStatusResponse {

// These are here (in addition to CanaryResult) so we can still correlate runs while the canary execution is in-flight,
// or in the case where it never reaches the judging stage.
protected String application;

protected String parentPipelineExecutionId;

@NotNull
protected String pipelineId;

@NotNull
Expand All @@ -45,6 +45,14 @@ public class CanaryExecutionStatusResponse {

protected CanaryResult result;

protected CanaryConfig config;

protected String configId;

protected CanaryExecutionRequest canaryExecutionRequest;

protected String metricSetPairListId;

//
// buildTime is when the pipeline was first created.
// startTime refers to the time the pipeline started running.
Expand All @@ -61,6 +69,7 @@ public class CanaryExecutionStatusResponse {
protected Long endTimeMillis;
protected String endTimeIso;

// If set, these are the account names used for this run.
protected String storageAccountName;
protected String configurationAccountName;
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,32 +15,74 @@
*/
package com.netflix.kayenta.canary;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.netflix.kayenta.canary.orca.CanaryStageNames;
import com.netflix.kayenta.security.AccountCredentials;
import com.netflix.kayenta.security.AccountCredentialsRepository;
import com.netflix.kayenta.security.CredentialsHelper;
import com.netflix.kayenta.storage.ObjectType;
import com.netflix.kayenta.storage.StorageService;
import com.netflix.kayenta.storage.StorageServiceRepository;
import com.netflix.spectator.api.Id;
import com.netflix.spectator.api.Registry;
import com.netflix.spinnaker.orca.ExecutionStatus;
import com.netflix.spinnaker.orca.pipeline.ExecutionLauncher;
import com.netflix.spinnaker.orca.pipeline.model.Execution;
import com.netflix.spinnaker.orca.pipeline.model.PipelineBuilder;
import com.netflix.spinnaker.orca.pipeline.model.Stage;
import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.validation.constraints.NotNull;
import java.io.IOException;
import java.time.Instant;
import java.util.Map;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

@Component
@Slf4j
public class ExecutionMapper {

private final StorageServiceRepository storageServiceRepository;
private final AccountCredentialsRepository accountCredentialsRepository;
private final ObjectMapper objectMapper;
private final Registry registry;
private final String currentInstanceId;
private final List<CanaryScopeFactory> canaryScopeFactories;
private final ExecutionLauncher executionLauncher;
private final ExecutionRepository executionRepository;

private final Id pipelineRunId;
private final Id failureId;

/**
 * Wires in storage, credential, serialization, metrics, and orca execution
 * collaborators. Scope factories are optional because a deployment may have
 * none registered; absent is normalized to an empty list.
 */
@Autowired
public ExecutionMapper(StorageServiceRepository storageServiceRepository,
                       AccountCredentialsRepository accountCredentialsRepository,
                       ObjectMapper kayentaObjectMapper,
                       Registry registry,
                       String currentInstanceId,
                       Optional<List<CanaryScopeFactory>> canaryScopeFactories,
                       ExecutionLauncher executionLauncher,
                       ExecutionRepository executionRepository) {
  this.storageServiceRepository = storageServiceRepository;
  this.accountCredentialsRepository = accountCredentialsRepository;
  this.objectMapper = kayentaObjectMapper;
  this.registry = registry;
  this.currentInstanceId = currentInstanceId;
  // Treat "no factories registered" the same as an empty registration list.
  this.canaryScopeFactories = canaryScopeFactories.orElseGet(Collections::emptyList);
  this.executionLauncher = executionLauncher;
  this.executionRepository = executionRepository;

  // Metric ids are built once; the counters are incremented per pipeline run.
  this.pipelineRunId = registry.createId("canary.pipelines.initiated");
  this.failureId = registry.createId("canary.pipelines.startupFailed");
}

public CanaryExecutionStatusResponse fromExecution(Execution pipeline) {
Expand Down Expand Up @@ -85,12 +127,28 @@ public CanaryExecutionStatusResponse fromExecution(String unresolvedStorageAccou
.findFirst()
.orElseThrow(() -> new IllegalArgumentException("Unable to find stage '" + CanaryStageNames.REFID_MIX_METRICS + "' in pipeline ID '" + canaryExecutionId + "'"));
Map<String, Object> mixerContext = mixerStage.getContext();
Map<String, Object> mixerOutputs = mixerStage.getOutputs();

CanaryExecutionStatusResponse.CanaryExecutionStatusResponseBuilder canaryExecutionStatusResponseBuilder =
CanaryExecutionStatusResponse.builder()
.application((String)contextContext.get("application"))
.parentPipelineExecutionId((String)contextContext.get("parentPipelineExecutionId"))
.pipelineId(pipeline.getId());
.pipelineId(pipeline.getId())
.storageAccountName(storageAccountName);
if (contextContext.containsKey("configId")) {
String configId = (String)contextContext.get("configId");
canaryExecutionStatusResponseBuilder.configId(configId);
}
if (contextContext.containsKey("configurationAccountName")) {
String configurationAccountName = (String)contextContext.get("configurationAccountName");
canaryExecutionStatusResponseBuilder.configurationAccountName(configurationAccountName);
}
canaryExecutionStatusResponseBuilder.config(getCanaryConfig(pipeline));
canaryExecutionStatusResponseBuilder.canaryExecutionRequest(getCanaryExecutionRequest(pipeline));

if (mixerOutputs.containsKey("metricSetPairListId")) {
canaryExecutionStatusResponseBuilder.metricSetPairListId((String)mixerOutputs.get("metricSetPairListId"));
}

Map<String, String> stageStatus = pipeline.getStages()
.stream()
Expand Down Expand Up @@ -142,9 +200,242 @@ public CanaryExecutionStatusResponse fromExecution(String unresolvedStorageAccou
canaryExecutionStatusResponseBuilder.exception(stageWithException.getContext().get("exception"));
}

canaryExecutionStatusResponseBuilder.storageAccountName(storageAccountName);

return canaryExecutionStatusResponseBuilder.build();
}

/**
 * Recovers the original {@link CanaryExecutionRequest} from the pipeline's
 * setup-context stage, where it was stored as a JSON string.
 *
 * @param pipeline the canary pipeline execution to inspect
 * @return the deserialized request, or {@code null} if the stage context has
 *         no "canaryExecutionRequest" entry
 * @throws IllegalArgumentException if the setup-context stage is missing or
 *         the stored JSON cannot be deserialized
 */
public CanaryExecutionRequest getCanaryExecutionRequest(Execution pipeline) {
  Stage setupContextStage = pipeline.getStages().stream()
      .filter(stage -> stage.getRefId().equals(CanaryStageNames.REFID_SET_CONTEXT))
      .findFirst()
      .orElseThrow(() -> new IllegalArgumentException("Unable to find stage '" + CanaryStageNames.REFID_SET_CONTEXT + "' in pipeline ID '" + pipeline.getId() + "'"));
  Map<String, Object> stageContext = setupContextStage.getContext();

  String requestJson = (String)stageContext.get("canaryExecutionRequest");
  if (requestJson == null) {
    return null;
  }
  try {
    return objectMapper.readValue(requestJson, CanaryExecutionRequest.class);
  } catch (IOException e) {
    log.error("Cannot deserialize canaryExecutionRequest", e);
    throw new IllegalArgumentException("Cannot deserialize canaryExecutionRequest", e);
  }
}

/**
 * Recovers the {@link CanaryConfig} from the pipeline's setup-context stage,
 * where it was stored as a nested map.
 *
 * @param pipeline the canary pipeline execution to inspect
 * @return the canary config converted from the stage context's "canaryConfig"
 *         map (Jackson's convertValue returns null if the map is null)
 * @throws IllegalArgumentException if the setup-context stage is missing
 */
public CanaryConfig getCanaryConfig(Execution pipeline) {
  Stage contextStage = pipeline.getStages().stream()
      .filter(stage -> stage.getRefId().equals(CanaryStageNames.REFID_SET_CONTEXT))
      .findFirst()
      .orElseThrow(() -> new IllegalArgumentException("Unable to find stage '" + CanaryStageNames.REFID_SET_CONTEXT + "' in pipeline ID '" + pipeline.getId() + "'"));
  Map<String, Object> context = contextStage.getContext();

  // The stage context is untyped, so this cast is unavoidably unchecked;
  // the value was written as a CanaryConfig map by the pipeline setup.
  @SuppressWarnings("unchecked")
  Map<String, Object> canaryConfigMap = (Map<String, Object>)context.get("canaryConfig");
  return objectMapper.convertValue(canaryConfigMap, CanaryConfig.class);
}


/**
 * Finds the first registered scope factory that handles the given metrics
 * service type.
 *
 * @throws IllegalArgumentException if no registered factory handles the type
 */
private CanaryScopeFactory getScopeFactoryForServiceType(String serviceType) {
  for (CanaryScopeFactory factory : canaryScopeFactories) {
    if (factory.handles(serviceType)) {
      return factory;
    }
  }
  throw new IllegalArgumentException("Unable to resolve canary scope factory for '" + serviceType + "'.");
}

/**
 * Looks up one side (experiment or control) of the named scope pair from the
 * execution request.
 *
 * @param executionRequest request whose scopes map is consulted
 * @param scopeName        key into the request's scopes map
 * @param isCanary         true selects the experiment scope, false the control scope
 * @throws IllegalArgumentException if the named scope is absent, or present
 *         but missing the requested side
 */
private CanaryScope getScopeForNamedScope(CanaryExecutionRequest executionRequest, String scopeName, boolean isCanary) {
  CanaryScopePair canaryScopePair = executionRequest.getScopes().get(scopeName);
  // Fail with a descriptive message instead of an NPE when the request never
  // defined this scope name at all.
  if (canaryScopePair == null) {
    throw new IllegalArgumentException("Canary scope for named scope " + scopeName + " was not provided in the execution request");
  }
  CanaryScope canaryScope = isCanary ? canaryScopePair.getExperimentScope() : canaryScopePair.getControlScope();
  if (canaryScope == null) {
    throw new IllegalArgumentException("Canary scope for named scope " + scopeName + " is missing experimentScope or controlScope keys");
  }
  return canaryScope;
}


/**
 * Builds one fetch-stage context map per configured metric, for either the
 * experiment (canary) or control side.
 *
 * Each stage is chained to the previous one via requisiteStageRefIds, with the
 * first stage depending on the setup-context stage.
 *
 * @param canaryConfig                the canary config whose metrics drive the stages
 * @param executionRequest            source of the named scope pairs
 * @param isCanary                    true builds experiment fetch stages, false control
 * @param resolvedMetricsAccountName  account name stored in each stage context
 * @param resolvedStorageAccountName  account name stored in each stage context
 * @return mutable stage-context maps, one per metric, in metric order
 * @throws IllegalArgumentException if a metric has no scope name, a scope
 *         cannot be resolved, or the scope cannot be rendered to JSON
 */
private List<Map<String, Object>> generateFetchScopes(CanaryConfig canaryConfig,
                                                      CanaryExecutionRequest executionRequest,
                                                      boolean isCanary,
                                                      String resolvedMetricsAccountName,
                                                      String resolvedStorageAccountName) {
  return IntStream.range(0, canaryConfig.getMetrics().size())
    .mapToObj(index -> {
      CanaryMetricConfig metric = canaryConfig.getMetrics().get(index);
      String serviceType = metric.getQuery().getServiceType();
      CanaryScopeFactory canaryScopeFactory = getScopeFactoryForServiceType(serviceType);
      if (metric.getScopeName() == null) {
        throw new IllegalArgumentException("Canary scope for metric named '" + metric.getName() + "' is null.");
      }
      CanaryScope inspecificScope = getScopeForNamedScope(executionRequest, metric.getScopeName(), isCanary);
      CanaryScope scopeModel = canaryScopeFactory.buildCanaryScope(inspecificScope);
      String stagePrefix = (isCanary ? CanaryStageNames.REFID_FETCH_EXPERIMENT_PREFIX : CanaryStageNames.REFID_FETCH_CONTROL_PREFIX);
      String scopeJson;
      try {
        scopeJson = objectMapper.writeValueAsString(scopeModel);
      } catch (JsonProcessingException e) {
        // Preserve the Jackson failure as the cause so the real problem is
        // visible to whoever catches this. TODO: this seems like cheating
        throw new IllegalArgumentException("Cannot render scope to json", e);
      }

      String currentStageId = stagePrefix + index;
      // First fetch stage hangs off setup-context; later ones chain serially.
      String previousStageId = (index == 0) ? CanaryStageNames.REFID_SET_CONTEXT : stagePrefix + (index - 1);

      return Maps.newHashMap(
        new ImmutableMap.Builder<String, Object>()
          .put("refId", currentStageId)
          .put("metricIndex", index)
          .put("requisiteStageRefIds", Collections.singletonList(previousStageId))
          .put("user", "[anonymous]")
          .put("metricsAccountName", resolvedMetricsAccountName) // TODO: How can this work? We'd need to look this up per type
          .put("storageAccountName", resolvedStorageAccountName)
          .put("stageType", serviceType + "Fetch")
          .put("canaryScope", scopeJson)
          .build());
    }).collect(Collectors.toList());
}

/**
 * Assembles and launches a full canary pipeline (setup, per-metric control and
 * experiment fetch stages, metric mixing, and judging), persists it, and
 * returns its execution id.
 *
 * @param application                     grouping application; defaulted to
 *                                        "kayenta-" + currentInstanceId when empty
 * @param parentPipelineExecutionId       correlating parent; defaulted to
 *                                        "no-parent-pipeline-execution" when empty
 * @param canaryConfigId                  id of the canary config being run
 * @param canaryConfig                    the canary config driving the stages
 * @param resolvedConfigurationAccountName optional config-store account name
 * @param resolvedMetricsAccountName      metrics account placed in fetch stages
 * @param resolvedStorageAccountName      storage account placed in each stage
 * @param canaryExecutionRequest          scopes and thresholds for this run
 * @return response carrying the launched pipeline's execution id
 * @throws JsonProcessingException  if the execution request cannot be serialized
 * @throws IllegalArgumentException if required scopes or thresholds are missing
 */
public CanaryExecutionResponse buildExecution(String application,
String parentPipelineExecutionId,
@NotNull String canaryConfigId,
@NotNull CanaryConfig canaryConfig,
String resolvedConfigurationAccountName,
@NotNull String resolvedMetricsAccountName,
@NotNull String resolvedStorageAccountName,
@NotNull CanaryExecutionRequest canaryExecutionRequest) throws JsonProcessingException {
registry.counter(pipelineRunId.withTag("canaryConfigId", canaryConfigId).withTag("canaryConfigName", canaryConfig.getName())).increment();

// Validate that every scope name referenced by a metric is supplied by the
// execution request before building any stages.
Set<String> requiredScopes = canaryConfig.getMetrics().stream()
.map(CanaryMetricConfig::getScopeName)
.filter(Objects::nonNull)
.collect(Collectors.toSet());

if (requiredScopes.size() > 0 && canaryExecutionRequest.getScopes() == null) {
throw new IllegalArgumentException("Canary metrics require scopes, but no scopes were provided in the execution request.");
}
Set<String> providedScopes = canaryExecutionRequest.getScopes() == null ? Collections.emptySet() : canaryExecutionRequest.getScopes().keySet();
// requiredScopes is a freshly collected mutable set; after removal, anything
// left over is a scope the request failed to provide.
requiredScopes.removeAll(providedScopes);
if (requiredScopes.size() > 0) {
throw new IllegalArgumentException("Canary metrics require scopes which were not provided in the execution request: " + requiredScopes);
}

// TODO: Will non-spinnaker users need to know what application to pass (probably should pass something, or else how will they group their runs)?
if (StringUtils.isEmpty(application)) {
application = "kayenta-" + currentInstanceId;
}

if (StringUtils.isEmpty(parentPipelineExecutionId)) {
parentPipelineExecutionId = "no-parent-pipeline-execution";
}

// Context for the setup stage; later stages (and fromExecution) read these
// keys back out of this stage's context.
HashMap<String, Object> setupCanaryContext =
Maps.newHashMap(
new ImmutableMap.Builder<String, Object>()
.put("refId", CanaryStageNames.REFID_SET_CONTEXT)
.put("user", "[anonymous]")
.put("application", application)
.put("parentPipelineExecutionId", parentPipelineExecutionId)
.put("storageAccountName", resolvedStorageAccountName)
.put("canaryConfig", canaryConfig)
.build());
// ImmutableMap.Builder rejects nulls, so optional keys are added to the
// mutable copy only when present.
if (resolvedConfigurationAccountName != null) {
setupCanaryContext.put("configurationAccountName", resolvedConfigurationAccountName);
}
if (canaryConfigId != null) {
setupCanaryContext.put("canaryConfigId", canaryConfigId);
}

// One fetch stage per metric, for each of the experiment and control sides.
List<Map<String, Object>> fetchExperimentContexts = generateFetchScopes(canaryConfig,
canaryExecutionRequest,
true,
resolvedMetricsAccountName,
resolvedStorageAccountName);
List<Map<String, Object>> controlFetchContexts = generateFetchScopes(canaryConfig,
canaryExecutionRequest,
false,
resolvedMetricsAccountName,
resolvedStorageAccountName);

int maxMetricIndex = canaryConfig.getMetrics().size() - 1; // 0 based naming, so we want the last index value, not the count
String lastControlFetchRefid = CanaryStageNames.REFID_FETCH_CONTROL_PREFIX + maxMetricIndex;
String lastExperimentFetchRefid = CanaryStageNames.REFID_FETCH_EXPERIMENT_PREFIX + maxMetricIndex;

// The mixer stage waits on the final fetch stage of each side.
Map<String, Object> mixMetricSetsContext =
Maps.newHashMap(
new ImmutableMap.Builder<String, Object>()
.put("refId", CanaryStageNames.REFID_MIX_METRICS)
.put("requisiteStageRefIds", new ImmutableList.Builder().add(lastControlFetchRefid).add(lastExperimentFetchRefid).build())
.put("user", "[anonymous]")
.put("storageAccountName", resolvedStorageAccountName)
.put("controlRefidPrefix", CanaryStageNames.REFID_FETCH_CONTROL_PREFIX)
.put("experimentRefidPrefix", CanaryStageNames.REFID_FETCH_EXPERIMENT_PREFIX)
.build());

CanaryClassifierThresholdsConfig orchestratorScoreThresholds = canaryExecutionRequest.getThresholds();

if (orchestratorScoreThresholds == null) {
if (canaryConfig.getClassifier() == null || canaryConfig.getClassifier().getScoreThresholds() == null) {
throw new IllegalArgumentException("Classifier thresholds must be specified in either the canary config, or the execution request.");
}
// The score thresholds were not explicitly passed in from the orchestrator (i.e. Spinnaker), so just use the canary config values.
orchestratorScoreThresholds = canaryConfig.getClassifier().getScoreThresholds();
}

// Persist the request as JSON in the setup context so it can be recovered
// later (see getCanaryExecutionRequest).
String canaryExecutionRequestJSON = objectMapper.writeValueAsString(canaryExecutionRequest);
setupCanaryContext.put("canaryExecutionRequest", canaryExecutionRequestJSON);

// The judge resolves metricSetPairListId from the mixer stage's context at
// runtime via a SpEL expression.
Map<String, Object> canaryJudgeContext =
Maps.newHashMap(
new ImmutableMap.Builder<String, Object>()
.put("refId", CanaryStageNames.REFID_JUDGE)
.put("requisiteStageRefIds", Collections.singletonList(CanaryStageNames.REFID_MIX_METRICS))
.put("user", "[anonymous]")
.put("storageAccountName", resolvedStorageAccountName)
.put("metricSetPairListId", "${ #stage('Mix Control and Experiment Results')['context']['metricSetPairListId']}")
.put("orchestratorScoreThresholds", orchestratorScoreThresholds)
.build());

String canaryPipelineConfigId = application + "-standard-canary-pipeline";
PipelineBuilder pipelineBuilder =
new PipelineBuilder(application)
.withName("Standard Canary Pipeline")
.withPipelineConfigId(canaryPipelineConfigId)
.withStage("setupCanary", "Setup Canary", setupCanaryContext)
.withStage("metricSetMixer", "Mix Control and Experiment Results", mixMetricSetsContext)
.withStage("canaryJudge", "Perform Analysis", canaryJudgeContext);

// Fetch stages are added after the fixed stages; ordering between stages is
// controlled by requisiteStageRefIds, not insertion order.
controlFetchContexts.forEach((context) ->
pipelineBuilder.withStage((String)context.get("stageType"),
(String)context.get("refId"),
context));
fetchExperimentContexts.forEach((context) ->
pipelineBuilder.withStage((String)context.get("stageType"),
(String)context.get("refId"),
context));

Execution pipeline = pipelineBuilder
.withLimitConcurrent(false)
.build();

// Store first so a startup failure can still mark the execution TERMINAL.
executionRepository.store(pipeline);

try {
executionLauncher.start(pipeline);
} catch (Throwable t) {
handleStartupFailure(pipeline, t);
}

return CanaryExecutionResponse.builder().canaryExecutionId(pipeline.getId()).build();
}

/**
 * Marks an execution that failed to launch: logs the failure, sets the
 * execution TERMINAL, cancels it with a reason, and bumps the failure counter.
 */
private void handleStartupFailure(Execution execution, Throwable failure) {
  log.error("Failed to start {} {}", execution.getType(), execution.getId(), failure);

  executionRepository.updateStatus(execution.getId(), ExecutionStatus.TERMINAL);
  executionRepository.cancel(execution.getId(), "system", "Failed on startup: " + failure.getMessage());

  registry.counter(failureId).increment();
}

}
Loading

0 comments on commit 6784319

Please sign in to comment.