Skip to content

Commit

Permalink
Multiple thrift services (#2285)
Browse files Browse the repository at this point in the history
This PR will allow multiple Thrift services to be bind on an HTTP path, also supporting `TMultiplexed ` protocol.

**Changes**:
1. `ThriftFunction` holds reference of the implementation
2. `ThriftServiceMetadata` now can take a list of thrift service implementations and can hold a mapping between the method name and `ThriftFunction`.
3. Introduced `THttpServiceBuilder` to fluently build instance of `THttpService`.

**Deprecations**:
 1. API's in`THttpService` that take `Map<String, ?>` are deprecated in favor of `THttpServiceBuiler.ofService(String serviceName, Object implementation)`.

**Breaking changes**:
1. `ThriftCallService` factory method with signature `ThriftCallService of(Map<String, ? extends Iterable<?>> implementation)` is changed to `ThriftCallService of(Map<String, ? extends Iterable<?>> implementations)`

Closes: #2164
  • Loading branch information
sivaalli authored and trustin committed Dec 6, 2019
1 parent 405c2d2 commit 4a5811f
Show file tree
Hide file tree
Showing 9 changed files with 429 additions and 57 deletions.
Expand Up @@ -20,7 +20,7 @@
import java.util.Optional;

/**
* Skeletal {@link Unwrappable} imlementation.
* Skeletal {@link Unwrappable} implementation.
*
* @param <T> the type of the object being decorated
*/
Expand Down
Expand Up @@ -52,6 +52,8 @@ private enum Type {
private final Class<?> serviceType;
private final String name;
@Nullable
private final Object implementation;
@Nullable
private final TBase<?, ?> result;
private final TFieldIdEnum[] argFields;
@Nullable
Expand All @@ -61,18 +63,30 @@ private enum Type {

ThriftFunction(Class<?> serviceType, ProcessFunction<?, ?> func) throws Exception {
this(serviceType, func.getMethodName(), func, Type.SYNC,
getArgFields(func), getResult(func), getDeclaredExceptions(func));
getArgFields(func), getResult(func), getDeclaredExceptions(func), null);
}

ThriftFunction(Class<?> serviceType, ProcessFunction<?, ?> func,
@Nullable Object implementation) throws Exception {
this(serviceType, func.getMethodName(), func, Type.SYNC,
getArgFields(func), getResult(func), getDeclaredExceptions(func), implementation);
}

ThriftFunction(Class<?> serviceType, AsyncProcessFunction<?, ?, ?> func) throws Exception {
this(serviceType, func.getMethodName(), func, Type.ASYNC,
getArgFields(func), getResult(func), getDeclaredExceptions(func));
getArgFields(func), getResult(func), getDeclaredExceptions(func), null);
}

ThriftFunction(Class<?> serviceType, AsyncProcessFunction<?, ?, ?> func,
@Nullable Object implementation) throws Exception {
this(serviceType, func.getMethodName(), func, Type.ASYNC,
getArgFields(func), getResult(func), getDeclaredExceptions(func), implementation);
}

private ThriftFunction(
Class<?> serviceType, String name, Object func, Type type,
TFieldIdEnum[] argFields, @Nullable TBase<?, ?> result,
Class<?>[] declaredExceptions) throws Exception {
Class<?>[] declaredExceptions, @Nullable Object implementation) throws Exception {

this.func = func;
this.type = type;
Expand All @@ -81,6 +95,7 @@ private ThriftFunction(
this.argFields = argFields;
this.result = result;
this.declaredExceptions = declaredExceptions;
this.implementation = implementation;

// Determine the success and exception fields of the function.
final ImmutableMap.Builder<Class<Throwable>, TFieldIdEnum> exceptionFieldsBuilder =
Expand Down Expand Up @@ -194,6 +209,14 @@ public Class<?>[] declaredExceptions() {
return declaredExceptions;
}

/**
* Returns the implementation that this function is associated with.
*/
@Nullable
public Object implementation() {
return implementation;
}

/**
* Returns a new empty arguments instance.
*/
Expand Down
Expand Up @@ -34,6 +34,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;

import com.linecorp.armeria.internal.Types;

/**
Expand All @@ -56,8 +59,22 @@ public final class ThriftServiceMetadata {
* interfaces.
*/
public ThriftServiceMetadata(Object implementation) {
requireNonNull(implementation, "implementation");
interfaces = init(implementation);
this(ImmutableList.of(requireNonNull(implementation, "implementation")));
}

/**
* Creates a new instance from a list of Thrift service implementations, while each service can implement
* one or more Thrift service interfaces.
*/
public ThriftServiceMetadata(Iterable<?> implementations) {
requireNonNull(implementations, "implementations");

final ImmutableSet.Builder<Class<?>> interfaceBuilder = ImmutableSet.builder();
implementations.forEach(implementation -> {
requireNonNull(implementation, "implementations contains null.");
interfaceBuilder.addAll(init(implementation));
});
interfaces = interfaceBuilder.build();
}

/**
Expand All @@ -76,23 +93,22 @@ private Set<Class<?>> init(@Nullable Object implementation, Iterable<Class<?>> c

// Build the map of method names and their corresponding process functions.
// If a method is defined multiple times, we take the first definition
final Set<String> methodNames = new HashSet<>();
final Set<Class<?>> interfaces = new HashSet<>();

for (Class<?> iface : candidateInterfaces) {
final Map<String, AsyncProcessFunction<?, ?, ?>> asyncProcessMap;
asyncProcessMap = getThriftAsyncProcessMap(implementation, iface);
if (asyncProcessMap != null) {
asyncProcessMap.forEach(
(name, func) -> registerFunction(methodNames, iface, name, func));
(name, func) -> registerFunction(iface, name, func, implementation));
interfaces.add(iface);
}

final Map<String, ProcessFunction<?, ?>> processMap;
processMap = getThriftProcessMap(implementation, iface);
if (processMap != null) {
processMap.forEach(
(name, func) -> registerFunction(methodNames, iface, name, func));
(name, func) -> registerFunction(iface, name, func, implementation));
interfaces.add(iface);
}
}
Expand Down Expand Up @@ -174,19 +190,19 @@ private Set<Class<?>> init(@Nullable Object implementation, Iterable<Class<?>> c
}

@SuppressWarnings("rawtypes")
private void registerFunction(Set<String> methodNames, Class<?> iface, String name, Object func) {
if (methodNames.contains(name)) {
private void registerFunction(Class<?> iface, String name,
Object func, @Nullable Object implementation) {
if (functions.containsKey(name)) {
logger.warn("duplicate Thrift method name: " + name);
return;
}
methodNames.add(name);

try {
final ThriftFunction f;
if (func instanceof ProcessFunction) {
f = new ThriftFunction(iface, (ProcessFunction) func);
f = new ThriftFunction(iface, (ProcessFunction) func, implementation);
} else {
f = new ThriftFunction(iface, (AsyncProcessFunction) func);
f = new ThriftFunction(iface, (AsyncProcessFunction) func, implementation);
}
functions.put(name, f);
} catch (Exception e) {
Expand Down
Expand Up @@ -16,13 +16,15 @@

package com.linecorp.armeria.server.thrift;

import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static java.util.Objects.requireNonNull;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
Expand All @@ -41,8 +43,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;

import com.linecorp.armeria.common.AggregatedHttpRequest;
import com.linecorp.armeria.common.DefaultRpcResponse;
Expand Down Expand Up @@ -97,13 +102,30 @@ public final class THttpService extends DecoratingService<RpcRequest, RpcRespons

private static final SerializationFormat[] EMPTY_FORMATS = new SerializationFormat[0];

/**
* Creates a new instance of {@link THttpServiceBuilder} which can build an instance of {@link THttpService}
* fluently.
*
* <p>The default SerializationFormat {@link ThriftSerializationFormats#BINARY} will be used when client
* does not specify one in the request, but also supports {@link ThriftSerializationFormats#values()}.
* </p>
*
* <p>Currently, the only way to specify a serialization format is by using the HTTP session
* protocol and setting the {@code "Content-Type"} header to the appropriate
* {@link SerializationFormat#mediaType()}.
*/
public static THttpServiceBuilder builder() {
return new THttpServiceBuilder();
}

/**
* Creates a new {@link THttpService} with the specified service implementation, supporting all thrift
* protocols and defaulting to {@link ThriftSerializationFormats#BINARY TBinary} protocol when the client
* doesn't specify one.
*
* <p>Currently, the only way to specify a serialization format is by using the HTTP session
* protocol and setting the Content-Type header to the appropriate {@link SerializationFormat#mediaType()}.
* protocol and setting the {@code "Content-Type"} header to the appropriate
* {@link SerializationFormat#mediaType()}.
*
* @param implementation an implementation of {@code *.Iface} or {@code *.AsyncIface} service interface
* generated by the Apache Thrift compiler
Expand All @@ -118,12 +140,16 @@ public static THttpService of(Object implementation) {
* the client doesn't specify one.
*
* <p>Currently, the only way to specify a serialization format is by using the HTTP session
* protocol and setting the Content-Type header to the appropriate {@link SerializationFormat#mediaType()}.
* protocol and setting the {@code "Content-Type"} header to the appropriate
* {@link SerializationFormat#mediaType()}.
*
* @deprecated Use {@link THttpService#builder()}.
*
* @param implementations a {@link Map} whose key is service name and value is the implementation of
* {@code *.Iface} or {@code *.AsyncIface} service interface generated by
* the Apache Thrift compiler
*/
@Deprecated
public static THttpService of(Map<String, ?> implementations) {
return of(implementations, ThriftSerializationFormats.BINARY);
}
Expand All @@ -134,8 +160,8 @@ public static THttpService of(Map<String, ?> implementations) {
* specify one.
*
* <p>Currently, the only way to specify a serialization format is by using the HTTP session
* protocol and setting the Content-Type header to the appropriate {@link SerializationFormat#mediaType()}.
*
* protocol and setting the {@code "Content-Type"} header to the appropriate
* {@link SerializationFormat#mediaType()}.
*
* @param implementation an implementation of {@code *.Iface} or {@code *.AsyncIface} service interface
* generated by the Apache Thrift compiler
Expand All @@ -156,20 +182,21 @@ public static THttpService of(Object implementation,
* doesn't specify one.
*
* <p>Currently, the only way to specify a serialization format is by using the HTTP session
* protocol and setting the Content-Type header to the appropriate {@link SerializationFormat#mediaType()}.
* protocol and setting the {@code "Content-Type"} header to the appropriate
* {@link SerializationFormat#mediaType()}.
*
* @deprecated Use {@link THttpService#builder()}.
*
* @param implementations a {@link Map} whose key is service name and value is the implementation of
* {@code *.Iface} or {@code *.AsyncIface} service interface generated by
* the Apache Thrift compiler
* @param defaultSerializationFormat the default serialization format to use when not specified by the
* client
*/
@Deprecated
public static THttpService of(Map<String, ?> implementations,
SerializationFormat defaultSerializationFormat) {
return new THttpService(ThriftCallService.of(implementations),
newAllowedSerializationFormats(defaultSerializationFormat,
ThriftSerializationFormats.values()));
return ofFormats(implementations, defaultSerializationFormat, ThriftSerializationFormats.values());
}

/**
Expand All @@ -178,7 +205,7 @@ public static THttpService of(Map<String, ?> implementations,
* doesn't specify one.
*
* <p>Currently, the only way to specify a serialization format is by using the HTTP session protocol and
* setting the Content-Type header to the appropriate {@link SerializationFormat#mediaType()}.
* setting the {@code "Content-Type"} header to the appropriate {@link SerializationFormat#mediaType()}.
*
* @param implementation an implementation of {@code *.Iface} or {@code *.AsyncIface} service interface
* generated by the Apache Thrift compiler
Expand All @@ -204,7 +231,9 @@ public static THttpService ofFormats(
* client doesn't specify one.
*
* <p>Currently, the only way to specify a serialization format is by using the HTTP session protocol and
* setting the Content-Type header to the appropriate {@link SerializationFormat#mediaType()}.
* setting the {@code "Content-Type"} header to the appropriate {@link SerializationFormat#mediaType()}.
*
* @deprecated Use {@link THttpService#builder()}.
*
* @param implementations a {@link Map} whose key is service name and value is the implementation of
* {@code *.Iface} or {@code *.AsyncIface} service interface generated by
Expand All @@ -214,6 +243,7 @@ public static THttpService ofFormats(
* @param otherAllowedSerializationFormats other serialization formats that should be supported by this
* service in addition to the default
*/
@Deprecated
public static THttpService ofFormats(
Map<String, ?> implementations,
SerializationFormat defaultSerializationFormat,
Expand All @@ -231,7 +261,7 @@ public static THttpService ofFormats(
* {@code defaultSerializationFormat} when the client doesn't specify one.
*
* <p>Currently, the only way to specify a serialization format is by using the HTTP session protocol and
* setting the Content-Type header to the appropriate {@link SerializationFormat#mediaType()}.
* setting the {@code "Content-Type"} header to the appropriate {@link SerializationFormat#mediaType()}.
*
* @param implementation an implementation of {@code *.Iface} or {@code *.AsyncIface} service interface
* generated by the Apache Thrift compiler
Expand All @@ -256,7 +286,9 @@ public static THttpService ofFormats(
* {@code defaultSerializationFormat} when the client doesn't specify one.
*
* <p>Currently, the only way to specify a serialization format is by using the HTTP session protocol and
* setting the Content-Type header to the appropriate {@link SerializationFormat#mediaType()}.
* setting the {@code "Content-Type"} header to the appropriate {@link SerializationFormat#mediaType()}.
*
* @deprecated Use {@link THttpService#builder()}.
*
* @param implementations a {@link Map} whose key is service name and value is the implementation of
* {@code *.Iface} or {@code *.AsyncIface} service interface generated by
Expand All @@ -266,12 +298,17 @@ public static THttpService ofFormats(
* @param otherAllowedSerializationFormats other serialization formats that should be supported by this
* service in addition to the default
*/
@Deprecated
public static THttpService ofFormats(
Map<String, ?> implementations,
SerializationFormat defaultSerializationFormat,
Iterable<SerializationFormat> otherAllowedSerializationFormats) {

return new THttpService(ThriftCallService.of(implementations),
requireNonNull(implementations, "implementations");
final ImmutableMap<String, ? extends ImmutableList<?>> transformedMap =
implementations.entrySet().stream().map(
entry -> Maps.immutableEntry(entry.getKey(), ImmutableList.of(entry.getValue())))
.collect(toImmutableMap(Entry::getKey, Entry::getValue));
return new THttpService(ThriftCallService.of(transformedMap),
newAllowedSerializationFormats(defaultSerializationFormat,
otherAllowedSerializationFormats));
}
Expand All @@ -281,7 +318,8 @@ public static THttpService ofFormats(
* {@link ThriftSerializationFormats#BINARY TBinary} protocol when the client doesn't specify one.
*
* <p>Currently, the only way to specify a serialization format is by using the HTTP session
* protocol and setting the Content-Type header to the appropriate {@link SerializationFormat#mediaType()}.
* protocol and setting the {@code "Content-Type"} header to the appropriate
* {@link SerializationFormat#mediaType()}.
*/
public static Function<? super RpcService, THttpService> newDecorator() {
return newDecorator(ThriftSerializationFormats.BINARY);
Expand All @@ -291,7 +329,8 @@ public static Function<? super RpcService, THttpService> newDecorator() {
* Creates a new decorator that supports all thrift protocols and defaults to the specified
* {@code defaultSerializationFormat} when the client doesn't specify one.
* Currently, the only way to specify a serialization format is by using the HTTP session
* protocol and setting the Content-Type header to the appropriate {@link SerializationFormat#mediaType()}.
* protocol and setting the {@code "Content-Type"} header to the appropriate
* {@link SerializationFormat#mediaType()}.
*
* @param defaultSerializationFormat the default serialization format to use when not specified by the
* client
Expand All @@ -310,7 +349,7 @@ public static Function<? super RpcService, THttpService> newDecorator(
* Creates a new decorator that supports only the formats specified and defaults to the specified
* {@code defaultSerializationFormat} when the client doesn't specify one.
* Currently, the only way to specify a serialization format is by using the HTTP session protocol and
* setting the Content-Type header to the appropriate {@link SerializationFormat#mediaType()}.
* setting the {@code "Content-Type"} header to the appropriate {@link SerializationFormat#mediaType()}.
*
* @param defaultSerializationFormat the default serialization format to use when not specified by the
* client
Expand All @@ -329,7 +368,7 @@ public static Function<? super RpcService, THttpService> newDecorator(
* Creates a new decorator that supports the protocols specified in {@code allowedSerializationFormats} and
* defaults to the specified {@code defaultSerializationFormat} when the client doesn't specify one.
* Currently, the only way to specify a serialization format is by using the HTTP session protocol and
* setting the Content-Type header to the appropriate {@link SerializationFormat#mediaType()}.
* setting the {@code "Content-Type"} header to the appropriate {@link SerializationFormat#mediaType()}.
*
* @param defaultSerializationFormat the default serialization format to use when not specified by the
* client
Expand Down Expand Up @@ -363,7 +402,7 @@ private static SerializationFormat[] newAllowedSerializationFormats(
private final Set<SerializationFormat> allowedSerializationFormats;
private final ThriftCallService thriftService;

private THttpService(RpcService delegate, SerializationFormat[] allowedSerializationFormatArray) {
THttpService(RpcService delegate, SerializationFormat[] allowedSerializationFormatArray) {
super(delegate);
thriftService = findThriftService(delegate);

Expand Down

0 comments on commit 4a5811f

Please sign in to comment.