Skip to content

Commit

Permalink
Handle a leak when thrift requests are aborted early (#5535)
Browse files Browse the repository at this point in the history
Motivation:

We received a report where thrift was leaking pooled byte buffers.
We use pooled byte buffers to create the thrift request in
`THttpClientDelegate` assuming
that it will be released when sent on the wire for normal cases.


https://github.com/line/armeria/blob/7f302505a05d24e7704bcffc43fab6337344cb2c/thrift/thrift0.13/src/main/java/com/linecorp/armeria/internal/client/thrift/THttpClientDelegate.java#L134

However, if any requests don't have a chance to reach the wire, they
will leak the byte buffer.
I believe it is reasonable to assume that requests not decoded properly
should be cleaned up. Hence, I propose we do an extra `req.abort` inside
`handlePreDecodeException`.

Modifications:

- Modify `handlePreDecodeException` to accept an additional
`HttpRequest` parameter
- Abort the request with the exception if exists

Result:

- #5481

<!--
Visit this URL to learn more about how to write a pull request
description:

https://armeria.dev/community/developer-guide#how-to-write-pull-request-description
-->
  • Loading branch information
jrhee17 committed Apr 3, 2024
1 parent 900002d commit 3fc6f8f
Showing 1 changed file with 13 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ public RpcResponse execute(ClientRequestContext ctx, RpcRequest call) {
httpResponse.aggregate(AggregationOptions.usePooledObjects(ctx.alloc(), ctx.eventLoop()))
.handle((res, cause) -> {
if (cause != null) {
handlePreDecodeException(ctx, reply, func, Exceptions.peel(cause));
handlePreDecodeException(ctx, reply, func, Exceptions.peel(cause), httpReq);
return null;
}

Expand All @@ -178,21 +178,21 @@ public RpcResponse execute(ClientRequestContext ctx, RpcRequest call) {
if (status.code() != HttpStatus.OK.code()) {
handlePreDecodeException(
ctx, reply, func,
new InvalidResponseHeadersException(res.headers()));
new InvalidResponseHeadersException(res.headers()), httpReq);
return null;
}

try {
handle(ctx, seqId, reply, func, content);
} catch (Throwable t) {
handlePreDecodeException(ctx, reply, func, t);
handlePreDecodeException(ctx, reply, func, t, httpReq);
}
}

return null;
}).exceptionally(CompletionActions::log);
} catch (Throwable cause) {
handlePreDecodeException(ctx, reply, func, cause);
handlePreDecodeException(ctx, reply, func, cause, null);
}

return reply;
Expand Down Expand Up @@ -309,9 +309,15 @@ private static void handleException(ClientRequestContext ctx, CompletableRpcResp
}

private static void handlePreDecodeException(ClientRequestContext ctx, CompletableRpcResponse reply,
ThriftFunction thriftMethod, Throwable cause) {
handleException(ctx, reply, null,
decodeException(cause, thriftMethod.declaredExceptions()));
ThriftFunction thriftMethod, Throwable cause,
@Nullable HttpRequest request) {
final Exception decodedCause =
decodeException(cause, thriftMethod.declaredExceptions());
if (request != null) {
// abort the request to potentially release pooled buffers
request.abort(decodedCause);
}
handleException(ctx, reply, null, decodedCause);
}

static Exception decodeException(Throwable cause, @Nullable Class<?>[] declaredThrowableExceptions) {
Expand Down

0 comments on commit 3fc6f8f

Please sign in to comment.