diff --git a/README.md b/README.md index 975ae94..b5fec84 100644 --- a/README.md +++ b/README.md @@ -197,7 +197,22 @@ producer = await memphis.producer(station_name="", producer_name=" ``` ### Producing a message +Without creating a producer. +In cases where extra performance is needed the recommended way is to create a producer first +and produce messages by using the produce function of it +```python +await memphis.produce(station_name='test_station_py', producer_name='prod_py', + message='bytearray/protobuf class/dict/string/graphql.language.ast.DocumentNode', # bytearray / protobuf class (schema validated station - protobuf) or bytearray/dict (schema validated station - json schema) or string/bytearray/graphql.language.ast.DocumentNode (schema validated station - graphql schema) + generate_random_suffix=False, #defaults to false + ack_wait_sec=15, # defaults to 15 + headers=headers, # default to {} + async_produce=False, #defaults to false + msg_id="123" +) +``` + +Creating a producer first ```python await prod.produce( message='bytearray/protobuf class/dict/string/graphql.language.ast.DocumentNode', # bytearray / protobuf class (schema validated station - protobuf) or bytearray/dict (schema validated station - json schema) or string/bytearray/graphql.language.ast.DocumentNode (schema validated station - graphql schema) @@ -306,3 +321,10 @@ sequence_number = msg.get_sequence_number() ```python consumer.destroy() ``` + + +### Check if broker is connected + +```python +memphis.is_connected() +``` \ No newline at end of file diff --git a/memphis/memphis.py b/memphis/memphis.py index d310818..2a18c6f 100644 --- a/memphis/memphis.py +++ b/memphis/memphis.py @@ -48,6 +48,24 @@ def func_wrapper(): def cancel(self): self.t.cancel() +class Headers: + def __init__(self): + self.headers = {} + + def add(self, key, value): + """Add a header. + Args: + key (string): header key. + value (string): header value. + Raises: + Exception: _description_ + """ + if not key.startswith("$memphis"): + self.headers[key] = value + else: + raise MemphisHeaderError( + "Keys in headers should not start with $memphis") + class Memphis: def __init__(self): @@ -63,6 +81,7 @@ def __init__(self): self.station_schemaverse_to_dls = {} self.update_configurations_sub = {} self.configuration_tasks = {} + self.producers_map = dict() async def get_msgs_update_configurations(self, iterable: Iterable): try: @@ -259,7 +278,6 @@ async def close(self): try: if self.is_connection_active: await self.broker_manager.close() - self.broker_manager = None self.connection_id = None self.is_connection_active = False keys_schema_updates_subs = self.schema_updates_subs.keys() @@ -281,6 +299,7 @@ async def close(self): await sub.unsubscribe() if self.update_configurations_sub is not None: await self.update_configurations_sub.unsubscribe() + self.producers_map.clear() except: return @@ -347,7 +366,10 @@ async def producer(self, station_name: str, producer_name: str, generate_random_ elif self.schema_updates_data[station_name_internal]['type'] == "graphql": self.graphql_schemas[station_name_internal] = build_graphql_schema( self.schema_updates_data[station_name_internal]['active_version']['schema_content']) - return Producer(self, producer_name, station_name) + producer = Producer(self, producer_name, station_name) + map_key = station_name_internal+"_"+producer_name.lower() + self.producers_map[map_key] = producer + return producer except Exception as e: raise MemphisError(str(e)) from e @@ -463,28 +485,39 @@ async def consumer(self, station_name: str, consumer_name: str, consumer_group: raise MemphisError(err_msg) return Consumer(self, station_name, consumer_name, cg, pull_interval_ms, batch_size, batch_max_time_to_wait_ms, max_ack_time_ms, max_msg_deliveries, start_consume_from_sequence=start_consume_from_sequence, last_messages=last_messages) - except Exception as e: raise MemphisError(str(e)) from e - -class Headers: - def __init__(self): - self.headers = {} - - def add(self, key, value): - """Add a header. + async def produce(self, station_name: str, producer_name: str, message, generate_random_suffix: bool =False, ack_wait_sec: int = 15, headers: Union[Headers, None] = None, async_produce: bool=False, msg_id: Union[str, None]= None): + """Produces a message into a station without the need to create a producer. Args: - key (string): header key. - value (string): header value. + station_name (str): station name to produce messages into. + producer_name (str): name for the producer. + message (bytearray/dict): message to send into the station - bytearray/protobuf class (schema validated station - protobuf) or bytearray/dict (schema validated station - json schema) or string/bytearray/graphql.language.ast.DocumentNode (schema validated station - graphql schema) + generate_random_suffix (bool): false by default, if true concatenate a random suffix to producer's name + ack_wait_sec (int, optional): max time in seconds to wait for an ack from memphis. Defaults to 15. + headers (dict, optional): Message headers, defaults to {}. + async_produce (boolean, optional): produce operation won't wait for broker acknowledgement + msg_id (string, optional): Attach msg-id header to the message in order to achieve idempotency Raises: Exception: _description_ + Exception: _description_ """ - if not key.startswith("$memphis"): - self.headers[key] = value - else: - raise MemphisHeaderError( - "Keys in headers should not start with $memphis") + try: + station_name_internal = get_internal_name(station_name) + map_key = station_name_internal+"_"+producer_name.lower() + producer = None + if map_key in self.producers_map: + producer = self.producers_map[map_key] + else: + producer = await self.producer(station_name=station_name, producer_name=producer_name, generate_random_suffix=generate_random_suffix) + self.producers_map[map_key] = producer + await producer.produce(message=message, ack_wait_sec=ack_wait_sec, headers=headers, async_produce= async_produce, msg_id=msg_id) + except Exception as e: + raise MemphisError(str(e)) from e + + def is_connected(self): + return self.broker_manager.is_connected class Station: @@ -523,6 +556,8 @@ async def destroy(self): if sub is not None: await sub.unsubscribe() + self.connection.producers_map = {k: v for k, v in self.connection.producers_map.items() if self.name not in k} + except Exception as e: raise MemphisError(str(e)) from e @@ -753,6 +788,9 @@ async def destroy(self): if sub is not None: await sub.unsubscribe() + map_key = station_name_internal+"_"+self.producer_name.lower() + del self.connection.producers_map[map_key] + except Exception as e: raise Exception(e)