-
Notifications
You must be signed in to change notification settings - Fork 20
/
owner.py
288 lines (230 loc) · 8.34 KB
/
owner.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
##############################################################################
#
# Copyright (c) 2002 Zope Foundation and Contributors.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Support for owned objects
"""
from Acquisition import aq_base
from Acquisition import aq_get
from Acquisition import aq_inner
from Acquisition import aq_parent
from ExtensionClass import Base
from zope.interface import implementer
from AccessControl import SpecialUsers as SU
from AccessControl.class_init import InitializeClass
from AccessControl.interfaces import IOwned
from AccessControl.Permissions import take_ownership
from AccessControl.Permissions import view_management_screens
from AccessControl.SecurityInfo import ClassSecurityInfo
from AccessControl.SecurityManagement import getSecurityManager
# Sentinel stored as ``_owner`` to mark objects that must never be owned.
# Code in this module always tests for it by identity (``is`` / ``is not``),
# never by value, so only this particular list object counts -- its (empty)
# contents are irrelevant.
UnownableOwner = []
def ownableFilter(self):
    """Return True unless *self* is marked as unownable.

    The ``_owner`` attribute is looked up through ``aq_get`` with a default
    of ``None``; the object counts as ownable whenever that lookup does not
    yield the ``UnownableOwner`` sentinel itself (identity comparison).
    """
    return aq_get(self, '_owner', None, 1) is not UnownableOwner
# Unique marker used as a lookup default, so that "attribute is missing" can
# be told apart from any real stored value (``None`` included).  It is used
# by ``Owned._deleteOwnershipAfterAdd`` as the default of ``__dict__.get``.
# Only its identity matters; the function object is simply reused as a
# convenient module-private singleton.
_mark = ownableFilter
def _drop_child_ownership(container):
    """Make every sub-object of *container* drop its own ``_owner``.

    Private helper shared by ``Owned._deleteOwnershipAfterAdd`` and
    ``Owned.manage_fixupOwnershipAfterAdd``, which both ran this exact loop.

    The operation is deliberately best-effort: a child that cannot report
    its ``_p_changed`` state, or that fails while dropping its ownership,
    is skipped silently.  Only ``Exception`` subclasses are suppressed, so
    ``KeyboardInterrupt`` and ``SystemExit`` still propagate.
    """
    for child in container.objectValues():
        # Remember the state *before* touching the child.
        # NOTE(review): presumably ``_p_changed is None`` means the child was
        # an unloaded persistent ghost -- confirm against the ZODB docs.
        try:
            state = child._p_changed
        except Exception:
            state = 0
        try:
            child._deleteOwnershipAfterAdd()
        except Exception:
            pass
        if state is None:
            # Put the child back into the state it was found in.
            child._p_deactivate()


# NOTE(review): this class and several of its methods are documented with
# comments rather than docstrings on purpose.  In Zope the presence of a
# docstring can influence whether something is publishable through the web,
# so no docstring is added where the original had none -- confirm before
# converting any of these comments into docstrings.
#
# Mixin that gives an object an owner, stored in the ``_owner`` attribute as
# ``(userdb_path, user_id)``.  ``_owner`` may also be ``None``/absent
# (unowned) or the ``UnownableOwner`` sentinel (can never be owned).
@implementer(IOwned)
class Owned(Base):

    security = ClassSecurityInfo()
    security.setPermissionDefault(take_ownership, ('Owner', ))

    @security.protected(view_management_screens)
    def owner_info(self):
        """Get ownership info for display

        Returns the raw value (``None`` or the ``UnownableOwner`` sentinel)
        when there is no real owner.  Otherwise returns a dict with the keys
        ``path`` (user database path joined with ``/``), ``id`` (owner id),
        ``explicit`` (result of ``hasattr(self, '_owner')``) and
        ``userCanChangeOwnershipType`` (result of checking the
        'Take ownership' permission on this object).
        """
        owner = self.getOwnerTuple()

        if owner is None or owner is UnownableOwner:
            return owner

        d = {
            'path': '/'.join(owner[0]),
            'id': owner[1],
            'explicit': hasattr(self, '_owner'),
            'userCanChangeOwnershipType':
                getSecurityManager().checkPermission('Take ownership', self),
        }
        return d

    @security.private
    def getOwner(self, info=0,
                 aq_get=aq_get,
                 UnownableOwner=UnownableOwner,
                 getSecurityManager=getSecurityManager,
                 ):
        """Get the owner

        If a true argument is provided, then only the owner path and id are
        returned. Otherwise, the owner object is returned.

        Passing a true ``info`` is deprecated and emits a
        ``DeprecationWarning``; use ``getOwnerTuple()`` instead.  Without
        ``info``, ``None`` is returned for unowned or unownable objects and
        ``SU.nobody`` when the user database or the user cannot be found.
        """
        if info:
            import warnings
            warnings.warn('Owned.getOwner(1) is deprecated; '
                          'please use getOwnerTuple() instead.',
                          DeprecationWarning, stacklevel=2)

        owner = aq_get(self, '_owner', None, 1)
        if info or (owner is None):
            return owner

        if owner is UnownableOwner:
            return None

        udb_path, oid = owner

        root = self.getPhysicalRoot()
        udb = root.unrestrictedTraverse(udb_path, None)
        if udb is None:
            return SU.nobody

        user = udb.getUserById(oid, None)
        if user is None:
            return SU.nobody
        return user

    @security.private
    def getOwnerTuple(self):
        """Return a tuple, (userdb_path, user_id) for the owner.

        o Ownership can be acquired, but only from the containment path.

        o If unowned, return None.
        """
        return aq_get(self, '_owner', None, 1)

    @security.private
    def getWrappedOwner(self):
        """Get the owner, modestly wrapped in the user folder.

        o If the object is not owned, return None.

        o If the owner's user database doesn't exist, return Nobody.

        o If the owner ID does not exist in the user database, return Nobody.
        """
        owner = self.getOwnerTuple()

        if owner is None or owner is UnownableOwner:
            return None

        udb_path, oid = owner

        root = self.getPhysicalRoot()
        udb = root.unrestrictedTraverse(udb_path, None)

        if udb is None:
            return SU.nobody

        user = udb.getUserById(oid, None)

        if user is None:
            return SU.nobody

        return user.__of__(udb)

    @security.private
    def changeOwnership(self, user, recursive=0):
        """Change the ownership to the given user.

        If 'recursive' is true then also take ownership of all sub-objects,
        otherwise sub-objects retain their ownership information.

        Nothing happens when ``ownerInfo(user)`` yields ``None``.  An object
        whose current owner is the ``UnownableOwner`` sentinel never gets a
        new ``_owner``, although its children are still visited when
        'recursive' is true.
        """
        new = ownerInfo(user)
        if new is None:
            return  # Special user!
        old = self.getOwnerTuple()

        if not recursive:
            if old == new or old is UnownableOwner:
                return

        if recursive:
            # Look objectValues up on the unwrapped object (aq_base) and fall
            # back to "no children" when it is missing.
            children = getattr(aq_base(self), 'objectValues', lambda: ())()
            for child in children:
                child.changeOwnership(user, 1)

        if old is not UnownableOwner:
            self._owner = new

    # Report whether the current user could take ownership of this object.
    # Returns 0 when the user has no owner info (``ownerInfo`` gave None) or
    # already is the owner; otherwise returns the result of checking the
    # 'Take ownership' permission on this object.
    def userCanTakeOwnership(self):
        security = getSecurityManager()
        user = security.getUser()
        info = ownerInfo(user)
        if info is None:
            return 0
        owner = self.getOwnerTuple()
        if owner == info:
            return 0
        return security.checkPermission('Take ownership', self)

    # Remove this object's own ``_owner`` and do the same, best-effort, for
    # every sub-object returned by ``objectValues()``.
    def _deleteOwnershipAfterAdd(self):
        # Only delete _owner if it is an instance attribute.
        if self.__dict__.get('_owner', _mark) is not _mark:
            del self._owner

        _drop_child_ownership(self)

    # Fix up ownership after this object has been added to a container:
    #   * parent's owner unknown and object not yet placed -> do nothing;
    #   * parent is unownable -> drop own ownership (and that of children);
    #   * otherwise -> the current user becomes owner and all children are
    #     made to drop their own ``_owner``.
    # Raises EmergencyUserCannotOwn when the current user is the emergency
    # user and the object does not allow being created by that user.
    def manage_fixupOwnershipAfterAdd(self):
        # Sigh, get the parent's _owner
        parent = getattr(self, '__parent__', None)
        if parent is not None:
            _owner = aq_get(parent, '_owner', None, 1)
        else:
            _owner = None

        if _owner is None and (
                parent is None or not hasattr(self, 'getPhysicalRoot')):
            # This is a special case. An object is
            # being added to an object that hasn't
            # been added to the object hierarchy yet.
            # We can delay fixing up the ownership until the
            # object is actually added.
            return None

        if _owner is UnownableOwner:
            # We want to acquire Unownable ownership!
            return self._deleteOwnershipAfterAdd()

        # Otherwise change the ownership
        user = getSecurityManager().getUser()
        if (SU.emergency_user and aq_base(user) is SU.emergency_user):
            __creatable_by_emergency_user__ = getattr(
                self, '__creatable_by_emergency_user__', None)
            if __creatable_by_emergency_user__ is None or \
               not __creatable_by_emergency_user__():
                raise EmergencyUserCannotOwn(
                    "Objects cannot be owned by the emergency user")
        self.changeOwnership(user)

        # Force all subs to acquire ownership!
        _drop_child_ownership(self)


InitializeClass(Owned)
# Raised by Owned.manage_fixupOwnershipAfterAdd when the current user is the
# emergency user and the object does not permit creation by that user.
class EmergencyUserCannotOwn(Exception):
    """The emergency user cannot own anything"""
# Not raised anywhere in the visible part of this module; kept as part of
# the module's public names.
class EditUnowned(Exception):
    """Can't edit unowned executables"""
