Skip to content

Commit

Permalink
Support ExchangeType for the response of annotated service (#4177)
Browse files Browse the repository at this point in the history
Motivation:

`ExchangeType` has been introduced with #3956. The changes in #3956
was huge, so I didn't implement `ExchangeType` for the response type of
an annotated service.

Modifications:

- Add `ResponseConverterFunction.isResponseStreaming()` to configure
  whether to enable response streaming.
  - The value is enabled by default because it abides by the original
    behavior.
  - The API uses `RequestHeaders` and a negotiated media type to
    determine response streaming.
- Override `.isResponseStreaming()` for all known implementions.
- Fix a bug where `ExchangeType` does not work for a `GrpcService`
  prefixed a path.
- Breaking) `HttpService.exchangeType()` now takes `RouteingContext` as
  a single parameter.
  - A negotiated media type was required.
- Add `RequestContext.exchangeType()` to get the `ExchangeType` of the current request.

Result:

- Annotated services perform better on unary responses.
  • Loading branch information
ikhoon committed Jul 5, 2022
1 parent bf792d4 commit 49054ad
Show file tree
Hide file tree
Showing 64 changed files with 1,809 additions and 171 deletions.
Expand Up @@ -39,6 +39,7 @@
import com.linecorp.armeria.common.util.SystemInfo;
import com.linecorp.armeria.server.Server;
import com.linecorp.armeria.server.ServerBuilder;
import com.linecorp.armeria.server.logging.LoggingService;

