Skip to content

Commit

Permalink
[change] Add multitenancy in browsable API
Browse files Browse the repository at this point in the history
Also add tests for the same
  • Loading branch information
purhan committed Jan 2, 2021
1 parent 7accddc commit 8088a19
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 7 deletions.
6 changes: 4 additions & 2 deletions openwisp_ipam/api/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
from rest_framework import serializers
from swapper import load_model

from .utils import FilterSerializerByOrgManaged

IpAddress = load_model('openwisp_ipam', 'IpAddress')
Subnet = load_model('openwisp_ipam', 'Subnet')

Expand All @@ -13,14 +15,14 @@ class Meta:
read_only_fields = ('created', 'modified')


class IpAddressSerializer(ValidatedModelSerializer):
class IpAddressSerializer(FilterSerializerByOrgManaged, ValidatedModelSerializer):
class Meta:
model = IpAddress
fields = '__all__'
read_only_fields = ('created', 'modified')


class SubnetSerializer(ValidatedModelSerializer):
class SubnetSerializer(FilterSerializerByOrgManaged, ValidatedModelSerializer):
class Meta:
model = Subnet
fields = '__all__'
Expand Down
29 changes: 29 additions & 0 deletions openwisp_ipam/api/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,3 +83,32 @@ def get_user_organizations(self):
class AuthorizeCSVOrgManaged(AuthorizeCSVImport):
def get_user_organizations(self):
return self.request.user.organizations_managed


class FilterSerializerByOrganization:
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
if self.context['request'].user.is_superuser:
return
self.filter_fields()

def filter_fields(self):
raise NotImplementedError()


class FilterSerializerByOrgManaged(FilterSerializerByOrganization):
def filter_fields(self):
user = self.context['request'].user
organization_filter = user.organizations_managed
for field in self.fields:
if field == 'organization':
self.fields[field].queryset = self.fields[field].queryset.filter(
pk__in=organization_filter
)
else:
try:
self.fields[field].queryset = self.fields[field].queryset.filter(
organization__in=organization_filter
)
except AttributeError:
pass
21 changes: 16 additions & 5 deletions openwisp_ipam/api/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ def get_csv_organization(self):
return org


class SerializerContextMixin:
def get_serializer_context(self):
return {'request': self.request}


class ListViewPagination(pagination.PageNumberPagination):
page_size = 10
page_size_query_param = 'page_size'
Expand Down Expand Up @@ -171,7 +176,9 @@ def get(self, request, *args, **kwargs):
return Response(subnet.get_next_available_ip())


class IpAddressListCreateView(IpAddressOrgMixin, ListCreateAPIView):
class IpAddressListCreateView(
IpAddressOrgMixin, ListCreateAPIView, SerializerContextMixin
):
queryset = IpAddress.objects.none()
subnet_model = Subnet
serializer_class = IpAddressSerializer
Expand All @@ -185,15 +192,17 @@ def get_queryset(self):
return subnet.ipaddress_set.all().order_by('ip_address')


class SubnetListCreateView(FilterByOrganizationManaged, ListCreateAPIView):
class SubnetListCreateView(
FilterByOrganizationManaged, ListCreateAPIView, SerializerContextMixin
):
serializer_class = SubnetSerializer
authentication_classes = (BearerAuthentication, SessionAuthentication)
permission_classes = (DjangoModelPermissions,)
pagination_class = ListViewPagination
queryset = Subnet.objects.all()


