-
Notifications
You must be signed in to change notification settings - Fork 909
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix to call onRemoval()
immediately when using StreamMessage.collect()
#3670
Conversation
…ct()`. Motivation: `DecodedHttp{Request,Response}` remove an object from the inbound traffic when `onRemoval()` is called. The current implementation of `StreamMessage.collect()` in `DefaultStreamMessage` calls `onRemoval()` when a stream is closed via `close()`. That means the inbound traffic will be removed only when the messages of a stream is fully received. Modifications: - Immediately drain the objects from the `queue` of `DefaultStreamMessage` whenever they are written to the `queue` when `collect()` is called so that `onRemoval()` is called right away. Result: You no longer see a `ResponseTimeoutException` when collecting a large number of message using `Http{Request,Response}.aggreagte()`.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Left some nits. Thanks!
core/src/main/java/com/linecorp/armeria/common/stream/DefaultStreamMessage.java
Outdated
Show resolved
Hide resolved
} else { | ||
// We don't need to invoke onSubscribe() for NoopSubscriber. | ||
invokedOnSubscribe = true; | ||
notifySubscriber(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to call this method here? 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Even though a stream is open, we need to drain queue
so that onRemoval()
should be called immediately.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What I meant was that if this thread fails to set the state as State.CLEANUP
, then there might be another thread that handles the elements.
So I was wondering in which case we need to call this method. 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah I realized that the State
can be State.OPEN
so we need to call that. 😅
Could you check the test failure? https://github.com/line/armeria/pull/3670/checks?check_run_id=2950644605 |
Oops... It is possible. |
Codecov Report
@@ Coverage Diff @@
## master #3670 +/- ##
============================================
+ Coverage 73.39% 73.42% +0.03%
- Complexity 14594 14600 +6
============================================
Files 1283 1283
Lines 56001 56010 +9
Branches 7171 7173 +2
============================================
+ Hits 41103 41127 +24
+ Misses 11269 11268 -1
+ Partials 3629 3615 -14
Continue to review full report at Codecov.
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🎉 🎉 🎉
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, @ikhoon!
Motivation:
DecodedHttp{Request,Response}
removes an object from the inboundtraffic when
onRemoval()
is called. The current implementation ofStreamMessage.collect()
inDefaultStreamMessage
callsonRemoval()
when a stream is closed via
close()
. That means the inbound trafficwill be removed only after the messages of a stream are fully received.
Modifications:
queue
ofDefaultStreamMessage
whenever they are written to thequeue
when
collect()
is called so thatonRemoval()
is called right away.Result:
You no longer see a
ResponseTimeoutException
when collecting a large number ofmessage using
Http{Request,Response}.aggreagte()
.