Skip to content

Commit

Permalink
VertX example
Browse files Browse the repository at this point in the history
  • Loading branch information
jhalterman committed Jul 9, 2015
1 parent 3e55857 commit ea1bb68
Show file tree
Hide file tree
Showing 6 changed files with 88 additions and 13 deletions.
3 changes: 2 additions & 1 deletion README.md
Expand Up @@ -119,7 +119,7 @@ if (stats.canRetryOn(someFailure))
service.scheduleRetry(stats.getWaitTime(), TimeUnit.NANOSECONDS);
```

See the [RxJava example][RxJava] for a more detailed scenario.
See the [RxJava example][RxJava] for a more details.

## Example Integrations

Expand All @@ -128,6 +128,7 @@ Recurrent was designed to integrate nicely with existing libraries. Here are som
* [Java 8](https://github.com/jhalterman/recurrent/blob/master/src/test/java/net/jodah/recurrent/examples/Java8Example.java)
* [Netty](https://github.com/jhalterman/recurrent/blob/master/src/test/java/net/jodah/recurrent/examples/NettyExample.java)
* [RxJava]
* [Vert.x](https://github.com/jhalterman/recurrent/blob/master/src/test/java/net/jodah/recurrent/examples/VertxExample.java)

## Public API Integration

Expand Down
9 changes: 8 additions & 1 deletion pom.xml
@@ -1,4 +1,5 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
Expand Down Expand Up @@ -70,6 +71,12 @@
<version>4.0.21.Final</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-core</artifactId>
<version>3.0.0</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
5 changes: 3 additions & 2 deletions src/main/java/net/jodah/recurrent/RetryPolicy.java
Expand Up @@ -120,9 +120,10 @@ public RetryPolicy retryOn(Class<? extends Throwable>... failures) {
*
* @throws NullPointerException if {@code failurePredicate} is null
*/
public RetryPolicy retryWhen(Predicate<Throwable> retryPredicate) {
@SuppressWarnings("unchecked")
public RetryPolicy retryWhen(Predicate<? extends Throwable> retryPredicate) {
Assert.notNull(retryPredicate, "retryPredicate");
this.retryPredicate = retryPredicate;
this.retryPredicate = (Predicate<Throwable>) retryPredicate;
return this;
}

Expand Down
10 changes: 1 addition & 9 deletions src/test/java/net/jodah/recurrent/RecurrentTest.java
Expand Up @@ -2,7 +2,7 @@

import static net.jodah.recurrent.Asserts.assertThrows;
import static net.jodah.recurrent.Asserts.matches;
import static net.jodah.recurrent.Testing.ignoreExceptions;
import static net.jodah.recurrent.Testing.*;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
Expand Down Expand Up @@ -56,14 +56,6 @@ protected void beforeMethod() {
waiter = new Waiter();
}

@SuppressWarnings("unchecked")
private <T> Class<? extends Exception>[] failures(int numFailures, Class<? extends Exception> failureType) {
Class<? extends Exception>[] failures = new Class[numFailures];
for (int i = 0; i < numFailures; i++)
failures[i] = failureType;
return failures;
}

public void shouldRun() throws Throwable {
Runnable runnable = () -> service.connect();

Expand Down
9 changes: 9 additions & 0 deletions src/test/java/net/jodah/recurrent/Testing.java
Expand Up @@ -25,11 +25,20 @@ public static <T> T ignoreExceptions(Callable<T> callable) {
return null;
}
}


public static void ignoreExceptions(ThrowableRunnable runnable) {
try {
runnable.run();
} catch (Throwable e) {
}
}

@SuppressWarnings("unchecked")
public static <T> Class<? extends Exception>[] failures(int numFailures, Class<? extends Exception> failureType) {
Class<? extends Exception>[] failures = new Class[numFailures];
for (int i = 0; i < numFailures; i++)
failures[i] = failureType;
return failures;
}
}
65 changes: 65 additions & 0 deletions src/test/java/net/jodah/recurrent/examples/VertxExample.java
@@ -0,0 +1,65 @@
package net.jodah.recurrent.examples;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.testng.annotations.Test;

import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.ReplyException;
import io.vertx.core.eventbus.ReplyFailure;
import net.jodah.recurrent.RetryPolicy;
import net.jodah.recurrent.RetryStats;

@Test
public class VertxExample {
Vertx vertx = Vertx.vertx();

public void example() throws Throwable {
AtomicInteger failures = new AtomicInteger();
RetryPolicy retryPolicy = new RetryPolicy().withDelay(1, TimeUnit.SECONDS);

// Receiver
vertx.eventBus().consumer("ping-address", message -> {
// Fail 3 times then succeed
if (failures.getAndIncrement() < 3)
message.fail(1, "Failed");
else {
System.out.println("Received message: " + message.body());
message.reply("pong!");
}
});

// Sender
retryableSend("ping-address", "ping!", retryPolicy, reply -> {
System.out.println("Received reply " + reply.result().body());
});

Thread.sleep(5000);
}

private <T> void retryableSend(String address, Object message, RetryPolicy retryPolicy,
Handler<AsyncResult<Message<T>>> replyHandler) {
retryPolicy.retryWhen((ReplyException failure) -> ReplyFailure.RECIPIENT_FAILURE.equals(failure.failureType())
|| ReplyFailure.TIMEOUT.equals(failure.failureType()));

retryableSend(address, message, new RetryStats(retryPolicy), replyHandler);
}

private <T> void retryableSend(String address, Object message, RetryStats retryStats,
Handler<AsyncResult<Message<T>>> replyHandler) {
vertx.eventBus().send(address, message, (AsyncResult<Message<T>> reply) -> {
if (reply.succeeded() || !retryStats.canRetryOn(reply.cause())) {
replyHandler.handle(reply);
} else {
System.out.println("Failure detected");
vertx.setTimer(TimeUnit.NANOSECONDS.toMillis(retryStats.getWaitTime()), (timerId) -> {
retryableSend(address, message, retryStats, replyHandler);
});
}
});
}
}

0 comments on commit ea1bb68

Please sign in to comment.