Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor GrpcStatus to implement GrpcExceptionHandlerFunction #5571

Merged
merged 32 commits into from
May 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
c33f8ea
chore: refactor GrpcStatus to use it as DefaultGrpcExceptionHandlerFu…
jaeseung-bae Apr 5, 2024
30dd216
test: can use DefaultGrpcExceptionHandlerFunction for server builder …
jaeseung-bae Apr 5, 2024
866a041
chore: lint fix
jaeseung-bae Apr 5, 2024
3a5585f
chore: refactor, move exceptionHandling features from GrpcStatus to D…
jaeseung-bae Apr 12, 2024
1e0a4d2
Merge remote-tracking branch 'upstream/main' into refactor/grpc-status
jaeseung-bae Apr 12, 2024
0c804ef
chore: use DefaultGrpcExceptionHandlerFunction as fallback for both G…
jaeseung-bae Apr 14, 2024
a88f3c3
chore: try to add possibly missing description in case of replace_exc…
jaeseung-bae Apr 16, 2024
cccbe7d
Merge remote-tracking branch 'upstream/main' into refactor/grpc-status
jaeseung-bae Apr 16, 2024
5c56c80
chore: apply feedbacks
jaeseung-bae Apr 17, 2024
0d6f75c
chore: call peelAndUnwrap when exception thrown
jaeseung-bae Apr 17, 2024
d38f3be
Merge remote-tracking branch 'upstream/main' into HEAD
jaeseung-bae Apr 19, 2024
8604cf5
chore: UnwrappingGrpcExceptionHandlerFunction calls peelAndUnwrap at …
jaeseung-bae Apr 19, 2024
0bd68e0
chore: peelAndUnwrap is not supposed to receive null Throwable
jaeseung-bae Apr 20, 2024
63737b1
Update grpc/src/main/java/com/linecorp/armeria/common/grpc/GrpcExcept…
jaeseung-bae Apr 22, 2024
fb0692a
Update grpc/src/main/java/com/linecorp/armeria/internal/common/grpc/U…
jaeseung-bae Apr 22, 2024
f68bd28
chore: apply feedbacks
jaeseung-bae Apr 22, 2024
894fbba
Merge branch 'main' into refactor/grpc-status
ikhoon Apr 25, 2024
7090d2c
Update grpc/src/test/java/com/linecorp/armeria/common/grpc/GrpcExcept…
jaeseung-bae Apr 25, 2024
a35dc16
Update grpc/src/test/java/com/linecorp/armeria/common/grpc/GrpcExcept…
jaeseung-bae Apr 25, 2024
a9a28ba
Update grpc/src/test/java/com/linecorp/armeria/common/grpc/GrpcExcept…
jaeseung-bae Apr 25, 2024
0704f71
Update grpc/src/test/java/com/linecorp/armeria/common/grpc/GrpcExcept…
jaeseung-bae Apr 25, 2024
e7b9908
chore: revert redundant wrapping code
jaeseung-bae May 3, 2024
7244059
Merge remote-tracking branch 'upstream/main' into refactor/grpc-status
jaeseung-bae May 18, 2024
10245db
chore: make enum for singleton instance
jaeseung-bae May 19, 2024
c717229
chore: rename ofDefault to of to meet recent convention
jaeseung-bae May 19, 2024
81f3d3a
chore: add comment
jaeseung-bae May 19, 2024
d1ff8b9
chore: add assertion for newStatus
jaeseung-bae May 19, 2024
9603904
chore: make it final, static
jaeseung-bae May 19, 2024
5b6dba4
chore: fix lint
jaeseung-bae May 19, 2024
99d5fbb
chore: refactoring for clean code
jaeseung-bae May 20, 2024
81c954d
Update grpc/src/main/java/com/linecorp/armeria/client/grpc/GrpcClient…
jaeseung-bae May 20, 2024
0cef19b
Update grpc/src/main/java/com/linecorp/armeria/internal/common/grpc/U…
jaeseung-bae May 20, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import com.linecorp.armeria.common.grpc.GrpcSerializationFormats;
import com.linecorp.armeria.common.grpc.protocol.ArmeriaMessageDeframer;
import com.linecorp.armeria.common.grpc.protocol.ArmeriaMessageFramer;
import com.linecorp.armeria.internal.common.grpc.UnwrappingGrpcExceptionHandleFunction;
import com.linecorp.armeria.unsafe.grpc.GrpcUnsafeBufferUtil;

