Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[UNDERTOW-2056][UNDERTOW-2075][UNDERTOW-2076][UNDERTOW-2077][UNDERTOW-2137] CVE-2022-1259 and related fixes #1477

Merged
merged 5 commits into from May 16, 2023
Merged
4 changes: 4 additions & 0 deletions core/src/main/java/io/undertow/UndertowLogger.java
Expand Up @@ -464,4 +464,8 @@ void nodeConfigCreated(URI connectionURI, String balancer, String domain, String
@LogMessage(level = DEBUG)
@Message(id = 5101, value = "No source to list resources from")
void noSourceToListResourcesFrom();

@LogMessage(level = WARN)
@Message(id = 5102, value = "Flushing waiting in a frame more than %s miliseconds. The framed channel will be forcibly closed.")
void noFrameflushInTimeout(long timeoutMiliseconds);
}
3 changes: 0 additions & 3 deletions core/src/main/java/io/undertow/UndertowMessages.java
Expand Up @@ -635,9 +635,6 @@ public interface UndertowMessages {
@Message(id = 203, value = "Invalid ACL entry")
IllegalArgumentException invalidACLAddress(@Cause Exception e);

@Message(id = 204, value = "Out of flow control window: no WINDOW_UPDATE received from peer within %s miliseconds")
IOException noWindowUpdate(long timeoutMiliseconds);

@Message(id = 205, value = "Path is not a directory '%s'")
IOException pathNotADirectory(Path path);

Expand Down
Expand Up @@ -19,11 +19,8 @@
package io.undertow.protocols.http2;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;

import io.undertow.UndertowMessages;
import io.undertow.connector.PooledByteBuffer;
import io.undertow.server.protocol.framed.SendFrameHeader;
import org.xnio.IoUtils;
Expand All @@ -45,7 +42,7 @@ public abstract class Http2StreamSinkChannel extends AbstractHttp2StreamSinkChan

private SendFrameHeader header;

private static final Object flowControlLock = new Object();
private final Object flowControlLock = new Object();

Http2StreamSinkChannel(Http2Channel channel, int streamId) {
super(channel);
Expand Down Expand Up @@ -181,52 +178,6 @@ protected PooledByteBuffer[] allocateAll(PooledByteBuffer[] allHeaderBuffers, Po
return ret;
}

/**
* Invokes super awaitWritable, with an extra check for flowControlWindow. The purpose of this is to
* warn clearly that peer is not updating the flow control window.
*
* This method will block for the maximum amount of time specified by {@link #getAwaitWritableTimeout()}
*
* @throws IOException if an IO error occurs
*/
public void awaitWritable() throws IOException {
awaitWritable(getAwaitWritableTimeout(), TimeUnit.MILLISECONDS);
}

/**
* Invokes super awaitWritable, with an extra check for flowControlWindow. The purpose of this is to
* warn clearly that peer is not updating the flow control window.
*
* @param time the time to wait
* @param timeUnit the time unit
* @throws IOException if an IO error occurs
*/
public void awaitWritable(long time, TimeUnit timeUnit) throws IOException {
final int flowControlWindow;
synchronized (flowControlLock) {
flowControlWindow = this.flowControlWindow;
}
long initialTime = System.currentTimeMillis();
super.awaitWritable(time, timeUnit);
synchronized (flowControlLock) {
if (isReadyForFlush() && flowControlWindow <= 0 && flowControlWindow == this.flowControlWindow) {
long remainingTimeout;
long timeoutInMillis = timeUnit.toMillis(time);
while ((remainingTimeout = timeoutInMillis - (System.currentTimeMillis() - initialTime)) > 0) {
try {
flowControlLock.wait(remainingTimeout);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new InterruptedIOException();
}
if (flowControlWindow != this.flowControlWindow)
return;
}
throw UndertowMessages.MESSAGES.noWindowUpdate(timeoutInMillis);
}
}
}

/**
* Method that is invoked when the stream is reset.
*/
Expand Down
Expand Up @@ -115,6 +115,8 @@ public abstract class AbstractFramedChannel<C extends AbstractFramedChannel<C, R
private volatile boolean receivesSuspendedTooManyQueuedMessages = false;
private volatile boolean receivesSuspendedTooManyBuffers = false;

// freed method forces to use a different lock as it can be called from everywhere
private final Object lockTooManyQueuedMessages = new Object();

@SuppressWarnings("unused")
private volatile int readsBroken = 0;
Expand Down Expand Up @@ -161,20 +163,17 @@ public void run() {
public void freed() {
int res = outstandingBuffersUpdater.decrementAndGet(AbstractFramedChannel.this);
// do not resume immediately, take some time to consume half the buffers
if (res == maxQueuedBuffers / 2 && receivesSuspendedTooManyBuffers) {
//we need to do the resume in the IO thread, as there is a risk of deadlock otherwise, as the calling thread is an application thread
//and may hold a lock on a stream source channel, see UNDERTOW-1312
getIoThread().execute(new Runnable() {
@Override
public void run() {
synchronized (AbstractFramedChannel.this) {
if (UndertowLogger.REQUEST_IO_LOGGER.isTraceEnabled()) {
UndertowLogger.REQUEST_IO_LOGGER.tracef("Resuming reads on %s as buffers have been consumed", AbstractFramedChannel.this);
}
new UpdateResumeState(null, false, null).run();
if (res == maxQueuedBuffers / 2) {
synchronized (lockTooManyQueuedMessages) {
if (receivesSuspendedTooManyBuffers) {
if (UndertowLogger.REQUEST_IO_LOGGER.isTraceEnabled()) {
UndertowLogger.REQUEST_IO_LOGGER.tracef("Resuming reads on %s as buffers have been consumed", AbstractFramedChannel.this);
}
//we need to do the resume in the IO thread, as there is a risk of deadlock otherwise, as the calling thread is an application thread
//and may hold a lock on a stream source channel, see UNDERTOW-1312, resume and var update should be delayed to the IO thread
runInIoThread(new UpdateResumeState(null, false, null));
}
});
}
}
}
};
Expand Down Expand Up @@ -246,6 +245,10 @@ protected IdleTimeoutConduit createIdleTimeoutChannel(StreamConnection connected
return new IdleTimeoutConduit(connectedStreamChannel);
}

/**
* Adds the task to the queue and executes the IO thread runner.
* @param task The task to be executed in the IO thread
*/
void runInIoThread(Runnable task) {
this.taskRunQueue.add(task);
try {
Expand All @@ -256,6 +259,19 @@ void runInIoThread(Runnable task) {
}
}

/**
* Executes the task in IO thread now if the current thread is the IO thread
* or schedules it calling method <em>runInIoThread</em>.
* @param task The task to be executed in the IO thread
*/
void runNowOrInIoThread(Runnable task) {
if (getIoThread() == Thread.currentThread()) {
task.run();
} else {
runInIoThread(task);
}
}

/**
* Get the buffer pool for this connection.
*
Expand Down Expand Up @@ -532,8 +548,8 @@ private ReferenceCountedPooled allocateReferenceCountedBuffer() {
do {
expect = outstandingBuffersUpdater.get(this);
if (expect >= maxQueuedBuffers || receivesSuspendedTooManyBuffers) {
synchronized (this) {
//we need to re-read in a sync block, to prevent races
synchronized (lockTooManyQueuedMessages) {
// although this method is already synched on this by receive, we need to synch on lockTooManyQueuedMessages because of freed
if (receivesSuspendedTooManyBuffers) {
// suspend already sent to the IO thread
return null;
Expand All @@ -543,8 +559,9 @@ private ReferenceCountedPooled allocateReferenceCountedBuffer() {
if (UndertowLogger.REQUEST_IO_LOGGER.isTraceEnabled()) {
UndertowLogger.REQUEST_IO_LOGGER.tracef("Suspending reads on %s due to too many outstanding buffers", this);
}
// suspend is done asap setting the var to true now
receivesSuspendedTooManyBuffers = true;
getIoThread().execute(new UpdateResumeState(null, true, null));
runNowOrInIoThread(new UpdateResumeState(null, null, null));
return null;
}
}
Expand Down Expand Up @@ -609,6 +626,8 @@ protected synchronized void flushSenders() {
framePriority.frameAdded(frame, pendingFrames, heldFrames);
}
} else {
// frame not ready for sending, add in held queue and start the timeout task
frame.addReadyForFlushTask();
heldFrames.add(frame);
}
}
Expand Down Expand Up @@ -701,12 +720,7 @@ protected synchronized void flushSenders() {
} finally {
flushingSenders = false;
if(!newFrames.isEmpty()) {
runInIoThread(new Runnable() {
@Override
public void run() {
flushSenders();
}
});
runInIoThread(this::flushSenders);
}
}
}
Expand Down Expand Up @@ -742,16 +756,7 @@ protected void queueFrame(final S channel) throws IOException {

public void flush() {
if (!flushingSenders) {
if(channel.getIoThread() == Thread.currentThread()) {
flushSenders();
} else {
runInIoThread(new Runnable() {
@Override
public void run() {
flushSenders();
}
});
}
runNowOrInIoThread(this::flushSenders);
}
}

Expand Down Expand Up @@ -789,16 +794,26 @@ public Setter<C> getReceiveSetter() {
* Suspend the receive of new frames via {@link #receive()}
*/
public synchronized void suspendReceives() {
// suspend is done asap setting the suspended var to true now
if (receivesSuspendedByUser) {
// already suspended
return;
}
receivesSuspendedByUser = true;
getIoThread().execute(new UpdateResumeState(true, null, null));
runNowOrInIoThread(new UpdateResumeState(null, null, null));
}

/**
* Resume the receive of new frames via {@link #receive()}
*/
public synchronized void resumeReceives() {
// resume is delayed to the IO thread but the var is set to false now
if (!receivesSuspendedByUser) {
// already resumed
return;
}
receivesSuspendedByUser = false;
getIoThread().execute(new UpdateResumeState(false, null, null));
runInIoThread(new UpdateResumeState(null, null, null));
}

private void doResume() {
Expand Down Expand Up @@ -955,7 +970,7 @@ public void handleEvent(final StreamSourceChannel channel) {
if (listener == null) {
listener = DRAIN_LISTENER;
}
UndertowLogger.REQUEST_IO_LOGGER.tracef("Invoking receive listener: %s - receiver: %s", listener, receiver);
//UndertowLogger.REQUEST_IO_LOGGER.tracef("Invoking receive listener: %s - receiver: %s", listener, receiver);
ChannelListeners.invokeChannelListener(AbstractFramedChannel.this, listener);
}
final boolean partialRead;
Expand All @@ -964,16 +979,7 @@ public void handleEvent(final StreamSourceChannel channel) {
}
final ReferenceCountedPooled localReadData = readData;
if (localReadData != null && !localReadData.isFreed() && channel.isOpen() && !partialRead) {
try {
runInIoThread(new Runnable() {
@Override
public void run() {
ChannelListeners.invokeChannelListener(channel, FrameReadListener.this);
}
});
} catch (RejectedExecutionException e) {
IoUtils.safeClose(AbstractFramedChannel.this);
}
runInIoThread(() -> ChannelListeners.invokeChannelListener(channel, FrameReadListener.this));
}
synchronized (AbstractFramedChannel.this) {
AbstractFramedChannel.this.partialRead = false;
Expand Down Expand Up @@ -1003,16 +1009,10 @@ private class FrameCloseListener implements ChannelListener<CloseableChannel> {
@Override
public void handleEvent(final CloseableChannel c) {
if (Thread.currentThread() != c.getIoThread() && !c.getWorker().isShutdown()) {
runInIoThread(new Runnable() {
@Override
public void run() {
ChannelListeners.invokeChannelListener(c, FrameCloseListener.this);
}
});
runInIoThread(() -> ChannelListeners.invokeChannelListener(c, FrameCloseListener.this));
return;
}


if(c instanceof StreamSinkChannel) {
sinkClosed = true;
} else if(c instanceof StreamSourceChannel) {
Expand Down Expand Up @@ -1178,16 +1178,20 @@ private UpdateResumeState(Boolean user, Boolean buffers, Boolean frames) {

@Override
public void run() {
if (user != null) {
receivesSuspendedByUser = user;
}
if (buffers != null) {
receivesSuspendedTooManyBuffers = buffers;
}
if (frames != null) {
receivesSuspendedTooManyQueuedMessages = frames;
boolean suspend;
synchronized (AbstractFramedChannel.this) {
if (user != null) {
receivesSuspendedByUser = user;
}
if (buffers != null) {
receivesSuspendedTooManyBuffers = buffers;
}
if (frames != null) {
receivesSuspendedTooManyQueuedMessages = frames;
}
suspend = receivesSuspendedByUser || receivesSuspendedTooManyQueuedMessages || receivesSuspendedTooManyBuffers;
}
if (receivesSuspendedByUser || receivesSuspendedTooManyQueuedMessages || receivesSuspendedTooManyBuffers) {
if (suspend) {
channel.getSourceChannel().suspendReads();
} else {
doResume();
Expand Down