Skip to content

Commit

Permalink
Merge pull request #721 from kobotoolbox/kpi-3115-row-level-write-perms
Browse files Browse the repository at this point in the history
Add support to KPI row level write permissions
  • Loading branch information
jnm committed Jul 26, 2021
2 parents 488cb24 + 649ead6 commit 1215471
Show file tree
Hide file tree
Showing 12 changed files with 490 additions and 205 deletions.
119 changes: 102 additions & 17 deletions onadata/apps/api/permissions.py
Expand Up @@ -6,11 +6,15 @@
)

from onadata.libs.constants import (
CAN_DELETE_DATA_XFORM,
CAN_CHANGE_XFORM,
CAN_VALIDATE_XFORM,
CAN_DELETE_DATA_XFORM,
CAN_VIEW_XFORM,
)
from onadata.apps.logger.models import (
XForm,
OneTimeAuthRequest,
)
from onadata.apps.logger.models import XForm


class ViewDjangoObjectPermissions(DjangoObjectPermissions):
Expand Down Expand Up @@ -102,35 +106,116 @@ def has_permission(self, request, view):
lookup_field = view.lookup_field
lookup = view.kwargs.get(lookup_field)
# Allow anonymous users to access access shared data
allowed_anonymous_action = ['retrieve', 'enketo']
allowed_anonymous_actions = ['retrieve']
if lookup:
# We need to grant access to anonymous on list endpoint too when
# a form pk is specified. e.g. `/api/v1/data/{pk}.json
allowed_anonymous_action.append('list')
if request.method in SAFE_METHODS and \
view.action in allowed_anonymous_action:
allowed_anonymous_actions.append('list')

if (
request.method in SAFE_METHODS
and view.action in allowed_anonymous_actions
):
return True
return super().has_permission(request, view)

def has_object_permission(self, request, view, obj):
user = request.user
is_granted_once = OneTimeAuthRequest.grant_access(request)
# If a one-time authentication request token has been detected,
# we return its validity.
# Otherwise, the permissions validation keeps going as normal
if is_granted_once is not None:
return is_granted_once

# Grant access if user is owner or super user
if user.is_superuser or user == obj.user:
return True

allowed_anonymous_actions = ['retrieve', 'list']
# Allow anonymous users to access shared data
if request.method in SAFE_METHODS and \
view.action in ['retrieve', 'list', 'enketo']:
if obj.shared_data:
return True
if (
request.method in SAFE_METHODS
and view.action in allowed_anonymous_actions
and obj.shared_data
):
return True

if request.method == 'DELETE' and view.action == 'labels':
user = request.user
return user.has_perms([CAN_CHANGE_XFORM], obj)
# TODO Use a better solution than these mapping.
# E.g.:
# - Add new endpoints for enketo-edit and enketo-view -- DONE
# - Add new permission classes for each action -- DONE
# - Remove this kludgy solution
perms_actions_map = {
'bulk_delete': {
'DELETE': [f'logger.{CAN_DELETE_DATA_XFORM}'],
},
'bulk_validation_status': {
'PATCH': [f'logger.{CAN_VALIDATE_XFORM}'],
},
'labels': {
'DELETE': [f'logger.{CAN_CHANGE_XFORM}']
},
'validation_status': {
'DELETE': [f'logger.{CAN_VALIDATE_XFORM}'],
'PATCH': [f'logger.{CAN_VALIDATE_XFORM}'],
},
}

if request.method in ['PATCH', 'DELETE'] \
and view.action.endswith('validation_status'):
user = request.user
return user.has_perms([CAN_VALIDATE_XFORM], obj)
try:
required_perms = perms_actions_map[view.action][request.method]
except KeyError:
pass
else:
return user.has_perms(required_perms, obj)

return super().has_object_permission(request, view, obj)


class EnketoSubmissionEditPermissions(ObjectPermissionsWithViewRestricted):

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)

def has_object_permission(self, request, view, obj):
user = request.user
required_perms = [f'logger.{CAN_CHANGE_XFORM}']
is_granted_once = OneTimeAuthRequest.grant_access(request)

if is_granted_once is not None:
return is_granted_once

# Grant access if user is owner or super user
if user.is_superuser or user == obj.user:
return True

