Skip to content

Commit

Permalink
feat: Add API to enable pipe_target in enqueue call.
Browse files Browse the repository at this point in the history
  • Loading branch information
DanCardin committed Apr 1, 2024
1 parent 0cd8387 commit ca92e5a
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 11 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "strapp"
version = "0.3.20"
version = "0.3.21"
description = ""
authors = []
packages = [
Expand Down
50 changes: 40 additions & 10 deletions src/strapp/dramatiq/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@


def configure(
*, redis_dsn=None, enable_datadog_middleware: bool = False, env: Optional[str] = None
*,
redis_dsn=None,
enable_datadog_middleware: bool = False,
env: Optional[str] = None,
) -> RedisBroker:
"""Configure a Redis broker.
Expand Down Expand Up @@ -86,7 +89,14 @@ def register(self, broker: dramatiq.Broker = None) -> dramatiq.Actor:
return actor_decorator(self.fn)


def enqueue(task_name, *, queue_name="default", broker=None, **kwargs) -> dramatiq.Message:
def enqueue(
task_name: str,
*,
queue_name="default",
broker=None,
pipe_target: dramatiq.Message | None,
**kwargs,
) -> dramatiq.Message:
"""Enqueue work onto the queue, by `task_name`.
The "default" behavior is to use the actual python function object `actor.send`. Given
Expand All @@ -97,19 +107,39 @@ def enqueue(task_name, *, queue_name="default", broker=None, **kwargs) -> dramat
task_name: The string name given to the `@actor` decorator.
queue_name: optional queue name. defaults to "default".
broker: Overrides the global broker
pipe_target: Optional pipe target. This is used to chain tasks together.
**kwargs: Passed through to the corresponding `@actor` function. Must be json serializable.
"""
if broker is None:
broker = dramatiq.get_broker()

return broker.enqueue(
dramatiq.Message(
queue_name=queue_name,
actor_name=task_name,
args=(),
kwargs=kwargs,
options={},
)
m = message(
queue_name=queue_name,
actor_name=task_name,
pipe_target=pipe_target,
**kwargs,
)
return broker.enqueue(m)


def message(
task_name,
*,
queue_name="default",
pipe_target: dramatiq.Message | None = None,
**kwargs,
) -> dramatiq.Message:
"""Create a dramatiq message."""
options = {}
if pipe_target:
options["pipe_target"] = pipe_target.asdict()

return dramatiq.Message(
queue_name=queue_name,
actor_name=task_name,
args=(),
kwargs=kwargs,
options=options,
)


Expand Down

0 comments on commit ca92e5a

Please sign in to comment.