From 8088a19c8725e2874656e10d03675f9c12a67158 Mon Sep 17 00:00:00 2001 From: purhan Date: Sat, 2 Jan 2021 15:22:24 +0530 Subject: [PATCH] [change] Add multitenancy in browsable API Also add tests for the same --- openwisp_ipam/api/serializers.py | 6 ++- openwisp_ipam/api/utils.py | 29 +++++++++++ openwisp_ipam/api/views.py | 21 ++++++-- openwisp_ipam/tests/test_multitenant.py | 64 +++++++++++++++++++++++++ 4 files changed, 113 insertions(+), 7 deletions(-) diff --git a/openwisp_ipam/api/serializers.py b/openwisp_ipam/api/serializers.py index b88fa63..c1615dc 100644 --- a/openwisp_ipam/api/serializers.py +++ b/openwisp_ipam/api/serializers.py @@ -2,6 +2,8 @@ from rest_framework import serializers from swapper import load_model +from .utils import FilterSerializerByOrgManaged + IpAddress = load_model('openwisp_ipam', 'IpAddress') Subnet = load_model('openwisp_ipam', 'Subnet') @@ -13,14 +15,14 @@ class Meta: read_only_fields = ('created', 'modified') -class IpAddressSerializer(ValidatedModelSerializer): +class IpAddressSerializer(FilterSerializerByOrgManaged, ValidatedModelSerializer): class Meta: model = IpAddress fields = '__all__' read_only_fields = ('created', 'modified') -class SubnetSerializer(ValidatedModelSerializer): +class SubnetSerializer(FilterSerializerByOrgManaged, ValidatedModelSerializer): class Meta: model = Subnet fields = '__all__' diff --git a/openwisp_ipam/api/utils.py b/openwisp_ipam/api/utils.py index 08dfb08..28fdb53 100644 --- a/openwisp_ipam/api/utils.py +++ b/openwisp_ipam/api/utils.py @@ -83,3 +83,32 @@ def get_user_organizations(self): class AuthorizeCSVOrgManaged(AuthorizeCSVImport): def get_user_organizations(self): return self.request.user.organizations_managed + + +class FilterSerializerByOrganization: + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + if self.context['request'].user.is_superuser: + return + self.filter_fields() + + def filter_fields(self): + raise NotImplementedError() + + +class FilterSerializerByOrgManaged(FilterSerializerByOrganization): + def filter_fields(self): + user = self.context['request'].user + organization_filter = user.organizations_managed + for field in self.fields: + if field == 'organization': + self.fields[field].queryset = self.fields[field].queryset.filter( + pk__in=organization_filter + ) + else: + try: + self.fields[field].queryset = self.fields[field].queryset.filter( + organization__in=organization_filter + ) + except AttributeError: + pass diff --git a/openwisp_ipam/api/views.py b/openwisp_ipam/api/views.py index 8619512..bad1717 100644 --- a/openwisp_ipam/api/views.py +++ b/openwisp_ipam/api/views.py @@ -56,6 +56,11 @@ def get_csv_organization(self): return org +class SerializerContextMixin: + def get_serializer_context(self): + return {'request': self.request} + + class ListViewPagination(pagination.PageNumberPagination): page_size = 10 page_size_query_param = 'page_size' @@ -171,7 +176,9 @@ def get(self, request, *args, **kwargs): return Response(subnet.get_next_available_ip()) -class IpAddressListCreateView(IpAddressOrgMixin, ListCreateAPIView): +class IpAddressListCreateView( + IpAddressOrgMixin, ListCreateAPIView, SerializerContextMixin +): queryset = IpAddress.objects.none() subnet_model = Subnet serializer_class = IpAddressSerializer @@ -185,7 +192,9 @@ def get_queryset(self): return subnet.ipaddress_set.all().order_by('ip_address') -class SubnetListCreateView(FilterByOrganizationManaged, ListCreateAPIView): +class SubnetListCreateView( + FilterByOrganizationManaged, ListCreateAPIView, SerializerContextMixin +): serializer_class = SubnetSerializer authentication_classes = (BearerAuthentication, SessionAuthentication) permission_classes = (DjangoModelPermissions,) @@ -193,7 +202,7 @@ class SubnetListCreateView(FilterByOrganizationManaged, ListCreateAPIView): queryset = Subnet.objects.all() -class SubnetView(RetrieveUpdateDestroyAPIView): +class SubnetView(RetrieveUpdateDestroyAPIView, SerializerContextMixin): serializer_class = SubnetSerializer authentication_classes = (BearerAuthentication, SessionAuthentication) permission_classes = ( @@ -203,7 +212,7 @@ class SubnetView(RetrieveUpdateDestroyAPIView): queryset = Subnet.objects.all() -class IpAddressView(RetrieveUpdateDestroyAPIView): +class IpAddressView(RetrieveUpdateDestroyAPIView, SerializerContextMixin): serializer_class = IpAddressSerializer authentication_classes = (BearerAuthentication, SessionAuthentication) permission_classes = ( @@ -226,7 +235,9 @@ def post(self, request, *args, **kwargs): subnet = get_object_or_404(self.subnet_model, pk=kwargs['subnet_id']) ip_address = subnet.request_ip(options) if ip_address: - serializer = IpAddressSerializer(ip_address) + serializer = IpAddressSerializer( + ip_address, context={'request': self.request} + ) headers = self.get_success_headers(serializer.data) return Response( serializer.data, status=status.HTTP_201_CREATED, headers=headers diff --git a/openwisp_ipam/tests/test_multitenant.py b/openwisp_ipam/tests/test_multitenant.py index 77a0d4c..81fea87 100644 --- a/openwisp_ipam/tests/test_multitenant.py +++ b/openwisp_ipam/tests/test_multitenant.py @@ -7,6 +7,14 @@ from . import CreateModelsMixin, PostDataMixin +# from openwisp_ipam.api.utils import ( +# FilterByOrganization, +# FilterByParent, +# AuthorizeCSVImport, +# FilterSerializerByOrganization, +# ) + + User = get_user_model() IpAddress = load_model('openwisp_ipam', 'IPAddress') Subnet = load_model('openwisp_ipam', 'Subnet') @@ -347,3 +355,59 @@ def test_export_subnet_api(self): reverse('ipam:export-subnet', args=(subnet.id,)) ) self.assertEqual(response.status_code, 404) + + def test_browsable_api_subnet_list(self): + # Ensures the correct filtering of `SubnetSerializer` + org_a = self._get_org(org_name='org_a') + org_b = self._get_org(org_name='org_b') + self._create_subnet(subnet='10.0.0.0/24', organization=org_a) + self._create_subnet(subnet='10.10.0.0/24', organization=org_b) + + with self.subTest( + 'Test `Organization` and `Master subnet` field filter for org manager' + ): + self._login(username='user_a', password='tester') + response = self.client.get( + f'{reverse("ipam:subnet_list_create")}?format=api' + ) + self.assertContains(response, 'org_a') + self.assertContains(response, '10.0.0.0/24') + self.assertNotContains(response, 'org_b') + self.assertNotContains(response, '10.10.0.0/24') + + with self.subTest( + 'Test `Organization` and `Master subnet` field filter for superuser' + ): + self._login(username='superuser', password='tester') + response = self.client.get( + f'{reverse("ipam:subnet_list_create")}?format=api' + ) + self.assertContains(response, 'org_a') + self.assertContains(response, '10.0.0.0/24') + self.assertContains(response, 'org_b') + self.assertContains(response, '10.10.0.0/24') + + def test_browsable_api_ipaddress_list(self): + # Ensures the correct filtering of `IpAddressSerializer` + org_a = self._get_org(org_name='org_a') + org_b = self._get_org(org_name='org_b') + subnet_a = self._create_subnet(subnet='10.0.0.0/24', organization=org_a) + self._create_subnet(subnet='10.10.0.0/24', organization=org_b) + + with self.subTest('Test `Subnet` dropdown filter for org manager'): + self._login(username='user_a', password='tester') + response = self.client.get( + f'{reverse("ipam:list_create_ip_address", args=(subnet_a.id,))}' + '?format=api' + ) + self.assertContains(response, '10.0.0.0/24') + self.assertNotContains(response, '10.10.0.0/24') + + with self.subTest('Test `Subnet` dropdown filter for superuser'): + self._login(username='superuser', password='tester') + response = self.client.get( + f'{reverse("ipam:list_create_ip_address", args=(subnet_a.id,))}' + '?format=api' + ) + self.assertContains(response, '10.0.0.0/24') + self.assertContains(response, '10.10.0.0/24')