Skip to content

Commit

Permalink
[fix] Validate organization membership when importing subnets #77
Browse files Browse the repository at this point in the history
Closes #77

Co-authored-by: sankalp <sankalp123427@gmail.com>
  • Loading branch information
purhan and codesankalp committed Oct 13, 2021
1 parent ab63ccf commit c94ec0c
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 22 deletions.
22 changes: 19 additions & 3 deletions openwisp_ipam/admin.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import csv
from collections import OrderedDict
from copy import deepcopy

import swapper
from django import forms
Expand All @@ -13,8 +14,10 @@
from django.utils.translation import gettext_lazy as _
from openwisp_users.multitenancy import MultitenantAdminMixin, MultitenantOrgFilter
from openwisp_utils.admin import TimeReadonlyAdminMixin
from rest_framework.exceptions import PermissionDenied
from reversion.admin import VersionAdmin

from .api.utils import AuthorizeCSVOrgManaged, CsvImportAPIException
from .api.views import HostsSet
from .base.forms import IpAddressImportForm
from .base.models import CsvImportException
Expand All @@ -25,7 +28,11 @@

@admin.register(Subnet)
class SubnetAdmin(
VersionAdmin, MultitenantAdminMixin, TimeReadonlyAdminMixin, ModelAdmin
VersionAdmin,
MultitenantAdminMixin,
TimeReadonlyAdminMixin,
ModelAdmin,
AuthorizeCSVOrgManaged,
):
app_label = 'openwisp_ipam'
change_form_template = 'admin/openwisp-ipam/subnet/change_form.html'
Expand Down Expand Up @@ -122,7 +129,7 @@ def export_view(self, request, subnet_id):
def import_view(self, request):
form = IpAddressImportForm()
form_template = 'admin/openwisp-ipam/subnet/import.html'
subnet_list_url = 'admin:{0}_subnet_changelist'.format(self.app_label)
subnet_list_url = f'admin:{self.app_label}_{Subnet._meta.model_name}_changelist'
context = {
'form': form,
'subnet_list_url': subnet_list_url,
Expand All @@ -133,6 +140,11 @@ def import_view(self, request):
context['form'] = form
if form.is_valid():
file = request.FILES['csvfile']
try:
self.assert_organization_permissions(request)
except (PermissionDenied, CsvImportAPIException) as e:
messages.error(request, str(e))
return redirect(reverse(context['subnet_list_url']))
if not file.name.endswith(('.csv', '.xls', '.xlsx')):
messages.error(request, _('File type not supported.'))
return render(request, form_template, context)
Expand All @@ -142,9 +154,13 @@ def import_view(self, request):
messages.error(request, str(e))
return render(request, form_template, context)
messages.success(request, _('Successfully imported data.'))
return redirect('/admin/{0}/subnet'.format(self.app_label))
return redirect(reverse(context['subnet_list_url']))
return render(request, form_template, context)

def get_csv_organization(self, request):
data = Subnet._get_csv_reader(self, deepcopy(request.FILES['csvfile']))
return Subnet._get_org(self, org_slug=list(data)[2][0].strip())

class Media:
js = (
'admin/js/jquery.init.js',
Expand Down
37 changes: 22 additions & 15 deletions openwisp_ipam/api/utils.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,34 @@
import swapper
from django.contrib.auth.models import Permission
from rest_framework.exceptions import PermissionDenied
from django.utils.translation import gettext_lazy as _
from rest_framework import status
from rest_framework.exceptions import APIException, PermissionDenied

from openwisp_ipam.base.models import CsvImportException

Organization = swapper.load_model('openwisp_users', 'Organization')


class CsvImportAPIException(APIException):
status_code = status.HTTP_400_BAD_REQUEST


class AuthorizeCSVImport:
def assert_organization_permissions(self, request):
if request.user.is_superuser:
return
try:
organization = self.get_csv_organization()
if str(organization.pk) in self.get_user_organizations():
organization = self.get_csv_organization(request)
if organization is None or str(
organization.pk
) in self.get_user_organizations(request):
return
except Organization.DoesNotExist:
# if organization in CSV doesn't exist, then check if
# user can create new organizations
permission = Permission.objects.filter(user=request.user).filter(
codename='add_organization'
)
if permission.exists():
return
raise PermissionDenied()
except CsvImportException as e:
raise CsvImportAPIException(str(e))
except IndexError:
raise CsvImportAPIException(_('Invalid data format'))
raise PermissionDenied(
_('You do not have permission to import data into this organization')
)

def get_csv_organization(self):
raise NotImplementedError()
Expand All @@ -31,5 +38,5 @@ def get_user_organizations(self):


class AuthorizeCSVOrgManaged(AuthorizeCSVImport):
def get_user_organizations(self):
return self.request.user.organizations_managed
def get_user_organizations(self, request):
return request.user.organizations_managed
7 changes: 3 additions & 4 deletions openwisp_ipam/api/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,12 +219,11 @@ class ImportSubnetView(ProtectedAPIMixin, CreateAPIView, AuthorizeCSVOrgManaged)
queryset = Subnet.objects.none()
serializer_class = ImportSubnetSerializer

def get_csv_organization(self):
def get_csv_organization(self, request):
data = self.subnet_model._get_csv_reader(
self, deepcopy(self.request.FILES['csvfile'])
self, deepcopy(request.FILES['csvfile'])
)
org = Organization.objects.get(name=list(data)[2][0].strip())
return org
return self.subnet_model._get_org(self, org_slug=list(data)[2][0].strip())

def post(self, request, *args, **kwargs):
self.assert_organization_permissions(request)
Expand Down
61 changes: 61 additions & 0 deletions openwisp_ipam/tests/test_multitenant.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,41 @@ def test_multitenancy_subnet_queryset(self):
hidden=[data['subnet2']],
)

def test_import_subnet_permission(self):
self._create_multitenancy_test_env()
self.client.login(username='operator', password='tester')

with self.subTest('Import successful'):
csv_data = """Monachers - Matera,
10.27.1.0/24,
test1organization,
ip address,description
10.27.1.1,Monachers"""
csvfile = SimpleUploadedFile('data.csv', bytes(csv_data, 'utf-8'))
self.assertEqual(Subnet.objects.count(), 2)
response = self.client.post(
reverse('admin:ipam_import_subnet'), {'csvfile': csvfile}, follow=True,
)
self.assertContains(response, '<li class="success">Successfully imported')
self.assertEqual(response.status_code, 200)
self.assertEqual(Subnet.objects.count(), 3)
self.assertEqual(
Subnet.objects.filter(subnet='10.27.1.0/24').exists(), True
)

with self.subTest('Import unsuccessful'):
csv_data = """Monachers - Matera,
10.27.1.0/24,
test2organization,
ip address,description
10.27.1.1,Monachers"""
csvfile = SimpleUploadedFile('data.csv', bytes(csv_data, 'utf-8'))
response = self.client.post(
reverse('admin:ipam_import_subnet'), {'csvfile': csvfile}, follow=True,
)
self.assertContains(response, '<li class="error">You do not have')
self.assertEqual(Subnet.objects.count(), 3)


class TestMultitenantApi(
TestMultitenantAdminMixin, CreateModelsMixin, PostDataMixin, TestCase
Expand Down Expand Up @@ -305,6 +340,32 @@ def test_import_subnet_new_org(self):
)
self.assertEqual(response.status_code, 200)

def test_import_subnet_org_do_not_exist(self):
csv_data = """Monachers - Matera,
10.27.1.0/24,
monachers,
ip address,description
10.27.1.1,Monachers
10.27.1.254,Nano Beam 5 19AC"""
self._login(username='user_a', password='tester')
response = self.client.post(
reverse('ipam:import-subnet'),
{'csvfile': SimpleUploadedFile('data.csv', bytes(csv_data, 'utf-8'))},
)
self.assertEqual(response.status_code, 400)
self.assertIn('The import operation failed', str(response.data.get('detail')))

def test_invalid_csv_data(self):
csv_data = """Monachers - Matera,
10.27.1.0/24,"""
self._login(username='user_a', password='tester')
response = self.client.post(
reverse('ipam:import-subnet'),
{'csvfile': SimpleUploadedFile('data.csv', bytes(csv_data, 'utf-8'))},
)
self.assertEqual(response.status_code, 400)
self.assertEqual(str(response.data.get('detail')), 'Invalid data format')

def test_export_subnet_api(self):
org_a = self._get_org(org_name='org_a')
subnet = self._create_subnet(
Expand Down

0 comments on commit c94ec0c

Please sign in to comment.