Skip to content

Commit

Permalink
Add handler-transforming functions to Chain
Browse files Browse the repository at this point in the history
  • Loading branch information
mizosoft committed Jun 23, 2022
1 parent 3f1bf0f commit f95061a
Showing 1 changed file with 36 additions and 33 deletions.
69 changes: 36 additions & 33 deletions methanol/src/main/java/com/github/mizosoft/methanol/Methanol.java
Expand Up @@ -391,7 +391,7 @@ <T> HttpResponse<T> intercept(HttpRequest request, Chain<T> chain)
<T> CompletableFuture<HttpResponse<T>> interceptAsync(HttpRequest request, Chain<T> chain);

/** Returns an interceptor that forwards the request after applying the given operator. */
static Interceptor create(UnaryOperator<HttpRequest> operator) {
static Interceptor create(Function<HttpRequest, HttpRequest> operator) {
requireNonNull(operator);
return new Interceptor() {
@Override
Expand Down Expand Up @@ -434,6 +434,23 @@ default <U> Chain<U> with(
throw new UnsupportedOperationException();
}

/** Returns a new chain after applying the given function to this chain's body handler. */
default Chain<T> with(UnaryOperator<BodyHandler<T>> bodyHandlerTransformer) {
return withBodyHandler(bodyHandlerTransformer.apply(bodyHandler()));
}

/**
* Returns a new chain after applying the given functions to this chain's body and push
* promise handlers.
*/
default Chain<T> with(
UnaryOperator<BodyHandler<T>> bodyHandlerTransformer,
UnaryOperator<PushPromiseHandler<T>> pushPromiseHandlerTransformer) {
return with(
bodyHandlerTransformer.apply(bodyHandler()),
pushPromiseHandler().map(pushPromiseHandlerTransformer).orElse(null));
}

/**
* Forwards the request to the next interceptor, or to the client's backend if called by the
* last interceptor.
Expand Down Expand Up @@ -829,9 +846,7 @@ public HttpResponse<T> forward(HttpRequest request) throws IOException, Interrup
}

var interceptor = interceptors.get(currentInterceptorIndex);
return requireNonNull(
interceptor.intercept(request, nextInterceptorChain()),
() -> interceptor + "::intercept returned a null response");
return interceptor.intercept(request, nextInterceptorChain());
}

@Override
Expand All @@ -843,12 +858,7 @@ public CompletableFuture<HttpResponse<T>> forwardAsync(HttpRequest request) {
}

var interceptor = interceptors.get(currentInterceptorIndex);
return interceptor
.interceptAsync(request, nextInterceptorChain())
.thenApply(
res ->
requireNonNull(
res, () -> interceptor + "::interceptAsync completed with a null response"));
return interceptor.interceptAsync(request, nextInterceptorChain());
}

private InterceptorChain<T> nextInterceptorChain() {
Expand Down Expand Up @@ -952,16 +962,12 @@ private static <T> Chain<T> decoding(HttpRequest request, Chain<T> chain) {
}

return chain.with(
MoreBodyHandlers.decoding(chain.bodyHandler()),
chain
.pushPromiseHandler()
.map(
handler ->
transformPushPromises(
handler,
MoreBodyHandlers::decoding,
AutoDecompressingInterceptor::stripContentEncoding))
.orElse(null));
MoreBodyHandlers::decoding,
pushPromiseHandler ->
transformPushPromises(
pushPromiseHandler,
MoreBodyHandlers::decoding,
AutoDecompressingInterceptor::stripContentEncoding));
}

private static <T> HttpResponse<T> stripContentEncoding(HttpResponse<T> response) {
Expand Down Expand Up @@ -1017,11 +1023,12 @@ public <T> CompletableFuture<HttpResponse<T>> interceptAsync(

private <T> Chain<T> withHeadersTimeout(Chain<T> chain, TimeoutTask timeoutTask) {
// TODO handle push promises
return chain.withBodyHandler(
responseInfo ->
timeoutTask.cancel()
? chain.bodyHandler().apply(responseInfo)
: new TimedOutSubscriber<>());
return chain.with(
bodyHandler ->
responseInfo ->
timeoutTask.cancel()
? bodyHandler.apply(responseInfo)
: new TimedOutSubscriber<>());
}

private static final class TimeoutTask {
Expand Down Expand Up @@ -1109,14 +1116,10 @@ public <T> CompletableFuture<HttpResponse<T>> interceptAsync(

private <T> Chain<T> withReadTimeout(Chain<T> chain) {
return chain.with(
withReadTimeout(chain.bodyHandler()),
chain
.pushPromiseHandler()
.map(
handler ->
transformPushPromises(
handler, this::withReadTimeout, UnaryOperator.identity()))
.orElse(null));
this::withReadTimeout,
pushPromiseHandler ->
transformPushPromises(
pushPromiseHandler, this::withReadTimeout, UnaryOperator.identity()));
}

private <T> BodyHandler<T> withReadTimeout(BodyHandler<T> bodyHandler) {
Expand Down

0 comments on commit f95061a

Please sign in to comment.