Skip to content

Commit

Permalink
One metric fetch per pipeline stage. (#155)
Browse files Browse the repository at this point in the history
  • Loading branch information
Michael Graff committed Dec 18, 2017
1 parent bb6efe2 commit e07ea90
Show file tree
Hide file tree
Showing 20 changed files with 221 additions and 210 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,7 @@ private boolean matches(Backend backend, String deployment, String dataset, Stri
return false;

// return false if it doesn't match the environment.
if (backend.getEnvironments() != null && !backend.getEnvironments().contains(environment))
return false;

return true;
return backend.getEnvironments() == null || backend.getEnvironments().contains(environment);
}

public synchronized Optional<Backend> getOne(String deployment, String dataset, String region, String environment) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,14 @@
@Slf4j
public class AtlasFetchController {

@Autowired
AccountCredentialsRepository accountCredentialsRepository;
private final AccountCredentialsRepository accountCredentialsRepository;
private final SynchronousQueryProcessor synchronousQueryProcessor;

@Autowired
SynchronousQueryProcessor synchronousQueryProcessor;
public AtlasFetchController(AccountCredentialsRepository accountCredentialsRepository, SynchronousQueryProcessor synchronousQueryProcessor) {
this.accountCredentialsRepository = accountCredentialsRepository;
this.synchronousQueryProcessor = synchronousQueryProcessor;
}

@RequestMapping(value = "/query", method = RequestMethod.POST)
public Map queryMetrics(@RequestParam(required = false) final String metricsAccountName,
Expand Down Expand Up @@ -89,7 +92,8 @@ public Map queryMetrics(@RequestParam(required = false) final String metricsAcco
String metricSetListId = synchronousQueryProcessor.processQuery(resolvedMetricsAccountName,
resolvedStorageAccountName,
CanaryConfig.builder().metric(canaryMetricConfig).build(),
atlasCanaryScope).get(0);
0,
atlasCanaryScope);

return Collections.singletonMap("metricSetListId", metricSetListId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import lombok.Singular;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.validation.constraints.NotNull;
import java.io.IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,16 @@
public class AtlasFetchTask implements RetryableTask {

@Autowired
ObjectMapper kayentaObjectMapper;
private ObjectMapper kayentaObjectMapper;

@Autowired
AccountCredentialsRepository accountCredentialsRepository;
private AccountCredentialsRepository accountCredentialsRepository;

@Autowired
SynchronousQueryProcessor synchronousQueryProcessor;
private SynchronousQueryProcessor synchronousQueryProcessor;

@Autowired
AtlasConfigurationProperties atlasConfigurationProperties;
private AtlasConfigurationProperties atlasConfigurationProperties;

@Override
public long getBackoffPeriod() {
Expand Down Expand Up @@ -81,7 +81,8 @@ public TaskResult execute(@Nonnull Stage stage) {
String storageAccountName = (String)context.get("storageAccountName");
Map<String, Object> canaryConfigMap = (Map<String, Object>)context.get("canaryConfig");
CanaryConfig canaryConfig = kayentaObjectMapper.convertValue(canaryConfigMap, CanaryConfig.class);
String scopeJson = (String)stage.getContext().get("atlasCanaryScope");
String scopeJson = (String)context.get("canaryScope");
int metricIndex = (Integer)context.get("metricIndex");
AtlasCanaryScope atlasCanaryScope;
try {
atlasCanaryScope = kayentaObjectMapper.readValue(scopeJson, AtlasCanaryScope.class);
Expand All @@ -99,6 +100,7 @@ public TaskResult execute(@Nonnull Stage stage) {
return synchronousQueryProcessor.processQueryAndProduceTaskResult(resolvedMetricsAccountName,
resolvedStorageAccountName,
canaryConfig,
metricIndex,
atlasCanaryScope);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,6 @@ public class CanaryConfig {
@Getter
private Map<String, String> templates;

@NotNull
@Singular
@Getter
private Map<String, CanaryServiceConfig> services;

@NotNull
@Getter
private CanaryClassifierConfig classifier;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,13 @@
import com.netflix.kayenta.canary.providers.AtlasCanaryMetricSetQueryConfig;
import com.netflix.kayenta.canary.providers.PrometheusCanaryMetricSetQueryConfig;
import com.netflix.kayenta.canary.providers.StackdriverCanaryMetricSetQueryConfig;
import lombok.NonNull;

@JsonTypeInfo(use= JsonTypeInfo.Id.NAME, include= JsonTypeInfo.As.PROPERTY, property = "type")
@JsonSubTypes({@JsonSubTypes.Type(value = AtlasCanaryMetricSetQueryConfig.class, name = "atlas"),
@JsonSubTypes.Type(value = PrometheusCanaryMetricSetQueryConfig.class, name = "prometheus"),
@JsonSubTypes.Type(value = StackdriverCanaryMetricSetQueryConfig.class, name = "stackdriver")})
@JsonInclude(JsonInclude.Include.NON_NULL)
public interface CanaryMetricSetQueryConfig {}
public interface CanaryMetricSetQueryConfig {
@NonNull String getServiceType();
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,9 @@ public class AtlasCanaryMetricSetQueryConfig implements CanaryMetricSetQueryConf
@NotNull
@Getter
private String q;

@Override
public String getServiceType() {
return "atlas";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,9 @@ public class PrometheusCanaryMetricSetQueryConfig implements CanaryMetricSetQuer

@Getter
private List<String> sumByFields;

@Override
public String getServiceType() {
return "prometheus";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,9 @@ public class StackdriverCanaryMetricSetQueryConfig implements CanaryMetricSetQue
// resulting stackdriver filter is used when composing the query.
@Getter
private String customFilterTemplate;

@Override
public String getServiceType() {
return "stackdriver";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,5 @@

public enum CanaryConfigIndexAction {
UPDATE,
DELETE;
DELETE
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand All @@ -42,21 +41,24 @@
@Component
@Slf4j
public class SynchronousQueryProcessor {
private final MetricsServiceRepository metricsServiceRepository;
private final StorageServiceRepository storageServiceRepository;
private final Registry registry;

@Autowired
MetricsServiceRepository metricsServiceRepository;

@Autowired
StorageServiceRepository storageServiceRepository;

@Autowired
Registry registry;
public SynchronousQueryProcessor(MetricsServiceRepository metricsServiceRepository,
StorageServiceRepository storageServiceRepository,
Registry registry) {
this.metricsServiceRepository = metricsServiceRepository;
this.storageServiceRepository = storageServiceRepository;
this.registry = registry;
}

public List<String> processQuery(String metricsAccountName,
String storageAccountName,
CanaryConfig canaryConfig,
CanaryScope canaryScope) throws IOException {
List<CanaryMetricConfig> canaryMetricConfigs = canaryConfig.getMetrics();
public String processQuery(String metricsAccountName,
String storageAccountName,
CanaryConfig canaryConfig,
int metricIndex,
CanaryScope canaryScope) throws IOException {
MetricsService metricsService =
metricsServiceRepository
.getOne(metricsAccountName)
Expand All @@ -67,45 +69,42 @@ public List<String> processQuery(String metricsAccountName,
.getOne(storageAccountName)
.orElseThrow(() -> new IllegalArgumentException("No storage service was configured; unable to write metric set list."));

List<String> metricSetListIds = new ArrayList<>();
Id queryId = registry.createId("canary.telemetry.query").withTag("metricsStore", metricsService.getType());

for (CanaryMetricConfig canaryMetricConfig : canaryMetricConfigs) {
List<MetricSet> metricSetList = null;
int retries = 0;
boolean success = false;

while (!success) {
try {
registry.counter(queryId.withTag("retries", retries + "")).increment();
metricSetList = metricsService.queryMetrics(metricsAccountName, canaryConfig, canaryMetricConfig, canaryScope);
success = true;
} catch (IOException | UncheckedIOException | RetrofitError e) {
retries++;
// TODO: Externalize this as a configurable setting.
if (retries >= 10)
throw e;
log.warn("Retrying metric service query");
}
CanaryMetricConfig canaryMetricConfig = canaryConfig.getMetrics().get(metricIndex);
List<MetricSet> metricSetList = null;
int retries = 0;
boolean success = false;

while (!success) {
try {
registry.counter(queryId.withTag("retries", retries + "")).increment();
metricSetList = metricsService.queryMetrics(metricsAccountName, canaryConfig, canaryMetricConfig, canaryScope);
success = true;
} catch (IOException | UncheckedIOException | RetrofitError e) {
retries++;
// TODO: Externalize this as a configurable setting.
if (retries >= 10)
throw e;
log.warn("Retrying metric service query");
}
String metricSetListId = UUID.randomUUID() + "";

storageService.storeObject(storageAccountName, ObjectType.METRIC_SET_LIST, metricSetListId, metricSetList);
metricSetListIds.add(metricSetListId);
}
String metricSetListId = UUID.randomUUID() + "";

return metricSetListIds;
storageService.storeObject(storageAccountName, ObjectType.METRIC_SET_LIST, metricSetListId, metricSetList);
return metricSetListId;
}

public TaskResult processQueryAndProduceTaskResult(String metricsAccountName,
String storageAccountName,
CanaryConfig canaryConfig,
int metricIndex,
CanaryScope canaryScope) {
try {
List<String> metricSetListIds = processQuery(metricsAccountName, storageAccountName, canaryConfig, canaryScope);
Map outputs = Collections.singletonMap("metricSetListIds", metricSetListIds);
String metricSetListId = processQuery(metricsAccountName, storageAccountName, canaryConfig, metricIndex, canaryScope);
Map outputs = Collections.singletonMap("metricSetId", metricSetListId);

return new TaskResult(ExecutionStatus.SUCCEEDED, outputs);
return new TaskResult(ExecutionStatus.SUCCEEDED, Collections.emptyMap(), outputs);
} catch (IOException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,24 +28,30 @@
import com.netflix.spinnaker.orca.ExecutionStatus;
import com.netflix.spinnaker.orca.RetryableTask;
import com.netflix.spinnaker.orca.TaskResult;
import com.netflix.spinnaker.orca.pipeline.model.Execution;
import com.netflix.spinnaker.orca.pipeline.model.Stage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.time.Duration;
import java.util.*;
import java.util.stream.Collectors;

@Component
public class MetricSetMixerServiceTask implements RetryableTask {

@Autowired
AccountCredentialsRepository accountCredentialsRepository;

@Autowired
StorageServiceRepository storageServiceRepository;
private final AccountCredentialsRepository accountCredentialsRepository;
private final StorageServiceRepository storageServiceRepository;
private final MetricSetMixerService metricSetMixerService;

@Autowired
MetricSetMixerService metricSetMixerService;
public MetricSetMixerServiceTask(AccountCredentialsRepository accountCredentialsRepository,
StorageServiceRepository storageServiceRepository,
MetricSetMixerService metricSetMixerService) {
this.accountCredentialsRepository = accountCredentialsRepository;
this.storageServiceRepository = storageServiceRepository;
this.metricSetMixerService = metricSetMixerService;
}

@Override
public long getBackoffPeriod() {
Expand All @@ -63,8 +69,8 @@ public long getTimeout() {
public TaskResult execute(Stage stage) {
Map<String, Object> context = stage.getContext();
String storageAccountName = (String)context.get("storageAccountName");
List<String> controlMetricSetListIds = (List<String>)context.get("controlMetricSetListIds");
List<String> experimentMetricSetListIds = (List<String>)context.get("experimentMetricSetListIds");
List<String> controlMetricSetListIds = getMetricSetListIds(stage.getExecution(), (String)context.get("controlRefidPrefix"));
List<String> experimentMetricSetListIds = getMetricSetListIds(stage.getExecution(), (String)context.get("experimentRefidPrefix"));
String resolvedAccountName = CredentialsHelper.resolveAccountByNameOrType(storageAccountName,
AccountCredentials.Type.OBJECT_STORE,
accountCredentialsRepository);
Expand Down Expand Up @@ -102,4 +108,15 @@ public TaskResult execute(Stage stage) {

return new TaskResult(ExecutionStatus.SUCCEEDED, outputs);
}

private List<String> getMetricSetListIds(Execution execution, String stagePrefix) {
List<Stage> stages = execution.getStages();
return stages.stream()
.filter(stage -> {
String refId = stage.getRefId();
return refId != null && refId.startsWith(stagePrefix);
})
.map(stage -> (String)stage.getOutputs().get("metricSetId"))
.collect(Collectors.toList());
}
}
Loading

0 comments on commit e07ea90

Please sign in to comment.