From 916f0b6f091b5b3b8b76352a9d39758a2e96f8e8 Mon Sep 17 00:00:00 2001
From: RJ Nowling
Date: Mon, 31 Jul 2023 12:49:32 -0500
Subject: [PATCH 1/5] Add async task buffer with limit

---
 memphis/producer.py | 28 ++++++++++++++++++++--------
 1 file changed, 20 insertions(+), 8 deletions(-)

diff --git a/memphis/producer.py b/memphis/producer.py
index a89df34..db7755a 100644
--- a/memphis/producer.py
+++ b/memphis/producer.py
@@ -27,6 +27,7 @@ def __init__(
         self.internal_station_name = get_internal_name(self.station_name)
         self.loop = asyncio.get_running_loop()
         self.real_name = real_name
+        self.background_tasks = set()

     async def validate_msg(self, message):
         if self.connection.schema_updates_data[self.internal_station_name] != {}:
@@ -158,6 +159,7 @@ async def produce(
         headers: Union[Headers, None] = None,
         async_produce: bool = False,
         msg_id: Union[str, None] = None,
+        buffer_limit_msg: Union[int, None] = None
     ):
         """Produces a message into a station.
         Args:
@@ -166,6 +168,8 @@ async def produce(
             headers (dict, optional): Message headers, defaults to {}.
             async_produce (boolean, optional): produce operation won't wait for broker acknowledgement
             msg_id (string, optional): Attach msg-id header to the message in order to achieve idempotency
+            buffer_limit_msg (int, optional): Limit the number of outstanding async produce tasks.
+                                              Calls with async_produce will block if the buffer is full.
         Raises:
             Exception: _description_
         """
@@ -188,14 +192,22 @@ async def produce(

         if async_produce:
             try:
-                self.loop.create_task(
-                    self.connection.broker_connection.publish(
-                        self.internal_station_name + ".final",
-                        message,
-                        timeout=ack_wait_sec,
-                        headers=headers,
-                    )
-                )
+                task = self.loop.create_task(
+                    self.connection.broker_connection.publish(
+                        self.internal_station_name + ".final",
+                        message,
+                        timeout=ack_wait_sec,
+                        headers=headers,
+                    )
+                )
+                self.background_tasks.add(task)
+                task.add_done_callback(self.background_tasks.discard)
+
+                if buffer_limit_msg is not None and len(self.background_tasks) >= buffer_limit_msg:
+                    desired_size = buffer_limit_msg / 2
+                    while len(self.background_tasks) > desired_size:
+                        await asyncio.sleep(0.1)
+
                 await asyncio.sleep(0)
             except Exception as e:
                 raise MemphisError(e)
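
Note on the pattern patch 1 introduces: the producer keeps a strong reference to every in-flight publish task in a set, removes each task when it completes, and applies back-pressure once the set grows past `buffer_limit_msg`. Below is a minimal, self-contained sketch of that same pattern outside of memphis; `fake_publish` and `produce_many` are illustrative names, not part of the library.

```python
import asyncio

async def fake_publish(i: int) -> None:
    # stand-in for the broker publish call; simulates waiting for an ack
    await asyncio.sleep(0.01)

async def produce_many(n: int, limit: int) -> None:
    background_tasks = set()
    for i in range(n):
        task = asyncio.get_running_loop().create_task(fake_publish(i))
        # keep a strong reference so the task is not garbage collected,
        # and drop it automatically once it finishes
        background_tasks.add(task)
        task.add_done_callback(background_tasks.discard)

        # back-pressure: once the buffer is full, wait until it drains halfway
        if len(background_tasks) >= limit:
            while len(background_tasks) > limit / 2:
                await asyncio.sleep(0.1)

    # wait for any remaining publishes before returning (the same polling loop
    # that patch 2 below adds to destroy())
    while background_tasks:
        await asyncio.sleep(0.1)

asyncio.run(produce_many(2_000, limit=500))
```

Draining only to half the limit, rather than all the way to zero, lets the caller resume producing as soon as the buffer has room again instead of stalling until it is completely empty.
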
From 4f7a3a63372212a13fe96aa3522ab2d35173db95 Mon Sep 17 00:00:00 2001
From: RJ Nowling
Date: Mon, 31 Jul 2023 12:52:41 -0500
Subject: [PATCH 2/5] Drain leftover buffer messages on destroy.

Fixes #187
---
 memphis/producer.py | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/memphis/producer.py b/memphis/producer.py
index db7755a..d34d1b5 100644
--- a/memphis/producer.py
+++ b/memphis/producer.py
@@ -287,6 +287,10 @@ async def produce(
     async def destroy(self):
         """Destroy the producer."""
        try:
+            # drain buffered async messages
+            while len(self.background_tasks) > 0:
+                await asyncio.sleep(0.1)
+
             destroy_producer_req = {
                 "name": self.producer_name,
                 "station_name": self.station_name,
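
Patch 2 places the drain loop at the top of `destroy()`, before the teardown request is built, so buffered non-blocking messages are flushed rather than lost on shutdown. A standalone sketch of that ordering, with an illustrative mini-producer that is not the memphis class:

```python
import asyncio

class SketchProducer:
    """Illustrative only: mimics where patch 2 puts the drain inside destroy()."""

    def __init__(self) -> None:
        self.background_tasks = set()

    def send_nonblocking(self, delay: float) -> None:
        # schedule a fake publish and track it, as in patch 1
        task = asyncio.get_running_loop().create_task(asyncio.sleep(delay))
        self.background_tasks.add(task)
        task.add_done_callback(self.background_tasks.discard)

    async def destroy(self) -> None:
        # drain buffered async messages first ...
        while len(self.background_tasks) > 0:
            await asyncio.sleep(0.1)
        # ... then tear down (a real producer would send its destroy request here)
        print("all buffered messages flushed, safe to destroy")

async def main() -> None:
    producer = SketchProducer()
    for _ in range(5):
        producer.send_nonblocking(0.05)
    await producer.destroy()

asyncio.run(main())
```
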
From 53242a3bdb0ebefa9e3abeff4adbfb3771dddb82 Mon Sep 17 00:00:00 2001
From: RJ Nowling
Date: Mon, 31 Jul 2023 12:54:41 -0500
Subject: [PATCH 3/5] Disable pylint warning

---
 .github/workflows/pylint.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.github/workflows/pylint.yml b/.github/workflows/pylint.yml
index e42908c..9bb7a01 100644
--- a/.github/workflows/pylint.yml
+++ b/.github/workflows/pylint.yml
@@ -24,4 +24,4 @@ jobs:
         python setup.py install
     - name: Analysing the code with pylint
       run: |
-        pylint $(git ls-files '*.py') --disable=C0111,R0902,R0913,W0718,C0121,W0201,R0903,C0301,W0707,R0914,R0912,C0103,R1710,R0911,W0719,R1720,E1101,R0801,R1705,R0904 --good-names=i,e,cg,t,s
+        pylint $(git ls-files '*.py') --disable=C0111,R0902,R0913,W0718,C0121,W0201,R0903,C0301,W0707,R0914,R0912,C0103,R1710,R0911,W0719,R1720,E1101,R0801,R1705,R0904,R0915 --good-names=i,e,cg,t,s

From 2a3796b72ddc4ac0e3ca2674d7ba1b9e1fc66aa4 Mon Sep 17 00:00:00 2001
From: RJ Nowling
Date: Mon, 31 Jul 2023 13:14:46 -0500
Subject: [PATCH 4/5] Fixes #191

---
 README.md           | 20 ++++++++++++++++----
 memphis/producer.py | 42 +++++++++++++++++++++++++++++++-----------
 2 files changed, 47 insertions(+), 15 deletions(-)

diff --git a/README.md b/README.md
index 7621869..1a9da6f 100644
--- a/README.md
+++ b/README.md
@@ -233,7 +233,7 @@ await memphis.produce(station_name='test_station_py', producer_name='prod_py',
   generate_random_suffix=False, #defaults to false
   ack_wait_sec=15, # defaults to 15
   headers=headers, # default to {}
-  async_produce=False, #defaults to false
+  nonblocking=False, #defaults to false
   msg_id="123"
 )
 ```
@@ -256,15 +256,27 @@ await producer.produce(
   headers=headers) # default to {}
 ```

-### Async produce
-For better performance. The client won't block requests while waiting for an acknowledgment.
+### Non-blocking Produce
+For better performance, the client won't block requests while waiting for an acknowledgment.

 ```python
 await producer.produce(
   message='bytearray/protobuf class/dict/string/graphql.language.ast.DocumentNode', # bytearray / protobuf class (schema validated station - protobuf) or bytearray/dict (schema validated station - json schema) or string/bytearray/graphql.language.ast.DocumentNode (schema validated station - graphql schema)
-  headers={}, async_produce=True)
+  headers={}, nonblocking=True)
 ```

+### Non-blocking Produce with Task Limits
+For better performance, the client won't block requests while waiting for an acknowledgment.
+If you are producing a large number of messages and see timeout errors, then you may need to
+limit the number of concurrent tasks like so:
+
+```python
+await producer.produce(
+  message='bytearray/protobuf class/dict/string/graphql.language.ast.DocumentNode', # bytearray / protobuf class (schema validated station - protobuf) or bytearray/dict (schema validated station - json schema) or string/bytearray/graphql.language.ast.DocumentNode (schema validated station - graphql schema)
+  headers={}, nonblocking=True, concurrent_task_limit=500)
+```
+
+
 ### Message ID
 Stations are idempotent by default for 2 minutes (can be configured), Idempotency achieved by adding a message id

diff --git a/memphis/producer.py b/memphis/producer.py
index d34d1b5..f1516a4 100644
--- a/memphis/producer.py
+++ b/memphis/producer.py
@@ -4,6 +4,7 @@
 import json
 import time
 from typing import Union
+import warnings

 import graphql
 from graphql import parse as parse_graphql
@@ -152,24 +153,38 @@ def validate_graphql(self, message):
             e = "Invalid message format, expected GraphQL"
             raise MemphisSchemaError("Schema validation has failed: " + str(e))

+    # pylint: disable=R0913
     async def produce(
         self,
         message,
         ack_wait_sec: int = 15,
         headers: Union[Headers, None] = None,
-        async_produce: bool = False,
+        async_produce: Union[bool, None] = None,
+        nonblocking: bool = False,
         msg_id: Union[str, None] = None,
-        buffer_limit_msg: Union[int, None] = None
+        concurrent_task_limit: Union[int, None] = None
     ):
         """Produces a message into a station.
         Args:
-            message (bytearray/dict): message to send into the station - bytearray/protobuf class (schema validated station - protobuf) or bytearray/dict (schema validated station - json schema) or string/bytearray/graphql.language.ast.DocumentNode (schema validated station - graphql schema)
-            ack_wait_sec (int, optional): max time in seconds to wait for an ack from memphis. Defaults to 15.
-            headers (dict, optional): Message headers, defaults to {}.
-            async_produce (boolean, optional): produce operation won't wait for broker acknowledgement
-            msg_id (string, optional): Attach msg-id header to the message in order to achieve idempotency
-            buffer_limit_msg (int, optional): Limit the number of outstanding async produce tasks.
-                                              Calls with async_produce will block if the buffer is full.
+            message (bytearray/dict): message to send into the station
+                                      - bytearray/protobuf class
+                                        (schema validated station - protobuf)
+                                      - bytearray/dict (schema validated station - json schema)
+                                      - string/bytearray/graphql.language.ast.DocumentNode
+                                        (schema validated station - graphql schema)
+            ack_wait_sec (int, optional): max time in seconds to wait for an ack from the broker.
+                                          Defaults to 15 sec.
+            headers (dict, optional): message headers, defaults to {}.
+            async_produce (boolean, optional): produce operation won't block (wait) on message send.
+                                               This argument is deprecated. Please use the
+                                               `nonblocking` argument instead.
+            nonblocking (boolean, optional): produce operation won't block (wait) on message send.
+            msg_id (string, optional): Attach msg-id header to the message in order to
+                                       achieve idempotency.
+            concurrent_task_limit (int, optional): Limit the number of outstanding async produce
+                                                   tasks. Calls with nonblocking=True will block
+                                                   if the limit is hit and will wait until the
+                                                   buffer drains halfway down.
         Raises:
             Exception: _description_
         """
@@ -191,6 +206,11 @@ async def produce(
             headers = memphis_headers

         if async_produce:
+            nonblocking = True
+            warnings.warn("The argument async_produce is deprecated. " + \
+                          "Please use the argument nonblocking instead.")
+
+        if nonblocking:
             try:
                 task = self.loop.create_task(
                     self.connection.broker_connection.publish(
@@ -203,8 +223,8 @@ async def produce(
                 self.background_tasks.add(task)
                 task.add_done_callback(self.background_tasks.discard)

-                if buffer_limit_msg is not None and len(self.background_tasks) >= buffer_limit_msg:
-                    desired_size = buffer_limit_msg / 2
+                if concurrent_task_limit is not None and len(self.background_tasks) >= concurrent_task_limit:
+                    desired_size = concurrent_task_limit / 2
                     while len(self.background_tasks) > desired_size:
                         await asyncio.sleep(0.1)
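
Patch 4 keeps `async_produce` working while steering callers toward `nonblocking`: the old flag simply sets the new one and emits a warning. Here is a standalone sketch of that shim; `send` is an illustrative name, not a memphis function.

```python
import warnings

def send(message, async_produce=None, nonblocking=False):
    # deprecated keyword still works, but it is mapped onto the new one
    # and a warning is emitted, mirroring the change in produce() above
    if async_produce:
        nonblocking = True
        warnings.warn("The argument async_produce is deprecated. "
                      "Please use the argument nonblocking instead.")
    return ("nonblocking" if nonblocking else "blocking", message)

print(send("hello", async_produce=True))  # warns, then behaves like nonblocking=True
print(send("hello", nonblocking=True))    # preferred spelling
```
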
" + \ + "Please use the argument nonblocking instead.") + + if nonblocking: try: task = self.loop.create_task( self.connection.broker_connection.publish( @@ -203,8 +223,8 @@ async def produce( self.background_tasks.add(task) task.add_done_callback(self.background_tasks.discard) - if buffer_limit_msg is not None and len(self.background_tasks) >= buffer_limit_msg: - desired_size = buffer_limit_msg / 2 + if concurrent_task_limit is not None and len(self.background_tasks) >= concurrent_task_limit: + desired_size = concurrent_task_limit / 2 while len(self.background_tasks) > desired_size: await asyncio.sleep(0.1) From 085a09deecfecc362b8f547692efd28f2c8c4137 Mon Sep 17 00:00:00 2001 From: RJ Nowling Date: Mon, 31 Jul 2023 13:19:11 -0500 Subject: [PATCH 5/5] Add comment --- memphis/producer.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/memphis/producer.py b/memphis/producer.py index f1516a4..1609e5f 100644 --- a/memphis/producer.py +++ b/memphis/producer.py @@ -223,7 +223,9 @@ async def produce( self.background_tasks.add(task) task.add_done_callback(self.background_tasks.discard) - if concurrent_task_limit is not None and len(self.background_tasks) >= concurrent_task_limit: + # block until the number of outstanding async tasks is reduced + if concurrent_task_limit is not None and \ + len(self.background_tasks) >= concurrent_task_limit: desired_size = concurrent_task_limit / 2 while len(self.background_tasks) > desired_size: await asyncio.sleep(0.1)