Skip to content
Closed
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
import jdk.internal.net.http.common.MinimalFuture;
import jdk.internal.net.http.common.Utils;
import jdk.internal.net.http.frame.SettingsFrame;

import static jdk.internal.net.http.frame.SettingsFrame.INITIAL_CONNECTION_WINDOW_SIZE;
import static jdk.internal.net.http.frame.SettingsFrame.INITIAL_WINDOW_SIZE;
import static jdk.internal.net.http.frame.SettingsFrame.ENABLE_PUSH;
import static jdk.internal.net.http.frame.SettingsFrame.HEADER_TABLE_SIZE;
Expand Down Expand Up @@ -289,9 +291,13 @@ int getConnectionWindowSize(SettingsFrame clientSettings) {
// and the connection window size.
int defaultValue = Math.max(streamWindow, K*K*32);

// The min value is the max between the streamWindow and
// the initial connection window size
int minValue = Math.max(INITIAL_CONNECTION_WINDOW_SIZE, streamWindow);

return getParameter(
"jdk.httpclient.connectionWindowSize",
streamWindow, Integer.MAX_VALUE, defaultValue);
minValue, Integer.MAX_VALUE, defaultValue);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1068,6 +1068,34 @@ private String checkMaxOrphanedHeadersExceeded(HeaderFrame hf) {
return null;
}

// This method is called when a DataFrame that was added
// to a Stream::inputQ is later dropped from the queue
// without being consumed.
//
// Before adding a frame to the queue, the Stream calls
// connection.windowUpdater.canBufferUnprocessedBytes(), which
// increases the count of unprocessed bytes in the connection.
// After consuming the frame, it calls connection.windowUpdater::processed,
// which decrements the count of unprocessed bytes, and possibly
// sends a window update to the peer.
//
// This method is called when connection.windowUpdater::processed
// will not be called, which can happen when consuming the frame
// fails, or when an empty DataFrame terminates the stream,
// or when the stream is cancelled while data is still
// sitting in its inputQ. In the later case, it is called for
// each frame that is dropped from the queue.
final void releaseUnconsumed(DataFrame df) {
windowUpdater.released(df.payloadLength());
dropDataFrame(df);
}

