Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions taskbadger/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,15 +102,18 @@ def scrape_urls(self, urls):

def apply_async(self, *args, **kwargs):
headers = kwargs.setdefault("headers", {})
headers["taskbadger_track"] = True
tb_kwargs = self._get_tb_kwargs(kwargs)
if kwargs.get("kwargs"):
# extract taskbadger options from task kwargs when supplied as keyword argument
tb_kwargs.update(self._get_tb_kwargs(kwargs["kwargs"]))
elif len(args) > 1 and isinstance(args[1], dict):
# extract taskbadger options from task kwargs when supplied as positional argument
tb_kwargs.update(self._get_tb_kwargs(args[1]))
headers[TB_KWARGS_ARG] = tb_kwargs

if Badger.is_configured():
headers["taskbadger_track"] = True
headers[TB_KWARGS_ARG] = tb_kwargs

result = super().apply_async(*args, **kwargs)

tb_task_id = result.info.get(TB_TASK_ID) if result.info else None
Expand Down Expand Up @@ -150,13 +153,13 @@ def taskbadger_task(self):
@before_task_publish.connect
def task_publish_handler(sender=None, headers=None, body=None, **kwargs):
headers = headers if "task" in headers else body
header_kwargs = headers.pop(TB_KWARGS_ARG, {}) # always remove TB headers
if sender.startswith("celery.") or not Badger.is_configured():
return

celery_system = Badger.current.settings.get_system_by_id("celery")
auto_track = celery_system and celery_system.track_task(sender)
manual_track = headers.get("taskbadger_track")
header_kwargs = headers.pop(TB_KWARGS_ARG, {})
if not manual_track and not auto_track:
return

Expand Down
9 changes: 8 additions & 1 deletion tests/test_celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,14 @@ def add_no_tb(self, a, b):
with mock.patch("taskbadger.celery.create_task_safe") as create, mock.patch(
"taskbadger.celery.update_task_safe"
) as update:
result = add_no_tb.delay(2, 2)
result = add_no_tb.delay(
2,
2,
taskbadger_kwargs={
# add an action here to test serialization failure when Badger is not configured
"actions": [Action("stale", integration=EmailIntegration(to="test@test.com"))]
},
)
assert result.get(timeout=10, propagate=True) == 4

create.assert_not_called()
Expand Down
2 changes: 2 additions & 0 deletions tests/test_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ def test_session_multiple_threads():


class TestThread(threading.Thread):
__test__ = False

def __init__(self, name, barrier, clients):
threading.Thread.__init__(self, name=name)
self.barrier = barrier
Expand Down