def absattr(attr):
    """Return the value of *attr*, calling it first if it is callable.

    Lets callers treat an attribute that may be either a plain value or a
    zero-argument callable (such as the ``id`` read in ``ownerInfo``)
    uniformly.
    """
    return attr() if callable(attr) else attr
def ownerInfo(user, getattr=getattr):
    """Return ``(path, user_id)`` identifying *user*, or ``None``.

    ``path`` is a list of ids leading from just below the physical root down
    to the user database that contains the user; ``user_id`` is the value of
    ``user.getId()``.

    ``None`` is returned when *user* is ``None``, when the user has no id,
    or when the user has no containing user database.

    The ``getattr`` parameter is kept for backward compatibility with
    existing callers; it defaults to the builtin.
    """
    if user is None:
        return None
    uid = user.getId()
    if uid is None:
        return uid
    db = aq_parent(aq_inner(user))
    if db is None:
        return None

    # Collect ids innermost-first while walking up the containment chain,
    # stopping at the physical root (whose id is not part of the path).
    path = [absattr(db.id)]
    root = db.getPhysicalRoot()
    while True:
        db = getattr(db, 'aq_inner', None)
        if db is None:
            break
        db = aq_parent(db)
        if db is root:
            break
        segment = db.id
        if not isinstance(segment, str):
            # ``id`` may be a method; fall back to its string form if calling
            # it fails.  Only ordinary errors are suppressed, so
            # KeyboardInterrupt / SystemExit still propagate.
            try:
                segment = segment()
            except Exception:
                segment = str(segment)
        path.append(segment)

    path.reverse()

    return path, uid