Skip to content

Commit 25ec684

Browse files
committedMar 18, 2025
Add max_attempts_at_message
1 parent 319ac9f commit 25ec684

File tree

9 files changed

+303
-31
lines changed

9 files changed

+303
-31
lines changed
 

‎docs/examples/extending/broker.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
from typing import AsyncGenerator, Union
22

3-
from taskiq import AckableMessage, AsyncBroker, BrokerMessage
3+
from taskiq import WrappedMessage, AsyncBroker, BrokerMessage
44

55

66
class MyBroker(AsyncBroker):
@@ -23,7 +23,7 @@ async def kick(self, message: BrokerMessage) -> None:
2323
# Send a message.message.
2424
pass
2525

26-
async def listen(self) -> AsyncGenerator[Union[bytes, AckableMessage], None]:
26+
async def listen(self) -> AsyncGenerator[Union[bytes, WrappedMessage], None]:
2727
while True:
2828
# Get new message.
2929
new_message: bytes = ... # type: ignore

‎taskiq/__init__.py

+3-3
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
from taskiq.abc.middleware import TaskiqMiddleware
99
from taskiq.abc.result_backend import AsyncResultBackend
1010
from taskiq.abc.schedule_source import ScheduleSource
11-
from taskiq.acks import AckableMessage
1211
from taskiq.brokers.inmemory_broker import InMemoryBroker
1312
from taskiq.brokers.shared_broker import async_shared_broker
1413
from taskiq.brokers.zmq_broker import ZeroMQBroker
@@ -24,7 +23,7 @@
2423
TaskiqResultTimeoutError,
2524
)
2625
from taskiq.funcs import gather
27-
from taskiq.message import BrokerMessage, TaskiqMessage
26+
from taskiq.message import BrokerMessage, MessageMetadata, TaskiqMessage, WrappedMessage
2827
from taskiq.middlewares.prometheus_middleware import PrometheusMiddleware
2928
from taskiq.middlewares.retry_middleware import SimpleRetryMiddleware
3029
from taskiq.result import TaskiqResult
@@ -35,7 +34,6 @@
3534

