Support larger message sizes in Kafka Buffer #3916
Conversation
Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
@@ -102,6 +104,9 @@ class BufferTopicConfig extends CommonTopicConfig implements TopicProducerConfig
     @JsonProperty("fetch_max_bytes")
     private ByteCount fetchMaxBytes = DEFAULT_FETCH_MAX_BYTES;

+    @JsonProperty("max_message_bytes")
Why do we need another property on the topic when we already have this in the producer properties?
It seems that if the producer properties value is set to any non-default, it makes sense to use that. Or perhaps, if the producer properties value is set greater than the default, we change to the larger value.
@dlvenable Kafka has separate properties for the producer and the topic. Producer properties are client-side properties used when the KafkaProducer is created. Topic properties are used when the topic is created. We share producer properties with the Kafka sink. If we want to derive the topic property for max_message_bytes when the producer property is set, that's fine too. My current logic derives the producer property when the topic property is set.
Ultimately this property matters only when writing to Kafka, right? I think we should derive this value from the producer properties instead.
I'd probably say this is the general algorithm:
if(producerProperties.getMaxRequestSize() > DEFAULT_MAX_REQUEST_SIZE)
setTopicMaxMessageBytes(producerProperties.getMaxRequestSize());
That is, if the client lowers the max size, we need not change the server. It is already compatible with the defaults. But, if the client increases to a size that would otherwise not work, then we do increase it.
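A minimal sketch of the derivation proposed above. The class and method names here are illustrative stand-ins, not the actual plugin code; only the comparison logic comes from the review comment.

```java
// Illustrative sketch: derive the topic's max.message.bytes from the
// producer's max.request.size. Names are hypothetical, not the plugin's API.
class MaxMessageBytesDerivation {
    // Kafka client default for max.request.size (1 MiB)
    static final int DEFAULT_MAX_REQUEST_SIZE = 1048576;

    // Returns the topic-level max.message.bytes to apply, or null to keep
    // the broker default. Only a client limit *above* the default needs a
    // matching increase on the server side; a lowered client limit is
    // already compatible with the broker default.
    static Long deriveTopicMaxMessageBytes(final int producerMaxRequestSize) {
        if (producerMaxRequestSize > DEFAULT_MAX_REQUEST_SIZE) {
            return (long) producerMaxRequestSize;
        }
        return null;
    }
}
```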
Were we going to remove this property in favor of using the ProducerProperties value?
@@ -6,7 +6,22 @@
 import java.util.List;

 public class KafkaProducerProperties {
-    private static final String DEFAULT_BYTE_CAPACITY = "50mb";
+    static final String DEFAULT_BYTE_CAPACITY = "50mb";
Might it make more sense to use the Kafka client defaults here? With this change, we will have to keep our defaults in sync with Kafka's.
I do also see that this has the advantage of allowing us to stay consistent between versions even if Kafka changes. But I'm not sure that is an ideal gain. Thoughts?
This is an existing value that I am not changing. For the rest of the KafkaProducerProperties values that I added, I chose the Kafka defaults.
@@ -22,9 +26,15 @@ public TopicService(final KafkaProducerConfig kafkaProducerConfig) {
         this.adminClient = AdminClient.create(SinkPropertyConfigurer.getPropertiesForAdminClient(kafkaProducerConfig));
     }

-    public void createTopic(final String topicName, final Integer numberOfPartitions, final Short replicationFactor) {
+    public void createTopic(final String topicName, final Integer numberOfPartitions, final Short replicationFactor, final Optional<Long> maxMessageBytes) {
Passing an Optional as an argument is an anti-pattern. Java does nothing to prevent us from passing in null. So please change this to Long maxMessageBytes.
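The suggested signature change can be sketched as follows. This is a simplified, hypothetical stand-in for TopicService (the string-building body exists only to make the sketch testable), showing a nullable Long in place of Optional&lt;Long&gt;:

```java
// Hypothetical stand-in for TopicService, illustrating the suggested
// signature: a nullable Long instead of Optional<Long>.
class TopicServiceSketch {
    // maxMessageBytes may be null, meaning "keep the broker default"
    String createTopic(final String topicName, final Integer numberOfPartitions,
                       final Short replicationFactor, final Long maxMessageBytes) {
        final StringBuilder config = new StringBuilder(
                topicName + ": partitions=" + numberOfPartitions
                        + ", replication=" + replicationFactor);
        if (maxMessageBytes != null) {
            config.append(", max.message.bytes=").append(maxMessageBytes);
        }
        return config.toString();
    }
}
```

The rationale: Optional was designed as a return type; as a parameter it adds a third state (null Optional) rather than removing one.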
@@ -31,6 +35,9 @@ public class SinkTopicConfig extends CommonTopicConfig implements TopicProducerConfig
     @JsonProperty("create_topic")
     private boolean isCreateTopic = false;

+    @JsonProperty("max_message_bytes")
I have a similar question as above. Why can't we use the producer properties?
@dlvenable Please see above.
Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
KafkaProducerProperties producerProperties = kafkaProducerConfig.getKafkaProducerProperties();
if (producerProperties != null) {
    int producerMaxRequestSize = producerProperties.getMaxRequestSize();
    if (producerMaxRequestSize != KafkaProducerProperties.DEFAULT_MAX_REQUEST_SIZE) {
What do you think about the following change?
if (producerMaxRequestSize > KafkaProducerProperties.DEFAULT_MAX_REQUEST_SIZE)
I think we could be OK with the default when the defined value is lower than the default.
Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
* Support larger message sizes in Kafka Buffer
* Addressed review comments and added new integration tests
* Addressed review comments
* Addressed review comments
---------
Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
Co-authored-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
(cherry picked from commit 774fa21)
Description
Support larger message sizes in Kafka Buffer.
Supports setting the max message size both in the Kafka Buffer producer properties (for client-side support) and when the topic is created by the buffer (for server-side support). When server-side support for a topic is enabled, client-side support is automatically enabled too. The Kafka buffer config with the new config options looks like this -
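A hypothetical sketch of such a configuration, built only from the property names visible in this PR's diffs (`max_message_bytes`, `create_topic`, and a producer `max_request_size`); the exact key nesting and value formats may differ from the actual plugin:

```yaml
# Hypothetical sketch only - key nesting and names may differ from the plugin.
buffer:
  kafka:
    topics:
      - name: my-buffer-topic
        create_topic: true
        max_message_bytes: 4mb       # server-side: set on the topic at creation
    producer_properties:
      max_request_size: 4mb          # client-side: KafkaProducer max.request.size
```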
Issues Resolved
Resolves #[Issue number to be closed when this PR is merged]
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.