Skip to content

Commit

Permalink
[1669] Changes to add package auth tests and fixes based on those tests
Browse files Browse the repository at this point in the history
  • Loading branch information
rossjones committed Feb 1, 2012
1 parent cd286bc commit 00e227f
Show file tree
Hide file tree
Showing 5 changed files with 129 additions and 14 deletions.
16 changes: 14 additions & 2 deletions ckan/logic/auth/publisher/create.py
Expand Up @@ -9,8 +9,13 @@
def package_create(context, data_dict=None):
model = context['model']
user = context['user']

return {'success': True}
userobj = model.User.get( user )

if userobj:
return {'success': True}

return {'success': False, 'msg': 'You must be logged in to create a package'}


def resource_create(context, data_dict):
return {'success': False, 'msg': 'Not implemented yet in the auth refactor'}
Expand All @@ -32,13 +37,20 @@ def package_relationship_create(context, data_dict):
return {'success': True}

def group_create(context, data_dict=None):
"""
Group create permission. If a group is provided, within which we want to create a group
then we check that the user is within that group. If not then we just say Yes for now
although there may be some approval issues elsewhere.
"""
model = context['model']
user = context['user']

if not user:
return {'success': False, 'msg': _('User is not authorized to create groups') }

try:
# If the user is doing this within another group then we need to make sure that
# the user has permissions for this group.
group = get_group_object( context )
except NotFound:
return { 'success' : True }
Expand Down
8 changes: 7 additions & 1 deletion ckan/logic/auth/publisher/delete.py
Expand Up @@ -10,10 +10,12 @@ def package_delete(context, data_dict):
model = context['model']
user = context['user']
package = get_package_object(context, data_dict)
packageobj = model.Package.by_name( package )
userobj = model.User.get( user )

if not userobj or \
not _groups_intersect( userobj.get_groups('publisher'), package.get_groups('publisher') ):
not packageobj or \
not _groups_intersect( userobj.get_groups('publisher'), packageobj.get_groups('publisher') ):
return {'success': False,
'msg': _('User %s not authorized to delete packages in these group') % str(user)}
return {'success': True}
Expand All @@ -37,6 +39,10 @@ def relationship_delete(context, data_dict):


def group_delete(context, data_dict):
"""
Group delete permission. Checks that the user specified is within the group to be deleted
and also have 'admin' capacity.
"""
model = context['model']
user = context['user']

Expand Down
25 changes: 15 additions & 10 deletions ckan/logic/auth/publisher/update.py
Expand Up @@ -12,13 +12,14 @@ def package_update(context, data_dict):
model = context['model']
user = context.get('user')
package = get_package_object(context, data_dict)
# group = get_group_object( context, data_dict )
packageobj = model.Package.by_name( package )

# userobj = model.User.get( user )
# if not userobj or \
# not _groups_intersect( userobj.get_groups('publisher'), [group] ):
# return {'success': False,
# 'msg': _('User %s not authorized to edit packages in these groups') % str(user)}
userobj = model.User.get( user )
if not userobj or \
not packageobj or \
not _groups_intersect( userobj.get_groups('publisher'), packageobj.get_groups('publisher') ):
return {'success': False,
'msg': _('User %s not authorized to edit packages in these groups') % str(user)}

return {'success': True}

Expand All @@ -44,6 +45,10 @@ def package_edit_permissions(context, data_dict):
'msg': _('Package edit permissions is not available')}

def group_update(context, data_dict):
"""
Group edit permission. Checks that a valid user is supplied and that the user is
a member of the group currently with any capacity.
"""
model = context['model']
user = context.get('user','')
group = get_group_object(context, data_dict)
Expand All @@ -54,11 +59,11 @@ def group_update(context, data_dict):
# Only allow package update if the user and package groups intersect
userobj = model.User.get( user )
if not userobj:
return {'success': False, 'msg': _('Could not find user %s') % str(user)}
if not _groups_intersect( userobj.get_groups('publisher', 'admin'), [group] ):
return {'success': False, 'msg': _('User %s not authorized to edit this group') % str(user)}
return { 'success' : False, 'msg': _('Could not find user %s') % str(user) }
if not _groups_intersect( userobj.get_groups( 'publisher' ), [group] ):
return { 'success': False, 'msg': _('User %s not authorized to edit this group') % str(user) }