// This method can be called directly when a DataFrame is dropped
// before/without having been added to any Stream::inputQ.
// In that case, the number of unprocessed bytes hasn't been incremented
// by the stream, and does not need to be decremented.
// Otherwise, if the frame is dropped after having been added to the
// inputQ, releaseUnconsumed above should be called.
final void dropDataFrame(DataFrame df) {
if (isMarked(closedState, SHUTDOWN_REQUESTED)) return;
if (debug.on()) {
Expand Down Expand Up @@ -1453,11 +1481,12 @@ private void sendConnectionPreface() throws IOException {
// Note that the default initial window size, not to be confused
// with the initial window size, is defined by RFC 7540 as
// 64K -1.
final int len = windowUpdater.initialWindowSize - DEFAULT_INITIAL_WINDOW_SIZE;
if (len != 0) {
final int len = windowUpdater.initialWindowSize - INITIAL_CONNECTION_WINDOW_SIZE;
assert len >= 0;
if (len > 0) {
if (Log.channel()) {
Log.logChannel("Sending initial connection window update frame: {0} ({1} - {2})",
len, windowUpdater.initialWindowSize, DEFAULT_INITIAL_WINDOW_SIZE);
len, windowUpdater.initialWindowSize, INITIAL_CONNECTION_WINDOW_SIZE);
}
windowUpdater.sendWindowUpdate(len);
}
Expand Down Expand Up @@ -1911,6 +1940,19 @@ public ConnectionWindowUpdateSender(Http2Connection connection,
int getStreamId() {
return 0;
}

@Override
protected boolean windowSizeExceeded(long received) {
if (connection.isOpen()) {
try {
connection.protocolError(ErrorFrame.FLOW_CONTROL_ERROR,
"connection window exceeded");
} catch (IOException io) {
connection.shutdown(io);
}
}
return true;
}
}

/**
Expand Down
65 changes: 51 additions & 14 deletions src/java.net.http/share/classes/jdk/internal/net/http/Stream.java
Original file line number Diff line number Diff line change
Expand Up @@ -160,14 +160,13 @@ class Stream<T> extends ExchangeImpl<T> {
// send lock: prevent sending DataFrames after reset occurred.
private final Lock sendLock = new ReentrantLock();
private final Lock stateLock = new ReentrantLock();

/**
* A reference to this Stream's connection Send Window controller. The
* stream MUST acquire the appropriate amount of Send Window before
* sending any data. Will be null for PushStreams, as they cannot send data.
*/
private final WindowController windowController;
private final WindowUpdateSender windowUpdater;
private final WindowUpdateSender streamWindowUpdater;

@Override
HttpConnection connection() {
Expand Down Expand Up @@ -206,7 +205,8 @@ private void schedule() {
int size = Utils.remaining(dsts, Integer.MAX_VALUE);
if (size == 0 && finished) {
inputQ.remove();
connection.ensureWindowUpdated(df); // must update connection window
// consumed will not be called
connection.releaseUnconsumed(df); // must update connection window
Log.logTrace("responseSubscriber.onComplete");
if (debug.on()) debug.log("incoming: onComplete");
sched.stop();
Expand All @@ -222,7 +222,11 @@ private void schedule() {
try {
subscriber.onNext(dsts);
} catch (Throwable t) {
connection.dropDataFrame(df); // must update connection window
// Data frames that have been added to the inputQ
// must be released using releaseUnconsumed() to
// account for the amount of unprocessed bytes
// tracked by the connection.windowUpdater.
connection.releaseUnconsumed(df);
throw t;
}
if (consumed(df)) {
Expand Down Expand Up @@ -274,8 +278,12 @@ private void schedule() {
private void drainInputQueue() {
Http2Frame frame;
while ((frame = inputQ.poll()) != null) {
if (frame instanceof DataFrame) {
connection.dropDataFrame((DataFrame)frame);
if (frame instanceof DataFrame df) {
// Data frames that have been added to the inputQ
// must be released using releaseUnconsumed() to
// account for the amount of unprocessed bytes
// tracked by the connection.windowUpdater.
connection.releaseUnconsumed(df);
}
}
}
Expand All @@ -301,12 +309,13 @@ private boolean consumed(DataFrame df) {
boolean endStream = df.getFlag(DataFrame.END_STREAM);
if (len == 0) return endStream;

connection.windowUpdater.update(len);

connection.windowUpdater.processed(len);
if (!endStream) {
streamWindowUpdater.processed(len);
} else {
// Don't send window update on a stream which is
// closed or half closed.
windowUpdater.update(len);
streamWindowUpdater.released(len);
}

// true: end of stream; false: more data coming
Expand Down Expand Up @@ -376,8 +385,21 @@ public String toString() {
}

private void receiveDataFrame(DataFrame df) {
inputQ.add(df);
sched.runOrSchedule();
try {
int len = df.payloadLength();
if (len > 0) {
// we return from here if the connection is being closed.
if (!connection.windowUpdater.canBufferUnprocessedBytes(len)) return;
// we return from here if the stream is being closed.
if (closed || !streamWindowUpdater.canBufferUnprocessedBytes(len)) {
connection.releaseUnconsumed(df);
return;
}
}
inputQ.add(df);
} finally {
sched.runOrSchedule();
}
}

/** Handles a RESET frame. RESET is always handled inline in the queue. */
Expand Down Expand Up @@ -461,7 +483,7 @@ CompletableFuture<ExchangeImpl<T>> sendBodyAsync() {
this.responseHeadersBuilder = new HttpHeadersBuilder();
this.rspHeadersConsumer = new HeadersConsumer();
this.requestPseudoHeaders = createPseudoHeaders(request);
this.windowUpdater = new StreamWindowUpdateSender(connection);
this.streamWindowUpdater = new StreamWindowUpdateSender(connection);
}

private boolean checkRequestCancelled() {
Expand Down Expand Up @@ -495,6 +517,8 @@ void incoming(Http2Frame frame) throws IOException {
if (debug.on()) {
debug.log("request cancelled or stream closed: dropping data frame");
}
// Data frames that have not been added to the inputQ
// can be released using dropDataFrame
connection.dropDataFrame(df);
} else {
receiveDataFrame(df);
Expand Down Expand Up @@ -1397,12 +1421,18 @@ void cancel(IOException cause) {

@Override
void onProtocolError(final IOException cause) {
onProtocolError(cause, ResetFrame.PROTOCOL_ERROR);
}

void onProtocolError(final IOException cause, int code) {
if (debug.on()) {
debug.log("cancelling exchange on stream %d due to protocol error: %s", streamid, cause.getMessage());
debug.log("cancelling exchange on stream %d due to protocol error [%s]: %s",
streamid, ErrorFrame.stringForCode(code),
cause.getMessage());
}
Log.logError("cancelling exchange on stream {0} due to protocol error: {1}\n", streamid, cause);
// send a RESET frame and close the stream
cancelImpl(cause, ResetFrame.PROTOCOL_ERROR);
cancelImpl(cause, code);
}

void connectionClosing(Throwable cause) {
Expand Down Expand Up @@ -1699,6 +1729,13 @@ String dbgString() {
return dbgString = dbg;
}
}

@Override
protected boolean windowSizeExceeded(long received) {
onProtocolError(new ProtocolException("stream %s flow control window exceeded"
.formatted(streamid)), ResetFrame.FLOW_CONTROL_ERROR);
return true;
}
}

/**
Expand Down
Loading