import brave.propagation.CurrentTraceContext;
import brave.test.http.ITHttpServer;
Expand Down Expand Up @@ -109,6 +110,7 @@ protected void init() {
sb.service("/badrequest", (ctx, req) -> HttpResponse.of(BAD_REQUEST));

sb.decorator(BraveService.newDecorator(httpTracing));
sb.decorator(LoggingService.newDecorator());

server = sb.build();
server.start().join();
Expand Down
Expand Up @@ -45,7 +45,7 @@ public final class ObjectCollectingUtil {
* <a href="https://projectreactor.io/">Project Reactor</a>.
*/
@Nullable
private static final Class<?> MONO_CLASS;
public static final Class<?> MONO_CLASS;

static {
Class<?> mono = null;
Expand Down
Expand Up @@ -18,6 +18,8 @@

import static com.google.common.base.Preconditions.checkArgument;
import static com.linecorp.armeria.internal.common.util.ObjectCollectingUtil.collectFrom;
import static com.linecorp.armeria.internal.server.annotation.ClassUtil.typeToClass;
import static com.linecorp.armeria.internal.server.annotation.ClassUtil.unwrapAsyncType;
import static java.util.Objects.requireNonNull;

import java.lang.invoke.MethodHandle;
Expand All @@ -39,6 +41,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;

import com.linecorp.armeria.common.AggregatedHttpResponse;
Expand All @@ -50,7 +53,6 @@
import com.linecorp.armeria.common.HttpResponse;
import com.linecorp.armeria.common.HttpStatus;
import com.linecorp.armeria.common.MediaType;
import com.linecorp.armeria.common.RequestHeaders;
import com.linecorp.armeria.common.ResponseHeaders;
import com.linecorp.armeria.common.ResponseHeadersBuilder;
import com.linecorp.armeria.common.annotation.Nullable;
Expand All @@ -63,6 +65,7 @@
import com.linecorp.armeria.internal.server.annotation.AnnotatedValueResolver.ResolverContext;
import com.linecorp.armeria.server.HttpService;
import com.linecorp.armeria.server.Route;
import com.linecorp.armeria.server.RoutingContext;
import com.linecorp.armeria.server.ServiceRequestContext;
import com.linecorp.armeria.server.SimpleDecoratingHttpService;
import com.linecorp.armeria.server.annotation.ByteArrayResponseConverterFunction;
Expand Down Expand Up @@ -128,6 +131,7 @@ public final class AnnotatedService implements HttpService {
@Nullable
private final ExceptionHandlerFunction exceptionHandler;
private final ResponseConverterFunction responseConverter;
private final Type actualReturnType;

private final Route route;
private final HttpStatus defaultStatus;
Expand Down Expand Up @@ -163,8 +167,9 @@ public final class AnnotatedService implements HttpService {
method.getName(), exceptionHandlers);
}

actualReturnType = getActualReturnType(method);
responseConverter = responseConverter(
method, requireNonNull(responseConverters, "responseConverters"));
actualReturnType, requireNonNull(responseConverters, "responseConverters"));
aggregationStrategy = AggregationStrategy.from(resolvers);
this.route = requireNonNull(route, "route");

Expand Down Expand Up @@ -204,9 +209,8 @@ public final class AnnotatedService implements HttpService {
}

private static ResponseConverterFunction responseConverter(
Method method, List<ResponseConverterFunction> responseConverters) {
Type returnType, List<ResponseConverterFunction> responseConverters) {

final Type actualType = getActualReturnType(method);
final ImmutableList<ResponseConverterFunction> backingConverters =
ImmutableList
.<ResponseConverterFunction>builder()
Expand All @@ -225,7 +229,7 @@ private static ResponseConverterFunction responseConverter(

for (final ResponseConverterFunctionProvider provider : responseConverterFunctionProviders) {
final ResponseConverterFunction func =
provider.createResponseConverterFunction(actualType, responseConverter);
provider.createResponseConverterFunction(returnType, responseConverter);
if (func != null) {
return func;
}
Expand Down Expand Up @@ -489,12 +493,21 @@ private static CompletionStage<?> toCompletionStage(@Nullable Object obj, Execut
}

@Override
public ExchangeType exchangeType(RequestHeaders headers, Route route) {
// TODO(ikhoon): Support a non-streaming response type.
if (AnnotatedValueResolver.aggregationType(aggregationStrategy, headers) == AggregationType.ALL) {
return ExchangeType.RESPONSE_STREAMING;
public ExchangeType exchangeType(RoutingContext routingContext) {
final boolean isRequestStreaming =
AnnotatedValueResolver.aggregationType(aggregationStrategy,
routingContext.headers()) != AggregationType.ALL;
Boolean isResponseStreaming =
responseConverter.isResponseStreaming(
actualReturnType, routingContext.result().routingResult()
.negotiatedResponseMediaType());
if (isResponseStreaming == null) {
isResponseStreaming = true;
}
if (isRequestStreaming) {
return isResponseStreaming ? ExchangeType.BIDI_STREAMING : ExchangeType.REQUEST_STREAMING;
} else {
return ExchangeType.BIDI_STREAMING;
return isResponseStreaming ? ExchangeType.RESPONSE_STREAMING : ExchangeType.UNARY;
}
}

Expand Down Expand Up @@ -572,14 +585,32 @@ public HttpResponse serve(ServiceRequestContext ctx, HttpRequest req) {
* A response converter implementation which creates an {@link HttpResponse} with
* the objects published from a {@link Publisher} or {@link Stream}.
*/
private static final class AggregatedResponseConverterFunction implements ResponseConverterFunction {
@VisibleForTesting
static final class AggregatedResponseConverterFunction implements ResponseConverterFunction {

private final ResponseConverterFunction responseConverter;

AggregatedResponseConverterFunction(ResponseConverterFunction responseConverter) {
this.responseConverter = responseConverter;
}

@Override
public Boolean isResponseStreaming(Type returnType, @Nullable MediaType contentType) {
final Class<?> clazz = typeToClass(unwrapAsyncType(returnType));
if (clazz == null) {
return null;
}

if (HttpResponse.class.isAssignableFrom(clazz)) {
return true;
}
if (Publisher.class.isAssignableFrom(clazz) || Stream.class.isAssignableFrom(clazz)) {
return false;
}

return null;
}

@Override
@SuppressWarnings("unchecked")
public HttpResponse convertResponse(ServiceRequestContext ctx,
Expand Down
@@ -0,0 +1,74 @@
/*
* Copyright 2022 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.armeria.internal.server.annotation;

import static com.linecorp.armeria.internal.common.util.ObjectCollectingUtil.MONO_CLASS;

import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.concurrent.CompletionStage;

import com.linecorp.armeria.common.annotation.Nullable;

public final class ClassUtil {

/**
* Converts the specified {@link Type} to a {@link Class} instance.
*/
@Nullable
public static Class<?> typeToClass(Type type) {
if (type instanceof Class) {
return (Class<?>) type;
}
if (type instanceof ParameterizedType) {
return (Class<?>) ((ParameterizedType) type).getRawType();
}
return null;
}

/**
* Unwraps an enclosing {@link CompletionStage}, {@code reactor.core.publisher.Mono}, or
* {@code scala.concurrent.Future} and returns the type arguments.
* Returns itself if the {@link Type} is not enclosed by one of them.
*/
public static Type unwrapAsyncType(Type type) {
if (type instanceof Class) {
return type;
}

if (!(type instanceof ParameterizedType)) {
return type;
}

final ParameterizedType ptype = (ParameterizedType) type;
final Type[] typeArguments = ptype.getActualTypeArguments();
if (typeArguments.length == 0) {
return type;
}

final Class<?> clazz = (Class<?>) ptype.getRawType();
final Type typeArgument = typeArguments[0];
if (CompletionStage.class.isAssignableFrom(clazz) ||
ScalaUtil.isScalaFuture(clazz) ||
(MONO_CLASS != null && MONO_CLASS.isAssignableFrom(clazz))) {
return typeArgument;
}
return type;
}

private ClassUtil() {}
}
Expand Up @@ -16,11 +16,14 @@

package com.linecorp.armeria.internal.server.annotation;

import java.lang.reflect.Type;

import com.google.common.collect.ImmutableList;

import com.linecorp.armeria.common.HttpData;
import com.linecorp.armeria.common.HttpHeaders;
import com.linecorp.armeria.common.HttpResponse;
import com.linecorp.armeria.common.MediaType;
import com.linecorp.armeria.common.ResponseHeaders;
import com.linecorp.armeria.common.annotation.Nullable;
import com.linecorp.armeria.common.util.SafeCloseable;
Expand All @@ -39,6 +42,18 @@ final class CompositeResponseConverterFunction implements ResponseConverterFunct
this.functions = functions;
}

@Override
@Nullable
public Boolean isResponseStreaming(Type returnType, @Nullable MediaType produceType) {
for (ResponseConverterFunction function : functions) {
final Boolean responseStreaming = function.isResponseStreaming(returnType, produceType);
if (responseStreaming != null) {
return responseStreaming;
}
}
return null;
}

@Override
public HttpResponse convertResponse(ServiceRequestContext ctx,
ResponseHeaders headers,
Expand Down
Expand Up @@ -39,7 +39,6 @@ final class AggregatingDecodedHttpRequest extends AggregatingStreamMessage<HttpO
private final long maxRequestLength;
private final RequestHeaders headers;
private final RoutingContext routingCtx;
private final Routed<ServiceConfig> routed;
private final ExchangeType exchangeType;

@Nullable
Expand All @@ -54,17 +53,16 @@ final class AggregatingDecodedHttpRequest extends AggregatingStreamMessage<HttpO

AggregatingDecodedHttpRequest(EventLoop eventLoop, int id, int streamId, RequestHeaders headers,
boolean keepAlive, long maxRequestLength,
RoutingContext routingCtx, Routed<ServiceConfig> routed,
ExchangeType exchangeType) {
RoutingContext routingCtx, ExchangeType exchangeType) {
super(4);
this.headers = headers;
this.eventLoop = eventLoop;
this.id = id;
this.streamId = streamId;
this.keepAlive = keepAlive;
this.maxRequestLength = maxRequestLength;
assert routingCtx.hasResult();
this.routingCtx = routingCtx;
this.routed = routed;
this.exchangeType = exchangeType;
}

Expand All @@ -85,7 +83,7 @@ public RoutingContext routingContext() {
@Nonnull
@Override
public Routed<ServiceConfig> route() {
return routed;
return routingCtx.result();
}

@Override
Expand Down
Expand Up @@ -31,25 +31,25 @@ interface DecodedHttpRequest extends HttpRequest {
static DecodedHttpRequest of(boolean endOfStream, EventLoop eventLoop, int id, int streamId,
RequestHeaders headers, boolean keepAlive,
InboundTrafficController inboundTrafficController,
RoutingContext routingCtx, @Nullable Routed<ServiceConfig> routed) {
if (routed == null) {
RoutingContext routingCtx) {
if (!routingCtx.hasResult()) {
return new EmptyContentDecodedHttpRequest(eventLoop, id, streamId, headers, keepAlive,
routingCtx, routed, ExchangeType.RESPONSE_STREAMING);
routingCtx, ExchangeType.RESPONSE_STREAMING);
} else {
final ServiceConfig config = routed.value();
final ServiceConfig config = routingCtx.result().value();
final HttpService service = config.service();
final ExchangeType exchangeType = service.exchangeType(headers, routed.route());
final ExchangeType exchangeType = service.exchangeType(routingCtx);
if (endOfStream) {
return new EmptyContentDecodedHttpRequest(eventLoop, id, streamId, headers, keepAlive,
routingCtx, routed, exchangeType);
routingCtx, exchangeType);
} else {
if (exchangeType.isRequestStreaming()) {
return new StreamingDecodedHttpRequest(eventLoop, id, streamId, headers, keepAlive,
inboundTrafficController, config.maxRequestLength(),
routingCtx, routed, exchangeType);
routingCtx, exchangeType);
} else {
return new AggregatingDecodedHttpRequest(eventLoop, id, streamId, headers, keepAlive,
config.maxRequestLength(), routingCtx, routed,
config.maxRequestLength(), routingCtx,
exchangeType);
}
}
Expand Down

0 comments on commit 49054ad

Please sign in to comment.