From b196d8ae76925c4323ea8d0f082e60f12b499382 Mon Sep 17 00:00:00 2001 From: Anfimov Dima Date: Tue, 18 Nov 2025 10:55:14 +0100 Subject: [PATCH] feat: broker can be passed to WorkerArgs --- taskiq/cli/worker/args.py | 5 +++-- taskiq/cli/worker/run.py | 19 +++++++++++-------- 2 files changed, 14 insertions(+), 10 deletions(-) diff --git a/taskiq/cli/worker/args.py b/taskiq/cli/worker/args.py index ca44e9da..054e181f 100644 --- a/taskiq/cli/worker/args.py +++ b/taskiq/cli/worker/args.py @@ -1,7 +1,8 @@ from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser from dataclasses import dataclass, field -from typing import List, Optional, Sequence, Tuple +from typing import List, Optional, Sequence, Tuple, Union +from taskiq.abc.broker import AsyncBroker from taskiq.acks import AcknowledgeType from taskiq.cli.common_args import LogLevel @@ -24,7 +25,7 @@ def receiver_arg_type(string: str) -> Tuple[str, str]: class WorkerArgs: """Taskiq worker CLI arguments.""" - broker: str + broker: Union[str, AsyncBroker] modules: List[str] app_dir: Optional[str] = None tasks_pattern: Sequence[str] = ("**/tasks.py",) diff --git a/taskiq/cli/worker/run.py b/taskiq/cli/worker/run.py index bd824896..75d73baa 100644 --- a/taskiq/cli/worker/run.py +++ b/taskiq/cli/worker/run.py @@ -133,14 +133,17 @@ def interrupt_handler(signum: int, _frame: Any) -> None: # We must set this field before importing tasks, # so broker will remember all tasks it's related to. - broker = import_object(args.broker, app_dir=args.app_dir) - if inspect.isfunction(broker): - broker = broker() - if not isinstance(broker, AsyncBroker): - raise ValueError( - "Unknown broker type. Please use AsyncBroker instance " - "or pass broker factory function that returns an AsyncBroker instance.", - ) + if isinstance(args.broker, AsyncBroker): + broker = args.broker + else: + broker = import_object(args.broker, app_dir=args.app_dir) + if inspect.isfunction(broker): + broker = broker() + if not isinstance(broker, AsyncBroker): + raise ValueError( + "Unknown broker type. Please use AsyncBroker instance " + "or pass broker factory function that returns an AsyncBroker instance.", + ) broker.is_worker_process = True import_tasks(args.modules, args.tasks_pattern, args.fs_discover)