Skip to content

Commit

Permalink
Adding support to custom threshold and adding some tweaks to use para…
Browse files Browse the repository at this point in the history
…llel stream, instead of standard for loops. #7
  • Loading branch information
mtakaki committed Apr 21, 2016
1 parent dc23361 commit d577485
Show file tree
Hide file tree
Showing 7 changed files with 188 additions and 75 deletions.
Expand Up @@ -6,6 +6,7 @@
import com.codahale.metrics.MetricRegistry;
import com.github.mtakaki.dropwizard.circuitbreaker.jersey.CircuitBreaker;

import lombok.AllArgsConstructor;
import lombok.Getter;

/**
Expand All @@ -21,6 +22,17 @@
*
*/
public class CircuitBreakerManager {
/**
* Holds the {@link Meter} and the threshold. The threshold can either be a
* custom threshold or the default one.
*/
@AllArgsConstructor
@Getter
private static class MeterThreshold {
private final Meter meter;
private final double threshold;
}

/**
* The rate used to determine if the circuit is opened or not.
*/
Expand All @@ -38,18 +50,18 @@ public static interface Operation {
public void accept(final Meter meter) throws OperationException;
}

private final ConcurrentHashMap<String, Meter> circuitBreakerMap;
private final ConcurrentHashMap<String, MeterThreshold> circuitBreakerMap;
private final MetricRegistry metricRegistry;
@Getter
private final double threshold;
private final double defaultThreshold;
@Getter
private final RateType rateType;

/**
* Build a new instance of {@link CircuitBreakerManager}. This instance will
* be thread-safe, so it should be shared among different threads.
*
* @param threshold
* @param defaultThreshold
* The threshold that will determine if a circuit should be
* opened or not. This threshold unit is requests per second.
* @param metricRegistry
Expand All @@ -59,31 +71,49 @@ public static interface Operation {
* The rate unit used to determining if the circuit is opened or
* not.
*/
public CircuitBreakerManager(final MetricRegistry metricRegistry, final double threshold,
public CircuitBreakerManager(final MetricRegistry metricRegistry, final double defaultThreshold,
final RateType rateType) {
this.circuitBreakerMap = new ConcurrentHashMap<>();
this.threshold = threshold;
this.defaultThreshold = defaultThreshold;
this.metricRegistry = metricRegistry;
this.rateType = rateType;
}

/**
* Will retrieve, or build if it doesn't exist yet, the {@link Meter} that
* backs the circuit with the given name.
* backs the circuit with the given name. If the circuit breaker doesn't
* exist yet, it will use the default threshold. If you want to use a custom
* threshold, use the other {@code getMeter()} method.
*
* @param name
* The circuit name.
* @return The meter that belongs to the circuit with the given name.
*/
public Meter getMeter(final String name) {
Meter meter = this.circuitBreakerMap.get(name);
return this.getMeter(name, this.defaultThreshold);
}

/**
* Will retrieve, or build if it doesn't exist yet, the {@link Meter} that
* backs the circuit with the given name.
*
* @param name
* The circuit name.
* @param threshold
* The circuit breaker custom threshold, so it won't use the
* default one.
* @return The meter that belongs to the circuit with the given name.
*/
public Meter getMeter(final String name, final double threshold) {
MeterThreshold meterThreshold = this.circuitBreakerMap.get(name);

if (meter == null) {
meter = this.metricRegistry.meter(name);
this.circuitBreakerMap.put(name, meter);
if (meterThreshold == null) {
final Meter meter = this.metricRegistry.meter(name);
meterThreshold = new MeterThreshold(meter, threshold);
this.circuitBreakerMap.put(name, meterThreshold);
}

return meter;
return meterThreshold.getMeter();
}

/**
Expand All @@ -97,7 +127,8 @@ public Meter getMeter(final String name) {
* @throws OperationException
* The exception thrown from the given code block.
*/
public void wrapCodeBlock(final String name, final Operation codeBlock) throws OperationException {
public void wrapCodeBlock(final String name, final Operation codeBlock)
throws OperationException {
final Meter exceptionMeter = this.getMeter(name);

try {
Expand Down Expand Up @@ -148,13 +179,13 @@ public boolean isCircuitOpen(final String name) {

switch (this.rateType) {
case MEAN:
return exceptionMeter.getMeanRate() >= this.threshold;
return exceptionMeter.getMeanRate() >= this.defaultThreshold;
case ONE_MINUTE:
return exceptionMeter.getOneMinuteRate() >= this.threshold;
return exceptionMeter.getOneMinuteRate() >= this.defaultThreshold;
case FIVE_MINUTES:
return exceptionMeter.getFiveMinuteRate() >= this.threshold;
return exceptionMeter.getFiveMinuteRate() >= this.defaultThreshold;
case FIFTEEN_MINUTES:
return exceptionMeter.getFifteenMinuteRate() >= this.threshold;
return exceptionMeter.getFifteenMinuteRate() >= this.defaultThreshold;
default:
return false;
}
Expand Down
Expand Up @@ -19,4 +19,6 @@
@Documented
public @interface CircuitBreaker {
String name() default "";

double threshold() default 0d;
}
Expand Up @@ -2,6 +2,7 @@

import java.lang.reflect.Method;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
Expand All @@ -12,7 +13,6 @@

import org.apache.commons.lang3.StringUtils;
import org.glassfish.jersey.server.model.Invocable;
import org.glassfish.jersey.server.model.Resource;
import org.glassfish.jersey.server.model.ResourceMethod;
import org.glassfish.jersey.server.monitoring.ApplicationEvent;
import org.glassfish.jersey.server.monitoring.ApplicationEventListener;
Expand Down Expand Up @@ -50,47 +50,62 @@ private static class CircuitBreakerEventListener implements RequestEventListener

@Override
public void onEvent(final RequestEvent event) {
final Optional<String> circuitName = this.eventListener
.getCircuitBreakerName(event.getUriInfo().getMatchedResourceMethod());

circuitName.ifPresent(actualCircuitName -> {
if (event.getType() == RequestEvent.Type.RESOURCE_METHOD_START
&& this.circuitBreakerManager.isCircuitOpen(actualCircuitName)) {
this.meterMap.get(actualCircuitName + OPEN_CIRCUIT_SUFFIX).mark();
log.warn("Circuit breaker open, returning 503 Service Unavailable: "
+ actualCircuitName + OPEN_CIRCUIT_SUFFIX);
throw new WebApplicationException(Response.Status.SERVICE_UNAVAILABLE);
} else if (event.getType() == RequestEvent.Type.ON_EXCEPTION
&& !this.circuitBreakerManager.isCircuitOpen(actualCircuitName)) {
this.meterMap.get(actualCircuitName).mark();
}
});
switch (event.getType()) {
case RESOURCE_METHOD_START:
this.eventListener
.getCircuitBreakerName(event.getUriInfo().getMatchedResourceMethod())
.ifPresent(actualCircuitName -> {
if (this.circuitBreakerManager.isCircuitOpen(actualCircuitName)) {
this.meterMap.get(actualCircuitName + OPEN_CIRCUIT_SUFFIX).mark();
log.warn("Circuit breaker open, returning 503 Service Unavailable: "
+ actualCircuitName + OPEN_CIRCUIT_SUFFIX);
throw new WebApplicationException(
Response.Status.SERVICE_UNAVAILABLE);
}
});
break;
case ON_EXCEPTION:
this.eventListener
.getCircuitBreakerName(event.getUriInfo().getMatchedResourceMethod())
.ifPresent(actualCircuitName -> {
if (!this.circuitBreakerManager.isCircuitOpen(actualCircuitName)) {
this.meterMap.get(actualCircuitName).mark();
}
});
break;
default:
break;
}
}
}

private final ConcurrentMap<String, Meter> meterMap = new ConcurrentHashMap<>();
private final MetricRegistry metricRegistry;
private final CircuitBreakerManager circuitBreaker;
private final ConcurrentMap<String, Meter> meterMap = new ConcurrentHashMap<>();
private final Timer requestOverheadTimer;
private final double defaultThreshold;

CircuitBreakerApplicationEventListener(final MetricRegistry metricRegistry,
final CircuitBreakerManager circuitBreaker) {
this.metricRegistry = metricRegistry;
this.circuitBreaker = circuitBreaker;
this.requestOverheadTimer = metricRegistry.timer(MetricRegistry
.name(CircuitBreakerApplicationEventListener.class, "getCircuitBreakerName"));
this.defaultThreshold = circuitBreaker.getDefaultThreshold();
}

@Override
public void onEvent(final ApplicationEvent event) {
if (event.getType() == ApplicationEvent.Type.INITIALIZATION_APP_FINISHED) {
for (final Resource resource : event.getResourceModel().getResources()) {
this.registerCircuitBreakerAnnotations(resource.getAllMethods());

for (final Resource childResource : resource.getChildResources()) {
this.registerCircuitBreakerAnnotations(childResource.getAllMethods());
}
}
event.getResourceModel().getResources().parallelStream()
.filter(Objects::nonNull)
.forEach(resource -> {
this.registerCircuitBreakerAnnotations(resource.getAllMethods());

resource.getChildResources().parallelStream().forEach(childResource -> {
this.registerCircuitBreakerAnnotations(childResource.getAllMethods());
});
});
}
}

Expand All @@ -103,20 +118,22 @@ public void onEvent(final ApplicationEvent event) {
* failures.
*/
private void registerCircuitBreakerAnnotations(final List<ResourceMethod> resourceMethods) {
for (final ResourceMethod resourceMethod : resourceMethods) {
this.registerCircuitBreakerAnnotations(resourceMethod);
}
}

private void registerCircuitBreakerAnnotations(final ResourceMethod resourceMethod) {
final Optional<String> circuitName = this.getCircuitBreakerName(resourceMethod);

if (circuitName.isPresent()) {
final String actualCircuitName = circuitName.get();
this.meterMap.put(actualCircuitName, this.circuitBreaker.getMeter(actualCircuitName));
this.meterMap.put(actualCircuitName + OPEN_CIRCUIT_SUFFIX,
this.metricRegistry.meter(actualCircuitName + OPEN_CIRCUIT_SUFFIX));
}
resourceMethods.parallelStream()
.filter(Objects::nonNull)
.forEach(resourceMethod -> {
this.getCircuitBreakerName(resourceMethod)
.ifPresent(actualCircuitName -> {
this.meterMap.put(actualCircuitName,
this.circuitBreaker.getMeter(
actualCircuitName,
this.getThreshold(resourceMethod)));

final String openCircuitName = new StringBuilder(actualCircuitName)
.append(OPEN_CIRCUIT_SUFFIX).toString();
this.meterMap.put(openCircuitName,
this.metricRegistry.meter(openCircuitName));
});
});
}

/**
Expand All @@ -131,10 +148,6 @@ private void registerCircuitBreakerAnnotations(final ResourceMethod resourceMeth
* {@code Optional.empty()} if it's not annotated.
*/
private Optional<String> getCircuitBreakerName(final ResourceMethod resourceMethod) {
if (resourceMethod == null) {
return Optional.empty();
}

try (Timer.Context context = this.requestOverheadTimer.time()) {
final Invocable invocable = resourceMethod.getInvocable();
Method method = invocable.getDefinitionMethod();
Expand All @@ -155,6 +168,25 @@ private Optional<String> getCircuitBreakerName(final ResourceMethod resourceMeth
}
}

private double getThreshold(final ResourceMethod resourceMethod) {
final Invocable invocable = resourceMethod.getInvocable();
Method method = invocable.getDefinitionMethod();
CircuitBreaker circuitBreaker = method.getAnnotation(CircuitBreaker.class);

// In case it's a child class with a parent method annotated.
if (circuitBreaker == null) {
method = invocable.getHandlingMethod();
circuitBreaker = method.getAnnotation(CircuitBreaker.class);
}

if (circuitBreaker != null) {
final double customThreshold = circuitBreaker.threshold();
return customThreshold > 0d ? customThreshold : this.defaultThreshold;
} else {
return this.defaultThreshold;
}
}

/**
* Builds the {@link Meter} name using the given class and method name.
*
Expand Down
Expand Up @@ -26,7 +26,8 @@
public class CircuitBreakerManagerTest {
private static final String METER_NAME = "test.meter";
// 1 request per second.
private static double DEFAULT_THRESHOLD = 2D;
private static final double DEFAULT_THRESHOLD = 2D;
private static final double CUSTOM_THRESHOLD = 3D;

private CircuitBreakerManager circuitBreaker;

Expand Down Expand Up @@ -60,6 +61,19 @@ public void testGetMeterWithExistingMeter() {
assertThat(sameMeter.getCount()).isEqualTo(0L);
}

/**
* Testing {@code getMeter()} will build a new {@link Meter} the first time
* the name is passed to it. We can't really easily test that the custom
* threshold is actually set.
*/
@Test
public void testGetMeterWithCustomThreshold() {
final Meter meter = this.circuitBreaker.getMeter(METER_NAME, CUSTOM_THRESHOLD);

assertThat(meter).isNotNull();
assertThat(meter.getCount()).isEqualTo(0L);
}

/**
* Testing that when we call {@code isCircuitOpen()} with a new meter name
* it will return {@code false}.
Expand Down

0 comments on commit d577485

Please sign in to comment.