return {'success': True}
return { 'success': True }

def group_change_state(context, data_dict):
return group_update(context, data_dict)
Expand Down
2 changes: 1 addition & 1 deletion ckan/tests/functional/test_group.py
Expand Up @@ -654,7 +654,7 @@ def gg(*args, **kwargs):

context = { 'group': grp, 'model': model, 'user': 'russianfan' }
try:
self.auth.check_access('group_update',context, {}):
self.auth.check_access('group_update',context, {})
except NotAuthorized, e:
assert False, "The user should have access"

Expand Down
92 changes: 92 additions & 0 deletions ckan/tests/functional/test_publisher_auth.py
Expand Up @@ -100,3 +100,95 @@ def test_delete_anon_fail(self):
def test_delete_unknown_fail(self):
self._run_fail_test( 'nosuchuser', 'group_delete' )


class TestPublisherGroupPackages(FunctionalTestCase):

@classmethod
def setup_class(self):
from ckan.tests.mock_publisher_auth import MockPublisherAuth
self.auth = MockPublisherAuth()

model.Session.remove()
CreateTestData.create(auth_profile='publisher')
self.groupname = u'david'
self.packagename = u'testpkg'
model.repo.new_revision()
model.Session.add(model.Package(name=self.packagename))
model.repo.commit_and_remove()

@classmethod
def teardown_class(self):
model.Session.remove()
model.repo.rebuild_db()
model.Session.remove()

def _run_fail_test( self, username, action):
context = { 'package': self.packagename, 'model': model, 'user': username }
try:
self.auth.check_access(action, context, {})
assert False, "The user should not have access"
except NotAuthorized, e:
pass

def _run_success_test( self, username, action):
userobj = model.User.get(username)
grp = model.Group.by_name(self.groupname)

f = model.User.get_groups
g = model.Package.get_groups
def gg(*args, **kwargs):
return [grp]
model.User.get_groups = gg
model.Package.get_groups = gg

context = { 'package': self.packagename, 'model': model, 'user': username }
try:
self.auth.check_access(action, context, {})
except NotAuthorized, e:
assert False, "The user should have %s access: %r." % (action, e.extra_msg)
model.User.get_groups = f
model.Package.get_groups = g

def test_new_success(self):
self._run_success_test( 'russianfan', 'package_create' )

# Currently valid to have any logged in user succeed
#def test_new_fail(self):
# self._run_fail_test( 'russianfan', 'package_create' )

def test_new_anon_fail(self):
self._run_fail_test( '', 'package_create' )

def test_new_unknown_fail(self):
self._run_fail_test( 'nosuchuser', 'package_create' )

def test_edit_success(self):
""" Success because user in group """
self._run_success_test( 'russianfan', 'package_update' )

def test_edit_fail(self):
""" Fail because user not in group """
self._run_fail_test( 'russianfan', 'package_update' )

def test_edit_anon_fail(self):
""" Fail because user is anon """
self._run_fail_test( '', 'package_update' )

def test_edit_unknown_fail(self):
self._run_fail_test( 'nosuchuser', 'package_update' )

def test_delete_success(self):
""" Success because user in group """
self._run_success_test( 'russianfan', 'package_delete' )

def test_delete_fail(self):
""" Fail because user not in group """
self._run_fail_test( 'russianfan', 'package_delete' )

def test_delete_anon_fail(self):
""" Fail because user is anon """
self._run_fail_test( '', 'package_delete' )

def test_delete_unknown_fail(self):
self._run_fail_test( 'nosuchuser', 'package_delete' )

0 comments on commit 00e227f

Please sign in to comment.