Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[UNDERTOW-2034][UNDERTOW-2035][UNDERTOW-2079][UNDERTOW-2080] Fixes for AbstractFramedStreamSinkChannel and Http2StreamSinkChannel await methods #1330

Merged
merged 4 commits into from Jun 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -19,15 +19,20 @@
package io.undertow.protocols.http2;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;

import io.undertow.UndertowMessages;
import io.undertow.connector.PooledByteBuffer;
import io.undertow.server.protocol.framed.SendFrameHeader;
import org.xnio.IoUtils;

/**
* Stream sink channel used for HTTP2 communication.
*
* @author Stuart Douglas
* @author Flavia Rainone
*/
public abstract class Http2StreamSinkChannel extends AbstractHttp2StreamSinkChannel {

Expand Down Expand Up @@ -118,8 +123,11 @@ protected int grabFlowControlBytes(int toSend) {
int newWindowSize = this.getChannel().getInitialSendWindowSize();
int settingsDelta = newWindowSize - this.initialWindowSize;
//first adjust for any settings frame updates
this.initialWindowSize = newWindowSize;
this.flowControlWindow += settingsDelta;
if (settingsDelta != 0) {
this.initialWindowSize = newWindowSize;
this.flowControlWindow += settingsDelta;
flowControlLock.notifyAll();
}

int min = Math.min(toSend, this.flowControlWindow);
int actualBytes = this.getChannel().grabFlowControlBytes(min);
Expand All @@ -140,6 +148,7 @@ void updateFlowControlWindow(final int delta) throws IOException {
return;
}
flowControlWindow += delta;
flowControlLock.notifyAll();
}
if (exhausted) {
getChannel().notifyFlowControlAllowed();
Expand Down Expand Up @@ -176,17 +185,44 @@ protected PooledByteBuffer[] allocateAll(PooledByteBuffer[] allHeaderBuffers, Po
* Invokes super awaitWritable, with an extra check for flowControlWindow. The purpose of this is to
* warn clearly that peer is not updating the flow control window.
*
* This method will block for the maximum amount of time specified by {@link #getAwaitWritableTimeout()}
*
* @throws IOException if an IO error occurs
*/
public void awaitWritable() throws IOException {
awaitWritable(getAwaitWritableTimeout(), TimeUnit.MILLISECONDS);
}

/**
* Invokes super awaitWritable, with an extra check for flowControlWindow. The purpose of this is to
* warn clearly that peer is not updating the flow control window.
*
* @param time the time to wait
* @param timeUnit the time unit
* @throws IOException if an IO error occurs
*/
public void awaitWritable(long time, TimeUnit timeUnit) throws IOException {
final int flowControlWindow;
synchronized (flowControlLock) {
flowControlWindow = this.flowControlWindow;
}
super.awaitWritable();
long initialTime = System.currentTimeMillis();
super.awaitWritable(time, timeUnit);
synchronized (flowControlLock) {
if (isReadyForFlush() && flowControlWindow <= 0 && flowControlWindow == this.flowControlWindow) {
throw UndertowMessages.MESSAGES.noWindowUpdate(getAwaitWritableTimeout());
long remainingTimeout;
long timeoutInMillis = timeUnit.toMillis(time);
while ((remainingTimeout = timeoutInMillis - (System.currentTimeMillis() - initialTime)) > 0) {
try {
flowControlLock.wait(remainingTimeout);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new InterruptedIOException();
}
if (flowControlWindow != this.flowControlWindow)
return;
}
throw UndertowMessages.MESSAGES.noWindowUpdate(timeoutInMillis);
}
}
}
Expand Down
Expand Up @@ -286,21 +286,11 @@ protected boolean isFinalFrameQueued() {

@Override
public void awaitWritable() throws IOException {
awaitWritable(AWAIT_WRITABLE_TIMEOUT * 1_000_000L);
awaitWritable(AWAIT_WRITABLE_TIMEOUT, TimeUnit.MILLISECONDS);
}

@Override
public void awaitWritable(long l, TimeUnit timeUnit) throws IOException {
awaitWritable(timeUnit.toNanos(l));
}

/**
* Block until this channel is writable again for the maximum timeout.
*
* @param timeoutInNanos maximum timeout to block in nanoseconds
* @throws IOException if an I/O error occurs
*/
protected void awaitWritable(final long timeoutInNanos) throws IOException {
if(Thread.currentThread() == getIoThread()) {
throw UndertowMessages.MESSAGES.awaitCalledFromIoThread();
}
Expand All @@ -311,12 +301,13 @@ protected void awaitWritable(final long timeoutInNanos) throws IOException {
if (readyForFlush) {
try {
waiterCount++;
final long initialTime = System.nanoTime();
long remainingTimeout = timeoutInNanos;
final long initialTime = System.currentTimeMillis();
long timeoutInMillis = timeUnit.toMillis(l);
long remainingTimeout = timeoutInMillis;
//we need to re-check after incrementing the waiters count
while(readyForFlush && !anyAreSet(state, STATE_CLOSED) && !broken && remainingTimeout > 0) {
lock.wait(remainingTimeout / 1_000_000, (int) (remainingTimeout % 1_000_000));
remainingTimeout = timeoutInNanos - (System.nanoTime() - initialTime);
lock.wait(remainingTimeout);
remainingTimeout = timeoutInMillis - (System.currentTimeMillis() - initialTime);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
Expand Down