return user.has_perms(required_perms, obj)


class EnketoSubmissionViewPermissions(ObjectPermissionsWithViewRestricted):

def has_permission(self, request, view):
return True

def has_object_permission(self, request, view, obj):
user = request.user
required_perms = [f'logger.{CAN_VIEW_XFORM}']
is_granted_once = OneTimeAuthRequest.grant_access(request)

if is_granted_once is not None:
return is_granted_once

# Grant access if user is owner or super user
if user.is_superuser or user == obj.user:
return True

# Allow anonymous users to access shared data
if obj.shared_data:
return True

return user.has_perms(required_perms, obj)


class MetaDataObjectPermissions(ObjectPermissionsWithViewRestricted):

# Users can read MetaData objects with 'view_xform' permissions and
Expand Down
55 changes: 27 additions & 28 deletions onadata/apps/api/tests/viewsets/test_data_viewset.py
Expand Up @@ -275,46 +275,45 @@ def test_data_list_filter_by_user(self):

def test_get_enketo_edit_url(self):
self._make_submissions()
view = DataViewSet.as_view({'get': 'enketo'})
request = self.factory.get('/', **self.extra)
formid = self.xform.pk
dataid = self.xform.instances.all().order_by('id')[0].pk

response = view(request, pk=formid, dataid=dataid)
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
# add data check
self.assertEqual(
response.data,
{'detail': 'return_url not provided.'})

request = self.factory.get(
'/',
data={'return_url': "http://test.io/test_url"}, **self.extra)

with HTTMock(enketo_mock):
for view_ in ['enketo', 'enketo_edit']:
# ensure both legacy `/enketo` and the new `/enketo_edit` endpoints
# do the same thing
view = DataViewSet.as_view({'get': view_})
formid = self.xform.pk
dataid = self.xform.instances.all().order_by('id')[0].pk

request = self.factory.get('/', **self.extra)
response = view(request, pk=formid, dataid=dataid)
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
# add data check
self.assertEqual(
response.data['url'],
"https://hmh2a.enketo.formhub.org")
response.data['detail'], '`return_url` not provided.'
)

request = self.factory.get(
'/',
data={'return_url': "http://test.io/test_url"},
**self.extra
)

with HTTMock(enketo_mock):
response = view(request, pk=formid, dataid=dataid)
self.assertEqual(
response.data['url'], "https://hmh2a.enketo.formhub.org"
)

def test_get_enketo_view_url(self):
self._make_submissions()
view = DataViewSet.as_view({'get': 'enketo'})
view = DataViewSet.as_view({'get': 'enketo_view'})
request = self.factory.get('/', **self.extra)
formid = self.xform.pk
dataid = self.xform.instances.all().order_by('id')[0].pk

request = self.factory.get(
'/',
data={'return_url': "http://test.io/test_url", 'action': 'view'},
**self.extra
)

with HTTMock(enketo_mock):
response = view(request, pk=formid, dataid=dataid)
self.assertEqual(
response.data['url'],
"https://hmh2a.enketo.formhub.org")
response.data['url'], 'https://hmh2a.enketo.formhub.org'
)

def test_get_form_public_data(self):
self._make_submissions()
Expand Down
15 changes: 7 additions & 8 deletions onadata/apps/api/tools.py
Expand Up @@ -133,22 +133,21 @@ class TagForm(forms.Form):
instance.save()


def add_validation_status_to_instance(request, instance):
def add_validation_status_to_instance(
request: Request, instance: 'Instance'
) -> bool:
"""
Saves instance validation status if it's valid (belong to XForm/Asset validation statuses)
:param request: REST framework's Request object
:param instance: Instance object
:return: Boolean
Save instance validation status if it is valid.
To be valid, it has to belong to XForm validation statuses
"""
validation_status_uid = request.data.get("validation_status.uid")
validation_status_uid = request.data.get('validation_status.uid')
success = False

# Payload must contain validation_status property.
if validation_status_uid:

validation_status = get_validation_status(
validation_status_uid, instance.asset, request.user.username)
validation_status_uid, instance.xform, request.user.username)
if validation_status:
instance.validation_status = validation_status
instance.save()
Expand Down

0 comments on commit 1215471

Please sign in to comment.