-
-
Notifications
You must be signed in to change notification settings - Fork 282
/
user.py
200 lines (160 loc) · 6.35 KB
/
user.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
# -*- coding: utf-8 -*-
from attachments.models import Attachment
from django.conf import settings
from django.contrib.auth import get_user_model
from django.contrib.auth.models import Group
from django.core.exceptions import PermissionDenied
from django.forms.models import model_to_dict
from modernrpc.core import REQUEST_KEY, rpc_method
from tcms.rpc import utils
from tcms.rpc.decorators import permissions_required
User = get_user_model() # pylint: disable=invalid-name
def _get_user_dict(user):
user_dict = model_to_dict(user)
for field in (
"password",
"groups",
"user_permissions",
"date_joined",
"last_login",
):
if field in user_dict:
del user_dict[field]
return user_dict
@permissions_required("auth.view_user")
@rpc_method(name="User.filter")
def filter(query=None, **kwargs): # pylint: disable=redefined-builtin
"""
.. function:: RPC User.filter(query)
Search and return the resulting list of users.
:param query: Field lookups for :class:`django.contrib.auth.models.User`
:type query: dict
:param \\**kwargs: Dict providing access to the current request, protocol,
entry point name and handler instance from the rpc method
:return: Serialized :class:`django.contrib.auth.models.User` object without
the password field!
:rtype: dict
.. note::
If query is ``None`` will return the user issuing the RPC request.
"""
if not query:
query = {"pk": kwargs.get(REQUEST_KEY).user.pk}
return list(
User.objects.filter(**query)
.values(
"email",
"first_name",
"id",
"is_active",
"is_staff",
"is_superuser",
"last_name",
"username",
)
.distinct()
)
@rpc_method(name="User.update")
def update(
user_id, values, **kwargs
): # pylint: disable=missing-api-permissions-required
"""
.. function:: RPC User.update(user_id, values)
Updates the fields of the selected user. Can be used to update
password as well!
:param user_id: PK of user to update
:type user_id: int
:param values: Field values for :class:`django.contrib.auth.models.User`
:type values: dict
:param \\**kwargs: Dict providing access to the current request, protocol,
entry point name and handler instance from the rpc method
:return: Serialized :class:`django.contrib.auth.models.User` object
:rtype: dict
:raises PermissionDenied: if missing the *auth.change_user* permission
when updating another user or when passwords don't match.
.. note::
If ``user_id`` is None will update the user issuing the RPC request.
.. warning::
Changing the password for another user via RPC is not allowed!
"""
request = kwargs.get(REQUEST_KEY)
if user_id:
user_being_updated = User.objects.get(pk=user_id)
else:
user_being_updated = request.user
editable_fields = ("first_name", "last_name", "email", "password")
can_change_user = request.user.has_perm("auth.change_user")
is_updating_other = request.user != user_being_updated
# If changing other's attributes, current user must have proper permission
if is_updating_other and not can_change_user:
raise PermissionDenied("Permission denied")
update_fields = []
for field in editable_fields:
if not values.get(field):
continue
update_fields.append(field)
if field == "password":
if is_updating_other:
raise PermissionDenied(
"Password updates for other users are not allowed via RPC!"
)
old_password = values.get("old_password")
if not old_password:
raise PermissionDenied("Old password is required")
if not user_being_updated.check_password(old_password):
raise PermissionDenied("Password is incorrect")
user_being_updated.set_password(values["password"])
else:
setattr(user_being_updated, field, values[field])
user_being_updated.save(update_fields=update_fields)
return _get_user_dict(user_being_updated)
@permissions_required("auth.change_user")
@rpc_method(name="User.join_group")
def join_group(username, groupname):
"""
.. function:: RPC User.join_group(username, groupname)
Add user to a group specified by name.
:param username: Username to modify
:type username: str
:param groupname: Name of group to join, must exist!
:type groupname: str
:raises PermissionDenied: if missing *auth.change_user* permission
"""
user = User.objects.get(username=username)
group = Group.objects.get(name=groupname)
user.groups.add(group)
@permissions_required("attachments.add_attachment")
@rpc_method(name="User.add_attachment")
def add_attachment(filename, b64content, **kwargs):
"""
.. function:: RPC User.add_attachment(filename, b64content)
Attach a file under the currently logged-in user!
This method is meant to be used by SimpleMDE combined with post_save
processing for various models like TestPlan and TestCase. While files
uploaded by this method will be attached and available (if you know their URL),
there is no UI to see all of the files uploaded by a certain user or
manage them!
:param filename: File name of attachment, e.g. 'logs.txt'
:type filename: str
:param b64content: Base64 encoded content
:type b64content: str
:param \\**kwargs: Dict providing access to the current request, protocol,
entry point name and handler instance from the rpc method
:return: Information about the attachment
:rtype: dict
"""
user = kwargs.get(REQUEST_KEY).user
utils.add_attachment(
user.pk,
settings.AUTH_USER_MODEL,
user,
filename,
b64content,
)
# take the last attachment for this user and return information about it
attachment = (
Attachment.objects.attachments_for_object(user).order_by("created").last()
)
return {
"url": attachment.attachment_file.url,
"filename": attachment.filename,
}