import io.grpc.CallCredentials;
Expand Down Expand Up @@ -418,7 +419,8 @@ public <T> T build(Class<T> clientType) {
option(INTERCEPTORS.newValue(clientInterceptors));
}
if (exceptionHandler != null) {
option(EXCEPTION_HANDLER.newValue(exceptionHandler));
option(EXCEPTION_HANDLER.newValue(new UnwrappingGrpcExceptionHandleFunction(exceptionHandler.orElse(
GrpcExceptionHandlerFunction.of()))));
}

final Object client;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
import com.linecorp.armeria.common.grpc.protocol.ArmeriaMessageFramer;
import com.linecorp.armeria.internal.client.grpc.NullCallCredentials;
import com.linecorp.armeria.internal.client.grpc.NullGrpcClientStubFactory;
import com.linecorp.armeria.internal.common.grpc.GrpcStatus;
import com.linecorp.armeria.internal.common.grpc.UnwrappingGrpcExceptionHandleFunction;
import com.linecorp.armeria.unsafe.grpc.GrpcUnsafeBufferUtil;

import io.grpc.CallCredentials;
Expand Down Expand Up @@ -174,8 +174,8 @@ public final class GrpcClientOptions {
* to a gRPC {@link Status}.
*/
public static final ClientOption<GrpcExceptionHandlerFunction> EXCEPTION_HANDLER =
ClientOption.define("EXCEPTION_HANDLER",
(ctx, cause, metadata) -> GrpcStatus.fromThrowable(cause));
ClientOption.define("EXCEPTION_HANDLER", new UnwrappingGrpcExceptionHandleFunction(
GrpcExceptionHandlerFunction.of()));

/**
* Sets whether to respect the marshaller specified in gRPC {@link MethodDescriptor}.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright 2024 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.armeria.common.grpc;

import java.io.IOException;
import java.nio.channels.ClosedChannelException;

import com.google.protobuf.InvalidProtocolBufferException;

import com.linecorp.armeria.client.UnprocessedRequestException;
import com.linecorp.armeria.client.circuitbreaker.FailFastException;
import com.linecorp.armeria.common.ClosedSessionException;
import com.linecorp.armeria.common.ContentTooLargeException;
import com.linecorp.armeria.common.RequestContext;
import com.linecorp.armeria.common.TimeoutException;
import com.linecorp.armeria.common.stream.ClosedStreamException;
import com.linecorp.armeria.server.RequestTimeoutException;

import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.Status.Code;
import io.netty.handler.codec.http2.Http2Error;
import io.netty.handler.codec.http2.Http2Exception;

enum DefaultGrpcExceptionHandlerFunction implements GrpcExceptionHandlerFunction {
INSTANCE;

/**
* Converts the {@link Throwable} to a {@link Status}, taking into account exceptions specific to Armeria as
* well and the protocol package.
*/
@Override
public Status apply(RequestContext ctx, Throwable cause, Metadata metadata) {
final Status s = Status.fromThrowable(cause);
if (s.getCode() != Code.UNKNOWN) {
return s;
}

if (cause instanceof ClosedSessionException || cause instanceof ClosedChannelException) {
// ClosedChannelException is used any time the Netty channel is closed. Proper error
// processing requires remembering the error that occurred before this one and using it
// instead.
return s;
}
if (cause instanceof ClosedStreamException || cause instanceof RequestTimeoutException) {
return Status.CANCELLED.withCause(cause);
}
if (cause instanceof InvalidProtocolBufferException) {
return Status.INVALID_ARGUMENT.withCause(cause);
}
if (cause instanceof UnprocessedRequestException ||
cause instanceof IOException ||
cause instanceof FailFastException) {
return Status.UNAVAILABLE.withCause(cause);
}
if (cause instanceof Http2Exception) {
if (cause instanceof Http2Exception.StreamException &&
((Http2Exception.StreamException) cause).error() == Http2Error.CANCEL) {
return Status.CANCELLED;
}
return Status.INTERNAL.withCause(cause);
}
if (cause instanceof TimeoutException) {
return Status.DEADLINE_EXCEEDED.withCause(cause);
}
if (cause instanceof ContentTooLargeException) {
return Status.RESOURCE_EXHAUSTED.withCause(cause);
}
return s;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,14 @@ static GrpcExceptionHandlerFunctionBuilder builder() {
return new GrpcExceptionHandlerFunctionBuilder();
}

/**
* Returns the default {@link GrpcExceptionHandlerFunction}.
*/
@UnstableApi
static GrpcExceptionHandlerFunction of() {
return DefaultGrpcExceptionHandlerFunction.INSTANCE;
}

/**
* Maps the specified {@link Throwable} to a gRPC {@link Status},
* and mutates the specified {@link Metadata}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,10 +248,9 @@ public void start(Listener<O> responseListener, Metadata metadata) {
prepareHeaders(compressor, metadata, remainingNanos);

final BiFunction<ClientRequestContext, Throwable, HttpResponse> errorResponseFactory =
(unused, cause) -> HttpResponse.ofFailure(
GrpcStatus.fromThrowable(exceptionHandler, ctx, cause, metadata)
.withDescription(cause.getMessage())
.asRuntimeException());
(unused, cause) -> HttpResponse.ofFailure(exceptionHandler.apply(ctx, cause, metadata)
.withDescription(cause.getMessage())
.asRuntimeException());
final HttpResponse res = initContextAndExecuteWithFallback(
httpClient, ctx, endpointGroup, HttpResponse::of, errorResponseFactory);

Expand Down Expand Up @@ -455,7 +454,7 @@ public void onNext(DeframedMessage message) {
});
} catch (Throwable t) {
final Metadata metadata = new Metadata();
close(GrpcStatus.fromThrowable(exceptionHandler, ctx, t, metadata), metadata);
close(exceptionHandler.apply(ctx, t, metadata), metadata);
}
}

Expand Down Expand Up @@ -512,7 +511,7 @@ private void prepareHeaders(Compressor compressor, Metadata metadata, long remai

private void closeWhenListenerThrows(Throwable t) {
final Metadata metadata = new Metadata();
closeWhenEos(GrpcStatus.fromThrowable(exceptionHandler, ctx, t, metadata), metadata);
closeWhenEos(exceptionHandler.apply(ctx, t, metadata), metadata);
}

private void closeWhenEos(Status status, Metadata metadata) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,7 @@

package com.linecorp.armeria.internal.common.grpc;

import static java.util.Objects.requireNonNull;

import java.io.IOException;
import java.net.HttpURLConnection;
import java.nio.channels.ClosedChannelException;
import java.util.Base64;

import org.slf4j.Logger;
Expand All @@ -44,34 +40,19 @@
import com.google.common.base.Strings;
import com.google.protobuf.InvalidProtocolBufferException;

import com.linecorp.armeria.client.UnprocessedRequestException;
import com.linecorp.armeria.client.circuitbreaker.FailFastException;
import com.linecorp.armeria.common.ClosedSessionException;
import com.linecorp.armeria.common.ContentTooLargeException;
import com.linecorp.armeria.common.HttpHeaders;
import com.linecorp.armeria.common.HttpStatus;
import com.linecorp.armeria.common.RequestContext;
import com.linecorp.armeria.common.TimeoutException;
import com.linecorp.armeria.common.annotation.Nullable;
import com.linecorp.armeria.common.grpc.GrpcExceptionHandlerFunction;
import com.linecorp.armeria.common.grpc.GrpcStatusFunction;
import com.linecorp.armeria.common.grpc.StackTraceElementProto;
import com.linecorp.armeria.common.grpc.StatusCauseException;
import com.linecorp.armeria.common.grpc.ThrowableProto;
import com.linecorp.armeria.common.grpc.protocol.ArmeriaStatusException;
import com.linecorp.armeria.common.grpc.protocol.DeframedMessage;
import com.linecorp.armeria.common.grpc.protocol.GrpcHeaderNames;
import com.linecorp.armeria.common.grpc.protocol.StatusMessageEscaper;
import com.linecorp.armeria.common.stream.ClosedStreamException;
import com.linecorp.armeria.common.stream.StreamMessage;
import com.linecorp.armeria.common.util.Exceptions;
import com.linecorp.armeria.server.RequestTimeoutException;

import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.Status.Code;
import io.netty.handler.codec.http2.Http2Error;
import io.netty.handler.codec.http2.Http2Exception;

/**
* Utilities for handling {@link Status} in Armeria.
Expand All @@ -80,134 +61,6 @@ public final class GrpcStatus {

private static final Logger logger = LoggerFactory.getLogger(GrpcStatus.class);

/**
* Converts the {@link Throwable} to a {@link Status}, taking into account exceptions specific to Armeria as
* well and the protocol package.
*/
public static Status fromThrowable(Throwable t) {
t = peelAndUnwrap(requireNonNull(t, "t"));
return statusFromThrowable(t);
}

/**
* Converts the {@link Throwable} to a {@link Status}.
* If the specified {@code statusFunction} returns {@code null},
* the built-in exception mapping rule, which takes into account exceptions specific to Armeria as well
* and the protocol package, is used by default.
*/
public static Status fromThrowable(@Nullable GrpcStatusFunction statusFunction, RequestContext ctx,
Throwable t, Metadata metadata) {
final GrpcExceptionHandlerFunction exceptionHandler =
statusFunction != null ? statusFunction::apply : null;
return fromThrowable(exceptionHandler, ctx, t, metadata);
}

/**
* Converts the {@link Throwable} to a {@link Status}.
* If the specified {@link GrpcExceptionHandlerFunction} returns {@code null},
* the built-in exception mapping rule, which takes into account exceptions specific to Armeria as well
* and the protocol package, is used by default.
*/
public static Status fromThrowable(@Nullable GrpcExceptionHandlerFunction exceptionHandler,
RequestContext ctx, Throwable t, Metadata metadata) {
t = peelAndUnwrap(requireNonNull(t, "t"));

if (exceptionHandler != null) {
final Status status = exceptionHandler.apply(ctx, t, metadata);
if (status != null) {
return status;
}
}

return statusFromThrowable(t);
}

private static Status statusFromThrowable(Throwable t) {
final Status s = Status.fromThrowable(t);
if (s.getCode() != Code.UNKNOWN) {
return s;
}

if (t instanceof ClosedSessionException || t instanceof ClosedChannelException) {
// ClosedChannelException is used any time the Netty channel is closed. Proper error
// processing requires remembering the error that occurred before this one and using it
// instead.
return s;
}
if (t instanceof ClosedStreamException || t instanceof RequestTimeoutException) {
return Status.CANCELLED.withCause(t);
}
if (t instanceof InvalidProtocolBufferException) {
return Status.INVALID_ARGUMENT.withCause(t);
}
if (t instanceof UnprocessedRequestException ||
t instanceof IOException ||
t instanceof FailFastException) {
return Status.UNAVAILABLE.withCause(t);
}
if (t instanceof Http2Exception) {
if (t instanceof Http2Exception.StreamException &&
((Http2Exception.StreamException) t).error() == Http2Error.CANCEL) {
return Status.CANCELLED;
}
return Status.INTERNAL.withCause(t);
}
if (t instanceof TimeoutException) {
return Status.DEADLINE_EXCEEDED.withCause(t);
}
if (t instanceof ContentTooLargeException) {
return Status.RESOURCE_EXHAUSTED.withCause(t);
}
return s;
}

/**
* Converts the specified {@link Status} to a new user-specified {@link Status}
* using the specified {@link GrpcStatusFunction}.
* Returns the given {@link Status} as is if the {@link GrpcStatusFunction} returns {@code null}.
*/
public static Status fromStatusFunction(@Nullable GrpcStatusFunction statusFunction,
RequestContext ctx, Status status, Metadata metadata) {
final GrpcExceptionHandlerFunction exceptionHandler =
statusFunction != null ? statusFunction::apply : null;
return fromExceptionHandler(exceptionHandler, ctx, status, metadata);
}

/**
* Converts the specified {@link Status} to a new user-specified {@link Status}
* using the specified {@link GrpcExceptionHandlerFunction}.
* Returns the given {@link Status} as is if the {@link GrpcExceptionHandlerFunction} returns {@code null}.
*/
public static Status fromExceptionHandler(@Nullable GrpcExceptionHandlerFunction exceptionHandler,
RequestContext ctx, Status status, Metadata metadata) {
requireNonNull(status, "status");

if (exceptionHandler != null) {
final Throwable cause = status.getCause();
if (cause != null) {
final Throwable unwrapped = peelAndUnwrap(cause);
final Status newStatus = exceptionHandler.apply(ctx, unwrapped, metadata);
if (newStatus != null) {
return newStatus;
}
}
}
return status;
}

private static Throwable peelAndUnwrap(Throwable t) {
t = Exceptions.peel(t);
Throwable cause = t;
while (cause != null) {
if (cause instanceof ArmeriaStatusException) {
t = StatusExceptionConverter.toGrpc((ArmeriaStatusException) cause);
break;
}
cause = cause.getCause();
}
return t;
}

/**
* Maps gRPC {@link Status} to {@link HttpStatus}. If there is no matched rule for the specified
* {@link Status}, the mapping rules defined in upstream Google APIs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ public final class HttpStreamDeframer extends ArmeriaMessageDeframer {
private final RequestContext ctx;
private final DecompressorRegistry decompressorRegistry;
private final TransportStatusListener transportStatusListener;
@Nullable
private final GrpcExceptionHandlerFunction exceptionHandler;

@Nullable
Expand All @@ -56,7 +55,7 @@ public HttpStreamDeframer(
DecompressorRegistry decompressorRegistry,
RequestContext ctx,
TransportStatusListener transportStatusListener,
@Nullable GrpcExceptionHandlerFunction exceptionHandler,
GrpcExceptionHandlerFunction exceptionHandler,
int maxMessageLength, boolean grpcWebText, boolean server) {
super(maxMessageLength, ctx.alloc(), grpcWebText);
this.ctx = requireNonNull(ctx, "ctx");
Expand Down Expand Up @@ -121,9 +120,8 @@ public void processHeaders(HttpHeaders headers, StreamDecoderOutput<DeframedMess
decompressor(ForwardingDecompressor.forGrpc(decompressor));
} catch (Throwable t) {
final Metadata metadata = new Metadata();
transportStatusListener.transportReportStatus(
GrpcStatus.fromThrowable(exceptionHandler, ctx, t, metadata),
metadata);
transportStatusListener.transportReportStatus(exceptionHandler.apply(ctx, t, metadata),
metadata);
return;
}
}
Expand All @@ -150,8 +148,7 @@ public void processTrailers(HttpHeaders headers, StreamDecoderOutput<DeframedMes
@Override
public void processOnError(Throwable cause) {
final Metadata metadata = new Metadata();
transportStatusListener.transportReportStatus(
GrpcStatus.fromThrowable(exceptionHandler, ctx, cause, metadata), metadata);
transportStatusListener.transportReportStatus(exceptionHandler.apply(ctx, cause, metadata), metadata);
}

@Override
Expand Down
Loading
Loading