Skip to content

Commit

Permalink
[RESTEASY-2196] Refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
asoldano committed Apr 18, 2019
1 parent 15ce62f commit 84d00ac
Showing 1 changed file with 102 additions and 193 deletions.
Expand Up @@ -8,13 +8,10 @@
import java.lang.reflect.Type; import java.lang.reflect.Type;
import java.net.URI; import java.net.URI;
import java.util.Map; import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function; import java.util.function.Function;


import javax.ws.rs.BadRequestException; import javax.ws.rs.BadRequestException;
Expand Down Expand Up @@ -49,6 +46,7 @@
import org.jboss.resteasy.client.jaxrs.ClientHttpEngine; import org.jboss.resteasy.client.jaxrs.ClientHttpEngine;
import org.jboss.resteasy.client.jaxrs.ResteasyClient; import org.jboss.resteasy.client.jaxrs.ResteasyClient;
import org.jboss.resteasy.client.jaxrs.engines.AsyncClientHttpEngine; import org.jboss.resteasy.client.jaxrs.engines.AsyncClientHttpEngine;
import org.jboss.resteasy.client.jaxrs.engines.AsyncClientHttpEngine.ResultExtractor;
import org.jboss.resteasy.client.jaxrs.internal.proxy.ClientInvoker; import org.jboss.resteasy.client.jaxrs.internal.proxy.ClientInvoker;
import org.jboss.resteasy.core.ResteasyContext; import org.jboss.resteasy.core.ResteasyContext;
import org.jboss.resteasy.core.interception.jaxrs.AbstractWriterInterceptorContext; import org.jboss.resteasy.core.interception.jaxrs.AbstractWriterInterceptorContext;
Expand Down Expand Up @@ -527,46 +525,19 @@ public <T> T invoke(GenericType<T> responseType)
@Override @Override
public Future<Response> submit() public Future<Response> submit()
{ {
return doSubmit(false, null, new AsyncClientHttpEngine.ResultExtractor<Response>() return doSubmit(false, null, result -> result);
{
@Override
public Response extractResult(ClientResponse response)
{
return response;
}
});
} }


@Override @Override
public <T> Future<T> submit(final Class<T> responseType) public <T> Future<T> submit(final Class<T> responseType)
{ {
return doSubmit(false, null, new AsyncClientHttpEngine.ResultExtractor<T>() return doSubmit(false, null, getResponseTypeExtractor(responseType));
{
@SuppressWarnings("unchecked")
@Override
public T extractResult(ClientResponse response)
{
if (Response.class.equals(responseType))
return (T) response;
return ClientInvocation.extractResult(new GenericType<T>(responseType), response, null);
}
});
} }


@Override @Override
public <T> Future<T> submit(final GenericType<T> responseType) public <T> Future<T> submit(final GenericType<T> responseType)
{ {
return doSubmit(false, null, new AsyncClientHttpEngine.ResultExtractor<T>() return doSubmit(false, null, getGenericTypeExtractor(responseType));
{
@SuppressWarnings("unchecked")
@Override
public T extractResult(ClientResponse response)
{
if (responseType.getRawType().equals(Response.class))
return (T) response;
return ClientInvocation.extractResult(responseType, response, null);
}
});
} }


@SuppressWarnings({"rawtypes", "unchecked"}) @SuppressWarnings({"rawtypes", "unchecked"})
Expand All @@ -582,78 +553,98 @@ public <T> Future<T> submit(final InvocationCallback<T> callback)
genericType = new GenericType(typeInfo[0]); genericType = new GenericType(typeInfo[0]);
} }


final GenericType<T> responseType = genericType; return doSubmit(true, callback, getGenericTypeExtractor(genericType));
return doSubmit(true, callback, new AsyncClientHttpEngine.ResultExtractor<T>() }

private <T> Future<T> doSubmit(boolean buffered, InvocationCallback<T> callback, ResultExtractor<T> extractor) {
if (client.httpEngine() instanceof AsyncClientHttpEngine)
{ {
@Override return asyncSubmit(getFutureExtractorFunction(buffered, callback), extractor,
public T extractResult(ClientResponse response) getAsyncAbortedFunction(callback), getAsyncExceptionFunction(callback));
{ }
if (responseType.getRawType().equals(Response.class)) else
return (T) response; {
return ClientInvocation.extractResult(responseType, response, null); return executorSubmit(client.asyncInvocationExecutor(), callback, extractor);
} }
}); }

private <T> Function<ResultExtractor<T>, CompletableFuture<T>> getCompletableFutureExtractorFunction(boolean buffered) {
final ClientHttpEngine httpEngine = client.httpEngine();
return (httpEngine instanceof AsyncClientHttpEngine)
? ext -> ((AsyncClientHttpEngine) httpEngine).submit(this, buffered, ext, client.asyncInvocationExecutor()) : null;
} }


