Skip to content

Commit

Permalink
Merge commit 'dc9cf4f0b75ac3672a3e125fa2b905dcab7d46f1' into rx
Browse files Browse the repository at this point in the history
  • Loading branch information
jianghaolu committed Aug 26, 2016
2 parents a1c0ae4 + 930fbf5 commit ffc5544
Showing 1 changed file with 15 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import com.microsoft.rest.ServiceCall;
import com.microsoft.rest.ServiceResponse;

import java.util.List;

import rx.Observable;
import rx.Subscriber;
import rx.functions.Func1;
Expand All @@ -31,13 +33,12 @@ private AzureServiceCall() {
* @param first the observable to the first page
* @param next the observable to poll subsequent pages
* @param callback the client-side callback
* @param <T> the Page type
* @param <V> the element type
* @param <E> the element type
* @return the future based ServiceCall
*/
public static <T extends Page<V>, V> ServiceCall<T> create(Observable<ServiceResponse<T>> first, final Func1<String, Observable<ServiceResponse<T>>> next, final ListOperationCallback<V> callback) {
final AzureServiceCall<T> serviceCall = new AzureServiceCall<>();
final PagingSubscriber<T, V> subscriber = new PagingSubscriber<>(serviceCall, next, callback);
public static <E> ServiceCall<List<E>> create(Observable<ServiceResponse<Page<E>>> first, final Func1<String, Observable<ServiceResponse<Page<E>>>> next, final ListOperationCallback<E> callback) {
final AzureServiceCall<List<E>> serviceCall = new AzureServiceCall<>();
final PagingSubscriber<E> subscriber = new PagingSubscriber<>(serviceCall, next, callback);
serviceCall.setSubscription(first
.single()
.subscribe(subscriber));
Expand All @@ -47,16 +48,15 @@ public static <T extends Page<V>, V> ServiceCall<T> create(Observable<ServiceRes
/**
* The subscriber that handles user callback and automatically subscribes to the next page.
*
* @param <T> the Page type
* @param <V> the element type
* @param <E> the element type
*/
private static class PagingSubscriber<T extends Page<V>, V> extends Subscriber<ServiceResponse<T>> {
private AzureServiceCall<T> serviceCall;
private Func1<String, Observable<ServiceResponse<T>>> next;
private ListOperationCallback<V> callback;
private ServiceResponse<T> lastResponse;
private static class PagingSubscriber<E> extends Subscriber<ServiceResponse<Page<E>>> {
private AzureServiceCall<List<E>> serviceCall;
private Func1<String, Observable<ServiceResponse<Page<E>>>> next;
private ListOperationCallback<E> callback;
private ServiceResponse<Page<E>> lastResponse;

PagingSubscriber(final AzureServiceCall<T> serviceCall, final Func1<String, Observable<ServiceResponse<T>>> next, final ListOperationCallback<V> callback) {
PagingSubscriber(final AzureServiceCall<List<E>> serviceCall, final Func1<String, Observable<ServiceResponse<Page<E>>>> next, final ListOperationCallback<E> callback) {
this.serviceCall = serviceCall;
this.next = next;
this.callback = callback;
Expand All @@ -76,7 +76,7 @@ public void onError(Throwable e) {
}

@Override
public void onNext(ServiceResponse<T> serviceResponse) {
public void onNext(ServiceResponse<Page<E>> serviceResponse) {
lastResponse = serviceResponse;
ListOperationCallback.PagingBehavior behavior = ListOperationCallback.PagingBehavior.CONTINUE;
if (callback != null) {
Expand All @@ -86,7 +86,7 @@ public void onNext(ServiceResponse<T> serviceResponse) {
}
}
if (behavior == ListOperationCallback.PagingBehavior.STOP || serviceResponse.getBody().getNextPageLink() == null) {
serviceCall.set(lastResponse);
serviceCall.set(new ServiceResponse<>(lastResponse.getBody().getItems(), lastResponse.getResponse()));
} else {
serviceCall.setSubscription(next.call(serviceResponse.getBody().getNextPageLink()).single().subscribe(this));
}
Expand Down

0 comments on commit ffc5544

Please sign in to comment.