class SubnetView(RetrieveUpdateDestroyAPIView):
class SubnetView(RetrieveUpdateDestroyAPIView, SerializerContextMixin):
serializer_class = SubnetSerializer
authentication_classes = (BearerAuthentication, SessionAuthentication)
permission_classes = (
Expand All @@ -203,7 +212,7 @@ class SubnetView(RetrieveUpdateDestroyAPIView):
queryset = Subnet.objects.all()


class IpAddressView(RetrieveUpdateDestroyAPIView):
class IpAddressView(RetrieveUpdateDestroyAPIView, SerializerContextMixin):
serializer_class = IpAddressSerializer
authentication_classes = (BearerAuthentication, SessionAuthentication)
permission_classes = (
Expand All @@ -226,7 +235,9 @@ def post(self, request, *args, **kwargs):
subnet = get_object_or_404(self.subnet_model, pk=kwargs['subnet_id'])
ip_address = subnet.request_ip(options)
if ip_address:
serializer = IpAddressSerializer(ip_address)
serializer = IpAddressSerializer(
ip_address, context={'request': self.request}
)
headers = self.get_success_headers(serializer.data)
return Response(
serializer.data, status=status.HTTP_201_CREATED, headers=headers
Expand Down
64 changes: 64 additions & 0 deletions openwisp_ipam/tests/test_multitenant.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,14 @@

from . import CreateModelsMixin, PostDataMixin

# from openwisp_ipam.api.utils import (
# FilterByOrganization,
# FilterByParent,
# AuthorizeCSVImport,
# FilterSerializerByOrganization,
# )


User = get_user_model()
IpAddress = load_model('openwisp_ipam', 'IPAddress')
Subnet = load_model('openwisp_ipam', 'Subnet')
Expand Down Expand Up @@ -347,3 +355,59 @@ def test_export_subnet_api(self):
reverse('ipam:export-subnet', args=(subnet.id,))
)
self.assertEqual(response.status_code, 404)

def test_browsable_api_subnet_list(self):
# Ensures the correct filtering of `SubnetSerializer`
org_a = self._get_org(org_name='org_a')
org_b = self._get_org(org_name='org_b')
self._create_subnet(subnet='10.0.0.0/24', organization=org_a)
self._create_subnet(subnet='10.10.0.0/24', organization=org_b)

with self.subTest(
'Test `Organization` and `Master subnet` field filter for org manager'
):
self._login(username='user_a', password='tester')
response = self.client.get(
f'{reverse("ipam:subnet_list_create")}?format=api'
)
self.assertContains(response, 'org_a</option>')
self.assertContains(response, '10.0.0.0/24</option>')
self.assertNotContains(response, 'org_b</option>')
self.assertNotContains(response, '10.10.0.0/24</option>')

with self.subTest(
'Test `Organization` and `Master subnet` field filter for superuser'
):
self._login(username='superuser', password='tester')
response = self.client.get(
f'{reverse("ipam:subnet_list_create")}?format=api'
)
self.assertContains(response, 'org_a</option>')
self.assertContains(response, '10.0.0.0/24</option>')
self.assertContains(response, 'org_b</option>')
self.assertContains(response, '10.10.0.0/24</option>')

def test_browsable_api_ipaddress_list(self):
# Ensures the correct filtering of `IpAddressSerializer`
org_a = self._get_org(org_name='org_a')
org_b = self._get_org(org_name='org_b')
subnet_a = self._create_subnet(subnet='10.0.0.0/24', organization=org_a)
self._create_subnet(subnet='10.10.0.0/24', organization=org_b)

with self.subTest('Test `Subnet` dropdown filter for org manager'):
self._login(username='user_a', password='tester')
response = self.client.get(
f'{reverse("ipam:list_create_ip_address", args=(subnet_a.id,))}'
'?format=api'
)
self.assertContains(response, '10.0.0.0/24</option>')
self.assertNotContains(response, '10.10.0.0/24</option>')

with self.subTest('Test `Subnet` dropdown filter for superuser'):
self._login(username='superuser', password='tester')
response = self.client.get(
f'{reverse("ipam:list_create_ip_address", args=(subnet_a.id,))}'
'?format=api'
)
self.assertContains(response, '10.0.0.0/24</option>')
self.assertContains(response, '10.10.0.0/24</option>')

0 comments on commit 8088a19

Please sign in to comment.