diff --git a/.github/workflows/workflow.yml b/.github/workflows/workflow.yml index fdc88a7ca..da14dd462 100644 --- a/.github/workflows/workflow.yml +++ b/.github/workflows/workflow.yml @@ -11,6 +11,11 @@ on: permissions: contents: read # to fetch code (actions/checkout) +# cancel previous test builds if new commit is pushed. +concurrency: + group: workflow-master-${{ github.event_name }}-${{ github.event.number }} + cancel-in-progress: true + jobs: ssl-test: name: Run SSL tests diff --git a/docs/docs/workers.md b/docs/docs/workers.md index 262991c93..1521fe647 100644 --- a/docs/docs/workers.md +++ b/docs/docs/workers.md @@ -492,3 +492,4 @@ Options: * `-S` or `--serializer `: defaults to `rq.serializers.DefaultSerializer`. `rq.serializers.JSONSerializer` is also included. * `-P` or `--path `: multiple import paths are supported (e.g `rq worker --path foo --path bar`). * `-j` or `--job-class `: defaults to `rq.job.Job`. +* `--no-scheduler`: Disable scheduler. WorkerPool starts with scheduler by default. diff --git a/rq/cli/cli.py b/rq/cli/cli.py index 6b32a571d..7cf2ae55d 100755 --- a/rq/cli/cli.py +++ b/rq/cli/cli.py @@ -455,6 +455,7 @@ def enqueue( @click.option('--job-class', type=str, default=None, help='Dotted path to a Job class') @click.argument('queues', nargs=-1) @click.option('--num-workers', '-n', type=int, default=1, help='Number of workers to start') +@click.option('--no-scheduler', is_flag=True, help='Run worker pool without scheduler') @pass_cli_config def worker_pool( cli_config, @@ -472,6 +473,7 @@ def worker_pool( worker_class, job_class, num_workers, + no_scheduler, **options, ): """Starts a RQ worker pool""" @@ -510,6 +512,7 @@ def worker_pool( serializer=serializer_class, worker_class=worker_class, job_class=job_class, + with_scheduler=not no_scheduler, ) pool.start(burst=burst, logging_level=logging_level) diff --git a/rq/worker_pool.py b/rq/worker_pool.py index 92a7f2a15..999996058 100644 --- a/rq/worker_pool.py +++ b/rq/worker_pool.py @@ -42,6 +42,7 @@ def __init__( worker_class: Type[BaseWorker] = Worker, serializer: Type[DefaultSerializer] = DefaultSerializer, job_class: Type[Job] = Job, + with_scheduler: bool = True, *args, **kwargs, ): @@ -59,6 +60,7 @@ def __init__( self.worker_class: Type[BaseWorker] = worker_class self.serializer: Type[DefaultSerializer] = serializer self.job_class: Type[Job] = job_class + self.with_scheduler = with_scheduler # A dictionary of WorkerData keyed by worker name self.worker_dict: Dict[str, WorkerData] = {} @@ -159,6 +161,7 @@ def get_worker_process( 'worker_class': self.worker_class, 'job_class': self.job_class, 'serializer': self.serializer, + 'with_scheduler': self.with_scheduler, }, name=f'Worker {name} (WorkerPool {self.name})', ) @@ -249,6 +252,7 @@ def run_worker( burst: bool = True, logging_level: str = "INFO", _sleep: int = 0, + with_scheduler=True, ): connection = connection_class( connection_pool=ConnectionPool(connection_class=connection_pool_class, **connection_pool_kwargs) @@ -257,4 +261,4 @@ def run_worker( worker = worker_class(queues, name=worker_name, connection=connection, serializer=serializer, job_class=job_class) worker.log.info("Starting worker started with PID %s", os.getpid()) time.sleep(_sleep) - worker.work(burst=burst, with_scheduler=True, logging_level=logging_level) + worker.work(burst=burst, with_scheduler=with_scheduler, logging_level=logging_level) diff --git a/tests/test_cli.py b/tests/test_cli.py index fbdb4c464..ae9e287e3 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -859,3 +859,19 @@ def test_worker_pool_logging_options(self): # --quiet and --verbose are mutually exclusive result = runner.invoke(main, args + ['--quiet', '--verbose']) self.assertNotEqual(result.exit_code, 0) + + def test_worker_pool_without_scheduler(self): + """rq worker --no-scheduler disables scheduler""" + queue = Queue(connection=self.connection) + queue.enqueue_at(datetime(2019, 1, 1, tzinfo=timezone.utc), say_hello) + registry = ScheduledJobRegistry(queue=queue) + + runner = CliRunner() + + result = runner.invoke(main, ['worker-pool', '-u', self.redis_url, '-b', '--no-scheduler']) + self.assert_normal_execution(result) + self.assertEqual(len(registry), 1) # 1 job still scheduled + + result = runner.invoke(main, ['worker-pool', '-u', self.redis_url, '-b']) + self.assert_normal_execution(result) + self.assertEqual(len(registry), 0) # Job has been enqueued