Skip to content

Commit

Permalink
JdkZlibEncoder can use pooled heap buffers for deflater input (#11891)
Browse files Browse the repository at this point in the history
Motivation:

Previously, when the input ByteBuf was not heap allocated we copied the
input into a new byte array allocated on the heap for every encode() call.
This puts high-throughput clients that use the JdkZlibEncoder under memory
pressure.

Modifications:

Now, when the input ByteBuf is not heap allocated we copy the input into a
heap ByteBuf allocated from the configured ByteBufAllocator (which might be pooled), releasing it after
the encode() has completed.

Result:

The result is less heap allocation and GC activity when the PooledByteBufAllocator is used (which is the default)

Fixes #11890

Signed-off-by: Daniel Ferstay <dferstay@splunk.com>
Co-authored-by: Daniel Ferstay <dferstay@splunk.com>
Co-authored-by: Norman Maurer <norman_maurer@apple.com>
  • Loading branch information
3 people committed Dec 8, 2021
1 parent 559c085 commit d80a34e
Showing 1 changed file with 38 additions and 30 deletions.
Expand Up @@ -200,42 +200,50 @@ protected void encode(ChannelHandlerContext ctx, ByteBuf uncompressed, ByteBuf o

int offset;
byte[] inAry;
if (uncompressed.hasArray()) {
// if it is backed by an array we not need to to do a copy at all
inAry = uncompressed.array();
offset = uncompressed.arrayOffset() + uncompressed.readerIndex();
// skip all bytes as we will consume all of them
uncompressed.skipBytes(len);
} else {
inAry = new byte[len];
uncompressed.readBytes(inAry);
offset = 0;
}
ByteBuf heapBuf = null;
try {
if (uncompressed.hasArray()) {
// if it is backed by an array we not need to do a copy at all
inAry = uncompressed.array();
offset = uncompressed.arrayOffset() + uncompressed.readerIndex();
// skip all bytes as we will consume all of them
uncompressed.skipBytes(len);
} else {
heapBuf = ctx.alloc().heapBuffer(len, len);
uncompressed.readBytes(heapBuf, len);
inAry = heapBuf.array();
offset = heapBuf.arrayOffset() + heapBuf.readerIndex();
}

if (writeHeader) {
writeHeader = false;
if (wrapper == ZlibWrapper.GZIP) {
out.writeBytes(gzipHeader);
if (writeHeader) {
writeHeader = false;
if (wrapper == ZlibWrapper.GZIP) {
out.writeBytes(gzipHeader);
}
}
}

if (wrapper == ZlibWrapper.GZIP) {
crc.update(inAry, offset, len);
}
if (wrapper == ZlibWrapper.GZIP) {
crc.update(inAry, offset, len);
}

deflater.setInput(inAry, offset, len);
for (;;) {
deflate(out);
if (deflater.needsInput()) {
// Consumed everything
break;
} else {
if (!out.isWritable()) {
// We did not consume everything but the buffer is not writable anymore. Increase the capacity to
// make more room.
out.ensureWritable(out.writerIndex());
deflater.setInput(inAry, offset, len);
for (;;) {
deflate(out);
if (deflater.needsInput()) {
// Consumed everything
break;
} else {
if (!out.isWritable()) {
// We did not consume everything but the buffer is not writable anymore. Increase the capacity
// to make more room.
out.ensureWritable(out.writerIndex());
}
}
}
} finally {
if (heapBuf != null) {
heapBuf.release();
}
}
}

Expand Down

0 comments on commit d80a34e

Please sign in to comment.