Skip to content

Commit

Permalink
feat(stats): Adds circuit breaker logic via resilience4j
Browse files Browse the repository at this point in the history
  • Loading branch information
Travis Tomsu committed Sep 18, 2019
1 parent 54799e9 commit 8413e62
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 42 deletions.
2 changes: 0 additions & 2 deletions echo-telemetry/echo-telemetry.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@

dependencies {
implementation project(':echo-model')
implementation project(':echo-notifications')
implementation 'com.google.guava:guava'
implementation 'com.google.protobuf:protobuf-java-util'
implementation "com.netflix.spinnaker.kork:kork-proto"
implementation 'com.netflix.spinnaker.kork:kork-web'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import com.netflix.spinnaker.kork.proto.stats.SpinnakerInstance;
import com.netflix.spinnaker.kork.proto.stats.Stage;
import com.netflix.spinnaker.kork.proto.stats.Status;
import io.github.resilience4j.circuitbreaker.CallNotPermittedException;
import io.github.resilience4j.circuitbreaker.CircuitBreakerRegistry;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
Expand All @@ -47,6 +49,8 @@
@ConditionalOnProperty("telemetry.enabled")
public class TelemetryEventListener implements EchoEventListener {

protected static final String TELEMETRY_REGISTRY_NAME = "telemetry";

private static final Set<String> LOGGABLE_DETAIL_TYPES =
ImmutableSet.of(
"orca:orchestration:complete",
Expand All @@ -61,12 +65,16 @@ public class TelemetryEventListener implements EchoEventListener {

private final TelemetryConfig.TelemetryConfigProps telemetryConfigProps;

private final CircuitBreakerRegistry registry;

/**
 * Creates the listener and stores its collaborators.
 *
 * @param telemetryService client whose {@code log} call delivers the telemetry payload
 * @param telemetryConfigProps telemetry configuration properties
 * @param registry registry from which the circuit breaker guarding the send is looked up
 */
// NOTE(review): the telemetryConfigProps parameter line appears twice below - presumably the
// removed and the added line of a diff rendered together; confirm against the real file.
@Autowired
public TelemetryEventListener(
TelemetryService telemetryService,
TelemetryConfig.TelemetryConfigProps telemetryConfigProps) {
TelemetryConfig.TelemetryConfigProps telemetryConfigProps,
CircuitBreakerRegistry registry) {
this.telemetryService = telemetryService;
this.telemetryConfigProps = telemetryConfigProps;
this.registry = registry;
}

@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -147,8 +155,16 @@ public void processEvent(Event event) {
.build();

String content = JSON_PRINTER.print(loggedEvent);
telemetryService.log(new TypedJsonString(content));

registry
.circuitBreaker(TELEMETRY_REGISTRY_NAME)
.executeCallable(() -> telemetryService.log(new TypedJsonString(content)));
log.debug("Telemetry sent!");
} catch (CallNotPermittedException cnpe) {
log.debug(
"Telemetry not set: {} circuit breaker tripped - {}",
TELEMETRY_REGISTRY_NAME,
cnpe.getMessage());
} catch (Exception e) {
log.warn("Could not send Telemetry event {}", event, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,18 @@ import com.netflix.spinnaker.echo.config.TelemetryConfig
import com.netflix.spinnaker.echo.model.Event
import com.netflix.spinnaker.kork.proto.stats.*
import com.netflix.spinnaker.kork.proto.stats.Event as EventProto
import groovy.util.logging.Slf4j
import io.github.resilience4j.circuitbreaker.CircuitBreakerRegistry
import retrofit.client.Response
import spock.lang.Specification
import spock.lang.Subject

@Slf4j
class TelemetryEventListenerSpec extends Specification {

// Mocked telemetry client, so feature methods can verify what (if anything) gets sent.
def service = Mock(TelemetryService)
// Circuit breaker registry created with resilience4j's default configuration.
def registry = CircuitBreakerRegistry.ofDefaults()
// The breaker registered under the listener's telemetry name; kept so tests can drive its state directly.
def circuitBreaker = registry.circuitBreaker(TelemetryEventListener.TELEMETRY_REGISTRY_NAME)

def instanceId = "test-instance"
def spinnakerVersion = "1.2.3"
Expand All @@ -22,12 +27,52 @@ class TelemetryEventListenerSpec extends Specification {
def executionId = "execution_id"
def executionHash = "6d6de5b8d67c11fff6d817ea3e1190bc63857de0329d253b21aef6e5c6bbebf9"

// Shared fixture: a well-formed "orca:pipeline:complete" event for a succeeded PIPELINE execution
// with a GIT trigger and three stages, reused by the feature methods that need a sendable event.
Event validEvent = new Event(
details: [
type : "orca:pipeline:complete",
application: applicationName,
],
content: [
execution: [
id : executionId,
type : "PIPELINE",
status : "SUCCEEDED",
trigger: [
type: "GIT"
],
stages : [
// Regular stage carrying a cloud provider in its context.
[
type : "deploy",
status : "SUCCEEDED",
syntheticStageOwner: null,
context : [
"cloudProvider": "nine"
]
],
// Stage with a non-null syntheticStageOwner.
// NOTE(review): presumably the listener drops synthetic stages (hence type "removed") - confirm.
[
type : "removed",
syntheticStageOwner: "somethingNonNull",
status : "SUCCEEDED"
],
// Stage that ended in a non-success status.
[
type : "wait",
status: "TERMINAL"
],
]
]
]
)

// Spock fixture method: resets the shared circuit breaker so that state forced by one
// feature method (e.g. an explicitly opened breaker) does not carry over into the next.
def setup() {
circuitBreaker.reset()
}

def "test Event validation"() {
given:
def configProps = new TelemetryConfig.TelemetryConfigProps()

@Subject
def listener = new TelemetryEventListener(service, configProps)
def listener = new TelemetryEventListener(service, configProps, registry)

when: "null details"
listener.processEvent(new Event())
Expand Down Expand Up @@ -68,7 +113,7 @@ class TelemetryEventListenerSpec extends Specification {
when: "no execution in content"
listener.processEvent(new Event(
details: [
type: "orca:orchestration:complete",
type : "orca:orchestration:complete",
application: "foobar",
],
content: [
Expand All @@ -87,44 +132,10 @@ class TelemetryEventListenerSpec extends Specification {
.setSpinnakerVersion(spinnakerVersion)

@Subject
def listener = new TelemetryEventListener(service, configProps)
def listener = new TelemetryEventListener(service, configProps, registry)

when:
listener.processEvent(new Event(
details: [
type : "orca:pipeline:complete",
application: applicationName,
],
content: [
execution: [
id : executionId,
type : "PIPELINE",
status : "SUCCEEDED",
trigger: [
type: "GIT"
],
stages : [
[
type : "deploy",
status: "SUCCEEDED",
syntheticStageOwner: null,
context: [
"cloudProvider": "nine"
]
],
[
type: "removed",
syntheticStageOwner: "somethingNonNull",
status: "SUCCEEDED"
],
[
type : "wait",
status: "TERMINAL"
],
]
]
]
))
listener.processEvent(validEvent)

then:
1 * service.log(_) >> { List args ->
Expand Down Expand Up @@ -169,4 +180,27 @@ class TelemetryEventListenerSpec extends Specification {
return new Response("url", 200, "", [], null)
}
}

/**
 * Verifies that an open circuit breaker short-circuits telemetry delivery: the breaker must
 * report a rejected call and the mocked TelemetryService must never be invoked.
 */
def "test circuit breaker"() {
  given:
  def configProps = new TelemetryConfig.TelemetryConfigProps()
    .setInstanceId(instanceId)
    .setSpinnakerVersion(spinnakerVersion)

  @Subject
  def listener = new TelemetryEventListener(service, configProps, registry)

  // Force the breaker open and record whether it rejects a call.
  circuitBreaker.transitionToOpenState()
  boolean eventSendAttempted = false
  circuitBreaker.getEventPublisher().onCallNotPermitted { event ->
    log.debug("Event send attempted, and blocked by open circuit breaker.")
    eventSendAttempted = true
  }

  when:
  listener.processEvent(validEvent)

  then:
  // The listener reached the breaker, which refused the call...
  eventSendAttempted
  // ...and therefore the telemetry endpoint was never hit. Without this interaction check the
  // lenient mock would silently accept a call and the test would still pass.
  0 * service.log(_)
}
}
14 changes: 14 additions & 0 deletions echo-web/config/echo.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,17 @@ webhooks:
artifacts:
enabled: false
sources: []

# Circuit breaker guarding the outbound telemetry call (instance name: telemetry).
resilience4j.circuitbreaker:
instances:
telemetry:
# Startup config: register a health indicator for this breaker.
registerHealthIndicator: true
# Warming up: the failure rate is only evaluated once at least 5 calls have been
# recorded; outcomes are tracked over a count-based window of the last 10 calls.
minimumNumberOfCalls: 5
slidingWindowSize: 10
slidingWindowType: COUNT_BASED
# When tripped: stay open (rejecting calls) for 12 hours before probing again.
waitDurationInOpenState: 12h
# Try to get back to a working state: allow a single trial call while half-open.
permittedNumberOfCallsInHalfOpenState: 1
14 changes: 14 additions & 0 deletions halconfig/echo.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,17 @@ scheduler:
redis:
connection: ${services.redis.baseUrl:redis://localhost:6379}
enabled: ${services.redis.enabled:false}

# Circuit breaker guarding the outbound telemetry call (instance name: telemetry).
resilience4j.circuitbreaker:
instances:
telemetry:
# Startup config: register a health indicator for this breaker.
registerHealthIndicator: true
# Warming up: the failure rate is only evaluated once at least 5 calls have been
# recorded; outcomes are tracked over a count-based window of the last 10 calls.
minimumNumberOfCalls: 5
slidingWindowSize: 10
slidingWindowType: COUNT_BASED
# When tripped: stay open (rejecting calls) for 12 hours before probing again.
waitDurationInOpenState: 12h
# Try to get back to a working state: allow a single trial call while half-open.
permittedNumberOfCallsInHalfOpenState: 1

0 comments on commit 8413e62

Please sign in to comment.