@@ -157,14 +157,13 @@ class Stream<T> extends ExchangeImpl<T> {
157157
158158 // send lock: prevent sending DataFrames after reset occurred.
159159 private final Object sendLock = new Object ();
160-
161160 /**
162161 * A reference to this Stream's connection Send Window controller. The
163162 * stream MUST acquire the appropriate amount of Send Window before
164163 * sending any data. Will be null for PushStreams, as they cannot send data.
165164 */
166165 private final WindowController windowController ;
167- private final WindowUpdateSender windowUpdater ;
166+ private final WindowUpdateSender streamWindowUpdater ;
168167
169168 @ Override
170169 HttpConnection connection () {
@@ -203,7 +202,8 @@ private void schedule() {
203202 int size = Utils .remaining (dsts , Integer .MAX_VALUE );
204203 if (size == 0 && finished ) {
205204 inputQ .remove ();
206- connection .ensureWindowUpdated (df ); // must update connection window
205+ // consumed will not be called
206+ connection .releaseUnconsumed (df ); // must update connection window
207207 Log .logTrace ("responseSubscriber.onComplete" );
208208 if (debug .on ()) debug .log ("incoming: onComplete" );
209209 sched .stop ();
@@ -219,7 +219,11 @@ private void schedule() {
219219 try {
220220 subscriber .onNext (dsts );
221221 } catch (Throwable t ) {
222- connection .dropDataFrame (df ); // must update connection window
222+ // Data frames that have been added to the inputQ
223+ // must be released using releaseUnconsumed() to
224+ // account for the amount of unprocessed bytes
225+ // tracked by the connection.windowUpdater.
226+ connection .releaseUnconsumed (df );
223227 throw t ;
224228 }
225229 if (consumed (df )) {
@@ -271,8 +275,12 @@ private void schedule() {
271275 private void drainInputQueue () {
272276 Http2Frame frame ;
273277 while ((frame = inputQ .poll ()) != null ) {
274- if (frame instanceof DataFrame ) {
275- connection .dropDataFrame ((DataFrame )frame );
278+ if (frame instanceof DataFrame df ) {
279+ // Data frames that have been added to the inputQ
280+ // must be released using releaseUnconsumed() to
281+ // account for the amount of unprocessed bytes
282+ // tracked by the connection.windowUpdater.
283+ connection .releaseUnconsumed (df );
276284 }
277285 }
278286 }
@@ -298,12 +306,13 @@ private boolean consumed(DataFrame df) {
298306 boolean endStream = df .getFlag (DataFrame .END_STREAM );
299307 if (len == 0 ) return endStream ;
300308
301- connection .windowUpdater .update (len );
302-
309+ connection .windowUpdater .processed (len );
303310 if (!endStream ) {
311+ streamWindowUpdater .processed (len );
312+ } else {
304313 // Don't send window update on a stream which is
305314 // closed or half closed.
306- windowUpdater . update (len );
315+ streamWindowUpdater . released (len );
307316 }
308317
309318 // true: end of stream; false: more data coming
@@ -373,8 +382,21 @@ public String toString() {
373382 }
374383
375384 private void receiveDataFrame (DataFrame df ) {
376- inputQ .add (df );
377- sched .runOrSchedule ();
385+ try {
386+ int len = df .payloadLength ();
387+ if (len > 0 ) {
388+ // we return from here if the connection is being closed.
389+ if (!connection .windowUpdater .canBufferUnprocessedBytes (len )) return ;
390+ // we return from here if the stream is being closed.
391+ if (closed || !streamWindowUpdater .canBufferUnprocessedBytes (len )) {
392+ connection .releaseUnconsumed (df );
393+ return ;
394+ }
395+ }
396+ inputQ .add (df );
397+ } finally {
398+ sched .runOrSchedule ();
399+ }
378400 }
379401
380402 /** Handles a RESET frame. RESET is always handled inline in the queue. */
@@ -452,7 +474,7 @@ CompletableFuture<ExchangeImpl<T>> sendBodyAsync() {
452474 this .responseHeadersBuilder = new HttpHeadersBuilder ();
453475 this .rspHeadersConsumer = new HeadersConsumer ();
454476 this .requestPseudoHeaders = createPseudoHeaders (request );
455- this .windowUpdater = new StreamWindowUpdateSender (connection );
477+ this .streamWindowUpdater = new StreamWindowUpdateSender (connection );
456478 }
457479
458480 private boolean checkRequestCancelled () {
@@ -486,6 +508,8 @@ void incoming(Http2Frame frame) throws IOException {
486508 if (debug .on ()) {
487509 debug .log ("request cancelled or stream closed: dropping data frame" );
488510 }
511+ // Data frames that have not been added to the inputQ
512+ // can be released using dropDataFrame
489513 connection .dropDataFrame (df );
490514 } else {
491515 receiveDataFrame (df );
@@ -1365,12 +1389,18 @@ void cancel(IOException cause) {
13651389
13661390 @ Override
13671391 void onProtocolError (final IOException cause ) {
1392+ onProtocolError (cause , ResetFrame .PROTOCOL_ERROR );
1393+ }
1394+
1395+ void onProtocolError (final IOException cause , int code ) {
13681396 if (debug .on ()) {
1369- debug .log ("cancelling exchange on stream %d due to protocol error: %s" , streamid , cause .getMessage ());
1397+ debug .log ("cancelling exchange on stream %d due to protocol error [%s]: %s" ,
1398+ streamid , ErrorFrame .stringForCode (code ),
1399+ cause .getMessage ());
13701400 }
13711401 Log .logError ("cancelling exchange on stream {0} due to protocol error: {1}\n " , streamid , cause );
13721402 // send a RESET frame and close the stream
1373- cancelImpl (cause , ResetFrame . PROTOCOL_ERROR );
1403+ cancelImpl (cause , code );
13741404 }
13751405
13761406 void connectionClosing (Throwable cause ) {
@@ -1666,6 +1696,13 @@ String dbgString() {
16661696 return dbgString = dbg ;
16671697 }
16681698 }
1699+
1700+ @ Override
1701+ protected boolean windowSizeExceeded (long received ) {
1702+ onProtocolError (new ProtocolException ("stream %s flow control window exceeded"
1703+ .formatted (streamid )), ResetFrame .FLOW_CONTROL_ERROR );
1704+ return true ;
1705+ }
16691706 }
16701707
16711708 /**
0 commit comments