From 9a2e0e48bc32556386cbb705de74fc805528e79b Mon Sep 17 00:00:00 2001 From: Justin Field Date: Tue, 1 Oct 2019 15:02:06 -0700 Subject: [PATCH] feat(signalfx): Provide additional context about the results of the signalflow programs. (#617) --- .../metrics/SignalFxMetricsService.java | 122 ++++++++++++++---- 1 file changed, 99 insertions(+), 23 deletions(-) diff --git a/kayenta-signalfx/src/main/java/com/netflix/kayenta/signalfx/metrics/SignalFxMetricsService.java b/kayenta-signalfx/src/main/java/com/netflix/kayenta/signalfx/metrics/SignalFxMetricsService.java index 38b6e704f..334a2d764 100644 --- a/kayenta-signalfx/src/main/java/com/netflix/kayenta/signalfx/metrics/SignalFxMetricsService.java +++ b/kayenta-signalfx/src/main/java/com/netflix/kayenta/signalfx/metrics/SignalFxMetricsService.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2018 Nike, inc. + * Copyright (c) 2019 Nike, inc. * * Licensed under the Apache License, Version 2.0 (the "License") * you may not use this file except in compliance with the License. @@ -56,6 +56,16 @@ @Slf4j public class SignalFxMetricsService implements MetricsService { + private static final String SIGNAL_FLOW_ERROR_TEMPLATE = + "An error occurred whole executing the Signal Flow program. 
" + + "account: %s " + + "startEpochMilli: %s " + + "endEpochMilli: %s " + + "stepMilli: %s " + + "maxDelay: %s " + + "immediate: %s " + + "program: %s"; + @NotNull @Singular @Getter private List accountNames; @Autowired private final AccountCredentialsRepository accountCredentialsRepository; @@ -129,6 +139,8 @@ public List queryMetrics( String accessToken = accountCredentials.getCredentials().getAccessToken(); SignalFxSignalFlowRemoteService signalFlowService = accountCredentials.getSignalFlowService(); + final long maxDelay = 0; + final boolean immediate = true; long startEpochMilli = signalFxCanaryScope.getStart().toEpochMilli(); long endEpochMilli = signalFxCanaryScope.getEnd().toEpochMilli(); long canaryStepLengthInSeconds = signalFxCanaryScope.getStep(); @@ -138,18 +150,38 @@ public List queryMetrics( String program = buildQuery(metricsAccountName, canaryConfig, canaryMetricConfig, canaryScope); SignalFlowExecutionResult signalFlowExecutionResult; + try { signalFlowExecutionResult = signalFlowService.executeSignalFlowProgram( - accessToken, startEpochMilli, endEpochMilli, stepMilli, 0, true, program); + accessToken, startEpochMilli, endEpochMilli, stepMilli, maxDelay, immediate, program); } catch (RetrofitError e) { ErrorResponse errorResponse = (ErrorResponse) e.getBodyAs(ErrorResponse.class); throw new SignalFxRequestError( errorResponse, program, startEpochMilli, endEpochMilli, stepMilli, metricsAccountName); } + validateResults( + signalFlowExecutionResult.getChannelMessages(), + metricsAccountName, + startEpochMilli, + endEpochMilli, + stepMilli, + maxDelay, + immediate, + program); + + LinkedList dataMessages = + extractDataMessages(signalFlowExecutionResult); + + ChannelMessage.DataMessage firstDataPoint = + dataMessages.size() > 0 ? dataMessages.getFirst() : null; + + ChannelMessage.DataMessage lastDataPoint = + dataMessages.size() > 0 ? 
dataMessages.getLast() : null; + // Return a Metric set of the reduced and aggregated data - MetricSet metricSet = + MetricSet.MetricSetBuilder metricSetBuilder = MetricSet.builder() .name(canaryMetricConfig.getName()) .startTimeMillis(startEpochMilli) @@ -157,54 +189,98 @@ public List queryMetrics( .endTimeMillis(endEpochMilli) .endTimeIso(Instant.ofEpochMilli(endEpochMilli).toString()) .stepMillis(stepMilli) - .values( - getTimeSeriesDataFromChannelMessages( - signalFlowExecutionResult.getChannelMessages())) + .values(getTimeSeriesDataFromDataMessages(dataMessages)) .tags( queryPairs.stream() .collect(Collectors.toMap(QueryPair::getKey, QueryPair::getValue))) .attribute("signal-flow-program", program) - .build(); + .attribute("actual-data-point-count", String.valueOf(dataMessages.size())) + .attribute("requested-start", String.valueOf(startEpochMilli)) + .attribute("requested-end", String.valueOf(endEpochMilli)) + .attribute("requested-step-milli", String.valueOf(stepMilli)) + .attribute("requested-max-delay", String.valueOf(maxDelay)) + .attribute("requested-immediate", String.valueOf(immediate)) + .attribute("requested-account", metricsAccountName); - return Collections.singletonList(metricSet); + Optional.ofNullable(firstDataPoint) + .ifPresent( + dp -> + metricSetBuilder.attribute( + "actual-start-ts", String.valueOf(dp.getLogicalTimestampMs()))); + Optional.ofNullable(lastDataPoint) + .ifPresent( + dp -> + metricSetBuilder.attribute( + "actual-end-ts", String.valueOf(dp.getLogicalTimestampMs()))); + + return Collections.singletonList(metricSetBuilder.build()); } /** - * Parses the data out of the SignalFx Signal Flow messages to build the data Kayenta needs to - * make judgements. + * Extracts the messages with data points from the Signal Fx response. * - * @param channelMessages The list of messages from the signal flow execution. 
- * @return The list of values with missing data filled with NaNs + * @param signalFlowExecutionResult The SignalFlow program results. + * @return A linked list of the data points from the result. */ - protected List getTimeSeriesDataFromChannelMessages( - List channelMessages) { - channelMessages + private LinkedList extractDataMessages( + SignalFlowExecutionResult signalFlowExecutionResult) { + return signalFlowExecutionResult + .getChannelMessages() .parallelStream() + .filter(channelMessage -> channelMessage.getType().equals(DATA_MESSAGE)) + .map((message) -> (ChannelMessage.DataMessage) message) + .collect(Collectors.toCollection(LinkedList::new)); + } + + /** Checks the channel messages and throws if an error message is present. */ + private void validateResults( + List channelMessages, + String account, + long startEpochMilli, + long endEpochMilli, + long stepMilli, + long maxDelay, + boolean immediate, + String program) { + channelMessages.stream() .filter(channelMessage -> channelMessage.getType().equals(ERROR_MESSAGE)) .findAny() .ifPresent( error -> { - // This error message is terrible, and I am not sure how to add more context to it. // error.getErrors() returns a List, and it is unclear what to do with those. throw new RuntimeException( - "Some sort of error occurred, when executing the signal flow program"); + String.format( + SIGNAL_FLOW_ERROR_TEMPLATE, + account, + String.valueOf(startEpochMilli), + String.valueOf(endEpochMilli), + String.valueOf(stepMilli), + String.valueOf(maxDelay), + String.valueOf(immediate), + program)); }); + } - return channelMessages - .parallelStream() - .filter(channelMessage -> channelMessage.getType().equals(DATA_MESSAGE)) + /** + * Parses the data out of the SignalFx Signal Flow messages to build the data Kayenta needs to + * make judgements. + * + * @param dataMessages The list of data messages from the signal flow execution. 
+ * @return The list of values with missing data filled with NaNs + */ + protected List getTimeSeriesDataFromDataMessages( + List dataMessages) { + return dataMessages.stream() .map( message -> { - ChannelMessage.DataMessage dataMessage = (ChannelMessage.DataMessage) message; - Map data = dataMessage.getData(); + Map data = message.getData(); if (data.size() > 1) { throw new IllegalStateException( "There was more than one value for a given timestamp, a " + "SignalFlow stream method that can aggregate should have been applied to the data in " + "the SignalFlow program"); } - //noinspection OptionalGetWithoutIsPresent return data.size() == 1 ? data.values().stream().findFirst().get().doubleValue() : Double.NaN;