Skip to content

Commit

Permalink
permissions: change permission class
Browse files Browse the repository at this point in the history
  • Loading branch information
ChiaraBi committed Feb 22, 2019
1 parent 7094f57 commit 912d014
Show file tree
Hide file tree
Showing 6 changed files with 131 additions and 33 deletions.
11 changes: 10 additions & 1 deletion examples/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@
from flask import Flask, current_app
from flask_babelex import Babel
from flask_menu import Menu
from invenio_access import InvenioAccess
from invenio_access import ActionSystemRoles, InvenioAccess, any_user
from invenio_accounts import InvenioAccounts
from invenio_accounts.views import blueprint as accounts_blueprint
from invenio_admin import InvenioAdmin
Expand All @@ -183,6 +183,14 @@
MultipartObject, ObjectVersion, Part
from invenio_files_rest.views import blueprint


def allow_all(*args, **kwargs):
"""Return permission that always allow an access.
:returns: A object instance with a ``can()`` method.
"""
return type('Allow', (), {'can': lambda self: True})()


# Create Flask application
app = Flask(__name__)
app.config.update(dict(
Expand All @@ -197,6 +205,7 @@
'SQLALCHEMY_DATABASE_URI', 'sqlite:///test.db'
),
SQLALCHEMY_TRACK_MODIFICATIONS=True,
FILES_REST_PERMISSION_FACTORY=allow_all,
))

Babel(app)
Expand Down
8 changes: 4 additions & 4 deletions invenio_files_rest/permissions.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

"""Permissions for files using Invenio-Access."""

from invenio_access import DynamicPermission, action_factory
from invenio_access import Permission, action_factory

from .models import Bucket, MultipartObject, ObjectVersion

