Skip to content

Commit

Permalink
Create AsyncRetryMetrics Bridge (ReactiveX#296)
Browse files Browse the repository at this point in the history
* Add AsyncRateLimiterMetrics

* Cleanup imports and docstring

* Rename ofRateLimiter to ofAsyncRetry

* Add tests around AsyncRetry

* Remove async method from HelloWorldService

* Formatting

* Individual imports

* rearrange imports

* Add license headers and test

* cleanup headers???

* Test failed futures instead of throw

* Remove throws statements

* Update documentation
  • Loading branch information
mgodave authored and RobWin committed Dec 6, 2018
1 parent 50c6f08 commit 8a0f656
Show file tree
Hide file tree
Showing 6 changed files with 538 additions and 18 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
*
* Copyright 2018 David Rusek
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*
*/
package io.github.resilience4j.metrics;

import com.codahale.metrics.*;
import io.github.resilience4j.retry.AsyncRetry;
import io.github.resilience4j.retry.AsyncRetryRegistry;
import io.vavr.collection.Array;

import java.util.Map;

import static com.codahale.metrics.MetricRegistry.name;
import static io.github.resilience4j.retry.utils.MetricNames.*;
import static java.util.Objects.requireNonNull;

/**
* An adapter which exports {@link AsyncRetry.Metrics} as Dropwizard Metrics Gauges.
*/
public class AsyncRetryMetrics implements MetricSet {

private final MetricRegistry metricRegistry = new MetricRegistry();

private AsyncRetryMetrics(Iterable<AsyncRetry> retries){
this(DEFAULT_PREFIX, retries);
}

private AsyncRetryMetrics(String prefix, Iterable<AsyncRetry> retries){
requireNonNull(prefix);
requireNonNull(retries);
retries.forEach(retry -> {
String name = retry.getName();

metricRegistry.register(name(prefix, name, SUCCESSFUL_CALLS_WITHOUT_RETRY),
(Gauge<Long>) () -> retry.getMetrics().getNumberOfSuccessfulCallsWithoutRetryAttempt());
metricRegistry.register(name(prefix, name, SUCCESSFUL_CALLS_WITH_RETRY),
(Gauge<Long>) () -> retry.getMetrics().getNumberOfSuccessfulCallsWithRetryAttempt());
metricRegistry.register(name(prefix, name, FAILED_CALLS_WITHOUT_RETRY),
(Gauge<Long>) () -> retry.getMetrics().getNumberOfFailedCallsWithoutRetryAttempt());
metricRegistry.register(name(prefix, name, FAILED_CALLS_WITH_RETRY),
(Gauge<Long>) () -> retry.getMetrics().getNumberOfFailedCallsWithRetryAttempt());
});
}

public static AsyncRetryMetrics ofAsyncRetryRegistry(String prefix, AsyncRetryRegistry retryRegistry) {
return new AsyncRetryMetrics(prefix, retryRegistry.getAllRetries());
}

public static AsyncRetryMetrics ofAsyncRetryRegistry(AsyncRetryRegistry retryRegistry) {
return new AsyncRetryMetrics(retryRegistry.getAllRetries());
}

public static AsyncRetryMetrics ofIterable(String prefix, Iterable<AsyncRetry> retries) {
return new AsyncRetryMetrics(prefix, retries);
}

public static AsyncRetryMetrics ofIterable(Iterable<AsyncRetry> retries) {
return new AsyncRetryMetrics(retries);
}

public static AsyncRetryMetrics ofAsyncRetry(AsyncRetry retry) {
return new AsyncRetryMetrics(Array.of(retry));
}

@Override
public Map<String, Metric> getMetrics() {
return metricRegistry.getMetrics();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
/*
*
* Copyright 2018 David Rusek
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*
*/
package io.github.resilience4j.metrics;

import javax.xml.ws.WebServiceException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import com.codahale.metrics.MetricRegistry;
import io.github.resilience4j.retry.AsyncRetry;
import io.github.resilience4j.retry.AsyncRetryRegistry;
import io.github.resilience4j.test.AsyncHelloWorldService;
import io.vavr.control.Try;
import org.junit.Before;
import org.junit.Test;
import org.mockito.BDDMockito;

import static io.github.resilience4j.retry.utils.MetricNames.FAILED_CALLS_WITHOUT_RETRY;
import static io.github.resilience4j.retry.utils.MetricNames.FAILED_CALLS_WITH_RETRY;
import static io.github.resilience4j.retry.utils.MetricNames.SUCCESSFUL_CALLS_WITHOUT_RETRY;
import static io.github.resilience4j.retry.utils.MetricNames.SUCCESSFUL_CALLS_WITH_RETRY;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;

public class AsyncRetryMetricsTest {

private MetricRegistry metricRegistry;
private AsyncHelloWorldService helloWorldService;
private ScheduledExecutorService scheduler =
Executors.newSingleThreadScheduledExecutor();

@Before
public void setUp(){
metricRegistry = new MetricRegistry();
helloWorldService = mock(AsyncHelloWorldService.class);
}

@Test
public void shouldRegisterMetricsWithoutRetry() {
//Given
AsyncRetryRegistry retryRegistry = AsyncRetryRegistry.ofDefaults();
AsyncRetry retry = retryRegistry.retry("testName");
metricRegistry.registerAll(AsyncRetryMetrics.ofAsyncRetryRegistry(retryRegistry));

// Given the HelloWorldService returns Hello world
BDDMockito.given(helloWorldService.returnHelloWorld())
.willReturn(CompletableFuture.completedFuture("Hello world"));

// Setup circuitbreaker with retry
String value = awaitResult(retry.executeCompletionStage(scheduler, helloWorldService::returnHelloWorld));

//Then
assertThat(value).isEqualTo("Hello world");
// Then the helloWorldService should be invoked 1 time
BDDMockito.then(helloWorldService).should(times(1)).returnHelloWorld();
assertThat(metricRegistry.getMetrics()).hasSize(4);
assertThat(metricRegistry.getGauges().get("resilience4j.retry.testName." + SUCCESSFUL_CALLS_WITH_RETRY).getValue()).isEqualTo(0L);
assertThat(metricRegistry.getGauges().get("resilience4j.retry.testName." + SUCCESSFUL_CALLS_WITHOUT_RETRY).getValue()).isEqualTo(1L);
assertThat(metricRegistry.getGauges().get("resilience4j.retry.testName." + FAILED_CALLS_WITH_RETRY).getValue()).isEqualTo(0L);
assertThat(metricRegistry.getGauges().get("resilience4j.retry.testName." + FAILED_CALLS_WITHOUT_RETRY).getValue()).isEqualTo(0L);
}

@Test
public void shouldRegisterMetricsWithRetry() {
//Given
AsyncRetryRegistry retryRegistry = AsyncRetryRegistry.ofDefaults();
AsyncRetry retry = retryRegistry.retry("testName");
metricRegistry.registerAll(AsyncRetryMetrics.ofAsyncRetryRegistry(retryRegistry));

// Given the HelloWorldService returns Hello world
BDDMockito.given(helloWorldService.returnHelloWorld())
.willReturn(failedFuture(new WebServiceException("BAM!")))
.willReturn(CompletableFuture.completedFuture("Hello world"))
.willReturn(failedFuture(new WebServiceException("BAM!")))
.willReturn(failedFuture(new WebServiceException("BAM!")))
.willReturn(failedFuture(new WebServiceException("BAM!")));

// Setup circuitbreaker with retry
String value1 = awaitResult(retry.executeCompletionStage(scheduler, helloWorldService::returnHelloWorld));
Try.ofCallable(() -> awaitResult(AsyncRetry.decorateCompletionStage(retry, scheduler, helloWorldService::returnHelloWorld).get()));

//Then
assertThat(value1).isEqualTo("Hello world");
// Then the helloWorldService should be invoked 5 times
BDDMockito.then(helloWorldService).should(times(5)).returnHelloWorld();
assertThat(metricRegistry.getMetrics()).hasSize(4);
assertThat(metricRegistry.getGauges().get("resilience4j.retry.testName." + SUCCESSFUL_CALLS_WITH_RETRY).getValue()).isEqualTo(1L);
assertThat(metricRegistry.getGauges().get("resilience4j.retry.testName." + SUCCESSFUL_CALLS_WITHOUT_RETRY).getValue()).isEqualTo(0L);
assertThat(metricRegistry.getGauges().get("resilience4j.retry.testName." + FAILED_CALLS_WITH_RETRY).getValue()).isEqualTo(1L);
assertThat(metricRegistry.getGauges().get("resilience4j.retry.testName." + FAILED_CALLS_WITHOUT_RETRY).getValue()).isEqualTo(0L);
}

@Test
public void shouldUseCustomPrefix() {
//Given
AsyncRetryRegistry retryRegistry = AsyncRetryRegistry.ofDefaults();
AsyncRetry retry = retryRegistry.retry("testName");
metricRegistry.registerAll(AsyncRetryMetrics.ofAsyncRetryRegistry("testPrefix",retryRegistry));

// Given the HelloWorldService returns Hello world
BDDMockito.given(helloWorldService.returnHelloWorld()).willReturn(CompletableFuture.completedFuture("Hello world"));

String value = awaitResult(retry.executeCompletionStage(scheduler, helloWorldService::returnHelloWorld));

//Then
assertThat(value).isEqualTo("Hello world");
// Then the helloWorldService should be invoked 1 time
BDDMockito.then(helloWorldService).should(times(1)).returnHelloWorld();
assertThat(metricRegistry.getMetrics()).hasSize(4);
assertThat(metricRegistry.getGauges().get("testPrefix.testName." + SUCCESSFUL_CALLS_WITH_RETRY).getValue()).isEqualTo(0L);
assertThat(metricRegistry.getGauges().get("testPrefix.testName." + SUCCESSFUL_CALLS_WITHOUT_RETRY).getValue()).isEqualTo(1L);
assertThat(metricRegistry.getGauges().get("testPrefix.testName." + FAILED_CALLS_WITH_RETRY).getValue()).isEqualTo(0L);
assertThat(metricRegistry.getGauges().get("testPrefix.testName." + FAILED_CALLS_WITHOUT_RETRY).getValue()).isEqualTo(0L);
}

private static <T> CompletableFuture<T> failedFuture(Throwable t) {
CompletableFuture<T> future = new CompletableFuture<>();
future.completeExceptionally(t);
return future;
}

private static <T> T awaitResult(CompletionStage<T> completionStage) {
try {
return completionStage.toCompletableFuture().get(5, TimeUnit.SECONDS);
} catch (InterruptedException | TimeoutException e) {
throw new AssertionError(e);
} catch (ExecutionException e) {
throw new RuntimeExecutionException(e.getCause());
}
}

private static class RuntimeExecutionException extends RuntimeException {
RuntimeExecutionException(Throwable cause) {
super(cause);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Copyright 2018 David Rusek
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.github.resilience4j.retry;


import java.util.function.Supplier;

import io.github.resilience4j.retry.internal.InMemoryAsyncRetryRegistry;
import io.vavr.collection.Seq;

/**
* The {@link AsyncRetryRegistry} is a factory to create AsyncRetry instances which stores all AsyncRetry instances in a registry.
*/
public interface AsyncRetryRegistry {

/**
* Returns all managed {@link AsyncRetry} instances.
*
* @return all managed {@link AsyncRetry} instances.
*/
Seq<AsyncRetry> getAllRetries();

/**
* Returns a managed {@link AsyncRetry} or creates a new one with the default Retry configuration.
*
* @param name the name of the AsyncRetry
* @return The {@link AsyncRetry}
*/
AsyncRetry retry(String name);

/**
* Returns a managed {@link AsyncRetry} or creates a new one with a custom Retry configuration.
*
* @param name the name of the AsyncRetry
* @param retryConfig a custom Retry configuration
* @return The {@link AsyncRetry}
*/
AsyncRetry retry(String name, RetryConfig retryConfig);

/**
* Returns a managed {@link AsyncRetry} or creates a new one with a custom Retry configuration.
*
* @param name the name of the AsyncRetry
* @param retryConfigSupplier a supplier of a custom Retry configuration
* @return The {@link AsyncRetry}
*/
AsyncRetry retry(String name, Supplier<RetryConfig> retryConfigSupplier);


/**
* Creates an AsyncRetryRegistry with a custom Retry configuration.
*
* @param retryConfig a custom Retry configuration
* @return an AsyncRetryRegistry with a custom Retry configuration.
*/
static AsyncRetryRegistry of(RetryConfig retryConfig) {
return new InMemoryAsyncRetryRegistry(retryConfig);
}

/**
* Creates an AsyncRetryRegistry with a default Retry configuration.
*
* @return an AsyncRetryRegistry with a default Retry configuration.
*/
static AsyncRetryRegistry ofDefaults() {
return new InMemoryAsyncRetryRegistry();
}
}

0 comments on commit 8a0f656

Please sign in to comment.