-
Notifications
You must be signed in to change notification settings - Fork 918
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix byte buf leak when a ContentTooLargeException
is raised
#5227
Conversation
Motivation: There are three bugs when the size of a request exceeds the max content length: - The `ResponseHeaders` that is sent to the client and is recorded to `RequestLog` are different. - The `AggregatingDecodedHttpRequest` is not aborted thus `ByteBuf`s are leaked. (When HTTP/1.1 is used) - The `AggregatingDecodedHttpRequest` does not get throught the decorators so `LoggingService` doesn't log the exception. Modifications: - Send 413 response using `HttpResponseSubscriber` so that the recoreded `ResponseHaders` is actually sent - Abort `AggregatingDecodedHttpRequest` if a `ContentTooLargeException` is raised when HTTP/1.1 is used. - Convert `AggregatingDecodedHttpRequest` to `StreamingDecodedHttpRequest` and pass it to the service chain. - Raise an stream error instead of connection error if it is stream level. Result: - Fix line#5180 - No more leaks when the size of a request exceeds the max content length.
Codecov ReportAttention:
Additional details and impacted files@@ Coverage Diff @@
## main #5227 +/- ##
============================================
- Coverage 73.94% 73.94% -0.01%
- Complexity 20072 20085 +13
============================================
Files 1728 1728
Lines 74040 74092 +52
Branches 9438 9451 +13
============================================
+ Hits 54752 54785 +33
- Misses 14829 14833 +4
- Partials 4459 4474 +15
☔ View full report in Codecov by Sentry. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not to raise an aggregate exception.
Do you mind explaining what points raised an aggregate exception?
Also, I think it will take too long for me to thoroughly review this PR.
Feel free to merge without my approval 🤞
core/src/main/java/com/linecorp/armeria/internal/common/stream/SubscriberUtil.java
Outdated
Show resolved
Hide resolved
if (!req.isInitialized()) { | ||
assert req.needsAggregation(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Regardless of whether initialized or not, I think semantically it makes more sense that we fire the channel if the request isn't aggregated.
Also, if we happen to modify the pipeline later which reschedules the ctx.fireChannelRead(req);
downstream, I feel like this can be a potential bug very easily
ditto for the other occurrences
if (!req.isInitialized()) { | |
assert req.needsAggregation(); | |
if (req.needsAggregation()) { | |
assert !req.isInitialized(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My original fix was calling ctx.fireChannelRead(req);
with the aggregating request itself (without converting to stream request) so I need to check if the fireChannelRead
method is called or not using isInitialized()
. But I've changed the logic so let me revert it. 😉
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
AggregatingDecodedHttpRequest is converted to StreamingDecodedHttpRequest not to raise an aggregate exception.
If the request is incomplete, wouldn't be clear to raise an exception when being aggregated?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
AggregatingDecodedHttpRequest
was implemented on the assumption that it is aggregated after being closed.
Do you mean we need to fix AggregatingDecodedHttpRequest
not to raise an exception when aggregated?
core/src/main/java/com/linecorp/armeria/server/Http2RequestDecoder.java
Outdated
Show resolved
Hide resolved
core/src/main/java/com/linecorp/armeria/server/AggregatedHttpResponseHandler.java
Show resolved
Hide resolved
@@ -394,7 +406,12 @@ public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorC | |||
|
|||
final ClosedStreamException cause = | |||
new ClosedStreamException("received a RST_STREAM frame: " + Http2Error.valueOf(errorCode)); | |||
req.abortResponse(cause, /* cancel */ true); | |||
if (req.needsAggregation() && !req.isInitialized()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if (req.needsAggregation() && !req.isInitialized()) { | |
if (req.needsAggregation()) { | |
assert !req.isInitialized(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This isn't true because the server can receive an RST_STREAM after the aggregating request is initialized.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
request is initialized.
Q: Initialization seems more important to call req.toAbortedStreaming()
. What do you think of using req.isInitialized()
to check whether a call to req.toAbortedStreaming()
is necessary for here and L335?
if (!req.isInitialized()) {
ctx.fireChannelRead(req.toAbortedStreaming(inboundTrafficController, cause, false));
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's a good suggestion. Let me change it back. 😉
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The original reason for my suggestion was because I wanted to do some testing on rescheduling the layer between channel and service event loops.
Let me just handle this separately then if needed
core/src/main/java/com/linecorp/armeria/server/Http2RequestDecoder.java
Outdated
Show resolved
Hide resolved
An armeria/core/src/main/java/com/linecorp/armeria/internal/common/stream/AggregatingStreamMessage.java Line 94 in 871d872
However, we can just close it without fully receiving the request body.
Oh, please let me know if there're any points that you don't understand. 😉 |
core/src/main/java/com/linecorp/armeria/server/AggregatedHttpResponseHandler.java
Show resolved
Hide resolved
core/src/main/java/com/linecorp/armeria/server/DecodedHttpRequest.java
Outdated
Show resolved
Hide resolved
core/src/main/java/com/linecorp/armeria/server/Http2RequestDecoder.java
Outdated
Show resolved
Hide resolved
core/src/main/java/com/linecorp/armeria/server/Http2RequestDecoder.java
Outdated
Show resolved
Hide resolved
core/src/main/java/com/linecorp/armeria/server/StreamingDecodedHttpRequest.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall looks good.
core/src/main/java/com/linecorp/armeria/server/Http2RequestDecoder.java
Outdated
Show resolved
Hide resolved
core/src/main/java/com/linecorp/armeria/server/Http2RequestDecoder.java
Outdated
Show resolved
Hide resolved
@@ -394,7 +406,12 @@ public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorC | |||
|
|||
final ClosedStreamException cause = | |||
new ClosedStreamException("received a RST_STREAM frame: " + Http2Error.valueOf(errorCode)); | |||
req.abortResponse(cause, /* cancel */ true); | |||
if (req.needsAggregation() && !req.isInitialized()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
request is initialized.
Q: Initialization seems more important to call req.toAbortedStreaming()
. What do you think of using req.isInitialized()
to check whether a call to req.toAbortedStreaming()
is necessary for here and L335?
if (!req.isInitialized()) {
ctx.fireChannelRead(req.toAbortedStreaming(inboundTrafficController, cause, false));
}
final Http2Stream stream = encoder.connection().stream(streamId); | ||
|
||
if (sendResetIfRemoteIsOpen && !stream.state().localSideOpen()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Q: Why is the local side open checked? sendResetIfRemoteIsOpen
says reset if the remote is open.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I copied it from here and I think I also need to fix it. 😓
Let me run the CI a few more times.
core/src/main/java/com/linecorp/armeria/server/AbstractHttpResponseSubscriber.java
Outdated
Show resolved
Hide resolved
// Send a RST_STREAM frame only for an active stream which did not send a RST_STREAM frame already. | ||
if (stream != null && !stream.isResetSent()) { | ||
return encoder.writeRstStream(ctx, streamId, error.code(), ctx.newPromise()); | ||
if (!sendResetIfRemoteIsOpen || stream.state().remoteSideOpen()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Q: Do we need to send RST_FRAME when half-close on remote? An EOF for the stream has been received from the remote peer already.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Change not to send RST if EOF is received. 😉
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for handling this non-trivial issue. 🙇♂️🙇♂️
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Only left a minor question but looks good 👍 Thanks @minwoox 🙇 👍 🙇
final HttpHeaders trailingHeaders = ((LastHttpContent) msg).trailingHeaders(); | ||
if (!trailingHeaders.isEmpty()) { | ||
decodedReq.write(ArmeriaHttpUtil.toArmeria(trailingHeaders)); | ||
} | ||
decodedReq.close(); | ||
if (decodedReq.needsAggregation()) { | ||
assert !decodedReq.isInitialized(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like this can be removed
assert !decodedReq.isInitialized(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, but let's leave it as is if you don't mind. 😉
@@ -394,7 +406,12 @@ public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorC | |||
|
|||
final ClosedStreamException cause = | |||
new ClosedStreamException("received a RST_STREAM frame: " + Http2Error.valueOf(errorCode)); | |||
req.abortResponse(cause, /* cancel */ true); | |||
if (req.needsAggregation() && !req.isInitialized()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The original reason for my suggestion was because I wanted to do some testing on rescheduling the layer between channel and service event loops.
Let me just handle this separately then if needed
assert decodedReq.needsAggregation(); | ||
final StreamingDecodedHttpRequest streamingReq = decodedReq.toAbortedStreaming( | ||
inboundTrafficController, httpStatusException, shouldReset); | ||
ctx.fireChannelRead(streamingReq); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Question) It seems like we don't early return when encoder
is swapped to http2 and an upgrade request is received.
Is there a code point where we guard against ctx.fireChannelRead
being invoked twice? (once for HTTP1 upgrade, once for HTTP2)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure. 😉
An UpgradeEvent
is created when it receives the upgrade event:
armeria/core/src/main/java/com/linecorp/armeria/server/HttpServerUpgradeHandler.java
Line 297 in b7c14c4
final UpgradeEvent event = new UpgradeEvent(request); |
Http2ConnectionHandler
is added to the pipleline:armeria/core/src/main/java/com/linecorp/armeria/server/Http2ServerUpgradeCodec.java
Line 109 in e7be74e
ctx.pipeline().addAfter(ctx.name(), null, connectionHandler); |
Http1RequestDecoder
receives the UpgradeEvent
and calls channelRead()
with the HTTP/1.1 request:channelRead(ctx, nettyReq); |
After handling the request,
Http1RequestDecoder
is removed:armeria/core/src/main/java/com/linecorp/armeria/server/Http1RequestDecoder.java
Lines 275 to 279 in 091d193
final boolean endOfStream = msg instanceof LastHttpContent; | |
if (endOfStream && encoder instanceof ServerHttp2ObjectEncoder) { | |
// An HTTP/1 connection has been upgraded to HTTP/2. | |
ctx.pipeline().remove(this); | |
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems like we don't early return
My question was rather when ctx.pipeline().remove(this);
is called, but the content length of the request is exceeded.
Let me do some testing in my local env. instead and follow up if needed, feel free to merge 😄
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I misunderstood it. 😓
No, I didn't consider that case and I have to fix it. Thanks!
Thanks all for the review. 😉 |
) Motivation: There are three bugs when the size of a request exceeds the max content length: - The `ResponseHeaders` that are sent to the client and recorded to `RequestLog` are different. - The `AggregatingDecodedHttpRequest` is not aborted thus `ByteBuf`s are leaked. (When HTTP/1.1 is used) - The `AggregatingDecodedHttpRequest` does not get through the decorators so `LoggingService` doesn't log the exception. Modifications: - Send 413 responses using `HttpResponseSubscriber` instead of sending them from `HttpRequestDecoder` by aborting the response. - 413 is not a protocol error so we should send it from the `HttpResponseSubscriber`. - If a responseHeaders is already sent, the stream is reset. (the channel is closed if HTTP/1.1) - Call `ctx.fireChannelRead()` if the request wasn't handed over to `HttpServerHandler`. - This lets the `LoggingService` log the `ContentTooLargeException`. - `AggregatingDecodedHttpRequest` is converted to `StreamingDecodedHttpRequest` not to raise an aggregate exception. - (misc) Raise a stream error instead of a connection error if it is stream level. Result: - Fix line#3803, line#5180 - No more leaks when the size of a request exceeds the max content length.
Motivation:
There are three bugs when the size of a request exceeds the max content length:
ResponseHeaders
that are sent to the client and recorded toRequestLog
are different.AggregatingDecodedHttpRequest
is not aborted thusByteBuf
s are leaked. (When HTTP/1.1 is used)AggregatingDecodedHttpRequest
does not get through the decorators soLoggingService
doesn't log the exception.Modifications:
HttpResponseSubscriber
instead of sending them fromHttpRequestDecoder
by aborting the response.HttpResponseSubscriber
.ctx.fireChannelRead()
if the request wasn't handed over toHttpServerHandler
.LoggingService
log theContentTooLargeException
.AggregatingDecodedHttpRequest
is converted toStreamingDecodedHttpRequest
not to raise an aggregate exception.Result:
maxRequestLength
, the request may not be handled #3803, LEAK: ByteBuf.release() was not called before it's garbage-collected using verison 1.25.2 #5180