From de7a0cbaef477cc49c39d0a79fee4a31cfcb2930 Mon Sep 17 00:00:00 2001 From: Bazen <49089563+bazen-teklehaymanot@users.noreply.github.com> Date: Mon, 25 Dec 2023 01:51:59 +0200 Subject: [PATCH] Added consumer batch size and max wait time validation --- memphis/consumer.py | 6 +++--- memphis/memphis.py | 18 +++++++++--------- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/memphis/consumer.py b/memphis/consumer.py index 0d168ca..a0f6280 100644 --- a/memphis/consumer.py +++ b/memphis/consumer.py @@ -37,7 +37,7 @@ def __init__( self.consumer_group = consumer_group.lower() self.pull_interval_ms = pull_interval_ms self.batch_size = batch_size - self.batch_max_time_to_wait_ms = batch_max_time_to_wait_ms + self.batch_max_time_to_wait_ms = batch_max_time_to_wait_ms if batch_max_time_to_wait_ms >= 1000 else 1000 self.max_ack_time_ms = max_ack_time_ms self.max_msg_deliveries = max_msg_deliveries self.ping_consumer_interval_ms = 30000 @@ -238,9 +238,9 @@ async def main(host, username, password, station): if self.connection.is_connection_active: try: - if batch_size > self.MAX_BATCH_SIZE: + if batch_size > self.MAX_BATCH_SIZE or batch_size < 1: raise MemphisError( - f"Batch size can not be greater than {self.MAX_BATCH_SIZE}") + f"Batch size can not be greater than {self.MAX_BATCH_SIZE} or less than 1") self.batch_size = batch_size if len(self.dls_messages) > 0: if len(self.dls_messages) <= batch_size: diff --git a/memphis/memphis.py b/memphis/memphis.py index e746a8c..5482ad4 100644 --- a/memphis/memphis.py +++ b/memphis/memphis.py @@ -670,7 +670,7 @@ async def consumer( consumer_group (str, optional): consumer group name. Defaults to the consumer name. pull_interval_ms (int, optional): interval in milliseconds between pulls. Defaults to 1000. batch_size (int, optional): pull batch size. Defaults to 10. - batch_max_time_to_wait_ms (int, optional): max time in milliseconds to wait between pulls. Defaults to 5000. + batch_max_time_to_wait_ms (int, optional): max time in milliseconds to wait between pulls. Defaults to 5000. The lowest value is 1000(1 second), and if the value is lower than 1000, it will be set to 1000. max_ack_time_ms (int, optional): max time for ack a message in milliseconds, in case a message not acked in this time period the Memphis broker will resend it. Defaults to 30000. max_msg_deliveries (int, optional): max number of message deliveries, by default is 2. generate_random_suffix (bool): Deprecated: will be stopped to be supported after November 1'st, 2023. false by default, if true concatenate a random suffix to consumer's name @@ -682,9 +682,9 @@ async def consumer( try: if not self.is_connection_active: raise MemphisError("Connection is dead") - if batch_size > self.MAX_BATCH_SIZE: + if batch_size > self.MAX_BATCH_SIZE or batch_size < 1: raise MemphisError( - f"Batch size can not be greater than {self.MAX_BATCH_SIZE}") + f"Batch size can not be greater than {self.MAX_BATCH_SIZE} or less than 1") real_name = consumer_name.lower() if generate_random_suffix: warnings.warn("Deprecation warning: generate_random_suffix will be stopped to be supported after November 1'st, 2023.") @@ -773,7 +773,7 @@ async def consumer( cg, pull_interval_ms, batch_size, - batch_max_time_to_wait_ms, + batch_max_time_to_wait_ms if batch_max_time_to_wait_ms >= 1000 else 1000, max_ack_time_ms, max_msg_deliveries, start_consume_from_sequence=start_consume_from_sequence, @@ -916,8 +916,8 @@ async def fetch_messages( consumer_name (str): name for the consumer. consumer_group (str, optional): consumer group name. Defaults to the consumer name. batch_size (int, optional): pull batch size. Defaults to 10. - batch_max_time_to_wait_ms (int, optional): max time in miliseconds to wait between pulls. Defaults to 5000. - max_ack_time_ms (int, optional): max time for ack a message in miliseconds, in case a message not acked in this time period the Memphis broker will resend it. Defaults to 30000. + batch_max_time_to_wait_ms (int, optional): max time in milliseconds to wait between pulls. Defaults to 5000. The lowest value is 1000(1 second), and if the value is lower than 1000, it will be set to 1000. + max_ack_time_ms (int, optional): max time for ack a message in milliseconds, in case a message not acked in this time period the Memphis broker will resend it. Defaults to 30000. max_msg_deliveries (int, optional): max number of message deliveries, by default is 2. generate_random_suffix (bool): Deprecated: will be stopped to be supported after November 1'st, 2023. false by default, if true concatenate a random suffix to consumer's name start_consume_from_sequence(int, optional): start consuming from a specific sequence. defaults to 1. @@ -933,9 +933,9 @@ async def fetch_messages( if not self.is_connection_active: raise MemphisError( "Cant fetch messages without being connected!") - if batch_size > self.MAX_BATCH_SIZE: + if batch_size > self.MAX_BATCH_SIZE or batch_size < 1: raise MemphisError( - f"Batch size can not be greater than {self.MAX_BATCH_SIZE}") + f"Batch size can not be greater than {self.MAX_BATCH_SIZE} or less than 1") internal_station_name = get_internal_name(station_name) consumer_map_key = internal_station_name + "_" + consumer_name.lower() if consumer_map_key in self.consumers_map: @@ -946,7 +946,7 @@ async def fetch_messages( consumer_name=consumer_name, consumer_group=consumer_group, batch_size=batch_size, - batch_max_time_to_wait_ms=batch_max_time_to_wait_ms, + batch_max_time_to_wait_ms=batch_max_time_to_wait_ms if batch_max_time_to_wait_ms >= 1000 else 1000, max_ack_time_ms=max_ack_time_ms, max_msg_deliveries=max_msg_deliveries, generate_random_suffix=generate_random_suffix,