From 8413e629d5d3cfad83778afd5d8c00ecf7f81328 Mon Sep 17 00:00:00 2001 From: Travis Tomsu Date: Mon, 16 Sep 2019 14:05:11 -0400 Subject: [PATCH] feat(stats): Adds circuit breaker logic via resilience4j --- echo-telemetry/echo-telemetry.gradle | 2 - .../telemetry/TelemetryEventListener.java | 20 +++- .../TelemetryEventListenerSpec.groovy | 110 ++++++++++++------ echo-web/config/echo.yml | 14 +++ halconfig/echo.yml | 14 +++ 5 files changed, 118 insertions(+), 42 deletions(-) diff --git a/echo-telemetry/echo-telemetry.gradle b/echo-telemetry/echo-telemetry.gradle index 988851e22..d07e93ead 100644 --- a/echo-telemetry/echo-telemetry.gradle +++ b/echo-telemetry/echo-telemetry.gradle @@ -16,8 +16,6 @@ dependencies { implementation project(':echo-model') - implementation project(':echo-notifications') - implementation 'com.google.guava:guava' implementation 'com.google.protobuf:protobuf-java-util' implementation "com.netflix.spinnaker.kork:kork-proto" implementation 'com.netflix.spinnaker.kork:kork-web' diff --git a/echo-telemetry/src/main/java/com/netflix/spinnaker/echo/telemetry/TelemetryEventListener.java b/echo-telemetry/src/main/java/com/netflix/spinnaker/echo/telemetry/TelemetryEventListener.java index 638a35b58..fc9206c10 100644 --- a/echo-telemetry/src/main/java/com/netflix/spinnaker/echo/telemetry/TelemetryEventListener.java +++ b/echo-telemetry/src/main/java/com/netflix/spinnaker/echo/telemetry/TelemetryEventListener.java @@ -28,6 +28,8 @@ import com.netflix.spinnaker.kork.proto.stats.SpinnakerInstance; import com.netflix.spinnaker.kork.proto.stats.Stage; import com.netflix.spinnaker.kork.proto.stats.Status; +import io.github.resilience4j.circuitbreaker.CallNotPermittedException; +import io.github.resilience4j.circuitbreaker.CircuitBreakerRegistry; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.HashMap; @@ -47,6 +49,8 @@ @ConditionalOnProperty("telemetry.enabled") public class TelemetryEventListener implements EchoEventListener { + protected static final String TELEMETRY_REGISTRY_NAME = "telemetry"; + private static final Set LOGGABLE_DETAIL_TYPES = ImmutableSet.of( "orca:orchestration:complete", @@ -61,12 +65,16 @@ public class TelemetryEventListener implements EchoEventListener { private final TelemetryConfig.TelemetryConfigProps telemetryConfigProps; + private final CircuitBreakerRegistry registry; + @Autowired public TelemetryEventListener( TelemetryService telemetryService, - TelemetryConfig.TelemetryConfigProps telemetryConfigProps) { + TelemetryConfig.TelemetryConfigProps telemetryConfigProps, + CircuitBreakerRegistry registry) { this.telemetryService = telemetryService; this.telemetryConfigProps = telemetryConfigProps; + this.registry = registry; } @SuppressWarnings("unchecked") @@ -147,8 +155,16 @@ public void processEvent(Event event) { .build(); String content = JSON_PRINTER.print(loggedEvent); - telemetryService.log(new TypedJsonString(content)); + + registry + .circuitBreaker(TELEMETRY_REGISTRY_NAME) + .executeCallable(() -> telemetryService.log(new TypedJsonString(content))); log.debug("Telemetry sent!"); + } catch (CallNotPermittedException cnpe) { + log.debug( + "Telemetry not set: {} circuit breaker tripped - {}", + TELEMETRY_REGISTRY_NAME, + cnpe.getMessage()); } catch (Exception e) { log.warn("Could not send Telemetry event {}", event, e); } diff --git a/echo-telemetry/src/test/groovy/com/netflix/spinnaker/echo/telemetry/TelemetryEventListenerSpec.groovy b/echo-telemetry/src/test/groovy/com/netflix/spinnaker/echo/telemetry/TelemetryEventListenerSpec.groovy index ce54bafb7..846497c34 100644 --- a/echo-telemetry/src/test/groovy/com/netflix/spinnaker/echo/telemetry/TelemetryEventListenerSpec.groovy +++ b/echo-telemetry/src/test/groovy/com/netflix/spinnaker/echo/telemetry/TelemetryEventListenerSpec.groovy @@ -5,13 +5,18 @@ import com.netflix.spinnaker.echo.config.TelemetryConfig import com.netflix.spinnaker.echo.model.Event import com.netflix.spinnaker.kork.proto.stats.* import com.netflix.spinnaker.kork.proto.stats.Event as EventProto +import groovy.util.logging.Slf4j +import io.github.resilience4j.circuitbreaker.CircuitBreakerRegistry import retrofit.client.Response import spock.lang.Specification import spock.lang.Subject +@Slf4j class TelemetryEventListenerSpec extends Specification { def service = Mock(TelemetryService) + def registry = CircuitBreakerRegistry.ofDefaults() + def circuitBreaker = registry.circuitBreaker(TelemetryEventListener.TELEMETRY_REGISTRY_NAME) def instanceId = "test-instance" def spinnakerVersion = "1.2.3" @@ -22,12 +27,52 @@ class TelemetryEventListenerSpec extends Specification { def executionId = "execution_id" def executionHash = "6d6de5b8d67c11fff6d817ea3e1190bc63857de0329d253b21aef6e5c6bbebf9" + Event validEvent = new Event( + details: [ + type : "orca:pipeline:complete", + application: applicationName, + ], + content: [ + execution: [ + id : executionId, + type : "PIPELINE", + status : "SUCCEEDED", + trigger: [ + type: "GIT" + ], + stages : [ + [ + type : "deploy", + status : "SUCCEEDED", + syntheticStageOwner: null, + context : [ + "cloudProvider": "nine" + ] + ], + [ + type : "removed", + syntheticStageOwner: "somethingNonNull", + status : "SUCCEEDED" + ], + [ + type : "wait", + status: "TERMINAL" + ], + ] + ] + ] + ) + + def setup() { + circuitBreaker.reset() + } + def "test Event validation"() { given: def configProps = new TelemetryConfig.TelemetryConfigProps() @Subject - def listener = new TelemetryEventListener(service, configProps) + def listener = new TelemetryEventListener(service, configProps, registry) when: "null details" listener.processEvent(new Event()) @@ -68,7 +113,7 @@ class TelemetryEventListenerSpec extends Specification { when: "no execution in content" listener.processEvent(new Event( details: [ - type: "orca:orchestration:complete", + type : "orca:orchestration:complete", application: "foobar", ], content: [ @@ -87,44 +132,10 @@ class TelemetryEventListenerSpec extends Specification { .setSpinnakerVersion(spinnakerVersion) @Subject - def listener = new TelemetryEventListener(service, configProps) + def listener = new TelemetryEventListener(service, configProps, registry) when: - listener.processEvent(new Event( - details: [ - type : "orca:pipeline:complete", - application: applicationName, - ], - content: [ - execution: [ - id : executionId, - type : "PIPELINE", - status : "SUCCEEDED", - trigger: [ - type: "GIT" - ], - stages : [ - [ - type : "deploy", - status: "SUCCEEDED", - syntheticStageOwner: null, - context: [ - "cloudProvider": "nine" - ] - ], - [ - type: "removed", - syntheticStageOwner: "somethingNonNull", - status: "SUCCEEDED" - ], - [ - type : "wait", - status: "TERMINAL" - ], - ] - ] - ] - )) + listener.processEvent(validEvent) then: 1 * service.log(_) >> { List args -> @@ -169,4 +180,27 @@ class TelemetryEventListenerSpec extends Specification { return new Response("url", 200, "", [], null) } } + + def "test circuit breaker"() { + given: + def configProps = new TelemetryConfig.TelemetryConfigProps() + .setInstanceId(instanceId) + .setSpinnakerVersion(spinnakerVersion) + + @Subject + def listener = new TelemetryEventListener(service, configProps, registry) + + circuitBreaker.transitionToOpenState() + boolean eventSendAttempted = false + circuitBreaker.getEventPublisher().onCallNotPermitted { event -> + log.debug("Event send attempted, and blocked by open circuit breaker.") + eventSendAttempted = true + } + + when: + listener.processEvent(validEvent) + + then: + eventSendAttempted + } } diff --git a/echo-web/config/echo.yml b/echo-web/config/echo.yml index 4d1d5c676..0a51117ce 100644 --- a/echo-web/config/echo.yml +++ b/echo-web/config/echo.yml @@ -24,3 +24,17 @@ webhooks: artifacts: enabled: false sources: [] + +resilience4j.circuitbreaker: + instances: + telemetry: + # Startup config... + registerHealthIndicator: true + # Warming up... + minimumNumberOfCalls: 5 + slidingWindowSize: 10 + slidingWindowType: COUNT_BASED + # When tripped... + waitDurationInOpenState: 12h + # Try to get back to a working state... + permittedNumberOfCallsInHalfOpenState: 1 diff --git a/halconfig/echo.yml b/halconfig/echo.yml index 0eb13d797..8799c5820 100644 --- a/halconfig/echo.yml +++ b/halconfig/echo.yml @@ -34,3 +34,17 @@ scheduler: redis: connection: ${services.redis.baseUrl:redis://localhost:6379} enabled: ${services.redis.enabled:false} + +resilience4j.circuitbreaker: + instances: + telemetry: + # Startup config... + registerHealthIndicator: true + # Warming up... + minimumNumberOfCalls: 5 + slidingWindowSize: 10 + slidingWindowType: COUNT_BASED + # When tripped... + waitDurationInOpenState: 12h + # Try to get back to a working state... + permittedNumberOfCallsInHalfOpenState: 1