Expand Down Expand Up @@ -118,12 +118,12 @@ def permission_factory(obj, action):
the action is global.
:param action: The required action.
:raises RuntimeError: If the object is unknown.
:returns: A :class:`invenio_access.permissions.DynamicPermission` instance.
:returns: A :class:`invenio_access.permissions.Permission` instance.
"""
need_class = _action2need_map[action]

if obj is None:
return DynamicPermission(need_class(None))
return Permission(need_class(None))

arg = None
if isinstance(obj, Bucket):
Expand All @@ -135,4 +135,4 @@ def permission_factory(obj, action):
else:
raise RuntimeError('Unknown object')

return DynamicPermission(need_class(arg))
return Permission(need_class(arg))
40 changes: 39 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from flask_celeryext import FlaskCeleryExt
from flask_menu import Menu
from invenio_access import InvenioAccess
from invenio_access.models import ActionUsers
from invenio_access.models import ActionRoles, ActionUsers, Role
from invenio_accounts import InvenioAccounts
from invenio_accounts.testutils import create_test_user
from invenio_accounts.views import blueprint as accounts_blueprint
Expand Down Expand Up @@ -346,6 +346,44 @@ def permissions(db, bucket):
yield users


@pytest.yield_fixture()
def admin_user(db):
"""Permission for admin users."""
perms = [
location_update_all,
bucket_read_all,
bucket_read_versions_all,
bucket_update_all,
bucket_listmultiparts_all,
object_read_all,
object_read_version_all,
object_delete_all,
object_delete_version_all,
multipart_read_all,
multipart_delete_all,
bucket_read_all,
object_read_all,
bucket_update_all,
object_delete_all,
multipart_read_all,
object_read_all,
]

admin = Role(name='admin')

for perm in perms:
db.session.add(ActionRoles.allow(perm, role=admin))

admin_user = create_test_user(email='admin@invenio-software.org',
password='pass1',
active=True)
admin.users.append(admin_user)

db.session.commit()

yield admin_user


@pytest.fixture()
def get_md5():
"""Get MD5 of data."""
Expand Down
5 changes: 1 addition & 4 deletions tests/test_examples_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,7 @@ def example_app():
cmd = 'FLASK_APP=app.py flask run --debugger -p 5000'
webapp = subprocess.Popen(cmd, stdout=subprocess.PIPE,
preexec_fn=os.setsid, shell=True)
# Starting celery
cmd = 'celery worker -A app.celery -l info --purge'
celeryapp = subprocess.Popen(cmd, stdout=subprocess.PIPE,
preexec_fn=os.setsid, shell=True)

time.sleep(10)
# return webapp
yield webapp
Expand Down
62 changes: 48 additions & 14 deletions tests/test_views_multipart.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,10 @@ def test_post_init(client, headers, permissions, bucket, get_json):
assert k in data


def test_post_init_querystring(client, bucket, get_json):
def test_post_init_querystring(client, bucket, get_json, admin_user):
"""Test init multipart upload."""
login_user(client, admin_user)

res = client.post(
obj_url(bucket),
query_string='uploads&size=10&partSize=4',
Expand All @@ -84,8 +86,10 @@ def test_get_init_not_allowed(client, bucket, get_json):
assert res.status_code == 405


def test_post_invalid_partsizes(client, headers, bucket, get_json):
def test_post_invalid_partsizes(client, headers, bucket, get_json, admin_user):
"""Test invalid multipart init."""
login_user(client, admin_user)

# Part size too large
res = client.post(
obj_url(bucket), query_string='uploads', headers=headers,
Expand All @@ -105,8 +109,10 @@ def test_post_invalid_partsizes(client, headers, bucket, get_json):
assert res.status_code == 400


def test_post_size_limits(client, db, headers, bucket):
def test_post_size_limits(client, db, headers, bucket, admin_user):
"""Test invalid multipart init."""
login_user(client, admin_user)

bucket.quota_size = 100
db.session.commit()

Expand All @@ -126,8 +132,10 @@ def test_post_size_limits(client, db, headers, bucket):
assert res.status_code == 400


def test_post_locked_bucket(client, db, headers, bucket, get_json):
def test_post_locked_bucket(client, db, headers, bucket, get_json, admin_user):
"""Test invalid multipart init."""
login_user(client, admin_user)

bucket.locked = True
db.session.commit()

Expand All @@ -145,8 +153,10 @@ def test_post_locked_bucket(client, db, headers, bucket, get_json):
assert res.status_code == 404


def test_post_invalidkey(client, db, headers, bucket):
def test_post_invalidkey(client, db, headers, bucket, admin_user):
"""Test invalid multipart init."""
login_user(client, admin_user)

object_url = url_for(
'invenio_files_rest.object_api',
bucket_id=str(bucket.id),
Expand Down Expand Up @@ -206,8 +216,11 @@ def test_put_not_found(client, db, bucket, permissions, multipart,
assert res.status_code == 404


def test_put_wrong_sizes(client, db, bucket, multipart, multipart_url):
def test_put_wrong_sizes(client, db, bucket, multipart, multipart_url,
admin_user):
"""Test invalid part sizes."""
login_user(client, admin_user)

cases = [
b'a' * (multipart.chunk_size + 1),
b'a' * (multipart.chunk_size - 1),
Expand All @@ -221,8 +234,11 @@ def test_put_wrong_sizes(client, db, bucket, multipart, multipart_url):
assert res.status_code == 400


def test_put_ngfileupload(client, db, bucket, multipart, multipart_url):
def test_put_ngfileupload(client, db, bucket, multipart, multipart_url,
admin_user):
"""Test invalid part sizes."""
login_user(client, admin_user)

res = client.put(
multipart_url,
data={
Expand All @@ -239,8 +255,11 @@ def test_put_ngfileupload(client, db, bucket, multipart, multipart_url):
assert res.status_code == 200


def test_put_invalid_part_number(client, db, bucket, multipart, multipart_url):
def test_put_invalid_part_number(client, db, bucket, multipart, multipart_url,
admin_user):
"""Test invalid part number."""
login_user(client, admin_user)

data = b'a' * multipart.chunk_size
for c in [400, 2000, 'a']:
res = client.put(
Expand All @@ -250,8 +269,11 @@ def test_put_invalid_part_number(client, db, bucket, multipart, multipart_url):
assert res.status_code == 400


def test_put_completed_multipart(client, db, bucket, multipart, multipart_url):
def test_put_completed_multipart(client, db, bucket, multipart, multipart_url,
admin_user):
"""Test uploading to a completed multipart upload."""
login_user(client, admin_user)

multipart.completed = True
db.session.commit()

Expand All @@ -262,8 +284,11 @@ def test_put_completed_multipart(client, db, bucket, multipart, multipart_url):
assert res.status_code == 403


def test_put_badstream(client, db, bucket, multipart, multipart_url, get_json):
def test_put_badstream(client, db, bucket, multipart, multipart_url, get_json,
admin_user):
"""Test uploading to a completed multipart upload."""
login_user(client, admin_user)

client.put(
multipart_url + '&partNumber={0}'.format(1),
input_stream=BytesIO(b'a' * multipart.chunk_size),
Expand Down Expand Up @@ -313,15 +338,20 @@ def test_get(client, db, bucket, permissions, multipart, multipart_url,
assert len(data['parts']) == 3


def test_get_empty(client, multipart, multipart_url, get_json):
def test_get_empty(client, multipart, multipart_url, get_json, admin_user):
"""Test get parts when empty."""
login_user(client, admin_user)

data = get_json(client.get(multipart_url), code=200)
assert len(data['parts']) == 0
assert data['id'] == str(multipart.upload_id)


def test_get_serialization(client, multipart, multipart_url, get_json):
def test_get_serialization(client, multipart, multipart_url, get_json,
admin_user):
"""Test get parts when empty."""
login_user(client, admin_user)

client.put(
multipart_url + '&partNumber={0}'.format(1),
input_stream=BytesIO(b'a' * multipart.chunk_size),
Expand Down Expand Up @@ -393,8 +423,10 @@ def _mock_celery_result():


def test_post_complete_fail(client, headers, bucket, multipart,
multipart_url, parts, get_json):
multipart_url, parts, get_json, admin_user):
"""Test completing multipart when merge fails."""
login_user(client, admin_user)

# Mock celery task to emulate real usage.
task_result = MagicMock()
task_result.ready = MagicMock(side_effect=[False, False, True])
Expand All @@ -419,8 +451,10 @@ def test_post_complete_fail(client, headers, bucket, multipart,


def test_post_complete_timeout(app, client, headers, bucket, multipart,
multipart_url, parts, get_json):
multipart_url, parts, get_json, admin_user):
"""Test completing multipart when merge fails."""
login_user(client, admin_user)

max_rounds = int(
app.config['FILES_REST_TASK_WAIT_MAX_SECONDS'] //
app.config['FILES_REST_TASK_WAIT_INTERVAL'])
Expand Down
Loading

0 comments on commit 912d014

Please sign in to comment.