Skip to content

Commit

Permalink
Merge commit '65d59d6f659b227b9936911c99f9cd053249af86' into rx
Browse files Browse the repository at this point in the history
  • Loading branch information
jianghaolu committed Aug 26, 2016
2 parents 2572366 + 6c35000 commit 7814d4f
Show file tree
Hide file tree
Showing 12 changed files with 535 additions and 846 deletions.
1,015 changes: 320 additions & 695 deletions azure-client-runtime/src/main/java/com/microsoft/azure/AzureClient.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/**
*
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License. See License.txt in the project root for license information.
*
*/

package com.microsoft.azure;

import com.microsoft.rest.ServiceCall;
import com.microsoft.rest.ServiceResponse;

import rx.Observable;
import rx.Subscriber;
import rx.functions.Func1;

/**
* An instance of this class provides access to the underlying REST call invocation.
* This class wraps around the Retrofit Call object and allows updates to it in the
* progress of a long running operation or a paging operation.
*
* @param <T> the type of the returning object
*/
public final class AzureServiceCall<T> extends ServiceCall<T> {
private AzureServiceCall() {
}

/**
* Creates a ServiceCall from a paging operation.
*
* @param first the observable to the first page
* @param next the observable to poll subsequent pages
* @param callback the client-side callback
* @param <T> the Page type
* @param <V> the element type
* @return the future based ServiceCall
*/
public static <T extends Page<V>, V> ServiceCall<T> create(Observable<ServiceResponse<T>> first, final Func1<String, Observable<ServiceResponse<T>>> next, final ListOperationCallback<V> callback) {
final AzureServiceCall<T> serviceCall = new AzureServiceCall<>();
final PagingSubscriber<T, V> subscriber = new PagingSubscriber<>(serviceCall, next, callback);
serviceCall.setSubscription(first
.single()
.subscribe(subscriber));
return serviceCall;
}

/**
* The subscriber that handles user callback and automatically subscribes to the next page.
*
* @param <T> the Page type
* @param <V> the element type
*/
private static class PagingSubscriber<T extends Page<V>, V> extends Subscriber<ServiceResponse<T>> {
private AzureServiceCall<T> serviceCall;
private Func1<String, Observable<ServiceResponse<T>>> next;
private ListOperationCallback<V> callback;
private ServiceResponse<T> lastResponse;

PagingSubscriber(final AzureServiceCall<T> serviceCall, final Func1<String, Observable<ServiceResponse<T>>> next, final ListOperationCallback<V> callback) {
this.serviceCall = serviceCall;
this.next = next;
this.callback = callback;
}

@Override
public void onCompleted() {
// do nothing
}

@Override
public void onError(Throwable e) {
serviceCall.setException(e);
if (callback != null) {
callback.failure(e);
}
}

@Override
public void onNext(ServiceResponse<T> serviceResponse) {
lastResponse = serviceResponse;
ListOperationCallback.PagingBehavior behavior = ListOperationCallback.PagingBehavior.CONTINUE;
if (callback != null) {
behavior = callback.progress(serviceResponse.getBody().getItems());
if (behavior == ListOperationCallback.PagingBehavior.STOP || serviceResponse.getBody().getNextPageLink() == null) {
callback.success();
}
}
if (behavior == ListOperationCallback.PagingBehavior.STOP || serviceResponse.getBody().getNextPageLink() == null) {
serviceCall.set(lastResponse);
} else {
serviceCall.setSubscription(next.call(serviceResponse.getBody().getNextPageLink()).single().subscribe(this));
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
package com.microsoft.azure;

import com.microsoft.rest.ServiceCallback;
import com.microsoft.rest.ServiceResponse;

import java.util.List;

Expand Down Expand Up @@ -36,16 +37,14 @@ public ListOperationCallback() {

/**
* Override this method to handle progressive results.
* The user is responsible for returning a {@link PagingBahavior} Enum to indicate
* The user is responsible for returning a {@link PagingBehavior} Enum to indicate
* whether the client should continue loading or stop.
*
* @param partial the list of resources from the current request.
* @return CONTINUE if you want to go on loading, STOP otherwise.
*
*/
public PagingBahavior progress(List<E> partial) {
return PagingBahavior.CONTINUE;
}
public abstract PagingBehavior progress(List<E> partial);

/**
* Get the list result that stores the accumulated resources loaded from server.
Expand All @@ -71,6 +70,16 @@ public void load(List<E> result) {
}
}

@Override
public void success(ServiceResponse<List<E>> result) {
success();
}

/**
* Override this method to handle successful REST call results.
*/
public abstract void success();

/**
* Get the number of loaded pages.
*
Expand All @@ -83,7 +92,7 @@ public int pageCount() {
/**
* An enum to indicate whether the client should continue loading or stop.
*/
public enum PagingBahavior {
public enum PagingBehavior {
/**
* Indicates that the client should continue loading.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@

import com.microsoft.rest.RestException;

import javax.xml.bind.DataBindingException;
import javax.xml.ws.WebServiceException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
Expand All @@ -19,6 +17,8 @@
import java.util.ListIterator;
import java.util.NoSuchElementException;

import javax.xml.bind.DataBindingException;

/**
* Defines a list response from a paging operation. The pages are
* lazy initialized when an instance of this class is iterated.
Expand Down Expand Up @@ -81,8 +81,6 @@ public void loadNextPage() {
this.nextPageLink = nextPage.getNextPageLink();
this.items.addAll(nextPage.getItems());
this.currentPage = nextPage;
} catch (RestException e) {
throw new WebServiceException(e.toString(), e);
} catch (IOException e) {
throw new DataBindingException(e.getMessage(), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public void updateFromResponseOnPutPatch(Response<ResponseBody> response) throws
}

if (responseContent == null || responseContent.isEmpty()) {
CloudException exception = new CloudException("no body");
CloudException exception = new CloudException("polling response does not contain a valid body");
exception.setResponse(response);
throw exception;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import okhttp3.OkHttpClient;
import okhttp3.logging.HttpLoggingInterceptor;
import retrofit2.Retrofit;
import retrofit2.adapter.rxjava.RxJavaCallAdapterFactory;

import java.lang.reflect.Field;
import java.net.CookieManager;
Expand Down Expand Up @@ -338,6 +339,7 @@ public RestClient build() {
.baseUrl(baseUrl)
.client(httpClient)
.addConverterFactory(mapperAdapter.getConverterFactory())
.addCallAdapterFactory(RxJavaCallAdapterFactory.create())
.build(),
credentials,
customHeadersInterceptor,
Expand Down
3 changes: 3 additions & 0 deletions client-runtime/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ dependencies {
compile 'com.squareup.okhttp3:logging-interceptor:3.3.1'
compile 'com.squareup.okhttp3:okhttp-urlconnection:3.3.1'
compile 'com.squareup.retrofit2:converter-jackson:2.0.2'
compile 'com.squareup.retrofit2:adapter-rxjava:2.0.2'
compile 'io.reactivex:rxjava:1.1.8'
compile 'io.reactivex:rxjava-computation-expressions:0.21.0'
compile 'com.fasterxml.jackson.datatype:jackson-datatype-joda:2.7.2'
compile 'org.apache.commons:commons-lang3:3.4'
testCompile 'junit:junit:4.12'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
/**
* Exception thrown for an invalid response with custom error information.
*/
public abstract class RestException extends Exception {
public abstract class RestException extends RuntimeException {
/**
* Initializes a new instance of the AutoRestException class.
*/
Expand Down
134 changes: 95 additions & 39 deletions client-runtime/src/main/java/com/microsoft/rest/ServiceCall.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@
package com.microsoft.rest;

import com.google.common.util.concurrent.AbstractFuture;

import retrofit2.Call;
import rx.Observable;
import rx.Scheduler;
import rx.Subscription;
import rx.functions.Action1;
import rx.functions.Func1;

/**
Expand All @@ -25,54 +25,110 @@ public class ServiceCall<T> extends AbstractFuture<ServiceResponse<T>> {
/**
* The Retrofit method invocation.
*/
private Call<?> call;
private Subscription subscription;

protected ServiceCall() {
}

/**
* Creates an instance of ServiceCall.
* Creates a ServiceCall from an observable object.
*
* @param call the Retrofit call to wrap around.
* @param observable the observable to create from
* @param <T> the type of the response
* @return the created ServiceCall
*/
public ServiceCall(Call<?> call) {
this.call = call;
public static <T> ServiceCall<T> create(final Observable<ServiceResponse<T>> observable) {
final ServiceCall<T> serviceCall = new ServiceCall<>();
serviceCall.subscription = observable
.last()
.subscribe(new Action1<ServiceResponse<T>>() {
@Override
public void call(ServiceResponse<T> t) {
serviceCall.set(t);
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
serviceCall.setException(throwable);
}
});
return serviceCall;
}

/**
* Updates the current Retrofit call object.
* Creates a ServiceCall from an observable object and a callback.
*
* @param call the new call object.
* @param observable the observable to create from
* @param callback the callback to call when events happen
* @param <T> the type of the response
* @return the created ServiceCall
*/
public void newCall(Call<?> call) {
this.call = call;
public static <T> ServiceCall<T> create(final Observable<ServiceResponse<T>> observable, final ServiceCallback<T> callback) {
final ServiceCall<T> serviceCall = new ServiceCall<>();
serviceCall.subscription = observable
.last()
.subscribe(new Action1<ServiceResponse<T>>() {
@Override
public void call(ServiceResponse<T> t) {
if (callback != null) {
callback.success(t);
}
serviceCall.set(t);
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
if (callback != null) {
callback.failure(throwable);
}
serviceCall.setException(throwable);
}
});
return serviceCall;
}

/**
* Gets the current Retrofit call object.
* Creates a ServiceCall from an observable and a callback for a header response.
*
* @return the current call object.
* @param observable the observable of a REST call that returns JSON in a header
* @param callback the callback to call when events happen
* @param <T> the type of the response body
* @param <V> the type of the response header
* @return the created ServiceCall
*/
public Call<?> getCall() {
return call;
public static <T, V> ServiceCall<T> createWithHeaders(final Observable<ServiceResponseWithHeaders<T, V>> observable, final ServiceCallback<T> callback) {
final ServiceCall<T> serviceCall = new ServiceCall<>();
serviceCall.subscription = observable
.last()
.subscribe(new Action1<ServiceResponse<T>>() {
@Override
public void call(ServiceResponse<T> t) {
if (callback != null) {
callback.success(t);
}
serviceCall.set(t);
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
if (callback != null) {
callback.failure(throwable);
}
serviceCall.setException(throwable);
}
});
return serviceCall;
}

/**
* Cancel the Retrofit call if possible. Parameter
* 'mayInterruptIfRunning is ignored.
*
* @param mayInterruptIfRunning ignored
* @return the current Rx subscription associated with the ServiceCall.
*/
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
if (isCancelled()) {
return false;
} else {
call.cancel();
return true;
}
public Subscription getSubscription() {
return subscription;
}

@Override
public boolean isCancelled() {
return call.isCanceled();
protected void setSubscription(Subscription subscription) {
this.subscription = subscription;
}

/**
Expand Down Expand Up @@ -111,14 +167,14 @@ public boolean success(ServiceResponse<T> result) {
return set(result);
}

/**
* Invoke this method to report a failure, allowing
* {@link AbstractFuture#get()} to throw the exception.
*
* @param t the exception thrown.
* @return true if successfully reported; false otherwise.
*/
public boolean failure(Throwable t) {
return setException(t);
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
subscription.unsubscribe();
return super.cancel(mayInterruptIfRunning);
}

@Override
public boolean isCancelled() {
return subscription.isUnsubscribed();
}
}

0 comments on commit 7814d4f

Please sign in to comment.