private <T> Future<T> doSubmit(final boolean buffered, private <T> Function<ResultExtractor<T>, Future<T>> getFutureExtractorFunction(boolean buffered, InvocationCallback<T> callback) {
final InvocationCallback<T> callback, final ClientHttpEngine httpEngine = client.httpEngine();
final AsyncClientHttpEngine.ResultExtractor<T> extractor) { return (httpEngine instanceof AsyncClientHttpEngine)
final Function<AsyncClientHttpEngine, Future<T>> asyncSubmitFn = ? ext -> ((AsyncClientHttpEngine) httpEngine).submit(this, buffered, callback, ext) : null;
asyncClientHttpEngine -> asyncSubmit( }
ext -> asyncClientHttpEngine.submit(this, buffered, callback, ext),
extractor,
result -> {
callCompletedNoThrow(callback, result);
return new CompletedFuture<>(result, null);
},
ex -> {
callFailedNoThrow(callback, ex);
return new CompletedFuture<T>(null, new ExecutionException(ex));
});


return doSubmit(asyncSubmitFn, executorService -> executorSubmit(executorService, callback, extractor)); private static <T> Function<T, Future<T>> getAsyncAbortedFunction(InvocationCallback<T> callback) {
return result -> {
callCompletedNoThrow(callback, result);
return CompletableFuture.completedFuture(result);
};
} }


public CompletableFuture<Response> submitCF() private static <T> Function<Exception, Future<T>> getAsyncExceptionFunction(InvocationCallback<T> callback) {
{ return ex -> {
return doSubmit(false, response -> response); callFailedNoThrow(callback, ex);
CompletableFuture<T> completableFuture = new CompletableFuture<>();
completableFuture.completeExceptionally(new ExecutionException(ex));
return completableFuture;
};
} }


@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public <T> CompletableFuture<T> submitCF(final Class<T> responseType) private static <T> ResultExtractor<T> getGenericTypeExtractor(GenericType<T> responseType) {
{ return response -> {
return doSubmit(false, response -> { if (responseType.getRawType().equals(Response.class))
return (T) response;
return ClientInvocation.extractResult(responseType, response, null);
};
}

@SuppressWarnings("unchecked")
private static <T> ResultExtractor<T> getResponseTypeExtractor(Class<T> responseType) {
return response -> {
if (Response.class.equals(responseType)) if (Response.class.equals(responseType))
return (T) response; return (T) response;
return ClientInvocation.extractResult(new GenericType<T>(responseType), response, null); return ClientInvocation.extractResult(new GenericType<T>(responseType), response, null);
}); };
} }


@SuppressWarnings("unchecked") public CompletableFuture<Response> submitCF()
public <T> CompletableFuture<T> submitCF(final GenericType<T> responseType)
{ {
return doSubmit(false, response -> { return doSubmit(response -> response);
if (responseType.getRawType().equals(Response.class))
return (T) response;
return ClientInvocation.extractResult(responseType, response, null);
});
} }


private <T> CompletableFuture<T> doSubmit(final boolean buffered, public <T> CompletableFuture<T> submitCF(final Class<T> responseType)
final AsyncClientHttpEngine.ResultExtractor<T> extractor) { {
return doSubmit(getResponseTypeExtractor(responseType));
}


final Function<AsyncClientHttpEngine, CompletableFuture<T>> asyncSubmitFn = public <T> CompletableFuture<T> submitCF(final GenericType<T> responseType)
asyncClientHttpEngine -> asyncSubmit( {
ext -> asyncClientHttpEngine.submit(this, buffered, ext, client.asyncInvocationExecutor()), return doSubmit(getGenericTypeExtractor(responseType));
extractor, }
result -> CompletableFuture.completedFuture(result),
ex -> {
CompletableFuture<T> completableFuture = new CompletableFuture<>();
completableFuture.completeExceptionally(new ExecutionException(ex));
return completableFuture;
});


return doSubmit(asyncSubmitFn, executorService -> executorSubmit(executorService, extractor)); private <T> CompletableFuture<T> doSubmit(ResultExtractor<T> extractor) {
if (client.httpEngine() instanceof AsyncClientHttpEngine)
{
return asyncSubmit(getCompletableFutureExtractorFunction(false),
extractor,
result -> CompletableFuture.completedFuture(result),
ex -> {
CompletableFuture<T> completableFuture = new CompletableFuture<>();
completableFuture.completeExceptionally(new ExecutionException(ex));
return completableFuture;
});
}
else
{
return executorSubmit(client.asyncInvocationExecutor(), null, extractor);
}
} }


@Override @Override
Expand Down Expand Up @@ -743,26 +734,11 @@ protected ClientResponse filterResponse(ClientRequestContextImpl requestContext,
return response; return response;
} }


