Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: option to disable scheduler in WorkerPool #2059

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 5 additions & 0 deletions .github/workflows/workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ on:
permissions:
contents: read # to fetch code (actions/checkout)

# cancel previous test builds if new commit is pushed.
concurrency:
group: workflow-master-${{ github.event_name }}-${{ github.event.number }}
cancel-in-progress: true
ankush marked this conversation as resolved.
Show resolved Hide resolved

jobs:
ssl-test:
name: Run SSL tests
Expand Down
1 change: 1 addition & 0 deletions docs/docs/workers.md
Original file line number Diff line number Diff line change
Expand Up @@ -492,3 +492,4 @@ Options:
* `-S` or `--serializer <path.to.Serializer>`: defaults to `rq.serializers.DefaultSerializer`. `rq.serializers.JSONSerializer` is also included.
* `-P` or `--path <path>`: multiple import paths are supported (e.g `rq worker --path foo --path bar`).
* `-j` or `--job-class <path.to.Job>`: defaults to `rq.job.Job`.
* `--no-scheduler`: Disable scheduler. WorkerPool starts with scheduler by default.
3 changes: 3 additions & 0 deletions rq/cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,7 @@ def enqueue(
@click.option('--job-class', type=str, default=None, help='Dotted path to a Job class')
@click.argument('queues', nargs=-1)
@click.option('--num-workers', '-n', type=int, default=1, help='Number of workers to start')
@click.option('--no-scheduler', is_flag=True, help='Run worker pool without scheduler')
@pass_cli_config
def worker_pool(
cli_config,
Expand All @@ -472,6 +473,7 @@ def worker_pool(
worker_class,
job_class,
num_workers,
no_scheduler,
**options,
):
"""Starts a RQ worker pool"""
Expand Down Expand Up @@ -510,6 +512,7 @@ def worker_pool(
serializer=serializer_class,
worker_class=worker_class,
job_class=job_class,
with_scheduler=not no_scheduler,
)
pool.start(burst=burst, logging_level=logging_level)

Expand Down
6 changes: 5 additions & 1 deletion rq/worker_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ def __init__(
worker_class: Type[BaseWorker] = Worker,
serializer: Type[DefaultSerializer] = DefaultSerializer,
job_class: Type[Job] = Job,
with_scheduler: bool = True,
*args,
**kwargs,
):
Expand All @@ -59,6 +60,7 @@ def __init__(
self.worker_class: Type[BaseWorker] = worker_class
self.serializer: Type[DefaultSerializer] = serializer
self.job_class: Type[Job] = job_class
self.with_scheduler = with_scheduler

# A dictionary of WorkerData keyed by worker name
self.worker_dict: Dict[str, WorkerData] = {}
Expand Down Expand Up @@ -159,6 +161,7 @@ def get_worker_process(
'worker_class': self.worker_class,
'job_class': self.job_class,
'serializer': self.serializer,
'with_scheduler': self.with_scheduler,
},
name=f'Worker {name} (WorkerPool {self.name})',
)
Expand Down Expand Up @@ -249,6 +252,7 @@ def run_worker(
burst: bool = True,
logging_level: str = "INFO",
_sleep: int = 0,
with_scheduler=True,
):
connection = connection_class(
connection_pool=ConnectionPool(connection_class=connection_pool_class, **connection_pool_kwargs)
Expand All @@ -257,4 +261,4 @@ def run_worker(
worker = worker_class(queues, name=worker_name, connection=connection, serializer=serializer, job_class=job_class)
worker.log.info("Starting worker started with PID %s", os.getpid())
time.sleep(_sleep)
worker.work(burst=burst, with_scheduler=True, logging_level=logging_level)
worker.work(burst=burst, with_scheduler=with_scheduler, logging_level=logging_level)
16 changes: 16 additions & 0 deletions tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -859,3 +859,19 @@ def test_worker_pool_logging_options(self):
# --quiet and --verbose are mutually exclusive
result = runner.invoke(main, args + ['--quiet', '--verbose'])
self.assertNotEqual(result.exit_code, 0)

def test_worker_pool_without_scheduler(self):
"""rq worker --no-scheduler disables scheduler"""
queue = Queue(connection=self.connection)
queue.enqueue_at(datetime(2019, 1, 1, tzinfo=timezone.utc), say_hello)
registry = ScheduledJobRegistry(queue=queue)

runner = CliRunner()

result = runner.invoke(main, ['worker-pool', '-u', self.redis_url, '-b', '--no-scheduler'])
self.assert_normal_execution(result)
self.assertEqual(len(registry), 1) # 1 job still scheduled

result = runner.invoke(main, ['worker-pool', '-u', self.redis_url, '-b'])
self.assert_normal_execution(result)
self.assertEqual(len(registry), 0) # Job has been enqueued