Merge pull request quarkusio#32858 from franz1981/append_buffer
Batching buffer with minChunkSize
geoand committed May 29, 2023
2 parents 97482f2 + 63b30a6 commit 29d71ec
Showing 7 changed files with 241 additions and 32 deletions.
@@ -649,6 +649,7 @@ private org.jboss.resteasy.reactive.common.ResteasyReactiveConfig createRestReac

return new org.jboss.resteasy.reactive.common.ResteasyReactiveConfig(
getEffectivePropertyValue("input-buffer-size", config.inputBufferSize().asLongValue(), Long.class, mpConfig),
getEffectivePropertyValue("min-chunk-size", config.minChunkSize(), Integer.class, mpConfig),
getEffectivePropertyValue("output-buffer-size", config.outputBufferSize(), Integer.class, mpConfig),
getEffectivePropertyValue("single-default-produces", config.singleDefaultProduces(), Boolean.class, mpConfig),
getEffectivePropertyValue("default-produces", config.defaultProduces(), Boolean.class, mpConfig));
@@ -18,10 +18,19 @@ public interface ResteasyReactiveConfig {
@WithDefault("10k")
MemorySize inputBufferSize();

/**
* The size of the chunks of memory allocated when writing data.
* <p>
* This is a very advanced setting that should only be set if you understand exactly how it affects the output IO operations
* of the application.
*/
@WithDefault("128")
int minChunkSize();

/**
* The size of the output stream response buffer. If a response is larger than this and no content-length
* is provided then the request will be chunked.
*
* <p>
* Larger values may give slight performance increases for large responses, at the expense of more memory usage.
*/
@WithDefault("8191")
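For reference, a minimal configuration sketch for the two output-related settings above. The property names are an assumption based on the usual quarkus.resteasy-reactive prefix for this config root; the values shown are the documented defaults.

# application.properties (assumed property names)
quarkus.resteasy-reactive.min-chunk-size=128
quarkus.resteasy-reactive.output-buffer-size=8191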
@@ -1383,6 +1383,7 @@ private org.jboss.resteasy.reactive.common.ResteasyReactiveConfig createRestReac

return new org.jboss.resteasy.reactive.common.ResteasyReactiveConfig(
getEffectivePropertyValue("input-buffer-size", config.inputBufferSize().asLongValue(), Long.class, mpConfig),
getEffectivePropertyValue("min-chunk-size", config.outputBufferSize(), Integer.class, mpConfig),
getEffectivePropertyValue("output-buffer-size", config.outputBufferSize(), Integer.class, mpConfig),
getEffectivePropertyValue("single-default-produces", config.singleDefaultProduces(), Boolean.class, mpConfig),
getEffectivePropertyValue("default-produces", config.defaultProduces(), Boolean.class, mpConfig));
@@ -8,10 +8,18 @@ public class ResteasyReactiveConfig {
*/
private long inputBufferSize;

/**
* The size of the chunks of memory allocated when writing data.
* <p>
* This is a very advanced setting that should only be set if you understand exactly how it affects the output IO operations
* of the application.
*/
private int minChunkSize = 128;

/**
* The size of the output stream response buffer. If a response is larger than this and no content-length
* is provided then the request will be chunked.
*
* <p>
* Larger values may give slight performance increases for large responses, at the expense of more memory usage.
*/
private int outputBufferSize = 8192;
@@ -35,9 +43,10 @@ public class ResteasyReactiveConfig {
public ResteasyReactiveConfig() {
}

public ResteasyReactiveConfig(long inputBufferSize, int outputBufferSize, boolean singleDefaultProduces,
public ResteasyReactiveConfig(long inputBufferSize, int minChunkSize, int outputBufferSize, boolean singleDefaultProduces,
boolean defaultProduces) {
this.inputBufferSize = inputBufferSize;
this.minChunkSize = minChunkSize;
this.outputBufferSize = outputBufferSize;
this.singleDefaultProduces = singleDefaultProduces;
this.defaultProduces = defaultProduces;
Expand All @@ -55,6 +64,14 @@ public int getOutputBufferSize() {
return outputBufferSize;
}

public int getMinChunkSize() {
return minChunkSize;
}

public void setMinChunkSize(int minChunkSize) {
this.minChunkSize = minChunkSize;
}

public void setOutputBufferSize(int outputBufferSize) {
this.outputBufferSize = outputBufferSize;
}
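Because the new minChunkSize parameter is inserted positionally between inputBufferSize and outputBufferSize, existing callers of the all-args constructor have to be updated, as the build-time scanner below does. A minimal, hypothetical sketch of the updated call; 128 and 8192 match the field defaults above, the other values are illustrative.

import org.jboss.resteasy.reactive.common.ResteasyReactiveConfig;

// Hypothetical caller showing the updated positional argument order.
final class ConfigConstructionSketch {

    public static void main(String[] args) {
        ResteasyReactiveConfig config = new ResteasyReactiveConfig(
                10_000L, // inputBufferSize (illustrative value)
                128,     // minChunkSize, new in this change
                8192,    // outputBufferSize
                false,   // singleDefaultProduces (illustrative value)
                true);   // defaultProduces (illustrative value)
        System.out.println(config.getMinChunkSize()); // prints 128
    }
}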
@@ -82,6 +82,8 @@ public static ScanStep start(IndexView nonCalculatingIndex) {
public static class ScanStep {
final IndexView index;
int inputBufferSize = 10000;

int minChunkSize = 128;
int outputBufferSize = 8192;
/**
* By default, we assume a default produced media type of "text/plain"
@@ -215,8 +217,9 @@ public ScanResult scan() {
.setAdditionalReaders(readers)
.setAdditionalWriters(writers)
.setInjectableBeans(new HashMap<>())
.setConfig(new ResteasyReactiveConfig(inputBufferSize, outputBufferSize, singleDefaultProduces,
defaultProduces))
.setConfig(
new ResteasyReactiveConfig(inputBufferSize, minChunkSize, outputBufferSize, singleDefaultProduces,
defaultProduces))
.setHttpAnnotationToMethod(resources.getHttpAnnotationToMethod())
.setApplicationScanningResult(applicationScanningResult);
for (MethodScanner scanner : methodScanners) {
@@ -0,0 +1,192 @@
package org.jboss.resteasy.reactive.server.vertx;

import java.util.ArrayDeque;
import java.util.Objects;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf;

/**
* A bounded (direct) buffer container that keeps accepting data until {@link #capacity} is exhausted.<br>
* In order to keep appending to it, its content can be consolidated into a single {@link ByteBuf} via {@link #clear}.
*/
final class AppendBuffer {
private final ByteBufAllocator allocator;

private final int minChunkSize;
private final int capacity;
private ByteBuf buffer;
private ArrayDeque<ByteBuf> otherBuffers;
private int size;

private AppendBuffer(ByteBufAllocator allocator, int minChunkSize, int capacity) {
this.allocator = allocator;
this.minChunkSize = Math.min(minChunkSize, capacity);
this.capacity = capacity;
}

/**
* This buffer appends data into a single, eagerly allocated {@link ByteBuf}.
*/
public static AppendBuffer eager(ByteBufAllocator allocator, int capacity) {
return new AppendBuffer(allocator, capacity, capacity);
}

/**
* This buffer appends data into multiple {@link ByteBuf}s, each sized exactly as the {@code len} passed to {@link #append}.<br>
* The data is consolidated into a single {@link CompositeByteBuf} on {@link #clear}.
*/
public static AppendBuffer exact(ByteBufAllocator allocator, int capacity) {
return new AppendBuffer(allocator, 0, capacity);
}

/**
* This buffer appends data into multiple {@link ByteBuf}s whose capacity is at least {@code minChunkSize},
* or the {@code len} passed to {@link #append}, if that is greater.<br>
* The data is consolidated into a single {@link CompositeByteBuf} on {@link #clear}.
*/
public static AppendBuffer withMinChunks(ByteBufAllocator allocator, int minChunkSize, int capacity) {
return new AppendBuffer(allocator, minChunkSize, capacity);
}

private ByteBuf lastBuffer() {
if (otherBuffers == null || otherBuffers.isEmpty()) {
return buffer;
}
return otherBuffers.peekLast();
}

/**
* Returns how many bytes have been appended.<br>
* If the returned value is less than {@code len}, {@link #clear} must be invoked
* to restore the available capacity to {@link #capacity()} before appending more data.
*/
public int append(byte[] bytes, int off, int len) {
Objects.requireNonNull(bytes);
if (len == 0) {
return 0;
}
int alreadyWritten = 0;
if (minChunkSize > 0) {
var lastBuffer = lastBuffer();
if (lastBuffer != null) {
int availableOnLast = lastBuffer.writableBytes();
if (availableOnLast > 0) {
int toWrite = Math.min(len, availableOnLast);
lastBuffer.writeBytes(bytes, off, toWrite);
size += toWrite;
len -= toWrite;
// we stop if there's no more to append
if (len == 0) {
return toWrite;
}
off += toWrite;
alreadyWritten = toWrite;
}
}
}
final int availableCapacity = capacity - size;
if (availableCapacity == 0) {
return alreadyWritten;
}
// we can still write some
int toWrite = Math.min(len, availableCapacity);
assert toWrite > 0;
final int chunkCapacity;
if (minChunkSize > 0) {
// never allocate less than minChunkSize, but don't exceed the remaining capacity
chunkCapacity = Math.min(Math.max(minChunkSize, toWrite), availableCapacity);
} else {
chunkCapacity = toWrite;
}
var tmpBuf = allocator.directBuffer(chunkCapacity);
try {
tmpBuf.writeBytes(bytes, off, toWrite);
} catch (Throwable t) {
tmpBuf.release();
throw t;
}
if (buffer == null) {
buffer = tmpBuf;
} else {
boolean resetOthers = false;
try {
if (otherBuffers == null) {
otherBuffers = new ArrayDeque<>();
resetOthers = true;
}
otherBuffers.add(tmpBuf);
} catch (Throwable t) {
rollback(alreadyWritten, tmpBuf, resetOthers);
throw t;
}
}
size += toWrite;
return toWrite + alreadyWritten;
}

private void rollback(int alreadyWritten, ByteBuf tmpBuf, boolean resetOthers) {
tmpBuf.release();
if (resetOthers) {
otherBuffers = null;
}
if (alreadyWritten > 0) {
var last = lastBuffer();
last.writerIndex(last.writerIndex() - alreadyWritten);
size -= alreadyWritten;
assert last.writerIndex() > 0;
}
}

public ByteBuf clear() {
var firstBuf = buffer;
if (firstBuf == null) {
return null;
}
var others = otherBuffers;
if (others == null || others.isEmpty()) {
size = 0;
buffer = null;
// super fast-path
return firstBuf;
}
return clearBuffers();
}

private CompositeByteBuf clearBuffers() {
var firstBuf = buffer;
var others = otherBuffers;
var batch = allocator.compositeDirectBuffer(1 + others.size());
try {
buffer = null;
size = 0;
batch.addComponent(true, 0, firstBuf);
for (int i = 0, othersCount = others.size(); i < othersCount; i++) {
// if addComponent fails, it releases the offered buffer itself before rethrowing the exception:
batch.addComponent(true, 1 + i, others.poll());
}
return batch;
} catch (Throwable anyError) {
batch.release();
releaseOthers(others);
throw anyError;
}
}

private static void releaseOthers(ArrayDeque<ByteBuf> others) {
ByteBuf buf;
while ((buf = others.poll()) != null) {
buf.release();
}
}

public int capacity() {
return capacity;
}

public int availableCapacity() {
return capacity - size;
}

}
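A minimal sketch (hypothetical class, placed in the same package because AppendBuffer is package-private) of how withMinChunks coalesces small writes: with a 128-byte minimum chunk size, three 40-byte appends share a single 128-byte chunk, a write larger than the minimum gets a chunk of its own, and clear() hands back everything accumulated so far as one ByteBuf.

package org.jboss.resteasy.reactive.server.vertx;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;

final class AppendBufferCoalescingSketch {

    public static void main(String[] args) {
        // 128-byte minimum chunks, 8191 bytes of total capacity (the documented defaults)
        AppendBuffer buffer = AppendBuffer.withMinChunks(PooledByteBufAllocator.DEFAULT, 128, 8191);
        byte[] small = new byte[40];

        buffer.append(small, 0, small.length); // allocates one 128-byte chunk, 88 writable bytes left
        buffer.append(small, 0, small.length); // reuses the same chunk, 48 writable bytes left
        buffer.append(small, 0, small.length); // still the same chunk, 8 writable bytes left

        ByteBuf first = buffer.clear();        // a single 128-byte ByteBuf holding the 120 bytes written
        System.out.println(first.readableBytes()); // 120
        first.release();

        byte[] big = new byte[1024];
        buffer.append(big, 0, big.length);     // larger than minChunkSize: gets its own 1024-byte chunk
        ByteBuf second = buffer.clear();
        System.out.println(second.readableBytes()); // 1024
        second.release();
    }
}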
@@ -26,8 +26,7 @@ public class ResteasyReactiveOutputStream extends OutputStream {
private static final Logger log = Logger.getLogger("org.jboss.resteasy.reactive.server.vertx.ResteasyReactiveOutputStream");
private final ResteasyReactiveRequestContext context;
protected final HttpServerRequest request;
private final int outputBufferSize;
private ByteBuf pooledBuffer;
private final AppendBuffer appendBuffer;
private boolean committed;

private boolean closed;
@@ -40,7 +39,9 @@ public class ResteasyReactiveOutputStream extends OutputStream {
public ResteasyReactiveOutputStream(VertxResteasyReactiveRequestContext context) {
this.context = context;
this.request = context.getContext().request();
this.outputBufferSize = context.getDeployment().getResteasyReactiveConfig().getOutputBufferSize();
this.appendBuffer = AppendBuffer.withMinChunks(PooledByteBufAllocator.DEFAULT,
context.getDeployment().getResteasyReactiveConfig().getMinChunkSize(),
context.getDeployment().getResteasyReactiveConfig().getOutputBufferSize());
request.response().exceptionHandler(new Handler<Throwable>() {
@Override
public void handle(Throwable event) {
@@ -202,26 +203,16 @@ public void write(final byte[] b, final int off, final int len) throws IOExcepti

int rem = len;
int idx = off;
ByteBuf buffer = pooledBuffer;
try {
if (buffer == null) {
pooledBuffer = buffer = PooledByteBufAllocator.DEFAULT.directBuffer(outputBufferSize);
}
while (rem > 0) {
int toWrite = Math.min(rem, buffer.writableBytes());
buffer.writeBytes(b, idx, toWrite);
rem -= toWrite;
idx += toWrite;
if (!buffer.isWritable()) {
ByteBuf tmpBuf = buffer;
this.pooledBuffer = buffer = PooledByteBufAllocator.DEFAULT.directBuffer(outputBufferSize);
writeBlocking(tmpBuf, false);
final int written = appendBuffer.append(b, idx, rem);
if (written < rem) {
writeBlocking(appendBuffer.clear(), false);
}
rem -= written;
idx += written;
}
} catch (Exception e) {
if (buffer != null && buffer.refCnt() > 0) {
buffer.release();
}
throw new IOException(e);
}
}
@@ -282,15 +273,11 @@ public void flush() throws IOException {
throw new IOException("Stream is closed");
}
try {
if (pooledBuffer != null) {
writeBlocking(pooledBuffer, false);
pooledBuffer = null;
var toFlush = appendBuffer.clear();
if (toFlush != null) {
writeBlocking(toFlush, false);
}
} catch (Exception e) {
if (pooledBuffer != null) {
pooledBuffer.release();
pooledBuffer = null;
}
throw new IOException(e);
}
}
@@ -302,12 +289,11 @@ public void close() throws IOException {
if (closed)
return;
try {
writeBlocking(pooledBuffer, true);
writeBlocking(appendBuffer.clear(), true);
} catch (Exception e) {
throw new IOException(e);
} finally {
closed = true;
pooledBuffer = null;
}
}

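For clarity, a minimal, self-contained sketch of the contract the rewritten write path relies on, with a hypothetical Consumer standing in for the existing writeBlocking(ByteBuf, boolean) call: append() reports how much it accepted, anything it could not accept is retried after draining the buffer with clear(), and flush/close drain whatever is still buffered.

package org.jboss.resteasy.reactive.server.vertx;

import java.util.function.Consumer;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;

// Hypothetical stand-in for how ResteasyReactiveOutputStream drives AppendBuffer;
// `sink` plays the role of writeBlocking(ByteBuf, boolean).
final class AppendBufferWriteLoopSketch {

    static void write(AppendBuffer buffer, Consumer<ByteBuf> sink, byte[] b, int off, int len) {
        int rem = len;
        int idx = off;
        while (rem > 0) {
            final int written = buffer.append(b, idx, rem);
            if (written < rem) {
                // capacity exhausted: hand the accumulated chunks to the transport, then keep going
                sink.accept(buffer.clear());
            }
            rem -= written;
            idx += written;
        }
    }

    static void flush(AppendBuffer buffer, Consumer<ByteBuf> sink) {
        ByteBuf toFlush = buffer.clear();
        if (toFlush != null) {
            sink.accept(toFlush);
        }
    }

    public static void main(String[] args) {
        AppendBuffer buffer = AppendBuffer.withMinChunks(PooledByteBufAllocator.DEFAULT, 128, 8191);
        Consumer<ByteBuf> sink = buf -> {
            System.out.println("writing " + buf.readableBytes() + " bytes");
            buf.release();
        };
        write(buffer, sink, new byte[20_000], 0, 20_000); // drains 8191 bytes twice while writing
        flush(buffer, sink);                              // drains the remaining 3618 bytes
    }
}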
