From 049239db82b59380ecc2a1e68a9db99ad34c8bd7 Mon Sep 17 00:00:00 2001 From: shay23b Date: Thu, 9 Feb 2023 16:20:07 +0200 Subject: [PATCH 1/6] added produce without creating producer --- README.md | 12 ++++++++++ memphis/memphis.py | 59 +++++++++++++++++++++++++++++++++------------- 2 files changed, 54 insertions(+), 17 deletions(-) diff --git a/README.md b/README.md index 975ae94..8f597d8 100644 --- a/README.md +++ b/README.md @@ -197,6 +197,18 @@ producer = await memphis.producer(station_name="", producer_name=" ``` ### Producing a message +Without creating a producer (receiver function of the connection struct). +In cases where extra performance is needed the recommended way is to create a producer first +and produce messages by using the produce receiver function of it +```python +await memphis.produce(station_name='test_station_py', producer_name='prod_py', message='bytearray/protobuf class/dict/string/graphql.language.ast.DocumentNode', # bytearray / protobuf class (schema validated station - protobuf) or bytearray/dict (schema validated station - json schema) or string/bytearray/graphql.language.ast.DocumentNode (schema validated station - graphql schema) + ack_wait_sec=15, # defaults to 15 + generate_random_suffix=False, #defaults to false + headers=headers, # default to {} + async_produce=False, #defaults to false + msg_id="123" +) +``` ```python await prod.produce( diff --git a/memphis/memphis.py b/memphis/memphis.py index d310818..3e6f187 100644 --- a/memphis/memphis.py +++ b/memphis/memphis.py @@ -48,6 +48,24 @@ def func_wrapper(): def cancel(self): self.t.cancel() +class Headers: + def __init__(self): + self.headers = {} + + def add(self, key, value): + """Add a header. + Args: + key (string): header key. + value (string): header value. + Raises: + Exception: _description_ + """ + if not key.startswith("$memphis"): + self.headers[key] = value + else: + raise MemphisHeaderError( + "Keys in headers should not start with $memphis") + class Memphis: def __init__(self): @@ -63,6 +81,7 @@ def __init__(self): self.station_schemaverse_to_dls = {} self.update_configurations_sub = {} self.configuration_tasks = {} + self.producers_map = dict() async def get_msgs_update_configurations(self, iterable: Iterable): try: @@ -347,7 +366,10 @@ async def producer(self, station_name: str, producer_name: str, generate_random_ elif self.schema_updates_data[station_name_internal]['type'] == "graphql": self.graphql_schemas[station_name_internal] = build_graphql_schema( self.schema_updates_data[station_name_internal]['active_version']['schema_content']) - return Producer(self, producer_name, station_name) + producer = Producer(self, producer_name, station_name) + map_key = station_name+"_"+producer_name + self.producers_map[map_key] = producer + return producer except Exception as e: raise MemphisError(str(e)) from e @@ -463,28 +485,31 @@ async def consumer(self, station_name: str, consumer_name: str, consumer_group: raise MemphisError(err_msg) return Consumer(self, station_name, consumer_name, cg, pull_interval_ms, batch_size, batch_max_time_to_wait_ms, max_ack_time_ms, max_msg_deliveries, start_consume_from_sequence=start_consume_from_sequence, last_messages=last_messages) - except Exception as e: raise MemphisError(str(e)) from e - - -class Headers: - def __init__(self): - self.headers = {} - - def add(self, key, value): - """Add a header. + async def produce(self, message, station_name: str, producer_name: str, generate_random_suffix: bool =False, ack_wait_sec: int = 15, headers: Union[Headers, None] = None, async_produce: bool=False, msg_id: Union[str, None]= None): + """Produces a message into a station without the need to create a producer. Args: - key (string): header key. - value (string): header value. + message (bytearray/dict): message to send into the station - bytearray/protobuf class (schema validated station - protobuf) or bytearray/dict (schema validated station - json schema) or string/bytearray/graphql.language.ast.DocumentNode (schema validated station - graphql schema) + ack_wait_sec (int, optional): max time in seconds to wait for an ack from memphis. Defaults to 15. + headers (dict, optional): Message headers, defaults to {}. + async_produce (boolean, optional): produce operation won't wait for broker acknowledgement + msg_id (string, optional): Attach msg-id header to the message in order to achieve idempotency Raises: Exception: _description_ + Exception: _description_ """ - if not key.startswith("$memphis"): - self.headers[key] = value - else: - raise MemphisHeaderError( - "Keys in headers should not start with $memphis") + try: + map_key = station_name+"_"+producer_name + producer = None + if map_key in self.producers_map: + producer = self.producers_map[map_key] + else: + producer = await self.producer(station_name=station_name, producer_name=producer_name, generate_random_suffix=generate_random_suffix) + self.producers_map[map_key] = producer + await producer.produce(message=message, ack_wait_sec=ack_wait_sec, headers=headers, async_produce= async_produce, msg_id=msg_id) + except Exception as e: + raise MemphisError(str(e)) from e class Station: From 9cfcc7e21898f197b987e25bc7c06d430131cf3a Mon Sep 17 00:00:00 2001 From: shay23b Date: Sun, 12 Feb 2023 11:08:43 +0200 Subject: [PATCH 2/6] fix destroy functions + add is_connected func --- README.md | 11 +++++++---- memphis/memphis.py | 10 ++++++++++ 2 files changed, 17 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 8f597d8..e6c526e 100644 --- a/README.md +++ b/README.md @@ -197,19 +197,22 @@ producer = await memphis.producer(station_name="", producer_name=" ``` ### Producing a message -Without creating a producer (receiver function of the connection struct). +Without creating a producer. In cases where extra performance is needed the recommended way is to create a producer first -and produce messages by using the produce receiver function of it +and produce messages by using the produce function of it ```python -await memphis.produce(station_name='test_station_py', producer_name='prod_py', message='bytearray/protobuf class/dict/string/graphql.language.ast.DocumentNode', # bytearray / protobuf class (schema validated station - protobuf) or bytearray/dict (schema validated station - json schema) or string/bytearray/graphql.language.ast.DocumentNode (schema validated station - graphql schema) - ack_wait_sec=15, # defaults to 15 +await memphis.produce(message='bytearray/protobuf class/dict/string/graphql.language.ast.DocumentNode', # bytearray / protobuf class (schema validated station - protobuf) or bytearray/dict (schema validated station - json schema) or string/bytearray/graphql.language.ast.DocumentNode (schema validated station - graphql schema) +station_name='test_station_py', producer_name='prod_py', generate_random_suffix=False, #defaults to false + ack_wait_sec=15, # defaults to 15 headers=headers, # default to {} async_produce=False, #defaults to false msg_id="123" ) ``` + +Creating a producer first ```python await prod.produce( message='bytearray/protobuf class/dict/string/graphql.language.ast.DocumentNode', # bytearray / protobuf class (schema validated station - protobuf) or bytearray/dict (schema validated station - json schema) or string/bytearray/graphql.language.ast.DocumentNode (schema validated station - graphql schema) diff --git a/memphis/memphis.py b/memphis/memphis.py index 3e6f187..bb07286 100644 --- a/memphis/memphis.py +++ b/memphis/memphis.py @@ -300,6 +300,7 @@ async def close(self): await sub.unsubscribe() if self.update_configurations_sub is not None: await self.update_configurations_sub.unsubscribe() + self.producers_map.clear() except: return @@ -487,6 +488,7 @@ async def consumer(self, station_name: str, consumer_name: str, consumer_group: return Consumer(self, station_name, consumer_name, cg, pull_interval_ms, batch_size, batch_max_time_to_wait_ms, max_ack_time_ms, max_msg_deliveries, start_consume_from_sequence=start_consume_from_sequence, last_messages=last_messages) except Exception as e: raise MemphisError(str(e)) from e + async def produce(self, message, station_name: str, producer_name: str, generate_random_suffix: bool =False, ack_wait_sec: int = 15, headers: Union[Headers, None] = None, async_produce: bool=False, msg_id: Union[str, None]= None): """Produces a message into a station without the need to create a producer. Args: @@ -511,6 +513,9 @@ async def produce(self, message, station_name: str, producer_name: str, generate except Exception as e: raise MemphisError(str(e)) from e + def is_connected(self): + return self.broker_manager.is_connected + class Station: def __init__(self, connection, name: str): @@ -548,6 +553,8 @@ async def destroy(self): if sub is not None: await sub.unsubscribe() + self.connection.producers_map = {k: v for k, v in self.connection.producers_map.items() if self.name not in k} + except Exception as e: raise MemphisError(str(e)) from e @@ -778,6 +785,9 @@ async def destroy(self): if sub is not None: await sub.unsubscribe() + map_key = self.station_name+"_"+self.producer_name + del self.connection.producers_map[map_key] + except Exception as e: raise Exception(e) From 007e1c6e51a9d0bc1aaa02fc3a6715988b9e25e7 Mon Sep 17 00:00:00 2001 From: shay23b Date: Sun, 12 Feb 2023 11:12:15 +0200 Subject: [PATCH 3/6] internal station name + producer name to lower --- memphis/memphis.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/memphis/memphis.py b/memphis/memphis.py index bb07286..8da7305 100644 --- a/memphis/memphis.py +++ b/memphis/memphis.py @@ -368,7 +368,7 @@ async def producer(self, station_name: str, producer_name: str, generate_random_ self.graphql_schemas[station_name_internal] = build_graphql_schema( self.schema_updates_data[station_name_internal]['active_version']['schema_content']) producer = Producer(self, producer_name, station_name) - map_key = station_name+"_"+producer_name + map_key = station_name_internal+"_"+producer_name.lower() self.producers_map[map_key] = producer return producer @@ -502,7 +502,8 @@ async def produce(self, message, station_name: str, producer_name: str, generate Exception: _description_ """ try: - map_key = station_name+"_"+producer_name + station_name_internal = get_internal_name(station_name) + map_key = station_name_internal+"_"+producer_name.lower() producer = None if map_key in self.producers_map: producer = self.producers_map[map_key] @@ -785,7 +786,7 @@ async def destroy(self): if sub is not None: await sub.unsubscribe() - map_key = self.station_name+"_"+self.producer_name + map_key = station_name_internal+"_"+self.producer_name.lower() del self.connection.producers_map[map_key] except Exception as e: From f8adf2b444169db603707d17e1d03bec3cb12660 Mon Sep 17 00:00:00 2001 From: shay23b Date: Sun, 12 Feb 2023 13:46:55 +0200 Subject: [PATCH 4/6] fix is connected --- memphis/memphis.py | 1 - 1 file changed, 1 deletion(-) diff --git a/memphis/memphis.py b/memphis/memphis.py index 8da7305..254f580 100644 --- a/memphis/memphis.py +++ b/memphis/memphis.py @@ -278,7 +278,6 @@ async def close(self): try: if self.is_connection_active: await self.broker_manager.close() - self.broker_manager = None self.connection_id = None self.is_connection_active = False keys_schema_updates_subs = self.schema_updates_subs.keys() From 74c374c5428b1bb4b8bd968bff2beecbc0c8ee19 Mon Sep 17 00:00:00 2001 From: shay23b Date: Mon, 13 Feb 2023 08:49:51 +0200 Subject: [PATCH 5/6] rearrange args places in call for produce --- README.md | 4 ++-- memphis/memphis.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index e6c526e..32c51f0 100644 --- a/README.md +++ b/README.md @@ -201,8 +201,8 @@ Without creating a producer. In cases where extra performance is needed the recommended way is to create a producer first and produce messages by using the produce function of it ```python -await memphis.produce(message='bytearray/protobuf class/dict/string/graphql.language.ast.DocumentNode', # bytearray / protobuf class (schema validated station - protobuf) or bytearray/dict (schema validated station - json schema) or string/bytearray/graphql.language.ast.DocumentNode (schema validated station - graphql schema) -station_name='test_station_py', producer_name='prod_py', +await memphis.produce(station_name='test_station_py', producer_name='prod_py', + message='bytearray/protobuf class/dict/string/graphql.language.ast.DocumentNode', # bytearray / protobuf class (schema validated station - protobuf) or bytearray/dict (schema validated station - json schema) or string/bytearray/graphql.language.ast.DocumentNode (schema validated station - graphql schema) generate_random_suffix=False, #defaults to false ack_wait_sec=15, # defaults to 15 headers=headers, # default to {} diff --git a/memphis/memphis.py b/memphis/memphis.py index 254f580..e4b8e4d 100644 --- a/memphis/memphis.py +++ b/memphis/memphis.py @@ -488,7 +488,7 @@ async def consumer(self, station_name: str, consumer_name: str, consumer_group: except Exception as e: raise MemphisError(str(e)) from e - async def produce(self, message, station_name: str, producer_name: str, generate_random_suffix: bool =False, ack_wait_sec: int = 15, headers: Union[Headers, None] = None, async_produce: bool=False, msg_id: Union[str, None]= None): + async def produce(self, station_name: str, producer_name: str, message, generate_random_suffix: bool =False, ack_wait_sec: int = 15, headers: Union[Headers, None] = None, async_produce: bool=False, msg_id: Union[str, None]= None): """Produces a message into a station without the need to create a producer. Args: message (bytearray/dict): message to send into the station - bytearray/protobuf class (schema validated station - protobuf) or bytearray/dict (schema validated station - json schema) or string/bytearray/graphql.language.ast.DocumentNode (schema validated station - graphql schema) From 8e03c930f3fabb4e3deef725fd1b1aef91e3fbb3 Mon Sep 17 00:00:00 2001 From: shay23b Date: Mon, 13 Feb 2023 09:10:26 +0200 Subject: [PATCH 6/6] fix produce args + add is_connected to readme --- README.md | 7 +++++++ memphis/memphis.py | 3 +++ 2 files changed, 10 insertions(+) diff --git a/README.md b/README.md index 32c51f0..b5fec84 100644 --- a/README.md +++ b/README.md @@ -321,3 +321,10 @@ sequence_number = msg.get_sequence_number() ```python consumer.destroy() ``` + + +### Check if broker is connected + +```python +memphis.is_connected() +``` \ No newline at end of file diff --git a/memphis/memphis.py b/memphis/memphis.py index e4b8e4d..2a18c6f 100644 --- a/memphis/memphis.py +++ b/memphis/memphis.py @@ -491,7 +491,10 @@ async def consumer(self, station_name: str, consumer_name: str, consumer_group: async def produce(self, station_name: str, producer_name: str, message, generate_random_suffix: bool =False, ack_wait_sec: int = 15, headers: Union[Headers, None] = None, async_produce: bool=False, msg_id: Union[str, None]= None): """Produces a message into a station without the need to create a producer. Args: + station_name (str): station name to produce messages into. + producer_name (str): name for the producer. message (bytearray/dict): message to send into the station - bytearray/protobuf class (schema validated station - protobuf) or bytearray/dict (schema validated station - json schema) or string/bytearray/graphql.language.ast.DocumentNode (schema validated station - graphql schema) + generate_random_suffix (bool): false by default, if true concatenate a random suffix to producer's name ack_wait_sec (int, optional): max time in seconds to wait for an ack from memphis. Defaults to 15. headers (dict, optional): Message headers, defaults to {}. async_produce (boolean, optional): produce operation won't wait for broker acknowledgement