Skip to content

Commit

Permalink
feat: option to disable scheduler in WorkerPool
Browse files Browse the repository at this point in the history
  • Loading branch information
ankush committed Mar 16, 2024
1 parent bab0061 commit 5390a72
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 1 deletion.
3 changes: 3 additions & 0 deletions rq/cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,7 @@ def enqueue(
@click.option('--job-class', type=str, default=None, help='Dotted path to a Job class')
@click.argument('queues', nargs=-1)
@click.option('--num-workers', '-n', type=int, default=1, help='Number of workers to start')
@click.option('--disable-scheduler', is_flag=True, help='Run worker pool with scheduler')
@pass_cli_config
def worker_pool(
cli_config,
Expand All @@ -472,6 +473,7 @@ def worker_pool(
worker_class,
job_class,
num_workers,
disable_scheduler,
**options,
):
"""Starts a RQ worker pool"""
Expand Down Expand Up @@ -510,6 +512,7 @@ def worker_pool(
serializer=serializer_class,
worker_class=worker_class,
job_class=job_class,
with_scheduler=not disable_scheduler,
)
pool.start(burst=burst, logging_level=logging_level)

Expand Down
6 changes: 5 additions & 1 deletion rq/worker_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ def __init__(
worker_class: Type[BaseWorker] = Worker,
serializer: Type[DefaultSerializer] = DefaultSerializer,
job_class: Type[Job] = Job,
with_scheduler: bool = True,
*args,
**kwargs,
):
Expand All @@ -59,6 +60,7 @@ def __init__(
self.worker_class: Type[BaseWorker] = worker_class
self.serializer: Type[DefaultSerializer] = serializer
self.job_class: Type[Job] = job_class
self.with_scheduler = with_scheduler

# A dictionary of WorkerData keyed by worker name
self.worker_dict: Dict[str, WorkerData] = {}
Expand Down Expand Up @@ -159,6 +161,7 @@ def get_worker_process(
'worker_class': self.worker_class,
'job_class': self.job_class,
'serializer': self.serializer,
'with_scheduler': self.with_scheduler,
},
name=f'Worker {name} (WorkerPool {self.name})',
)
Expand Down Expand Up @@ -249,6 +252,7 @@ def run_worker(
burst: bool = True,
logging_level: str = "INFO",
_sleep: int = 0,
with_scheduler=True,
):
connection = connection_class(
connection_pool=ConnectionPool(connection_class=connection_pool_class, **connection_pool_kwargs)
Expand All @@ -257,4 +261,4 @@ def run_worker(
worker = worker_class(queues, name=worker_name, connection=connection, serializer=serializer, job_class=job_class)
worker.log.info("Starting worker started with PID %s", os.getpid())
time.sleep(_sleep)
worker.work(burst=burst, with_scheduler=True, logging_level=logging_level)
worker.work(burst=burst, with_scheduler=with_scheduler, logging_level=logging_level)
16 changes: 16 additions & 0 deletions tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -859,3 +859,19 @@ def test_worker_pool_logging_options(self):
# --quiet and --verbose are mutually exclusive
result = runner.invoke(main, args + ['--quiet', '--verbose'])
self.assertNotEqual(result.exit_code, 0)

def test_worker_pool_without_scheduler(self):
"""rq worker --disable-scheduler disables scheduler"""
queue = Queue(connection=self.connection)
queue.enqueue_at(datetime(2019, 1, 1, tzinfo=timezone.utc), say_hello)
registry = ScheduledJobRegistry(queue=queue)

runner = CliRunner()

result = runner.invoke(main, ['worker-pool', '-u', self.redis_url, '-b', '--disable-scheduler'])
self.assert_normal_execution(result)
self.assertEqual(len(registry), 1) # 1 job still scheduled

result = runner.invoke(main, ['worker', '-u', self.redis_url, '-b', '--with-scheduler'])
self.assert_normal_execution(result)
self.assertEqual(len(registry), 0) # Job has been enqueued

0 comments on commit 5390a72

Please sign in to comment.