@@ -160,14 +160,13 @@ class Stream<T> extends ExchangeImpl<T> {
160160 // send lock: prevent sending DataFrames after reset occurred.
161161 private final Lock sendLock = new ReentrantLock ();
162162 private final Lock stateLock = new ReentrantLock ();
163-
164163 /**
165164 * A reference to this Stream's connection Send Window controller. The
166165 * stream MUST acquire the appropriate amount of Send Window before
167166 * sending any data. Will be null for PushStreams, as they cannot send data.
168167 */
169168 private final WindowController windowController ;
170- private final WindowUpdateSender windowUpdater ;
169+ private final WindowUpdateSender streamWindowUpdater ;
171170
172171 @ Override
173172 HttpConnection connection () {
@@ -206,7 +205,8 @@ private void schedule() {
206205 int size = Utils .remaining (dsts , Integer .MAX_VALUE );
207206 if (size == 0 && finished ) {
208207 inputQ .remove ();
209- connection .ensureWindowUpdated (df ); // must update connection window
208+ // consumed will not be called
209+ connection .releaseUnconsumed (df ); // must update connection window
210210 Log .logTrace ("responseSubscriber.onComplete" );
211211 if (debug .on ()) debug .log ("incoming: onComplete" );
212212 sched .stop ();
@@ -222,7 +222,11 @@ private void schedule() {
222222 try {
223223 subscriber .onNext (dsts );
224224 } catch (Throwable t ) {
225- connection .dropDataFrame (df ); // must update connection window
225+ // Data frames that have been added to the inputQ
226+ // must be released using releaseUnconsumed() to
227+ // account for the amount of unprocessed bytes
228+ // tracked by the connection.windowUpdater.
229+ connection .releaseUnconsumed (df );
226230 throw t ;
227231 }
228232 if (consumed (df )) {
@@ -274,8 +278,12 @@ private void schedule() {
274278 private void drainInputQueue () {
275279 Http2Frame frame ;
276280 while ((frame = inputQ .poll ()) != null ) {
277- if (frame instanceof DataFrame ) {
278- connection .dropDataFrame ((DataFrame )frame );
281+ if (frame instanceof DataFrame df ) {
282+ // Data frames that have been added to the inputQ
283+ // must be released using releaseUnconsumed() to
284+ // account for the amount of unprocessed bytes
285+ // tracked by the connection.windowUpdater.
286+ connection .releaseUnconsumed (df );
279287 }
280288 }
281289 }
@@ -301,12 +309,13 @@ private boolean consumed(DataFrame df) {
301309 boolean endStream = df .getFlag (DataFrame .END_STREAM );
302310 if (len == 0 ) return endStream ;
303311
304- connection .windowUpdater .update (len );
305-
312+ connection .windowUpdater .processed (len );
306313 if (!endStream ) {
314+ streamWindowUpdater .processed (len );
315+ } else {
307316 // Don't send window update on a stream which is
308317 // closed or half closed.
309- windowUpdater . update (len );
318+ streamWindowUpdater . released (len );
310319 }
311320
312321 // true: end of stream; false: more data coming
@@ -376,8 +385,21 @@ public String toString() {
376385 }
377386
378387 private void receiveDataFrame (DataFrame df ) {
379- inputQ .add (df );
380- sched .runOrSchedule ();
388+ try {
389+ int len = df .payloadLength ();
390+ if (len > 0 ) {
391+ // we return from here if the connection is being closed.
392+ if (!connection .windowUpdater .canBufferUnprocessedBytes (len )) return ;
393+ // we return from here if the stream is being closed.
394+ if (closed || !streamWindowUpdater .canBufferUnprocessedBytes (len )) {
395+ connection .releaseUnconsumed (df );
396+ return ;
397+ }
398+ }
399+ inputQ .add (df );
400+ } finally {
401+ sched .runOrSchedule ();
402+ }
381403 }
382404
383405 /** Handles a RESET frame. RESET is always handled inline in the queue. */
@@ -461,7 +483,7 @@ CompletableFuture<ExchangeImpl<T>> sendBodyAsync() {
461483 this .responseHeadersBuilder = new HttpHeadersBuilder ();
462484 this .rspHeadersConsumer = new HeadersConsumer ();
463485 this .requestPseudoHeaders = createPseudoHeaders (request );
464- this .windowUpdater = new StreamWindowUpdateSender (connection );
486+ this .streamWindowUpdater = new StreamWindowUpdateSender (connection );
465487 }
466488
467489 private boolean checkRequestCancelled () {
@@ -495,6 +517,8 @@ void incoming(Http2Frame frame) throws IOException {
495517 if (debug .on ()) {
496518 debug .log ("request cancelled or stream closed: dropping data frame" );
497519 }
520+ // Data frames that have not been added to the inputQ
521+ // can be released using dropDataFrame
498522 connection .dropDataFrame (df );
499523 } else {
500524 receiveDataFrame (df );
@@ -1397,12 +1421,18 @@ void cancel(IOException cause) {
13971421
13981422 @ Override
13991423 void onProtocolError (final IOException cause ) {
1424+ onProtocolError (cause , ResetFrame .PROTOCOL_ERROR );
1425+ }
1426+
1427+ void onProtocolError (final IOException cause , int code ) {
14001428 if (debug .on ()) {
1401- debug .log ("cancelling exchange on stream %d due to protocol error: %s" , streamid , cause .getMessage ());
1429+ debug .log ("cancelling exchange on stream %d due to protocol error [%s]: %s" ,
1430+ streamid , ErrorFrame .stringForCode (code ),
1431+ cause .getMessage ());
14021432 }
14031433 Log .logError ("cancelling exchange on stream {0} due to protocol error: {1}\n " , streamid , cause );
14041434 // send a RESET frame and close the stream
1405- cancelImpl (cause , ResetFrame . PROTOCOL_ERROR );
1435+ cancelImpl (cause , code );
14061436 }
14071437
14081438 void connectionClosing (Throwable cause ) {
@@ -1699,6 +1729,13 @@ String dbgString() {
16991729 return dbgString = dbg ;
17001730 }
17011731 }
1732+
1733+ @ Override
1734+ protected boolean windowSizeExceeded (long received ) {
1735+ onProtocolError (new ProtocolException ("stream %s flow control window exceeded"
1736+ .formatted (streamid )), ResetFrame .FLOW_CONTROL_ERROR );
1737+ return true ;
1738+ }
17021739 }
17031740
17041741 /**
0 commit comments