-
Notifications
You must be signed in to change notification settings - Fork 4
/
apps.py
40 lines (34 loc) · 1.31 KB
/
apps.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import importlib
from typing import List, Tuple, Union
from django.utils.translation import gettext_lazy as _
from django.apps import AppConfig
class MqueueConfig(AppConfig):
name = "mqueue"
default_auto_field = "django.db.models.BigAutoField"
verbose_name = _("Events queue")
def ready(self):
# models registration from settings
from django.conf import settings
from .tracking import mqueue_tracker
registered_models: Union[
Tuple[str, List[str]], List[Union[str, List[str]]]
] = getattr(settings, "MQUEUE_AUTOREGISTER", [])
for modtup in registered_models:
modpath = modtup[0]
level = modtup[1]
modsplit = modpath.split(".")
path = ".".join(modsplit[:-1])
modname = ".".join(modsplit[-1:])
try:
module = importlib.import_module(path)
model = getattr(module, modname)
mqueue_tracker.register(model, level) # type: ignore
except ImportError as e:
msg = "ERROR from Django Mqueue : can not import model "
msg += modpath
print(msg)
raise (e)
# watchers
from .watchers import init_watchers
from .conf import WATCH
init_watchers(WATCH)