diff --git a/azure-client-runtime/src/main/java/com/microsoft/azure/AzureClient.java b/azure-client-runtime/src/main/java/com/microsoft/azure/AzureClient.java index 4bc9cd9e55d14..6529375f6343c 100644 --- a/azure-client-runtime/src/main/java/com/microsoft/azure/AzureClient.java +++ b/azure-client-runtime/src/main/java/com/microsoft/azure/AzureClient.java @@ -7,27 +7,26 @@ package com.microsoft.azure; -import com.microsoft.rest.ServiceCall; -import com.microsoft.rest.ServiceCallback; -import com.microsoft.rest.ServiceException; import com.microsoft.rest.ServiceResponse; -import com.microsoft.rest.ServiceResponseCallback; import com.microsoft.rest.ServiceResponseWithHeaders; -import okhttp3.ResponseBody; -import retrofit2.Call; -import retrofit2.Response; -import retrofit2.http.GET; -import retrofit2.http.Header; -import retrofit2.http.Url; import java.io.IOException; import java.lang.reflect.Type; import java.net.MalformedURLException; import java.net.URL; +import java.util.Arrays; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import okhttp3.ResponseBody; +import retrofit2.Response; +import retrofit2.http.GET; +import retrofit2.http.Header; +import retrofit2.http.Url; +import rx.Observable; +import rx.functions.Func1; + /** * An instance of this class defines a ServiceClient that handles polling and * retrying for long running operations when accessing Azure resources. @@ -58,11 +57,29 @@ public AzureClient(AzureServiceClient serviceClient) { this.serviceClientUserAgent = serviceClient.userAgent(); } + /** + * Gets the interval time between two long running operation polls. + * + * @return the time in milliseconds. + */ + public Integer getLongRunningOperationRetryTimeout() { + return longRunningOperationRetryTimeout; + } + + /** + * Sets the interval time between two long running operation polls. + * + * @param longRunningOperationRetryTimeout the time in milliseconds. + */ + public void withLongRunningOperationRetryTimeout(Integer longRunningOperationRetryTimeout) { + this.longRunningOperationRetryTimeout = longRunningOperationRetryTimeout; + } + /** * Handles an initial response from a PUT or PATCH operation response by polling * the status of the operation until the long running operation terminates. * - * @param response the initial response from the PUT or PATCH operation. + * @param observable the initial observable from the PUT or PATCH operation. * @param the return type of the caller * @param resourceType the type of the resource * @return the terminal response for the operation. @@ -70,62 +87,16 @@ public AzureClient(AzureServiceClient serviceClient) { * @throws InterruptedException interrupted exception * @throws IOException thrown by deserialization */ - public ServiceResponse getPutOrPatchResult(Response response, Type resourceType) throws CloudException, InterruptedException, IOException { - if (response == null) { - throw new CloudException("response is null."); - } - - int statusCode = response.code(); - ResponseBody responseBody; - if (response.isSuccessful()) { - responseBody = response.body(); - } else { - responseBody = response.errorBody(); - } - if (statusCode != 200 && statusCode != 201 && statusCode != 202) { - CloudException exception = new CloudException(statusCode + " is not a valid polling status code"); - exception.setResponse(response); - if (responseBody != null) { - exception.setBody((CloudError) restClient().mapperAdapter().deserialize(responseBody.string(), CloudError.class)); - responseBody.close(); - } - throw exception; - } - - PollingState pollingState = new PollingState<>(response, this.getLongRunningOperationRetryTimeout(), resourceType, restClient().mapperAdapter()); - String url = response.raw().request().url().toString(); - - // Check provisioning state - while (!AzureAsyncOperation.getTerminalStatuses().contains(pollingState.getStatus())) { - Thread.sleep(pollingState.getDelayInMilliseconds()); - - if (pollingState.getAzureAsyncOperationHeaderLink() != null - && !pollingState.getAzureAsyncOperationHeaderLink().isEmpty()) { - updateStateFromAzureAsyncOperationHeader(pollingState); - } else if (pollingState.getLocationHeaderLink() != null - && !pollingState.getLocationHeaderLink().isEmpty()) { - updateStateFromLocationHeaderOnPut(pollingState); - } else { - updateStateFromGetResourceOperation(pollingState, url); - } - } - - if (AzureAsyncOperation.SUCCESS_STATUS.equals(pollingState.getStatus()) && pollingState.getResource() == null) { - updateStateFromGetResourceOperation(pollingState, url); - } - - if (AzureAsyncOperation.getFailedStatuses().contains(pollingState.getStatus())) { - throw new CloudException("Async operation failed"); - } - - return new ServiceResponse<>(pollingState.getResource(), pollingState.getResponse()); + public ServiceResponse getPutOrPatchResult(Observable> observable, Type resourceType) throws CloudException, InterruptedException, IOException { + Observable> asyncObservable = getPutOrPatchResultAsync(observable, resourceType); + return asyncObservable.toBlocking().last(); } /** * Handles an initial response from a PUT or PATCH operation response by polling * the status of the operation until the long running operation terminates. * - * @param response the initial response from the PUT or PATCH operation. + * @param observable the initial observable from the PUT or PATCH operation. * @param resourceType the type of the resource * @param headerType the type of the response header * @param the return type of the caller @@ -135,12 +106,12 @@ public ServiceResponse getPutOrPatchResult(Response respons * @throws InterruptedException interrupted exception * @throws IOException thrown by deserialization */ - public ServiceResponseWithHeaders getPutOrPatchResultWithHeaders(Response response, Type resourceType, Class headerType) throws CloudException, InterruptedException, IOException { - ServiceResponse bodyResponse = getPutOrPatchResult(response, resourceType); + public ServiceResponseWithHeaders getPutOrPatchResultWithHeaders(Observable> observable, Type resourceType, Class headerType) throws CloudException, InterruptedException, IOException { + ServiceResponse bodyResponse = getPutOrPatchResult(observable, resourceType); return new ServiceResponseWithHeaders<>( - bodyResponse.getBody(), - restClient().mapperAdapter().deserialize(restClient().mapperAdapter().serialize(bodyResponse.getResponse().headers()), headerType), - bodyResponse.getResponse() + bodyResponse.getBody(), + restClient().mapperAdapter().deserialize(restClient().mapperAdapter().serialize(bodyResponse.getResponse().headers()), headerType), + bodyResponse.getResponse() ); } @@ -149,62 +120,77 @@ public ServiceResponseWithHeaders getPutOrPatchResultWi * the status of the operation asynchronously, calling the user provided callback * when the operation terminates. * - * @param response the initial response from the PUT or PATCH operation. + * @param observable the initial observable from the PUT or PATCH operation. * @param the return type of the caller. * @param resourceType the type of the resource. - * @param serviceCall the ServiceCall object tracking Retrofit calls. - * @param callback the user callback to call when operation terminates. - * @return the task describing the asynchronous polling. + * @return the observable of which a subscription will lead to a final response. */ - public AsyncPollingTask getPutOrPatchResultAsync(Response response, Type resourceType, ServiceCall serviceCall, ServiceCallback callback) { - if (response == null) { - CloudException t = new CloudException("response is null."); - if (callback != null) { - callback.failure(t); - } - serviceCall.failure(t); - return null; - } + public Observable> getPutOrPatchResultAsync(Observable> observable, final Type resourceType) { + return observable + .flatMap(new Func1, Observable>>() { + @Override + public Observable> call(Response response) { + CloudException exception = createExceptionFromResponse(response, 200, 201, 202); + if (exception != null) { + return Observable.error(exception); + } - int statusCode = response.code(); - ResponseBody responseBody; - if (response.isSuccessful()) { - responseBody = response.body(); - } else { - responseBody = response.errorBody(); - } - if (statusCode != 200 && statusCode != 201 && statusCode != 202) { - CloudException exception = new CloudException(statusCode + " is not a valid polling status code"); - exception.setResponse(response); - try { - if (responseBody != null) { - exception.setBody((CloudError) restClient().mapperAdapter().deserialize(responseBody.string(), CloudError.class)); - responseBody.close(); + try { + final PollingState pollingState = new PollingState<>(response, getLongRunningOperationRetryTimeout(), resourceType, restClient().mapperAdapter()); + final String url = response.raw().request().url().toString(); + + // Task runner will take it from here + return Observable.just(pollingState) + // Emit a polling task intermittently + .repeatWhen(new Func1, Observable>() { + @Override + public Observable call(Observable observable) { + return observable.delay(pollingState.getDelayInMilliseconds(), TimeUnit.MILLISECONDS); + } + }) + // Conditionally polls if it's not a terminal status + .flatMap(new Func1, Observable>>() { + @Override + public Observable> call(PollingState pollingState) { + if (!AzureAsyncOperation.getTerminalStatuses().contains(pollingState.getStatus())) { + return putOrPatchPollingDispatcher(pollingState, url); + } else { + return Observable.just(pollingState); + } + } + }) + // The above process continues until this filter passes + .filter(new Func1, Boolean>() { + @Override + public Boolean call(PollingState pollingState) { + return AzureAsyncOperation.getTerminalStatuses().contains(pollingState.getStatus()); + } + }) + .first() + // Possible extra get to receive the actual resource + .flatMap(new Func1, Observable>>() { + @Override + public Observable> call(PollingState pollingState) { + if (AzureAsyncOperation.SUCCESS_STATUS.equals(pollingState.getStatus()) && pollingState.getResource() == null) { + return updateStateFromGetResourceOperationAsync(pollingState, url); + } + if (AzureAsyncOperation.getFailedStatuses().contains(pollingState.getStatus())) { + return Observable.error(new CloudException("Async operation failed with provisioning state: " + pollingState.getStatus())); + } + return Observable.just(pollingState); + } + }) + .map(new Func1, ServiceResponse>() { + @Override + public ServiceResponse call(PollingState pollingState) { + return new ServiceResponse<>(pollingState.getResource(), pollingState.getResponse()); + } + }); + } catch (IOException e) { + return Observable.error(e); + } } - } catch (Exception e) { /* ignore serialization errors on top of service errors */ } - if (callback != null) { - callback.failure(exception); - } - serviceCall.failure(exception); - return null; - } - - PollingState pollingState; - try { - pollingState = new PollingState<>(response, this.getLongRunningOperationRetryTimeout(), resourceType, restClient().mapperAdapter()); - } catch (IOException e) { - if (callback != null) { - callback.failure(e); - } - serviceCall.failure(e); - return null; - } - String url = response.raw().request().url().toString(); - - // Task runner will take it from here - PutPatchPollingTask task = new PutPatchPollingTask<>(pollingState, url, serviceCall, callback); - executor.schedule(task, pollingState.getDelayInMilliseconds(), TimeUnit.MILLISECONDS); - return task; + }); } /** @@ -212,49 +198,36 @@ public AsyncPollingTask getPutOrPatchResultAsync(Response r * the status of the operation asynchronously, calling the user provided callback * when the operation terminates. * - * @param response the initial response from the PUT or PATCH operation. + * @param observable the initial response from the PUT or PATCH operation. * @param the return type of the caller * @param the type of the response header * @param resourceType the type of the resource. * @param headerType the type of the response header - * @param serviceCall the ServiceCall object tracking Retrofit calls. - * @param callback the user callback to call when operation terminates. * @return the task describing the asynchronous polling. */ - public AsyncPollingTask getPutOrPatchResultWithHeadersAsync(Response response, Type resourceType, final Class headerType, final ServiceCall serviceCall, final ServiceCallback callback) { - return this.getPutOrPatchResultAsync(response, resourceType, serviceCall, new ServiceCallback() { - @Override - public void failure(Throwable t) { - if (callback != null) { - callback.failure(t); - } - serviceCall.failure(t); - } - - @Override - public void success(ServiceResponse result) { - try { - ServiceResponseWithHeaders clientResponse = new ServiceResponseWithHeaders<>( - result.getBody(), - restClient().mapperAdapter().deserialize(restClient().mapperAdapter().serialize(result.getResponse().headers()), headerType), - result.getResponse() - ); - if (callback != null) { - callback.success(clientResponse); + public Observable> getPutOrPatchResultWithHeadersAsync(Observable> observable, Type resourceType, final Class headerType) { + Observable> bodyResponse = getPutOrPatchResultAsync(observable, resourceType); + return bodyResponse + .flatMap(new Func1, Observable>>() { + @Override + public Observable> call(ServiceResponse serviceResponse) { + try { + return Observable + .just(new ServiceResponseWithHeaders<>(serviceResponse.getBody(), + restClient().mapperAdapter().deserialize(restClient().mapperAdapter().serialize(serviceResponse.getResponse().headers()), headerType), + serviceResponse.getResponse())); + } catch (IOException e) { + return Observable.error(e); } - serviceCall.success(clientResponse); - } catch (IOException e) { - failure(e); } - } - }); + }); } /** * Handles an initial response from a POST or DELETE operation response by polling * the status of the operation until the long running operation terminates. * - * @param response the initial response from the POST or DELETE operation. + * @param observable the initial observable from the POST or DELETE operation. * @param the return type of the caller * @param resourceType the type of the resource * @return the terminal response for the operation. @@ -262,60 +235,16 @@ public void success(ServiceResponse result) { * @throws InterruptedException interrupted exception * @throws IOException thrown by deserialization */ - public ServiceResponse getPostOrDeleteResult(Response response, Type resourceType) throws CloudException, InterruptedException, IOException { - if (response == null) { - throw new CloudException("response is null."); - } - - int statusCode = response.code(); - ResponseBody responseBody; - if (response.isSuccessful()) { - responseBody = response.body(); - } else { - responseBody = response.errorBody(); - } - if (statusCode != 200 && statusCode != 202 && statusCode != 204) { - CloudException exception = new CloudException(statusCode + " is not a valid polling status code"); - exception.setResponse(response); - if (responseBody != null) { - exception.setBody((CloudError) restClient().mapperAdapter().deserialize(responseBody.string(), CloudError.class)); - responseBody.close(); - } - throw exception; - } - - PollingState pollingState = new PollingState<>(response, this.getLongRunningOperationRetryTimeout(), resourceType, restClient().mapperAdapter()); - - // Check provisioning state - while (!AzureAsyncOperation.getTerminalStatuses().contains(pollingState.getStatus())) { - Thread.sleep(pollingState.getDelayInMilliseconds()); - - if (pollingState.getAzureAsyncOperationHeaderLink() != null - && !pollingState.getAzureAsyncOperationHeaderLink().isEmpty()) { - updateStateFromAzureAsyncOperationHeader(pollingState); - } else if (pollingState.getLocationHeaderLink() != null - && !pollingState.getLocationHeaderLink().isEmpty()) { - updateStateFromLocationHeaderOnPostOrDelete(pollingState); - } else { - CloudException exception = new CloudException("No header in response"); - exception.setResponse(response); - throw exception; - } - } - - // Check if operation failed - if (AzureAsyncOperation.getFailedStatuses().contains(pollingState.getStatus())) { - throw new CloudException("Async operation failed"); - } - - return new ServiceResponse<>(pollingState.getResource(), pollingState.getResponse()); + public ServiceResponse getPostOrDeleteResult(Observable> observable, Type resourceType) throws CloudException, InterruptedException, IOException { + Observable> asyncObservable = getPutOrPatchResultAsync(observable, resourceType); + return asyncObservable.toBlocking().last(); } /** * Handles an initial response from a POST or DELETE operation response by polling * the status of the operation until the long running operation terminates. * - * @param response the initial response from the POST or DELETE operation. + * @param observable the initial observable from the POST or DELETE operation. * @param resourceType the type of the resource * @param headerType the type of the response header * @param the return type of the caller @@ -325,12 +254,12 @@ public ServiceResponse getPostOrDeleteResult(Response respo * @throws InterruptedException interrupted exception * @throws IOException thrown by deserialization */ - public ServiceResponseWithHeaders getPostOrDeleteResultWithHeaders(Response response, Type resourceType, Class headerType) throws CloudException, InterruptedException, IOException { - ServiceResponse bodyResponse = getPostOrDeleteResult(response, resourceType); + public ServiceResponseWithHeaders getPostOrDeleteResultWithHeaders(Observable> observable, Type resourceType, Class headerType) throws CloudException, InterruptedException, IOException { + ServiceResponse bodyResponse = getPostOrDeleteResult(observable, resourceType); return new ServiceResponseWithHeaders<>( - bodyResponse.getBody(), - restClient().mapperAdapter().deserialize(restClient().mapperAdapter().serialize(bodyResponse.getResponse().headers()), headerType), - bodyResponse.getResponse() + bodyResponse.getBody(), + restClient().mapperAdapter().deserialize(restClient().mapperAdapter().serialize(bodyResponse.getResponse().headers()), headerType), + bodyResponse.getResponse() ); } @@ -339,61 +268,64 @@ public ServiceResponseWithHeaders getPostOrDeleteResult * the status of the operation asynchronously, calling the user provided callback * when the operation terminates. * - * @param response the initial response from the POST or DELETE operation. + * @param observable the initial response from the POST or DELETE operation. * @param the return type of the caller. * @param resourceType the type of the resource. - * @param serviceCall the ServiceCall object tracking Retrofit calls. - * @param callback the user callback to call when operation terminates. * @return the task describing the asynchronous polling. */ - public AsyncPollingTask getPostOrDeleteResultAsync(Response response, Type resourceType, ServiceCall serviceCall, ServiceCallback callback) { - if (response == null) { - CloudException t = new CloudException("response is null."); - if (callback != null) { - callback.failure(t); - } - serviceCall.failure(t); - return null; - } + public Observable> getPostOrDeleteResultAsync(Observable> observable, final Type resourceType) { + return observable + .flatMap(new Func1, Observable>>() { + @Override + public Observable> call(Response response) { + CloudException exception = createExceptionFromResponse(response, 200, 202, 204); + if (exception != null) { + return Observable.error(exception); + } - int statusCode = response.code(); - ResponseBody responseBody; - if (response.isSuccessful()) { - responseBody = response.body(); - } else { - responseBody = response.errorBody(); - } - if (statusCode != 200 && statusCode != 202 && statusCode != 204) { - CloudException exception = new CloudException(statusCode + " is not a valid polling status code"); - exception.setResponse(response); - try { - if (responseBody != null) { - exception.setBody((CloudError) restClient().mapperAdapter().deserialize(responseBody.string(), CloudError.class)); - responseBody.close(); + try { + final PollingState pollingState = new PollingState<>(response, getLongRunningOperationRetryTimeout(), resourceType, restClient().mapperAdapter()); + return Observable.just(pollingState) + // Emit a polling task intermittently + .repeatWhen(new Func1, Observable>() { + @Override + public Observable call(Observable observable) { + return observable.delay(pollingState.getDelayInMilliseconds(), TimeUnit.MILLISECONDS); + } + }) + // Conditionally polls if it's not a terminal status + .flatMap(new Func1, Observable>>() { + @Override + public Observable> call(PollingState pollingState) { + if (!AzureAsyncOperation.getTerminalStatuses().contains(pollingState.getStatus())) { + return postOrDeletePollingDispatcher(pollingState); + } + return Observable.just(pollingState); + } + }) + // The above process continues until this filter passes + .filter(new Func1, Boolean>() { + @Override + public Boolean call(PollingState pollingState) { + return AzureAsyncOperation.getTerminalStatuses().contains(pollingState.getStatus()); + } + }) + .first() + .flatMap(new Func1, Observable>>() { + @Override + public Observable> call(PollingState pollingState) { + if (AzureAsyncOperation.getFailedStatuses().contains(pollingState.getStatus())) { + return Observable.error(new CloudException("Async operation failed with provisioning state: " + pollingState.getStatus())); + } else { + return Observable.just(new ServiceResponse<>(pollingState.getResource(), pollingState.getResponse())); + } + } + }); + } catch (IOException e) { + return Observable.error(e); + } } - } catch (Exception e) { /* ignore serialization errors on top of service errors */ } - if (callback != null) { - callback.failure(exception); - } - serviceCall.failure(exception); - return null; - } - - PollingState pollingState; - try { - pollingState = new PollingState<>(response, this.getLongRunningOperationRetryTimeout(), resourceType, restClient().mapperAdapter()); - } catch (IOException e) { - if (callback != null) { - callback.failure(e); - } - serviceCall.failure(e); - return null; - } - - // Task runner will take it from here - PostDeletePollingTask task = new PostDeletePollingTask<>(pollingState, serviceCall, callback); - executor.schedule(task, pollingState.getDelayInMilliseconds(), TimeUnit.MILLISECONDS); - return task; + }); } /** @@ -401,42 +333,29 @@ public AsyncPollingTask getPostOrDeleteResultAsync(Response * the status of the operation asynchronously, calling the user provided callback * when the operation terminates. * - * @param response the initial response from the POST or DELETE operation. + * @param observable the initial observable from the POST or DELETE operation. * @param the return type of the caller * @param the type of the response header * @param resourceType the type of the resource. * @param headerType the type of the response header - * @param serviceCall the ServiceCall object tracking Retrofit calls. - * @param callback the user callback to call when operation terminates. * @return the task describing the asynchronous polling. */ - public AsyncPollingTask getPostOrDeleteResultWithHeadersAsync(Response response, Type resourceType, final Class headerType, final ServiceCall serviceCall, final ServiceCallback callback) { - return this.getPostOrDeleteResultAsync(response, resourceType, serviceCall, new ServiceCallback() { - @Override - public void failure(Throwable t) { - if (callback != null) { - callback.failure(t); - } - serviceCall.failure(t); - } - - @Override - public void success(ServiceResponse result) { - try { - ServiceResponseWithHeaders clientResponse = new ServiceResponseWithHeaders<>( - result.getBody(), - restClient().mapperAdapter().deserialize(restClient().mapperAdapter().serialize(result.getResponse().headers()), headerType), - result.getResponse() - ); - if (callback != null) { - callback.success(clientResponse); + public Observable> getPostOrDeleteResultWithHeadersAsync(Observable> observable, Type resourceType, final Class headerType) { + Observable> bodyResponse = getPostOrDeleteResultAsync(observable, resourceType); + return bodyResponse + .flatMap(new Func1, Observable>>() { + @Override + public Observable> call(ServiceResponse serviceResponse) { + try { + return Observable + .just(new ServiceResponseWithHeaders<>(serviceResponse.getBody(), + restClient().mapperAdapter().deserialize(restClient().mapperAdapter().serialize(serviceResponse.getResponse().headers()), headerType), + serviceResponse.getResponse())); + } catch (IOException e) { + return Observable.error(e); } - serviceCall.success(clientResponse); - } catch (IOException e) { - failure(e); } - } - }); + }); } /** @@ -445,52 +364,26 @@ public void success(ServiceResponse result) { * * @param pollingState the polling state for the current operation. * @param the return type of the caller. - * @throws CloudException REST exception - * @throws IOException thrown by deserialization */ - private void updateStateFromLocationHeaderOnPut(PollingState pollingState) throws CloudException, IOException { - Response response = poll(pollingState.getLocationHeaderLink()); - int statusCode = response.code(); - if (statusCode == 202) { - pollingState.setResponse(response); - pollingState.setStatus(AzureAsyncOperation.IN_PROGRESS_STATUS); - } else if (statusCode == 200 || statusCode == 201) { - pollingState.updateFromResponseOnPutPatch(response); - } - } - - /** - * Polls from the location header and updates the polling state with the - * polling response for a PUT operation. - * - * @param pollingState the polling state for the current operation. - * @param callback the user callback to call when operation terminates. - * @param the return type of the caller. - * @return the task describing the asynchronous polling. - */ - private Call updateStateFromLocationHeaderOnPutAsync(final PollingState pollingState, final ServiceCallback callback) { - return pollAsync(pollingState.getLocationHeaderLink(), new ServiceCallback() { - @Override - public void failure(Throwable t) { - callback.failure(t); - } - - @Override - public void success(ServiceResponse result) { - try { - int statusCode = result.getResponse().code(); + private Observable> updateStateFromLocationHeaderOnPutAsync(final PollingState pollingState) { + return pollAsync(pollingState.getLocationHeaderLink()) + .flatMap(new Func1, Observable>>() { + @Override + public Observable> call(Response response) { + int statusCode = response.code(); if (statusCode == 202) { - pollingState.setResponse(result.getResponse()); + pollingState.setResponse(response); pollingState.setStatus(AzureAsyncOperation.IN_PROGRESS_STATUS); } else if (statusCode == 200 || statusCode == 201) { - pollingState.updateFromResponseOnPutPatch(result.getResponse()); + try { + pollingState.updateFromResponseOnPutPatch(response); + } catch (CloudException | IOException e) { + return Observable.error(e); + } } - callback.success(new ServiceResponse<>(pollingState.getResource(), pollingState.getResponse())); - } catch (Throwable t) { - failure(t); + return Observable.just(pollingState); } - } - }); + }); } /** @@ -499,67 +392,26 @@ public void success(ServiceResponse result) { * * @param pollingState the polling state for the current operation. * @param the return type of the caller. - * @throws CloudException service exception - * @throws IOException thrown by deserialization */ - private void updateStateFromLocationHeaderOnPostOrDelete(PollingState pollingState) throws CloudException, IOException { - Response response = poll(pollingState.getLocationHeaderLink()); - int statusCode = response.code(); - if (statusCode == 202) { - pollingState.setResponse(response); - pollingState.setStatus(AzureAsyncOperation.IN_PROGRESS_STATUS); - } else if (statusCode == 200 || statusCode == 201 || statusCode == 204) { - pollingState.updateFromResponseOnDeletePost(response); - } - } - - /** - * Polls from the location header and updates the polling state with the - * polling response for a POST or DELETE operation. - * - * @param pollingState the polling state for the current operation. - * @param callback the user callback to call when operation terminates. - * @param the return type of the caller. - * @return the task describing the asynchronous polling. - */ - private Call updateStateFromLocationHeaderOnPostOrDeleteAsync(final PollingState pollingState, final ServiceCallback callback) { - return pollAsync(pollingState.getLocationHeaderLink(), new ServiceCallback() { - @Override - public void failure(Throwable t) { - callback.failure(t); - } - - @Override - public void success(ServiceResponse result) { - try { - int statusCode = result.getResponse().code(); + private Observable> updateStateFromLocationHeaderOnPostOrDeleteAsync(final PollingState pollingState) { + return pollAsync(pollingState.getLocationHeaderLink()) + .flatMap(new Func1, Observable>>() { + @Override + public Observable> call(Response response) { + int statusCode = response.code(); if (statusCode == 202) { - pollingState.setResponse(result.getResponse()); + pollingState.setResponse(response); pollingState.setStatus(AzureAsyncOperation.IN_PROGRESS_STATUS); } else if (statusCode == 200 || statusCode == 201 || statusCode == 204) { - pollingState.updateFromResponseOnDeletePost(result.getResponse()); + try { + pollingState.updateFromResponseOnDeletePost(response); + } catch (IOException e) { + return Observable.error(e); + } } - callback.success(new ServiceResponse<>(pollingState.getResource(), pollingState.getResponse())); - } catch (Throwable t) { - failure(t); + return Observable.just(pollingState); } - } - }); - } - - /** - * Polls from the provided URL and updates the polling state with the - * polling response. - * - * @param pollingState the polling state for the current operation. - * @param url the url to poll from - * @param the return type of the caller. - * @throws CloudException service exception - * @throws IOException thrown by deserialization - */ - private void updateStateFromGetResourceOperation(PollingState pollingState, String url) throws CloudException, IOException { - Response response = poll(url); - pollingState.updateFromResponseOnPutPatch(response); + }); } /** @@ -568,39 +420,21 @@ private void updateStateFromGetResourceOperation(PollingState pollingStat * * @param pollingState the polling state for the current operation. * @param url the url to poll from - * @param serviceCall the future based service call - * @param callback the user callback to call when operation terminates. * @param the return type of the caller. - * @return the task describing the asynchronous polling. */ - private Call updateStateFromGetResourceOperationAsync(final PollingState pollingState, String url, final ServiceCall serviceCall, final ServiceCallback callback) { - return pollAsync(url, new ServiceCallback() { - @Override - public void failure(Throwable t) { - if (callback != null) { - callback.failure(t); - } - if (serviceCall != null) { - serviceCall.failure(t); - } - } - - @Override - public void success(ServiceResponse result) { - try { - pollingState.updateFromResponseOnPutPatch(result.getResponse()); - ServiceResponse clientResponse = new ServiceResponse<>(pollingState.getResource(), pollingState.getResponse()); - if (callback != null) { - callback.success(clientResponse); - } - if (serviceCall != null) { - serviceCall.success(clientResponse); + private Observable> updateStateFromGetResourceOperationAsync(final PollingState pollingState, String url) { + return pollAsync(url) + .flatMap(new Func1, Observable>>() { + @Override + public Observable> call(Response response) { + try { + pollingState.updateFromResponseOnPutPatch(response); + return Observable.just(pollingState); + } catch (CloudException | IOException e) { + return Observable.error(e); } - } catch (Throwable t) { - failure(t); } - } - }); + }); } /** @@ -609,76 +443,34 @@ public void success(ServiceResponse result) { * * @param pollingState the polling state for the current operation. * @param the return type of the caller. - * @throws CloudException service exception - * @throws IOException thrown by deserialization */ - private void updateStateFromAzureAsyncOperationHeader(PollingState pollingState) throws CloudException, IOException { - Response response = poll(pollingState.getAzureAsyncOperationHeaderLink()); - - AzureAsyncOperation body = null; - if (response.body() != null) { - body = restClient().mapperAdapter().deserialize(response.body().string(), AzureAsyncOperation.class); - response.body().close(); - } - - if (body == null || body.getStatus() == null) { - CloudException exception = new CloudException("no body"); - exception.setResponse(response); - if (response.errorBody() != null) { - exception.setBody((CloudError) restClient().mapperAdapter().deserialize(response.errorBody().string(), CloudError.class)); - response.errorBody().close(); - } - throw exception; - } - - pollingState.setStatus(body.getStatus()); - pollingState.setResponse(response); - pollingState.setResource(null); - } - - /** - * Polls from the 'Azure-AsyncOperation' header and updates the polling - * state with the polling response. - * - * @param pollingState the polling state for the current operation. - * @param callback the user callback to call when operation terminates. - * @param the return type of the caller. - * @return the task describing the asynchronous polling. - */ - private Call updateStateFromAzureAsyncOperationHeaderAsync(final PollingState pollingState, final ServiceCallback callback) { - return pollAsync(pollingState.getAzureAsyncOperationHeaderLink(), new ServiceCallback() { - @Override - public void failure(Throwable t) { - callback.failure(t); - } - - @Override - public void success(ServiceResponse result) { - try { + private Observable> updateStateFromAzureAsyncOperationHeaderAsync(final PollingState pollingState) { + return pollAsync(pollingState.getAzureAsyncOperationHeaderLink()) + .flatMap(new Func1, Observable>>() { + @Override + public Observable> call(Response response) { AzureAsyncOperation body = null; - if (result.getBody() != null) { - body = restClient().mapperAdapter().deserialize(result.getBody().string(), AzureAsyncOperation.class); - result.getBody().close(); + if (response.body() != null) { + try { + body = restClient().mapperAdapter().deserialize(response.body().string(), AzureAsyncOperation.class); + response.body().close(); + } catch (IOException e) { + body = null; + } } + if (body == null || body.getStatus() == null) { - CloudException exception = new CloudException("no body"); - exception.setResponse(result.getResponse()); - if (result.getResponse().errorBody() != null) { - exception.setBody((CloudError) restClient().mapperAdapter().deserialize(result.getResponse().errorBody().string(), CloudError.class)); - result.getResponse().errorBody().close(); - } - failure(exception); - } else { - pollingState.setStatus(body.getStatus()); - pollingState.setResponse(result.getResponse()); - pollingState.setResource(null); - callback.success(new ServiceResponse<>(pollingState.getResource(), pollingState.getResponse())); + CloudException exception = new CloudException("polling response does not contain a valid body: " + body); + exception.setResponse(response); + return Observable.error(exception); } - } catch (IOException ex) { - failure(ex); + + pollingState.setStatus(body.getStatus()); + pollingState.setResponse(response); + pollingState.setResource(null); + return Observable.just(pollingState); } - } - }); + }); } /** @@ -686,260 +478,93 @@ public void success(ServiceResponse result) { * * @param url the URL to poll from. * @return the raw response. - * @throws CloudException REST exception - * @throws IOException thrown by deserialization - */ - private Response poll(String url) throws CloudException, IOException { - URL endpoint; - endpoint = new URL(url); - int port = endpoint.getPort(); - if (port == -1) { - port = endpoint.getDefaultPort(); - } - AsyncService service = restClient().retrofit().create(AsyncService.class); - Response response = service.get(endpoint.getFile(), serviceClientUserAgent).execute(); - int statusCode = response.code(); - if (statusCode != 200 && statusCode != 201 && statusCode != 202 && statusCode != 204) { - CloudException exception = new CloudException(statusCode + " is not a valid polling status code"); - exception.setResponse(response); - if (response.body() != null) { - exception.setBody((CloudError) restClient().mapperAdapter().deserialize(response.body().string(), CloudError.class)); - response.body().close(); - } else if (response.errorBody() != null) { - exception.setBody((CloudError) restClient().mapperAdapter().deserialize(response.errorBody().string(), CloudError.class)); - response.errorBody().close(); - } - throw exception; - } - return response; - } - - /** - * Polls asynchronously from the URL provided. - * - * @param url the URL to poll from. - * @param callback the user callback to call when operation terminates. - * @return the {@link Call} object from Retrofit. */ - private Call pollAsync(String url, final ServiceCallback callback) { + private Observable> pollAsync(String url) { URL endpoint; try { endpoint = new URL(url); } catch (MalformedURLException e) { - callback.failure(e); - return null; + return Observable.error(e); } int port = endpoint.getPort(); if (port == -1) { port = endpoint.getDefaultPort(); } AsyncService service = restClient().retrofit().create(AsyncService.class); - Call call = service.get(endpoint.getFile(), serviceClientUserAgent); - call.enqueue(new ServiceResponseCallback(null, callback) { - @Override - public void onResponse(Call call, Response response) { - try { - int statusCode = response.code(); - if (statusCode != 200 && statusCode != 201 && statusCode != 202 && statusCode != 204) { - CloudException exception = new CloudException(statusCode + " is not a valid polling status code"); - exception.setResponse(response); - if (response.body() != null) { - exception.setBody((CloudError) restClient().mapperAdapter().deserialize(response.body().string(), CloudError.class)); - response.body().close(); - } else if (response.errorBody() != null) { - exception.setBody((CloudError) restClient().mapperAdapter().deserialize(response.errorBody().string(), CloudError.class)); - response.errorBody().close(); - } - callback.failure(exception); - return; - } - callback.success(new ServiceResponse<>(response.body(), response)); - } catch (IOException ex) { - callback.failure(ex); - } - } - }); - return call; - } - - /** - * Gets the interval time between two long running operation polls. - * - * @return the time in milliseconds. - */ - public Integer getLongRunningOperationRetryTimeout() { - return longRunningOperationRetryTimeout; - } - - /** - * Sets the interval time between two long running operation polls. - * - * @param longRunningOperationRetryTimeout the time in milliseconds. - */ - public void withLongRunningOperationRetryTimeout(Integer longRunningOperationRetryTimeout) { - this.longRunningOperationRetryTimeout = longRunningOperationRetryTimeout; - } - - /** - * The Retrofit service used for polling. - */ - private interface AsyncService { - @GET - Call get(@Url String url, @Header("User-Agent") String userAgent); - } - - /** - * The task runner that describes the state of an asynchronous long running - * operation. - * - * @param the return type of the caller. - */ - abstract class AsyncPollingTask implements Runnable { - /** The {@link Call} object from Retrofit. */ - protected ServiceCall serviceCall; - /** The polling state for the current operation. */ - protected PollingState pollingState; - /** The callback used for asynchronous polling. */ - protected ServiceCallback pollingCallback; - /** The client callback to call when polling finishes. */ - protected ServiceCallback clientCallback; - } - - /** - * The task runner that handles PUT or PATCH operations. - * - * @param the return type of the caller. - */ - class PutPatchPollingTask extends AsyncPollingTask { - /** The URL to poll from. */ - private String url; - - /** - * Creates an instance of Polling task for PUT or PATCH operations. - * - * @param pollingState the current polling state. - * @param url the URL to poll from. - * @param serviceCall the ServiceCall object tracking Retrofit calls. - * @param clientCallback the client callback to call when a terminal status is hit. - */ - PutPatchPollingTask(final PollingState pollingState, final String url, final ServiceCall serviceCall, final ServiceCallback clientCallback) { - this.serviceCall = serviceCall; - this.pollingState = pollingState; - this.url = url; - this.clientCallback = clientCallback; - this.pollingCallback = new ServiceCallback() { + return service.get(endpoint.getFile(), serviceClientUserAgent) + .flatMap(new Func1, Observable>>() { @Override - public void failure(Throwable t) { - if (clientCallback != null) { - clientCallback.failure(t); + public Observable> call(Response response) { + CloudException exception = createExceptionFromResponse(response, 200, 201, 202, 204); + if (exception != null) { + return Observable.error(exception); + } else { + return Observable.just(response); } - serviceCall.failure(t); } + }); + } - @Override - public void success(ServiceResponse result) { - PutPatchPollingTask task = new PutPatchPollingTask<>(pollingState, url, serviceCall, clientCallback); - executor.schedule(task, pollingState.getDelayInMilliseconds(), TimeUnit.MILLISECONDS); - } - }; + private CloudException createExceptionFromResponse(Response response, Integer... allowedStatusCodes) { + int statusCode = response.code(); + ResponseBody responseBody; + if (response.isSuccessful()) { + responseBody = response.body(); + } else { + responseBody = response.errorBody(); } - - @Override - public void run() { - // Check provisioning state - if (!AzureAsyncOperation.getTerminalStatuses().contains(pollingState.getStatus())) { - if (pollingState.getAzureAsyncOperationHeaderLink() != null - && !pollingState.getAzureAsyncOperationHeaderLink().isEmpty()) { - this.serviceCall.newCall(updateStateFromAzureAsyncOperationHeaderAsync(pollingState, pollingCallback)); - } else if (pollingState.getLocationHeaderLink() != null - && !pollingState.getLocationHeaderLink().isEmpty()) { - this.serviceCall.newCall(updateStateFromLocationHeaderOnPutAsync(pollingState, pollingCallback)); - } else { - this.serviceCall.newCall(updateStateFromGetResourceOperationAsync(pollingState, url, null, pollingCallback)); - } - } else { - if (AzureAsyncOperation.SUCCESS_STATUS.equals(pollingState.getStatus()) && pollingState.getResource() == null) { - this.serviceCall.newCall(updateStateFromGetResourceOperationAsync(pollingState, url, serviceCall, clientCallback)); - } else if (AzureAsyncOperation.getFailedStatuses().contains(pollingState.getStatus())) { - ServiceException t = new ServiceException("Async operation failed"); - if (clientCallback != null) { - clientCallback.failure(t); - } - serviceCall.failure(t); + if (!Arrays.asList(allowedStatusCodes).contains(statusCode)) { + CloudException exception; + try { + CloudError errorBody = restClient().mapperAdapter().deserialize(responseBody.string(), CloudError.class); + if (errorBody != null) { + exception = new CloudException(errorBody.getMessage()); } else { - ServiceResponse clientResponse = new ServiceResponse<>(pollingState.getResource(), pollingState.getResponse()); - if (clientCallback != null) { - clientCallback.success(clientResponse); - } - serviceCall.success(clientResponse); + exception = new CloudException("Unknown error with status code " + statusCode); } + exception.setBody(errorBody); + exception.setResponse(response); + return exception; + } catch (Exception e) { + /* ignore serialization errors on top of service errors */ + return new CloudException("Unknown error with status code " + statusCode, e); } } + return null; } - /** - * The task runner that handles POST or DELETE operations. - * - * @param the return type of the caller. - */ - class PostDeletePollingTask extends AsyncPollingTask { - /** - * Creates an instance of Polling task for POST or DELETE operations. - * - * @param pollingState the current polling state. - * @param serviceCall the ServiceCall object tracking Retrofit calls. - * @param clientCallback the client callback to call when a terminal status is hit. - */ - PostDeletePollingTask(final PollingState pollingState, final ServiceCall serviceCall, final ServiceCallback clientCallback) { - this.serviceCall = serviceCall; - this.pollingState = pollingState; - this.clientCallback = clientCallback; - this.pollingCallback = new ServiceCallback() { - @Override - public void failure(Throwable t) { - if (clientCallback != null) { - clientCallback.failure(t); - } - serviceCall.failure(t); - } - - @Override - public void success(ServiceResponse result) { - PostDeletePollingTask task = new PostDeletePollingTask<>(pollingState, serviceCall, clientCallback); - executor.schedule(task, pollingState.getDelayInMilliseconds(), TimeUnit.MILLISECONDS); - } - }; + private Observable> putOrPatchPollingDispatcher(PollingState pollingState, String url) { + if (pollingState.getAzureAsyncOperationHeaderLink() != null + && !pollingState.getAzureAsyncOperationHeaderLink().isEmpty()) { + return updateStateFromAzureAsyncOperationHeaderAsync(pollingState); + } else if (pollingState.getLocationHeaderLink() != null + && !pollingState.getLocationHeaderLink().isEmpty()) { + return updateStateFromLocationHeaderOnPutAsync(pollingState); + } else { + return updateStateFromGetResourceOperationAsync(pollingState, url); } + } - @Override - public void run() { - if (!AzureAsyncOperation.getTerminalStatuses().contains(pollingState.getStatus())) { - if (pollingState.getAzureAsyncOperationHeaderLink() != null - && !pollingState.getAzureAsyncOperationHeaderLink().isEmpty()) { - updateStateFromAzureAsyncOperationHeaderAsync(pollingState, pollingCallback); - } else if (pollingState.getLocationHeaderLink() != null - && !pollingState.getLocationHeaderLink().isEmpty()) { - updateStateFromLocationHeaderOnPostOrDeleteAsync(pollingState, pollingCallback); - } else { - ServiceException serviceException = new ServiceException("No async header in response"); - pollingCallback.failure(serviceException); - } - } else { - // Check if operation failed - if (AzureAsyncOperation.getFailedStatuses().contains(pollingState.getStatus())) { - ServiceException serviceException = new ServiceException("Async operation failed"); - if (clientCallback != null) { - clientCallback.failure(serviceException); - } - serviceCall.failure(serviceException); - } else { - ServiceResponse serviceResponse = new ServiceResponse<>(pollingState.getResource(), pollingState.getResponse()); - if (clientCallback != null) { - clientCallback.success(serviceResponse); - } - serviceCall.success(serviceResponse); - } - } + private Observable> postOrDeletePollingDispatcher(PollingState pollingState) { + if (pollingState.getAzureAsyncOperationHeaderLink() != null + && !pollingState.getAzureAsyncOperationHeaderLink().isEmpty()) { + return updateStateFromAzureAsyncOperationHeaderAsync(pollingState); + } else if (pollingState.getLocationHeaderLink() != null + && !pollingState.getLocationHeaderLink().isEmpty()) { + return updateStateFromLocationHeaderOnPostOrDeleteAsync(pollingState); + } else { + CloudException exception = new CloudException("Response does not contain an Azure-AsyncOperation or Location header."); + exception.setBody(pollingState.getError()); + exception.setResponse(pollingState.getResponse()); + return Observable.error(exception); } } -} + + /** + * The Retrofit service used for polling. + */ + private interface AsyncService { + @GET + Observable> get(@Url String url, @Header("User-Agent") String userAgent); + } +} \ No newline at end of file diff --git a/azure-client-runtime/src/main/java/com/microsoft/azure/AzureServiceCall.java b/azure-client-runtime/src/main/java/com/microsoft/azure/AzureServiceCall.java new file mode 100644 index 0000000000000..d60604815046e --- /dev/null +++ b/azure-client-runtime/src/main/java/com/microsoft/azure/AzureServiceCall.java @@ -0,0 +1,95 @@ +/** + * + * Copyright (c) Microsoft Corporation. All rights reserved. + * Licensed under the MIT License. See License.txt in the project root for license information. + * + */ + +package com.microsoft.azure; + +import com.microsoft.rest.ServiceCall; +import com.microsoft.rest.ServiceResponse; + +import rx.Observable; +import rx.Subscriber; +import rx.functions.Func1; + +/** + * An instance of this class provides access to the underlying REST call invocation. + * This class wraps around the Retrofit Call object and allows updates to it in the + * progress of a long running operation or a paging operation. + * + * @param the type of the returning object + */ +public final class AzureServiceCall extends ServiceCall { + private AzureServiceCall() { + } + + /** + * Creates a ServiceCall from a paging operation. + * + * @param first the observable to the first page + * @param next the observable to poll subsequent pages + * @param callback the client-side callback + * @param the Page type + * @param the element type + * @return the future based ServiceCall + */ + public static , V> ServiceCall create(Observable> first, final Func1>> next, final ListOperationCallback callback) { + final AzureServiceCall serviceCall = new AzureServiceCall<>(); + final PagingSubscriber subscriber = new PagingSubscriber<>(serviceCall, next, callback); + serviceCall.setSubscription(first + .single() + .subscribe(subscriber)); + return serviceCall; + } + + /** + * The subscriber that handles user callback and automatically subscribes to the next page. + * + * @param the Page type + * @param the element type + */ + private static class PagingSubscriber, V> extends Subscriber> { + private AzureServiceCall serviceCall; + private Func1>> next; + private ListOperationCallback callback; + private ServiceResponse lastResponse; + + PagingSubscriber(final AzureServiceCall serviceCall, final Func1>> next, final ListOperationCallback callback) { + this.serviceCall = serviceCall; + this.next = next; + this.callback = callback; + } + + @Override + public void onCompleted() { + // do nothing + } + + @Override + public void onError(Throwable e) { + serviceCall.setException(e); + if (callback != null) { + callback.failure(e); + } + } + + @Override + public void onNext(ServiceResponse serviceResponse) { + lastResponse = serviceResponse; + ListOperationCallback.PagingBehavior behavior = ListOperationCallback.PagingBehavior.CONTINUE; + if (callback != null) { + behavior = callback.progress(serviceResponse.getBody().getItems()); + if (behavior == ListOperationCallback.PagingBehavior.STOP || serviceResponse.getBody().getNextPageLink() == null) { + callback.success(); + } + } + if (behavior == ListOperationCallback.PagingBehavior.STOP || serviceResponse.getBody().getNextPageLink() == null) { + serviceCall.set(lastResponse); + } else { + serviceCall.setSubscription(next.call(serviceResponse.getBody().getNextPageLink()).single().subscribe(this)); + } + } + } +} diff --git a/azure-client-runtime/src/main/java/com/microsoft/azure/ListOperationCallback.java b/azure-client-runtime/src/main/java/com/microsoft/azure/ListOperationCallback.java index 6e0b2ea92a96b..63dc60f9f7c6b 100644 --- a/azure-client-runtime/src/main/java/com/microsoft/azure/ListOperationCallback.java +++ b/azure-client-runtime/src/main/java/com/microsoft/azure/ListOperationCallback.java @@ -8,6 +8,7 @@ package com.microsoft.azure; import com.microsoft.rest.ServiceCallback; +import com.microsoft.rest.ServiceResponse; import java.util.List; @@ -36,16 +37,14 @@ public ListOperationCallback() { /** * Override this method to handle progressive results. - * The user is responsible for returning a {@link PagingBahavior} Enum to indicate + * The user is responsible for returning a {@link PagingBehavior} Enum to indicate * whether the client should continue loading or stop. * * @param partial the list of resources from the current request. * @return CONTINUE if you want to go on loading, STOP otherwise. * */ - public PagingBahavior progress(List partial) { - return PagingBahavior.CONTINUE; - } + public abstract PagingBehavior progress(List partial); /** * Get the list result that stores the accumulated resources loaded from server. @@ -71,6 +70,16 @@ public void load(List result) { } } + @Override + public void success(ServiceResponse> result) { + success(); + } + + /** + * Override this method to handle successful REST call results. + */ + public abstract void success(); + /** * Get the number of loaded pages. * @@ -83,7 +92,7 @@ public int pageCount() { /** * An enum to indicate whether the client should continue loading or stop. */ - public enum PagingBahavior { + public enum PagingBehavior { /** * Indicates that the client should continue loading. */ diff --git a/azure-client-runtime/src/main/java/com/microsoft/azure/PagedList.java b/azure-client-runtime/src/main/java/com/microsoft/azure/PagedList.java index a88a044f751e7..4f966acbcf3d0 100644 --- a/azure-client-runtime/src/main/java/com/microsoft/azure/PagedList.java +++ b/azure-client-runtime/src/main/java/com/microsoft/azure/PagedList.java @@ -9,8 +9,6 @@ import com.microsoft.rest.RestException; -import javax.xml.bind.DataBindingException; -import javax.xml.ws.WebServiceException; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; @@ -19,6 +17,8 @@ import java.util.ListIterator; import java.util.NoSuchElementException; +import javax.xml.bind.DataBindingException; + /** * Defines a list response from a paging operation. The pages are * lazy initialized when an instance of this class is iterated. @@ -81,8 +81,6 @@ public void loadNextPage() { this.nextPageLink = nextPage.getNextPageLink(); this.items.addAll(nextPage.getItems()); this.currentPage = nextPage; - } catch (RestException e) { - throw new WebServiceException(e.toString(), e); } catch (IOException e) { throw new DataBindingException(e.getMessage(), e); } diff --git a/azure-client-runtime/src/main/java/com/microsoft/azure/PollingState.java b/azure-client-runtime/src/main/java/com/microsoft/azure/PollingState.java index 50255ba600899..0a952b6d2334f 100644 --- a/azure-client-runtime/src/main/java/com/microsoft/azure/PollingState.java +++ b/azure-client-runtime/src/main/java/com/microsoft/azure/PollingState.java @@ -100,7 +100,7 @@ public void updateFromResponseOnPutPatch(Response response) throws } if (responseContent == null || responseContent.isEmpty()) { - CloudException exception = new CloudException("no body"); + CloudException exception = new CloudException("polling response does not contain a valid body"); exception.setResponse(response); throw exception; } diff --git a/azure-client-runtime/src/main/java/com/microsoft/azure/RestClient.java b/azure-client-runtime/src/main/java/com/microsoft/azure/RestClient.java index 0ace881f0ee51..ad9ecf9e92d92 100644 --- a/azure-client-runtime/src/main/java/com/microsoft/azure/RestClient.java +++ b/azure-client-runtime/src/main/java/com/microsoft/azure/RestClient.java @@ -20,6 +20,7 @@ import okhttp3.OkHttpClient; import okhttp3.logging.HttpLoggingInterceptor; import retrofit2.Retrofit; +import retrofit2.adapter.rxjava.RxJavaCallAdapterFactory; import java.lang.reflect.Field; import java.net.CookieManager; @@ -338,6 +339,7 @@ public RestClient build() { .baseUrl(baseUrl) .client(httpClient) .addConverterFactory(mapperAdapter.getConverterFactory()) + .addCallAdapterFactory(RxJavaCallAdapterFactory.create()) .build(), credentials, customHeadersInterceptor, diff --git a/client-runtime/build.gradle b/client-runtime/build.gradle index e530072694672..b828a33f187bc 100644 --- a/client-runtime/build.gradle +++ b/client-runtime/build.gradle @@ -28,6 +28,9 @@ dependencies { compile 'com.squareup.okhttp3:logging-interceptor:3.3.1' compile 'com.squareup.okhttp3:okhttp-urlconnection:3.3.1' compile 'com.squareup.retrofit2:converter-jackson:2.0.2' + compile 'com.squareup.retrofit2:adapter-rxjava:2.0.2' + compile 'io.reactivex:rxjava:1.1.8' + compile 'io.reactivex:rxjava-computation-expressions:0.21.0' compile 'com.fasterxml.jackson.datatype:jackson-datatype-joda:2.7.2' compile 'org.apache.commons:commons-lang3:3.4' testCompile 'junit:junit:4.12' diff --git a/client-runtime/src/main/java/com/microsoft/rest/RestException.java b/client-runtime/src/main/java/com/microsoft/rest/RestException.java index cf5dff556aea0..a11ac3c516d2c 100644 --- a/client-runtime/src/main/java/com/microsoft/rest/RestException.java +++ b/client-runtime/src/main/java/com/microsoft/rest/RestException.java @@ -10,7 +10,7 @@ /** * Exception thrown for an invalid response with custom error information. */ -public abstract class RestException extends Exception { +public abstract class RestException extends RuntimeException { /** * Initializes a new instance of the AutoRestException class. */ diff --git a/client-runtime/src/main/java/com/microsoft/rest/ServiceCall.java b/client-runtime/src/main/java/com/microsoft/rest/ServiceCall.java index 1baf40cbd9718..f04232e75cbc8 100644 --- a/client-runtime/src/main/java/com/microsoft/rest/ServiceCall.java +++ b/client-runtime/src/main/java/com/microsoft/rest/ServiceCall.java @@ -8,10 +8,10 @@ package com.microsoft.rest; import com.google.common.util.concurrent.AbstractFuture; - -import retrofit2.Call; import rx.Observable; import rx.Scheduler; +import rx.Subscription; +import rx.functions.Action1; import rx.functions.Func1; /** @@ -25,54 +25,110 @@ public class ServiceCall extends AbstractFuture> { /** * The Retrofit method invocation. */ - private Call call; + private Subscription subscription; + + protected ServiceCall() { + } /** - * Creates an instance of ServiceCall. + * Creates a ServiceCall from an observable object. * - * @param call the Retrofit call to wrap around. + * @param observable the observable to create from + * @param the type of the response + * @return the created ServiceCall */ - public ServiceCall(Call call) { - this.call = call; + public static ServiceCall create(final Observable> observable) { + final ServiceCall serviceCall = new ServiceCall<>(); + serviceCall.subscription = observable + .last() + .subscribe(new Action1>() { + @Override + public void call(ServiceResponse t) { + serviceCall.set(t); + } + }, new Action1() { + @Override + public void call(Throwable throwable) { + serviceCall.setException(throwable); + } + }); + return serviceCall; } /** - * Updates the current Retrofit call object. + * Creates a ServiceCall from an observable object and a callback. * - * @param call the new call object. + * @param observable the observable to create from + * @param callback the callback to call when events happen + * @param the type of the response + * @return the created ServiceCall */ - public void newCall(Call call) { - this.call = call; + public static ServiceCall create(final Observable> observable, final ServiceCallback callback) { + final ServiceCall serviceCall = new ServiceCall<>(); + serviceCall.subscription = observable + .last() + .subscribe(new Action1>() { + @Override + public void call(ServiceResponse t) { + if (callback != null) { + callback.success(t); + } + serviceCall.set(t); + } + }, new Action1() { + @Override + public void call(Throwable throwable) { + if (callback != null) { + callback.failure(throwable); + } + serviceCall.setException(throwable); + } + }); + return serviceCall; } /** - * Gets the current Retrofit call object. + * Creates a ServiceCall from an observable and a callback for a header response. * - * @return the current call object. + * @param observable the observable of a REST call that returns JSON in a header + * @param callback the callback to call when events happen + * @param the type of the response body + * @param the type of the response header + * @return the created ServiceCall */ - public Call getCall() { - return call; + public static ServiceCall createWithHeaders(final Observable> observable, final ServiceCallback callback) { + final ServiceCall serviceCall = new ServiceCall<>(); + serviceCall.subscription = observable + .last() + .subscribe(new Action1>() { + @Override + public void call(ServiceResponse t) { + if (callback != null) { + callback.success(t); + } + serviceCall.set(t); + } + }, new Action1() { + @Override + public void call(Throwable throwable) { + if (callback != null) { + callback.failure(throwable); + } + serviceCall.setException(throwable); + } + }); + return serviceCall; } /** - * Cancel the Retrofit call if possible. Parameter - * 'mayInterruptIfRunning is ignored. - * - * @param mayInterruptIfRunning ignored + * @return the current Rx subscription associated with the ServiceCall. */ - @Override - public boolean cancel(boolean mayInterruptIfRunning) { - if (isCancelled()) { - return false; - } else { - call.cancel(); - return true; - } + public Subscription getSubscription() { + return subscription; } - @Override - public boolean isCancelled() { - return call.isCanceled(); + protected void setSubscription(Subscription subscription) { + this.subscription = subscription; } /** @@ -111,14 +167,14 @@ public boolean success(ServiceResponse result) { return set(result); } - /** - * Invoke this method to report a failure, allowing - * {@link AbstractFuture#get()} to throw the exception. - * - * @param t the exception thrown. - * @return true if successfully reported; false otherwise. - */ - public boolean failure(Throwable t) { - return setException(t); + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + subscription.unsubscribe(); + return super.cancel(mayInterruptIfRunning); + } + + @Override + public boolean isCancelled() { + return subscription.isUnsubscribed(); } } diff --git a/client-runtime/src/main/java/com/microsoft/rest/ServiceClient.java b/client-runtime/src/main/java/com/microsoft/rest/ServiceClient.java index 5d6f0f850336c..dd279bc230120 100644 --- a/client-runtime/src/main/java/com/microsoft/rest/ServiceClient.java +++ b/client-runtime/src/main/java/com/microsoft/rest/ServiceClient.java @@ -16,6 +16,7 @@ import okhttp3.JavaNetCookieJar; import okhttp3.OkHttpClient; import retrofit2.Retrofit; +import retrofit2.adapter.rxjava.RxJavaCallAdapterFactory; /** * ServiceClient is the abstraction for accessing REST operations and their payload data types. @@ -65,6 +66,7 @@ protected ServiceClient(String baseUrl, OkHttpClient.Builder clientBuilder, Retr .baseUrl(baseUrl) .client(httpClient) .addConverterFactory(mapperAdapter.getConverterFactory()) + .addCallAdapterFactory(RxJavaCallAdapterFactory.create()) .build(); } diff --git a/client-runtime/src/main/java/com/microsoft/rest/ServiceResponseCallback.java b/client-runtime/src/main/java/com/microsoft/rest/ServiceResponseCallback.java deleted file mode 100644 index a808869960294..0000000000000 --- a/client-runtime/src/main/java/com/microsoft/rest/ServiceResponseCallback.java +++ /dev/null @@ -1,51 +0,0 @@ -/** - * - * Copyright (c) Microsoft Corporation. All rights reserved. - * Licensed under the MIT License. See License.txt in the project root for license information. - * - */ - -package com.microsoft.rest; - -import okhttp3.ResponseBody; -import retrofit2.Call; -import retrofit2.Callback; - -/** - * Inner callback used to merge both successful and failed responses into one - * callback for customized response handling in a response handling delegate. - * - * @param the response body type - */ -public abstract class ServiceResponseCallback implements Callback { - /** - * The client service call object. - */ - private ServiceCall serviceCall; - - /** - * The client callback. - */ - private ServiceCallback serviceCallback; - - /** - * Creates an instance of ServiceResponseCallback. - * - * @param serviceCall the client service call to call on a terminal state. - * @param serviceCallback the client callback to call on a terminal state. - */ - public ServiceResponseCallback(ServiceCall serviceCall, ServiceCallback serviceCallback) { - this.serviceCall = serviceCall; - this.serviceCallback = serviceCallback; - } - - @Override - public void onFailure(Call call, Throwable t) { - if (serviceCallback != null) { - serviceCallback.failure(t); - } - if (serviceCall != null) { - serviceCall.failure(t); - } - } -} diff --git a/client-runtime/src/main/java/com/microsoft/rest/ServiceResponseEmptyCallback.java b/client-runtime/src/main/java/com/microsoft/rest/ServiceResponseEmptyCallback.java deleted file mode 100644 index 2a82667795b27..0000000000000 --- a/client-runtime/src/main/java/com/microsoft/rest/ServiceResponseEmptyCallback.java +++ /dev/null @@ -1,50 +0,0 @@ -/** - * - * Copyright (c) Microsoft Corporation. All rights reserved. - * Licensed under the MIT License. See License.txt in the project root for license information. - * - */ - -package com.microsoft.rest; - -import retrofit2.Call; -import retrofit2.Callback; - -/** - * Inner callback used to merge both successful and failed responses into one - * callback for customized response handling in a response handling delegate. - * - * @param the response body type - */ -public abstract class ServiceResponseEmptyCallback implements Callback { - /** - * The client service call object. - */ - private ServiceCall serviceCall; - - /** - * The client callback. - */ - private ServiceCallback serviceCallback; - - /** - * Creates an instance of ServiceResponseCallback. - * - * @param serviceCall the client service call to call on a terminal state. - * @param serviceCallback the client callback to call on a terminal state. - */ - public ServiceResponseEmptyCallback(ServiceCall serviceCall, ServiceCallback serviceCallback) { - this.serviceCall = serviceCall; - this.serviceCallback = serviceCallback; - } - - @Override - public void onFailure(Call call, Throwable t) { - if (serviceCallback != null) { - serviceCallback.failure(t); - } - if (serviceCall != null) { - serviceCall.failure(t); - } - } -}