private <T> T doSubmit(final Function<AsyncClientHttpEngine, T> asyncSubmitFn, private <Q extends Future<T>, T> Q asyncSubmit(
final Function<ExecutorService, T> executorSubmitFn) final Function<ResultExtractor<T>, Q> asyncHttpEngineSubmitFn,
{ final ResultExtractor<T> extractor,
ClientHttpEngine httpEngine = client.httpEngine(); final Function<T, Q> abortedFn,
if (httpEngine instanceof AsyncClientHttpEngine) final Function<Exception, Q> exceptionFn)
{
return asyncSubmitFn.apply((AsyncClientHttpEngine) httpEngine);
}
else
{
// never buffered, but always blocks in a thread
return executorSubmitFn.apply(client.asyncInvocationExecutor());
}
}

private <T extends Future<U>, U> T asyncSubmit(
final Function<AsyncClientHttpEngine.ResultExtractor<U>, T> asyncHttpEngineSubmitFn,
final AsyncClientHttpEngine.ResultExtractor<U> extractor,
final Function<U, T> abortedFn,
final Function<Exception, T> exceptionFn)
{ {
final ClientRequestContextImpl requestContext = new ClientRequestContextImpl(this); final ClientRequestContextImpl requestContext = new ClientRequestContextImpl(this);
Providers current = pushProvidersContext(); Providers current = pushProvidersContext();
Expand All @@ -773,7 +749,7 @@ private <T extends Future<U>, U> T asyncSubmit(
{ {
// spec requires that aborted response go through filter/interceptor chains. // spec requires that aborted response go through filter/interceptor chains.
aborted = filterResponse(requestContext, aborted); aborted = filterResponse(requestContext, aborted);
U result = extractor.extractResult(aborted); T result = extractor.extractResult(aborted);
return abortedFn.apply(result); return abortedFn.apply(result);
} }
} }
Expand All @@ -799,54 +775,33 @@ private <T extends Future<U>, U> T asyncSubmit(
}); });
} }


private <T> Future<T> executorSubmit(ExecutorService executor, final InvocationCallback<T> callback, private <T> CompletableFuture<T> executorSubmit(ExecutorService executor, final InvocationCallback<T> callback,
final AsyncClientHttpEngine.ResultExtractor<T> extractor) final ResultExtractor<T> extractor)
{
return executor.submit(new Callable<T>()
{
@Override
public T call() throws Exception
{
// ensure the future and the callback see the same result
T result = null;
ClientResponse response = null;
try
{
response = invoke(); // does filtering too
result = extractor.extractResult(response);
callCompletedNoThrow(callback, result);
return result;
}
catch (Exception e)
{
callFailedNoThrow(callback, e);
throw e;
}
finally
{
if (response != null && callback != null)
response.close();
}
}
});
}

private <T> CompletableFuture<T> executorSubmit(ExecutorService executor,
final AsyncClientHttpEngine.ResultExtractor<T> extractor)
{ {
return CompletableFuture.supplyAsync(() -> { return CompletableFuture.supplyAsync(() -> {
// ensure the future and the callback see the same result // ensure the future and the callback see the same result
ClientResponse response = null; ClientResponse response = null;
try { try
{
response = invoke(); // does filtering too response = invoke(); // does filtering too
return extractor.extractResult(response); T result = extractor.extractResult(response);
} catch (Exception e) { callCompletedNoThrow(callback, result);
return result;
}
catch (Exception e)
{
callFailedNoThrow(callback, e);
throw e; throw e;
} }
},executor); finally
{
if (response != null && callback != null)
response.close();
}
}, executor);
} }


private <T> void callCompletedNoThrow(InvocationCallback<T> callback, T result) private static <T> void callCompletedNoThrow(InvocationCallback<T> callback, T result)
{ {
if (callback != null) if (callback != null)
{ {
Expand All @@ -861,7 +816,7 @@ private <T> void callCompletedNoThrow(InvocationCallback<T> callback, T result)
} }
} }


private <T> void callFailedNoThrow(InvocationCallback<T> callback, Exception exception) private static <T> void callFailedNoThrow(InvocationCallback<T> callback, Exception exception)
{ {
if (callback != null) if (callback != null)
{ {
Expand All @@ -880,52 +835,6 @@ public RESTEasyTracingLogger getTracingLogger() {
return tracingLogger; return tracingLogger;
} }


private static class CompletedFuture<T> implements Future<T>
{

private final T result;

private final ExecutionException ex;

CompletedFuture(final T result, final ExecutionException ex)
{
this.ex = ex;
this.result = result;
}

@Override
public boolean cancel(boolean mayInterruptIfRunning)
{
return false;
}

@Override
public boolean isCancelled()
{
return false;
}

@Override
public boolean isDone()
{
return true;
}

@Override
public T get() throws InterruptedException, ExecutionException
{
if (ex != null)
throw ex;
return result;
}

@Override
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
{
return get();
}
}

public void setActualTarget(WebTarget target) public void setActualTarget(WebTarget target)
{ {
this.actualTarget = target; this.actualTarget = target;
Expand Down

0 comments on commit 84d00ac

Please sign in to comment.