Skip to content

Commit

Permalink
Merge pull request ReactiveX#371 from benjchristensen/retry
Browse files Browse the repository at this point in the history
Operator: Retry
  • Loading branch information
benjchristensen committed Sep 11, 2013
2 parents f8f378f + 30bcf5f commit 198498c
Show file tree
Hide file tree
Showing 3 changed files with 251 additions and 2 deletions.
37 changes: 37 additions & 0 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import rx.operators.OperationOnErrorResumeNextViaObservable;
import rx.operators.OperationOnErrorReturn;
import rx.operators.OperationOnExceptionResumeNextViaObservable;
import rx.operators.OperationRetry;
import rx.operators.OperationSample;
import rx.operators.OperationScan;
import rx.operators.OperationSkip;
Expand Down Expand Up @@ -3128,6 +3129,42 @@ public static Observable<Double> averageDoubles(Observable<Double> source) {
public ConnectableObservable<T> replay() {
return OperationMulticast.multicast(this, ReplaySubject.<T> create());
}

/**
* Retry subscription to origin Observable upto given retry count.
* <p>
* If {@link Observer#onError} is invoked the source Observable will be re-subscribed to as many times as defined by retryCount.
* <p>
* Any {@link Observer#onNext} calls received on each attempt will be emitted and concatenated together.
* <p>
* For example, if an Observable fails on first time but emits [1, 2] then succeeds the second time and
* emits [1, 2, 3, 4, 5] then the complete output would be [1, 2, 1, 2, 3, 4, 5, onCompleted].
*
* @param retryCount
* Number of retry attempts before failing.
* @return Observable with retry logic.
*/
public Observable<T> retry(int retryCount) {
return create(OperationRetry.retry(this, retryCount));
}

/**
* Retry subscription to origin Observable whenever onError is called (infinite retry count).
* <p>
* If {@link Observer#onError} is invoked the source Observable will be re-subscribed to.
* <p>
* Any {@link Observer#onNext} calls received on each attempt will be emitted and concatenated together.
* <p>
* For example, if an Observable fails on first time but emits [1, 2] then succeeds the second time and
* emits [1, 2, 3, 4, 5] then the complete output would be [1, 2, 1, 2, 3, 4, 5, onCompleted].
*
* @param retryCount
* Number of retry attempts before failing.
* @return Observable with retry logic.
*/
public Observable<T> retry() {
return create(OperationRetry.retry(this));
}

/**
* This method has similar behavior to {@link #replay} except that this auto-subscribes to
Expand Down
203 changes: 203 additions & 0 deletions rxjava-core/src/main/java/rx/operators/OperationRetry.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
package rx.operators;

/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*;

import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Test;
import org.mockito.InOrder;

import rx.Observable;
import rx.Observable.OnSubscribeFunc;
import rx.Observer;
import rx.Subscription;
import rx.concurrency.Schedulers;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action0;

public class OperationRetry {

private static final int INFINITE_RETRY = -1;

public static <T> OnSubscribeFunc<T> retry(final Observable<T> observable, final int retryCount) {
return new Retry<T>(observable, retryCount);
}

public static <T> OnSubscribeFunc<T> retry(final Observable<T> observable) {
return new Retry<T>(observable, INFINITE_RETRY);
}

private static class Retry<T> implements OnSubscribeFunc<T> {

private final Observable<T> source;
private final int retryCount;
private final AtomicInteger attempts = new AtomicInteger(0);
private final CompositeSubscription subscription = new CompositeSubscription();

public Retry(Observable<T> source, int retryCount) {
this.source = source;
this.retryCount = retryCount;
}

@Override
public Subscription onSubscribe(Observer<? super T> observer) {
subscription.add(Schedulers.currentThread().schedule(attemptSubscription(observer)));
return subscription;
}

private Action0 attemptSubscription(final Observer<? super T> observer) {
return new Action0() {

@Override
public void call() {
attempts.incrementAndGet();
source.subscribe(new Observer<T>() {

@Override
public void onCompleted() {
observer.onCompleted();
}

@Override
public void onError(Throwable e) {
if ((retryCount == INFINITE_RETRY || attempts.get() <= retryCount) && !subscription.isUnsubscribed()) {
// retry again
// remove the last subscription since we have completed (so as we retry we don't build up a huge list)
subscription.removeLast();
// add the new subscription and schedule a retry
subscription.add(Schedulers.currentThread().schedule(attemptSubscription(observer)));
} else {
// give up and pass the failure
observer.onError(e);
}
}

@Override
public void onNext(T v) {
observer.onNext(v);
}
});

}
};
}

}

public static class UnitTest {

@Test
public void testOriginFails() {
@SuppressWarnings("unchecked")
Observer<String> observer = mock(Observer.class);
Observable<String> origin = Observable.create(new FuncWithErrors(2));
origin.subscribe(observer);

InOrder inOrder = inOrder(observer);
inOrder.verify(observer, times(1)).onNext("beginningEveryTime");
inOrder.verify(observer, times(1)).onError(any(RuntimeException.class));
inOrder.verify(observer, never()).onNext("onSuccessOnly");
inOrder.verify(observer, never()).onCompleted();
}

@Test
public void testRetryFail() {
int NUM_RETRIES = 1;
int NUM_FAILURES = 2;
@SuppressWarnings("unchecked")
Observer<String> observer = mock(Observer.class);
Observable<String> origin = Observable.create(new FuncWithErrors(NUM_FAILURES));
Observable.create(retry(origin, NUM_RETRIES)).subscribe(observer);

InOrder inOrder = inOrder(observer);
// should show 2 attempts (first time fail, second time (1st retry) fail)
inOrder.verify(observer, times(1 + NUM_RETRIES)).onNext("beginningEveryTime");
// should only retry once, fail again and emit onError
inOrder.verify(observer, times(1)).onError(any(RuntimeException.class));
// no success
inOrder.verify(observer, never()).onNext("onSuccessOnly");
inOrder.verify(observer, never()).onCompleted();
inOrder.verifyNoMoreInteractions();
}

@Test
public void testRetrySuccess() {
int NUM_RETRIES = 3;
int NUM_FAILURES = 2;
@SuppressWarnings("unchecked")
Observer<String> observer = mock(Observer.class);
Observable<String> origin = Observable.create(new FuncWithErrors(NUM_FAILURES));
Observable.create(retry(origin, NUM_RETRIES)).subscribe(observer);

InOrder inOrder = inOrder(observer);
// should show 3 attempts
inOrder.verify(observer, times(1 + NUM_FAILURES)).onNext("beginningEveryTime");
// should have no errors
inOrder.verify(observer, never()).onError(any(Throwable.class));
// should have a single success
inOrder.verify(observer, times(1)).onNext("onSuccessOnly");
// should have a single successful onCompleted
inOrder.verify(observer, times(1)).onCompleted();
inOrder.verifyNoMoreInteractions();
}

@Test
public void testInfiniteRetry() {
int NUM_FAILURES = 20;
@SuppressWarnings("unchecked")
Observer<String> observer = mock(Observer.class);
Observable<String> origin = Observable.create(new FuncWithErrors(NUM_FAILURES));
Observable.create(retry(origin)).subscribe(observer);

InOrder inOrder = inOrder(observer);
// should show 3 attempts
inOrder.verify(observer, times(1 + NUM_FAILURES)).onNext("beginningEveryTime");
// should have no errors
inOrder.verify(observer, never()).onError(any(Throwable.class));
// should have a single success
inOrder.verify(observer, times(1)).onNext("onSuccessOnly");
// should have a single successful onCompleted
inOrder.verify(observer, times(1)).onCompleted();
inOrder.verifyNoMoreInteractions();
}

public static class FuncWithErrors implements OnSubscribeFunc<String> {

private final int numFailures;
private final AtomicInteger count = new AtomicInteger(0);

FuncWithErrors(int count) {
this.numFailures = count;
}

@Override
public Subscription onSubscribe(Observer<? super String> o) {
o.onNext("beginningEveryTime");
if (count.incrementAndGet() <= numFailures) {
o.onError(new RuntimeException("forced failure: " + count.get()));
} else {
o.onNext("onSuccessOnly");
o.onCompleted();
}
return Subscriptions.empty();
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

Expand All @@ -42,7 +42,7 @@ public class CompositeSubscription implements Subscription {
* TODO evaluate whether use of synchronized is a performance issue here and if it's worth using an atomic state machine or other non-locking approach
*/
private AtomicBoolean unsubscribed = new AtomicBoolean(false);
private final ConcurrentLinkedQueue<Subscription> subscriptions = new ConcurrentLinkedQueue<Subscription>();
private final LinkedBlockingDeque<Subscription> subscriptions = new LinkedBlockingDeque<Subscription>();

public CompositeSubscription(List<Subscription> subscriptions) {
this.subscriptions.addAll(subscriptions);
Expand All @@ -66,6 +66,15 @@ public synchronized void add(Subscription s) {
}
}

/**
* Remove the last Subscription that was added.
*
* @return Subscription or null if none exists
*/
public synchronized Subscription removeLast() {
return subscriptions.pollLast();
}

@Override
public synchronized void unsubscribe() {
if (unsubscribed.compareAndSet(false, true)) {
Expand Down

0 comments on commit 198498c

Please sign in to comment.