From f24d0257120e17e824fc49d16b2e2be8f164be9d Mon Sep 17 00:00:00 2001 From: shay23b Date: Mon, 2 Oct 2023 09:11:38 +0300 Subject: [PATCH 01/11] produce consume producer number --- README.md | 28 +++++++++++++++++++++++++--- memphis/consumer.py | 35 ++++++++++++++++++++++++++++------- memphis/memphis.py | 14 ++++++++++---- memphis/producer.py | 20 +++++++++++++++++++- 4 files changed, 82 insertions(+), 15 deletions(-) diff --git a/README.md b/README.md index 164fc1b..78e2354 100644 --- a/README.md +++ b/README.md @@ -250,7 +250,8 @@ await memphis.produce(station_name='test_station_py', producer_name='prod_py', headers=headers, # default to {} nonblocking=False, #defaults to false msg_id="123", - producer_partition_key="key" #default to None + producer_partition_key="key", #default to None + producer_partition_number=-1, #default to -1 ) ``` @@ -287,7 +288,18 @@ Use any string to produce messages to a specific partition ```python await producer.produce( message='bytearray/protobuf class/dict/string/graphql.language.ast.DocumentNode', # bytearray / protobuf class (schema validated station - protobuf) or bytearray/dict (schema validated station - json schema) or string/bytearray/graphql.language.ast.DocumentNode (schema validated station - graphql schema) - producer_partition_key="key") #default to None + producer_partition_key="key", #default to None +) +``` + +### Produce using partition number +Use number of partition to produce messages to a specific partition + +```python +await producer.produce( + message='bytearray/protobuf class/dict/string/graphql.language.ast.DocumentNode', # bytearray / protobuf class (schema validated station - protobuf) or bytearray/dict (schema validated station - json schema) or string/bytearray/graphql.language.ast.DocumentNode (schema validated station - graphql schema) + producer_partition_number=-1 #default to -1 +) ``` ### Non-blocking Produce with Task Limits @@ -366,6 +378,15 @@ consumer.consume(msg_handler, ) ``` +### Consume using a partition number +The number will be used to consume from a specific partition + +```python +consumer.consume(msg_handler, + consumer_partition_number = -1 #consume from a specific partition + ) +``` + ### Fetch a single batch of messages ```python msgs = await memphis.fetch_messages( @@ -378,7 +399,8 @@ msgs = await memphis.fetch_messages( max_msg_deliveries=10, # defaults to 10 start_consume_from_sequence=1, # start consuming from a specific sequence. defaults to 1 last_messages=-1, # consume the last N messages, defaults to -1 (all messages in the station)) - consumer_partition_key="key" # used to consume from a specific partition, default to None + consumer_partition_key="key", # used to consume from a specific partition, default to None + consumer_partition_number=-1 # used to consume from a specific partition, default to -1 ) ``` diff --git a/memphis/consumer.py b/memphis/consumer.py index 9952b01..dc932ca 100644 --- a/memphis/consumer.py +++ b/memphis/consumer.py @@ -60,7 +60,7 @@ def set_context(self, context): """Set a context (dict) that will be passed to each message handler call.""" self.context = context - def consume(self, callback, consumer_partition_key: str = None): + def consume(self, callback, consumer_partition_key: str = None, consumer_partition_number: int = -1): """ This method starts consuming events from the specified station and invokes the provided callback function for each batch of messages received. @@ -71,6 +71,7 @@ def consume(self, callback, consumer_partition_key: str = None): - `error`: An optional `MemphisError` object if there was an error while consuming the messages. - `context`: A dictionary representing the context that was set using the `set_context()` method. consumer_partition_key (str): A string that will be used to determine the partition to consume from. If not provided, the consume will work in a Round Robin fashion. + consumer_partition_number (int): An integer that will be used to determine the partition to consume from. If not provided, the consume will work in a Round Robin fashion. Example: import asyncio @@ -97,13 +98,17 @@ async def main(): asyncio.run(main()) """ self.dls_callback_func = callback - self.t_consume = asyncio.create_task(self.__consume(callback, partition_key=consumer_partition_key)) + self.t_consume = asyncio.create_task(self.__consume(callback, partition_key=consumer_partition_key, consumer_partition_number=consumer_partition_number)) - async def __consume(self, callback, partition_key: str = None): + async def __consume(self, callback, partition_key: str = None, consumer_partition_number: int = -1): partition_number = 1 - - if partition_key is not None: + if consumer_partition_number > 0 and partition_key is not None: + raise MemphisError('Can not use both partition number and partition key') + elif partition_key is not None: partition_number = self.get_partition_from_key(partition_key) + elif consumer_partition_number > 0: + validate_partition_number(self, consumer_partition_number, self.inner_station_name) + partition_number = consumer_partition_number while True: if self.connection.is_connection_active and self.pull_interval_ms: @@ -162,7 +167,7 @@ async def __consume_dls(self): await self.dls_callback_func([], MemphisError(str(e)), self.context) return - async def fetch(self, batch_size: int = 10, consumer_partition_key: str = None): + async def fetch(self, batch_size: int = 10, consumer_partition_key: str = None, consumer_partition_number: int = -1): """ Fetch a batch of messages. @@ -223,8 +228,13 @@ async def main(host, username, password, station): partition_number = 1 if len(self.subscriptions) > 1: - if consumer_partition_key is not None: + if consumer_partition_number > 0 and consumer_partition_key is not None: + raise MemphisError('Can not use both partition number and partition key') + elif consumer_partition_key is not None: partition_number = self.get_partition_from_key(consumer_partition_key) + elif consumer_partition_number > 0: + validate_partition_number(self, consumer_partition_number, self.inner_station_name) + partition_number = consumer_partition_number else: partition_number = next(self.partition_generator) @@ -299,3 +309,14 @@ def get_partition_from_key(self, key): return self.connection.partition_consumers_updates_data[self.inner_station_name]["partitions_list"][index] except Exception as e: raise e + +def validate_partition_number(self, partition_number, station_name): + partitions_list = self.connection.partition_consumers_updates_data[station_name]["partitions_list"] + if partitions_list is not None: + if partition_number < 0 or partition_number >= len(partitions_list): + raise MemphisError("Partition number is out of range") + elif partition_number not in partitions_list: + raise MemphisError(f"Partition {str(partition_number)} does not exist in station {station_name}") + else: + raise MemphisError(f"Partition {str(partition_number)} does not exist in station {station_name}") + diff --git a/memphis/memphis.py b/memphis/memphis.py index 74f039a..9ab3e2e 100644 --- a/memphis/memphis.py +++ b/memphis/memphis.py @@ -708,7 +708,8 @@ async def produce( headers: Union[Headers, None] = None, async_produce: bool = False, msg_id: Union[str, None] = None, - producer_partition_key: Union[str, None] = None + producer_partition_key: Union[str, None] = None, + producer_partition_number: Union[int, -1] = -1 ): """Produces a message into a station without the need to create a producer. Args: @@ -721,6 +722,7 @@ async def produce( async_produce (boolean, optional): produce operation won't wait for broker acknowledgement msg_id (string, optional): Attach msg-id header to the message in order to achieve idempotence producer_partition_key (string, optional): produce to a specific partition using the partition key + producer_partition_number (int, optional): produce to a specific partition using the partition number Raises: Exception: _description_ """ @@ -742,7 +744,8 @@ async def produce( headers=headers, async_produce=async_produce, msg_id=msg_id, - producer_partition_key=producer_partition_key + producer_partition_key=producer_partition_key, + producer_partition_number=producer_partition_number ) except Exception as e: raise MemphisError(str(e)) from e @@ -760,6 +763,7 @@ async def fetch_messages( start_consume_from_sequence: int = 1, last_messages: int = -1, consumer_partition_key: str = None, + consumer_partition_number: int = -1 ): """Consume a batch of messages. Args:. @@ -773,7 +777,8 @@ async def fetch_messages( generate_random_suffix (bool): Deprecated: will be stopped to be supported after November 1'st, 2023. false by default, if true concatenate a random suffix to consumer's name start_consume_from_sequence(int, optional): start consuming from a specific sequence. defaults to 1. last_messages: consume the last N messages, defaults to -1 (all messages in the station). - consumer_partition_key (str): consume from a specific partition using the partition key + consumer_partition_key (str): consume from a specific partition using the partition key. + consumer_partition_number (int): consume from a specific partition using the partition number. Returns: list: Message """ @@ -802,7 +807,7 @@ async def fetch_messages( start_consume_from_sequence=start_consume_from_sequence, last_messages=last_messages, ) - messages = await consumer.fetch(batch_size, consumer_partition_key=consumer_partition_key) + messages = await consumer.fetch(batch_size, consumer_partition_key=consumer_partition_key, consumer_partition_number=consumer_partition_number) if messages == None: messages = [] return messages @@ -887,3 +892,4 @@ def unset_cached_consumer_station(self, station_name): del self.consumers_map[key] except Exception as e: raise e + \ No newline at end of file diff --git a/memphis/producer.py b/memphis/producer.py index 33729f2..8038c55 100644 --- a/memphis/producer.py +++ b/memphis/producer.py @@ -193,7 +193,8 @@ async def produce( nonblocking: bool = False, msg_id: Union[str, None] = None, concurrent_task_limit: Union[int, None] = None, - producer_partition_key: Union[str, None] = None + producer_partition_key: Union[str, None] = None, + producer_partition_number: Union[int, -1] = -1 ): """Produces a message into a station. Args: @@ -218,6 +219,7 @@ async def produce( if the limit is hit and will wait until the buffer drains halfway down. producer_partition_key (string, optional): Produce messages to a specific partition using the partition key. + producer_partition_number (int, optional): Produce messages to a specific partition using the partition number. Raises: Exception: _description_ """ @@ -242,9 +244,14 @@ async def produce( partition_name = self.internal_station_name elif len(self.connection.partition_producers_updates_data[self.internal_station_name]['partitions_list']) == 1: partition_name = f"{self.internal_station_name}${self.connection.partition_producers_updates_data[self.internal_station_name]['partitions_list'][0]}" + elif producer_partition_number > 0 and producer_partition_key is not None: + raise MemphisError('Can not use both partition number and partition key') elif producer_partition_key is not None: partition_number = self.get_partition_from_key(producer_partition_key) partition_name = f"{self.internal_station_name}${str(partition_number)}" + elif producer_partition_number > 0: + self.validate_partition_number(producer_partition_number, self.internal_station_name) + partition_name = f"{self.internal_station_name}${str(producer_partition_number)}" else: partition_name = f"{self.internal_station_name}${str(next(self.partition_generator))}" @@ -408,3 +415,14 @@ def get_partition_from_key(self, key): return self.connection.partition_producers_updates_data[self.internal_station_name]["partitions_list"][index] except Exception as e: raise e + + def validate_partition_number(self, partition_number, station_name): + partitions_list = self.connection.partition_producers_updates_data[station_name]["partitions_list"] + if partitions_list is not None: + if partition_number < 0 or partition_number >= len(partitions_list): + raise MemphisError("Partition number is out of range") + elif partition_number not in partitions_list: + raise MemphisError(f"Partition {str(partition_number)} does not exist in station {station_name}") + else: + raise MemphisError(f"Partition {str(partition_number)} does not exist in station {station_name}") + From 6abd7809c03f46575ee9d4d54ba3e0f8bcf7d987 Mon Sep 17 00:00:00 2001 From: shay23b Date: Mon, 2 Oct 2023 09:41:13 +0300 Subject: [PATCH 02/11] linter --- memphis/consumer.py | 1 - 1 file changed, 1 deletion(-) diff --git a/memphis/consumer.py b/memphis/consumer.py index 27cf3b5..348bde4 100644 --- a/memphis/consumer.py +++ b/memphis/consumer.py @@ -252,7 +252,6 @@ async def main(host, username, password, station): del self.dls_messages[0:batch_size] self.dls_current_index -= len(messages) return messages - msgs = await self.subscriptions[partition_number].fetch(batch_size) for msg in msgs: messages.append( From 0b0cd16ca483e3324193745b3d23a4aef50e7808 Mon Sep 17 00:00:00 2001 From: shay23b Date: Mon, 2 Oct 2023 09:45:07 +0300 Subject: [PATCH 03/11] linter --- memphis/consumer.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/memphis/consumer.py b/memphis/consumer.py index 348bde4..b18dbb6 100644 --- a/memphis/consumer.py +++ b/memphis/consumer.py @@ -353,14 +353,14 @@ def get_partition_from_key(self, key): raise e def validate_partition_number(self, partition_number, station_name): - partitions_list = self.connection.partition_consumers_updates_data[station_name]["partitions_list"] - if partitions_list is not None: - if partition_number < 0 or partition_number >= len(partitions_list): - raise MemphisError("Partition number is out of range") - elif partition_number not in partitions_list: - raise MemphisError(f"Partition {str(partition_number)} does not exist in station {station_name}") - else: + partitions_list = self.connection.partition_consumers_updates_data[station_name]["partitions_list"] + if partitions_list is not None: + if partition_number < 0 or partition_number >= len(partitions_list): + raise MemphisError("Partition number is out of range") + elif partition_number not in partitions_list: raise MemphisError(f"Partition {str(partition_number)} does not exist in station {station_name}") + else: + raise MemphisError(f"Partition {str(partition_number)} does not exist in station {station_name}") def load_messages_to_cache(self, batch_size, partition_number): if not self.loading_thread or not self.loading_thread.is_alive(): From c7a99818bcae7fb7d263942f3146904f71f42a6a Mon Sep 17 00:00:00 2001 From: shay23b Date: Mon, 2 Oct 2023 09:47:35 +0300 Subject: [PATCH 04/11] linter --- memphis/producer.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/memphis/producer.py b/memphis/producer.py index 926d445..567d073 100644 --- a/memphis/producer.py +++ b/memphis/producer.py @@ -250,8 +250,8 @@ async def produce( partition_number = self.get_partition_from_key(producer_partition_key) partition_name = f"{self.internal_station_name}${str(partition_number)}" elif producer_partition_number > 0: - self.validate_partition_number(producer_partition_number, self.internal_station_name) - partition_name = f"{self.internal_station_name}${str(producer_partition_number)}" + self.validate_partition_number(producer_partition_number, self.internal_station_name) + partition_name = f"{self.internal_station_name}${str(producer_partition_number)}" else: partition_name = f"{self.internal_station_name}${str(next(self.partition_generator))}" @@ -424,5 +424,4 @@ def validate_partition_number(self, partition_number, station_name): elif partition_number not in partitions_list: raise MemphisError(f"Partition {str(partition_number)} does not exist in station {station_name}") else: - raise MemphisError(f"Partition {str(partition_number)} does not exist in station {station_name}") - + raise MemphisError(f"Partition {str(partition_number)} does not exist in station {station_name}") \ No newline at end of file From 40c7421d81c52b15e8d6267e612aebcd250fb073 Mon Sep 17 00:00:00 2001 From: shay23b Date: Mon, 2 Oct 2023 09:51:01 +0300 Subject: [PATCH 05/11] linter --- memphis/producer.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/memphis/producer.py b/memphis/producer.py index 567d073..bc72af9 100644 --- a/memphis/producer.py +++ b/memphis/producer.py @@ -424,4 +424,5 @@ def validate_partition_number(self, partition_number, station_name): elif partition_number not in partitions_list: raise MemphisError(f"Partition {str(partition_number)} does not exist in station {station_name}") else: - raise MemphisError(f"Partition {str(partition_number)} does not exist in station {station_name}") \ No newline at end of file + raise MemphisError(f"Partition {str(partition_number)} does not exist in station {station_name}") + \ No newline at end of file From 760fb67af952bf35b05d12342335daf0ecb50ad0 Mon Sep 17 00:00:00 2001 From: shay23b Date: Mon, 2 Oct 2023 09:53:08 +0300 Subject: [PATCH 06/11] lint --- memphis/producer.py | 1 - 1 file changed, 1 deletion(-) diff --git a/memphis/producer.py b/memphis/producer.py index bc72af9..81cad90 100644 --- a/memphis/producer.py +++ b/memphis/producer.py @@ -425,4 +425,3 @@ def validate_partition_number(self, partition_number, station_name): raise MemphisError(f"Partition {str(partition_number)} does not exist in station {station_name}") else: raise MemphisError(f"Partition {str(partition_number)} does not exist in station {station_name}") - \ No newline at end of file From 0b4fe77ad56ae83f58cb6e3fe33448088b03d71d Mon Sep 17 00:00:00 2001 From: shay23b Date: Mon, 2 Oct 2023 09:56:50 +0300 Subject: [PATCH 07/11] lint --- memphis/consumer.py | 1 - 1 file changed, 1 deletion(-) diff --git a/memphis/consumer.py b/memphis/consumer.py index b18dbb6..b21a02f 100644 --- a/memphis/consumer.py +++ b/memphis/consumer.py @@ -366,7 +366,6 @@ def load_messages_to_cache(self, batch_size, partition_number): if not self.loading_thread or not self.loading_thread.is_alive(): asyncio.ensure_future(self.__load_messages(batch_size, partition_number)) - async def __load_messages(self, batch_size, partition_number): new_messages = await self.fetch(batch_size, consumer_partition_number=partition_number) if new_messages is not None: From 3270576ee90866a7ad5b5841018ac67a9982f496 Mon Sep 17 00:00:00 2001 From: shay23b Date: Mon, 2 Oct 2023 10:27:07 +0300 Subject: [PATCH 08/11] lint --- memphis/producer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/memphis/producer.py b/memphis/producer.py index 81cad90..8fc5339 100644 --- a/memphis/producer.py +++ b/memphis/producer.py @@ -415,7 +415,7 @@ def get_partition_from_key(self, key): return self.connection.partition_producers_updates_data[self.internal_station_name]["partitions_list"][index] except Exception as e: raise e - + def validate_partition_number(self, partition_number, station_name): partitions_list = self.connection.partition_producers_updates_data[station_name]["partitions_list"] if partitions_list is not None: From 6f4739309e2b883b08c1c3ac93bd400a07b2c1a1 Mon Sep 17 00:00:00 2001 From: shay23b Date: Mon, 2 Oct 2023 10:38:33 +0300 Subject: [PATCH 09/11] lint --- memphis/consumer.py | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/memphis/consumer.py b/memphis/consumer.py index b21a02f..42081bc 100644 --- a/memphis/consumer.py +++ b/memphis/consumer.py @@ -352,21 +352,21 @@ def get_partition_from_key(self, key): except Exception as e: raise e -def validate_partition_number(self, partition_number, station_name): - partitions_list = self.connection.partition_consumers_updates_data[station_name]["partitions_list"] - if partitions_list is not None: - if partition_number < 0 or partition_number >= len(partitions_list): - raise MemphisError("Partition number is out of range") - elif partition_number not in partitions_list: + def validate_partition_number(self, partition_number, station_name): + partitions_list = self.connection.partition_consumers_updates_data[station_name]["partitions_list"] + if partitions_list is not None: + if partition_number < 0 or partition_number >= len(partitions_list): + raise MemphisError("Partition number is out of range") + elif partition_number not in partitions_list: + raise MemphisError(f"Partition {str(partition_number)} does not exist in station {station_name}") + else: raise MemphisError(f"Partition {str(partition_number)} does not exist in station {station_name}") - else: - raise MemphisError(f"Partition {str(partition_number)} does not exist in station {station_name}") -def load_messages_to_cache(self, batch_size, partition_number): - if not self.loading_thread or not self.loading_thread.is_alive(): - asyncio.ensure_future(self.__load_messages(batch_size, partition_number)) + def load_messages_to_cache(self, batch_size, partition_number): + if not self.loading_thread or not self.loading_thread.is_alive(): + asyncio.ensure_future(self.__load_messages(batch_size, partition_number)) -async def __load_messages(self, batch_size, partition_number): - new_messages = await self.fetch(batch_size, consumer_partition_number=partition_number) - if new_messages is not None: - self.cached_messages.extend(new_messages) + async def __load_messages(self, batch_size, partition_number): + new_messages = await self.fetch(batch_size, consumer_partition_number=partition_number) + if new_messages is not None: + self.cached_messages.extend(new_messages) From 3e1d7189b4e2e502cc10c6f2eff25f11a95feb5f Mon Sep 17 00:00:00 2001 From: shay23b Date: Mon, 2 Oct 2023 13:04:43 +0300 Subject: [PATCH 10/11] fix --- memphis/consumer.py | 4 ++-- memphis/producer.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/memphis/consumer.py b/memphis/consumer.py index 42081bc..0516a1e 100644 --- a/memphis/consumer.py +++ b/memphis/consumer.py @@ -110,7 +110,7 @@ async def __consume(self, callback, partition_key: str = None, consumer_partitio elif partition_key is not None: partition_number = self.get_partition_from_key(partition_key) elif consumer_partition_number > 0: - validate_partition_number(self, consumer_partition_number, self.inner_station_name) + self.validate_partition_number(self, consumer_partition_number, self.inner_station_name) partition_number = consumer_partition_number while True: @@ -218,7 +218,7 @@ async def main(host, username, password, station): elif consumer_partition_key is not None: partition_number = self.get_partition_from_key(consumer_partition_key) elif consumer_partition_number > 0: - validate_partition_number(self, consumer_partition_number, self.inner_station_name) + self.validate_partition_number(self, consumer_partition_number, self.inner_station_name) partition_number = consumer_partition_number else: partition_number = next(self.partition_generator) diff --git a/memphis/producer.py b/memphis/producer.py index 8fc5339..3e5fac9 100644 --- a/memphis/producer.py +++ b/memphis/producer.py @@ -417,7 +417,7 @@ def get_partition_from_key(self, key): raise e def validate_partition_number(self, partition_number, station_name): - partitions_list = self.connection.partition_producers_updates_data[station_name]["partitions_list"] + partitions_list = self.connection.partition_consumers_updates_data[station_name]["partitions_list"] if partitions_list is not None: if partition_number < 0 or partition_number >= len(partitions_list): raise MemphisError("Partition number is out of range") From debc4d5f52ee0ee2622beecf78c2e47cd0d916b6 Mon Sep 17 00:00:00 2001 From: shay23b Date: Mon, 2 Oct 2023 13:07:04 +0300 Subject: [PATCH 11/11] fix --- memphis/consumer.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/memphis/consumer.py b/memphis/consumer.py index 0516a1e..1a70697 100644 --- a/memphis/consumer.py +++ b/memphis/consumer.py @@ -110,7 +110,7 @@ async def __consume(self, callback, partition_key: str = None, consumer_partitio elif partition_key is not None: partition_number = self.get_partition_from_key(partition_key) elif consumer_partition_number > 0: - self.validate_partition_number(self, consumer_partition_number, self.inner_station_name) + self.validate_partition_number(consumer_partition_number, self.inner_station_name) partition_number = consumer_partition_number while True: @@ -218,7 +218,7 @@ async def main(host, username, password, station): elif consumer_partition_key is not None: partition_number = self.get_partition_from_key(consumer_partition_key) elif consumer_partition_number > 0: - self.validate_partition_number(self, consumer_partition_number, self.inner_station_name) + self.validate_partition_number(consumer_partition_number, self.inner_station_name) partition_number = consumer_partition_number else: partition_number = next(self.partition_generator)