Worker pool (#1874)
* First stab at implementing worker pool

* Use process.is_alive() to check whether a process is still live

* Handle shutdown signal

* Check worker loop done

* First working version of `WorkerPool`.

* Added test for check_workers()

* Added test for pool.start()

* Better shutdown process

* Comment out test_start() to see if it fixes CI

* Make tests pass

* Make CI pass

* Comment out some tests

* Comment out more tests

* Re-enable a test

* Re-enable another test

* Uncomment check_workers test

* Added run_worker test

* Minor modification to dead worker detection

* More test cases

* Better process name for workers

* Added back pool.stop_workers() when signal is received

* Cleaned up cli.py

* WIP on worker-pool command

* Fix test

* Test that worker pool ignores consecutive shutdown signals

* Added test for worker-pool CLI command.

* Added timeout to CI jobs

* Fix worker pool test

* Comment out test_scheduler.py

* Fixed worker-pool in burst mode

* Increase test coverage

* Exclude tests directory from coverage.py

* Improve test coverage

* Renamed `Pool(num_workers=2)` to `Pool(size=2)`

* Revert "Renamed `Pool(num_workers=2) to `Pool(size=2)`"

This reverts commit a1306f8.

* Renamed Pool to WorkerPool

* Added a new TestCase that doesn't use LocalStack

* Added job_class, worker_class and serializer arguments to WorkerPool

* Use parse_connection() in WorkerPool.__init__

* Added CLI arguments for worker-pool

* Minor WorkerPool and test fixes

* Fixed failing CLI test

* Document WorkerPool
selwin committed May 1, 2023
1 parent 8a9daec commit 64cb1a2
Showing 23 changed files with 1,558 additions and 961 deletions.
5 changes: 4 additions & 1 deletion .coveragerc
@@ -10,4 +10,7 @@ omit =

[report]
exclude_lines =
if TYPE_CHECKING:
pragma: no cover
if __name__ == .__main__.:

1 change: 1 addition & 0 deletions .github/workflows/workflow.yml
@@ -13,6 +13,7 @@ jobs:
build:
name: Python${{ matrix.python-version }}/Redis${{ matrix.redis-version }}/redis-py${{ matrix.redis-py-version }}
runs-on: ubuntu-20.04
timeout-minutes: 10
strategy:
matrix:
python-version: ["3.6", "3.7", "3.8.3", "3.9", "3.10", "3.11"]
74 changes: 44 additions & 30 deletions docs/docs/workers.md
@@ -36,7 +36,7 @@ You should use process managers like [Supervisor](/patterns/supervisor/) or
### Burst Mode

By default, workers will start working immediately and will block and wait for
new work when they run out of work. Workers can also be started in _burst
mode_ to finish all currently available work and quit as soon as all given
queues are emptied.

@@ -58,6 +58,7 @@ just to scale up your workers temporarily during peak periods.
In addition to `--burst`, `rq worker` also accepts these arguments:

* `--url` or `-u`: URL describing Redis connection details (e.g `rq worker --url redis://:secrets@example.com:1234/9` or `rq worker --url unix:///var/run/redis/redis.sock`)
* `--burst` or `-b`: run worker in burst mode (stops after all jobs in queue have been processed).
* `--path` or `-P`: multiple import paths are supported (e.g `rq worker --path foo --path bar`)
* `--config` or `-c`: path to module containing RQ settings.
* `--results-ttl`: job results will be kept for this number of seconds (defaults to 500).
@@ -155,8 +156,6 @@ worker = Worker([queue], connection=redis, name='foo')

### Retrieving Worker Information

_Updated in version 0.10.0._

`Worker` instances store their runtime information in Redis. Here's how to
retrieve them:

@@ -173,6 +172,10 @@ queue = Queue('queue_name')
workers = Worker.all(queue=queue)
worker = workers[0]
print(worker.name)

print('Successful jobs: ' + str(worker.successful_job_count))
print('Failed jobs: ' + str(worker.failed_job_count))
print('Total working time: ' + str(worker.total_working_time))  # In seconds
```

Aside from `worker.name`, workers also have the following properties:
@@ -230,20 +233,6 @@ w = Queue('foo', serializer=JSONSerializer)
Queues will now use the custom serializer.
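
A short sketch of this pattern, pairing a queue and a worker that share the bundled `JSONSerializer` (assuming `Worker` accepts the same `serializer` argument as `Queue`):

```python
# Sketch: queue and worker sharing a custom serializer.
# Assumption: Worker takes a `serializer` argument mirroring Queue's.
from redis import Redis
from rq import Queue, Worker
from rq.serializers import JSONSerializer

redis = Redis()
queue = Queue('foo', connection=redis, serializer=JSONSerializer)
worker = Worker([queue], connection=redis, serializer=JSONSerializer)
```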


### Worker Statistics

If you want to check the utilization of your queues, `Worker` instances
store some useful information:

```python
from rq.worker import Worker
worker = Worker.find_by_key('rq:worker:name')

worker.successful_job_count # Number of jobs finished successfully
worker.failed_job_count # Number of failed jobs processed by this worker
worker.total_working_time # Amount of time spent executing jobs (in seconds)
```

## Better worker process title
The worker process will have a better title (as displayed by system tools such as ps and top)
after you install the third-party package `setproctitle`:
@@ -318,22 +307,19 @@ $ rq worker -w 'path.to.GeventWorker'
```


## Round Robin and Random strategies for dequeuing jobs from queues
## Strategies for Dequeuing Jobs from Queues

The default worker considers the order of queues as their priority order,
and if a task is pending in a higher priority queue
it will be selected before any other in queues with lower priority (the `default` behavior).
To choose the strategy that should be used, `rq` provides the `--dequeue-strategy / -ds` option.
The default worker considers the order of queues as their priority order.
That is to say, if the worker is started with `rq worker high low`, it will
prioritize dequeuing jobs from `high` before `low`. To choose a different strategy,
`rq` provides the `--dequeue-strategy / -ds` option.

In certain circumstances it can be useful that, when a worker is listening to multiple queues,
say `q1`,`q2`,`q3`, the jobs are dequeued using a Round Robin strategy. That is, the 1st
dequeued job is taken from `q1`, the 2nd from `q2`, the 3rd from `q3`, the 4th
from `q1`, the 5th from `q2` and so on. To implement this strategy use `-ds round_robin` argument.
In certain circumstances, you may want to dequeue jobs in a round robin fashion. For example,
when you have `q1`,`q2`,`q3`, the 1st dequeued job is taken from `q1`, the 2nd from `q2`,
the 3rd from `q3`, the 4th from `q1`, the 5th from `q2` and so on.
To implement this strategy, use the `-ds round_robin` argument.

In other circumstances, it can be useful to pull jobs from the different queues randomly.
To implement this strategy use `-ds random` argument.
In fact, whenever a job is pulled from any queue with the `random` strategy, the list of queues is
shuffled, so that no queue has more priority than the other ones.
To dequeue jobs from the different queues randomly, use the `-ds random` argument.
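
If you want to pick the strategy in code rather than on the command line, a rough sketch along these lines may work (treat the `DequeueStrategy` import path and the `dequeue_strategy` keyword as assumptions to verify against your installed RQ version):

```python
# Rough sketch (assumed API): choosing a dequeue strategy from Python
# instead of the CLI's --dequeue-strategy / -ds option.
from redis import Redis
from rq import Queue, Worker
from rq.worker import DequeueStrategy  # assumed import location

redis = Redis()
queues = [Queue(name, connection=redis) for name in ('q1', 'q2', 'q3')]
worker = Worker(queues, connection=redis)
worker.work(dequeue_strategy=DequeueStrategy.ROUND_ROBIN)  # assumed keyword
```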

Deprecation Warning: These strategies were formerly implemented using the custom classes `rq.worker.RoundRobinWorker`
and `rq.worker.RandomWorker`. Since the `--dequeue-strategy` argument allows either strategy to be used with any worker, those worker classes are deprecated and will be removed in future versions.
@@ -451,3 +437,31 @@ redis = Redis()
Expand Down Expand Up @@ -451,3 +437,31 @@ redis = Redis()
# This will raise an exception if job is invalid or not currently executing
send_stop_job_command(redis, job_id)
```

## Worker Pool

_New in version 1.14.0._

<div class="warning">
<img style="float: right; margin-right: -60px; margin-top: -38px" src="/img/warning.png" />
<strong>Note:</strong>
<p>`WorkerPool` is still in beta, use at your own risk!</p>
</div>

WorkerPool allows you to run multiple workers in a single CLI command.

Usage:

```shell
rq worker-pool high default low -n 3
```

Options:
* `-u` or `--url <Redis connection URL>`: as defined in [redis-py's docs](https://redis.readthedocs.io/en/stable/connections.html#redis.Redis.from_url).
* `-w` or `--worker-class <path.to.Worker>`: defaults to `rq.worker.Worker`. `rq.worker.SimpleWorker` is also an option.
* `-n` or `--num-workers <number of workers>`: defaults to 2.
* `-b` or `--burst`: run workers in burst mode (stops after all jobs in queue have been processed).
* `-l` or `--logging-level <level>`: defaults to `INFO`. `DEBUG`, `WARNING`, `ERROR` and `CRITICAL` are supported.
* `-S` or `--serializer <path.to.Serializer>`: defaults to `rq.serializers.DefaultSerializer`. `rq.serializers.JSONSerializer` is also included.
* `-P` or `--path <path>`: multiple import paths are supported (e.g `rq worker --path foo --path bar`).
* `-j` or `--job-class <path.to.Job>`: defaults to `rq.job.Job`.
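
The pool can also be constructed programmatically. Below is a minimal sketch based on how the `worker-pool` command wires things up in `rq/cli/cli.py` later in this diff; the queue names are placeholders:

```python
# Minimal sketch: starting a WorkerPool from Python, mirroring the CLI wiring
# shown in rq/cli/cli.py below.
from redis import Redis
from rq.worker_pool import WorkerPool

pool = WorkerPool(
    ['high', 'default', 'low'],  # queue names, highest priority first
    connection=Redis(),
    num_workers=3,
)
pool.start(burst=True)  # drain the queues and exit; omit burst to keep running
```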
90 changes: 88 additions & 2 deletions rq/cli/cli.py
@@ -6,6 +6,8 @@
import sys
import warnings

from typing import List, Type

import click
from redis.exceptions import ConnectionError

@@ -21,6 +23,8 @@
parse_schedule,
pass_cli_config,
)

# from rq.cli.pool import pool
from rq.contrib.legacy import cleanup_ghosts
from rq.defaults import (
DEFAULT_RESULT_TTL,
@@ -31,12 +35,15 @@
DEFAULT_MAINTENANCE_TASK_INTERVAL,
)
from rq.exceptions import InvalidJobOperationError
from rq.job import JobStatus
from rq.job import Job, JobStatus
from rq.logutils import blue
from rq.registry import FailedJobRegistry, clean_registries
from rq.serializers import DefaultSerializer
from rq.suspension import suspend as connection_suspend, resume as connection_resume, is_suspended
from rq.utils import import_attribute, get_call_string
from rq.worker import Worker
from rq.worker_pool import WorkerPool
from rq.worker_registration import clean_worker_registry
from rq.utils import import_attribute, get_call_string


@click.group()
@@ -425,3 +432,82 @@ def enqueue(

if not quiet:
click.echo('Enqueued %s with job-id \'%s\'.' % (blue(function_string), job.id))


@main.command()
@click.option('--burst', '-b', is_flag=True, help='Run in burst mode (quit after all work is done)')
@click.option('--logging-level', '-l', type=str, default="INFO", help='Set logging level')
@click.option('--sentry-ca-certs', envvar='RQ_SENTRY_CA_CERTS', help='Path to CRT file for Sentry DSN')
@click.option('--sentry-debug', envvar='RQ_SENTRY_DEBUG', help='Enable debug')
@click.option('--sentry-dsn', envvar='RQ_SENTRY_DSN', help='Report exceptions to this Sentry DSN')
@click.option('--verbose', '-v', is_flag=True, help='Show more output')
@click.option('--quiet', '-q', is_flag=True, help='Show less output')
@click.option('--log-format', type=str, default=DEFAULT_LOGGING_FORMAT, help='Set the format of the logs')
@click.option('--date-format', type=str, default=DEFAULT_LOGGING_DATE_FORMAT, help='Set the date format of the logs')
@click.option('--job-class', type=str, default=None, help='Dotted path to a Job class')
@click.argument('queues', nargs=-1)
@click.option('--num-workers', '-n', type=int, default=1, help='Number of workers to start')
@pass_cli_config
def worker_pool(
cli_config,
burst: bool,
logging_level,
queues,
serializer,
sentry_ca_certs,
sentry_debug,
sentry_dsn,
verbose,
quiet,
log_format,
date_format,
worker_class,
job_class,
num_workers,
**options,
):
"""Starts a RQ worker pool"""
settings = read_config_file(cli_config.config) if cli_config.config else {}
# Worker specific default arguments
queue_names: List[str] = queues or settings.get('QUEUES', ['default'])
sentry_ca_certs = sentry_ca_certs or settings.get('SENTRY_CA_CERTS')
sentry_debug = sentry_debug or settings.get('SENTRY_DEBUG')
sentry_dsn = sentry_dsn or settings.get('SENTRY_DSN')

setup_loghandlers_from_args(verbose, quiet, date_format, log_format)

if serializer:
serializer_class: Type[DefaultSerializer] = import_attribute(serializer)
else:
serializer_class = DefaultSerializer

if worker_class:
worker_class = import_attribute(worker_class)
else:
worker_class = Worker

if job_class:
job_class = import_attribute(job_class)
else:
job_class = Job

pool = WorkerPool(
queue_names,
connection=cli_config.connection,
num_workers=num_workers,
serializer=serializer_class,
worker_class=worker_class,
job_class=job_class,
)
pool.start(burst=burst, logging_level=logging_level)

# Should we configure Sentry?
if sentry_dsn:
sentry_opts = {"ca_certs": sentry_ca_certs, "debug": sentry_debug}
from rq.contrib.sentry import register_sentry

register_sentry(sentry_dsn, **sentry_opts)


if __name__ == '__main__':
main()
6 changes: 3 additions & 3 deletions rq/queue.py
@@ -88,7 +88,7 @@ def all(
"""
connection = resolve_connection(connection)

def to_queue(queue_key):
def to_queue(queue_key: Union[bytes, str]):
return cls.from_queue_key(
as_text(queue_key),
connection=connection,
@@ -145,7 +145,7 @@ def __init__(
default_timeout: Optional[int] = None,
connection: Optional['Redis'] = None,
is_async: bool = True,
job_class: Union[str, Type['Job'], None] = None,
job_class: Optional[Union[str, Type['Job']]] = None,
serializer: Any = None,
death_penalty_class: Type[BaseDeathPenalty] = UnixSignalDeathPenalty,
**kwargs,
@@ -439,7 +439,7 @@ def remove(self, job_or_id: Union['Job', str], pipeline: Optional['Pipeline'] =
Returns:
_type_: _description_
"""
job_id = job_or_id.id if isinstance(job_or_id, self.job_class) else job_or_id
job_id: str = job_or_id.id if isinstance(job_or_id, self.job_class) else job_or_id

if pipeline is not None:
return pipeline.lrem(self.key, 1, job_id)
29 changes: 6 additions & 23 deletions rq/scheduler.py
@@ -6,6 +6,7 @@
from datetime import datetime
from enum import Enum
from multiprocessing import Process
from typing import List, Set

from redis import ConnectionPool, Redis, SSLConnection, UnixDomainSocketConnection

@@ -16,7 +17,7 @@
from .queue import Queue
from .registry import ScheduledJobRegistry
from .serializers import resolve_serializer
from .utils import current_timestamp
from .utils import current_timestamp, parse_names

SCHEDULER_KEY_TEMPLATE = 'rq:scheduler:%s'
SCHEDULER_LOCKING_KEY_TEMPLATE = 'rq:scheduler-lock:%s'
@@ -46,14 +47,10 @@ def __init__(
serializer=None,
):
self._queue_names = set(parse_names(queues))
self._acquired_locks = set()
self._scheduled_job_registries = []
self._acquired_locks: Set[str] = set()
self._scheduled_job_registries: List[ScheduledJobRegistry] = []
self.lock_acquisition_time = None
(
self._connection_class,
self._connection_pool_class,
self._connection_kwargs,
) = parse_connection(connection)
self._connection_class, self._pool_class, self._connection_kwargs = parse_connection(connection)
self.serializer = resolve_serializer(serializer)

self._connection = None
@@ -74,10 +71,7 @@ def connection(self):
if self._connection:
return self._connection
self._connection = self._connection_class(
connection_pool=ConnectionPool(
connection_class=self._connection_pool_class,
**self._connection_kwargs
)
connection_pool=ConnectionPool(connection_class=self._pool_class, **self._connection_kwargs)
)
return self._connection

@@ -231,14 +225,3 @@ def run(scheduler):
scheduler.log.error('Scheduler [PID %s] raised an exception.\n%s', os.getpid(), traceback.format_exc())
raise
scheduler.log.info('Scheduler with PID %d has stopped', os.getpid())


def parse_names(queues_or_names):
"""Given a list of strings or queues, returns queue names"""
names = []
for queue_or_name in queues_or_names:
if isinstance(queue_or_name, Queue):
names.append(queue_or_name.name)
else:
names.append(str(queue_or_name))
return names
4 changes: 2 additions & 2 deletions rq/serializers.py
@@ -1,7 +1,7 @@
from functools import partial
import pickle
import json
from typing import Optional, Union
from typing import Optional, Type, Union

from .utils import import_attribute

@@ -21,7 +21,7 @@ def loads(s, *args, **kwargs):
return json.loads(s.decode('utf-8'), *args, **kwargs)


def resolve_serializer(serializer=None):
def resolve_serializer(serializer: Optional[Union[Type[DefaultSerializer], str]] = None) -> Type[DefaultSerializer]:
"""This function checks the user defined serializer for ('dumps', 'loads') methods
It returns a default pickle serializer if not found else it returns a MySerializer
The returned serializer objects implement ('dumps', 'loads') methods
2 changes: 1 addition & 1 deletion rq/timeouts.py
@@ -28,7 +28,7 @@ class HorseMonitorTimeoutException(BaseTimeoutException):
class BaseDeathPenalty:
"""Base class to setup job timeouts."""

def __init__(self, timeout, exception=JobTimeoutException, **kwargs):
def __init__(self, timeout, exception=BaseTimeoutException, **kwargs):
self._timeout = timeout
self._exception = exception
