Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: ✨ add user token as permission constraints 2.0 #5278

Merged
1 change: 1 addition & 0 deletions changes/5278.added
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Added User Token as permission constraints.
16 changes: 6 additions & 10 deletions nautobot/core/authentication.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

from nautobot.core.utils.permissions import (
permission_is_exempt,
qs_filter_from_constraints,
resolve_permission,
resolve_permission_ct,
)
Expand Down Expand Up @@ -81,16 +82,11 @@ def has_perm(self, user_obj, perm, obj=None):
if model._meta.label_lower != ".".join((app_label, model_name)):
raise ValueError(f"Invalid permission {perm} for model {model}")

# Compile a query filter that matches all instances of the specified model
obj_perm_constraints = self.get_all_permissions(user_obj)[perm]
constraints = Q()
for perm_constraints in obj_perm_constraints:
if perm_constraints:
constraints |= Q(**perm_constraints)
else:
# Found ObjectPermission with null constraints; allow model-level access
constraints = Q()
break
# Compile a QuerySet filter that matches all instances of the specified model
tokens = {
"$user": user_obj,
}
constraints = qs_filter_from_constraints(self.get_all_permissions(user_obj)[perm], tokens)

# Permission to perform the requested action on the object depends on whether the specified object matches
# the specified constraints. Note that this check is made against the *database* record representing the object,
Expand Down
15 changes: 5 additions & 10 deletions nautobot/core/models/querysets.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,16 +125,11 @@ def restrict(self, user, action="view"):
# Filter the queryset to include only objects with allowed attributes
else:
attrs = Q()
for perm_attrs in user._object_perm_cache[permission_required]:
if isinstance(perm_attrs, list):
for p in perm_attrs:
attrs |= Q(**p)
glennmatthews marked this conversation as resolved.
Show resolved Hide resolved
elif perm_attrs:
attrs |= Q(**perm_attrs)
else:
# Any permission with null constraints grants access to _all_ instances
attrs = Q()
break
tokens = {
"$user": user,
}

attrs = permissions.qs_filter_from_constraints(user._object_perm_cache[permission_required], tokens)
qs = self.filter(attrs)

return qs
Expand Down
64 changes: 63 additions & 1 deletion nautobot/core/tests/test_authentication.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@

from nautobot.core.settings_funcs import sso_auth_enabled
from nautobot.core.testing import NautobotTestClient, TestCase
from nautobot.core.utils import lookup
from nautobot.dcim.models import Location, LocationType
from nautobot.extras.models import Status
from nautobot.extras.models import ObjectChange, Status
from nautobot.ipam.models import Namespace, Prefix
from nautobot.users.models import ObjectPermission, Token

Expand Down Expand Up @@ -453,3 +454,64 @@ def test_delete_object(self):
url = reverse("ipam-api:prefix-detail", kwargs={"pk": self.prefixes[0].pk})
response = self.client.delete(url, format="json", **self.header)
self.assertEqual(response.status_code, 204)

@override_settings(EXEMPT_VIEW_PERMISSIONS=[])
def test_user_token_constraints(self):
"""
Test enabling remote authentication with the default configuration.
glennmatthews marked this conversation as resolved.
Show resolved Hide resolved
"""
url = reverse("ipam-api:prefix-list")
data = [
{
"prefix": "10.0.9.0/24",
"namespace": self.namespace.pk,
"location": self.locations[1].pk,
"status": self.statuses[1].pk,
},
{
"prefix": "10.0.10.0/24",
"namespace": self.namespace.pk,
"location": self.locations[1].pk,
"status": self.statuses[1].pk,
},
]

obj_user2 = User.objects.create(username="new-user")
token_user2 = Token.objects.create(user=obj_user2)
header_user2 = {"HTTP_AUTHORIZATION": f"Token {token_user2.key}"}
# Assign object permission to both users to create Prefixes
obj_perm = ObjectPermission.objects.create(
name="Test ipam permission",
actions=["add"],
)
obj_perm.users.add(self.user, obj_user2)
obj_perm.object_types.add(ContentType.objects.get_for_model(Prefix))
# Create one Prefix object per user
self.client.post(url, data[0], format="json", **self.header)
self.client.post(url, data[1], format="json", **header_user2)

