Skip to content

Commit

Permalink
Merge pull request #1491 from fl4via/UNDERTOW-2241_2247_2248_2.2.x
Browse files Browse the repository at this point in the history
[UNDERTOW-2241][UNDERTOW-2247][UNDERTOW-2248] Handle write timeout properly so no chunk errors occur
  • Loading branch information
fl4via committed Jun 7, 2023
2 parents d192ca1 + c7cf4fd commit 9c57984
Show file tree
Hide file tree
Showing 4 changed files with 154 additions and 122 deletions.
Expand Up @@ -301,23 +301,35 @@ public void terminateWrites() throws IOException {
if(anyAreSet(state, FLAG_WRITES_SHUTDOWN)) {
return;
}
if (this.chunkleft != 0) {
UndertowLogger.REQUEST_IO_LOGGER.debugf("Channel closed mid-chunk");
next.truncateWrites();
}
if (!anyAreSet(state, FLAG_FIRST_DATA_WRITTEN)) {
//if no data was actually sent we just remove the transfer encoding header, and set content length 0
//TODO: is this the best way to do it?
//todo: should we make this behaviour configurable?
responseHeaders.put(Headers.CONTENT_LENGTH, "0"); //according to the spec we don't actually need this, but better to be safe
responseHeaders.remove(Headers.TRANSFER_ENCODING);
state |= FLAG_NEXT_SHUTDOWN | FLAG_WRITES_SHUTDOWN;
try {
flush();
} catch (IOException ignore) {
// just log it at debug level, this is nothing but an attempt to flush the last bytes
UndertowLogger.REQUEST_IO_LOGGER.ioException(ignore);
}
if(anyAreSet(state, CONF_FLAG_PASS_CLOSE)) {
next.terminateWrites();
}
} else {
createLastChunk(false);
state |= FLAG_WRITES_SHUTDOWN;
try {
flush();
} catch (IOException ignore) {
// just log it at debug level, this is nothing but an attempt to flush the last bytes
UndertowLogger.REQUEST_IO_LOGGER.ioException(ignore);
}
}
if (this.chunkleft != 0) {
UndertowLogger.REQUEST_IO_LOGGER.debugf("Channel closed mid-chunk");
next.truncateWrites();
}
}

