-
Notifications
You must be signed in to change notification settings - Fork 252
/
tree_queries.py
70 lines (55 loc) · 2.17 KB
/
tree_queries.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
from functools import cached_property
from django.core.cache import cache
from tree_queries.models import TreeNode
from tree_queries.query import TreeManager as TreeManager_, TreeQuerySet as TreeQuerySet_
from nautobot.core.models import BaseManager, querysets
class TreeQuerySet(TreeQuerySet_, querysets.RestrictedQuerySet):
    """
    django-tree-queries' TreeQuerySet merged with our RestrictedQuerySet so that permissions enforcement is available.
    """

    def max_tree_depth(self):
        """
        Return the greatest `tree_depth` of any node in this queryset, or 0 when the queryset has no rows.
        """
        # Order deepest-first on the `__tree.tree_depth` column, then take the head row (None if there is none).
        deepest_first = self.with_tree_fields().extra(order_by=["-__tree.tree_depth"])
        deepest_node = deepest_first.first()
        return 0 if deepest_node is None else deepest_node.tree_depth
class TreeManager(TreeManager_, BaseManager.from_queryset(TreeQuerySet)):
    """
    django-tree-queries' TreeManager, extended so that it also incorporates RestrictedQuerySet via TreeQuerySet.
    """

    use_in_migrations = True
    _with_tree_fields = True

    @cached_property
    def max_depth(self):
        """
        The value of `max_tree_depth()`, computed on first access and then stored on this manager instance.

        Because this is a `cached_property`, later accesses reuse the stored value without re-querying;
        delete the attribute from the instance to force it to be recomputed.
        """
        computed_depth = self.max_tree_depth()
        return computed_depth
class TreeModel(TreeNode):
    """
    Nautobot-specific base class for models that exist in a self-referential tree.
    """

    objects = TreeManager()

    class Meta:
        abstract = True

    @property
    def display(self):
        """
        By default, TreeModels display their full ancestry for clarity.

        As this is an expensive thing to calculate, we cache it for a few seconds in the case of repeated lookups.

        Returns:
            (str): The parent's `display` followed by " → " and this instance's `name`, or just `name` when there is
                no parent (or the parent lookup raises `DoesNotExist`).

        Raises:
            NotImplementedError: If this instance has no `name` attribute.
        """
        if not hasattr(self, "name"):
            raise NotImplementedError("default TreeModel.display implementation requires a `name` attribute!")

        cache_key = f"{self.__class__.__name__}.{self.id}.display"
        display_str = cache.get(cache_key, "")
        if display_str:
            return display_str

        try:
            if self.parent is not None:
                display_str = self.parent.display + " → "
        except self.DoesNotExist:
            # Expected to occur at times during bulk-delete operations; fall back to this node's own name only.
            pass

        # This deliberately lives after the `try` statement rather than in a `finally:` clause: a `return` inside
        # `finally` silently discards *any* in-flight exception (not just DoesNotExist) and would also cache the
        # resulting truncated string. Unexpected errors now propagate to the caller instead.
        display_str += self.name
        cache.set(cache_key, display_str, 5)  # third argument is the cache timeout, in seconds
        return display_str