Skip to content

Commit

Permalink
Use internal netty classes to make sure we dont copies bytes all over
Browse files Browse the repository at this point in the history
the places when we are doing decompression.
  • Loading branch information
ph committed Jun 30, 2016
1 parent 846630a commit 9abb0a6
Showing 1 changed file with 19 additions and 26 deletions.
45 changes: 19 additions & 26 deletions src/main/java/org/logstash/beats/BeatsParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@


import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.ByteBufOutputStream;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
Expand All @@ -15,7 +17,9 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.zip.Inflater;
import java.util.zip.InflaterInputStream;
import java.util.zip.InflaterOutputStream;


public class BeatsParser extends ByteToMessageDecoder {
Expand Down Expand Up @@ -163,33 +167,22 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) t
case READ_COMPRESSED_FRAME: {
logger.debug("Running: READ_COMPRESSED_FRAME");


byte[] bytes = new byte[(int) requiredBytes];
in.readBytes(bytes);

InputStream inflater = new InflaterInputStream(new ByteArrayInputStream(bytes));
ByteArrayOutputStream decompressed = new ByteArrayOutputStream();


byte[] chunk = new byte[CHUNK_SIZE];
int length = 0;

while ((length = inflater.read(chunk)) > 0) {
decompressed.write(chunk, 0, length);
}

inflater.close();
decompressed.close();

transition(States.READ_HEADER);
ByteBuf newInput = Unpooled.wrappedBuffer(decompressed.toByteArray());

try {
while (newInput.readableBytes() > 0) {
decode(ctx, newInput, out);
try (
// Use the compressed size as the safe start for the buffer.
ByteBufOutputStream buffOutput = new ByteBufOutputStream(ctx.alloc().buffer((int) requiredBytes));
InflaterOutputStream inflater = new InflaterOutputStream(buffOutput, new Inflater());
) {

in.readBytes(inflater, (int) requiredBytes);
ByteBuf buffer = buffOutput.buffer();
transition(States.READ_HEADER);
try {
while (buffer.readableBytes() > 0) {
decode(ctx, buffOutput.buffer(), out);
}
} finally {
buffer.release();
}
} finally {
newInput.release();
}

break;
Expand Down

0 comments on commit 9abb0a6

Please sign in to comment.