Skip to content

Commit

Permalink
First pass at transforming raw Stackdriver and Atlas results into com…
Browse files Browse the repository at this point in the history
…mon metrics format. (#7)
  • Loading branch information
Matt Duftler committed Apr 13, 2017
1 parent e9cd8f7 commit 5cde992
Show file tree
Hide file tree
Showing 6 changed files with 186 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,29 @@

package com.netflix.kayenta.atlas.metrics;

import com.google.common.collect.ImmutableMap;
import com.netflix.kayenta.atlas.model.AtlasResults;
import com.netflix.kayenta.atlas.security.AtlasNamedAccountCredentials;
import com.netflix.kayenta.atlas.service.AtlasRemoteService;
import com.netflix.kayenta.metrics.MetricSet;
import com.netflix.kayenta.metrics.MetricsService;
import com.netflix.kayenta.security.AccountCredentialsRepository;
import lombok.Builder;
import lombok.Getter;
import lombok.Singular;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;

import javax.validation.constraints.NotNull;
import java.io.IOException;
import java.util.Collections;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

@Builder
@Slf4j
public class AtlasMetricsService implements MetricsService {

@NotNull
Expand All @@ -51,15 +56,45 @@ public boolean servicesAccount(String accountName) {

@Override
// These are still placeholder arguments. Each metrics service will have its own set of required/optional arguments. The return type is a placeholder as well.
public Optional<Map> queryMetrics(String accountName, String instanceNamePrefix, String intervalStartTime, String intervalEndTime) throws IOException {
public Optional<MetricSet> queryMetrics(String accountName,
String metricSetName,
String instanceNamePrefix,
String intervalStartTime,
String intervalEndTime) throws IOException {
AtlasNamedAccountCredentials credentials = (AtlasNamedAccountCredentials)accountCredentialsRepository
.getOne(accountName)
.orElseThrow(() -> new IllegalArgumentException("Unable to resolve account " + accountName + "."));
AtlasRemoteService atlasRemoteService = credentials.getAtlasRemoteService();
AtlasResults atlasResults = atlasRemoteService.fetch("name,randomValue,:eq,:sum,(,name,),:by", "std.json");
Instant responseStartTimeInstant = Instant.ofEpochMilli(atlasResults.getStart());
List<List<Double>> timeSeriesList = atlasResults.getValues();

System.out.println("** Got back from fetch: atlasResults=" + atlasResults);
if (timeSeriesList.size() == 0) {
throw new IllegalArgumentException("No time series was returned.");
}

return Optional.of(Collections.singletonMap("some-key", "some-value"));
List<Double> pointValues =
timeSeriesList
.stream()
.map(timeSeries -> timeSeries.get(0))
.collect(Collectors.toList());

// TODO: Get the metric set name from the request/canary-config.
MetricSet.MetricSetBuilder metricSetBuilder =
MetricSet.builder()
.name(metricSetName)
.startTimeMillis(atlasResults.getStart())
.startTimeIso(responseStartTimeInstant.toString())
.stepMillis(atlasResults.getStep())
.values(pointValues);

// TODO: These have to come from the Atlas response. Just not sure from where exactly yet.
Map<String, String> tags = ImmutableMap.of("not-sure", "about-tags");

if (tags != null) {
metricSetBuilder.tags(tags);
}

return Optional.of(metricSetBuilder.build());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright 2017 Google, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License")
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netflix.kayenta.metrics;

import lombok.Builder;
import lombok.Getter;
import lombok.Singular;
import lombok.ToString;

import javax.validation.constraints.NotNull;
import java.util.List;
import java.util.Map;

@Builder
@ToString
public class MetricSet {

@NotNull
@Getter
private String name;

@NotNull
@Singular
@Getter
private Map<String, String> tags;

@NotNull
@Getter
private long startTimeMillis;

@NotNull
@Getter
private String startTimeIso;

@NotNull
@Getter
private long stepMillis;

@NotNull
@Singular
@Getter
private List<Double> values;
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@
package com.netflix.kayenta.metrics;

import java.io.IOException;
import java.util.Map;
import java.util.Optional;

public interface MetricsService {
boolean servicesAccount(String accountName);

// These are still placeholder arguments. Each metrics service will have its own set of required/optional arguments. The return type is a placeholder as well.
Optional<Map> queryMetrics(String accountName,
String instanceNamePrefix,
String intervalStartTime,
String intervalEndTime) throws IOException;
Optional<MetricSet> queryMetrics(String accountName,
String metricSetName,
String instanceNamePrefix,
String intervalStartTime,
String intervalEndTime) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import lombok.Getter;

public enum ObjectType {
METRICS("metrics", "metrics.json");
METRIC_SET("metrics", "metric_set.json");

@Getter
final String group;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,29 @@

import com.google.api.services.monitoring.v3.Monitoring;
import com.google.api.services.monitoring.v3.model.ListTimeSeriesResponse;
import com.google.api.services.monitoring.v3.model.Point;
import com.google.api.services.monitoring.v3.model.TimeSeries;
import com.netflix.kayenta.google.security.GoogleNamedAccountCredentials;
import com.netflix.kayenta.metrics.MetricSet;
import com.netflix.kayenta.metrics.MetricsService;
import com.netflix.kayenta.security.AccountCredentialsRepository;
import lombok.Builder;
import lombok.Getter;
import lombok.Singular;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;

import javax.validation.constraints.NotNull;
import java.io.IOException;
import java.util.Arrays;
import java.time.Instant;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

@Builder
@Slf4j
public class StackdriverMetricsService implements MetricsService {

@NotNull
Expand All @@ -51,28 +58,87 @@ public boolean servicesAccount(String accountName) {

@Override
// These are still placeholder arguments. Each metrics service will have its own set of required/optional arguments. The return type is a placeholder as well.
public Optional<Map> queryMetrics(String accountName,
String instanceNamePrefix,
String intervalStartTime,
String intervalEndTime) throws IOException {
public Optional<MetricSet> queryMetrics(String accountName,
String metricSetName,
String instanceNamePrefix,
String intervalStartTime,
String intervalEndTime) throws IOException {
GoogleNamedAccountCredentials credentials = (GoogleNamedAccountCredentials)accountCredentialsRepository
.getOne(accountName)
.orElseThrow(() -> new IllegalArgumentException("Unable to resolve account " + accountName + "."));
Monitoring monitoring = credentials.getMonitoring();
// Some sample query parameters (mainly leaving all of these here so that I remember the api).
int alignmentPeriodSec = 3600;
ListTimeSeriesResponse response = monitoring
.projects()
.timeSeries()
.list("projects/" + credentials.getProject())
.setAggregationAlignmentPeriod("3600s")
.setAggregationAlignmentPeriod(alignmentPeriodSec + "s")
.setAggregationCrossSeriesReducer("REDUCE_MEAN")
.setAggregationGroupByFields(Arrays.asList("metric.label.instance_name"))
// Leaving this here for the time-being so I don't have to hunt for the label name later.
// .setAggregationGroupByFields(Arrays.asList("metric.label.instance_name"))
.setAggregationPerSeriesAligner("ALIGN_MEAN")
.setFilter("metric.type=\"compute.googleapis.com/instance/cpu/utilization\" AND metric.label.instance_name=starts_with(\"" + instanceNamePrefix + "\")")
.setIntervalStartTime(intervalStartTime)
.setIntervalEndTime(intervalEndTime)
.execute();

return Optional.of(response);
Instant requestStartTimeInstant = Instant.parse(intervalStartTime);
long requestStartTimeMillis = requestStartTimeInstant.toEpochMilli();
Instant requestEndTimeInstant = Instant.parse(intervalEndTime);
long requestEndTimeMillis = requestEndTimeInstant.toEpochMilli();
long elapsedSeconds = (requestEndTimeMillis - requestStartTimeMillis) / 1000;
long numIntervals = elapsedSeconds / alignmentPeriodSec;
long remainder = elapsedSeconds % alignmentPeriodSec;

if (remainder > 0) {
numIntervals++;
}

List<TimeSeries> timeSeriesList = response.getTimeSeries();

if (timeSeriesList.size() == 0) {
throw new IllegalArgumentException("No time series was returned.");
} else if (timeSeriesList.size() > 1) {
log.warn("Expected 1 time series, but {} were returned; using just the first time series.", timeSeriesList.size());
}

TimeSeries timeSeries = timeSeriesList.get(0);
List<Point> points = timeSeries.getPoints();

if (points.size() != numIntervals) {
String pointOrPoints = numIntervals == 1 ? "point" : "points";

log.warn("Expected {} data {}, but received {}.", numIntervals, pointOrPoints, points.size());
}

Collections.reverse(points);

Instant responseStartTimeInstant =
points.size() > 0 ? Instant.parse(points.get(0).getInterval().getStartTime()) : requestStartTimeInstant;
long responseStartTimeMillis = responseStartTimeInstant.toEpochMilli();

// TODO(duftler): What if there are no data points?
List<Double> pointValues =
points
.stream()
.map(point -> point.getValue().getDoubleValue())
.collect(Collectors.toList());

MetricSet.MetricSetBuilder metricSetBuilder =
MetricSet.builder()
.name(metricSetName)
.startTimeMillis(responseStartTimeMillis)
.startTimeIso(responseStartTimeInstant.toString())
.stepMillis(alignmentPeriodSec * 1000)
.values(pointValues);

Map<String, String> labels = timeSeries.getMetric().getLabels();

if (labels != null) {
metricSetBuilder.tags(labels);
}

return Optional.of(metricSetBuilder.build());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.netflix.kayenta.controllers;

import com.netflix.kayenta.metrics.MetricSet;
import com.netflix.kayenta.metrics.MetricsService;
import com.netflix.kayenta.metrics.MetricsServiceRepository;
import com.netflix.kayenta.security.AccountCredentials;
Expand All @@ -33,8 +34,6 @@
import org.springframework.web.bind.annotation.RestController;

import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

Expand All @@ -53,10 +52,11 @@ public class FetchController {
StorageServiceRepository storageServiceRepository;

@RequestMapping(method = RequestMethod.GET)
public Map queryMetrics(@RequestParam(required = false) final String accountName,
@ApiParam(defaultValue = "myapp-v010-") @RequestParam String instanceNamePrefix,
@ApiParam(defaultValue = "2017-02-24T15:13:00Z") @RequestParam String intervalStartTime,
@ApiParam(defaultValue = "2017-02-24T15:27:00Z") @RequestParam String intervalEndTime) throws IOException {
public MetricSet queryMetrics(@RequestParam(required = false) final String accountName,
@ApiParam(defaultValue = "cpu") @RequestParam String metricSetName,
@ApiParam(defaultValue = "myapp-v010-") @RequestParam String instanceNamePrefix,
@ApiParam(defaultValue = "2017-02-24T15:13:00Z") @RequestParam String intervalStartTime,
@ApiParam(defaultValue = "2017-02-24T15:27:00Z") @RequestParam String intervalEndTime) throws IOException {
AccountCredentials credentials;

if (StringUtils.hasLength(accountName)) {
Expand All @@ -71,13 +71,13 @@ public Map queryMetrics(@RequestParam(required = false) final String accountName

String resolvedAccountName = credentials.getName();
Optional<MetricsService> metricsService = metricsServiceRepository.getOne(resolvedAccountName);
Map someMetrics = null;
MetricSet someMetrics = null;

if (metricsService.isPresent()) {
someMetrics = metricsService
.get()
.queryMetrics(resolvedAccountName, instanceNamePrefix, intervalStartTime, intervalEndTime)
.orElse(Collections.singletonMap("no-metrics", "were-returned"));
.queryMetrics(resolvedAccountName, metricSetName, instanceNamePrefix, intervalStartTime, intervalEndTime)
.orElse(MetricSet.builder().name("no-metrics").build());
} else {
log.debug("No metrics service was configured; skipping placeholder logic to read from metrics store.");
}
Expand All @@ -87,7 +87,7 @@ public Map queryMetrics(@RequestParam(required = false) final String accountName
Optional<StorageService> storageService = storageServiceRepository.getOne(resolvedAccountName);

if (storageService.isPresent()) {
storageService.get().storeObject(resolvedAccountName, ObjectType.METRICS, UUID.randomUUID() + "", someMetrics);
storageService.get().storeObject(resolvedAccountName, ObjectType.METRIC_SET, UUID.randomUUID() + "", someMetrics);
} else {
log.debug("No storage service was configured; skipping placeholder logic to write to bucket.");
}
Expand Down

0 comments on commit 5cde992

Please sign in to comment.