2 changes: 1 addition & 1 deletion .github/workflows/pylint.yml
@@ -24,4 +24,4 @@ jobs:
python setup.py install
- name: Analysing the code with pylint
run: |
pylint $(git ls-files '*.py') --disable=C0111,R0902,R0913,W0718,C0121,W0201,R0903,C0301,W0707,R0914,R0912,C0103,R1710,R0911,W0719,R1720,E1101,R0801,R1705,R0904 --good-names=i,e,cg,t,s
pylint $(git ls-files '*.py') --disable=C0111,R0902,R0913,W0718,C0121,W0201,R0903,C0301,W0707,R0914,R0912,C0103,R1710,R0911,W0719,R1720,E1101,R0801,R1705,R0904,R0915 --good-names=i,e,cg,t,s
20 changes: 16 additions & 4 deletions README.md
@@ -233,7 +233,7 @@ await memphis.produce(station_name='test_station_py', producer_name='prod_py',
generate_random_suffix=False, #defaults to false
ack_wait_sec=15, # defaults to 15
headers=headers, # default to {}
async_produce=False, #defaults to false
nonblocking=False, #defaults to false
msg_id="123"
)
```
@@ -256,15 +256,27 @@ await producer.produce(
headers=headers) # default to {}
```

### Async produce
For better performance. The client won't block requests while waiting for an acknowledgment.
### Non-blocking Produce
For better performance, the client won't block requests while waiting for an acknowledgment.

```python
await producer.produce(
message='bytearray/protobuf class/dict/string/graphql.language.ast.DocumentNode', # bytearray / protobuf class (schema validated station - protobuf) or bytearray/dict (schema validated station - json schema) or string/bytearray/graphql.language.ast.DocumentNode (schema validated station - graphql schema)
headers={}, async_produce=True)
headers={}, nonblocking=True)
```

### Non-blocking Produce with Task Limits
For better performance, the client won't block requests while waiting for an acknowledgment.
If you are producing a large number of messages and see timeout errors, then you may need to
limit the number of concurrent tasks like so:

```python
await producer.produce(
message='bytearray/protobuf class/dict/string/graphql.language.ast.DocumentNode', # bytearray / protobuf class (schema validated station - protobuf) or bytearray/dict (schema validated station - json schema) or string/bytearray/graphql.language.ast.DocumentNode (schema validated station - graphql schema)
headers={}, nonblocking=True, concurrent_task_limit=500)
```
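As an illustration, here is a minimal sketch (the `produce_batch` helper and the `messages` iterable are hypothetical, not part of the SDK) of sending a large batch while capping the number of outstanding publish tasks:

```python
# Sketch: produce a large batch without letting pending publish tasks pile up.
# `producer` is an existing Memphis producer; `messages` is any iterable of payloads.
async def produce_batch(producer, messages):
    for msg in messages:
        # With nonblocking=True the call normally returns immediately; once the
        # number of outstanding tasks reaches the limit, it waits until the
        # backlog drains to about half the limit before continuing.
        await producer.produce(
            message=msg,
            nonblocking=True,
            concurrent_task_limit=500,
        )
```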


### Message ID
Stations are idempotent by default for 2 minutes (configurable). Idempotency is achieved by attaching a message ID to each message.
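For example, a minimal sketch of attaching a message ID (the ID value here is hypothetical; it mirrors the `msg_id` parameter shown in the produce example above):

```python
# Sketch: attach a msg_id so the station can deduplicate retries of the same
# logical message within its idempotency window (2 minutes by default).
await producer.produce(
    message='some message',
    msg_id='order-1234',  # hypothetical unique ID per logical message
)
```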

66 changes: 52 additions & 14 deletions memphis/producer.py
Expand Up @@ -4,6 +4,7 @@
import json
import time
from typing import Union
import warnings

import graphql
from graphql import parse as parse_graphql
@@ -27,6 +28,7 @@ def __init__(
self.internal_station_name = get_internal_name(self.station_name)
self.loop = asyncio.get_running_loop()
self.real_name = real_name
self.background_tasks = set()

async def validate_msg(self, message):
if self.connection.schema_updates_data[self.internal_station_name] != {}:
@@ -151,21 +153,38 @@ def validate_graphql(self, message):
e = "Invalid message format, expected GraphQL"
raise MemphisSchemaError("Schema validation has failed: " + str(e))

# pylint: disable=R0913
async def produce(
self,
message,
ack_wait_sec: int = 15,
headers: Union[Headers, None] = None,
async_produce: bool = False,
async_produce: Union[bool, None] = None,
nonblocking: bool = False,
msg_id: Union[str, None] = None,
concurrent_task_limit: Union[int, None] = None
):
"""Produces a message into a station.
Args:
message (bytearray/dict): message to send into the station - bytearray/protobuf class (schema validated station - protobuf) or bytearray/dict (schema validated station - json schema) or string/bytearray/graphql.language.ast.DocumentNode (schema validated station - graphql schema)
ack_wait_sec (int, optional): max time in seconds to wait for an ack from memphis. Defaults to 15.
headers (dict, optional): Message headers, defaults to {}.
async_produce (boolean, optional): produce operation won't wait for broker acknowledgement
msg_id (string, optional): Attach msg-id header to the message in order to achieve idempotency
message (bytearray/dict): message to send into the station
- bytearray/protobuf class
(schema validated station - protobuf)
- bytearray/dict (schema validated station - json schema)
- string/bytearray/graphql.language.ast.DocumentNode
(schema validated station - graphql schema)
ack_wait_sec (int, optional): max time in seconds to wait for an ack from the broker.
Defaults to 15 sec.
headers (dict, optional): message headers, defaults to {}.
async_produce (boolean, optional): produce operation won't block (wait) on message send.
This argument is deprecated. Please use the
`nonblocking` argument instead.
nonblocking (boolean, optional): produce operation won't block (wait) on message send.
msg_id (string, optional): Attach msg-id header to the message in order to
achieve idempotency.
concurrent_task_limit (int, optional): Limit the number of outstanding async produce
tasks. Calls with nonblocking=True will block
if the limit is hit and will wait until the
buffer drains halfway down.
Raises:
MemphisError: in case of an error while producing the message.
"""
@@ -187,15 +206,30 @@ async def produce(
headers = memphis_headers

if async_produce:
nonblocking = True
warnings.warn("The argument async_produce is deprecated. " + \
"Please use the argument nonblocking instead.")

if nonblocking:
try:
self.loop.create_task(
self.connection.broker_connection.publish(
self.internal_station_name + ".final",
message,
timeout=ack_wait_sec,
headers=headers,
)
)
task = self.loop.create_task(
self.connection.broker_connection.publish(
self.internal_station_name + ".final",
message,
timeout=ack_wait_sec,
headers=headers,
)
)
self.background_tasks.add(task)
task.add_done_callback(self.background_tasks.discard)

# block until the number of outstanding async tasks is reduced
if concurrent_task_limit is not None and \
len(self.background_tasks) >= concurrent_task_limit:
desired_size = concurrent_task_limit / 2
while len(self.background_tasks) > desired_size:
await asyncio.sleep(0.1)

await asyncio.sleep(0)
except Exception as e:
raise MemphisError(e)
@@ -275,6 +309,10 @@ async def produce(
async def destroy(self):
"""Destroy the producer."""
try:
# drain buffered async messages
while len(self.background_tasks) > 0:
await asyncio.sleep(0.1)

destroy_producer_req = {
"name": self.producer_name,
"station_name": self.station_name,