# Assign object permission to both users to view Change Logs, based on user token constraint
obj_perm = ObjectPermission.objects.create(
name="Test change log permission",
constraints={"user": "$user"},
actions=["view", "list"],
)
obj_perm.users.add(self.user, obj_user2)
obj_perm.object_types.add(ContentType.objects.get_for_model(ObjectChange))

# Retrieve all ObjectChange Log entries for every user
url = reverse(lookup.get_route_for_model(ObjectChange, "list", api=True))
response_user1 = self.client.get(url, **self.header)
response_user2 = self.client.get(url, **header_user2)

# Assert every user has permissions to view Change Logs
self.assertTrue(self.user.has_perms(["extras.view_objectchange", "extras.list_objectchange"]))
self.assertTrue(obj_user2.has_perms(["extras.view_objectchange", "extras.list_objectchange"]))

# Check against 1st user's response
self.assertEqual(response_user1.status_code, 200)
self.assertNotEqual(response_user1.data["count"], ObjectChange.objects.count())
glennmatthews marked this conversation as resolved.
Show resolved Hide resolved

# Check against 2nd user's response
self.assertEqual(response_user2.status_code, 200)
self.assertEqual(response_user2.data["count"], 1)
glennmatthews marked this conversation as resolved.
Show resolved Hide resolved
31 changes: 31 additions & 0 deletions nautobot/core/utils/permissions.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from django.conf import settings
from django.contrib.contenttypes.models import ContentType
from django.db.models import Q


def get_permission_for_model(model, action):
Expand Down Expand Up @@ -66,3 +67,33 @@ def permission_is_exempt(name):
return True

return False


def qs_filter_from_constraints(constraints, tokens=None):
"""
Construct filtered QuerySet from user constraints (tokens).
Args:
constraints (dict): User's permissions cached items.
tokens (dict, optional): user tokens. Defaults to a None.
Returns:
QuerySet object: A QuerySet of tuples or, an empty QuerySet if constraints are null.
"""
if tokens is None:
tokens = {}

def _replace_tokens(value, tokens):
if isinstance(value, list):
return list(map(lambda v: tokens.get(v, v), value))
return tokens.get(value, value)

params = Q()
for constraint in constraints:
if constraint:
params |= Q(**{k: _replace_tokens(v, tokens) for k, v in constraint.items()})
else:
# permit model level access, constrains are null
return Q()

return params
13 changes: 13 additions & 0 deletions nautobot/docs/user-guide/administration/guides/permissions.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,19 @@ Location.objects.filter(
)
```

### Tokens

!!! note
The below mechanism is only applicable for applying constraints where the model has user relationship (eg Change Log, Job results).

Permissions constraints can be defined by using the special token `$user` to reference the current user at evaluation. This can be beneficial in order to restrict users to only view their own Change log entries for example. Such a constraint can be defined as:

```json
{
"user": "$user"
}
```

### Creating and Modifying Objects

The same sort of logic is in play when a user attempts to create or modify an object in Nautobot, with a twist. Once validation has completed, Nautobot starts an atomic database transaction to facilitate the change, and the object is created or saved normally. Next, still within the transaction, Nautobot issues a second query to retrieve the newly created/updated object, filtering the restricted queryset with the object's primary key. If this query fails to return the object, Nautobot knows that the new revision does not match the constraints imposed by the permission. The transaction is then rolled back, leaving the database in its original state prior to the change, and the user is informed of the violation.
Expand Down
5 changes: 3 additions & 2 deletions nautobot/users/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@
from django.contrib.auth.models import Group
from django.contrib.contenttypes.models import ContentType
from django.core.exceptions import FieldError, ValidationError
from django.db import models
from django.urls import reverse
from django.utils.html import escape, format_html

from nautobot.core.admin import NautobotModelAdmin
from nautobot.core.utils.permissions import qs_filter_from_constraints
from nautobot.extras.admin import order_content_types
from nautobot.users.models import AdminGroup, ObjectPermission, Token, User

Expand Down Expand Up @@ -241,7 +241,8 @@ def clean(self):
for ct in object_types:
model = ct.model_class()
try:
model.objects.filter(*[models.Q(**c) for c in constraints]).exists()
tokens = {"$user": 0} # setting token to null user ID
glennmatthews marked this conversation as resolved.
Show resolved Hide resolved
model.objects.filter(qs_filter_from_constraints(constraints, tokens)).exists()
except FieldError as e:
raise ValidationError({"constraints": f"Invalid filter for {model}: {e}"})

Expand Down