Skip to content

Commit 73589ae

Browse files
authored
[feature][python] PIP 37: Add chunking support for python client (apache#17128)
### Motivation This is the Python client feature catch-up for [PIP 37](https://github.com/apache/pulsar/wiki/PIP-37%3A-Large-message-size-handling-in-Pulsar) ### Modifications * Add `chunking_enabled` to the producer configuration * Add `max_pending_chunked_message` and `auto_ack_oldest_chunked_message_on_queue_full` to the consumer configuration
1 parent 5911326 commit 73589ae

File tree

3 files changed

+70
-3
lines changed

3 files changed

+70
-3
lines changed

pulsar-client-cpp/python/pulsar/__init__.py

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -472,6 +472,7 @@ def create_producer(self, topic,
472472
batching_max_messages=1000,
473473
batching_max_allowed_size_in_bytes=128*1024,
474474
batching_max_publish_delay_ms=10,
475+
chunking_enabled=False,
475476
message_routing_mode=PartitionsRoutingMode.RoundRobinDistribution,
476477
lazy_start_partitioned_producers=False,
477478
properties=None,
@@ -556,6 +557,10 @@ def create_producer(self, topic,
556557
(k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)
557558
batched into single batch message:
558559
[(k1, v1), (k1, v2), (k1, v3)], [(k2, v1), (k2, v2), (k2, v3)], [(k3, v1), (k3, v2), (k3, v3)]
560+
* `chunking_enabled`:
561+
If a message is larger than the maximum publish-payload size allowed by the broker, chunking_enabled
562+
lets the producer split the message into multiple chunks and publish them to the broker separately and in
563+
order. This allows the client to successfully publish large messages to Pulsar. It cannot be enabled together with batching.
559564
* encryption_key:
560565
The key used for symmetric encryption, configured on the producer side
561566
* crypto_key_reader:
@@ -575,6 +580,7 @@ def create_producer(self, topic,
575580
_check_type(int, batching_max_messages, 'batching_max_messages')
576581
_check_type(int, batching_max_allowed_size_in_bytes, 'batching_max_allowed_size_in_bytes')
577582
_check_type(int, batching_max_publish_delay_ms, 'batching_max_publish_delay_ms')
583+
_check_type(bool, chunking_enabled, 'chunking_enabled')
578584
_check_type_or_none(dict, properties, 'properties')
579585
_check_type(BatchingType, batching_type, 'batching_type')
580586
_check_type_or_none(str, encryption_key, 'encryption_key')
@@ -593,6 +599,7 @@ def create_producer(self, topic,
593599
conf.batching_max_publish_delay_ms(batching_max_publish_delay_ms)
594600
conf.partitions_routing_mode(message_routing_mode)
595601
conf.batching_type(batching_type)
602+
conf.chunking_enabled(chunking_enabled)
596603
conf.lazy_start_partitioned_producers(lazy_start_partitioned_producers)
597604
if producer_name:
598605
conf.producer_name(producer_name)
@@ -608,6 +615,9 @@ def create_producer(self, topic,
608615
if crypto_key_reader:
609616
conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader)
610617

618+
if batching_enabled and chunking_enabled:
619+
raise ValueError("Batching and chunking of messages can't be enabled together.")
620+
611621
p = Producer()
612622
p._producer = self._client.create_producer(topic, conf)
613623
p._schema = schema
@@ -629,7 +639,9 @@ def subscribe(self, topic, subscription_name,
629639
pattern_auto_discovery_period=60,
630640
initial_position=InitialPosition.Latest,
631641
crypto_key_reader=None,
632-
replicate_subscription_state_enabled=False
642+
replicate_subscription_state_enabled=False,
643+
max_pending_chunked_message=10,
644+
auto_ack_oldest_chunked_message_on_queue_full=False
633645
):
634646
"""
635647
Subscribe to the given topic and subscription combination.
@@ -708,6 +720,22 @@ def my_listener(consumer, message):
708720
* replicate_subscription_state_enabled:
709721
Set whether the subscription status should be replicated.
710722
Default: `False`.
723+
* max_pending_chunked_message:
724+
The consumer buffers chunked messages in memory until it receives all the chunks of the original message.
725+
While consuming chunked messages, chunks from the same message might not be contiguous in the stream and they
726+
might be mixed with other messages' chunks. So, the consumer has to maintain multiple buffers to manage
727+
chunks coming from different messages. This mainly happens when multiple publishers are publishing
728+
messages on the topic concurrently or a publisher failed to publish all chunks of a message.
729+
730+
If it's zero, the pending chunked messages will not be limited.
731+
732+
Default: `10`.
733+
* auto_ack_oldest_chunked_message_on_queue_full:
734+
Buffering a large number of outstanding uncompleted chunked messages can create memory pressure, which
735+
can be guarded against by setting the `max_pending_chunked_message` threshold.
736+
Once the consumer reaches this threshold, it drops the outstanding uncompleted chunked messages by silently acking them
737+
if `auto_ack_oldest_chunked_message_on_queue_full` is true; otherwise it marks them for redelivery.
738+
Default: `False`.
711739
"""
712740
_check_type(str, subscription_name, 'subscription_name')
713741
_check_type(ConsumerType, consumer_type, 'consumer_type')
@@ -724,6 +752,8 @@ def my_listener(consumer, message):
724752
_check_type_or_none(dict, properties, 'properties')
725753
_check_type(InitialPosition, initial_position, 'initial_position')
726754
_check_type_or_none(CryptoKeyReader, crypto_key_reader, 'crypto_key_reader')
755+
_check_type(int, max_pending_chunked_message, 'max_pending_chunked_message')
756+
_check_type(bool, auto_ack_oldest_chunked_message_on_queue_full, 'auto_ack_oldest_chunked_message_on_queue_full')
727757

728758
conf = _pulsar.ConsumerConfiguration()
729759
conf.consumer_type(consumer_type)
@@ -750,6 +780,8 @@ def my_listener(consumer, message):
750780
conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader)
751781

752782
conf.replicate_subscription_state_enabled(replicate_subscription_state_enabled)
783+
conf.max_pending_chunked_message(max_pending_chunked_message)
784+
conf.auto_ack_oldest_chunked_message_on_queue_full(auto_ack_oldest_chunked_message_on_queue_full)
753785

754786
c = Consumer()
755787
if isinstance(topic, str):
@@ -1253,7 +1285,7 @@ def is_connected(self):
12531285
Check if the consumer is connected or not.
12541286
"""
12551287
return self._consumer.is_connected()
1256-
1288+
12571289
def get_last_message_id(self):
12581290
"""
12591291
Get the last message id.

pulsar-client-cpp/python/pulsar_test.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1248,6 +1248,32 @@ def test_json_schema_encode(self):
12481248
second_encode = schema.encode(record)
12491249
self.assertEqual(first_encode, second_encode)
12501250

1251+
def test_chunking(self):
1252+
client = Client(self.serviceUrl)
1253+
data_size = 10 * 1024 * 1024
1254+
producer = client.create_producer(
1255+
'test_chunking',
1256+
chunking_enabled=True
1257+
)
1258+
1259+
consumer = client.subscribe('test_chunking', "my-subscription",
1260+
max_pending_chunked_message=10,
1261+
auto_ack_oldest_chunked_message_on_queue_full=False
1262+
)
1263+
1264+
producer.send(bytes(bytearray(os.urandom(data_size))), None)
1265+
msg = consumer.receive(TM)
1266+
self.assertEqual(len(msg.data()), data_size)
1267+
1268+
def test_invalid_chunking_config(self):
1269+
client = Client(self.serviceUrl)
1270+
1271+
self._check_value_error(lambda: client.create_producer(
1272+
'test_invalid_chunking_config',
1273+
chunking_enabled=True,
1274+
batching_enabled=True
1275+
))
1276+
12511277
def _check_value_error(self, fun):
12521278
with self.assertRaises(ValueError):
12531279
fun()

pulsar-client-cpp/python/src/config.cc

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,8 @@ void export_config() {
259259
return_value_policy<copy_const_reference>())
260260
.def("batching_max_publish_delay_ms", &ProducerConfiguration::setBatchingMaxPublishDelayMs,
261261
return_self<>())
262+
.def("chunking_enabled", &ProducerConfiguration::isChunkingEnabled)
263+
.def("chunking_enabled", &ProducerConfiguration::setChunkingEnabled, return_self<>())
262264
.def("property", &ProducerConfiguration::setProperty, return_self<>())
263265
.def("batching_type", &ProducerConfiguration::setBatchingType, return_self<>())
264266
.def("batching_type", &ProducerConfiguration::getBatchingType)
@@ -299,7 +301,14 @@ void export_config() {
299301
.def("replicate_subscription_state_enabled",
300302
&ConsumerConfiguration::setReplicateSubscriptionStateEnabled)
301303
.def("replicate_subscription_state_enabled",
302-
&ConsumerConfiguration::isReplicateSubscriptionStateEnabled);
304+
&ConsumerConfiguration::isReplicateSubscriptionStateEnabled)
305+
.def("max_pending_chunked_message", &ConsumerConfiguration::getMaxPendingChunkedMessage)
306+
.def("max_pending_chunked_message", &ConsumerConfiguration::setMaxPendingChunkedMessage,
307+
return_self<>())
308+
.def("auto_ack_oldest_chunked_message_on_queue_full",
309+
&ConsumerConfiguration::isAutoAckOldestChunkedMessageOnQueueFull)
310+
.def("auto_ack_oldest_chunked_message_on_queue_full",
311+
&ConsumerConfiguration::setAutoAckOldestChunkedMessageOnQueueFull, return_self<>());
303312

304313
class_<ReaderConfiguration>("ReaderConfiguration")
305314
.def("reader_listener", &ReaderConfiguration_setReaderListener, return_self<>())

0 commit comments

Comments
 (0)