Multithread Worker #1803

Draft: wants to merge 25 commits into `master`

Changes from 23 commits (25 commits total):
6744f41
Multithreading first WIP
ccrvlh Feb 6, 2023
3037e0d
Merge branch 'master' into feature/multithread
ccrvlh Feb 7, 2023
4e90730
Add locking mechanism
ccrvlh Feb 7, 2023
a39497b
Add Callback, Dependencies & Registration Tests
ccrvlh Feb 7, 2023
0bf6809
Remove lifespan log test
ccrvlh Feb 10, 2023
23b1f97
fix: multhread & job dependencies
ccrvlh Feb 11, 2023
8b18764
docs: Add documentation for ThreadpoolWorker
ccrvlh Feb 11, 2023
78af69b
docs/feat: black, docstrings, refactoring
ccrvlh Feb 12, 2023
2bc3197
fix: missing imports
ccrvlh Feb 12, 2023
6414270
Multithreading first WIP
ccrvlh Feb 6, 2023
6d4d059
Add locking mechanism
ccrvlh Feb 7, 2023
80fbc64
Add Callback, Dependencies & Registration Tests
ccrvlh Feb 7, 2023
fe10d94
Remove lifespan log test
ccrvlh Feb 10, 2023
5464164
fix: multhread & job dependencies
ccrvlh Feb 11, 2023
52ae512
docs: Add documentation for ThreadpoolWorker
ccrvlh Feb 11, 2023
7e8c3f2
docs/feat: black, docstrings, refactoring
ccrvlh Feb 12, 2023
7375e15
fix: missing imports
ccrvlh Feb 12, 2023
a6b4079
Merge branch 'feature/multithread' of https://github.com/lowercase00/…
ccrvlh Feb 12, 2023
66b0121
fix: `dequeue_timeout` param.
ccrvlh Feb 12, 2023
7976f18
feat: add dynamic default pool size based on CPU count
ccrvlh Feb 12, 2023
45e96d1
Merge branch 'master' into feature/multithread
ccrvlh Feb 15, 2023
51d61eb
Merge branch 'master' into feature/multithread
ccrvlh Feb 16, 2023
dac55cd
feat: dotenv / bootstrap / test ignore
ccrvlh Feb 16, 2023
f89bd55
Merge branch 'master' into feature/multithread
ccrvlh Feb 17, 2023
5e25ccb
Merge branch 'master' into feature/multithread
ccrvlh Mar 5, 2023
3 changes: 2 additions & 1 deletion .gitignore
@@ -21,4 +21,5 @@ Gemfile
Gemfile.lock
_site/
.venv/
.vscode/
.vscode/
.env
71 changes: 71 additions & 0 deletions docs/docs/workers.md
@@ -444,3 +444,74 @@ redis = Redis()
# This will raise an exception if job is invalid or not currently executing
send_stop_job_command(redis, job_id)
```


## Concurrency
_New in version 1.12.2._

<div class="warning">
<img style="float: right; margin-right: -60px; margin-top: -38px" src="/img/warning.png" />
<strong>Warning:</strong>
<p>
This feature is experimental and unstable; it may have bugs or not function properly.
Please do not use it in production.
</p>
</div>

The simplest way to run jobs concurrently is to run multiple workers.
This gives you horizontal scalability and the performance needed for most use cases.
To run multiple workers, just run `rq worker` in two separate terminal windows; you only need to make sure the worker names are unique, for example:
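
Programmatically, this amounts to starting `Worker` instances with distinct names, one per process (a sketch; the worker names are illustrative):

```python
# Start each of these from its own terminal/process; only the name differs.
from redis import Redis
from rq import Queue, Worker

q = Queue(connection=Redis())
Worker([q], connection=q.connection, name='worker-1').work()  # use e.g. 'worker-2' in the second terminal
```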

As of `v1.12.2`, `rq` has native support for a `ThreadPoolWorker`, a multithreaded worker based on `ThreadPoolExecutor` from the `concurrent.futures` library. This custom worker class runs a single worker process, but executes each job in a thread from a thread pool.

This means that dequeueing is always controlled by the main worker process, and only the job itself is executed within a thread.
It also means that `fork()` is not used, which can be an upside or a downside depending on your use case and environment.
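
One way to observe this (a sketch; `where_am_i` is just an illustrative job, not part of RQ) is to enqueue a job that reports its process ID and thread name: under the default `Worker` the PID belongs to a forked work horse, while under the `ThreadPoolWorker` it matches the worker process and the thread name carries the pool's `rq_workers_` prefix.

```python
import os
import threading

def where_am_i():
    # Under the ThreadPoolWorker this runs inside the worker process itself,
    # on a pool thread, instead of in a forked work horse process.
    return os.getpid(), threading.current_thread().name
```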

The thread-based concurrency model is normally well suited for I/O-bound workloads (network requests, database operations, file I/O, etc.),
since the threads cooperate by yielding execution while waiting on I/O, rather than actually running in parallel.

To run the `ThreadPoolWorker`:

```python
from redis import Redis
from rq import Queue, ThreadPoolWorker

q = Queue(connection=Redis())
w = ThreadPoolWorker([q], connection=q.connection)
w.work()
```

As each job runs in its own thread, waiting operations are done concurrently (not in parallel, even though it may look like it).

```python
import time

from redis import Redis
from rq import Queue, ThreadPoolWorker

def my_io_heavy_task(seconds):
    time.sleep(seconds)
    return

q = Queue(connection=Redis())
q.enqueue(my_io_heavy_task, 1)
q.enqueue(my_io_heavy_task, 1)

w = ThreadPoolWorker([q], connection=q.connection)
w.work(burst=True)

# This will take ~1sec instead of ~2sec when running with the default worker.
1 < w.total_working_time < 1.1
```
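
A more realistic I/O-bound job follows the same pattern; the snippet below is only a sketch (the URL and the `fetch_status` helper are illustrative, not part of RQ):

```python
from urllib.request import urlopen

from redis import Redis
from rq import Queue, ThreadPoolWorker

def fetch_status(url):
    # The thread yields while waiting on the network, so other pool threads keep working.
    with urlopen(url, timeout=10) as response:
        return response.status

q = Queue(connection=Redis())
for _ in range(20):
    q.enqueue(fetch_status, 'https://example.com')

ThreadPoolWorker([q], connection=q.connection).work(burst=True)
```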

By default, the number of threads available to the `ThreadPoolWorker` is 5 times the number of CPU cores (as reported by the `cpu_count` function from `multiprocessing`); this is a sane default and the same value used by the `concurrent.futures` library.
This should be enough in most cases, but you can always customize the pool size to your specific needs with the worker's `pool_size` parameter.

```python
w = ThreadPoolWorker([q], connection=q.connection, pool_size=12)
```
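
If you would rather size the pool relative to the machine than hard-code a number, you can compute it from the same `cpu_count` the default is based on (a sketch):

```python
from multiprocessing import cpu_count

# For example, a more conservative 2 threads per core instead of the default 5.
w = ThreadPoolWorker([q], connection=q.connection, pool_size=cpu_count() * 2)
```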

The `ThreadPoolWorker` has relevant limitations; here is a non-exhaustive list:

- Dependencies might not behave as expected
- Job registries may not be reliable (especially with jobs that timed out)
- The implementation does not consider thread-safety aspects of the job itself, so make sure your code is thread safe before using this (see the sketch at the end of this section)

A review thread attached to the job-registries point above:

@gyandeeps (Sep 22, 2023): The biggest thing I like about rq is its job reliability, as it maintains the different job registries correctly. Is it possible to have the same level of reliability with the thread pool worker? We process a Kafka queue and hand-create rq tasks to process those events. Reliability is super important, as we don't want to lose a task without knowing about it.

@ccrvlh (Collaborator, Author): Thanks for the feedback, definitely something to take into account. I've been working on a different branch since there were other relevant changes to allow a more robust design, and will work on this as well. I do imagine that there will be limitations though, especially in the first couple of versions. Hopefully nothing that will hurt reliability... we'll see.

@gyandeeps: Appreciate it, @ccrvlh. We are in the works to switch from Celery to rq for the reliability factor. By the way, I pulled this branch down and tried the thread pool with a task timing out, and was able to consistently get a retry on it. Anything specific you can point out where you have seen this issue? (Just curious.)

@ccrvlh (Collaborator, Author): It's been a while since I worked on this... I'm mostly working on the v2 branch now (which has this worker setup), but there is still a lot to do. I'll try to make some tests with it this weekend.

@gyandeeps (Sep 22, 2023): If you can share the link to the branch, I would appreciate it. Just want to see and learn. I tried searching on this repo but was not able to find it. UPDATE: I got it under your fork, thanks.

@ccrvlh (Collaborator, Author): Sure! It's on my fork only; you might be able to find it from this repo (or from my profile). I'm on mobile, but I'll post the link here when I get to the PC.

@gyandeeps: @ccrvlh just curious about the timeline for this feature, or for v2 in general, since we want to roll out rq in prod and multithreading is critical for us. Thanks.

@gyandeeps: Also, can we try to release this with 2.0 under an experimental flag? Based on my tests, things looked good. cc @selwin

Collaborator: @gyandeeps like I mentioned, I'll only merge this in slowly, one bit at a time, to make sure we don't mess anything up. RQ version 2.0 also changed how jobs are enqueued, so this branch is probably stale.
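
To illustrate the thread-safety caveat above (a sketch; the counter and lock are illustrative and not part of RQ), any state shared between jobs must be protected, since several jobs may now run concurrently in the same process:

```python
import threading

# Module-level state shared by every job executed in this worker process.
_counter_lock = threading.Lock()
_processed = 0

def count_event(payload):
    global _processed
    # Without the lock, concurrent jobs could interleave this read-modify-write
    # and lose increments when executed by the ThreadPoolWorker.
    with _counter_lock:
        _processed += 1
    return _processed
```
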
1 change: 0 additions & 1 deletion rq/cli/cli.py
@@ -174,7 +174,6 @@ def info(cli_config, interval, raw, only_queues, only_workers, by_queue, queues,

    try:
        with Connection(cli_config.connection):

            if queues:
                qs = list(map(cli_config.queue_class, queues))
            else:
2 changes: 0 additions & 2 deletions rq/cli/helpers.py
@@ -100,7 +100,6 @@ def state_symbol(state):


def show_queues(queues, raw, by_queue, queue_class, worker_class):

    num_jobs = 0
    termwidth = get_terminal_size().columns
    chartwidth = min(20, termwidth - 20)
@@ -141,7 +140,6 @@ def show_workers(queues, raw, by_queue, queue_class, worker_class):
            workers.add(worker)

    if not by_queue:

        for worker in workers:
            queue_names = ', '.join(worker.queue_names())
            name = '%s (%s %s %s)' % (worker.name, worker.hostname, worker.ip_address, worker.pid)
6 changes: 6 additions & 0 deletions rq/defaults.py
@@ -88,3 +88,9 @@
Uses Python's default attributes as defined
https://docs.python.org/3/library/logging.html#logrecord-attributes
"""


DEFAULT_CPU_THREADS = 5
""" The number of threads per core the ThreadPoolWorker will use by default.
This is the same default used by the `concurrent.futures` module.
"""
234 changes: 225 additions & 9 deletions rq/worker.py
@@ -5,10 +5,11 @@
import signal
import socket
import sys
import threading
import time
import traceback
import warnings

from concurrent.futures import Future, ThreadPoolExecutor, wait
from datetime import timedelta
from enum import Enum
from uuid import uuid4
@@ -28,30 +29,40 @@

from . import worker_registration
from .command import parse_payload, PUBSUB_CHANNEL_TEMPLATE, handle_command
from .utils import as_text
from .connections import get_current_connection, push_connection, pop_connection

from .defaults import (
CALLBACK_TIMEOUT,
DEFAULT_JOB_MONITORING_INTERVAL,
DEFAULT_MAINTENANCE_TASK_INTERVAL,
DEFAULT_RESULT_TTL,
DEFAULT_CPU_THREADS,
DEFAULT_WORKER_TTL,
DEFAULT_JOB_MONITORING_INTERVAL,
DEFAULT_LOGGING_FORMAT,
DEFAULT_LOGGING_DATE_FORMAT,
DEFAULT_LOGGING_FORMAT,
)
from .exceptions import DeserializationError, DequeueTimeout, ShutDownImminentException
from .exceptions import DequeueTimeout, DeserializationError, ShutDownImminentException
from .job import Job, JobStatus
from .logutils import setup_loghandlers
from .queue import Queue
from .registry import StartedJobRegistry, clean_registries
from .scheduler import RQScheduler
from .serializers import resolve_serializer
from .suspension import is_suspended
from .timeouts import JobTimeoutException, HorseMonitorTimeoutException, UnixSignalDeathPenalty
from .utils import backend_class, ensure_list, get_version, make_colorizer, utcformat, utcnow, utcparse, compact
from .timeouts import HorseMonitorTimeoutException, JobTimeoutException, TimerDeathPenalty, UnixSignalDeathPenalty
from .utils import (
as_text,
backend_class,
compact,
ensure_list,
get_version,
make_colorizer,
utcformat,
utcnow,
utcparse,
)
from .version import VERSION
from .worker_registration import clean_worker_registry, get_keys
from .serializers import resolve_serializer

try:
from setproctitle import setproctitle as setprocname
@@ -1197,7 +1208,7 @@ def prepare_job_execution(self, job: 'Job'):

job.prepare_for_execution(self.name, pipeline=pipeline)
pipeline.execute()
self.log.debug(f"Job preparation finished.")
self.log.debug("Job preparation finished.")

msg = 'Processing {0} from {1} since {2}'
self.procline(msg.format(job.func_name, job.origin, time.time()))
@@ -1573,3 +1584,208 @@ class RandomWorker(Worker):

    def reorder_queues(self, reference_queue):
        shuffle(self._ordered_queues)


class ThreadPoolWorker(Worker):
    death_penalty_class = TimerDeathPenalty  # type: ignore

    def __init__(self, *args, **kwargs):
        self.threadpool_size = kwargs.pop('pool_size', self.default_pool_size)
        self.executor = ThreadPoolExecutor(max_workers=self.threadpool_size, thread_name_prefix="rq_workers_")
        self._idle_threads = self.threadpool_size
        self._lock = threading.Lock()
        self._current_jobs: List[Tuple['Job', 'Future']] = []  # type: ignore

Review comment (Collaborator): We should also bring `_current_jobs` to the main worker class.

        super(ThreadPoolWorker, self).__init__(*args, **kwargs)

Review comment (Collaborator): I think it would be better if we change this to `number_of_available_horses()` instead of a simple boolean.

    @property
    def is_pool_full(self):
"""Checks whether the thread pool is full.
Returns True if there are no idle threads, False otherwise

Returns:
is_full (bool): True if full, False otherwise.
"""
if self._idle_threads == 0:
return True
return False

@property
def default_pool_size(self) -> int:
"""THe default TheadPool size.
By default, each CPU core should run N Threads,
where N is the `DEFAULT_CPU_THREADS``

Returns:
cpus (int): Number of CPUs
"""
from multiprocessing import cpu_count
return cpu_count() * DEFAULT_CPU_THREADS

    def work(
        self,
        burst: bool = False,
        logging_level: str = "INFO",
        date_format=DEFAULT_LOGGING_DATE_FORMAT,
        log_format=DEFAULT_LOGGING_FORMAT,
        max_jobs=None,
        with_scheduler: bool = False,
    ):
        """Starts the work loop.

        Pops jobs from the current list of queues, and submits each job to the ThreadPool.
        When all queues are empty, block and wait for new jobs to arrive on any of the
        queues, unless `burst` mode is enabled.

        The return value indicates whether any jobs were processed.
        """
        self.bootstrap(logging_level, date_format, log_format)
        completed_jobs = 0
        self.log.info("ThreadPoolWorker %s: started with %s threads, version %s", self.key, self.threadpool_size, VERSION)
        self.log.warning("*** WARNING: ThreadPoolWorker is in beta and may be unstable. Don't use it in production!")
        if with_scheduler:
            self._start_scheduler(burst, logging_level, date_format, log_format)

        self._install_signal_handlers()
        try:
            while True:
                try:
                    self.check_for_suspension(burst)

                    if self.should_run_maintenance_tasks:
                        self.run_maintenance_tasks()

                    if self._stop_requested:
                        self.log.info('Worker %s: stopping on request', self.key)
                        break

                    if self.is_pool_full:
                        self.log.debug('ThreadPool is full, waiting for idle threads...')
                        self.__wait_for_slot()

                    timeout = None if burst else self.dequeue_timeout
                    result = self.dequeue_job_and_maintain_ttl(timeout)
                    if result is None:
                        if not burst:
                            break
                        has_pending_dependents = self.__check_pending_dependents()
                        if has_pending_dependents:
                            continue
                        self.log.info("Worker %s: done, quitting", self.key)
                        break

                    job, queue = result
                    self.reorder_queues(reference_queue=queue)
                    self.execute_job(job, queue)
                    self.heartbeat()

                    completed_jobs += 1
                    if max_jobs is not None:
                        if completed_jobs < max_jobs:
                            continue
                        self.log.info("Worker %s: finished executing %d jobs, quitting", self.key, completed_jobs)
                        break

                except redis.exceptions.TimeoutError:
                    self.log.error(f"Worker {self.key}: Redis connection timeout, quitting...")
                    break

                except StopRequested:
                    break

                except SystemExit:
                    # Cold shutdown detected
                    raise

                except:  # noqa
                    self.log.error('Worker %s: found an unhandled exception, quitting...', self.key, exc_info=True)
                    break
        finally:
            self.register_death()
            self.unsubscribe()

        return bool(completed_jobs)

Review comment (Collaborator): I think the next step would be to make `execute_job()` on the main `Worker` class follow this pattern. After executing the job in a separate process, it should return to the main loop and wait for the job to finish before dequeueing the next job.

Review comment (Collaborator): I'll try to find some time to work on this sometime in the next few days; I think this will be quite complicated.

    def execute_job(self, job, queue):

        def job_done(future: Future):
            """Callback function that runs after the job (future) is finished.
            This will update the `idle_counter` object and update the `_current_jobs` array,
            removing the job that just finished from the list.

            Args:
                future (Future): The Future object.
            """
            self.__change_idle_counter(+1)
            self.heartbeat()
            job_element = list(filter(lambda x: id(x[1]) == id(future), self._current_jobs))
            for el in job_element:
                self._current_jobs.remove(el)
            if job.get_status() == JobStatus.FINISHED:
                queue.enqueue_dependents(job)

        self.log.info("Executing job %s from %s", blue(job.id), green(queue.name))
        future = self.executor.submit(self.perform_job, job, queue)
        self._current_jobs.append((job, future))
        self.__change_idle_counter(-1)
        future.add_done_callback(job_done)

    def wait_all(self, timeout: Optional[int] = None):
        """Wait for all current jobs."""
        wait([future for _, future in self._current_jobs])

    def __change_idle_counter(self, operation: int):
        """Updates the idle threads counter using a lock to make it safe.

        Args:
            operation (int): +1 to increment, -1 to decrement.
        """
        with self._lock:
            self._idle_threads += operation

Review comment (Collaborator): Is there some kind of push-based mechanism instead of polling for free workers?

Review comment (Collaborator, Author): Not that I could find; the ThreadPoolExecutor is a pretty thin wrapper. The only push-based mechanics I saw were related to a single future with the `done_callback`.

    def __wait_for_slot(self, wait_interval: float = 0.25):
"""Waits for a free slot in the thread pool.
Sleeps for `wait_interval` seconds to avoid high CPU burden on long jobs.

Args:
wait_interval (float, 0.25): How long to wait between each check. Default to 0.25 second.
"""
while 1:
if not self.is_pool_full:
self.log.debug('Found idle thread, ready to work')
break
time.sleep(wait_interval)
continue

Review comment (Collaborator): I think we need to restructure `work()` in such a way that this is not needed.

Review comment (Collaborator, Author): The reason I went this path here is that in this case, things don't happen sequentially, so we need a global way of handling dependencies (thread Y can't work unless thread X has finished). This is not a challenge for the Worker and the SimpleWorker.

    def __check_pending_dependents(self) -> bool:
"""Checks whether any job that's current being executed in the pool has dependents.
If there are dependents, appends it to a `pending_dependents` array.
If this array has items (> 0), we know something that's currently running must enqueue dependents
before we can actually stop a worker (on burst mode, for example).
If there are dependents returns True, False otherwise.

Returns:
pending_dependents (bool): Whether any job currently running has dependents.
"""
pending_dependents = []
for job, _ in self._current_jobs:
if not job.dependents_key:
continue
pending_dependents.append(job)
if len(pending_dependents) > 0:
return True
return False

    def _shutdown(self):
        """
        If shutdown is requested in the middle of a job, wait until
        the job finishes before shutting down, and save the request in Redis.
        """
        if self.get_state() != WorkerStatus.BUSY:
            if self.scheduler:
                self.stop_scheduler()
            raise StopRequested()
        self._stop_requested = True
        self.set_shutdown_requested_date()
        self.log.debug('Stopping after current horse is finished. Press Ctrl+C again for a cold shutdown.')
        self.executor.shutdown()
        if self.scheduler:
            self.stop_scheduler()