Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[api] Implement multitenancy in API #61 #76

Merged
merged 12 commits into from
Jan 28, 2021
6 changes: 4 additions & 2 deletions openwisp_ipam/api/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
from rest_framework import serializers
from swapper import load_model

from .utils import FilterSerializerByOrgManaged

IpAddress = load_model('openwisp_ipam', 'IpAddress')
Subnet = load_model('openwisp_ipam', 'Subnet')

Expand All @@ -13,14 +15,14 @@ class Meta:
read_only_fields = ('created', 'modified')


class IpAddressSerializer(ValidatedModelSerializer):
class IpAddressSerializer(FilterSerializerByOrgManaged, ValidatedModelSerializer):
class Meta:
model = IpAddress
fields = '__all__'
read_only_fields = ('created', 'modified')


class SubnetSerializer(ValidatedModelSerializer):
class SubnetSerializer(FilterSerializerByOrgManaged, ValidatedModelSerializer):
class Meta:
model = Subnet
fields = '__all__'
Expand Down
29 changes: 29 additions & 0 deletions openwisp_ipam/api/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,3 +83,32 @@ def get_user_organizations(self):
class AuthorizeCSVOrgManaged(AuthorizeCSVImport):
def get_user_organizations(self):
return self.request.user.organizations_managed


class FilterSerializerByOrganization:
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
if self.context['request'].user.is_superuser:
return
self.filter_fields()

def filter_fields(self):
raise NotImplementedError()


class FilterSerializerByOrgManaged(FilterSerializerByOrganization):
def filter_fields(self):
user = self.context['request'].user
organization_filter = user.organizations_managed
for field in self.fields:
if field == 'organization':
self.fields[field].queryset = self.fields[field].queryset.filter(
pk__in=organization_filter
)
else:
purhan marked this conversation as resolved.
Show resolved Hide resolved
try:
self.fields[field].queryset = self.fields[field].queryset.filter(
organization__in=organization_filter
)
except AttributeError:
pass
21 changes: 16 additions & 5 deletions openwisp_ipam/api/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ def get_csv_organization(self):
return org
purhan marked this conversation as resolved.
Show resolved Hide resolved


class SerializerContextMixin:
def get_serializer_context(self):
return {'request': self.request}
purhan marked this conversation as resolved.
Show resolved Hide resolved


class ListViewPagination(pagination.PageNumberPagination):
page_size = 10
page_size_query_param = 'page_size'
Expand Down Expand Up @@ -171,7 +176,9 @@ def get(self, request, *args, **kwargs):
return Response(subnet.get_next_available_ip())


class IpAddressListCreateView(IpAddressOrgMixin, ListCreateAPIView):
class IpAddressListCreateView(
IpAddressOrgMixin, ListCreateAPIView, SerializerContextMixin
):
queryset = IpAddress.objects.none()
subnet_model = Subnet
serializer_class = IpAddressSerializer
Expand All @@ -185,15 +192,17 @@ def get_queryset(self):
return subnet.ipaddress_set.all().order_by('ip_address')


class SubnetListCreateView(FilterByOrganizationManaged, ListCreateAPIView):
class SubnetListCreateView(
FilterByOrganizationManaged, ListCreateAPIView, SerializerContextMixin
):
serializer_class = SubnetSerializer
authentication_classes = (BearerAuthentication, SessionAuthentication)
permission_classes = (DjangoModelPermissions,)
pagination_class = ListViewPagination
queryset = Subnet.objects.all()


class SubnetView(RetrieveUpdateDestroyAPIView):
class SubnetView(RetrieveUpdateDestroyAPIView, SerializerContextMixin):
serializer_class = SubnetSerializer
authentication_classes = (BearerAuthentication, SessionAuthentication)
permission_classes = (
Expand All @@ -203,7 +212,7 @@ class SubnetView(RetrieveUpdateDestroyAPIView):
queryset = Subnet.objects.all()


class IpAddressView(RetrieveUpdateDestroyAPIView):
class IpAddressView(RetrieveUpdateDestroyAPIView, SerializerContextMixin):
serializer_class = IpAddressSerializer
authentication_classes = (BearerAuthentication, SessionAuthentication)
permission_classes = (
Expand All @@ -226,7 +235,9 @@ def post(self, request, *args, **kwargs):
subnet = get_object_or_404(self.subnet_model, pk=kwargs['subnet_id'])
ip_address = subnet.request_ip(options)
if ip_address:
serializer = IpAddressSerializer(ip_address)
serializer = IpAddressSerializer(
ip_address, context={'request': self.request}
)
headers = self.get_success_headers(serializer.data)
return Response(
serializer.data, status=status.HTTP_201_CREATED, headers=headers
Expand Down
56 changes: 56 additions & 0 deletions openwisp_ipam/tests/test_multitenant.py
Original file line number Diff line number Diff line change
Expand Up @@ -366,3 +366,59 @@ def test_export_subnet_api(self):
reverse('ipam:export-subnet', args=(subnet.id,))
)
self.assertEqual(response.status_code, 404)

def test_browsable_api_subnet_list(self):
# Ensures the correct filtering of `SubnetSerializer`
org_a = self._get_org(org_name='org_a')
org_b = self._get_org(org_name='org_b')
self._create_subnet(subnet='10.0.0.0/24', organization=org_a)
self._create_subnet(subnet='10.10.0.0/24', organization=org_b)

with self.subTest(
'Test `Organization` and `Master subnet` field filter for org manager'
):
self._login(username='user_a', password='tester')
response = self.client.get(
f'{reverse("ipam:subnet_list_create")}?format=api'
)
self.assertContains(response, 'org_a</option>')
self.assertContains(response, '10.0.0.0/24</option>')
self.assertNotContains(response, 'org_b</option>')
self.assertNotContains(response, '10.10.0.0/24</option>')

with self.subTest(
'Test `Organization` and `Master subnet` field filter for superuser'
):
self._login(username='superuser', password='tester')
response = self.client.get(
f'{reverse("ipam:subnet_list_create")}?format=api'
)
self.assertContains(response, 'org_a</option>')
self.assertContains(response, '10.0.0.0/24</option>')
self.assertContains(response, 'org_b</option>')
self.assertContains(response, '10.10.0.0/24</option>')

def test_browsable_api_ipaddress_list(self):
# Ensures the correct filtering of `IpAddressSerializer`
org_a = self._get_org(org_name='org_a')
org_b = self._get_org(org_name='org_b')
subnet_a = self._create_subnet(subnet='10.0.0.0/24', organization=org_a)
self._create_subnet(subnet='10.10.0.0/24', organization=org_b)

with self.subTest('Test `Subnet` dropdown filter for org manager'):
self._login(username='user_a', password='tester')
response = self.client.get(
f'{reverse("ipam:list_create_ip_address", args=(subnet_a.id,))}'
'?format=api'
)
self.assertContains(response, '10.0.0.0/24</option>')
self.assertNotContains(response, '10.10.0.0/24</option>')

with self.subTest('Test `Subnet` dropdown filter for superuser'):
self._login(username='superuser', password='tester')
response = self.client.get(
f'{reverse("ipam:list_create_ip_address", args=(subnet_a.id,))}'
'?format=api'
)
self.assertContains(response, '10.0.0.0/24</option>')
self.assertContains(response, '10.10.0.0/24</option>')