Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions memphis/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def __init__(
self.consumer_group = consumer_group.lower()
self.pull_interval_ms = pull_interval_ms
self.batch_size = batch_size
self.batch_max_time_to_wait_ms = batch_max_time_to_wait_ms
self.batch_max_time_to_wait_ms = batch_max_time_to_wait_ms if batch_max_time_to_wait_ms >= 1000 else 1000
self.max_ack_time_ms = max_ack_time_ms
self.max_msg_deliveries = max_msg_deliveries
self.ping_consumer_interval_ms = 30000
Expand Down Expand Up @@ -238,9 +238,9 @@ async def main(host, username, password, station):

if self.connection.is_connection_active:
try:
if batch_size > self.MAX_BATCH_SIZE:
if batch_size > self.MAX_BATCH_SIZE or batch_size < 1:
raise MemphisError(
f"Batch size can not be greater than {self.MAX_BATCH_SIZE}")
f"Batch size can not be greater than {self.MAX_BATCH_SIZE} or less than 1")
self.batch_size = batch_size
if len(self.dls_messages) > 0:
if len(self.dls_messages) <= batch_size:
Expand Down
18 changes: 9 additions & 9 deletions memphis/memphis.py
Original file line number Diff line number Diff line change
Expand Up @@ -670,7 +670,7 @@ async def consumer(
consumer_group (str, optional): consumer group name. Defaults to the consumer name.
pull_interval_ms (int, optional): interval in milliseconds between pulls. Defaults to 1000.
batch_size (int, optional): pull batch size. Defaults to 10.
batch_max_time_to_wait_ms (int, optional): max time in milliseconds to wait between pulls. Defaults to 5000.
batch_max_time_to_wait_ms (int, optional): max time in milliseconds to wait between pulls. Defaults to 5000. The lowest value is 1000(1 second), and if the value is lower than 1000, it will be set to 1000.
max_ack_time_ms (int, optional): max time for ack a message in milliseconds, in case a message not acked in this time period the Memphis broker will resend it. Defaults to 30000.
max_msg_deliveries (int, optional): max number of message deliveries, by default is 2.
generate_random_suffix (bool): Deprecated: will be stopped to be supported after November 1'st, 2023. false by default, if true concatenate a random suffix to consumer's name
Expand All @@ -682,9 +682,9 @@ async def consumer(
try:
if not self.is_connection_active:
raise MemphisError("Connection is dead")
if batch_size > self.MAX_BATCH_SIZE:
if batch_size > self.MAX_BATCH_SIZE or batch_size < 1:
raise MemphisError(
f"Batch size can not be greater than {self.MAX_BATCH_SIZE}")
f"Batch size can not be greater than {self.MAX_BATCH_SIZE} or less than 1")
real_name = consumer_name.lower()
if generate_random_suffix:
warnings.warn("Deprecation warning: generate_random_suffix will be stopped to be supported after November 1'st, 2023.")
Expand Down Expand Up @@ -773,7 +773,7 @@ async def consumer(
cg,
pull_interval_ms,
batch_size,
batch_max_time_to_wait_ms,
batch_max_time_to_wait_ms if batch_max_time_to_wait_ms >= 1000 else 1000,
max_ack_time_ms,
max_msg_deliveries,
start_consume_from_sequence=start_consume_from_sequence,
Expand Down Expand Up @@ -916,8 +916,8 @@ async def fetch_messages(
consumer_name (str): name for the consumer.
consumer_group (str, optional): consumer group name. Defaults to the consumer name.
batch_size (int, optional): pull batch size. Defaults to 10.
batch_max_time_to_wait_ms (int, optional): max time in miliseconds to wait between pulls. Defaults to 5000.
max_ack_time_ms (int, optional): max time for ack a message in miliseconds, in case a message not acked in this time period the Memphis broker will resend it. Defaults to 30000.
batch_max_time_to_wait_ms (int, optional): max time in milliseconds to wait between pulls. Defaults to 5000. The lowest value is 1000(1 second), and if the value is lower than 1000, it will be set to 1000.
max_ack_time_ms (int, optional): max time for ack a message in milliseconds, in case a message not acked in this time period the Memphis broker will resend it. Defaults to 30000.
max_msg_deliveries (int, optional): max number of message deliveries, by default is 2.
generate_random_suffix (bool): Deprecated: will be stopped to be supported after November 1'st, 2023. false by default, if true concatenate a random suffix to consumer's name
start_consume_from_sequence(int, optional): start consuming from a specific sequence. defaults to 1.
Expand All @@ -933,9 +933,9 @@ async def fetch_messages(
if not self.is_connection_active:
raise MemphisError(
"Cant fetch messages without being connected!")
if batch_size > self.MAX_BATCH_SIZE:
if batch_size > self.MAX_BATCH_SIZE or batch_size < 1:
raise MemphisError(
f"Batch size can not be greater than {self.MAX_BATCH_SIZE}")
f"Batch size can not be greater than {self.MAX_BATCH_SIZE} or less than 1")
internal_station_name = get_internal_name(station_name)
consumer_map_key = internal_station_name + "_" + consumer_name.lower()
if consumer_map_key in self.consumers_map:
Expand All @@ -946,7 +946,7 @@ async def fetch_messages(
consumer_name=consumer_name,
consumer_group=consumer_group,
batch_size=batch_size,
batch_max_time_to_wait_ms=batch_max_time_to_wait_ms,
batch_max_time_to_wait_ms=batch_max_time_to_wait_ms if batch_max_time_to_wait_ms >= 1000 else 1000,
max_ack_time_ms=max_ack_time_ms,
max_msg_deliveries=max_msg_deliveries,
generate_random_suffix=generate_random_suffix,
Expand Down