From ce6f438dc001ef3a0966215ead5b146c5e340410 Mon Sep 17 00:00:00 2001 From: shay23b Date: Thu, 5 Oct 2023 17:15:27 +0300 Subject: [PATCH] bug fix partitions numbers --- memphis/consumer.py | 4 ++-- memphis/producer.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/memphis/consumer.py b/memphis/consumer.py index 1a70697..31effe3 100644 --- a/memphis/consumer.py +++ b/memphis/consumer.py @@ -117,7 +117,7 @@ async def __consume(self, callback, partition_key: str = None, consumer_partitio if self.connection.is_connection_active and self.pull_interval_ms: try: if len(self.subscriptions) > 1: - if partition_key is None: + if partition_key is None and consumer_partition_number < 1: partition_number = next(self.partition_generator) memphis_messages = [] @@ -355,7 +355,7 @@ def get_partition_from_key(self, key): def validate_partition_number(self, partition_number, station_name): partitions_list = self.connection.partition_consumers_updates_data[station_name]["partitions_list"] if partitions_list is not None: - if partition_number < 0 or partition_number >= len(partitions_list): + if partition_number < 1 or partition_number > len(partitions_list): raise MemphisError("Partition number is out of range") elif partition_number not in partitions_list: raise MemphisError(f"Partition {str(partition_number)} does not exist in station {station_name}") diff --git a/memphis/producer.py b/memphis/producer.py index 8fc5339..464dd42 100644 --- a/memphis/producer.py +++ b/memphis/producer.py @@ -419,7 +419,7 @@ def get_partition_from_key(self, key): def validate_partition_number(self, partition_number, station_name): partitions_list = self.connection.partition_producers_updates_data[station_name]["partitions_list"] if partitions_list is not None: - if partition_number < 0 or partition_number >= len(partitions_list): + if partition_number < 1 or partition_number > len(partitions_list): raise MemphisError("Partition number is out of range") elif partition_number not in partitions_list: raise MemphisError(f"Partition {str(partition_number)} does not exist in station {station_name}")