Skip to content

Commit

Permalink
[1669] New test_publisher_auth functions (currently for Group) added …
Browse files Browse the repository at this point in the history
…and fixes to auth based on failing tests.
  • Loading branch information
rossjones committed Feb 1, 2012
1 parent 363ad6c commit cd286bc
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 8 deletions.
13 changes: 9 additions & 4 deletions ckan/logic/auth/publisher/create.py
Expand Up @@ -33,16 +33,21 @@ def package_relationship_create(context, data_dict):

def group_create(context, data_dict=None):
model = context['model']
user = context['user']
user = context['user']

if not user:
return {'success': False, 'msg': _('User is not authorized to create groups') }

# TODO: We need to check whether this group is being created within another group
try:
group = get_group_object( context )
except NotFound:
return { 'success' : True }

usergrps = User.get( user ).get_groups('publisher')
authorized = _groups_intersect( usergrps, group.get_groups('publisher') )
userobj = model.User.get( user )
if not userobj:
return {'success': False, 'msg': _('User %s not authorized to create groups') % str(user)}

authorized = _groups_intersect( userobj.get_groups('publisher'), [group] )
if not authorized:
return {'success': False, 'msg': _('User %s not authorized to create groups') % str(user)}
else:
Expand Down
11 changes: 8 additions & 3 deletions ckan/logic/auth/publisher/delete.py
Expand Up @@ -39,11 +39,16 @@ def relationship_delete(context, data_dict):
def group_delete(context, data_dict):
model = context['model']
user = context['user']

if not user:
return {'success': False, 'msg': _('Only members of this group are authorized to delete this group')}

group = get_group_object(context, data_dict)
usergrps = model.User.get( user ).get_groups('publisher', 'admin')

authorized = _groups_intersect( usergrps, group.get_groups('publisher') )
userobj = model.User.get( user )
if not userobj:
return {'success': False, 'msg': _('Only members of this group are authorized to delete this group')}

authorized = _groups_intersect( userobj.get_groups('publisher', 'admin'), [group] )
if not authorized:
return {'success': False, 'msg': _('User %s not authorized to delete group %s') % (str(user),group.id)}
else:
Expand Down
1 change: 0 additions & 1 deletion ckan/logic/auth/publisher/update.py
Expand Up @@ -53,7 +53,6 @@ def group_update(context, data_dict):

# Only allow package update if the user and package groups intersect
userobj = model.User.get( user )

if not userobj:
return {'success': False, 'msg': _('Could not find user %s') % str(user)}
if not _groups_intersect( userobj.get_groups('publisher', 'admin'), [group] ):
Expand Down
102 changes: 102 additions & 0 deletions ckan/tests/functional/test_publisher_auth.py
@@ -0,0 +1,102 @@
import re

from nose.tools import assert_equal

import ckan.model as model
from ckan.lib.create_test_data import CreateTestData
from ckan.logic import NotAuthorized


from ckan.tests import *
from ckan.tests import setup_test_search_index
from base import FunctionalTestCase
from ckan.tests import search_related, is_search_supported


class TestPublisherGroups(FunctionalTestCase):

@classmethod
def setup_class(self):
from ckan.tests.mock_publisher_auth import MockPublisherAuth
self.auth = MockPublisherAuth()

model.Session.remove()
CreateTestData.create(auth_profile='publisher')
self.groupname = u'david'
self.packagename = u'testpkg'
model.repo.new_revision()
model.Session.add(model.Package(name=self.packagename))
model.repo.commit_and_remove()

@classmethod
def teardown_class(self):
model.Session.remove()
model.repo.rebuild_db()
model.Session.remove()

def _run_fail_test( self, username, action):
grp = model.Group.by_name(self.groupname)
context = { 'group': grp, 'model': model, 'user': username }
try:
self.auth.check_access(action,context, {})
assert False, "The user should not have access"
except NotAuthorized, e:
pass

def _run_success_test( self, username, action):
userobj = model.User.get(username)
grp = model.Group.by_name(self.groupname)
f = model.User.get_groups
def gg(*args, **kwargs):
return [grp]
model.User.get_groups = gg

context = { 'group': grp, 'model': model, 'user': username }
try:
self.auth.check_access(action, context, {})
except NotAuthorized, e:
assert False, "The user should have %s access: %r." % (action, e.extra_msg)
model.User.get_groups = f

def test_new_success(self):
self._run_success_test( 'russianfan', 'group_create' )

def test_new_fail(self):
self._run_fail_test( 'russianfan', 'group_create' )

def test_new_anon_fail(self):
self._run_fail_test( '', 'group_create' )

def test_new_unknown_fail(self):
self._run_fail_test( 'nosuchuser', 'group_create' )

def test_edit_success(self):
""" Success because user in group """
self._run_success_test( 'russianfan', 'group_update' )

def test_edit_fail(self):
""" Fail because user not in group """
self._run_fail_test( 'russianfan', 'group_update' )

def test_edit_anon_fail(self):
""" Fail because user is anon """
self._run_fail_test( '', 'group_update' )

def test_edit_unknown_fail(self):
self._run_fail_test( 'nosuchuser', 'group_update' )

def test_delete_success(self):
""" Success because user in group """
self._run_success_test( 'russianfan', 'group_delete' )

def test_delete_fail(self):
""" Fail because user not in group """
self._run_fail_test( 'russianfan', 'group_delete' )

def test_delete_anon_fail(self):
""" Fail because user is anon """
self._run_fail_test( '', 'group_delete' )

def test_delete_unknown_fail(self):
self._run_fail_test( 'nosuchuser', 'group_delete' )

0 comments on commit cd286bc

Please sign in to comment.