This repository has been archived by the owner on Apr 16, 2024. It is now read-only.
/
__init__.py
335 lines (257 loc) · 11.4 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
# -*- coding: utf-8 -*-
"""
State tracking functionality for django models
"""
import inspect
from collections import namedtuple
from functools import wraps
from django.db import models
from django.db.models.signals import class_prepared
from django.utils.functional import curry
from django_fsm.signals import pre_transition, post_transition
__all__ = ["TransitionNotAllowed", "FSMFieldMixin", "FSMField",
'FSMIntegerField', 'FSMKeyField', 'transition', 'can_proceed']
# South support; see http://south.aeracode.org/docs/tutorial/part4.html#simple-inheritance
try:
from south.modelsinspector import add_introspection_rules
except ImportError:
pass
else:
add_introspection_rules([], [r"^django_fsm\.FSMField"])
add_introspection_rules([], [r"^django_fsm\.FSMIntegerField"])
add_introspection_rules([], [r"^django_fsm\.FSMKeyField"])
class TransitionNotAllowed(Exception):
"""Raise when a transition is not allowed"""
Transition = namedtuple('Transition', ['name', 'source', 'target', 'conditions', 'method',
'permission', 'custom'])
def get_available_FIELD_transitions(instance, field):
"""
List of transitions available in current model state
with all conditions met
"""
curr_state = field.get_state(instance)
transitions = field.transitions[instance.__class__]
for name, transition in transitions.items():
meta = transition._django_fsm
for state in [curr_state, '*']:
if state in meta.transitions:
target, conditions, permission, custom = meta.transitions[state]
if all(map(lambda condition: condition(instance), conditions)):
yield Transition(
name=name,
source=state,
target=target,
conditions=conditions,
method=transition,
permission=permission,
custom=custom)
def get_all_FIELD_transitions(instance, field):
"""
List of all transitions available in current model state
"""
return field.get_all_transitions(instance.__class__)
def get_available_user_FIELD_transitions(instance, user, field):
"""
List of transitions available in current model state
with all conditions met and user have rights on it
"""
for transition in get_available_FIELD_transitions(instance, field):
if not transition.permission:
yield transition
elif callable(transition.permission) and transition.permission(user):
yield transition
elif user.has_perm(transition.permission):
yield transition
class FSMMeta(object):
"""
Models methods transitions meta information
"""
def __init__(self, field, method):
self.field = field
self.transitions = {} # source -> (target, conditions, permission, custom)
def add_transition(self, source, target, conditions=[], permission=None, custom={}):
if source in self.transitions:
raise AssertionError('Duplicate transition for {} state'.format(source))
self.transitions[source] = (target, conditions, permission, custom)
def has_transition(self, state):
"""
Lookup if any transition exists from current model state using current method
"""
return state in self.transitions or '*' in self.transitions
def conditions_met(self, instance, state):
"""
Check if all conditions have been met
"""
_, conditions, _, _ = self.transitions.get(state, (None, [], None, {}))
if not conditions:
_, conditions, _, _ = self.transitions.get('*', (None, [], None, {}))
return all(map(lambda condition: condition(instance), conditions))
def has_transition_perm(self, instance, state, user):
permission = None
if state in self.transitions:
_, _, permission, _ = self.transitions[state]
elif '*' in self.transitions:
_, _, permission, _ = self.transitions['*']
if not permission:
return True
elif callable(permission) and permission(user):
return True
elif user.has_perm(permission):
return True
def next_state(self, current_state):
try:
return self.transitions[current_state][0]
except KeyError:
return self.transitions['*'][0]
class FSMFieldDescriptor(object):
def __init__(self, field):
self.field = field
def __get__(self, instance, type=None):
if instance is None:
raise AttributeError('Can only be accessed via an instance.')
return self.field.get_state(instance)
def __set__(self, instance, value):
if self.field.protected and self.field.name in instance.__dict__:
raise AttributeError('Direct {} modification is not allowed'.format(self.field.name))
self.field.set_state(instance, value)
class FSMFieldMixin(object):
descriptor_class = FSMFieldDescriptor
def __init__(self, *args, **kwargs):
self.protected = kwargs.pop('protected', False)
self.transitions = {} # cls -> (transitions name -> method)
super(FSMFieldMixin, self).__init__(*args, **kwargs)
def deconstruct(self):
name, path, args, kwargs = super(FSMFieldMixin, self).deconstruct()
if self.protected:
kwargs['protected'] = self.protected
return name, path, args, kwargs
def get_state(self, instance):
return instance.__dict__[self.name]
def set_state(self, instance, state):
instance.__dict__[self.name] = state
def change_state(self, instance, method, *args, **kwargs):
meta = method._django_fsm
method_name = method.__name__
current_state = self.get_state(instance)
if not (meta.has_transition(current_state) and meta.conditions_met(instance, current_state)):
raise TransitionNotAllowed(
"Can't switch from state '{}' using method '{}'".format(current_state, method_name))
next_state = meta.next_state(current_state)
signal_kwargs = {
'sender': instance.__class__,
'instance': instance,
'name': method_name,
'source': current_state,
'target': next_state
}
pre_transition.send(**signal_kwargs)
result = method(instance, *args, **kwargs)
if next_state:
self.set_state(instance, next_state)
post_transition.send(**signal_kwargs)
return result
def get_all_transitions(self, instance_cls):
"""
Returns [(source, target, name, method)] for all field transitions
"""
transitions = self.transitions[instance_cls]
for name, transition in transitions.items():
meta = transition._django_fsm
for source, (target, conditions, permission, custom) in meta.transitions.items():
yield Transition(
name=name,
source=source,
target=target,
conditions=conditions,
method=transition,
permission=permission,
custom=custom)
def contribute_to_class(self, cls, name, virtual_only=False):
self.base_cls = cls
super(FSMFieldMixin, self).contribute_to_class(cls, name, virtual_only=virtual_only)
setattr(cls, self.name, self.descriptor_class(self))
setattr(cls, 'get_all_{}_transitions'.format(self.name),
curry(get_all_FIELD_transitions, field=self))
setattr(cls, 'get_available_{}_transitions'.format(self.name),
curry(get_available_FIELD_transitions, field=self))
setattr(cls, 'get_available_user_{}_transitions'.format(self.name),
curry(get_available_user_FIELD_transitions, field=self))
class_prepared.connect(self._collect_transitions)
def _collect_transitions(self, *args, **kwargs):
sender = kwargs['sender']
if not issubclass(sender, self.base_cls):
return
def is_field_transition_method(attr):
return (inspect.ismethod(attr) or inspect.isfunction(attr)) \
and hasattr(attr, '_django_fsm') \
and attr._django_fsm.field in [self, self.name]
sender_transitions = {}
transitions = inspect.getmembers(sender, predicate=is_field_transition_method)
for method_name, method in transitions:
method._django_fsm.field = self
sender_transitions[method_name] = method
self.transitions[sender] = sender_transitions
class FSMField(FSMFieldMixin, models.CharField):
"""
State Machine support for Django model as CharField
"""
def __init__(self, *args, **kwargs):
kwargs.setdefault('max_length', 50)
super(FSMField, self).__init__(*args, **kwargs)
class FSMIntegerField(FSMFieldMixin, models.IntegerField):
"""
Same as FSMField, but stores the state value in an IntegerField.
db_index is True by default.
"""
def __init__(self, *args, **kwargs):
kwargs.setdefault('db_index', True)
super(FSMIntegerField, self).__init__(*args, **kwargs)
class FSMKeyField(FSMFieldMixin, models.ForeignKey):
"""
State Machine support for Django model
"""
def get_state(self, instance):
return instance.__dict__[self.attname]
def set_state(self, instance, state):
instance.__dict__[self.attname] = self.to_python(state)
def transition(field, source='*', target=None, conditions=[], permission=None, custom={}):
"""
Method decorator for mark allowed transitions
Set target to None if current state needs to be validated and
has not changed after the function call
"""
def inner_transition(func):
fsm_meta = getattr(func, '_django_fsm', None)
if not fsm_meta:
fsm_meta = FSMMeta(field=field, method=func)
setattr(func, '_django_fsm', fsm_meta)
@wraps(func)
def _change_state(instance, *args, **kwargs):
return fsm_meta.field.change_state(instance, func, *args, **kwargs)
if isinstance(source, (list, tuple)):
for state in source:
func._django_fsm.add_transition(state, target, conditions, permission, custom)
else:
func._django_fsm.add_transition(source, target, conditions, permission, custom)
return _change_state
return inner_transition
def can_proceed(bound_method):
"""
Returns True if model in state allows to call bound_method
"""
if not hasattr(bound_method, '_django_fsm'):
raise TypeError('%s method is not transition' % bound_method.im_func.__name__)
meta = bound_method._django_fsm
im_self = getattr(bound_method, 'im_self', getattr(bound_method, '__self__'))
current_state = meta.field.get_state(im_self)
return meta.has_transition(current_state) and meta.conditions_met(im_self, current_state)
def has_transition_perm(bound_method, user):
"""
Returns True if model in state allows to call bound_method and user have rights on it
"""
meta = bound_method._django_fsm
im_self = getattr(bound_method, 'im_self', getattr(bound_method, '__self__'))
current_state = meta.field.get_state(im_self)
return (meta.has_transition(current_state)
and meta.conditions_met(im_self, current_state)
and meta.has_transition_perm(im_self, current_state, user))