Permalink
Browse files

Preserve order of return values from groups

Fixes #3781.
  • Loading branch information...
lpsinger committed Jun 27, 2018
1 parent 9778f2b commit 026c813c01a794aa60de16d4e123133c087a7499
Showing with 38 additions and 16 deletions.
  1. +4 −2 celery/app/amqp.py
  2. +3 −2 celery/app/base.py
  3. +6 −4 celery/backends/redis.py
  4. +18 −7 celery/canvas.py
  5. +2 −1 celery/utils/abstract.py
  6. +5 −0 celery/worker/request.py
View
@@ -298,7 +298,7 @@ def TaskConsumer(self, channel, queues=None, accept=None, **kw):
)
def as_task_v2(self, task_id, name, args=None, kwargs=None,
countdown=None, eta=None, group_id=None,
countdown=None, eta=None, group_id=None, group_index=None,
expires=None, retries=0, chord=None,
callbacks=None, errbacks=None, reply_to=None,
time_limit=None, soft_time_limit=None,
@@ -356,6 +356,7 @@ def as_task_v2(self, task_id, name, args=None, kwargs=None,
'eta': eta,
'expires': expires,
'group': group_id,
'group_index': group_index,
'retries': retries,
'timelimit': [time_limit, soft_time_limit],
'root_id': root_id,
@@ -390,7 +391,7 @@ def as_task_v2(self, task_id, name, args=None, kwargs=None,
)
def as_task_v1(self, task_id, name, args=None, kwargs=None,
countdown=None, eta=None, group_id=None,
countdown=None, eta=None, group_id=None, group_index=None,
expires=None, retries=0,
chord=None, callbacks=None, errbacks=None, reply_to=None,
time_limit=None, soft_time_limit=None,
@@ -435,6 +436,7 @@ def as_task_v1(self, task_id, name, args=None, kwargs=None,
'args': args,
'kwargs': kwargs,
'group': group_id,
'group_index': group_index,
'retries': retries,
'eta': eta,
'expires': expires,
View
@@ -689,7 +689,8 @@ def send_task(self, name, args=None, kwargs=None, countdown=None,
eta=None, task_id=None, producer=None, connection=None,
router=None, result_cls=None, expires=None,
publisher=None, link=None, link_error=None,
add_to_parent=True, group_id=None, retries=0, chord=None,
add_to_parent=True, group_id=None, group_index=None,
retries=0, chord=None,
reply_to=None, time_limit=None, soft_time_limit=None,
root_id=None, parent_id=None, route_name=None,
shadow=None, chain=None, task_type=None, **options):
@@ -725,7 +726,7 @@ def send_task(self, name, args=None, kwargs=None, countdown=None,
parent_id = parent.request.id
message = amqp.create_task_message(
task_id, name, args, kwargs, countdown, eta, group_id,
task_id, name, args, kwargs, countdown, eta, group_id, group_index,
expires, retries, chord,
maybe_list(link), maybe_list(link_error),
reply_to or self.oid, time_limit, soft_time_limit,
View
@@ -334,18 +334,20 @@ def apply_chord(self, header_result, body, **kwargs):
def on_chord_part_return(self, request, state, result,
propagate=None, **kwargs):
app = self.app
tid, gid = request.id, request.group
tid, gid, group_index = request.id, request.group, request.group_index
if not gid or not tid:
return
if group_index is None:
group_index = '+inf'
client = self.client
jkey = self.get_key_for_group(gid, '.j')
tkey = self.get_key_for_group(gid, '.t')
result = self.encode_result(result, state)
with client.pipeline() as pipe:
_, readycount, totaldiff, _, _ = pipe \
.rpush(jkey, self.encode([1, tid, state, result])) \
.llen(jkey) \
.zadd(jkey, group_index, self.encode([1, tid, state, result])) \
.zcount(jkey, '-inf', '+inf') \
.get(tkey) \
.expire(jkey, self.expires) \
.expire(tkey, self.expires) \
@@ -360,7 +362,7 @@ def on_chord_part_return(self, request, state, result,
decode, unpack = self.decode, self._unpack_chord_result
with client.pipeline() as pipe:
resl, _, _ = pipe \
.lrange(jkey, 0, total) \
.zrangebyscore(jkey, '-inf', '+inf') \
.delete(jkey) \
.delete(tkey) \
.execute()
View
@@ -249,7 +249,7 @@ def clone(self, args=(), kwargs={}, **opts):
partial = clone
def freeze(self, _id=None, group_id=None, chord=None,
root_id=None, parent_id=None):
root_id=None, parent_id=None, group_index=None):
"""Finalize the signature by adding a concrete task id.
The task won't be called and you shouldn't call the signature
@@ -276,6 +276,8 @@ def freeze(self, _id=None, group_id=None, chord=None,
opts['group_id'] = group_id
if chord:
opts['chord'] = chord
if group_index is not None:
opts['group_index'] = group_index
# pylint: disable=too-many-function-args
# Borks on this, as it's a property.
return self.AsyncResult(tid)
@@ -585,19 +587,21 @@ def run(self, args=(), kwargs={}, group_id=None, chord=None,
return results[0]
def freeze(self, _id=None, group_id=None, chord=None,
root_id=None, parent_id=None):
root_id=None, parent_id=None, group_index=None):
# pylint: disable=redefined-outer-name
# XXX chord is also a class in outer scope.
_, results = self._frozen = self.prepare_steps(
self.args, self.kwargs, self.tasks, root_id, parent_id, None,
self.app, _id, group_id, chord, clone=False,
group_index=group_index,
)
return results[0]
def prepare_steps(self, args, kwargs, tasks,
root_id=None, parent_id=None, link_error=None, app=None,
last_task_id=None, group_id=None, chord_body=None,
clone=True, from_dict=Signature.from_dict):
clone=True, from_dict=Signature.from_dict,
group_index=None):
app = app or self.app
# use chain message field for protocol 2 and later.
# this avoids pickle blowing the stack on the recursion
@@ -664,6 +668,7 @@ def prepare_steps(self, args, kwargs, tasks,
res = task.freeze(
last_task_id,
root_id=root_id, group_id=group_id, chord=chord_body,
group_index=group_index,
)
else:
res = task.freeze(root_id=root_id)
@@ -1073,7 +1078,7 @@ def _freeze_gid(self, options):
return options, group_id, options.get('root_id')
def freeze(self, _id=None, group_id=None, chord=None,
root_id=None, parent_id=None):
root_id=None, parent_id=None, group_index=None):
# pylint: disable=redefined-outer-name
# XXX chord is also a class in outer scope.
opts = self.options
@@ -1085,6 +1090,8 @@ def freeze(self, _id=None, group_id=None, chord=None,
opts['group_id'] = group_id
if chord:
opts['chord'] = chord
if group_index is not None:
opts['group_index'] = group_index
root_id = opts.setdefault('root_id', root_id)
parent_id = opts.setdefault('parent_id', parent_id)
new_tasks = []
@@ -1104,6 +1111,7 @@ def _freeze_unroll(self, new_tasks, group_id, chord, root_id, parent_id):
# pylint: disable=redefined-outer-name
# XXX chord is also a class in outer scope.
stack = deque(self.tasks)
i = 0
while stack:
task = maybe_signature(stack.popleft(), app=self._app).clone()
if isinstance(task, group):
@@ -1112,7 +1120,9 @@ def _freeze_unroll(self, new_tasks, group_id, chord, root_id, parent_id):
new_tasks.append(task)
yield task.freeze(group_id=group_id,
chord=chord, root_id=root_id,
parent_id=parent_id)
parent_id=parent_id,
group_index=i)
i += 1
def __repr__(self):
if self.tasks:
@@ -1189,14 +1199,15 @@ def __call__(self, body=None, **options):
return self.apply_async((), {'body': body} if body else {}, **options)
def freeze(self, _id=None, group_id=None, chord=None,
root_id=None, parent_id=None):
root_id=None, parent_id=None, group_index=None):
# pylint: disable=redefined-outer-name
# XXX chord is also a class in outer scope.
if not isinstance(self.tasks, group):
self.tasks = group(self.tasks, app=self.app)
header_result = self.tasks.freeze(
parent_id=parent_id, root_id=root_id, chord=self.body)
bodyres = self.body.freeze(_id, root_id=root_id)
bodyres = self.body.freeze(
_id, root_id=root_id, group_index=group_index)
# we need to link the body result back to the group result,
# but the body may actually be a chain,
# so find the first result without a parent
View
@@ -112,7 +112,8 @@ def clone(self, args=None, kwargs=None):
pass
@abstractmethod
def freeze(self, id=None, group_id=None, chord=None, root_id=None):
def freeze(self, id=None, group_id=None, chord=None, root_id=None,
group_index=None):
pass
@abstractmethod
View
@@ -500,6 +500,11 @@ def group(self):
# by parent process
return self.request_dict['group']
@cached_property
def group_index(self):
# used by backend.on_chord_part_return to order return values in group
return self.request_dict['group_index']
def create_request_cls(base, task, pool, hostname, eventer,
ref=ref, revoked_tasks=revoked_tasks,

0 comments on commit 026c813

Please sign in to comment.