Support for multiple job dependencies #279

Closed
wants to merge 20 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
129 changes: 93 additions & 36 deletions rq/job.py
@@ -10,6 +10,8 @@
 from .utils import import_attribute, utcnow, utcformat, utcparse
 from rq.compat import text_type, decode_redis_hash, as_text
 
+from redis import WatchError
+
 
 def enum(name, *sequential, **named):
     values = dict(zip(sequential, range(len(sequential))), **named)
@@ -107,9 +109,12 @@ def create(cls, func, args=None, kwargs=None, connection=None,
         job.timeout = timeout
         job._status = status
 
-        # dependency could be job instance or id
+        # depends_on could be a job instance or job id, or a list thereof
         if depends_on is not None:
-            job._dependency_id = depends_on.id if isinstance(depends_on, Job) else depends_on
+            if isinstance(depends_on, (Job, text_type)):
+                depends_on = [depends_on]
+            job._dependency_ids = list(
+                dependency.id if isinstance(dependency, Job) else dependency for dependency in depends_on)
         return job
 
     def _get_status(self):
@@ -139,18 +144,33 @@ def is_started(self):
         return self.status == Status.STARTED
 
     @property
-    def dependency(self):
-        """Returns a job's dependency. To avoid repeated Redis fetches, we cache
-        job.dependency as job._dependency.
+    def dependencies(self):
+        """Returns a job's dependencies. To avoid repeated Redis fetches, we
+        cache job.dependencies as job._dependencies.
+        TODO: What if a dependency has already been removed, e.g. due to a
+        result_ttl timeout?
         """
-        if self._dependency_id is None:
-            return None
-        if hasattr(self, '_dependency'):
-            return self._dependency
-        job = Job.fetch(self._dependency_id, connection=self.connection)
-        job.refresh()
-        self._dependency = job
-        return job
+        try:
+            return self._dependencies
+        except AttributeError:
+            # pass the job's own connection so fetches don't fall back to
+            # whatever connection happens to be on the stack
+            self._dependencies = [Job.fetch(dependency_id, connection=self.connection)
+                                  for dependency_id in self._dependency_ids]
+            return self._dependencies
+
+    def remove_dependency(self, dependency_id):
+        """Removes a dependency from the job. This is usually called when
+        the dependency has been successfully executed."""
+        # TODO: can probably be pipelined
+        self.connection.srem(self.remaining_dependencies_key, dependency_id)
+
+    def has_unmet_dependencies(self):
+        """Checks whether the job has dependencies that aren't yet finished."""
+        return bool(self.connection.scard(self.remaining_dependencies_key))
+
+    @property
+    def reverse_dependencies(self):
+        """Returns a list of jobs whose execution depends on this
+        job's successful execution."""
+        reverse_dependency_ids = self.connection.smembers(self.reverse_dependencies_key)
+        return [Job.fetch(id, connection=self.connection)
+                for id in reverse_dependency_ids]
 
     @property
     def func(self):
@@ -270,7 +290,7 @@ def __init__(self, id=None, connection=None):
         self.timeout = None
         self.result_ttl = None
         self._status = None
-        self._dependency_id = None
+        self._dependency_ids = []
         self.meta = {}
 
     # Data access
@@ -290,23 +310,33 @@ def set_id(self, value):
 
     @classmethod
     def key_for(cls, job_id):
-        """The Redis key that is used to store job hash under."""
+        """Redis key for the job hash."""
         return b'rq:job:' + job_id.encode('utf-8')
 
     @classmethod
-    def dependents_key_for(cls, job_id):
-        """The Redis key that is used to store job hash under."""
-        return 'rq:job:%s:dependents' % (job_id,)
+    def reverse_dependencies_key_for(cls, job_id):
+        """Redis key for the set of jobs that depend on this job."""
+        return b'rq:job:' + job_id.encode('utf-8') + b':reverse_dependencies'
+
+    @classmethod
+    def remaining_dependencies_key_for(cls, job_id):
+        """Redis key for the set of this job's unmet dependencies."""
+        return b'rq:job:' + job_id.encode('utf-8') + b':dependencies'
 
     @property
     def key(self):
-        """The Redis key that is used to store job hash under."""
+        """Redis key for the job hash."""
         return self.key_for(self.id)
 
     @property
-    def dependents_key(self):
-        """The Redis key that is used to store job hash under."""
-        return self.dependents_key_for(self.id)
+    def reverse_dependencies_key(self):
+        """Redis key for the set of jobs that depend on this job."""
+        return self.reverse_dependencies_key_for(self.id)
+
+    @property
+    def remaining_dependencies_key(self):
+        """Redis key for the set of this job's unmet dependencies."""
+        return self.remaining_dependencies_key_for(self.id)
 
     @property
     def result(self):
@@ -364,11 +394,11 @@ def to_date(date_str):
         self.enqueued_at = to_date(as_text(obj.get('enqueued_at')))
         self.ended_at = to_date(as_text(obj.get('ended_at')))
         self._result = unpickle(obj.get('result')) if obj.get('result') else None  # noqa
-        self.exc_info = obj.get('exc_info')
+        self.exc_info = as_text(obj.get('exc_info'))
         self.timeout = int(obj.get('timeout')) if obj.get('timeout') else None
         self.result_ttl = int(obj.get('result_ttl')) if obj.get('result_ttl') else None  # noqa
         self._status = as_text(obj.get('status') if obj.get('status') else None)
-        self._dependency_id = as_text(obj.get('dependency_id', None))
+        # split() without arguments returns [] for an empty string, whereas
+        # split(' ') would return [''] and yield a phantom dependency id
+        self._dependency_ids = as_text(obj.get('dependency_ids', '')).split()
         self.meta = unpickle(obj.get('meta')) if obj.get('meta') else {}
 
     def dump(self):
@@ -395,8 +425,8 @@ def dump(self):
         obj['result_ttl'] = self.result_ttl
         if self._status is not None:
             obj['status'] = self._status
-        if self._dependency_id is not None:
-            obj['dependency_id'] = self._dependency_id
+        if self._dependency_ids:
+            obj['dependency_ids'] = ' '.join(self._dependency_ids)
         if self.meta:
             obj['meta'] = dumps(self.meta)
@@ -417,10 +447,12 @@ def cancel(self):
         without worrying about the internals required to implement job
         cancellation. Technically, this call is (currently) the same as just
         deleting the job hash.
+
+        NOTE: Any job that depends on this job becomes orphaned.

> Collaborator: How are these cleaned up?

         """
         pipeline = self.connection.pipeline()
         self.delete(pipeline=pipeline)
-        pipeline.delete(self.dependents_key)
+        pipeline.delete(self.reverse_dependencies_key)
         pipeline.execute()
 
     def delete(self, pipeline=None):
@@ -473,18 +505,43 @@ def cleanup(self, ttl=None, pipeline=None):
         connection = pipeline if pipeline is not None else self.connection
         connection.expire(self.key, ttl)
 
-    def register_dependency(self):
-        """Jobs may have dependencies. Jobs are enqueued only if the job they
-        depend on is successfully performed. We record this relation as
-        a reverse dependency (a Redis set), with a key that looks something
-        like:
+    def register_dependencies(self, dependencies):
+        """Register this job as being dependent on its dependencies.
+        A job is added to its queue only if all its dependencies have
+        succeeded.
 
-            rq:job:job_id:dependents = {'job_id_1', 'job_id_2'}
+        For each unmet dependency (a job that hasn't successfully finished),
+        we register this job's id in a Redis set:
 
-        This method adds the current job in its dependency's dependents set.
+            rq:job:job_id:reverse_dependencies = {'job_id_1', 'job_id_2'}
         """
-        # TODO: This can probably be pipelined
-        self.connection.sadd(Job.dependents_key_for(self._dependency_id), self.id)
+        remaining_dependencies = []
+        with self.connection.pipeline() as pipeline:
+            while True:
+                try:
+                    # Check which dependencies are still unmet.
+                    # pipeline.watch() ensures that no dependency is
+                    # modified for the duration of the check.
+                    # TODO: each dependency.status call issues a Redis query;
+                    # we should probably use bulk fetches if possible
+                    pipeline.watch(*[dependency.key for dependency in dependencies])
+                    for dependency in dependencies:
+                        if dependency.status != Status.FINISHED:
+                            remaining_dependencies.append(dependency)
+
+                    if remaining_dependencies:
+                        pipeline.multi()
+                        pipeline.sadd(self.remaining_dependencies_key,
+                                      *[dependency.id for dependency in remaining_dependencies])
+
+                        for dependency in remaining_dependencies:
+                            pipeline.sadd(Job.reverse_dependencies_key_for(dependency.id), self.id)
+
+                    pipeline.execute()
+                    break
+                except WatchError:
+                    # a watched dependency changed; discard the partial
+                    # result and redo the whole check
+                    remaining_dependencies = []
+                    continue
+
+        return remaining_dependencies
 
     def __str__(self):
         return '<Job %s: %s>' % (self.id, self.description)
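To make the new API concrete, here is a minimal usage sketch of the list-valued depends_on this patch introduces. It is not part of the diff; the fetch_data and merge_results functions are placeholders.

    from redis import Redis
    from rq import Queue

    def fetch_data(source):
        return source.upper()

    def merge_results():
        return 'merged'

    q = Queue(connection=Redis())
    first = q.enqueue(fetch_data, 'a.csv')
    second = q.enqueue(fetch_data, 'b.csv')

    # depends_on now accepts a single job, a job id, or a list of either;
    # merge_results is enqueued only once both fetch jobs have finished
    final = q.enqueue(merge_results, depends_on=[first, second])

With a single dependency, the old scalar form (depends_on=first) keeps working, since create() wraps it in a one-element list.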
41 changes: 10 additions & 31 deletions rq/queue.py
@@ -8,8 +8,6 @@
                          NoSuchJobError, UnpickleError)
 from .compat import total_ordering, string_types, as_text
 
-from redis import WatchError
-
 
 def get_failed_queue(connection=None):
     """Returns a handle to the special failed queue."""
@@ -146,28 +144,19 @@ def enqueue_call(self, func, args=None, kwargs=None, timeout=None,
         """
         timeout = timeout or self._default_timeout
 
-        # TODO: job with dependency shouldn't have "queued" as status
         job = Job.create(func, args, kwargs, connection=self.connection,
-                         result_ttl=result_ttl, status=Status.QUEUED,
+                         result_ttl=result_ttl,
+                         status=None if depends_on else Status.QUEUED,
                          description=description, depends_on=depends_on, timeout=timeout)

> Collaborator: I don't really like the None status here; I think that's going to be confusing.

> Contributor: Currently, a job always starts with Status.QUEUED even if it depends on some unfinished job and never actually gets added to a queue. I changed enqueue_call() to use None in that case, but it isn't necessary for the correctness of the dependency logic.
>
> I think there's a bigger problem here related to what you're describing. A user could call Job.create() with dependencies, followed by Queue.enqueue_job(), without ever calling Job.register_dependencies(), resulting in an inconsistent state. This holds even without this pull request. Thus, user error can lead to inconsistent state. How much of the lower-level API (e.g. Job.create()) should be public?

> Collaborator: The main trouble is that there are many states, and you can't tell from the implementation which state transitions are allowed, correct, or desired, and which are not. Even more confusing: prior to this patch, status wasn't anything RQ really depended on — it was just there for monitoring purposes and user convenience. Now we're making decisions based on this value, which was never the intention. The property wasn't designed with a 100% correctness guarantee in mind for a computer to trust.
>
> Additionally, I think register_dependencies() should be considered a "private" API, and therefore named like one.

> Contributor: I believe that even with this patch, RQ still doesn't depend on the status value. I just made the distinction that if a new job depends on other jobs, it isn't really enqueued, so I set the status to None.
>
> If register_dependencies() becomes private, then create() should also become private: creating a dependent job with create() without calling register_dependencies() results in an inconsistent state (not just an inconsistent status).

> Collaborator: I guess my point would be that create() needs to call _register_dependencies() when appropriate, thereby shielding the implementation details and making sure the state is correct.

> Contributor: create() currently doesn't write to Redis; that's what save() and enqueue_job() do. Calling _register_dependencies() in create() without also saving the job hash would make no sense. So who's supposed to call _register_dependencies()?

 
-        # If the job depends on an unfinished job, register it in its
-        # parent's dependents set instead of enqueueing it.
-        # If WatchError is raised in the process, something else is modifying
-        # the dependency; in that case we simply retry.
-        if depends_on is not None:
-            with self.connection.pipeline() as pipe:
-                while True:
-                    try:
-                        pipe.watch(depends_on.key)
-                        if depends_on.status != Status.FINISHED:
-                            job.register_dependency()
-                            job.save()
-                            return job
-                        break
-                    except WatchError:
-                        continue
-
+        # A job having unmet dependencies will not be enqueued right away
+        if depends_on:
+            if isinstance(depends_on, Job):
+                depends_on = [depends_on]
+            remaining_dependencies = job.register_dependencies(depends_on)
+            if remaining_dependencies:
+                job.save()
+                return job
 
         return self.enqueue_job(job)
 
     def enqueue(self, f, *args, **kwargs):
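To make the reviewers' concern concrete, here is a hypothetical misuse sketch — not code from the patch, with placeholder task functions — showing how the low-level API can bypass dependency registration entirely:

    from redis import Redis
    from rq import Queue
    from rq.job import Job

    def parent_task():
        return 'parent'

    def child_task():
        return 'child'

    q = Queue(connection=Redis())
    parent = q.enqueue(parent_task)

    # Misuse: the child's hash records its dependency ids, but neither the
    # rq:job:<id>:dependencies nor the reverse_dependencies sets are created,
    # so the worker enqueues and runs the child regardless of the parent's
    # outcome.
    child = Job.create(child_task, connection=q.connection, depends_on=parent)
    q.enqueue_job(child)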
@@ -233,16 +222,6 @@ def enqueue_job(self, job, set_meta_data=True):
         job.save()
         return job
 
-    def enqueue_dependents(self, job):
-        """Enqueues all jobs in the given job's dependents set and clears it."""
-        # TODO: can probably be pipelined
-        while True:
-            job_id = as_text(self.connection.spop(job.dependents_key))
-            if job_id is None:
-                break
-            dependent = Job.fetch(job_id, connection=self.connection)
-            self.enqueue_job(dependent)
-
     def pop_job_id(self):
         """Pops a given job ID from this Redis queue."""
         return as_text(self.connection.lpop(self.key))
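For readers unfamiliar with the retry loop that moved from enqueue_call() into Job.register_dependencies(), here is a standalone sketch of the optimistic-locking pattern it relies on. The function name, key layout, and the b'finished' marker are illustrative, not RQ's actual values: WATCH the dependency keys, inspect them, queue the writes in a MULTI block, and retry on WatchError.

    from redis import Redis, WatchError

    def add_to_set_if_unfinished(connection, set_key, member, watched_keys):
        """Record `member` in `set_key` iff some watched job is unfinished,
        atomically with respect to concurrent writers."""
        with connection.pipeline() as pipe:
            while True:
                try:
                    pipe.watch(*watched_keys)   # after watch(), reads execute immediately
                    unmet = [key for key in watched_keys
                             if pipe.hget(key, 'status') != b'finished']
                    pipe.multi()                # subsequent commands are queued
                    if unmet:
                        pipe.sadd(set_key, member)
                    pipe.execute()              # raises WatchError if any watched
                    return unmet                # key changed since watch()
                except WatchError:
                    continue                    # somebody raced us; re-check

    conn = Redis()
    unmet = add_to_set_if_unfinished(
        conn, 'rq:job:child:dependencies', 'child',
        ['rq:job:parent1', 'rq:job:parent2'])

The retry loop is what makes the check-then-write safe without a server-side lock: if a dependency finishes between the status check and the set update, the transaction aborts and the check runs again.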
6 changes: 5 additions & 1 deletion rq/worker.py
@@ -320,8 +320,12 @@ def work(self, burst=False):  # noqa
                 self.heartbeat((job.timeout or 180) + 60)
                 self.fork_and_perform_job(job)
                 self.heartbeat()
+
                 if job.status == Status.FINISHED:
-                    queue.enqueue_dependents(job)
+                    # a finished job may unblock the jobs that depend on it
+                    for reverse_dependency in job.reverse_dependencies:
+                        reverse_dependency.remove_dependency(job.id)
+                        if not reverse_dependency.has_unmet_dependencies():
+                            queue.enqueue_job(reverse_dependency)
 
                 did_perform_work = True
             finally:
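The worker loop above issues one SREM plus one SCARD per dependent. The "can probably be pipelined" TODOs in job.py suggest batching these; here is a hedged sketch of one way to do it, with a hypothetical helper name and raw key strings standing in for the real key helpers:

    from redis import Redis

    def resolve_dependents(connection, finished_job_id, dependent_ids):
        """Batch-remove a finished job from each dependent's remaining set
        and return the ids of dependents that are now ready to enqueue."""
        with connection.pipeline() as pipe:
            for dependent_id in dependent_ids:
                key = 'rq:job:%s:dependencies' % dependent_id
                pipe.srem(key, finished_job_id)   # drop the met dependency
                pipe.scard(key)                   # count what remains
            results = pipe.execute()
        # results alternate [srem, scard, srem, scard, ...]; a dependent
        # whose scard is 0 has no unmet dependencies left
        return [dependent_id for dependent_id, remaining
                in zip(dependent_ids, results[1::2]) if remaining == 0]

    ready = resolve_dependents(Redis(), 'parent-id', ['child-1', 'child-2'])

This trades the per-dependent round trips for a single pipelined batch, at the cost of a small race window between SCARD and enqueueing that the WATCH-based approach in register_dependencies() avoids.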
8 changes: 4 additions & 4 deletions tests/test_connection.py
@@ -12,7 +12,7 @@ class TestConnectionInheritance(RQTestCase):
     def test_connection_detection(self):
         """Automatic detection of the connection."""
         q = Queue()
-        self.assertEquals(q.connection, self.testconn)
+        self.assertEqual(q.connection, self.testconn)
 
     def test_connection_stacking(self):
         """Connection stacking."""
@@ -23,7 +23,7 @@ def test_connection_stacking(self):
             q1 = Queue()
             with Connection(conn2):
                 q2 = Queue()
-        self.assertNotEquals(q1.connection, q2.connection)
+        self.assertNotEqual(q1.connection, q2.connection)
 
     def test_connection_pass_thru(self):
         """Connection passed through from queues to jobs."""
@@ -32,5 +32,5 @@ def test_connection_pass_thru(self):
         q2 = Queue()
         job1 = q1.enqueue(do_nothing)
         job2 = q2.enqueue(do_nothing)
-        self.assertEquals(q1.connection, job1.connection)
-        self.assertEquals(q2.connection, job2.connection)
+        self.assertEqual(q1.connection, job1.connection)
+        self.assertEqual(q2.connection, job2.connection)