Skip to content

Commit

Permalink
fix(stats): Parse Event objects into structured POJOs for Telemetry e…
Browse files Browse the repository at this point in the history
…vents (#660)
  • Loading branch information
Travis Tomsu committed Sep 25, 2019
1 parent 84d7e5f commit ad0fd07
Show file tree
Hide file tree
Showing 4 changed files with 240 additions and 110 deletions.
1 change: 1 addition & 0 deletions echo-telemetry/echo-telemetry.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,5 @@ dependencies {
implementation 'com.netflix.spinnaker.kork:kork-web'
implementation 'com.squareup.retrofit:retrofit'
implementation 'com.squareup.retrofit:converter-jackson'
implementation 'de.huxhorn.sulky:de.huxhorn.sulky.ulid'
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.netflix.spinnaker.retrofit.RetrofitConfigurationProperties;
import com.netflix.spinnaker.retrofit.Slf4jRetrofitLogger;
import com.squareup.okhttp.OkHttpClient;
import de.huxhorn.sulky.ulid.ULID;
import java.util.concurrent.TimeUnit;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -72,7 +73,7 @@ public static class TelemetryConfigProps {

boolean enabled = false;
String endpoint = DEFAULT_TELEMETRY_ENDPOINT;
String instanceId;
String instanceId = new ULID().nextULID();
String spinnakerVersion = "unknown";
int connectionTimeoutMillis = 3000;
int readTimeoutMillis = 5000;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,14 @@

package com.netflix.spinnaker.echo.telemetry;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder;
import com.google.common.collect.ImmutableSet;
import com.google.common.hash.Hashing;
import com.google.protobuf.Descriptors.EnumDescriptor;
import com.google.protobuf.Descriptors.EnumValueDescriptor;
import com.google.protobuf.util.JsonFormat;
import com.netflix.spinnaker.echo.config.TelemetryConfig;
import com.netflix.spinnaker.echo.events.EchoEventListener;
Expand All @@ -32,13 +38,14 @@
import io.github.resilience4j.circuitbreaker.CircuitBreakerRegistry;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import lombok.Builder;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
Expand All @@ -59,7 +66,10 @@ public class TelemetryEventListener implements EchoEventListener {
"orca:pipeline:failed");

private static final JsonFormat.Printer JSON_PRINTER =
JsonFormat.printer().omittingInsignificantWhitespace();
JsonFormat.printer().includingDefaultValueFields();

private static final ObjectMapper objectMapper =
new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

private final TelemetryService telemetryService;

Expand All @@ -77,88 +87,84 @@ public TelemetryEventListener(
this.registry = registry;
}

@SuppressWarnings("unchecked")
@Override
public void processEvent(Event event) {
try {
if (event.getDetails() == null || event.getContent() == null) {
log.debug("Telemetry not sent: Details or content not found in event");
return;
}

String eventType = event.getDetails().getType();
if (!LOGGABLE_DETAIL_TYPES.contains(eventType)) {
log.debug("Telemetry not sent: type '{}' not whitelisted ", eventType);
return;
}

String applicationId = event.getDetails().getApplication();
if (applicationId == null || applicationId.isEmpty()) {
log.debug("Application ID must be non-null and not empty");
return;
}
if (event.getDetails() == null || event.getContent() == null) {
log.debug("Telemetry not sent: Details or content not found in event");
return;
}

Map<String, Object> execution = (Map<String, Object>) event.getContent().get("execution");
if (execution == null || execution.isEmpty()) {
log.debug("Missing execution from Event content.");
return;
}
String eventType = event.getDetails().getType();
if (!LOGGABLE_DETAIL_TYPES.contains(eventType)) {
log.debug("Telemetry not sent: type '{}' not whitelisted ", eventType);
return;
}

String hashedApplicationId = hash(applicationId);
Execution.Type executionType =
Execution.Type.valueOf(
// TODO(ttomsu, louisjimenez): Add MPTv1 and v2 execution type detection.
execution.getOrDefault("type", "UNKNOWN").toString().toUpperCase());
Status executionStatus =
Status.valueOf(execution.getOrDefault("status", "UNKNOWN").toString().toUpperCase());

Map<String, Object> trigger =
(Map<String, Object>) execution.getOrDefault("trigger", new HashMap<>());
Execution.Trigger.Type triggerType =
Execution.Trigger.Type.valueOf(
trigger.getOrDefault("type", "UNKNOWN").toString().toUpperCase());

List<Map<String, Object>> stages =
(List<Map<String, Object>>) execution.getOrDefault("stages", new ArrayList<>());
List<Stage> protoStages =
stages.stream()
.map(this::toStage)
.filter(Optional::isPresent)
.map(Optional::get)
.collect(Collectors.toList());

Execution.Builder executionBuilder =
Execution.newBuilder()
.setType(executionType)
.setStatus(executionStatus)
.setTrigger(Execution.Trigger.newBuilder().setType(triggerType))
.addAllStages(protoStages);
String executionId = execution.getOrDefault("id", "").toString();
if (!executionId.isEmpty()) {
executionBuilder.setId(hash(executionId));
}
Execution executionProto = executionBuilder.build();
String applicationId = event.getDetails().getApplication();
if (applicationId == null || applicationId.isEmpty()) {
log.debug("Application ID must be non-null and not empty");
return;
}
String hashedApplicationId = hash(applicationId);

Holder.Content content = objectMapper.convertValue(event.getContent(), Holder.Content.class);
Holder.Execution execution = content.getExecution();

// TODO(ttomsu, louisjimenez): Add MPTv1 and v2 execution type detection.
Execution.Type executionType =
Execution.Type.valueOf(
parseEnum(Execution.Type.getDescriptor(), execution.getType().toUpperCase()));

Status executionStatus =
Status.valueOf(parseEnum(Status.getDescriptor(), execution.getStatus().toUpperCase()));

Execution.Trigger.Type triggerType =
Execution.Trigger.Type.valueOf(
parseEnum(
Execution.Trigger.Type.getDescriptor(),
execution.getTrigger().getType().toUpperCase()));

List<Stage> protoStages =
execution.getStages().stream()
.map(this::toStage)
.filter(Optional::isPresent)
.map(Optional::get)
.collect(Collectors.toList());

Execution.Builder executionBuilder =
Execution.newBuilder()
.setType(executionType)
.setStatus(executionStatus)
.setTrigger(Execution.Trigger.newBuilder().setType(triggerType))
.addAllStages(protoStages);
String executionId = execution.getId();
if (!executionId.isEmpty()) {
executionBuilder.setId(hash(executionId));
}
Execution executionProto = executionBuilder.build();

Application application = Application.newBuilder().setId(hashedApplicationId).build();
Application application = Application.newBuilder().setId(hashedApplicationId).build();

SpinnakerInstance spinnakerInstance =
SpinnakerInstance.newBuilder()
.setId(telemetryConfigProps.getInstanceId())
.setVersion(telemetryConfigProps.getSpinnakerVersion())
.build();
SpinnakerInstance spinnakerInstance =
SpinnakerInstance.newBuilder()
.setId(telemetryConfigProps.getInstanceId())
.setVersion(telemetryConfigProps.getSpinnakerVersion())
.build();

com.netflix.spinnaker.kork.proto.stats.Event loggedEvent =
com.netflix.spinnaker.kork.proto.stats.Event.newBuilder()
.setSpinnakerInstance(spinnakerInstance)
.setApplication(application)
.setExecution(executionProto)
.build();
com.netflix.spinnaker.kork.proto.stats.Event loggedEvent =
com.netflix.spinnaker.kork.proto.stats.Event.newBuilder()
.setSpinnakerInstance(spinnakerInstance)
.setApplication(application)
.setExecution(executionProto)
.build();

String content = JSON_PRINTER.print(loggedEvent);
try {
String jsonContent = JSON_PRINTER.print(loggedEvent);
log.debug("Sending telemetry event:\n{}", jsonContent);

registry
.circuitBreaker(TELEMETRY_REGISTRY_NAME)
.executeCallable(() -> telemetryService.log(new TypedJsonString(content)));
.executeCallable(() -> telemetryService.log(new TypedJsonString(jsonContent)));
log.debug("Telemetry sent!");
} catch (CallNotPermittedException cnpe) {
log.debug(
Expand All @@ -170,29 +176,21 @@ public void processEvent(Event event) {
}
}

@SuppressWarnings("unchecked")
private Optional<Stage> toStage(Map<String, Object> stage) {
private Optional<Stage> toStage(Holder.Stage stage) {
// Only interested in user-configured stages.
if (stage.get("syntheticStageOwner") != null
&& !stage.get("syntheticStageOwner").toString().isEmpty()) {
if (stage.isSyntheticStage()) {
log.debug("Discarding synthetic stage");
return Optional.empty();
}

Status status =
Status.valueOf(stage.getOrDefault("status", "UNKNOWN").toString().toUpperCase());
Stage.Builder stageBuilder =
Stage.newBuilder()
.setType(stage.getOrDefault("type", "unknown").toString())
.setStatus(status);

Map<String, Object> context =
(Map<String, Object>) stage.getOrDefault("context", new HashMap<>());
if (context.containsKey("cloudProvider")) {
String cloudProvider = context.get("cloudProvider").toString();
stageBuilder.setCloudProvider(CloudProvider.newBuilder().setId(cloudProvider).build());
Status stageStatus =
Status.valueOf(parseEnum(Status.getDescriptor(), stage.getStatus().toUpperCase()));
Stage.Builder stageBuilder = Stage.newBuilder().setType(stage.getType()).setStatus(stageStatus);

String cloudProvider = stage.getContext().getCloudProvider();
if (StringUtils.isNotEmpty(cloudProvider)) {
// TODO(ttomsu): Figure out how to detect Kubernetes "flavor" - i.e. GKE, EKS, vanilla, etc.
stageBuilder.setCloudProvider(CloudProvider.newBuilder().setId(cloudProvider).build());
}

return Optional.of(stageBuilder.build());
Expand All @@ -202,6 +200,14 @@ private String hash(String clearText) {
return Hashing.sha256().hashString(clearText, StandardCharsets.UTF_8).toString();
}

private static EnumValueDescriptor parseEnum(EnumDescriptor ed, String value) {
EnumValueDescriptor evd = ed.findValueByName(value);
if (evd == null) {
return ed.getValues().get(0); // Default to first if unrecognized, which should be UNKNOWN.
}
return evd;
}

static class TypedJsonString extends TypedString {
TypedJsonString(String body) {
super(body);
Expand All @@ -217,4 +223,71 @@ public String toString() {
return new String(getBytes(), StandardCharsets.UTF_8);
}
}

// Arbitrary container for TelemetryEventListener structured data, so there aren't huge
// fully-qualified names for the equivalent classes in the kork.proto package all over the code.
public static class Holder {

@Getter
@Builder
@JsonDeserialize(builder = Content.ContentBuilder.class)
public static class Content {
@Builder.Default private final Execution execution = Execution.builder().build();

@JsonPOJOBuilder(withPrefix = "")
public static class ContentBuilder {}
}

@Getter
@Builder
@JsonDeserialize(builder = Execution.ExecutionBuilder.class)
public static class Execution {
@Builder.Default private final String id = "";
@Builder.Default private final String type = "UNKNOWN";
@Builder.Default private final String status = "UNKNOWN";
@Builder.Default private final Trigger trigger = Trigger.builder().build();
@Builder.Default private final List<Stage> stages = new ArrayList<>();

@JsonPOJOBuilder(withPrefix = "")
public static class ExecutionBuilder {}
}

@Getter
@Builder
@JsonDeserialize(builder = Trigger.TriggerBuilder.class)
public static class Trigger {

@Builder.Default private final String type = "UNKNOWN";

@JsonPOJOBuilder(withPrefix = "")
public static class TriggerBuilder {}
}

@Getter
@Builder
@JsonDeserialize(builder = Stage.StageBuilder.class)
public static class Stage {
@Builder.Default private final String status = "UNKNOWN";
@Builder.Default private final String type = "UNKNOWN";
@Builder.Default private final Context context = Context.builder().build();
private final String syntheticStageOwner;

public boolean isSyntheticStage() {
return StringUtils.isNotEmpty(syntheticStageOwner);
}

@JsonPOJOBuilder(withPrefix = "")
public static class StageBuilder {}
}

@Getter
@Builder
@JsonDeserialize(builder = Context.ContextBuilder.class)
public static class Context {
private final String cloudProvider;

@JsonPOJOBuilder(withPrefix = "")
public static class ContextBuilder {}
}
}
}
Loading

0 comments on commit ad0fd07

Please sign in to comment.