-
Notifications
You must be signed in to change notification settings - Fork 2.2k
feat: Improve workspace resource authorization #3004
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Adding the "do-not-merge/release-note-label-needed" label because no release-note block was detected, please follow our release note process to remove it. Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository. |
[APPROVALNOTIFIER] This PR is NOT APPROVED This pull-request has been approved by: The full list of commands accepted by this bot can be found here.
Needs approval from an approver in each of these files:
Approvers can indicate their approval by writing |
return [PermissionConstants[k] for k in PermissionConstants.__members__ if | ||
PermissionConstants[k].value.resource_permission_group_list.__contains__(resource_group)] | ||
|
||
|
||
class Auth: | ||
""" | ||
用于存储当前用户的角色和权限 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The code snippet you provided looks generally well-structured and follows Django conventions. However, there are a few areas where improvements or corrections can be made:
Improvements and Corrected Points
-
Comments: The docstrings and comments should be more detailed to help future maintainers understand the purpose of each function and its parameters.
-
Useful Functionality: There might be use cases for some functions such as
get_permission_list_by_resource_group
, which could be more helpful in managing permissions based on different resource groups. -
Django Models: Using Django models for roles and groups would allow for better integration with existing Django features.
-
Code Organization: Consider organizing related functionality into separate files if they become too complex, especially when dealing with multiple constants and classes.
-
Error Handling: Add error handling where appropriate to manage potential edge cases during runtime.
Here's an updated version with these considerations:
# Import necessary modules
from enum import Enum, TextChoices
from functools import reduce
from typing import List, Dict
import django.db.models as models
# Define Enums
class Group(Enum):
ADMIN = "ADMIN"
USER = "USER"
class RoleGroup(TextChoices):
GENERAL = "GENERAL"
STUDENT = "STUDENT"
TEACHER = "TEACHER"
class ResourcePermissionRole(TextChoices):
ROLE = "ROLE"
class ResourcePermissionGroup(TextChoices):
VIEW = "VIEW"
MANAGE = "MANAGE"
class ResourceType(TextChoices):
API = "API"
FILE = "FILE"
DOCUMENT = "DOCUMENT"
class Operate(Enum):
LIST = "LIST"
CREATE = "CREATE"
READ = "READ"
UPDATE = "UPDATE"
DELETE = "DELETE"
# Define Constants and Classes
class Permission(models.Model): # Move this to models.py or another file
"""Model representation for Permissions."""
GRP_CHOICES = [(group.value, group.name) for group in Group]
OPERATE_CHOICES = [(operate.value, operate.name) for operate in Operate]
group_name = models.CharField(max_length=50, choices=GRP_CHOICES)
operation = models.CharField(max_length=50, choices=OPERATE_CHOICES)
def __eq__(self, other):
return f"{self.group_name}:{self.operation}" == str(other)
class User(RoleMixin):
username = models.CharField(max_length=100, unique=True)
groups = models.ManyToManyField(Group, related_name='users')
class Role(models.Model): # Move this to models.py or another file
name = models.CharField(max_length=100)
description = models.TextField()
group_type = models.ForeignKey(RoleGroup, null=True, blank=True, on_delete=models.CASCADE)
roles_permissions = models.ManyToManyField(Permission, through="UserRole")
def get_default_permission_list_by_role(role: Role):
"""Retrieve default permissions for a given role."""
return [permission for permission in Permission.objects.filter(role__name=role.name)]
def get_default_role_permission_mapping_list():
"""Generate default role-permission mappings."""
return UserRole.objects.all()
def get_permission_groups_for_user(user: User):
"""Get all available permission groups for a user."""
roles = [group.name for group in user.groups.all()]
permission_mappings = get_default_role_permission_mapping_list().select_related("permissions")
permissions_dict: Dict[ResourceType, set[str]] = defaultdict(set)
for mapping in permission_mappings:
perm = mapping.permissions.first()
if perm:
resources_with_perms = perm.resource_path.split(',')
for res in resources_with_perms:
# Assuming resource type is part of path (e.g., /api/user/resource/VIEW)
resource_type = res.rsplit('/', 1)[0]
permissions_dict[resource_type].add(res.rsplit('/', 1)[-1])
resource_groups = {}
allowed_operations_per_res_type = {
ResourceType.API: {"list", "create"},
ResourceType.FILE: {"read", "update", "delete"},
ResourceType.DOCUMENT: {"view", "edit"}
}
for key, vals in permissions_dict.items():
grouped_perms = {op: [] for op in allowed_operations_per_res_type[key]}
for val in vals:
grouped_ops_key = next((key for key, vlist inallowed_operations_per_res_type[key].items() if val.lower() in vlist), None)
if grouped_ops_key:
grouped_perms[grouped_ops_key].append(val)
resource_groups.setdefault(key, []).extend(grouped_perms.values())
return resource_groups
def map_roles_to_workspace_users(workspace_id, roles):
"""
Map a list of roles to a specific workspace.
:param workspace_id: ID of the workspace
:param roles: List of roleName string values associated with roles
:return: List of WorkspaceUserRoleMappings representing mapped users
"""
workspace_users_list = []
current_active_group_set = set(get_all_active_workspace_groups(workspace_id)) # Implement helper function here
for role_name in roles:
current_role_details = None
for r in get_available_roles_in_workspace(workspace_id):
if r.name.lower() == role_name.lower():
current_role_details = r
break # Exit upon first match
if current_role_details:
active_users = list(current_active_group_set.intersection({u['username'] for u in retrieve_active_users()}))
workspace_users_list.extend([WorkspaceUserRoleMapping(workspace.id,
str(r.name),
user) for user in active_users])
return workspace_users_list
def retrieve_active_users():
"""
Retrieve all currently active user accounts.
Note that actual implementation may vary depending on your authentication system.
:return: A list of dicts containing usernames and email addresses
"""
return [
{'username': 'john_doe', 'email': '<EMAIL>'},
{'username': 'jane_smith', 'email': '<EMAIL>'}
]
def get_available_roles_in_workspace(workspace_id):
"""List all roles available within specific workspace."""
from .models import Role # Ensure proper relative imports
return Role.objects.filter(workspaces=workspace_id)
def get_all_active_workspace_groups(workspace_id):
"""Return a collection of all group names considered active under specified workspace_id."""
active_workspaces = [
{'id': 1, 'groups': ['students']},
{'id': 2, 'groups': ['teachers']}
]
# Assume retrieving active groups is context-specific logic
return [active['groups'] for active in active_workspaces if active.get('id') == workspace_id][0]
This revised version includes moving the Permission
model to Django models, improving documentation readability, adding functionalities like mapping roles to a workspace, and ensuring proper organization and separation of concerns.
migrations.DeleteModel( | ||
name='WorkspaceUserPermission', | ||
), | ||
] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the provided code, there are no major programming errors or inefficiencies. However, there are a few minor improvements that can be made:
-
Consistent Field Names: The
target
field should have a more descriptive name since it represents both knowledge base and application IDs. -
Default Value for
user
Foreign Key: Instead of settingon_delete
, you might want to set a default user who will own these permissions in case no specific user is associated with them initially. -
Optional Fields: If certain attributes like
create_time
andupdate_time
could be optional for historical purposes, consider adding conditions to avoid raising exceptions during model creation. -
Database Indexing: Since there won't necessarily be multiple instances of the permission list per permission type, consider using an index only on the fields that are frequently queried together, such as
(workspace_id, auth_type)
if needed.
Here's a revised version incorporating some of these suggestions:
@@ -0,0 +1,39 @@
+# Generated by Django 5.2 on 2025-04-27 10:09
+
+import django.contrib.postgres.fields
+import django.db.models.deletion
+import uuid_utils.compat
+from django.conf import settings
+from django.db import migrations, models
+
+# Define a placeholder user if none is specified
DEFAULT_PLACEHOLDER_USER_ID = getattr(settings, 'DEFAULT_WORKSPACE_OWNER', None)
placeholder_user_model = settings.AUTH_USER_MODEL
class Migration(migrations.Migration):
dependencies = [
('system_manage', '0002_systemsetting'),
('users', '0001_initial'),
]
operations = [
migrations.CreateModel(
name='WorkspaceUserResourcePermission',
fields=[
('id', models.UUIDField(default=uuid_utils.compat.uuid7, editable=False, primary_key=True, serialize=False, verbose_name='主键id')),
('workspace_id', models.CharField(default='default', max_length=128, verbose_name='工作空间id')),
('auth_target_type', models.CharField(choices=[('KNOWLEDGE', '知识库'), ('APPLICATION', '应用')], default='KNOWNLEDGE', max_length=128, verbose_name='授权目标')),
('resource_id', models.UUIDField(verbose_name='资源ID(知识点或应用)')), # More descriptive field name
('auth_type', models.CharField(choices=[('ROLE', 'Role'), ('RESOURCE_PERMISSION_GROUP', 'Resource Permission Group')], db_default='ROLE', default=False, verbose_name='授权类型')),
('permission_list', django.contrib.postgres.fields.ArrayField(base_field=models.CharField(blank=True, choices=[('VIEW', 'View'), ('MANAGE', 'Manage'), ('ROLE', 'Role')], default='VIEW', max_length=256), default=list, size=None, verbose_name='权限列表')),
('create_time', models.DateTimeField(auto_now_add=True, verbose_name='创建时间')),
('update_time', models.DateTimeField(auto_now=True, verbose_name='修改时间')),
(
'owner_user',
models.ForeignKey(
blank=True,
null=True,
on_delete=django.db.models.deletion.SET_NULL,
related_name='permissions',
to=placeholder_user_model or DEFAULT_PLACEHOLDER_USER_ID,
verbose_name='所有权人',
)
),
],
options={
'db_table': 'workspace_user_resource_permission',
'indexes': [
models.Index(fields=['workspace_id', 'auth_type']),
],
},
),
migrations.DeleteModel(
name='WorkspaceUserPermission',
),
]
These changes make the schema slightly more explicit and robust while addressing some common concerns about database design.
|
||
create_time = models.DateTimeField(verbose_name="创建时间", auto_now_add=True) | ||
|
||
update_time = models.DateTimeField(verbose_name="修改时间", auto_now=True) | ||
|
||
class Meta: | ||
db_table = "workspace_user_permission" | ||
db_table = "workspace_user_resource_permission" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are several areas for review and improvement in your updated WorkspaceUserResourcePermission
model:
-
Database Schema Optimization: It appears that there might be redundancy in storing permissions. For example, if you have different types of resource permissions (e.g., VIEW vs EDIT), it's better to store them individually rather than using an array field with mixed choices.
-
Enum Improvements:
- Since you mentioned multiple choice fields (
auth_type
,permission_list
), consider refactoring these enums to include all necessary values instead of relying solely on defaults. This can help future maintainers understand the full scope of possible options.
- Since you mentioned multiple choice fields (
-
Validation:
- Ensure that the data stored in
permission_list
adheres to expected formats or ranges. This could involve adding input validation checks within the Django model validations to ensure proper database integrity.
- Ensure that the data stored in
-
Code Readability:
- Consider simplifying complex expressions and improving the readability of variable assignments throughout the codebase. Use descriptive naming conventions and inline comments where necessary to make code more understandable.
-
Model Logic:
- Evaluate whether certain business logic should reside directly within the model. Complex queries or operations often need to move elsewhere, such as services or managers, depending on application architecture design principles.
Here’s a slightly improved version based on these considerations:
import uuid_utils.compat as uuid
from django.contrib.postgres.fields import ArrayField
from django.db import models
@dataclasses.dataclass
class PermissionDetails:
group: str = None
role: str = None
type_: str = None
# Add other fields as needed for your specific use case
def __init__(self,
details=None):
super().__init__()
if details:
self.group = details.get('group')
self.role = details.get('role')
self.type_ = details.get('type')
@classmethod
def from_dict(cls, details_dict) -> 'PermissionDetails':
return cls(details=details_dict)
@property
def permission_value(self):
"""
Returns a string representation of this instance.
"""
parts = []
if self.group and not self.role:
parts.append(f"{self.group}")
elif self.role and not self.group:
parts.append(f"~{self.role}")
else:
parts.extend([
f"{self.group}:{self.role}" if self.group else "",
"" if self.role else ""
])
if self.type_:
parts.append(self.type_)
elif len(parts) > 1:
parts.pop(-2)
return ":".join(x.strip() for x in parts)
@dataclasses.dataclass(eq=True, frozen=True)
class AuthTarget(abc.ABC):
id: uuid.UUID
name: str
@abc.abstractmethod
async def get_user_permissions_queryset(self, user: User) -> QuerySet[PermissionDetails]:
pass
@roles_enum(value=["VIEW", "EDIT"])
@models.enum.Enum(choices=list(ResourcePermissionGroup.values()) +
list(ResourcePermissionRole.values()))
class ResourceAuthType(Enum):
ROLE = 0
PERMISSION = 1
@ResourceAuthType.register()
async def get_auth_target_details(auth_target_id, token) -> Optional[Dict[str, any]]:
pass
class WorkspaceUserPermission(models.Model):
workspace = models.ForeignKey('workspaces.Workspace', related_name='permissions',
on_delete=models.CASCADE,
verbose_name="工作区")
auth_target = models.UUIDField(
max_length=128, blank=False, null=False,
verbose_name="应用/知识库ID")
class META:
db_table = "workspace_user_permission"
indexes = [
models.Index(fields=['username', '-create_time']),
models.Index(fields=['created_by', '-update_time'])
]
objects = WorkSpaceUserPermissionsManager.default_objects()
# Define additional classes like `WorkSpaceUserPermissionsManager`, `WorkSpacesUserPermissionsQueryset`
# And implement the required methods/functions here as per your application requirements.
By implementing these improvements, you enhance both the functionality and maintainability of your Django model while ensuring adherence to best practices in software engineering.
feat: Improve workspace resource authorization