Skip to content

Commit

Permalink
feat(signalfx): Provide additional context about the results of the s…
Browse files Browse the repository at this point in the history
…ignalflow programs. (#617)
  • Loading branch information
fieldju committed Oct 1, 2019
1 parent c056453 commit 9a2e0e4
Showing 1 changed file with 99 additions and 23 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2018 Nike, inc.
* Copyright (c) 2019 Nike, inc.
*
* Licensed under the Apache License, Version 2.0 (the "License")
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -56,6 +56,16 @@
@Slf4j
public class SignalFxMetricsService implements MetricsService {

/**
 * Template for the exception message raised when the results of a Signal Flow program contain an
 * error message. The placeholders are filled, in order, with: account, startEpochMilli,
 * endEpochMilli, stepMilli, maxDelay, immediate and the program text.
 */
private static final String SIGNAL_FLOW_ERROR_TEMPLATE =
    "An error occurred while executing the Signal Flow program. "
        + "account: %s "
        + "startEpochMilli: %s "
        + "endEpochMilli: %s "
        + "stepMilli: %s "
        + "maxDelay: %s "
        + "immediate: %s "
        + "program: %s";

@NotNull @Singular @Getter private List<String> accountNames;

@Autowired private final AccountCredentialsRepository accountCredentialsRepository;
Expand Down Expand Up @@ -129,6 +139,8 @@ public List<MetricSet> queryMetrics(
String accessToken = accountCredentials.getCredentials().getAccessToken();
SignalFxSignalFlowRemoteService signalFlowService = accountCredentials.getSignalFlowService();

final long maxDelay = 0;
final boolean immediate = true;
long startEpochMilli = signalFxCanaryScope.getStart().toEpochMilli();
long endEpochMilli = signalFxCanaryScope.getEnd().toEpochMilli();
long canaryStepLengthInSeconds = signalFxCanaryScope.getStep();
Expand All @@ -138,73 +150,137 @@ public List<MetricSet> queryMetrics(
String program = buildQuery(metricsAccountName, canaryConfig, canaryMetricConfig, canaryScope);

SignalFlowExecutionResult signalFlowExecutionResult;

try {
signalFlowExecutionResult =
signalFlowService.executeSignalFlowProgram(
accessToken, startEpochMilli, endEpochMilli, stepMilli, 0, true, program);
accessToken, startEpochMilli, endEpochMilli, stepMilli, maxDelay, immediate, program);
} catch (RetrofitError e) {
ErrorResponse errorResponse = (ErrorResponse) e.getBodyAs(ErrorResponse.class);
throw new SignalFxRequestError(
errorResponse, program, startEpochMilli, endEpochMilli, stepMilli, metricsAccountName);
}

validateResults(
signalFlowExecutionResult.getChannelMessages(),
metricsAccountName,
startEpochMilli,
endEpochMilli,
stepMilli,
maxDelay,
immediate,
program);

LinkedList<ChannelMessage.DataMessage> dataMessages =
extractDataMessages(signalFlowExecutionResult);

ChannelMessage.DataMessage firstDataPoint =
dataMessages.size() > 0 ? dataMessages.getFirst() : null;

ChannelMessage.DataMessage lastDataPoint =
dataMessages.size() > 0 ? dataMessages.getLast() : null;

// Return a Metric set of the reduced and aggregated data
MetricSet metricSet =
MetricSet.MetricSetBuilder metricSetBuilder =
MetricSet.builder()
.name(canaryMetricConfig.getName())
.startTimeMillis(startEpochMilli)
.startTimeIso(Instant.ofEpochMilli(startEpochMilli).toString())
.endTimeMillis(endEpochMilli)
.endTimeIso(Instant.ofEpochMilli(endEpochMilli).toString())
.stepMillis(stepMilli)
.values(
getTimeSeriesDataFromChannelMessages(
signalFlowExecutionResult.getChannelMessages()))
.values(getTimeSeriesDataFromDataMessages(dataMessages))
.tags(
queryPairs.stream()
.collect(Collectors.toMap(QueryPair::getKey, QueryPair::getValue)))
.attribute("signal-flow-program", program)
.build();
.attribute("actual-data-point-count", String.valueOf(dataMessages.size()))
.attribute("requested-start", String.valueOf(startEpochMilli))
.attribute("requested-end", String.valueOf(endEpochMilli))
.attribute("requested-step-milli", String.valueOf(stepMilli))
.attribute("requested-max-delay", String.valueOf(maxDelay))
.attribute("requested-immediate", String.valueOf(immediate))
.attribute("requested-account", metricsAccountName);

return Collections.singletonList(metricSet);
Optional.ofNullable(firstDataPoint)
.ifPresent(
dp ->
metricSetBuilder.attribute(
"actual-start-ts", String.valueOf(dp.getLogicalTimestampMs())));
Optional.ofNullable(lastDataPoint)
.ifPresent(
dp ->
metricSetBuilder.attribute(
"actual-end-ts", String.valueOf(dp.getLogicalTimestampMs())));

return Collections.singletonList(metricSetBuilder.build());
}

/**
* Parses the data out of the SignalFx Signal Flow messages to build the data Kayenta needs to
* make judgements.
* Extracts the messages with data points from the Signal Fx response.
*
* @param channelMessages The list of messages from the signal flow execution.
* @return The list of values with missing data filled with NaNs
* @param signalFlowExecutionResult The SignalFlow program results.
* @return A linked list of the data points from the result.
*/
protected List<Double> getTimeSeriesDataFromChannelMessages(
List<ChannelMessage> channelMessages) {
channelMessages
private LinkedList<ChannelMessage.DataMessage> extractDataMessages(
    SignalFlowExecutionResult signalFlowExecutionResult) {
  // A sequential stream is used deliberately: the caller reads the first and last elements of
  // the returned list, so the messages must stay in the order they were received, and the
  // list is far too small for a parallel pipeline to pay for its fork/join overhead.
  return signalFlowExecutionResult.getChannelMessages().stream()
      // Keep only the messages that carry data points.
      .filter(channelMessage -> channelMessage.getType().equals(DATA_MESSAGE))
      .map(ChannelMessage.DataMessage.class::cast)
      .collect(Collectors.toCollection(LinkedList::new));
}

/** Parses the channel messages and the first error if present. */
private void validateResults(
List<ChannelMessage> channelMessages,
String account,
long startEpochMilli,
long endEpochMilli,
long stepMilli,
long maxDelay,
boolean immediate,
String program) {
channelMessages.stream()
.filter(channelMessage -> channelMessage.getType().equals(ERROR_MESSAGE))
.findAny()
.ifPresent(
error -> {

// This error message is terrible, and I am not sure how to add more context to it.
// error.getErrors() returns a List<Object>, and it is unclear what to do with those.
throw new RuntimeException(
"Some sort of error occurred, when executing the signal flow program");
String.format(
SIGNAL_FLOW_ERROR_TEMPLATE,
account,
String.valueOf(startEpochMilli),
String.valueOf(endEpochMilli),
String.valueOf(stepMilli),
String.valueOf(maxDelay),
String.valueOf(immediate),
program));
});
}

return channelMessages
.parallelStream()
.filter(channelMessage -> channelMessage.getType().equals(DATA_MESSAGE))
/**
* Parses the data out of the SignalFx Signal Flow messages to build the data Kayenta needs to
* make judgements.
*
* @param dataMessages The list of data messages from the signal flow execution.
* @return The list of values with missing data filled with NaNs
*/
protected List<Double> getTimeSeriesDataFromDataMessages(
List<ChannelMessage.DataMessage> dataMessages) {
return dataMessages.stream()
.map(
message -> {
ChannelMessage.DataMessage dataMessage = (ChannelMessage.DataMessage) message;
Map<String, Number> data = dataMessage.getData();
Map<String, Number> data = message.getData();
if (data.size() > 1) {
throw new IllegalStateException(
"There was more than one value for a given timestamp, a "
+ "SignalFlow stream method that can aggregate should have been applied to the data in "
+ "the SignalFlow program");
}
//noinspection OptionalGetWithoutIsPresent
return data.size() == 1
? data.values().stream().findFirst().get().doubleValue()
: Double.NaN;
Expand Down

0 comments on commit 9a2e0e4

Please sign in to comment.