Skip to content

Commit

Permalink
UNDERTOW-1234 Make the blocking sender set the content length
Browse files Browse the repository at this point in the history
  • Loading branch information
stuartwdouglas committed Nov 15, 2017
1 parent b6a87a4 commit 9b2fa12
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 7 deletions.
57 changes: 53 additions & 4 deletions core/src/main/java/io/undertow/io/BlockingSenderImpl.java
Expand Up @@ -26,9 +26,12 @@
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

import io.undertow.UndertowLogger;
import io.undertow.UndertowMessages;
import io.undertow.connector.PooledByteBuffer;
import io.undertow.server.HttpServerExchange;
import io.undertow.util.Headers;
import org.xnio.Buffers;
import org.xnio.IoUtils;

/**
Expand All @@ -55,6 +58,17 @@ public void send(final ByteBuffer buffer, final IoCallback callback) {
if (inCall) {
queue(new ByteBuffer[]{buffer}, callback);
return;
} else {
long responseContentLength = exchange.getResponseContentLength();
if(responseContentLength > 0 && buffer.remaining() > responseContentLength) {
callback.onException(exchange, this, UndertowLogger.ROOT_LOGGER.dataLargerThanContentLength(buffer.remaining(), responseContentLength));
return;
}
if (!exchange.isResponseStarted() && callback == IoCallback.END_EXCHANGE) {
if (responseContentLength == -1 && !exchange.getResponseHeaders().contains(Headers.TRANSFER_ENCODING)) {
exchange.setResponseContentLength(buffer.remaining());
}
}
}
if (writeBuffer(buffer, callback)) {
invokeOnComplete(callback);
Expand All @@ -67,6 +81,17 @@ public void send(final ByteBuffer[] buffer, final IoCallback callback) {
if (inCall) {
queue(buffer, callback);
return;
} else {
long responseContentLength = exchange.getResponseContentLength();
if(responseContentLength > 0 && Buffers.remaining(buffer) > responseContentLength) {
callback.onException(exchange, this, UndertowLogger.ROOT_LOGGER.dataLargerThanContentLength(Buffers.remaining(buffer), responseContentLength));
return;
}
if (!exchange.isResponseStarted() && callback == IoCallback.END_EXCHANGE) {
if (responseContentLength == -1 && !exchange.getResponseHeaders().contains(Headers.TRANSFER_ENCODING)) {
exchange.setResponseContentLength(Buffers.remaining(buffer));
}
}
}
if (!writeBuffer(buffer, callback)) {
return;
Expand All @@ -86,12 +111,24 @@ public void send(final ByteBuffer[] buffer) {

@Override
public void send(final String data, final IoCallback callback) {
byte[] bytes = data.getBytes(StandardCharsets.UTF_8);
if (inCall) {
queue(new ByteBuffer[]{ByteBuffer.wrap(data.getBytes(StandardCharsets.UTF_8))}, callback);
queue(new ByteBuffer[]{ByteBuffer.wrap(bytes)}, callback);
return;
} else {
long responseContentLength = exchange.getResponseContentLength();
if(responseContentLength > 0 && bytes.length > responseContentLength) {
callback.onException(exchange, this, UndertowLogger.ROOT_LOGGER.dataLargerThanContentLength(bytes.length, responseContentLength));
return;
}
if (!exchange.isResponseStarted() && callback == IoCallback.END_EXCHANGE) {
if (responseContentLength == -1 && !exchange.getResponseHeaders().contains(Headers.TRANSFER_ENCODING)) {
exchange.setResponseContentLength(bytes.length);
}
}
}
try {
outputStream.write(data.getBytes(StandardCharsets.UTF_8));
outputStream.write(bytes);
invokeOnComplete(callback);
} catch (IOException e) {
callback.onException(exchange, this, e);
Expand All @@ -100,12 +137,24 @@ public void send(final String data, final IoCallback callback) {

@Override
public void send(final String data, final Charset charset, final IoCallback callback) {
byte[] bytes = data.getBytes(charset);
if (inCall) {
queue(new ByteBuffer[]{ByteBuffer.wrap(data.getBytes(charset))}, callback);
queue(new ByteBuffer[]{ByteBuffer.wrap(bytes)}, callback);
return;
}else {
long responseContentLength = exchange.getResponseContentLength();
if(responseContentLength > 0 && bytes.length > responseContentLength) {
callback.onException(exchange, this, UndertowLogger.ROOT_LOGGER.dataLargerThanContentLength(bytes.length, responseContentLength));
return;
}
if (!exchange.isResponseStarted() && callback == IoCallback.END_EXCHANGE) {
if (responseContentLength == -1 && !exchange.getResponseHeaders().contains(Headers.TRANSFER_ENCODING)) {
exchange.setResponseContentLength(bytes.length);
}
}
}
try {
outputStream.write(data.getBytes(charset));
outputStream.write(bytes);
invokeOnComplete(callback);
} catch (IOException e) {
callback.onException(exchange, this, e);
Expand Down
5 changes: 4 additions & 1 deletion core/src/main/java/io/undertow/io/UndertowOutputStream.java
Expand Up @@ -50,7 +50,7 @@ public class UndertowOutputStream extends OutputStream implements BufferWritable
private PooledByteBuffer pooledBuffer;
private StreamSinkChannel channel;
private int state;
private int written;
private long written;
private final long contentLength;

private static final int FLAG_CLOSED = 1;
Expand Down Expand Up @@ -87,6 +87,9 @@ public void resetBuffer() {

}

public long getBytesWritten() {
return written;
}

/**
* {@inheritDoc}
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/java/io/undertow/server/HttpServerExchange.java
Expand Up @@ -1888,7 +1888,7 @@ public void proceed() {
private static class DefaultBlockingHttpExchange implements BlockingHttpExchange {

private InputStream inputStream;
private OutputStream outputStream;
private UndertowOutputStream outputStream;
private Sender sender;
private final HttpServerExchange exchange;

Expand All @@ -1903,7 +1903,7 @@ public InputStream getInputStream() {
return inputStream;
}

public OutputStream getOutputStream() {
public UndertowOutputStream getOutputStream() {
if (outputStream == null) {
outputStream = new UndertowOutputStream(exchange);
}
Expand Down

0 comments on commit 9b2fa12

Please sign in to comment.