Closed
Description
Currently, the create_user_task signal handler always fails with TypeError('tuple indices must be integers or slices, not str') when task message protocol version 2 (https://docs.celeryq.dev/en/stable/internals/protocol.html) is used, which has been the Celery default for many years. Celery catches and logs the exception, and it seems to have no impact for simple use cases.
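For context, a minimal sketch of why this error appears. In protocol 1 the message body is a dict, while in protocol 2 the body is an `(args, kwargs, embed)` tuple and fields such as `id` and `task` live in the message headers. The `read_task_id` function below is a hypothetical stand-in for any handler that indexes the body by key; it is not the actual user_tasks code, and the task name and ID are made up.

```python
# Protocol 1: everything lives in one dict.
body_v1 = {"id": "abc123", "task": "myproject.tasks.add", "args": (2, 3), "kwargs": {}}

# Protocol 2: the body is an (args, kwargs, embed) tuple; id and task are headers.
headers_v2 = {"id": "abc123", "task": "myproject.tasks.add"}
body_v2 = ((2, 3), {}, {"callbacks": None, "errbacks": None, "chain": None, "chord": None})


def read_task_id(body):
    """Hypothetical handler logic that assumes a protocol 1 (dict) body."""
    return body["id"]


task_id_v1 = read_task_id(body_v1)  # works: the body is a dict

try:
    read_task_id(body_v2)  # fails: the body is a tuple
except TypeError as exc:
    error_message = str(exc)
```

With a protocol 2 body the lookup raises the same kind of TypeError quoted above, since a tuple cannot be indexed with a string key.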
To work around this issue, the following code can be used:
from celery import chain, signals  # chain is needed by proto2_to_proto1
from user_tasks.signals import create_user_task

from myproject.celery.celery import app


def create_user_task_wrapper(sender=None, body=None, **kwargs):
    # Pass protocol 1 bodies through unchanged; convert protocol 2 messages first.
    return create_user_task(
        sender,
        body
        if app.conf.task_protocol == 1
        else proto2_to_proto1(body, kwargs.get("headers", {})),
    )


# Based on some old celery code:
def proto2_to_proto1(body, headers):
    # A protocol 2 body is an (args, kwargs, embed) tuple.
    args, kwargs, embed = body
    embedded = _extract_proto2_embed(**embed)
    chained = embedded.pop("chain")
    new_body = dict(
        _extract_proto2_headers(**headers), args=args, kwargs=kwargs, **embedded
    )
    if chained:
        new_body["callbacks"].append(chain(chained))
    return new_body


def _extract_proto2_headers(id, retries, eta, expires, group, timelimit, task, **_):
    return {
        "id": id,
        "task": task,
        "retries": retries,
        "eta": eta,
        "expires": expires,
        "utc": True,
        "taskset": group,
        "timelimit": timelimit,
    }


def _extract_proto2_embed(callbacks, errbacks, chain, chord, **_):
    return {
        "callbacks": callbacks or [],
        "errbacks": errbacks,
        "chain": chain,
        "chord": chord,
    }


# Swap the handlers only after the wrapper has been defined.
signals.before_task_publish.disconnect(create_user_task)
signals.before_task_publish.connect(create_user_task_wrapper)
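To illustrate the shape of the converted message, here is a minimal sketch that runs without Celery. `flatten_for_v1` is a hypothetical, simplified stand-in for the conversion in the workaround: it ignores chains and copies only a few header fields, and the sample headers and body are made-up values.

```python
# Made-up protocol 2 message for a task called "myproject.tasks.add".
headers = {"id": "abc123", "task": "myproject.tasks.add", "retries": 0}
body = ((2, 3), {"debug": True}, {"callbacks": None, "errbacks": None, "chain": None, "chord": None})


def flatten_for_v1(body, headers):
    """Simplified stand-in for the protocol 2 to protocol 1 conversion (no chain support)."""
    args, kwargs, embed = body
    return {
        "id": headers["id"],
        "task": headers["task"],
        "retries": headers.get("retries", 0),
        "args": args,
        "kwargs": kwargs,
        "callbacks": embed.get("callbacks") or [],
        "errbacks": embed.get("errbacks"),
        "chord": embed.get("chord"),
    }


legacy_body = flatten_for_v1(body, headers)

# A handler written for protocol 1 can now index the body by key.
task_id = legacy_body["id"]
task_args = legacy_body["args"]
```

The result is a single dict in which `id`, `task`, `args`, and `kwargs` are all reachable by key, which is the layout a protocol 1 handler expects.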