Skip to content
This repository has been archived by the owner on Mar 28, 2019. It is now read-only.

Commit

Permalink
Fix pep8 and tests
Browse files Browse the repository at this point in the history
  • Loading branch information
leplatrem committed Jul 9, 2015
1 parent 869340b commit 4bf92a1
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 34 deletions.
1 change: 0 additions & 1 deletion cliquet/initialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
from cliquet import logger
from cliquet import utils
from cliquet import statsd
from cliquet import authorization

from pyramid.events import NewRequest, NewResponse
from pyramid.httpexceptions import HTTPTemporaryRedirect, HTTPGone
Expand Down
15 changes: 9 additions & 6 deletions cliquet/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,10 @@ class ViewSet(object):

def __init__(self, **kwargs):
self.update(**kwargs)
self.record_arguments = functools.partial(
self.get_view_arguments, 'record')
self.collection_arguments = functools.partial(
self.get_view_arguments, 'collection')
self.record_arguments = functools.partial(self.get_view_arguments,
'record')
self.collection_arguments = functools.partial(self.get_view_arguments,
'collection')

def update(self, **kwargs):
"""Update viewset attributes with provided values."""
Expand Down Expand Up @@ -154,7 +154,8 @@ def is_endpoint_enabled(self, endpoint_type, resource_name, method,

class ProtectedViewSet(ViewSet):
def get_record_schema(self, resource, method):
schema = super(ProtectedViewSet, self).get_record_schema(resource, method)
schema = super(ProtectedViewSet, self).get_record_schema(resource,
method)

if method.lower() not in map(str.lower, self.validate_schema_for):
return schema
Expand All @@ -166,7 +167,9 @@ def get_record_schema(self, resource, method):
return schema

def get_view_arguments(self, endpoint_type, resource, method):
args = super(ProtectedViewSet, self).get_view_arguments(endpoint_type, resource, method)
args = super(ProtectedViewSet, self).get_view_arguments(endpoint_type,
resource,
method)
args['permission'] = authorization.DYNAMIC
return args

Expand Down
23 changes: 13 additions & 10 deletions cliquet/tests/resource/test_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ def __init__(self, *args, **kwargs):

self.app = webtest.TestApp(self.config.make_wsgi_app())
self.app.RequestClass = get_request_class(self.config.route_prefix)
self.item_url = '/mushrooms/{id}'
self.principal = USER_PRINCIPAL

self.headers = {
Expand All @@ -53,16 +52,15 @@ def get_item_url(self, id=None):

class AuthzAuthnTest(BaseWebTest):
authorization_policy = 'cliquet.authorization.AuthorizationPolicy'
# Protected resource.
collection_url = '/toadstools'

def add_permission(self, object_id, permission):
self.app.app.registry.permission.add_principal_to_ace(
object_id, permission, self.principal)


class ProtectedResourcePermissionTest(AuthzAuthnTest):
# Protected resource.
collection_url = '/toadstools'

def setUp(self):
self.add_permission(self.collection_url, 'toadstool:create')

Expand All @@ -71,7 +69,7 @@ def test_permissions_are_associated_to_object_uri_without_prefix(self):
'permissions': {'read': ['group:readers']}}
resp = self.app.post_json(self.collection_url, body,
headers=self.headers)
object_uri = '/toadstools/%s' % resp.json['data']['id']
object_uri = self.get_item_url(resp.json['data']['id'])
backend = self.app.app.registry.permission
stored_perms = backend.object_permission_principals(object_uri, 'read')
self.assertEqual(stored_perms, {'group:readers'})
Expand Down Expand Up @@ -117,7 +115,7 @@ def test_collection_get_is_granted_when_authorized(self):
self.app.get(self.collection_url, headers=self.headers, status=200)

def test_collection_post_is_granted_when_authorized(self):
self.add_permission(self.collection_url, 'mushroom:create')
self.add_permission(self.collection_url, 'toadstool:create')
self.app.post_json(self.collection_url, {'data': MINIMALIST_RECORD},
headers=self.headers, status=201)

Expand Down Expand Up @@ -147,7 +145,7 @@ def test_collection_delete_is_denied_when_not_authorized(self):
class RecordAuthzGrantedTest(AuthzAuthnTest):
def setUp(self):
super(RecordAuthzGrantedTest, self).setUp()
self.add_permission(self.collection_url, 'mushroom:create')
self.add_permission(self.collection_url, 'toadstool:create')

resp = self.app.post_json(self.collection_url,
{'data': MINIMALIST_RECORD},
Expand Down Expand Up @@ -175,7 +173,7 @@ def test_record_put_on_existing_record_is_granted_when_authorized(self):
headers=self.headers, status=200)

def test_record_put_on_unexisting_record_is_granted_when_authorized(self):
self.add_permission(self.collection_url, 'mushroom:create')
self.add_permission(self.collection_url, 'toadstool:create')
self.app.put_json(self.unknown_record_url, {'data': MINIMALIST_RECORD},
headers=self.headers, status=201)

Expand All @@ -184,17 +182,20 @@ class RecordAuthzDeniedTest(AuthzAuthnTest):
def setUp(self):
super(RecordAuthzDeniedTest, self).setUp()
# Add permission to create a sample record.
self.add_permission(self.collection_url, 'mushroom:create')
self.add_permission(self.collection_url, 'toadstool:create')
resp = self.app.post_json(self.collection_url,
{'data': MINIMALIST_RECORD},
headers=self.headers)
self.record = resp.json['data']
self.record_url = self.get_item_url()
self.unknown_record_url = self.get_item_url(uuid.uuid4())
# Remove every permissions.
self.app.app.registry.permission.flush()

def test_views_require_authentication(self):
url = self.get_item_url('abc')
self.app.get(url, status=401)
self.app.put_json(url, {'data': MINIMALIST_RECORD}, status=401)
self.app.patch_json(url, {'data': MINIMALIST_RECORD}, status=401)
self.app.delete(url, status=401)

Expand All @@ -215,7 +216,7 @@ def test_record_delete_is_denied_when_not_authorized(self):
def test_record_put_on_unexisting_record_is_rejected_if_write_perm(self):
object_id = self.collection_url
self.app.app.registry.permission.remove_principal_from_ace(
object_id, 'mushroom:create', self.principal) # Was added in setUp
object_id, 'toadstool:create', self.principal) # Added in setUp.

