Skip to content

Commit

Permalink
Add support for SSE responses from updated Atlas. (#21)
Browse files Browse the repository at this point in the history
  • Loading branch information
Matt Duftler committed May 17, 2017
1 parent a827b54 commit e0de8d1
Show file tree
Hide file tree
Showing 9 changed files with 179 additions and 594 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,13 @@ MetricsService atlasMetricsService(AtlasConfigurationProperties atlasConfigurati
}

@Bean
AtlasRemoteService atlasRemoteService(AtlasConfigurationProperties atlasConfigurationProperties,
AtlasRemoteService atlasRemoteService(AtlasSSEConverter atlasSSEConverter,
AtlasConfigurationProperties atlasConfigurationProperties,
RetrofitClientFactory retrofitClientFactory,
OkHttpClient okHttpClient) {
return retrofitClientFactory.createClient(AtlasRemoteService.class, atlasConfigurationProperties.getEndpoint(), okHttpClient);
return retrofitClientFactory.createClient(AtlasRemoteService.class,
atlasSSEConverter,
atlasConfigurationProperties.getEndpoint(),
okHttpClient);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
* Copyright 2017 Google, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License")
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netflix.kayenta.atlas.config;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.netflix.kayenta.atlas.model.AtlasResults;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import retrofit.converter.ConversionException;
import retrofit.converter.Converter;
import retrofit.mime.TypedInput;
import retrofit.mime.TypedOutput;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.lang.reflect.Type;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

import static com.fasterxml.jackson.annotation.JsonInclude.Include.NON_NULL;
import static com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES;

@Component
@Slf4j
// Atlas returns one json stanza per line; we rely on that here.
// We are not implementing full, proper SSE handling here.
public class AtlasSSEConverter implements Converter {

private static final ObjectMapper objectMapper = new ObjectMapper()
.setSerializationInclusion(NON_NULL)
.disable(FAIL_ON_UNKNOWN_PROPERTIES);
private static final List<String> expectedResultsTypeList = Arrays.asList(new String[]{"timeseries", "close"});

@Override
public List<AtlasResults> fromBody(TypedInput body, Type type) throws ConversionException {
try (BufferedReader reader = new BufferedReader(new InputStreamReader(body.in()))) {
List<String[]> tokenizedLines =
reader
.lines()
.filter(line -> !StringUtils.isEmpty(line))
.map(line -> line.split(": "))
.collect(Collectors.toList());

tokenizedLines
.stream()
.map(tokenizedLine -> tokenizedLine[0])
.filter(openingToken -> !openingToken.equals("data"))
.forEach(nonDataOpeningToken -> log.info("Received opening token other than 'data' from Atlas: {}", nonDataOpeningToken));

List<AtlasResults> atlasResultsList =
tokenizedLines
.stream()
.map(AtlasSSEConverter::convertTokenizedLineToAtlasResults)
.filter(Objects::nonNull)
.collect(Collectors.toList());

if (atlasResultsList.isEmpty()) {
log.error("Received no data from Atlas.");

// TODO(duftler): Propagate exception here?
return null;
} else if (!atlasResultsList.get(atlasResultsList.size() - 1).getType().equals("close")) {
log.error("Received data from Atlas that did not terminate with a 'close'.");

// TODO(duftler): Propagate exception here?
return null;
}

return atlasResultsList;
} catch (IOException e) {
e.printStackTrace();
}

return null;
}

private static AtlasResults convertTokenizedLineToAtlasResults(String[] tokenizedLine) {
try {
AtlasResults atlasResults = objectMapper.readValue(tokenizedLine[1], AtlasResults.class);
String atlasResultsType = atlasResults.getType();

if (StringUtils.isEmpty(atlasResultsType) || !expectedResultsTypeList.contains(atlasResultsType)) {
log.info("Received results of type other than 'timeseries' or 'close' from Atlas: {}", atlasResults);

// TODO: Retry on type 'error'?

return null;
}

return atlasResults;
} catch (IOException e) {
e.printStackTrace();

return null;
}
}

@Override
public TypedOutput toBody(Object object) {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@

package com.netflix.kayenta.atlas.metrics;

import com.google.common.collect.ImmutableMap;
import com.netflix.kayenta.atlas.model.AtlasResults;
import com.netflix.kayenta.atlas.model.AtlasResultsHelper;
import com.netflix.kayenta.atlas.security.AtlasNamedAccountCredentials;
import com.netflix.kayenta.atlas.service.AtlasRemoteService;
import com.netflix.kayenta.metrics.MetricSet;
Expand All @@ -33,7 +33,6 @@
import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

Expand All @@ -56,6 +55,8 @@ public boolean servicesAccount(String accountName) {

@Override
// These are still placeholder arguments. Each metrics service will have its own set of required/optional arguments. The return type is a placeholder as well.
// Using metricSetName to pass the sse filename for now.
// TODO(duftler): Make this api generic.
public List<MetricSet> queryMetrics(String accountName,
String metricSetName,
String instanceNamePrefix,
Expand All @@ -65,34 +66,36 @@ public List<MetricSet> queryMetrics(String accountName,
.getOne(accountName)
.orElseThrow(() -> new IllegalArgumentException("Unable to resolve account " + accountName + "."));
AtlasRemoteService atlasRemoteService = credentials.getAtlasRemoteService();
AtlasResults atlasResults = atlasRemoteService.fetch("name,randomValue,:eq,:sum,(,name,),:by", "std.json");
Instant responseStartTimeInstant = Instant.ofEpochMilli(atlasResults.getStart());
List<Double> timeSeriesList = atlasResults.getData().getValues();
List<AtlasResults> atlasResultsList = atlasRemoteService.fetch(metricSetName);
Map<String, AtlasResults> idToAtlasResultsMap = AtlasResultsHelper.merge(atlasResultsList);
List<MetricSet> metricSetList = new ArrayList<>();

if (timeSeriesList == null) {
timeSeriesList = new ArrayList<>();
}
for (AtlasResults atlasResults : idToAtlasResultsMap.values()) {
Instant responseStartTimeInstant = Instant.ofEpochMilli(atlasResults.getStart());
List<Double> timeSeriesList = atlasResults.getData().getValues();

if (timeSeriesList == null) {
timeSeriesList = new ArrayList<>();
}

// TODO: Get sample Atlas response with more than one set of results.
// Deferring this for now since we're going to move to the /fetch endpoint once that's available in oss Atlas.
// We are currently developing against canned output retrieved via OSS Atlas's /graph endpoint.
// TODO: Get the metric set name from the request/canary-config.
MetricSet.MetricSetBuilder metricSetBuilder =
MetricSet.builder()
.name("cpu")
.startTimeMillis(atlasResults.getStart())
.startTimeIso(responseStartTimeInstant.toString())
.stepMillis(atlasResults.getStep())
.values(timeSeriesList);

// TODO: Get the metric set name from the request/canary-config.
MetricSet.MetricSetBuilder metricSetBuilder =
MetricSet.builder()
.name(metricSetName)
.startTimeMillis(atlasResults.getStart())
.startTimeIso(responseStartTimeInstant.toString())
.stepMillis(atlasResults.getStep())
.values(timeSeriesList);
Map<String, String> tags = atlasResults.getTags();

// TODO: These have to come from the Atlas response. Just not sure from where exactly yet.
Map<String, String> tags = ImmutableMap.of("not-sure", "about-tags");
if (tags != null) {
metricSetBuilder.tags(tags);
}

if (tags != null) {
metricSetBuilder.tags(tags);
metricSetList.add(metricSetBuilder.build());
}

return Collections.singletonList(metricSetBuilder.build());
return metricSetList;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ public class AtlasResults {
@Getter
private String type;

@NotNull
@Getter
// TODO(duftler): Might be better to have a different model for results that have messages and no data.
private String message;

@NotNull
@Getter
private String id;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ private static AtlasResults mergeByTime(List<AtlasResults> atlasResultsList) {
return null;
}

// TODO: Verify that the times do not overlap.
// TODO: Verify that the number of elements in the array is correct.
atlasResultsList.sort(Comparator.comparingLong(AtlasResults::getStart));

AtlasResults firstAtlasResults = atlasResultsList.get(0);
Expand Down Expand Up @@ -71,9 +73,10 @@ private static AtlasResults mergeByTime(List<AtlasResults> atlasResultsList) {
return atlasResultsBuilder.data(TimeseriesData.builder().values(values).type(firstAtlasResults.getData().getType()).build()).build();
}

static Map<String, AtlasResults> merge(List<AtlasResults> atlasResultsList) {
public static Map<String, AtlasResults> merge(List<AtlasResults> atlasResultsList) {
return atlasResultsList
.stream()
.filter(atlasResults -> !atlasResults.getType().equals("close"))
.collect(Collectors.groupingBy(AtlasResults::getId))
.entrySet()
.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
import retrofit.http.GET;
import retrofit.http.Query;

import java.util.List;

public interface AtlasRemoteService {

@GET("/api/v2/fetch")
AtlasResults fetch(@Query("q") String q, @Query("format") String format);
List<AtlasResults> fetch(@Query("q") String q);
}
Loading

0 comments on commit e0de8d1

Please sign in to comment.