Skip to content

Commit

Permalink
test: Add more tests for from_dict() variants
Browse files Browse the repository at this point in the history
Notably, this exposed the bug tracked in celery#6341 where groups are not
deeply deserialized by `group.from_dict()`.
  • Loading branch information
maybe-sybr committed Oct 14, 2020
1 parent 40f9638 commit 25992ed
Show file tree
Hide file tree
Showing 3 changed files with 254 additions and 1 deletion.
65 changes: 64 additions & 1 deletion t/integration/tasks.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from time import sleep

from celery import Task, chain, chord, group, shared_task
from celery import Signature, Task, chain, chord, group, shared_task
from celery.exceptions import SoftTimeLimitExceeded
from celery.utils.log import get_task_logger

Expand Down Expand Up @@ -244,3 +244,66 @@ def run(self):
if self.request.retries:
return self.request.retries
raise ValueError()


# The signatures returned by these tasks wouldn't actually run because the
# arguments wouldn't be fulfilled - we never actually delay them so it's fine
@shared_task
def return_nested_signature_chain_chain():
return chain(chain([add.s()]))


@shared_task
def return_nested_signature_chain_group():
return chain(group([add.s()]))


@shared_task
def return_nested_signature_chain_chord():
return chain(chord([add.s()], add.s()))


@shared_task
def return_nested_signature_group_chain():
return group(chain([add.s()]))


@shared_task
def return_nested_signature_group_group():
return group(group([add.s()]))


@shared_task
def return_nested_signature_group_chord():
return group(chord([add.s()], add.s()))


@shared_task
def return_nested_signature_chord_chain():
return chord(chain([add.s()]), add.s())


@shared_task
def return_nested_signature_chord_group():
return chord(group([add.s()]), add.s())


@shared_task
def return_nested_signature_chord_chord():
return chord(chord([add.s()], add.s()), add.s())


@shared_task
def rebuild_signature(sig_dict):
sig_obj = Signature.from_dict(sig_dict)
def _recurse(sig):
if not isinstance(sig, Signature):
raise TypeError("{!r} is not a signature object".format(sig))
# Most canvas types have a `tasks` attribute
if isinstance(sig, (chain, group, chord)):
for task in sig.tasks:
_recurse(task)
# `chord`s also have a `body` attribute
if isinstance(sig, chord):
_recurse(sig.body)
_recurse(sig_obj)
102 changes: 102 additions & 0 deletions t/integration/test_canvas.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
print_unicode, raise_error, redis_echo, retry_once,
return_exception, return_priority, second_order_replace1,
tsum)
from . import tasks

RETRYABLE_EXCEPTIONS = (OSError, ConnectionError, TimeoutError)

Expand Down Expand Up @@ -1009,3 +1010,104 @@ def test_priority_chain(self, manager):
c = return_priority.signature(priority=3) | return_priority.signature(
priority=5)
assert c().get(timeout=TIMEOUT) == "Priority: 5"


class test_signature_serialization:
"""
Confirm nested signatures can be rebuilt after passing through a backend.
These tests are expected to finish and return `None` or raise an exception
in the error case. The exception indicates that some element of a nested
signature object was not properly deserialized from its dictionary
representation, and would explode later on if it were used as a signature.
"""
def test_rebuild_nested_chain_chain(self, manager):
sig = chain(
tasks.return_nested_signature_chain_chain.s(),
tasks.rebuild_signature.s()
)
sig.delay().get(timeout=TIMEOUT)

def test_rebuild_nested_chain_group(self, manager):
sig = chain(
tasks.return_nested_signature_chain_group.s(),
tasks.rebuild_signature.s()
)
sig.delay().get(timeout=TIMEOUT)

def test_rebuild_nested_chain_chord(self, manager):
try:
manager.app.backend.ensure_chords_allowed()
except NotImplementedError as e:
raise pytest.skip(e.args[0])

sig = chain(
tasks.return_nested_signature_chain_chord.s(),
tasks.rebuild_signature.s()
)
sig.delay().get(timeout=TIMEOUT)

@pytest.mark.xfail(reason="#6341")
def test_rebuild_nested_group_chain(self, manager):
sig = chain(
tasks.return_nested_signature_group_chain.s(),
tasks.rebuild_signature.s()
)
sig.delay().get(timeout=TIMEOUT)

@pytest.mark.xfail(reason="#6341")
def test_rebuild_nested_group_group(self, manager):
sig = chain(
tasks.return_nested_signature_group_group.s(),
tasks.rebuild_signature.s()
)
sig.delay().get(timeout=TIMEOUT)

@pytest.mark.xfail(reason="#6341")
def test_rebuild_nested_group_chord(self, manager):
try:
manager.app.backend.ensure_chords_allowed()
except NotImplementedError as e:
raise pytest.skip(e.args[0])

sig = chain(
tasks.return_nested_signature_group_chord.s(),
tasks.rebuild_signature.s()
)
sig.delay().get(timeout=TIMEOUT)

def test_rebuild_nested_chord_chain(self, manager):
try:
manager.app.backend.ensure_chords_allowed()
except NotImplementedError as e:
raise pytest.skip(e.args[0])

sig = chain(
tasks.return_nested_signature_chord_chain.s(),
tasks.rebuild_signature.s()
)
sig.delay().get(timeout=TIMEOUT)

