-
Notifications
You must be signed in to change notification settings - Fork 2.2k
feat: UserResourcePermission #3039
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
# coding=utf-8 | ||
""" | ||
@project: MaxKB | ||
@Author:虎虎 | ||
@file: __init__.py | ||
@date:2025/4/28 17:05 | ||
@desc: | ||
""" |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,44 @@ | ||
# coding=utf-8 | ||
""" | ||
@project: MaxKB | ||
@Author:虎虎 | ||
@file: workspace_user_resource_permission.py | ||
@date:2025/4/28 18:13 | ||
@desc: | ||
""" | ||
from drf_spectacular.types import OpenApiTypes | ||
from drf_spectacular.utils import OpenApiParameter | ||
|
||
from common.mixins.api_mixin import APIMixin | ||
from common.result import ResultSerializer | ||
from system_manage.serializers.user_resource_permission import UserResourcePermissionResponse, \ | ||
UpdateUserResourcePermissionRequest | ||
|
||
|
||
class APIUserResourcePermissionResponse(ResultSerializer): | ||
def get_data(self): | ||
return UserResourcePermissionResponse(many=True) | ||
|
||
|
||
class UserResourcePermissionAPI(APIMixin): | ||
@staticmethod | ||
def get_parameters(): | ||
return [ | ||
OpenApiParameter( | ||
name="workspace_id", | ||
description="工作空间id", | ||
type=OpenApiTypes.STR, | ||
location='path', | ||
required=True, | ||
) | ||
] | ||
|
||
@staticmethod | ||
def get_response(): | ||
return APIUserResourcePermissionResponse | ||
|
||
|
||
class EditUserResourcePermissionAPI(APIMixin): | ||
@staticmethod | ||
def get_request(): | ||
return UpdateUserResourcePermissionRequest() | ||
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
# coding=utf-8 | ||
""" | ||
@project: MaxKB | ||
@Author:虎虎 | ||
@file: __init__.py | ||
@date:2025/4/28 17:05 | ||
@desc: | ||
""" |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,155 @@ | ||
# coding=utf-8 | ||
""" | ||
@project: MaxKB | ||
@Author:虎虎 | ||
@file: workspace_user_resource_permission.py | ||
@date:2025/4/28 17:17 | ||
@desc: | ||
""" | ||
import json | ||
import os | ||
|
||
from django.core.cache import cache | ||
from django.db.models import QuerySet | ||
from django.utils.translation import gettext_lazy as _ | ||
from rest_framework import serializers | ||
|
||
from common.constants.cache_version import Cache_Version | ||
from common.constants.permission_constants import get_default_workspace_user_role_mapping_list, RoleConstants, \ | ||
ResourcePermissionGroup, ResourcePermissionRole, ResourceAuthType | ||
from common.database_model_manage.database_model_manage import DatabaseModelManage | ||
from common.db.search import native_search | ||
from common.db.sql_execute import select_list | ||
from common.exception.app_exception import AppApiException | ||
from common.utils.common import get_file_content | ||
from common.utils.split_model import group_by | ||
from knowledge.models import Knowledge | ||
from maxkb.conf import PROJECT_DIR | ||
from system_manage.models import WorkspaceUserResourcePermission, AuthTargetType | ||
|
||
|
||
class PermissionSerializer(serializers.Serializer): | ||
VIEW = serializers.BooleanField(required=True, label="可读") | ||
MANAGE = serializers.BooleanField(required=True, label="管理") | ||
ROLE = serializers.BooleanField(required=True, label="跟随角色") | ||
|
||
|
||
class UserResourcePermissionItemResponse(serializers.Serializer): | ||
id = serializers.UUIDField(required=True, label="主键id") | ||
name = serializers.CharField(required=True, label="资源名称") | ||
auth_target_type = serializers.ChoiceField(required=True, choices=AuthTargetType.choices, label="授权资源") | ||
user_id = serializers.UUIDField(required=True, label="用户id") | ||
auth_type = serializers.ChoiceField(required=True, choices=ResourceAuthType.choices, label="授权类型") | ||
permission = PermissionSerializer() | ||
|
||
|
||
class UserResourcePermissionResponse(serializers.Serializer): | ||
KNOWLEDGE = UserResourcePermissionItemResponse(many=True) | ||
|
||
|
||
class UpdateTeamMemberItemPermissionSerializer(serializers.Serializer): | ||
auth_target_type = serializers.ChoiceField(required=True, choices=AuthTargetType.choices, label="授权资源") | ||
target_id = serializers.CharField(required=True, label=_('target id')) | ||
auth_type = serializers.ChoiceField(required=True, choices=ResourceAuthType.choices, label="授权类型") | ||
permission = PermissionSerializer(required=True, many=False) | ||
|
||
|
||
class UpdateUserResourcePermissionRequest(serializers.Serializer): | ||
user_resource_permission_list = UpdateTeamMemberItemPermissionSerializer(required=True, many=True) | ||
|
||
def is_valid(self, *, workspace_id=None, raise_exception=False): | ||
super().is_valid(raise_exception=True) | ||
user_resource_permission_list = self.data.get("user_resource_permission_list") | ||
illegal_target_id_list = select_list( | ||
get_file_content( | ||
os.path.join(PROJECT_DIR, "apps", "system_manage", 'sql', 'check_member_permission_target_exists.sql')), | ||
[json.dumps(user_resource_permission_list), workspace_id]) | ||
if illegal_target_id_list is not None and len(illegal_target_id_list) > 0: | ||
raise AppApiException(500, | ||
_('Non-existent application|knowledge base id[') + str(illegal_target_id_list) + ']') | ||
|
||
|
||
class UserResourcePermissionSerializer(serializers.Serializer): | ||
workspace_id = serializers.CharField(required=True, label=_('workspace id')) | ||
|
||
def get_queryset(self): | ||
return { | ||
"knowledge_query_set": QuerySet(Knowledge) | ||
.filter(workspace_id=self.data.get('workspace_id')), | ||
'workspace_user_resource_permission_query_set': QuerySet(WorkspaceUserResourcePermission).filter( | ||
workspace_id=self.data.get('workspace_id')) | ||
} | ||
|
||
def list(self, user, with_valid=True): | ||
if with_valid: | ||
self.is_valid(raise_exception=True) | ||
workspace_id = self.data.get("workspace_id") | ||
# 用户权限列表 | ||
user_resource_permission_list = native_search(self.get_queryset(), get_file_content( | ||
os.path.join(PROJECT_DIR, "apps", "system_manage", 'sql', 'get_user_resource_permission.sql'))) | ||
workspace_user_role_mapping_model = DatabaseModelManage.get_model("workspace_user_role_mapping") | ||
workspace_model = DatabaseModelManage.get_model("workspace_model") | ||
if workspace_user_role_mapping_model and workspace_model: | ||
workspace_user_role_mapping_list = QuerySet(workspace_user_role_mapping_model).filter(user_id=user.id, | ||
workspace_id=workspace_id) | ||
else: | ||
workspace_user_role_mapping_list = get_default_workspace_user_role_mapping_list([user.role]) | ||
is_workspace_manage = any( | ||
[workspace_user_role_mapping for workspace_user_role_mapping in workspace_user_role_mapping_list if | ||
workspace_user_role_mapping.role_id == RoleConstants.WORKSPACE_MANAGE.value]) | ||
# 如果当前用户是当前工作空间管理员那么就拥有所有权限 | ||
if is_workspace_manage: | ||
user_resource_permission_list = list( | ||
map(lambda row: {**row, | ||
'permission': {ResourcePermissionGroup.VIEW.value: True, | ||
ResourcePermissionGroup.MANAGE.value: True, | ||
ResourcePermissionRole.ROLE.value: True}}, | ||
user_resource_permission_list)) | ||
return group_by([{**user_resource_permission, 'permission': { | ||
permission: True if user_resource_permission.get('permission_list').__contains__(permission) else False for | ||
permission in | ||
[ResourcePermissionGroup.VIEW.value, ResourcePermissionGroup.MANAGE.value, | ||
ResourcePermissionRole.ROLE.value]}} | ||
for user_resource_permission in user_resource_permission_list], | ||
key=lambda item: item.get('auth_target_type')) | ||
|
||
def edit(self, instance, user, with_valid=True): | ||
if with_valid: | ||
self.is_valid(raise_exception=True) | ||
UpdateUserResourcePermissionRequest(data=instance).is_valid(raise_exception=True, | ||
workspace_id=self.data.get('workspace_id')) | ||
workspace_id = self.data.get("workspace_id") | ||
update_list = [] | ||
save_list = [] | ||
user_resource_permission_list = instance.get('user_resource_permission_list') | ||
workspace_user_resource_permission_exist_list = QuerySet(WorkspaceUserResourcePermission).filter( | ||
workspace_id=workspace_id) | ||
for user_resource_permission in user_resource_permission_list: | ||
exist_list = [user_resource_permission_exist for user_resource_permission_exist in | ||
workspace_user_resource_permission_exist_list if | ||
user_resource_permission.get('target_id') == str(user_resource_permission_exist.target)] | ||
if len(exist_list) > 0: | ||
exist_list[0].permission_list = [key for key in user_resource_permission.get('permission').keys() if | ||
user_resource_permission.get('permission').get(key)] | ||
update_list.append(exist_list[0]) | ||
else: | ||
save_list.append(WorkspaceUserResourcePermission(target=user_resource_permission.get('target_id'), | ||
auth_target_type=user_resource_permission.get( | ||
'auth_target_type'), | ||
permission_list=[key for key in | ||
user_resource_permission.get( | ||
'permission').keys() if | ||
user_resource_permission.get( | ||
'permission').get(key)], | ||
workspace_id=workspace_id, | ||
user_id=user.id, | ||
auth_type=user_resource_permission.get('auth_type'))) | ||
# 批量更新 | ||
QuerySet(WorkspaceUserResourcePermission).bulk_update(update_list, ['permission_list']) if len( | ||
update_list) > 0 else None | ||
# 批量插入 | ||
QuerySet(WorkspaceUserResourcePermission).bulk_create(save_list) if len(save_list) > 0 else None | ||
version = Cache_Version.PERMISSION_LIST.get_version() | ||
key = Cache_Version.PERMISSION_LIST.get_key(user_id=str(user.id)) | ||
cache.delete(key, version=version) | ||
return True | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There are several areas of concern and opportunities for improvement in this code: Potential Issues/Corrections:
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
SELECT | ||
static_temp."target_id"::text | ||
FROM | ||
(SELECT * FROM json_to_recordset( | ||
%s | ||
) AS x(target_id uuid,auth_target_type text)) static_temp | ||
LEFT JOIN ( | ||
SELECT | ||
"id", | ||
'KNOWLEDGE' AS "auth_target_type" | ||
FROM | ||
knowledge | ||
WHERE workspace_id= %s | ||
) "app_and_knowledge_temp" | ||
ON "app_and_knowledge_temp"."id" = static_temp."target_id" and app_and_knowledge_temp."auth_target_type"=static_temp."auth_target_type" | ||
WHERE app_and_knowledge_temp.id is NULL ; |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
SELECT app_or_knowledge.*, | ||
COALESCE(workspace_user_resource_permission.permission_list,'{}')::varchar[] as permission_list, | ||
COALESCE(workspace_user_resource_permission.auth_type,'ROLE') as auth_type | ||
FROM (SELECT "id", | ||
"name", | ||
'KNOWLEDGE' AS "auth_target_type", | ||
user_id, | ||
workspace_id, | ||
"type" AS "icon" | ||
FROM knowledge | ||
${knowledge_query_set} | ||
) app_or_knowledge | ||
LEFT JOIN (SELECT * | ||
FROM workspace_user_resource_permission | ||
${workspace_user_resource_permission_query_set}) workspace_user_resource_permission | ||
ON workspace_user_resource_permission.target = app_or_knowledge."id"; |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
from django.urls import path | ||
|
||
from . import views | ||
|
||
app_name = "system_manage" | ||
urlpatterns = [ | ||
path('workspace/<str:workspace_id>/user_resource_permission', views.WorkSpaceUserResourcePermissionView.as_view()) | ||
] |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,3 +6,4 @@ | |
@date:2025/4/16 19:07 | ||
@desc: | ||
""" | ||
from .user_resource_permission import * |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,53 @@ | ||
# coding=utf-8 | ||
""" | ||
@project: MaxKB | ||
@Author:虎虎 | ||
@file: workspace_user_resource_permission.py | ||
@date:2025/4/28 16:38 | ||
@desc: | ||
""" | ||
|
||
from django.utils.translation import gettext_lazy as _ | ||
from drf_spectacular.utils import extend_schema | ||
from rest_framework.request import Request | ||
from rest_framework.views import APIView | ||
|
||
from common import result | ||
from common.auth import TokenAuth | ||
from common.auth.authentication import has_permissions | ||
from common.constants.permission_constants import PermissionConstants | ||
from common.result import DefaultResultSerializer | ||
from system_manage.api.user_resource_permission import UserResourcePermissionAPI, EditUserResourcePermissionAPI | ||
from system_manage.serializers.user_resource_permission import UserResourcePermissionSerializer | ||
|
||
|
||
class WorkSpaceUserResourcePermissionView(APIView): | ||
authentication_classes = [TokenAuth] | ||
|
||
@extend_schema( | ||
methods=['GET'], | ||
description=_('Obtain resource authorization list'), | ||
operation_id=_('Obtain resource authorization list'), | ||
parameters=UserResourcePermissionAPI.get_parameters(), | ||
responses=UserResourcePermissionAPI.get_response(), | ||
tags=[_('Resources authorization')] | ||
) | ||
@has_permissions(PermissionConstants.WORKSPACE_USER_RESOURCE_PERMISSION_READ.get_workspace_permission()) | ||
def get(self, request: Request, workspace_id: str): | ||
return result.success(UserResourcePermissionSerializer( | ||
data={'workspace_id': workspace_id} | ||
).list(request.user)) | ||
|
||
@extend_schema( | ||
methods=['PUT'], | ||
description=_('Modify the resource authorization list'), | ||
operation_id=_('Modify the resource authorization list'), | ||
parameters=UserResourcePermissionAPI.get_parameters(), | ||
request=EditUserResourcePermissionAPI.get_request(), | ||
responses=DefaultResultSerializer(), | ||
tags=[_('Resources authorization')] | ||
) | ||
def put(self, request: Request, workspace_id: str): | ||
return result.success(UserResourcePermissionSerializer( | ||
data={'workspace_id': workspace_id} | ||
).edit(request.data, request.user)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Here are some suggestions to improve the code:
Here's an updated version of the class with these improvements: # coding=utf-8
"""
@project: MaxKB
@Author:虎虎
@file: workspace_user_resource_permission.py
@date:2025/4/28 16:38
@desc:
"""
from django.utils.translation import gettext_lazy as _
from drf_spectacular.utils import extend_schema
from rest_framework.request import Request
from rest_framework.views import APIView
from common import result
from common.auth import TokenAuth
from common.auth.authentication import has_permissions
from common.constants.permission_constants import PermissionConstants
from common.result import DefaultResultSerializer
from system_manage.api.user_resource_permission import UserResourcePermissionAPI, EditUserResourcePermissionAPI
from system_manage.serializers.user_resource_permission import UserResourcePermissionSerializer
class WorkSpaceUserResourcePermissionView(APIView):
authentication_classes = [TokenAuth]
def _handle_request(self, request, serializer_class, user_func_name):
# Retrieve necessary data from request and context
params = self._extract_params(request)
return getattr(serializer_clas, user_func_name).to_representation(data=params)
def _extract_params(self, request):
# Extract relevant parameters from the request
workspace_id = request.GET.get('workspace_id') if 'GET' == request.method.lower() else None
return {
'workspace_id': workspace_id,
# Add other required parameters here
}
@extend_schema(
methods=['GET'],
description=_('Obtain resource authorization list'),
operation_id=_('Obtain resource authorization list'),
parameters=UserResourcePermissionAPI.get_parameters(),
responses=UserResourcePermissionAPI.get_response(),
tags=[_('Resources authorization')]
)
@has_permissions(PermissionConstants.WORKSPACE_USER_RESOURCE_PERMISSION_READ.get_workspace_permission())
def get(self, request: Request):
params = self._extract_params(request)
serialized_data = self._handle_request(request=request, serializer_class=UserResourcePermissionSerializer, user_func_name='list')
return result.success(serialized_data)
@extend_schema(
methods=['PUT'],
description=_('Modify the resource authorization list'),
operation_id=_('Modify the resource authorization list'),
parameters=UserResourcePermissionAPI.get_parameters(),
request=EditUserResourcePermissionAPI.get_request(),
responses=DefaultResultSerializer(),
tags=[_('Resources authorization')]
)
def put(self, request: Request):
params = self._extract_params(request)
serialized_data = self._handle_request(request=request, serializer_class=EditUserResourcePermissionSerializer, user_func_name='edit')
return result.success(serialized_data if not params['error'] else '', error=params['message']) With these changes, the code becomes more modular and less repetitive while maintaining its original structure and functionality. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The code is mostly structured correctly. Here are some minor improvements and considerations:
Docstring Formatting: The docstrings can be more detailed if you include additional information about what each part of the class does (
UserResourcePermissionAPI
andEditUserResourcePermissionAPI
).Class Method Naming: While not strictly necessary, using
get_*
for methods that return something makes it clear what they do.Type Annotations: Ensure all parameters have annotations to improve readability and maintainability.
Here's an updated version with improved documentation and type annotations:
Key Changes: