Skip to content
This repository has been archived by the owner on Sep 28, 2022. It is now read-only.

Commit

Permalink
Merge pull request #75 from postatum/102163798_use_new_permissins
Browse files Browse the repository at this point in the history
Change names of permissions in RAML
  • Loading branch information
jstoiko committed Sep 12, 2015
2 parents fcfc1ba + 95ac087 commit 7b37b1d
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 74 deletions.
4 changes: 2 additions & 2 deletions docs/source/raml.rst
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,10 @@ In your ``securitySchemes``, you can add as many ACLs as you need. Then you can
settings:
collection: |
allow admin all
allow authenticated get
allow authenticated view
item: |
allow admin all
allow authenticated get
allow authenticated view
(...)
/items:
securedBy: [read_only_users]
Expand Down
48 changes: 18 additions & 30 deletions ramses/acl.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
from nefertari.resource import PERMISSIONS
from nefertari.elasticsearch import ES

from .views import collection_methods, item_methods
from .utils import resolve_to_callable, is_callable_tag


Expand All @@ -27,28 +26,28 @@
ALLOW_ALL = (Allow, Everyone, ALL_PERMISSIONS)


def methods_to_perms(perms, methods_map):
""" Convert permissions ("perms") which are either HTTP methods or
the keyword 'all' into a set of valid Pyramid permissions.
def parse_permissions(perms):
""" Parse permissions ("perms") which are either exact permission
names or the keyword 'all'.
:param perms: List or comma-separated string of HTTP methods, or 'all'
:param methods_map: Map of HTTP methods to nefertari view methods
:param perms: List or comma-separated string of nefertari permission
names, or 'all'
"""
if isinstance(perms, six.string_types):
perms = perms.split(',')
perms = [perm.strip().lower() for perm in perms]
if 'all' in perms:
return ALL_PERMISSIONS
else:
try:
return [PERMISSIONS[methods_map[p]] for p in perms]
except KeyError:
valid_perms = set(PERMISSIONS.values())
if set(perms) - valid_perms:
raise ValueError(
'Unknown method name in permissions: {}. Valid methods: '
'{}'.format(perms, list(methods_map.keys())))
'Invalid ACL permission names. Valid permissions '
'are: {}'.format(', '.join(valid_perms)))
return perms


def parse_acl(acl_string, methods_map):
def parse_acl(acl_string):
""" Parse raw string :acl_string: of RAML-defined ACLs.
If :acl_string: is blank or None, all permissions are given.
Expand All @@ -59,10 +58,9 @@ def parse_acl(acl_string, methods_map):
ACEs in :acl_string: may be separated by newlines or semicolons.
Action, principal and permission lists must be separated by spaces.
Permissions must be comma-separated.
E.g. 'allow everyone get,post,patch' and 'deny authenticated delete'
E.g. 'allow everyone view,create,update' and 'deny authenticated delete'
:param acl_string: Raw RAML string containing defined ACEs.
:param methods_map: Map of HTTP methods to nefertari method handler names.
"""
if not acl_string:
return [ALLOW_ALL]
Expand Down Expand Up @@ -91,7 +89,7 @@ def parse_acl(acl_string, methods_map):
principal = princ_str

# Process permissions
permissions = methods_to_perms(perms, methods_map)
permissions = parse_permissions(perms)

result_acl.append((action, principal, permissions))

Expand All @@ -105,7 +103,7 @@ class BaseACL(CollectionACL):
_collection_acl = (ALLOW_ALL, )
_item_acl = (ALLOW_ALL, )

def _apply_callables(self, acl, methods_map, obj=None):
def _apply_callables(self, acl, obj=None):
""" Iterate over ACEs from :acl: and apply callable principals
if any.
Expand All @@ -117,8 +115,6 @@ def _apply_callables(self, acl, methods_map, obj=None):
Principals must return a single ACE or a list of ACEs.
:param acl: Sequence of valid Pyramid ACEs which will be processed
:param methods_map: Map of HTTP methods to nefertari view method names
(permissions)
:param obj: Object to be accessed via the ACL
"""
new_acl = []
Expand All @@ -130,23 +126,19 @@ def _apply_callables(self, acl, methods_map, obj=None):
continue
if not isinstance(ace[0], (list, tuple)):
ace = [ace]
ace = [(a, b, methods_to_perms(c, methods_map))
for a, b, c in ace]
ace = [(a, b, parse_permissions(c)) for a, b, c in ace]
else:
ace = [ace]
new_acl += ace
return tuple(new_acl)

def __acl__(self):
""" Apply callables to `self._collection_acl` and return result. """
return self._apply_callables(
acl=self._collection_acl,
methods_map=collection_methods)
return self._apply_callables(acl=self._collection_acl)

def generate_item_acl(self, item):
acl = self._apply_callables(
acl=self._item_acl,
methods_map=item_methods,
obj=item)
if acl is None:
acl = self.__acl__()
Expand Down Expand Up @@ -224,12 +216,8 @@ def generate_acl(config, model_cls, raml_resource, es_based=True):
sec_scheme = schemes[0]
log.debug('{} ACL scheme applied'.format(sec_scheme.name))
settings = sec_scheme.settings or {}
collection_acl = parse_acl(
acl_string=settings.get('collection'),
methods_map=collection_methods)
item_acl = parse_acl(
acl_string=settings.get('item'),
methods_map=item_methods)
collection_acl = parse_acl(acl_string=settings.get('collection'))
item_acl = parse_acl(acl_string=settings.get('item'))

class GeneratedACLBase(object):
item_model = model_cls
Expand Down
74 changes: 32 additions & 42 deletions tests/test_acl.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,70 +11,67 @@


class TestACLHelpers(object):
methods_map = {'get': 'index', 'post': 'create'}

def test_methods_to_perms_all_permissions(self):
perms = acl.methods_to_perms('all,get,post', self.methods_map)
def test_parse_permissions_all_permissions(self):
perms = acl.parse_permissions('all,view,create')
assert perms is ALL_PERMISSIONS

def test_methods_to_perms_invalid_perm_name(self):
def test_parse_permissions_invalid_perm_name(self):
with pytest.raises(ValueError) as ex:
acl.methods_to_perms('foo,post', self.methods_map)
expected = ("Unknown method name in permissions: "
"['foo', 'post']")
acl.parse_permissions('foo,create')
expected = ('Invalid ACL permission names. Valid '
'permissions are: ')
assert expected in str(ex.value)

def test_methods_to_perms(self):
perms = acl.methods_to_perms('get', self.methods_map)
def test_parse_permissions(self):
perms = acl.parse_permissions('view')
assert perms == ['view']
perms = acl.methods_to_perms('get,post', self.methods_map)
perms = acl.parse_permissions('view,create')
assert sorted(perms) == ['create', 'view']

def test_parse_acl_no_string(self):
perms = acl.parse_acl('', self.methods_map)
perms = acl.parse_acl('')
assert perms == [acl.ALLOW_ALL]

def test_parse_acl_unknown_action(self):
with pytest.raises(ValueError) as ex:
acl.parse_acl('foobar admin all', self.methods_map)
acl.parse_acl('foobar admin all')
assert 'Unknown ACL action: foobar' in str(ex.value)

@patch.object(acl, 'methods_to_perms')
@patch.object(acl, 'parse_permissions')
def test_parse_acl_multiple_values(self, mock_perms):
mock_perms.return_value = 'Foo'
perms = acl.parse_acl(
'allow everyone read,write;allow authenticated all',
self.methods_map)
'allow everyone read,write;allow authenticated all')
mock_perms.assert_has_calls([
call(['read', 'write'], self.methods_map),
call(['all'], self.methods_map),
call(['read', 'write']),
call(['all']),
])
assert sorted(perms) == sorted([
(Allow, Everyone, 'Foo'),
(Allow, Authenticated, 'Foo'),
])

@patch.object(acl, 'methods_to_perms')
@patch.object(acl, 'parse_permissions')
def test_parse_acl_special_principal(self, mock_perms):
mock_perms.return_value = 'Foo'
perms = acl.parse_acl('allow everyone all', self.methods_map)
mock_perms.assert_called_once_with(['all'], self.methods_map)
perms = acl.parse_acl('allow everyone all')
mock_perms.assert_called_once_with(['all'])
assert perms == [(Allow, Everyone, 'Foo')]

@patch.object(acl, 'methods_to_perms')
@patch.object(acl, 'parse_permissions')
def test_parse_acl_group_principal(self, mock_perms):
mock_perms.return_value = 'Foo'
perms = acl.parse_acl('allow g:admin all', self.methods_map)
mock_perms.assert_called_once_with(['all'], self.methods_map)
perms = acl.parse_acl('allow g:admin all')
mock_perms.assert_called_once_with(['all'])
assert perms == [(Allow, 'g:admin', 'Foo')]

@patch.object(acl, 'resolve_to_callable')
@patch.object(acl, 'methods_to_perms')
@patch.object(acl, 'parse_permissions')
def test_parse_acl_callable_principal(self, mock_perms, mock_res):
mock_perms.return_value = 'Foo'
mock_res.return_value = 'registry callable'
perms = acl.parse_acl('allow {{my_user}} all', self.methods_map)
mock_perms.assert_called_once_with(['all'], self.methods_map)
perms = acl.parse_acl('allow {{my_user}} all')
mock_perms.assert_called_once_with(['all'])
mock_res.assert_called_once_with('{{my_user}}')
assert perms == [(Allow, 'registry callable', 'Foo')]

Expand Down Expand Up @@ -123,8 +120,8 @@ def test_correct_security_scheme(self, mock_parse):
raml_resource=raml_resource,
es_based=False)
mock_parse.assert_has_calls([
call(acl_string=4, methods_map=acl.collection_methods),
call(acl_string=7, methods_map=acl.item_methods),
call(acl_string=4),
call(acl_string=7),
])
instance = acl_cls(request=None)
assert instance._collection_acl == mock_parse()
Expand Down Expand Up @@ -163,34 +160,31 @@ def test_apply_callables_no_callables(self):
obj = acl.BaseACL('req')
new_acl = obj._apply_callables(
acl=[('foo', 'bar', 'baz')],
methods_map={'zoo': 1},
obj='obj')
assert new_acl == (('foo', 'bar', 'baz'),)

@patch.object(acl, 'methods_to_perms')
@patch.object(acl, 'parse_permissions')
def test_apply_callables(self, mock_meth):
mock_meth.return_value = '123'
principal = Mock(return_value=(7, 8, 9))
obj = acl.BaseACL('req')
new_acl = obj._apply_callables(
acl=[('foo', principal, 'bar')],
methods_map={'zoo': 1},
obj='obj')
assert new_acl == ((7, 8, '123'),)
principal.assert_called_once_with(
ace=('foo', principal, 'bar'),
request='req',
obj='obj')
mock_meth.assert_called_once_with(9, {'zoo': 1})
mock_meth.assert_called_once_with(9)

@patch.object(acl, 'methods_to_perms')
@patch.object(acl, 'parse_permissions')
def test_apply_callables_principal_returns_none(self, mock_meth):
mock_meth.return_value = '123'
principal = Mock(return_value=None)
obj = acl.BaseACL('req')
new_acl = obj._apply_callables(
acl=[('foo', principal, 'bar')],
methods_map={'zoo': 1},
obj='obj')
assert new_acl == ()
principal.assert_called_once_with(
Expand All @@ -199,28 +193,26 @@ def test_apply_callables_principal_returns_none(self, mock_meth):
obj='obj')
assert not mock_meth.called

@patch.object(acl, 'methods_to_perms')
@patch.object(acl, 'parse_permissions')
def test_apply_callables_principal_returns_list(self, mock_meth):
mock_meth.return_value = '123'
principal = Mock(return_value=[(7, 8, 9)])
obj = acl.BaseACL('req')
new_acl = obj._apply_callables(
acl=[('foo', principal, 'bar')],
methods_map={'zoo': 1},
obj='obj')
assert new_acl == ((7, 8, '123'),)
principal.assert_called_once_with(
ace=('foo', principal, 'bar'),
request='req',
obj='obj')
mock_meth.assert_called_once_with(9, {'zoo': 1})
mock_meth.assert_called_once_with(9)

def test_apply_callables_functional(self):
obj = acl.BaseACL('req')
principal = lambda ace, request, obj: [(Allow, Everyone, 'get')]
principal = lambda ace, request, obj: [(Allow, Everyone, 'view')]
new_acl = obj._apply_callables(
acl=[(Deny, principal, ALL_PERMISSIONS)],
methods_map=acl.item_methods,
)
assert new_acl == ((Allow, Everyone, ['view']),)

Expand All @@ -231,7 +223,6 @@ def test_magic_acl(self):
result = obj.__acl__()
obj._apply_callables.assert_called_once_with(
acl=[(1, 2, 3)],
methods_map=acl.collection_methods
)
assert result == obj._apply_callables()

Expand All @@ -242,7 +233,6 @@ def test_item_acl(self):
result = obj.item_acl('foobar')
obj._apply_callables.assert_called_once_with(
acl=[(1, 2, 3)],
methods_map=acl.item_methods,
obj='foobar'
)
assert result == obj._apply_callables()
Expand Down

0 comments on commit 7b37b1d

Please sign in to comment.