Improve reactive streams compliance in InputStreamPublisher#5160
Improve reactive streams compliance in InputStreamPublisher#5160
Conversation
- Cancel subscription after onError for invalid request(n <= 0) per §3.9 - Use saturating addition for demand to prevent Long overflow Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
Pull request overview
This PR improves InputStreamPublisher reactive-streams compliance by terminating subscriptions on invalid demand requests and preventing Long overflow when accumulating demand.
Changes:
- Cancel subscription and signal
onErrorwhenrequest(n <= 0)is called (reactive streams §3.9). - Accumulate demand using saturating addition capped at
Long.MaxValueto avoid overflow.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| if (n <= 0) { | ||
| cancel() | ||
| subscriber.onError(new IllegalArgumentException("§3.9: n must be greater than 0")) | ||
| } else { |
There was a problem hiding this comment.
Calling cancel() when request(n <= 0) now forces evaluation of the lazy stream (because cancel closes it), which can unnecessarily open the underlying InputStream and potentially perform range skipping even though no data will be read. Consider avoiding stream initialization on this error path (e.g., track the opened stream in an AtomicReference/var and only close if it was already created), while still marking the subscription as terminated so no further reads happen.
| override def request(n: Long): Unit = { | ||
| if (n <= 0) subscriber.onError(new IllegalArgumentException("§3.9: n must be greater than 0")) | ||
| else { | ||
| demand.addAndGet(n) | ||
| if (n <= 0) { | ||
| cancel() | ||
| subscriber.onError(new IllegalArgumentException("§3.9: n must be greater than 0")) | ||
| } else { | ||
| addDemand(n) | ||
| readNextChunkIfNeeded() | ||
| } | ||
| } | ||
|
|
||
| /** Add demand using saturating addition, capping at Long.MaxValue to prevent overflow. */ | ||
| private def addDemand(n: Long): Unit = { | ||
| demand.getAndUpdate(current => if (current > Long.MaxValue - n) Long.MaxValue else current + n) | ||
| () |
There was a problem hiding this comment.
The new behaviors (saturating demand accumulation and cancelling/terminating on invalid request(n <= 0)) aren't covered by tests. Adding a focused unit test for InputStreamPublisher (e.g., verifying demand never overflows past Long.MaxValue, and that invalid request triggers onError and prevents any subsequent onNext/onComplete) would help prevent regressions and confirm reactive-streams compliance.
Summary
onErrorfor invalidrequest(n <= 0)per reactive streams §3.9, preventing further reads/leaksLongoverflow whenrequest()is called multiple times with large valuesNote: the same fixes are applied to
InputStreamSyncPublisherin #5159.Test plan
🤖 Generated with Claude Code