@@ -160,14 +160,13 @@ class Stream<T> extends ExchangeImpl<T> {
160160 // send lock: prevent sending DataFrames after reset occurred.
161161 private final Lock sendLock = new ReentrantLock ();
162162 private final Lock stateLock = new ReentrantLock ();
163-
164163 /**
165164 * A reference to this Stream's connection Send Window controller. The
166165 * stream MUST acquire the appropriate amount of Send Window before
167166 * sending any data. Will be null for PushStreams, as they cannot send data.
168167 */
169168 private final WindowController windowController ;
170- private final WindowUpdateSender windowUpdater ;
169+ private final WindowUpdateSender streamWindowUpdater ;
171170
172171 // Only accessed in all method calls from incoming(), no need for volatile
173172 private boolean endStreamSeen ;
@@ -217,7 +216,8 @@ private void schedule() {
217216 int size = Utils .remaining (dsts , Integer .MAX_VALUE );
218217 if (size == 0 && finished ) {
219218 inputQ .remove ();
220- connection .ensureWindowUpdated (df ); // must update connection window
219+ // consumed will not be called
220+ connection .releaseUnconsumed (df ); // must update connection window
221221 Log .logTrace ("responseSubscriber.onComplete" );
222222 if (debug .on ()) debug .log ("incoming: onComplete" );
223223 connection .decrementStreamsCount (streamid );
@@ -232,7 +232,11 @@ private void schedule() {
232232 try {
233233 subscriber .onNext (dsts );
234234 } catch (Throwable t ) {
235- connection .dropDataFrame (df ); // must update connection window
235+ // Data frames that have been added to the inputQ
236+ // must be released using releaseUnconsumed() to
237+ // account for the amount of unprocessed bytes
238+ // tracked by the connection.windowUpdater.
239+ connection .releaseUnconsumed (df );
236240 throw t ;
237241 }
238242 if (consumed (df )) {
@@ -283,8 +287,12 @@ private void schedule() {
283287 private void drainInputQueue () {
284288 Http2Frame frame ;
285289 while ((frame = inputQ .poll ()) != null ) {
286- if (frame instanceof DataFrame ) {
287- connection .dropDataFrame ((DataFrame )frame );
290+ if (frame instanceof DataFrame df ) {
291+ // Data frames that have been added to the inputQ
292+ // must be released using releaseUnconsumed() to
293+ // account for the amount of unprocessed bytes
294+ // tracked by the connection.windowUpdater.
295+ connection .releaseUnconsumed (df );
288296 }
289297 }
290298 }
@@ -310,12 +318,13 @@ private boolean consumed(DataFrame df) {
310318 boolean endStream = df .getFlag (DataFrame .END_STREAM );
311319 if (len == 0 ) return endStream ;
312320
313- connection .windowUpdater .update (len );
314-
321+ connection .windowUpdater .processed (len );
315322 if (!endStream ) {
323+ streamWindowUpdater .processed (len );
324+ } else {
316325 // Don't send window update on a stream which is
317326 // closed or half closed.
318- windowUpdater . update (len );
327+ streamWindowUpdater . released (len );
319328 }
320329
321330 // true: end of stream; false: more data coming
@@ -385,8 +394,21 @@ public String toString() {
385394 }
386395
387396 private void receiveDataFrame (DataFrame df ) {
388- inputQ .add (df );
389- sched .runOrSchedule ();
397+ try {
398+ int len = df .payloadLength ();
399+ if (len > 0 ) {
400+ // we return from here if the connection is being closed.
401+ if (!connection .windowUpdater .canBufferUnprocessedBytes (len )) return ;
402+ // we return from here if the stream is being closed.
403+ if (closed || !streamWindowUpdater .canBufferUnprocessedBytes (len )) {
404+ connection .releaseUnconsumed (df );
405+ return ;
406+ }
407+ }
408+ inputQ .add (df );
409+ } finally {
410+ sched .runOrSchedule ();
411+ }
390412 }
391413
392414 /** Handles a RESET frame. RESET is always handled inline in the queue. */
@@ -470,7 +492,7 @@ CompletableFuture<ExchangeImpl<T>> sendBodyAsync() {
470492 this .responseHeadersBuilder = new HttpHeadersBuilder ();
471493 this .rspHeadersConsumer = new HeadersConsumer ();
472494 this .requestPseudoHeaders = createPseudoHeaders (request );
473- this .windowUpdater = new StreamWindowUpdateSender (connection );
495+ this .streamWindowUpdater = new StreamWindowUpdateSender (connection );
474496 }
475497
476498 private boolean checkRequestCancelled () {
@@ -506,6 +528,8 @@ void incoming(Http2Frame frame) throws IOException {
506528 if (debug .on ()) {
507529 debug .log ("request cancelled or stream closed: dropping data frame" );
508530 }
531+ // Data frames that have not been added to the inputQ
532+ // can be released using dropDataFrame
509533 connection .dropDataFrame (df );
510534 } else {
511535 receiveDataFrame (df );
@@ -1427,12 +1451,18 @@ void cancel(IOException cause) {
14271451
14281452 @ Override
14291453 void onProtocolError (final IOException cause ) {
1454+ onProtocolError (cause , ResetFrame .PROTOCOL_ERROR );
1455+ }
1456+
1457+ void onProtocolError (final IOException cause , int code ) {
14301458 if (debug .on ()) {
1431- debug .log ("cancelling exchange on stream %d due to protocol error: %s" , streamid , cause .getMessage ());
1459+ debug .log ("cancelling exchange on stream %d due to protocol error [%s]: %s" ,
1460+ streamid , ErrorFrame .stringForCode (code ),
1461+ cause .getMessage ());
14321462 }
14331463 Log .logError ("cancelling exchange on stream {0} due to protocol error: {1}\n " , streamid , cause );
14341464 // send a RESET frame and close the stream
1435- cancelImpl (cause , ResetFrame . PROTOCOL_ERROR );
1465+ cancelImpl (cause , code );
14361466 }
14371467
14381468 void connectionClosing (Throwable cause ) {
@@ -1736,6 +1766,13 @@ String dbgString() {
17361766 return dbgString = dbg ;
17371767 }
17381768 }
1769+
1770+ @ Override
1771+ protected boolean windowSizeExceeded (long received ) {
1772+ onProtocolError (new ProtocolException ("stream %s flow control window exceeded"
1773+ .formatted (streamid )), ResetFrame .FLOW_CONTROL_ERROR );
1774+ return true ;
1775+ }
17391776 }
17401777
17411778 /**
0 commit comments