Skip to content

Commit

Permalink
feat: Add mediafile_download_policy
Browse files Browse the repository at this point in the history
  • Loading branch information
Delsin Van Grembergen committed Sep 4, 2024
1 parent c0bb1b2 commit 830f5b3
Showing 1 changed file with 70 additions and 0 deletions.
70 changes: 70 additions & 0 deletions src/elody/policies/authorization/mediafile_download_policy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import re as regex

from elody.policies.permission_handler import (
get_permissions,
handle_single_item_request,
)
from flask import Request # pyright: ignore
from flask_restful import abort # pyright: ignore
from inuits_policy_based_auth import BaseAuthorizationPolicy # pyright: ignore
from inuits_policy_based_auth.contexts.policy_context import ( # pyright: ignore
PolicyContext,
)
from inuits_policy_based_auth.contexts.user_context import ( # pyright: ignore
UserContext,
)
from storage.storagemanager import StorageManager # pyright: ignore


class MediafileDownloadPolicy(BaseAuthorizationPolicy):
def authorize(
self, policy_context: PolicyContext, user_context: UserContext, request_context
):
request: Request = request_context.http_request
if not regex.match(r"^/mediafiles/(.+)/download$", request.path):
return policy_context

view_args = request.view_args or {}
collection = view_args.get("collection", request.path.split("/")[-3])
id = view_args.get("id")
item = (
StorageManager()
.get_db_engine()
.get_item_from_collection_by_id(collection, id)
)
if not item:
abort(
404,
message=f"Item with id {id} doesn't exist in collection {collection}",
)

for role in user_context.x_tenant.roles:
permissions = get_permissions(role, user_context)
if not permissions:
continue

rules = [
GetRequestRules,
]
access_verdict = None
for rule in rules:
access_verdict = rule().apply(item, user_context, request, permissions)
if access_verdict != None:
policy_context.access_verdict = access_verdict
if not policy_context.access_verdict:
return policy_context

if policy_context.access_verdict:
return policy_context

return policy_context


class GetRequestRules:
def apply(
self, item, user_context: UserContext, request: Request, permissions
) -> bool | None:
if request.method != "GET":
return None

return handle_single_item_request(user_context, item, permissions, "read")

0 comments on commit 830f5b3

Please sign in to comment.