Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[UNDERTOW-2241][UNDERTOW-2247][UNDERTOW-2248] Handle write timeout properly so no chunk errors occur #1449

Closed
Original file line number Diff line number Diff line change
Expand Up @@ -301,23 +301,35 @@ public void terminateWrites() throws IOException {
if(anyAreSet(state, FLAG_WRITES_SHUTDOWN)) {
return;
}
if (this.chunkleft != 0) {
UndertowLogger.REQUEST_IO_LOGGER.debugf("Channel closed mid-chunk");
next.truncateWrites();
}
if (!anyAreSet(state, FLAG_FIRST_DATA_WRITTEN)) {
//if no data was actually sent we just remove the transfer encoding header, and set content length 0
//TODO: is this the best way to do it?
//todo: should we make this behaviour configurable?
responseHeaders.put(Headers.CONTENT_LENGTH, "0"); //according to the spec we don't actually need this, but better to be safe
responseHeaders.remove(Headers.TRANSFER_ENCODING);
state |= FLAG_NEXT_SHUTDOWN | FLAG_WRITES_SHUTDOWN;
try {
flush();
} catch (IOException ignore) {
// just log it at debug level, this is nothing but an attempt to flush the last bytes
UndertowLogger.REQUEST_IO_LOGGER.ioException(ignore);
}
if(anyAreSet(state, CONF_FLAG_PASS_CLOSE)) {
next.terminateWrites();
}
} else {
createLastChunk(false);
state |= FLAG_WRITES_SHUTDOWN;
try {
flush();
} catch (IOException ignore) {
// just log it at debug level, this is nothing but an attempt to flush the last bytes
UndertowLogger.REQUEST_IO_LOGGER.ioException(ignore);
}
}
if (this.chunkleft != 0) {
UndertowLogger.REQUEST_IO_LOGGER.debugf("Channel closed mid-chunk");
next.truncateWrites();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ private void handleWriteTimeout(final long ret) throws IOException {
long currentTime = System.currentTimeMillis();
long expireTimeVar = expireTime;
if (expireTimeVar != -1 && currentTime > expireTimeVar) {
this.expireTime = -1;
connection.getSinkChannel().shutdownWrites();
IoUtils.safeClose(connection);
throw new ClosedChannelException();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,6 @@

package io.undertow.server.protocol.http;

import static org.xnio.Bits.allAreClear;
import static org.xnio.Bits.allAreSet;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;

import org.xnio.Buffers;
import org.xnio.IoUtils;
import org.xnio.XnioWorker;
import org.xnio.channels.StreamSourceChannel;
import org.xnio.conduits.AbstractStreamSinkConduit;
import org.xnio.conduits.ConduitWritableByteChannel;
import org.xnio.conduits.Conduits;
import org.xnio.conduits.StreamSinkConduit;

import io.undertow.UndertowMessages;
import io.undertow.connector.ByteBufferPool;
import io.undertow.connector.PooledByteBuffer;
Expand All @@ -45,9 +28,29 @@
import io.undertow.util.HttpString;
import io.undertow.util.Protocols;
import io.undertow.util.StatusCodes;
import org.xnio.Buffers;
import org.xnio.IoUtils;
import org.xnio.XnioWorker;
import org.xnio.channels.StreamSourceChannel;
import org.xnio.conduits.AbstractStreamSinkConduit;
import org.xnio.conduits.ConduitWritableByteChannel;
import org.xnio.conduits.Conduits;
import org.xnio.conduits.StreamSinkConduit;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;

import static org.xnio.Bits.allAreClear;
import static org.xnio.Bits.allAreSet;
import static org.xnio.Bits.anyAreSet;

/**
* Conduit for writing the HTTP response.
*
* @author <a href="mailto:david.lloyd@redhat.com">David M. Lloyd</a>
* @author Flavia Rainone
*/
final class HttpResponseConduit extends AbstractStreamSinkConduit<StreamSinkConduit> {

Expand Down Expand Up @@ -79,6 +82,9 @@ final class HttpResponseConduit extends AbstractStreamSinkConduit<StreamSinkCond
private static final int STATE_HDR_FINAL_CR = 8; // Final CR
private static final int STATE_HDR_FINAL_LF = 9; // Final LF
private static final int STATE_BUF_FLUSH = 10; // flush the buffer and go to writing body
private static final int POOLED_BUFFER_IN_USE = 1 << 5; // can occur on recursive writes
// (the recursive write will happen when an outer channel/conduit needs to write to handle a close on write),
// POOLED_BUFFER_IN_USE can occur concomitantly with more than one of the states above (hence the flag instead of a new state)

private static final int MASK_STATE = 0x0000000F;
private static final int FLAG_SHUTDOWN = 0x00000010;
Expand Down Expand Up @@ -123,155 +129,165 @@ private int processWrite(int state, final Object userData, int pos, int length)
if (done || exchange == null) {
throw new ClosedChannelException();
}
ByteBuffer buffer = null;
try {
assert state != STATE_BODY;
if (state == STATE_BUF_FLUSH) {
final ByteBuffer byteBuffer = pooledBuffer.getBuffer();
buffer = pooledBuffer.getBuffer();
do {
long res = 0;
ByteBuffer[] data;
if (userData == null || length == 0) {
res = next.write(byteBuffer);
res = next.write(buffer);
} else if (userData instanceof ByteBuffer) {
data = writevBuffer;
if (data == null) {
data = writevBuffer = new ByteBuffer[2];
}
data[0] = byteBuffer;
data[0] = buffer;
data[1] = (ByteBuffer) userData;
res = next.write(data, 0, 2);
} else {
data = writevBuffer;
if (data == null || data.length < length + 1) {
data = writevBuffer = new ByteBuffer[length + 1];
}
data[0] = byteBuffer;
data[0] = buffer;
System.arraycopy(userData, pos, data, 1, length);
res = next.write(data, 0, length + 1);
}
if (res == 0) {
return STATE_BUF_FLUSH;
}
} while (byteBuffer.hasRemaining());
bufferDone();
} while (buffer.hasRemaining());
return STATE_BODY;
} else if (state != STATE_START) {
return processStatefulWrite(state, userData, pos, length);
}
// make sure that headers are written only once. if
// pooled buffer is in use, it is a sign the headers are being processed
// by an outer call at the stack
if (!anyAreSet(this.state, POOLED_BUFFER_IN_USE)) {
//merge the cookies into the header map
Connectors.flattenCookies(exchange);
// allocate pooled buffer
if (pooledBuffer == null) {
pooledBuffer = pool.allocate();
}
buffer = pooledBuffer.getBuffer();
// set the state after successfully allocating... so in case something goes bad
// we don't have a dangling flag that won't be cleared at the finally block
this.state |= POOLED_BUFFER_IN_USE;
assert buffer.remaining() >= 50;
// append response status and headers
Protocols.HTTP_1_1.appendTo(buffer);
buffer.put((byte) ' ');
int code = exchange.getStatusCode();
assert 999 >= code && code >= 100;
buffer.put((byte) (code / 100 + '0'));
buffer.put((byte) (code / 10 % 10 + '0'));
buffer.put((byte) (code % 10 + '0'));
buffer.put((byte) ' ');

//merge the cookies into the header map
Connectors.flattenCookies(exchange);

if (pooledBuffer == null) {
pooledBuffer = pool.allocate();
}
ByteBuffer buffer = pooledBuffer.getBuffer();


assert buffer.remaining() >= 50;
Protocols.HTTP_1_1.appendTo(buffer);
buffer.put((byte) ' ');
int code = exchange.getStatusCode();
assert 999 >= code && code >= 100;
buffer.put((byte) (code / 100 + '0'));
buffer.put((byte) (code / 10 % 10 + '0'));
buffer.put((byte) (code % 10 + '0'));
buffer.put((byte) ' ');

String string = exchange.getReasonPhrase();
if(string == null) {
string = StatusCodes.getReason(code);
}
if(string.length() > buffer.remaining()) {
pooledBuffer.close();
pooledBuffer = null;
truncateWrites();
throw UndertowMessages.MESSAGES.reasonPhraseToLargeForBuffer(string);
}
writeString(buffer, string);
buffer.put((byte) '\r').put((byte) '\n');

int remaining = buffer.remaining();


HeaderMap headers = exchange.getResponseHeaders();
long fiCookie = headers.fastIterateNonEmpty();
while (fiCookie != -1) {
HeaderValues headerValues = headers.fiCurrent(fiCookie);

HttpString header = headerValues.getHeaderName();
int headerSize = header.length();
int valueIdx = 0;
while (valueIdx < headerValues.size()) {
remaining -= (headerSize + 2);

if (remaining < 0) {
this.fiCookie = fiCookie;
this.string = string;
this.headerValues = headerValues;
this.valueIdx = valueIdx;
this.charIndex = 0;
this.state = STATE_HDR_NAME;
buffer.flip();
return processStatefulWrite(STATE_HDR_NAME, userData, pos, length);
}
header.appendTo(buffer);
buffer.put((byte) ':').put((byte) ' ');
string = headerValues.get(valueIdx++);

remaining -= (string.length() + 2);
if (remaining < 2) {//we use 2 here, to make sure we always have room for the final \r\n
this.fiCookie = fiCookie;
this.string = string;
this.headerValues = headerValues;
this.valueIdx = valueIdx;
this.charIndex = 0;
this.state = STATE_HDR_VAL;
buffer.flip();
return processStatefulWrite(STATE_HDR_VAL, userData, pos, length);
String string = exchange.getReasonPhrase();
if (string == null) {
string = StatusCodes.getReason(code);
}
if (string.length() > buffer.remaining()) {
pooledBuffer.close();
pooledBuffer = null;
truncateWrites();
throw UndertowMessages.MESSAGES.reasonPhraseToLargeForBuffer(string);
}
writeString(buffer, string);
buffer.put((byte) '\r').put((byte) '\n');

int remaining = buffer.remaining();
final HeaderMap headers = exchange.getResponseHeaders();
long fiCookie = headers.fastIterateNonEmpty();
while (fiCookie != -1) {
HeaderValues headerValues = headers.fiCurrent(fiCookie);

HttpString header = headerValues.getHeaderName();
int headerSize = header.length();
int valueIdx = 0;
while (valueIdx < headerValues.size()) {
remaining -= (headerSize + 2);

if (remaining < 0) {
this.fiCookie = fiCookie;
this.string = string;
this.headerValues = headerValues;
this.valueIdx = valueIdx;
this.charIndex = 0;
this.state = STATE_HDR_NAME;
buffer.flip();
return processStatefulWrite(STATE_HDR_NAME, userData, pos, length);
}
header.appendTo(buffer);
buffer.put((byte) ':').put((byte) ' ');
string = headerValues.get(valueIdx++);

remaining -= (string.length() + 2);
if (remaining < 2) {//we use 2 here, to make sure we always have room for the final \r\n
this.fiCookie = fiCookie;
this.string = string;
this.headerValues = headerValues;
this.valueIdx = valueIdx;
this.charIndex = 0;
this.state = STATE_HDR_VAL;
buffer.flip();
return processStatefulWrite(STATE_HDR_VAL, userData, pos, length);
}
writeString(buffer, string);
buffer.put((byte) '\r').put((byte) '\n');
}
writeString(buffer, string);
buffer.put((byte) '\r').put((byte) '\n');
fiCookie = headers.fiNextNonEmpty(fiCookie);
}
fiCookie = headers.fiNextNonEmpty(fiCookie);
buffer.put((byte) '\r').put((byte) '\n');
buffer.flip();
}
buffer.put((byte) '\r').put((byte) '\n');
buffer.flip();
// now write everything
ByteBuffer[] data = null;
do {
long res = 0;
ByteBuffer[] data;
if (userData == null) {
res = next.write(buffer);
if (buffer != null) {
res = next.write(buffer);
}
} else if (userData instanceof ByteBuffer) {
data = writevBuffer;
if (data == null) {
data = writevBuffer = new ByteBuffer[2];
}
data[0] = buffer;
data[1] = (ByteBuffer) userData;
res = next.write(data, 0, 2);
int index = 0;
if (buffer != null) {
data[index++] = buffer;
}
data[index++] = (ByteBuffer) userData;
res = next.write(data, 0, index);
} else {
data = writevBuffer;
if (data == null || data.length < length + 1) {
data = writevBuffer = new ByteBuffer[length + 1];
}
data[0] = buffer;
System.arraycopy(userData, pos, data, 1, length);
res = next.write(data, 0, length + 1);
int index = 0;
if (buffer != null) {
data[index++] = buffer;
}
System.arraycopy(userData, pos, data, index, length);
res = next.write(data, 0, index + length);
}
if (res == 0) {
return STATE_BUF_FLUSH;
}
} while (buffer.hasRemaining());
bufferDone();
} while (buffer != null && buffer.hasRemaining());
return STATE_BODY;
} catch (IOException | RuntimeException | Error e) {
//WFLY-4696, just to be safe
if (pooledBuffer != null) {
pooledBuffer.close();
pooledBuffer = null;
} finally {
if (buffer != null) {
bufferDone();
this.state &= ~POOLED_BUFFER_IN_USE;
}
throw e;
}
}

Expand All @@ -284,7 +300,6 @@ private void bufferDone() {
//if we are pipelining we hold onto the buffer
pooledBuffer.getBuffer().clear();
} else {

pooledBuffer.close();
pooledBuffer = null;
this.exchange = null;
Expand Down