-
Notifications
You must be signed in to change notification settings - Fork 252
/
querysets.py
185 lines (141 loc) · 7.98 KB
/
querysets.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
from django.db.models import Count, OuterRef, Q, QuerySet, Subquery
from django.db.models.functions import Coalesce
from nautobot.core.models.utils import deconstruct_composite_key
from nautobot.core.utils import permissions
from nautobot.core.utils.data import merge_dicts_without_collision
def count_related(model, field, *, filter_dict=None):
"""
Return a Subquery suitable for annotating a child object count.
Args:
model (Model): The related model to aggregate
field (str): The field on the related model which points back to the OuterRef model
filter_dict (dict): Optional dict of filter key/value pairs to limit the Subquery
"""
filters = {field: OuterRef("pk")}
if filter_dict:
filters.update(filter_dict)
manager = model.objects
if hasattr(model.objects, "without_tree_fields"):
manager = manager.without_tree_fields()
subquery = Subquery(manager.filter(**filters).order_by().values(field).annotate(c=Count("*")).values("c"))
return Coalesce(subquery, 0)
class CompositeKeyQuerySetMixin:
"""
Mixin to extend a base queryset class with support for filtering by `composite_key=...` as a virtual parameter.
Example:
>>> Location.objects.last().composite_key
'Durham;AMER'
Note that `Location.composite_key` is a `@property`, *not* a database field, and so would not normally be usable in
a `QuerySet` query, but because `RestrictedQuerySet` inherits from this mixin, the following "just works":
>>> Location.objects.get(composite_key="Durham;AMER")
<Location: Durham>
This is a shorthand for what would otherwise be a multi-step process:
>>> from nautobot.core.models.utils import deconstruct_composite_key
>>> deconstruct_composite_key("Durham;AMER")
['Durham', 'AMER']
>>> Location.natural_key_args_to_kwargs(['Durham', 'AMER'])
{'name': 'Durham', 'parent__name': 'AMER'}
>>> Location.objects.get(name="Durham", parent__name="AMER")
<Location: Durham>
This works for QuerySet `filter()` and `exclude()` as well:
>>> Location.objects.filter(composite_key='Durham;AMER')
<LocationQuerySet [<Location: Durham>]>
>>> Location.objects.exclude(composite_key='Durham;AMER')
<LocationQuerySet [<Location: AMER>]>
`composite_key` can also be used in combination with other query parameters:
>>> Location.objects.filter(composite_key='Durham;AMER', status__name='Planned')
<LocationQuerySet []>
It will raise a ValueError if the deconstructed composite key collides with another query parameter:
>>> Location.objects.filter(composite_key='Durham;AMER', name='Raleigh')
ValueError: Conflicting values for key "name": ('Durham', 'Raleigh')
See also `BaseModel.composite_key` and `utils.construct_composite_key()`/`utils.deconstruct_composite_key()`.
"""
def split_composite_key_into_kwargs(self, composite_key=None, **kwargs):
"""
Helper method abstracting a common need from filter() and exclude().
Subclasses may need to call this directly if they also have special processing of other filter/exclude params.
"""
if composite_key and isinstance(composite_key, str):
natural_key_values = deconstruct_composite_key(composite_key)
return merge_dicts_without_collision(self.model.natural_key_args_to_kwargs(natural_key_values), kwargs)
return kwargs
def filter(self, *args, composite_key=None, **kwargs):
"""
Explicitly handle `filter(composite_key="...")` by decomposing the composite-key into natural key parameters.
Counterpart to BaseModel.composite_key property.
"""
return super().filter(*args, **self.split_composite_key_into_kwargs(composite_key, **kwargs))
def exclude(self, *args, composite_key=None, **kwargs):
"""
Explicitly handle `exclude(composite_key="...")` by decomposing the composite-key into natural key parameters.
Counterpart to BaseModel.composite_key property.
"""
return super().exclude(*args, **self.split_composite_key_into_kwargs(composite_key, **kwargs))
class RestrictedQuerySet(CompositeKeyQuerySetMixin, QuerySet):
def restrict(self, user, action="view"):
"""
Filter the QuerySet to return only objects on which the specified user has been granted the specified
permission.
:param user: User instance
:param action: The action which must be permitted (e.g. "view" for "dcim.view_location"); default is 'view'
"""
# Resolve the full name of the required permission
app_label = self.model._meta.app_label
model_name = self.model._meta.model_name
permission_required = f"{app_label}.{action}_{model_name}"
# Bypass restriction for superusers and exempt views
if user.is_superuser or permissions.permission_is_exempt(permission_required):
qs = self
# User is anonymous or has not been granted the requisite permission
elif not user.is_authenticated or permission_required not in user.get_all_permissions():
qs = self.none()
# Filter the queryset to include only objects with allowed attributes
else:
attrs = Q()
for perm_attrs in user._object_perm_cache[permission_required]:
if isinstance(perm_attrs, list):
for p in perm_attrs:
attrs |= Q(**p)
elif perm_attrs:
attrs |= Q(**perm_attrs)
else:
# Any permission with null constraints grants access to _all_ instances
attrs = Q()
break
qs = self.filter(attrs)
return qs
def check_perms(self, user, *, instance=None, pk=None, action="view"):
"""
Check whether the given user can perform the given action with regard to the given instance of this model.
Either instance or pk must be specified, but not both.
Args:
user (User): User instance
instance (self.model): Instance of this queryset's model to check, if pk is not provided
pk (uuid): Primary key of the desired instance to check for, if instance is not provided
action (str): The action which must be permitted (e.g. "view" for "dcim.view_location"); default is 'view'
Returns:
(bool): Whether the action is permitted or not
"""
if instance is not None and pk is not None and instance.pk != pk:
raise RuntimeError("Should not be called with both instance and pk specified!")
if instance is None and pk is None:
raise ValueError("Either instance or pk must be specified!")
if instance is not None and not isinstance(instance, self.model):
raise TypeError(f"{instance} is not a {self.model}")
if pk is None:
pk = instance.pk
return self.restrict(user, action).filter(pk=pk).exists()
def distinct_values_list(self, *fields, flat=False, named=False):
"""Wrapper for `QuerySet.values_list()` that adds the `distinct()` query to return a list of unique values.
Note:
Uses `QuerySet.order_by()` to disable ordering, preventing unexpected behavior when using `values_list` described
in the Django `distinct()` documentation at https://docs.djangoproject.com/en/stable/ref/models/querysets/#distinct
Args:
*fields (str): Optional positional arguments which specify field names.
flat (bool): Set to True to return a QuerySet of individual values instead of a QuerySet of tuples.
Defaults to False.
named (bool): Set to True to return a QuerySet of namedtuples. Defaults to False.
Returns:
(QuerySet): A QuerySet of tuples or, if `flat` is set to True, a queryset of individual values.
"""
return self.order_by().values_list(*fields, flat=flat, named=named).distinct()