Expand Down
Expand Up @@ -98,6 +98,8 @@ private void handleWriteTimeout(final long ret) throws IOException {
long currentTime = System.currentTimeMillis();
long expireTimeVar = expireTime;
if (expireTimeVar != -1 && currentTime > expireTimeVar) {
this.expireTime = -1;
connection.getSinkChannel().shutdownWrites();
IoUtils.safeClose(connection);
throw new ClosedChannelException();
}
Expand Down
Expand Up @@ -18,23 +18,6 @@

package io.undertow.server.protocol.http;

import static org.xnio.Bits.allAreClear;
import static org.xnio.Bits.allAreSet;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;

import org.xnio.Buffers;
import org.xnio.IoUtils;
import org.xnio.XnioWorker;
import org.xnio.channels.StreamSourceChannel;
import org.xnio.conduits.AbstractStreamSinkConduit;
import org.xnio.conduits.ConduitWritableByteChannel;
import org.xnio.conduits.Conduits;
import org.xnio.conduits.StreamSinkConduit;

import io.undertow.UndertowMessages;
import io.undertow.connector.ByteBufferPool;
import io.undertow.connector.PooledByteBuffer;
Expand All @@ -45,9 +28,29 @@
import io.undertow.util.HttpString;
import io.undertow.util.Protocols;
import io.undertow.util.StatusCodes;
import org.xnio.Buffers;
import org.xnio.IoUtils;
import org.xnio.XnioWorker;
import org.xnio.channels.StreamSourceChannel;
import org.xnio.conduits.AbstractStreamSinkConduit;
import org.xnio.conduits.ConduitWritableByteChannel;
import org.xnio.conduits.Conduits;
import org.xnio.conduits.StreamSinkConduit;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;

import static org.xnio.Bits.allAreClear;
import static org.xnio.Bits.allAreSet;
import static org.xnio.Bits.anyAreSet;

/**
* Conduit for writing the HTTP response.
*
* @author <a href="mailto:david.lloyd@redhat.com">David M. Lloyd</a>
* @author Flavia Rainone
*/
final class HttpResponseConduit extends AbstractStreamSinkConduit<StreamSinkConduit> {

Expand Down Expand Up @@ -79,6 +82,9 @@ final class HttpResponseConduit extends AbstractStreamSinkConduit<StreamSinkCond
private static final int STATE_HDR_FINAL_CR = 8; // Final CR
private static final int STATE_HDR_FINAL_LF = 9; // Final LF
private static final int STATE_BUF_FLUSH = 10; // flush the buffer and go to writing body
private static final int POOLED_BUFFER_IN_USE = 1 << 5; // can occur on recursive writes
// (the recursive write will happen when an outer channel/conduit needs to write to handle a close on write),
// POOLED_BUFFER_IN_USE can occur concomitantly with more than one of the states above (hence the flag instead of a new state)

private static final int MASK_STATE = 0x0000000F;
private static final int FLAG_SHUTDOWN = 0x00000010;
Expand Down Expand Up @@ -123,155 +129,165 @@ private int processWrite(int state, final Object userData, int pos, int length)
if (done || exchange == null) {
throw new ClosedChannelException();
}
ByteBuffer buffer = null;
try {
assert state != STATE_BODY;
if (state == STATE_BUF_FLUSH) {
final ByteBuffer byteBuffer = pooledBuffer.getBuffer();
buffer = pooledBuffer.getBuffer();
do {
long res = 0;
ByteBuffer[] data;
if (userData == null || length == 0) {
res = next.write(byteBuffer);
res = next.write(buffer);
} else if (userData instanceof ByteBuffer) {
data = writevBuffer;
if (data == null) {
data = writevBuffer = new ByteBuffer[2];
}
data[0] = byteBuffer;
data[0] = buffer;
data[1] = (ByteBuffer) userData;
res = next.write(data, 0, 2);
} else {
data = writevBuffer;
if (data == null || data.length < length + 1) {
data = writevBuffer = new ByteBuffer[length + 1];
}
data[0] = byteBuffer;
data[0] = buffer;
System.arraycopy(userData, pos, data, 1, length);
res = next.write(data, 0, length + 1);
}
if (res == 0) {
return STATE_BUF_FLUSH;
}
} while (byteBuffer.hasRemaining());
bufferDone();
} while (buffer.hasRemaining());
return STATE_BODY;
} else if (state != STATE_START) {
return processStatefulWrite(state, userData, pos, length);
}
// make sure that headers are written only once. if
// pooled buffer is in use, it is a sign the headers are being processed
// by an outer call at the stack
if (!anyAreSet(this.state, POOLED_BUFFER_IN_USE)) {
//merge the cookies into the header map
Connectors.flattenCookies(exchange);
// allocate pooled buffer
if (pooledBuffer == null) {
pooledBuffer = pool.allocate();
}
buffer = pooledBuffer.getBuffer();
// set the state after successfully allocating... so in case something goes bad
// we don't have a dangling flag that won't be cleared at the finally block
this.state |= POOLED_BUFFER_IN_USE;
assert buffer.remaining() >= 50;
// append response status and headers
Protocols.HTTP_1_1.appendTo(buffer);
buffer.put((byte) ' ');
int code = exchange.getStatusCode();
assert 999 >= code && code >= 100;
buffer.put((byte) (code / 100 + '0'));
buffer.put((byte) (code / 10 % 10 + '0'));
buffer.put((byte) (code % 10 + '0'));
buffer.put((byte) ' ');

//merge the cookies into the header map
Connectors.flattenCookies(exchange);

if (pooledBuffer == null) {
pooledBuffer = pool.allocate();
}
ByteBuffer buffer = pooledBuffer.getBuffer();


assert buffer.remaining() >= 50;
Protocols.HTTP_1_1.appendTo(buffer);
buffer.put((byte) ' ');
int code = exchange.getStatusCode();
assert 999 >= code && code >= 100;
buffer.put((byte) (code / 100 + '0'));
buffer.put((byte) (code / 10 % 10 + '0'));
buffer.put((byte) (code % 10 + '0'));
buffer.put((byte) ' ');

String string = exchange.getReasonPhrase();
if(string == null) {
string = StatusCodes.getReason(code);
}
if(string.length() > buffer.remaining()) {
pooledBuffer.close();
pooledBuffer = null;
truncateWrites();
throw UndertowMessages.MESSAGES.reasonPhraseToLargeForBuffer(string);
}
writeString(buffer, string);
buffer.put((byte) '\r').put((byte) '\n');

int remaining = buffer.remaining();


HeaderMap headers = exchange.getResponseHeaders();
long fiCookie = headers.fastIterateNonEmpty();
while (fiCookie != -1) {
HeaderValues headerValues = headers.fiCurrent(fiCookie);

HttpString header = headerValues.getHeaderName();
int headerSize = header.length();
int valueIdx = 0;
while (valueIdx < headerValues.size()) {
remaining -= (headerSize + 2);

if (remaining < 0) {
this.fiCookie = fiCookie;
this.string = string;
this.headerValues = headerValues;
this.valueIdx = valueIdx;
this.charIndex = 0;
this.state = STATE_HDR_NAME;
buffer.flip();
return processStatefulWrite(STATE_HDR_NAME, userData, pos, length);
}
header.appendTo(buffer);
buffer.put((byte) ':').put((byte) ' ');
string = headerValues.get(valueIdx++);

remaining -= (string.length() + 2);
if (remaining < 2) {//we use 2 here, to make sure we always have room for the final \r\n
this.fiCookie = fiCookie;
this.string = string;
this.headerValues = headerValues;
this.valueIdx = valueIdx;
this.charIndex = 0;
this.state = STATE_HDR_VAL;
buffer.flip();
return processStatefulWrite(STATE_HDR_VAL, userData, pos, length);
String string = exchange.getReasonPhrase();
if (string == null) {
string = StatusCodes.getReason(code);
}
if (string.length() > buffer.remaining()) {
pooledBuffer.close();
pooledBuffer = null;
truncateWrites();
throw UndertowMessages.MESSAGES.reasonPhraseToLargeForBuffer(string);
}
writeString(buffer, string);
buffer.put((byte) '\r').put((byte) '\n');

int remaining = buffer.remaining();
final HeaderMap headers = exchange.getResponseHeaders();
long fiCookie = headers.fastIterateNonEmpty();
while (fiCookie != -1) {
HeaderValues headerValues = headers.fiCurrent(fiCookie);

HttpString header = headerValues.getHeaderName();
int headerSize = header.length();
int valueIdx = 0;
while (valueIdx < headerValues.size()) {
remaining -= (headerSize + 2);

if (remaining < 0) {
this.fiCookie = fiCookie;
this.string = string;
this.headerValues = headerValues;
this.valueIdx = valueIdx;
this.charIndex = 0;
this.state = STATE_HDR_NAME;
buffer.flip();
return processStatefulWrite(STATE_HDR_NAME, userData, pos, length);
}
header.appendTo(buffer);
buffer.put((byte) ':').put((byte) ' ');
string = headerValues.get(valueIdx++);

remaining -= (string.length() + 2);
if (remaining < 2) {//we use 2 here, to make sure we always have room for the final \r\n
this.fiCookie = fiCookie;
this.string = string;
this.headerValues = headerValues;
this.valueIdx = valueIdx;
this.charIndex = 0;
this.state = STATE_HDR_VAL;
buffer.flip();
return processStatefulWrite(STATE_HDR_VAL, userData, pos, length);
}
writeString(buffer, string);
buffer.put((byte) '\r').put((byte) '\n');
}
writeString(buffer, string);
buffer.put((byte) '\r').put((byte) '\n');
fiCookie = headers.fiNextNonEmpty(fiCookie);
}
fiCookie = headers.fiNextNonEmpty(fiCookie);
buffer.put((byte) '\r').put((byte) '\n');
buffer.flip();
}
buffer.put((byte) '\r').put((byte) '\n');
buffer.flip();
// now write everything
ByteBuffer[] data = null;
do {
long res = 0;
ByteBuffer[] data;
if (userData == null) {
res = next.write(buffer);
if (buffer != null) {
res = next.write(buffer);
}
} else if (userData instanceof ByteBuffer) {
data = writevBuffer;
if (data == null) {
data = writevBuffer = new ByteBuffer[2];
}
data[0] = buffer;
data[1] = (ByteBuffer) userData;
res = next.write(data, 0, 2);
int index = 0;
if (buffer != null) {
data[index++] = buffer;
}
data[index++] = (ByteBuffer) userData;
res = next.write(data, 0, index);
} else {
data = writevBuffer;
if (data == null || data.length < length + 1) {
data = writevBuffer = new ByteBuffer[length + 1];
}
data[0] = buffer;
System.arraycopy(userData, pos, data, 1, length);
res = next.write(data, 0, length + 1);
int index = 0;
if (buffer != null) {
data[index++] = buffer;
}
System.arraycopy(userData, pos, data, index, length);
res = next.write(data, 0, index + length);
}
if (res == 0) {
return STATE_BUF_FLUSH;
}
} while (buffer.hasRemaining());
bufferDone();
} while (buffer != null && buffer.hasRemaining());
return STATE_BODY;
} catch (IOException | RuntimeException | Error e) {
//WFLY-4696, just to be safe
if (pooledBuffer != null) {
pooledBuffer.close();
pooledBuffer = null;
} finally {
if (buffer != null) {
bufferDone();
this.state &= ~POOLED_BUFFER_IN_USE;
}
throw e;
}
}

Expand All @@ -284,7 +300,6 @@ private void bufferDone() {
//if we are pipelining we hold onto the buffer
pooledBuffer.getBuffer().clear();
} else {

pooledBuffer.close();
pooledBuffer = null;
this.exchange = null;
Expand Down

0 comments on commit 9c57984

Please sign in to comment.