Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
7 changed files
with
398 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
from .actions import CREATE_ACTION | ||
from .actions import DESTROY_ACTION | ||
from .actions import LIST_ACTION | ||
from .actions import READ_ACTION | ||
from .actions import UPDATE_ACTION | ||
from .checkers import ObjectLevelPermissionChecker | ||
from .checkers import PermissionChecker | ||
from .checkers import PermissionException | ||
from .views import ProtectedDetailView | ||
from .views import ProtectedListView | ||
from .views import ProtectedViewSet | ||
|
||
__all__ = [ | ||
CREATE_ACTION, | ||
READ_ACTION, | ||
UPDATE_ACTION, | ||
DESTROY_ACTION, | ||
LIST_ACTION, | ||
PermissionChecker, | ||
ObjectLevelPermissionChecker, | ||
PermissionException, | ||
ProtectedDetailView, | ||
ProtectedListView, | ||
ProtectedViewSet, | ||
] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
READ_ACTION = 'read' | ||
DESTROY_ACTION = 'destroy' | ||
UPDATE_ACTION = 'update' | ||
LIST_ACTION = 'list' | ||
CREATE_ACTION = 'create' |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,123 @@ | ||
import abc | ||
import http | ||
import typing | ||
|
||
from flask_jsonapi import exceptions | ||
from flask_jsonapi.permissions import actions | ||
|
||
|
||
class PermissionException(exceptions.JsonApiException): | ||
status = http.HTTPStatus.FORBIDDEN.value | ||
title = http.HTTPStatus.FORBIDDEN.phrase | ||
|
||
def __init__(self, source=None, detail=None, *args, title=None, status=None, **kwargs): | ||
super().__init__(source, detail, *args, title=title, status=status, **kwargs) | ||
|
||
|
||
Resource = typing.TypeVar('Resource') | ||
|
||
|
||
class PermissionChecker(abc.ABC): | ||
@abc.abstractmethod | ||
def check_read_permission(self, *, resource: Resource) -> Resource: | ||
""" | ||
:param resource: resource for which we want to check permissions | ||
:return: resource if user has permission, exception in other case | ||
""" | ||
pass | ||
|
||
@abc.abstractmethod | ||
def check_destroy_permission(self, *, resource: Resource) -> Resource: | ||
""" | ||
:param resource: resource for which we want to check permissions | ||
:return: resource if user has permission, exception in other case | ||
""" | ||
pass | ||
|
||
@abc.abstractmethod | ||
def check_update_permission(self, *, resource: Resource, data: dict) -> typing.Tuple[Resource, dict]: | ||
""" | ||
:param resource: resource for which we want to check permissions | ||
:param data: new data | ||
:return: new data if user has permission, exception in other case | ||
""" | ||
pass | ||
|
||
def check_list_permission(self, *, resources: typing.List[Resource]) -> typing.List[Resource]: | ||
""" | ||
:param resources: resources list for which we want to check permissions | ||
:return: filtered resources list | ||
""" | ||
result = [] | ||
for resource in resources: | ||
try: | ||
result.append(self.check_read_permission(resource=resource)) | ||
except PermissionException: | ||
pass | ||
return result | ||
|
||
@abc.abstractmethod | ||
def check_create_permission(self, *, data: dict) -> dict: | ||
""" | ||
:param data: data for which we want to check permissions | ||
:return: data if user has permission, exception in other case | ||
""" | ||
pass | ||
|
||
|
||
class ObjectLevelPermissionChecker(PermissionChecker): | ||
class ObjectIdNotFoundInData(Exception): | ||
pass | ||
|
||
@property | ||
@abc.abstractmethod | ||
def object_id_attribute(self) -> str: | ||
pass | ||
|
||
def check_create_permission(self, *, data: dict) -> dict: | ||
return self._check_data_permission(data=data, action=actions.CREATE_ACTION) | ||
|
||
def check_read_permission(self, *, resource: Resource) -> Resource: | ||
return self._check_resource_permission(resource=resource, action=actions.READ_ACTION) | ||
|
||
def check_update_permission(self, *, resource: Resource, data: dict) -> typing.Tuple[Resource, dict]: | ||
self._check_resource_permission(resource=resource, action=actions.UPDATE_ACTION) | ||
try: | ||
self._check_data_permission(data=data, action=actions.UPDATE_ACTION) | ||
except self.ObjectIdNotFoundInData: | ||
pass | ||
return resource, data | ||
|
||
def check_destroy_permission(self, *, resource) -> Resource: | ||
return self._check_resource_permission(resource=resource, action=actions.DESTROY_ACTION) | ||
|
||
def _check_resource_permission(self, *, resource: Resource, action: str) -> Resource: | ||
object_id = self.get_object_id_from_resource(resource) | ||
self.check_or_raise(object_id=object_id, action=action) | ||
return resource | ||
|
||
def _check_data_permission(self, *, data: dict, action: str): | ||
object_id = self.get_object_id_from_data(data) | ||
self.check_or_raise(object_id=object_id, action=action) | ||
return data | ||
|
||
def get_object_id_from_resource(self, resource: Resource): | ||
return getattr(resource, self.object_id_attribute) | ||
|
||
def get_object_id_from_data(self, data: dict): | ||
try: | ||
return data[self.object_id_attribute] | ||
except KeyError: | ||
raise self.ObjectIdNotFoundInData | ||
|
||
def check_or_raise(self, *, object_id, action: str): | ||
has_permission = self.has_permission( | ||
object_id=object_id, | ||
action=action, | ||
) | ||
if not has_permission: | ||
raise PermissionException | ||
|
||
@abc.abstractmethod | ||
def has_permission(self, object_id, action) -> bool: | ||
pass |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,79 @@ | ||
import abc | ||
import logging | ||
|
||
from flask_jsonapi import ResourceRepositoryDetailView | ||
from flask_jsonapi import ResourceRepositoryListView | ||
from flask_jsonapi import ResourceRepositoryViewSet | ||
from flask_jsonapi import descriptors | ||
from flask_jsonapi import exceptions | ||
|
||
from . import checkers | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
class ProtectedDetailView(ResourceRepositoryDetailView): | ||
def __init__(self, permission_checker: checkers.PermissionChecker, **kwargs): | ||
super().__init__(**kwargs) | ||
self.permission_checker = permission_checker | ||
|
||
def read(self, id): | ||
resource = super().read(id) | ||
resource = self.permission_checker.check_read_permission(resource=resource) | ||
return resource | ||
|
||
def destroy(self, id): | ||
resource = super().read(id) | ||
resource = self.permission_checker.check_destroy_permission(resource=resource) | ||
return super().destroy(resource.id) | ||
|
||
def update(self, id, data, **kwargs): | ||
resource = super().read(id) | ||
resource, data = self.permission_checker.check_update_permission(resource=resource, data=data) | ||
return super().update(id, data, **kwargs) | ||
|
||
|
||
class ProtectedListView(ResourceRepositoryListView, abc.ABC): | ||
def __init__(self, permission_checker: checkers.PermissionChecker, **kwargs): | ||
super().__init__(**kwargs) | ||
self.permission_checker = permission_checker | ||
|
||
def get(self, *args, **kwargs): | ||
try: | ||
return super().get(*args, **kwargs) | ||
except checkers.PermissionException: | ||
raise exceptions.ForbiddenError("You don't have permissions") | ||
|
||
def post(self, *args, **kwargs): | ||
try: | ||
return super().post(*args, **kwargs) | ||
except checkers.PermissionException: | ||
raise exceptions.ForbiddenError("You don't have permissions") | ||
|
||
def read_many(self, filters: dict, **kwargs): | ||
filters = self._apply_permission_filter(filters) | ||
resources = super().read_many(filters, **kwargs) | ||
length_before_permission = len(resources) | ||
resources = self.permission_checker.check_list_permission(resources=resources) | ||
if length_before_permission != len(resources): | ||
logger.warning('No permission for some items!', extra=kwargs) | ||
return resources | ||
|
||
@abc.abstractmethod | ||
def _apply_permission_filter(self, filters: dict) -> dict: | ||
pass | ||
|
||
def create(self, data: dict, **kwargs): | ||
data = self.permission_checker.check_create_permission(data=data) | ||
return super().create(data, **kwargs) | ||
|
||
|
||
class ProtectedViewSet(ResourceRepositoryViewSet): | ||
detail_view_cls = ProtectedDetailView | ||
list_view_cls: ProtectedListView | ||
permission_checker: checkers.PermissionChecker = descriptors.NotImplementedProperty('permission_checker') | ||
|
||
def get_views_kwargs(self): | ||
kwargs = super().get_views_kwargs() | ||
kwargs['permission_checker'] = self.permission_checker | ||
return kwargs |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Empty file.
Oops, something went wrong.