Decompress Kinesis message if needed #6442
return data;
}
int magic = data[0] & 0xff | ((data[1] << 8) & 0xff00);
if (magic == GZIPInputStream.GZIP_MAGIC) {
Does this support only GZIP or ?
How about wrapping this logic in a RowDecoder that decompresses the row and passes it to the underlying decoder?
My idea was to limit it to Kinesis for now, since other consumer libraries such as Kafka and Redis might already have an option to decompress messages.
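For readers following the thread, the header check under discussion can be sketched as a standalone helper. This is a minimal illustration, not the code from the PR: the class name `GzipMagic` and the null/short-input guard are assumptions, while `GZIPInputStream.GZIP_MAGIC` is the JDK constant used in the diff.

```java
import java.util.zip.GZIPInputStream;

// Hypothetical helper class; only GZIPInputStream.GZIP_MAGIC comes from the JDK.
final class GzipMagic
{
    private GzipMagic() {}

    // Returns true when the payload starts with the two-byte GZIP header (0x1f, 0x8b).
    static boolean isGZipped(byte[] data)
    {
        // Assumed guard: payloads shorter than two bytes cannot carry the header.
        if (data == null || data.length < 2) {
            return false;
        }
        // The magic number is stored low byte first, so the second byte is shifted up.
        int magic = (data[0] & 0xff) | ((data[1] << 8) & 0xff00);
        return magic == GZIPInputStream.GZIP_MAGIC;
    }
}
```

The check only recognizes GZIP; any other codec would need its own detection.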
cf92bd3 to 50776ad
Minor nits
@@ -220,7 +232,7 @@ public boolean advanceNextPosition()
{
if (shardIterator == null && getRecordsRequest == null) {
getIterator(); // first shard iterator
log.debug("Starting read. Retrieved first shard iterator from AWS Kinesis.");
log.debug("(%s:%s) Starting read. Retrieved first shard iterator from AWS Kinesis.", split.getStreamName(), split.getShardId());
nit: Can we capture these changes as a separate commit?
Optional<Map<DecoderColumnHandle, FieldValueProvider>> decodedValue = messageDecoder.decodeRow(messageData);
Optional<Map<DecoderColumnHandle, FieldValueProvider>> decodedValue;
if (isGZipped(messageData)) {
try {
How about using try-with-resources?
@@ -97,7 +100,21 @@ private void createDummyMessages(String streamName, int count)
mockClient.putRecords(putRecordsRequest);
}
private void createJsonMessages(String streamName, int count, int idStart)
private byte[] compressMessage(byte[] data)
nit: can we move it below createJsonMessages?
return false;
}
int magic = data[0] & 0xff | ((data[1] << 8) & 0xff00);
return magic == GZIPInputStream.GZIP_MAGIC;
Static import?
50776ad to edbab1a
It is sad that we don't have end-to-end tests for Kinesis. Please create a ticket for that.
Can a Kinesis message be compressed with something else?
private byte[] compressMessage(byte[] data)
{
try {
ByteArrayOutputStream byteOS = new ByteArrayOutputStream();
Use try-with-resources for byteOS and gzipOS.
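A minimal sketch of what that suggestion could look like, assuming the test helper only needs to GZIP a byte array. The class name `CompressSketch` is hypothetical, and the `byteOS`/`gzipOS` names mirror the review comment rather than the merged code.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.zip.GZIPOutputStream;

// Hypothetical holder class for the sketch.
final class CompressSketch
{
    private CompressSketch() {}

    static byte[] compressMessage(byte[] data)
    {
        try (ByteArrayOutputStream byteOS = new ByteArrayOutputStream()) {
            try (GZIPOutputStream gzipOS = new GZIPOutputStream(byteOS)) {
                gzipOS.write(data);
            }
            // gzipOS is closed at this point, so the GZIP trailer has been written to byteOS.
            return byteOS.toByteArray();
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The inner try block matters: reading `byteOS` before `gzipOS` is closed (or finished) would yield a truncated stream.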
@@ -97,7 +100,7 @@ private void createDummyMessages(String streamName, int count)
mockClient.putRecords(putRecordsRequest);
}
private void createJsonMessages(String streamName, int count, int idStart)
private void createJsonMessages(String streamName, int count, int idStart, boolean compress)
Can we have utility methods below their usage (below the tests)? Please reorder the methods in a separate commit.
@@ -160,7 +182,8 @@ public void testStreamHasData()
public void testJsonStream()
{
// Simple case: add a few specific items, query object and internal fields:
createJsonMessages(jsonStreamName, 4, 100);
createJsonMessages(jsonStreamName, 2, 100, false);
createJsonMessages(jsonStreamName, 2, 102, true);
Let's make the test parametric. You can use io.trino.testing.DataProviders#trueFalse.
Optional<Map<DecoderColumnHandle, FieldValueProvider>> decodedValue;
if (isGZipped(messageData)) {
try (
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(messageData);
Use try-with-resources for streams.
decodedValue = messageDecoder.decodeRow(gZIPInputStream.readAllBytes());
}
catch (IOException e) {
throw new RuntimeException(e);
PrestoException or, better, UncheckedIOException.
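As an illustration of that suggestion, the decompression step could be isolated in a small helper that rethrows `IOException` as `UncheckedIOException`. This is a sketch under assumptions: the class name `DecompressSketch`, the method name `decompress`, and the error message are hypothetical, and the PR itself may have settled on a different exception type.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.zip.GZIPInputStream;

// Hypothetical holder class for the sketch.
final class DecompressSketch
{
    private DecompressSketch() {}

    static byte[] decompress(byte[] messageData)
    {
        // try-with-resources closes the GZIP stream even when reading fails.
        try (GZIPInputStream gzipIn = new GZIPInputStream(new ByteArrayInputStream(messageData))) {
            return gzipIn.readAllBytes(); // requires Java 9 or newer
        }
        catch (IOException e) {
            // Unchecked wrapper keeps the original cause without widening method signatures.
            throw new UncheckedIOException("Failed to decompress GZIP message", e);
        }
    }
}
```

Payloads that are not valid GZIP fail in the `GZIPInputStream` constructor, so they surface through the same catch block.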
Optional<Map<DecoderColumnHandle, FieldValueProvider>> decodedValue = messageDecoder.decodeRow(messageData);
Optional<Map<DecoderColumnHandle, FieldValueProvider>> decodedValue;
if (isGZipped(messageData)) {
try (
Please extract a method
Compression is done by the message producer, so the message can actually be anything.
Please squash first and last commits together.
log.info("Closing cursor - read complete. Total read: %d batches %d messages, processed: %d messages and %d bytes.",
batchesRead, messagesRead, totalMessages, totalBytes);
log.info("(%s:%s) Closing cursor - read complete. Total read: %d batches %d messages, processed: %d messages and %d bytes.",
split.getStreamName(), split.getShardId(), batchesRead, messagesRead, totalMessages, totalBytes);
Please put each argument on a separate line.
/**
* Compression codec to use for decompressing message. Can be null.
*/
private final String compressionCodec;
Please use an enum.
@@ -47,17 +49,24 @@
private final String messageDataFormat;
/**
* Compression codec to use for decompressing message. Can be null.
This is obvious, I would remove this comment.
@@ -84,6 +93,13 @@ public String getMessageDataFormat()
return messageDataFormat;
}
@Nullable
Please return Optional
8cd54af to 122cfd3
public enum KinesisCompressionCodec
{
GZIP
Add a comma at the end: GZIP,
@@ -171,6 +179,15 @@ public RecordCursor cursor()
return new KinesisRecordCursor();
}
public static boolean isGZipped(byte[] data)
Make it private, and move it below the usage.
return messageDecoder.decodeRow(messageData);
}
if (split.getCompressionCodec().get().equals(KinesisCompressionCodec.GZIP)) {
if (isGZipped(messageData)) {
If getCompressionCodec() == GZIP, then I guess we should fail if !isGZipped(messageData), as this is unexpected.
Maybe we could have KinesisCompressionCodec.AUTOMATIC, wdyt?
> I guess we should fail if !isGZipped(messageData) as this is unexpected.
It's expected in cases where both compressed and uncompressed messages exist in the stream at the same time.
For example: during a deployment rollout, with compression based on message size, or when some producers haven't implemented compression yet.
> Maybe we could have KinesisCompressionCodec.AUTOMATIC
That would be the ultimate option. I would expect it to decompress any codec, but it's not needed for now, so I would leave it for a future implementation.
> It's expected in cases where both compressed and uncompressed messages exist in the stream at the same time.
This is the case I would like to be handled by KinesisCompressionCodec.AUTOMATIC.
@@ -119,6 +127,20 @@ private void createJsonMessages(String streamName, int count, int idStart)
mockClient.putRecords(putRecordsRequest);
}
private byte[] compressMessage(byte[] data)
static
@@ -119,6 +127,20 @@ private void createJsonMessages(String streamName, int count, int idStart)
mockClient.putRecords(putRecordsRequest);
}
private byte[] compressMessage(byte[] data)
{
try (
I would not wrap the line here.
Use:
try (ByteArrayOutputStream byteOS = new ByteArrayOutputStream()) {
Same above.
plugin/trino-kinesis/src/test/java/io/trino/plugin/kinesis/TestRecordAccess.java
a051ccf to f40a1f4
return messageDecoder.decodeRow(messageData);
}
if (split.getCompressionCodec().get() == GZIP || split.getCompressionCodec().get() == AUTOMATIC) {
if (isGZipped(messageData)) {
Please invert the condition, so that less code is nested and the corner case is handled first.
The idea was to skip testing the GZIP header if it's not required
if (!split.getCompressionCodec().isPresent()) {
return messageDecoder.decodeRow(messageData);
}
if (split.getCompressionCodec().get() == GZIP || split.getCompressionCodec().get() == AUTOMATIC) {
Extract a variable for split.getCompressionCodec().get(). Or even introduce a method on the enum, canUseGzip(), which would return true for AUTOMATIC and GZIP.
throw new TrinoException(GENERIC_INTERNAL_ERROR, format("Failed to decompress %s message", split.getCompressionCodec().get()));
}
}
if (split.getCompressionCodec().get() == AUTOMATIC) {
We could add UNCOMPRESSED (default value); then we could merge this block with the one at the top of this method.
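To make the shape of these suggestions concrete, here is one possible reading of the thread as a standalone sketch; it is not the merged code. The names `CodecDispatchSketch`, `Codec`, and `maybeDecompress` are hypothetical, and the semantics are an assumption: UNCOMPRESSED passes data through, GZIP is strict and fails on a missing header (the reviewer's position), and AUTOMATIC decompresses only when the header is present (the mixed-stream case raised by the author).

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.zip.GZIPInputStream;

// Hypothetical sketch; names and semantics are assumptions, not the PR's final code.
final class CodecDispatchSketch
{
    enum Codec
    {
        UNCOMPRESSED,
        GZIP,
        AUTOMATIC;

        // True for the codecs that may carry GZIP-compressed payloads.
        boolean canUseGzip()
        {
            return this == GZIP || this == AUTOMATIC;
        }
    }

    private CodecDispatchSketch() {}

    static byte[] maybeDecompress(Codec codec, byte[] messageData)
    {
        // Corner case first: nothing to do when GZIP is not allowed for this stream.
        if (!codec.canUseGzip()) {
            return messageData;
        }
        if (!isGZipped(messageData)) {
            if (codec == Codec.GZIP) {
                throw new IllegalArgumentException("Expected a GZIP-compressed message");
            }
            return messageData; // AUTOMATIC tolerates uncompressed messages
        }
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(messageData))) {
            return in.readAllBytes();
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private static boolean isGZipped(byte[] data)
    {
        if (data == null || data.length < 2) {
            return false;
        }
        int magic = (data[0] & 0xff) | ((data[1] << 8) & 0xff00);
        return magic == GZIPInputStream.GZIP_MAGIC;
    }
}
```

With an explicit UNCOMPRESSED default, the method needs no Optional handling, and the early returns keep the decompression path unnested.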
f40a1f4 to c8ac6f4
% minor comments
{
KinesisCompressionCodec kinesisCompressionCodec = split.getCompressionCodec();
if (isGZipped(messageData)) {
if (canUseGzip(kinesisCompressionCodec)) {
please invert the condition
@@ -42,6 +47,12 @@ public String getDataFormat()
return dataFormat;
}
@JsonProperty
I am not sure if this is going to work. You would need to add a round-trip JSON serialization test to verify this. Maybe simply remove this annotation?
I've just removed the annotation.
@@ -290,6 +300,28 @@ private void getKinesisRecords()
messagesRead += kinesisRecords.size();
}
private Optional<Map<DecoderColumnHandle, FieldValueProvider>> decodeMessage(byte[] messageData)
please move this method under the usage
c8ac6f4 to 58098e2
Automation failures:
58098e2 to 3223a03
Merged, thanks!
Producers might push compressed messages to save network bandwidth, and consumers should transparently decompress each message before decoding it.