Skip to content

Commit

Permalink
Send canary results as an ApplicatonEvent (#234)
Browse files Browse the repository at this point in the history
  • Loading branch information
Michael Graff committed Feb 16, 2018
1 parent 65edf39 commit b9e9685
Show file tree
Hide file tree
Showing 7 changed files with 230 additions and 103 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,7 @@ public class CanaryExecutionStatusResponse {
protected String startTimeIso;
protected Long endTimeMillis;
protected String endTimeIso;

// If set, this is the storage account name used for the metric data
protected String storageAccountName;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package com.netflix.kayenta.canary;

import com.netflix.kayenta.canary.orca.CanaryStageNames;
import com.netflix.kayenta.security.AccountCredentials;
import com.netflix.kayenta.security.AccountCredentialsRepository;
import com.netflix.kayenta.security.CredentialsHelper;
import com.netflix.kayenta.storage.ObjectType;
import com.netflix.kayenta.storage.StorageService;
import com.netflix.kayenta.storage.StorageServiceRepository;
import com.netflix.spinnaker.orca.pipeline.model.Execution;
import com.netflix.spinnaker.orca.pipeline.model.Stage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.time.Instant;
import java.util.Map;
import java.util.stream.Collectors;

@Component
public class ExecutionMapper {

private final StorageServiceRepository storageServiceRepository;
private final AccountCredentialsRepository accountCredentialsRepository;

@Autowired
public ExecutionMapper(StorageServiceRepository storageServiceRepository, AccountCredentialsRepository accountCredentialsRepository) {
this.storageServiceRepository = storageServiceRepository;
this.accountCredentialsRepository = accountCredentialsRepository;
}

public CanaryExecutionStatusResponse fromExecution(Execution pipeline) {
String canaryExecutionId = pipeline.getId();

Stage contextStage = pipeline.getStages().stream()
.filter(stage -> stage.getRefId().equals(CanaryStageNames.REFID_SET_CONTEXT))
.findFirst()
.orElseThrow(() -> new IllegalArgumentException("Unable to find stage '" + CanaryStageNames.REFID_SET_CONTEXT + "' in pipeline ID '" + canaryExecutionId + "'"));
Map<String, Object> contextContext = contextStage.getContext();

String storageAccountName = (String)contextContext.get("storageAccountName");
return fromExecution(storageAccountName, pipeline);
}

public CanaryExecutionStatusResponse fromExecution(String unresolvedStorageAccountName, Execution pipeline) {
String storageAccountName = CredentialsHelper.resolveAccountByNameOrType(unresolvedStorageAccountName,
AccountCredentials.Type.OBJECT_STORE,
accountCredentialsRepository);

StorageService storageService =
storageServiceRepository
.getOne(storageAccountName)
.orElseThrow(() -> new IllegalArgumentException("No storage service was configured; unable to retrieve results."));

String canaryExecutionId = pipeline.getId();

Stage judgeStage = pipeline.getStages().stream()
.filter(stage -> stage.getRefId().equals(CanaryStageNames.REFID_JUDGE))
.findFirst()
.orElseThrow(() -> new IllegalArgumentException("Unable to find stage '" + CanaryStageNames.REFID_JUDGE + "' in pipeline ID '" + canaryExecutionId + "'"));
Map<String, Object> judgeOutputs = judgeStage.getOutputs();

Stage contextStage = pipeline.getStages().stream()
.filter(stage -> stage.getRefId().equals(CanaryStageNames.REFID_SET_CONTEXT))
.findFirst()
.orElseThrow(() -> new IllegalArgumentException("Unable to find stage '" + CanaryStageNames.REFID_SET_CONTEXT + "' in pipeline ID '" + canaryExecutionId + "'"));
Map<String, Object> contextContext = contextStage.getContext();

Stage mixerStage = pipeline.getStages().stream()
.filter(stage -> stage.getRefId().equals(CanaryStageNames.REFID_MIX_METRICS))
.findFirst()
.orElseThrow(() -> new IllegalArgumentException("Unable to find stage '" + CanaryStageNames.REFID_MIX_METRICS + "' in pipeline ID '" + canaryExecutionId + "'"));
Map<String, Object> mixerContext = mixerStage.getContext();

CanaryExecutionStatusResponse.CanaryExecutionStatusResponseBuilder canaryExecutionStatusResponseBuilder =
CanaryExecutionStatusResponse.builder()
.application((String)contextContext.get("application"))
.parentPipelineExecutionId((String)contextContext.get("parentPipelineExecutionId"));

Map<String, String> stageStatus = pipeline.getStages()
.stream()
.collect(Collectors.toMap(Stage::getRefId, s -> s.getStatus().toString().toLowerCase()));

Boolean isComplete = pipeline.getStatus().isComplete();
String pipelineStatus = pipeline.getStatus().toString().toLowerCase();

canaryExecutionStatusResponseBuilder
.stageStatus(stageStatus)
.complete(isComplete)
.status(pipelineStatus);

Long buildTime = pipeline.getBuildTime();
if (buildTime != null) {
canaryExecutionStatusResponseBuilder
.buildTimeMillis(buildTime)
.buildTimeIso(Instant.ofEpochMilli(buildTime) + "");
}

Long startTime = pipeline.getStartTime();
if (startTime != null) {
canaryExecutionStatusResponseBuilder
.startTimeMillis(startTime)
.startTimeIso(Instant.ofEpochMilli(startTime) + "");
}

Long endTime = pipeline.getEndTime();
if (endTime != null) {
canaryExecutionStatusResponseBuilder
.endTimeMillis(endTime)
.endTimeIso(Instant.ofEpochMilli(endTime) + "");
}

if (isComplete && pipelineStatus.equals("succeeded")) {
if (judgeOutputs.containsKey("canaryJudgeResultId")) {
String canaryJudgeResultId = (String)judgeOutputs.get("canaryJudgeResultId");
canaryExecutionStatusResponseBuilder.result(storageService.loadObject(storageAccountName, ObjectType.CANARY_RESULT, canaryJudgeResultId));
}
}

// Propagate the first canary pipeline exception we can locate.
Stage stageWithException = pipeline.getStages().stream()
.filter(stage -> stage.getContext().containsKey("exception"))
.findFirst()
.orElse(null);

if (stageWithException != null) {
canaryExecutionStatusResponseBuilder.exception(stageWithException.getContext().get("exception"));
}

canaryExecutionStatusResponseBuilder.storageAccountName(storageAccountName);

return canaryExecutionStatusResponseBuilder.build();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package com.netflix.kayenta.canary.orca;

public class CanaryStageNames {
public static final String REFID_SET_CONTEXT = "setupContext";
public static final String REFID_FETCH_CONTROL_PREFIX = "fetchControl";
public static final String REFID_FETCH_EXPERIMENT_PREFIX = "fetchExperiment";
public static final String REFID_MIX_METRICS = "mixMetrics";
public static final String REFID_JUDGE = "judge";
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@
"com.netflix.kayenta.index.config",
"com.netflix.kayenta.metrics",
"com.netflix.kayenta.persistence.config",
"com.netflix.kayenta.retrofit.config"
"com.netflix.kayenta.retrofit.config",
"com.netflix.kayenta.events",
"com.netflix.kayenta.external"
})
public class KayentaConfiguration {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package com.netflix.kayenta.events;

import com.netflix.kayenta.canary.CanaryExecutionStatusResponse;
import org.springframework.context.ApplicationEvent;

public class CanaryExecutionCompletedEvent extends ApplicationEvent {
private final CanaryExecutionStatusResponse canaryExecutionStatusResponse;

public CanaryExecutionCompletedEvent(Object source, CanaryExecutionStatusResponse canaryExecutionStatusResponse) {
super(source);
this.canaryExecutionStatusResponse = canaryExecutionStatusResponse;
}

public CanaryExecutionStatusResponse getCanaryExecutionStatusResponse() {
return canaryExecutionStatusResponse;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package com.netflix.kayenta.events;

import com.netflix.kayenta.canary.CanaryExecutionStatusResponse;
import com.netflix.kayenta.canary.ExecutionMapper;
import com.netflix.spinnaker.orca.events.ExecutionComplete;
import com.netflix.spinnaker.orca.pipeline.model.Execution;
import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class CanaryExecutionCompletedProducer implements ApplicationListener<ExecutionComplete> {

private final ApplicationEventPublisher applicationEventPublisher;
private final ExecutionRepository executionRepository;
private final ExecutionMapper executionMapper;

@Autowired
public CanaryExecutionCompletedProducer(ApplicationEventPublisher applicationEventPublisher, ExecutionRepository executionRepository, ExecutionMapper executionMapper) {
this.applicationEventPublisher = applicationEventPublisher;
this.executionRepository = executionRepository;
this.executionMapper = executionMapper;
}

@Override
public void onApplicationEvent(ExecutionComplete event) {
if (event.getExecutionType() != Execution.ExecutionType.PIPELINE) {
return;
}
Execution execution = executionRepository.retrieve(Execution.ExecutionType.PIPELINE, event.getExecutionId());
CanaryExecutionStatusResponse canaryExecutionStatusResponse = executionMapper.fromExecution(execution);
CanaryExecutionCompletedEvent canaryExecutionCompletedEvent = new CanaryExecutionCompletedEvent(this, canaryExecutionStatusResponse);
applicationEventPublisher.publishEvent(canaryExecutionCompletedEvent);
}
}
Loading

0 comments on commit b9e9685

Please sign in to comment.