self.app.app.registry.permission.add_principal_to_ace(
object_id, 'write', self.principal)
Expand Down Expand Up @@ -417,6 +418,8 @@ def test_modify_with_empty_returns_400(self):


class InvalidPermissionsTest(BaseWebTest):
collection_url = '/toadstools'

def setUp(self):
super(InvalidPermissionsTest, self).setUp()
body = {'data': MINIMALIST_RECORD}
Expand Down
17 changes: 2 additions & 15 deletions cliquet/tests/resource/test_viewset.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import colander
import mock

from cliquet import authorization
from cliquet.resource import ViewSet, register_resource

from cliquet.tests.support import unittest
Expand Down Expand Up @@ -233,20 +232,8 @@ def test_get_service_name_doesnt_use_callable_as_a_name(self):
viewset.get_service_name('record', resource),
'fakename-record')

def test_get_service_arguments_returns_factory_if_exists(self):
viewset = ViewSet(factory=mock.sentinel.factory)
service_arguments = viewset.get_service_arguments()
self.assertIn("factory", service_arguments)
self.assertEquals(service_arguments["factory"], mock.sentinel.factory)

def test_get_service_arguments_uses_cliquet_factory_by_default(self):
viewset = ViewSet() # Don't provide a factory here.
service_arguments = viewset.get_service_arguments()
self.assertEquals(service_arguments['factory'],
authorization.RouteFactory)

def test_get_service_arguments_skips_factory_if_none(self):
viewset = ViewSet(factory=None)
def test_get_service_arguments_has_no_factory_by_default(self):
viewset = ViewSet()
service_arguments = viewset.get_service_arguments()
self.assertNotIn('factory', service_arguments)

Expand Down
8 changes: 6 additions & 2 deletions cliquet/tests/support.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@

from cornice import errors as cornice_errors
from pyramid.url import parse_url_overrides
from pyramid.security import IAuthorizationPolicy
from pyramid.security import IAuthorizationPolicy, Authenticated
from zope.interface import implementer

from cliquet import DEFAULT_SETTINGS
from cliquet.authorization import PRIVATE
from cliquet.storage import generators
from cliquet.tests.testapp import main as testapp
from cliquet.utils import psycopg2
Expand Down Expand Up @@ -122,8 +123,11 @@ def _create_thread(self, *args, **kwargs):
@implementer(IAuthorizationPolicy)
class AllowAuthorizationPolicy(object):
def permits(self, context, principals, permission):
if permission == PRIVATE:
return Authenticated in principals
# Cliquet default authz policy uses prefixed_userid.
return USER_PRINCIPAL in (principals + [context.prefixed_userid])
prefixed = [getattr(context, 'prefixed_userid', None)]
return USER_PRINCIPAL in (principals + prefixed)

def principals_allowed_by_permission(self, context, permission):
raise NotImplementedError() # PRAGMA NOCOVER
Expand Down

0 comments on commit 4bf92a1

Please sign in to comment.