def test_rebuild_nested_chord_group(self, manager):
try:
manager.app.backend.ensure_chords_allowed()
except NotImplementedError as e:
raise pytest.skip(e.args[0])

sig = chain(
tasks.return_nested_signature_chord_group.s(),
tasks.rebuild_signature.s()
)
sig.delay().get(timeout=TIMEOUT)

def test_rebuild_nested_chord_chord(self, manager):
try:
manager.app.backend.ensure_chords_allowed()
except NotImplementedError as e:
raise pytest.skip(e.args[0])

sig = chain(
tasks.return_nested_signature_chord_chord.s(),
tasks.rebuild_signature.s()
)
sig.delay().get(timeout=TIMEOUT)
88 changes: 88 additions & 0 deletions t/unit/tasks/test_canvas.py
Original file line number Diff line number Diff line change
Expand Up @@ -675,6 +675,25 @@ def test_from_dict(self):
x['args'] = None
assert group.from_dict(dict(x))

@pytest.mark.xfail(reason="#6341")
def test_from_dict_deep_deserialize(self):
original_group = group([self.add.s(1, 2)] * 42)
serialized_group = json.loads(json.dumps(original_group))
deserialized_group = group.from_dict(serialized_group)
for ds_task in deserialized_group.tasks:
assert isinstance(ds_task, Signature)

@pytest.mark.xfail(reason="#6341")
def test_from_dict_deeper_deserialize(self):
inner_group = group([self.add.s(1, 2)] * 42)
outer_group = group([inner_group] * 42)
serialized_group = json.loads(json.dumps(outer_group))
deserialized_group = group.from_dict(serialized_group)
for outer_task in deserialized_group.tasks:
assert isinstance(outer_task, group)
for inner_task in outer_task.tasks:
assert isinstance(inner_task, Signature)

def test_call_empty_group(self):
x = group(app=self.app)
assert not len(x())
Expand Down Expand Up @@ -860,6 +879,75 @@ def chord_add():
_state.task_join_will_block = fixture_task_join_will_block
result.task_join_will_block = fixture_task_join_will_block

def test_from_dict(self):
header = self.add.s(1, 2)
original_chord = chord(header=header)
rebuilt_chord = chord.from_dict(dict(original_chord))
assert isinstance(rebuilt_chord, chord)

def test_from_dict_with_body(self):
header = body = self.add.s(1, 2)
original_chord = chord(header=header, body=body)
rebuilt_chord = chord.from_dict(dict(original_chord))
assert isinstance(rebuilt_chord, chord)

def test_from_dict_deep_deserialize(self):
header = body = self.add.s(1, 2)
original_chord = chord(header=header, body=body)
serialized_chord = json.loads(json.dumps(original_chord))
deserialized_chord = chord.from_dict(serialized_chord)
assert isinstance(deserialized_chord, chord)
for task in deserialized_chord.tasks:
assert isinstance(task, Signature)
assert isinstance(deserialized_chord.body, Signature)

@pytest.mark.xfail(reason="#6341")
def test_from_dict_deep_deserialize_group(self):
header = body = group([self.add.s(1, 2)]* 42)
original_chord = chord(header=header, body=body)
serialized_chord = json.loads(json.dumps(original_chord))
deserialized_chord = chord.from_dict(serialized_chord)
assert isinstance(deserialized_chord, chord)
# A header which is a group gets unpacked into the chord's `tasks`
for task in deserialized_chord.tasks:
assert isinstance(task, Signature)
# A body which is a group remains as it we passed in
assert isinstance(deserialized_chord.body, group)
for task in deserialized_chord.body.tasks:
assert isinstance(task, Signature)

@pytest.mark.xfail(reason="#6341")
def test_from_dict_deeper_deserialize_group(self):
inner_group = group([self.add.s(1, 2)] * 42)
header = body = group([inner_group] * 42)
original_chord = chord(header=header, body=body)
serialized_chord = json.loads(json.dumps(original_chord))
deserialized_chord = chord.from_dict(serialized_chord)
assert isinstance(deserialized_chord, chord)
# A header which is a group gets unpacked into the chord's `tasks`
for outer_task in deserialized_chord.tasks:
assert isinstance(outer_task, group)
for inner_task in outer_task.tasks:
assert isinstance(inner_task, Signature)
# A body which is a group remains as it we passed in
assert isinstance(deserialized_chord.body, group)
for outer_task in deserialized_chord.body.tasks:
assert isinstance(outer_task, group)
for inner_task in outer_task.tasks:
assert isinstance(inner_task, Signature)

def test_from_dict_deep_deserialize_chain(self):
header = body = chain([self.add.s(1, 2)] * 42)
original_chord = chord(header=header, body=body)
serialized_chord = json.loads(json.dumps(original_chord))
deserialized_chord = chord.from_dict(serialized_chord)
assert isinstance(deserialized_chord, chord)
# A header which is a group gets unpacked into the chord's `tasks`
for task in deserialized_chord.tasks:
assert isinstance(task, Signature)
# A body which is a chain gets mutatated into the hidden `_chain` class
assert isinstance(deserialized_chord.body, _chain)


class test_maybe_signature(CanvasCase):

Expand Down

0 comments on commit 25992ed

Please sign in to comment.