3635
__version__ = version("taskiq")
3736
__all__ = [
38-
"AckableMessage",
3937
"AsyncBroker",
4038
"AsyncResultBackend",
4139
"AsyncTaskiqDecoratedTask",
@@ -63,6 +61,8 @@
6361
"TaskiqScheduler",
6462
"TaskiqState",
6563
"ZeroMQBroker",
64+
"WrappedMessage",
65+
"MessageMetadata",
6666
"__version__",
6767
"async_shared_broker",
6868
"gather",

‎taskiq/abc/broker.py

+10-7
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,10 @@
2626

2727
from taskiq.abc.middleware import TaskiqMiddleware
2828
from taskiq.abc.serializer import TaskiqSerializer
29-
from taskiq.acks import AckableMessage
3029
from taskiq.decor import AsyncTaskiqDecoratedTask
3130
from taskiq.events import TaskiqEvents
3231
from taskiq.formatters.proxy_formatter import ProxyFormatter
33-
from taskiq.message import BrokerMessage
32+
from taskiq.message import BrokerMessage, WrappedMessage
3433
from taskiq.result_backends.dummy import DummyResultBackend
3534
from taskiq.serializers.json_serializer import JSONSerializer
3635
from taskiq.state import TaskiqState
@@ -77,6 +76,7 @@ def __init__(
7776
self,
7877
result_backend: "Optional[AsyncResultBackend[_T]]" = None,
7978
task_id_generator: Optional[Callable[[], str]] = None,
79+
max_attempts_at_message: Optional[int] = None,
8080
) -> None:
8181
if result_backend is None:
8282
result_backend = DummyResultBackend()
@@ -113,6 +113,7 @@ def __init__(
113113
self.state = TaskiqState()
114114
self.custom_dependency_context: Dict[Any, Any] = {}
115115
self.dependency_overrides: Dict[Any, Any] = {}
116+
self.max_attempts_at_message = max_attempts_at_message
116117
# True only if broker runs in worker process.
117118
self.is_worker_process = False
118119
# True only if broker runs in scheduler process.
@@ -237,18 +238,20 @@ async def kick(
237238
"""
238239

239240
@abstractmethod
240-
def listen(self) -> AsyncGenerator[Union[bytes, AckableMessage], None]:
241+
def listen(self) -> AsyncGenerator[Union[bytes, WrappedMessage], None]:
241242
"""
242243
This function listens to new messages and yields them.
243244
244245
This it the main point for workers.
245246
This function is used to get new tasks from the network.
246247
247-
If your broker support acknowledgement, then you
248-
should wrap your message in AckableMessage dataclass.
248+
If your broker support acknowledgements (or negative acknowledgements),
249+
then the returned message should implement the AckableMessage
250+
(or NackableMessage) interface by implementing the `ack` (or
251+
`nack`) callback.
249252
250-
If your messages was wrapped in AckableMessage dataclass,
251-
taskiq will call ack when finish processing message.
253+
If your message has an `ack` callbacks it will be called after the
254+
message is processed.
252255
253256
:yield: incoming messages.
254257
:return: nothing.

‎taskiq/acks.py

+9-1
Original file line numberDiff line numberDiff line change
@@ -33,5 +33,13 @@ class AckableMessage(BaseModel):
3333
as a whole.
3434
"""
3535

36-
data: bytes
3736
ack: Callable[[], Union[None, Awaitable[None]]]
37+
38+
39+
class NackableMessage(BaseModel):
40+
"""
41+
Message that can be negatively acknowledged, e.g.
42+
sent to a dead-letter queue, etc.
43+
"""
44+
45+
nack: Callable[[], Union[None, Awaitable[None]]]

‎taskiq/cli/worker/run.py

+1
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,7 @@ def interrupt_handler(signum: int, _frame: Any) -> None:
148148
ack_type=args.ack_type,
149149
max_tasks_to_execute=args.max_tasks_per_child,
150150
wait_tasks_timeout=args.wait_tasks_timeout,
151+
max_attempts_at_message=broker.max_attempts_at_message,
151152
**receiver_kwargs, # type: ignore
152153
)
153154
loop.run_until_complete(receiver.listen(shutdown_event))

‎taskiq/message.py

+40
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
from pydantic import BaseModel
44

5+
from taskiq.acks import AckableMessage, NackableMessage
56
from taskiq.labels import parse_label
67

78

@@ -42,3 +43,42 @@ class BrokerMessage(BaseModel):
4243
task_name: str
4344
message: bytes
4445
labels: Dict[str, Any]
46+
47+
48+
class MessageMetadata(BaseModel):
49+
"""Incoming message metadata."""
50+
51+
delivery_count: Optional[int] = None
52+
53+
54+
class WrappedMessage(BaseModel): # noqa: D101
55+
message: bytes
56+
57+
58+
class MessageWithMetadata(BaseModel): # noqa: D101
59+
metadata: MessageMetadata
60+
61+
62+
class WrappedMessageWithMetadata(WrappedMessage, MessageWithMetadata): # noqa: D101
63+
...
64+
65+
66+
class AckableWrappedMessage(WrappedMessage, AckableMessage): # noqa: D101
67+
...
68+
69+
70+
class AckableWrappedMessageWithMetadata( # noqa: D101
71+
WrappedMessage,
72+
AckableMessage,
73+
MessageWithMetadata,
74+
):
75+
...
76+
77+
78+
class AckableNackableWrappedMessageWithMetadata( # noqa: D101
79+
WrappedMessage,
80+
AckableMessage,
81+
NackableMessage,
82+
MessageWithMetadata,
83+
):
84+
...

‎taskiq/receiver/receiver.py

+42-9
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,12 @@
88
import anyio
99
from taskiq_dependencies import DependencyGraph
1010

11-
from taskiq.abc.broker import AckableMessage, AsyncBroker
11+
from taskiq.abc.broker import AsyncBroker
1212
from taskiq.abc.middleware import TaskiqMiddleware
13-
from taskiq.acks import AcknowledgeType
13+
from taskiq.acks import AckableMessage, AcknowledgeType, NackableMessage
1414
from taskiq.context import Context
1515
from taskiq.exceptions import NoResultError
16-
from taskiq.message import TaskiqMessage
16+
from taskiq.message import MessageWithMetadata, TaskiqMessage, WrappedMessage
1717
from taskiq.receiver.params_parser import parse_params
1818
from taskiq.result import TaskiqResult
1919
from taskiq.state import TaskiqState
@@ -58,6 +58,7 @@ def __init__(
5858
on_exit: Optional[Callable[["Receiver"], None]] = None,
5959
max_tasks_to_execute: Optional[int] = None,
6060
wait_tasks_timeout: Optional[float] = None,
61+
max_attempts_at_message: Optional[int] = None,
6162
) -> None:
6263
self.broker = broker
6364
self.executor = executor
@@ -72,6 +73,7 @@ def __init__(
7273
self.known_tasks: Set[str] = set()
7374
self.max_tasks_to_execute = max_tasks_to_execute
7475
self.wait_tasks_timeout = wait_tasks_timeout
76+
self.max_attempts_at_message = max_attempts_at_message
7577
for task in self.broker.get_all_tasks().values():
7678
self._prepare_task(task.task_name, task.original_func)
7779
self.sem: "Optional[asyncio.Semaphore]" = None
@@ -86,7 +88,7 @@ def __init__(
8688

8789
async def callback( # noqa: C901, PLR0912
8890
self,
89-
message: Union[bytes, AckableMessage],
91+
message: Union[bytes, WrappedMessage],
9092
raise_err: bool = False,
9193
) -> None:
9294
"""
@@ -101,7 +103,38 @@ async def callback( # noqa: C901, PLR0912
101103
:param raise_err: raise an error if cannot save result in
102104
result_backend.
103105
"""
104-
message_data = message.data if isinstance(message, AckableMessage) else message
106+
message_data = (
107+
message.message if isinstance(message, WrappedMessage) else message
108+
)
109+
if isinstance(message, MessageWithMetadata):
110+
message_metadata = message.metadata
111+
else:
112+
message_metadata = None
113+
114+
delivery_count = message_metadata.delivery_count if message_metadata else None
115+
if (
116+
delivery_count
117+
and self.max_attempts_at_message
118+
and delivery_count >= self.max_attempts_at_message
119+
):
120+
logger.error(
121+
"Permitted number of attempts at processing message %s "
122+
"has been exhausted after %s attempts.",
123+
message_data,
124+
self.max_attempts_at_message,
125+
)
126+
if isinstance(
127+
message,
128+
NackableMessage,
129+
):
130+
await maybe_awaitable(message.nack())
131+
elif isinstance(
132+
message,
133+
AckableMessage,
134+
):
135+
await maybe_awaitable(message.ack())
136+
return
137+
105138
try:
106139
taskiq_msg = self.broker.formatter.loads(message=message_data)
107140
taskiq_msg.parse_labels()
@@ -331,7 +364,7 @@ async def listen(self, finish_event: asyncio.Event) -> None: # pragma: no cover
331364
if self.run_startup:
332365
await self.broker.startup()
333366
logger.info("Listening started.")
334-
queue: "asyncio.Queue[Union[bytes, AckableMessage]]" = asyncio.Queue()
367+
queue: "asyncio.Queue[Union[bytes, WrappedMessage]]" = asyncio.Queue()
335368

336369
async with anyio.create_task_group() as gr:
337370
gr.start_soon(self.prefetcher, queue, finish_event)
@@ -342,7 +375,7 @@ async def listen(self, finish_event: asyncio.Event) -> None: # pragma: no cover
342375

343376
async def prefetcher(
344377
self,
345-
queue: "asyncio.Queue[Union[bytes, AckableMessage]]",
378+
queue: "asyncio.Queue[Union[bytes, WrappedMessage]]",
346379
finish_event: asyncio.Event,
347380
) -> None:
348381
"""
@@ -354,7 +387,7 @@ async def prefetcher(
354387
fetched_tasks: int = 0
355388
iterator = self.broker.listen()
356389
current_message: asyncio.Task[
357-
Union[bytes, AckableMessage]
390+
Union[bytes, WrappedMessage]
358391
] = asyncio.create_task(
359392
iterator.__anext__(), # type: ignore
360393
)
@@ -394,7 +427,7 @@ async def prefetcher(
394427

395428
async def runner(
396429
self,
397-
queue: "asyncio.Queue[Union[bytes, AckableMessage]]",
430+
queue: "asyncio.Queue[Union[bytes, WrappedMessage]]",
398431
) -> None:
399432
"""
400433
Run tasks.

0 commit comments

Comments
 (0)
Failed to load comments.