Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions taskbadger/systems/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ def __init__(self, auto_track_tasks=True, includes=None, excludes=None):
self.includes = includes
self.excludes = excludes

if auto_track_tasks:
# Importing this here ensures that the Celery signal handlers are registered
import taskbadger.celery # noqa

def track_task(self, task_name):
if not self.auto_track_tasks:
return False
Expand Down
36 changes: 36 additions & 0 deletions tests/test_celery_system_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,12 @@
Celery runner thread will not have the configuration set.
"""
import logging
import sys
import weakref
from unittest import mock

import pytest
from celery.signals import task_prerun

from taskbadger.mug import Badger, Settings
from taskbadger.systems.celery import CelerySystemIntegration
Expand Down Expand Up @@ -77,3 +80,36 @@ def add_normal(self, a, b):
def test_task_name_matching(include, exclude, expected: bool):
integration = CelerySystemIntegration(includes=include, excludes=exclude)
assert integration.track_task("myapp.tasks.export_data") is expected


def test_celery_system_integration_connects_signals():
# clean the slate
_disconnect_signals()
if "taskbadger.celery" in sys.modules:
del sys.modules["taskbadger.celery"]
assert "taskbadger.celery" not in sys.modules

# this should result in the signals being connected
CelerySystemIntegration()

assert "taskbadger.celery" in sys.modules
_assert_signals()


def _assert_signals(check_is_connected=True):
# test that signals are actually connected
receivers = [rcv[1] for rcv in task_prerun.receivers]
receiver_names = set()
for receiver in receivers:
if isinstance(receiver, weakref.ReferenceType):
receiver = receiver()
receiver_names.add(f"{receiver.__module__}.{receiver.__name__}")
is_connected = "taskbadger.celery.task_prerun_handler" in receiver_names
assert check_is_connected == is_connected


def _disconnect_signals():
from taskbadger.celery import task_prerun_handler

task_prerun.disconnect(task_prerun_handler)
_assert_signals(check_is_connected=False)
12 changes: 9 additions & 3 deletions tests/test_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,15 @@ def test_session_multiple_threads():
threads.append(t)
t.start()

for t in threads:
t.join(1)
assert not t.is_alive()
loopcount = 0
max_loops = len(threads) * 2
while len(threads):
threads[0].join(1)
if not threads[0].is_alive():
threads.pop(0)
loopcount += 1
if loopcount > max_loops:
pytest.fail("Threads did not complete")

assert len(clients) == num_tasks
assert len({id(s) for s in clients}) == 10
Expand Down