Skip to content
This repository has been archived by the owner on Apr 16, 2024. It is now read-only.

Commit

Permalink
Support for permissions on transition methods Close #36
Browse files Browse the repository at this point in the history
  • Loading branch information
kmmbvnr committed May 15, 2014
1 parent f6c460d commit 2590588
Show file tree
Hide file tree
Showing 5 changed files with 141 additions and 32 deletions.
37 changes: 34 additions & 3 deletions README.md
Expand Up @@ -64,7 +64,7 @@ from django_fsm import can_proceed
def publish_view(request, post_id):
post = get_object__or_404(BlogPost, pk=post_id)
if not can_proceed(post.publish):
raise PermissionDenied;
raise PermissionDenied

post.publish()
post.save()
Expand Down Expand Up @@ -129,13 +129,44 @@ def legal_hold(self):
Side effects galore
"""
```
### Permissions
It is common to have permissions attached to each model transition. `django-fsm` handles this with
`permission` keyword on the `transition` decorator. `permission` accepts a permission string, or
callable that excepts `user` argument and returns True if user can perform the transition

### get_available_FIELD_transitions
Returns all transitions data available in current state
```python
@transition(field=state, source='*', target='publish',
permission=lambda user: not user.has_perm('myapp.can_make_mistakes'))
def publish(self):
pass

@transition(field=state, source='*', target='publish',
permission='myapp.can_remove_post')
def remove(self):
pass
```

You can check permission with `has_transition_permission` method
from django_fsm import can_proceed
``` python

This comment has been minimized.

Copy link
@mbertheau

mbertheau May 15, 2014

This code block should start one line earlier

def publish_view(request, post_id):
post = get_object__or_404(BlogPost, pk=post_id)

This comment has been minimized.

Copy link
@mbertheau

mbertheau May 15, 2014

Too many underscores

if not has_transition_permission(post.publish, request.user):
raise PermissionDenied

post.publish()
post.save()
return redirect('/')
```

### get_all_FIELD_transitions
Enumerates all declared transitions

### get_available_FIELD_transitions
Returns all transitions data available in current state

### get_available_user_FIELD_transitions:
Enumerates all transitions data available in current state for provided user

### Foreign Key constraints support

Expand Down
81 changes: 67 additions & 14 deletions django_fsm/__init__.py
Expand Up @@ -31,10 +31,15 @@ class TransitionNotAllowed(Exception):
"""Raise when a transition is not allowed"""


Transition = namedtuple('Transition', ['name', 'source', 'target', 'conditions', 'method', 'custom'])
Transition = namedtuple('Transition', ['name', 'source', 'target', 'conditions', 'method',
'permission', 'custom'])


def get_available_FIELD_transitions(instance, field):
"""
List of transitions available in current model state
with all conditions met
"""
curr_state = field.get_state(instance)
transitions = field.transitions[instance.__class__]

Expand All @@ -43,34 +48,52 @@ def get_available_FIELD_transitions(instance, field):

for state in [curr_state, '*']:
if state in meta.transitions:
target, conditions, custom = meta.transitions[state]
target, conditions, permission, custom = meta.transitions[state]
if all(map(lambda condition: condition(instance), conditions)):
yield Transition(
name=name,
source=state,
target=target,
conditions=conditions,
method=transition,
permission=permission,
custom=custom)


def get_all_FIELD_transitions(instance, field):
"""
List of all transitions available in current model state
"""
return field.get_all_transitions(instance.__class__)


def get_available_user_FIELD_transitions(instance, user, field):
"""
List of transitions available in current model state
with all conditions met and user have rights on it
"""
for transition in get_available_FIELD_transitions(instance, field):
if not transition.permission:
yield transition
elif callable(transition.permission) and transition.permission(user):
yield transition
elif user.has_perm(transition.permission):
yield transition


class FSMMeta(object):
"""
Models methods transitions meta information
"""
def __init__(self, field, method):
self.field = field
self.transitions = {} # source -> (target, conditions)
self.transitions = {} # source -> (target, conditions, permission, custom)

def add_transition(self, source, target, conditions=[], custom={}):
def add_transition(self, source, target, conditions=[], permission=None, custom={}):
if source in self.transitions:
raise AssertionError('Duplicate transition for {} state'.format(source))

self.transitions[source] = (target, conditions, custom)
self.transitions[source] = (target, conditions, permission, custom)

def has_transition(self, state):
"""
Expand All @@ -82,12 +105,26 @@ def conditions_met(self, instance, state):
"""
Check if all conditions have been met
"""
_, conditions, _ = self.transitions.get(state, (None, [], {}))
_, conditions, _, _ = self.transitions.get(state, (None, [], None, {}))
if not conditions:
_, conditions, _ = self.transitions.get('*', (None, [], {}))
_, conditions, _, _ = self.transitions.get('*', (None, [], None, {}))

return all(map(lambda condition: condition(instance), conditions))

def has_transition_perm(self, instance, state, user):
permission = None
if state in self.transitions:
_, _, permission, _ = self.transitions[state]
elif '*' in self.transitions:
_, _, permission, _ = self.transitions['*']

if not permission:
return True
elif callable(permission) and permission(user):
return True
elif user.has_perm(permission):
return True

def next_state(self, current_state):
try:
return self.transitions[current_state][0]
Expand Down Expand Up @@ -169,24 +206,27 @@ def get_all_transitions(self, instance_cls):
for name, transition in transitions.items():
meta = transition._django_fsm

for source, (target, conditions, custom) in meta.transitions.items():
for source, (target, conditions, permission, custom) in meta.transitions.items():
yield Transition(
name=name,
source=source,
target=target,
conditions=conditions,
method=transition,
permission=permission,
custom=custom)

def contribute_to_class(self, cls, name, virtual_only=False):
self.base_cls = cls

super(FSMFieldMixin, self).contribute_to_class(cls, name, virtual_only=virtual_only)
setattr(cls, self.name, self.descriptor_class(self))
setattr(cls, 'get_available_{}_transitions'.format(self.name),
curry(get_available_FIELD_transitions, field=self))
setattr(cls, 'get_all_{}_transitions'.format(self.name),
curry(get_all_FIELD_transitions, field=self))
setattr(cls, 'get_available_{}_transitions'.format(self.name),
curry(get_available_FIELD_transitions, field=self))
setattr(cls, 'get_available_user_{}_transitions'.format(self.name),
curry(get_available_user_FIELD_transitions, field=self))

class_prepared.connect(self._collect_transitions)

Expand Down Expand Up @@ -240,11 +280,11 @@ def set_state(self, instance, state):
instance.__dict__[self.attname] = self.to_python(state)


def transition(field, source='*', target=None, conditions=[], custom={}):
def transition(field, source='*', target=None, conditions=[], permission=None, custom={}):
"""
Method decorator for mark allowed transitions
Set target to None if current state needs to be validated and
Set target to None if current state needs to be validated and
has not changed after the function call
"""
def inner_transition(func):
Expand All @@ -259,9 +299,9 @@ def _change_state(instance, *args, **kwargs):

if isinstance(source, (list, tuple)):
for state in source:
func._django_fsm.add_transition(state, target, conditions, custom)
func._django_fsm.add_transition(state, target, conditions, permission, custom)
else:
func._django_fsm.add_transition(source, target, conditions, custom)
func._django_fsm.add_transition(source, target, conditions, permission, custom)

return _change_state

Expand All @@ -280,3 +320,16 @@ def can_proceed(bound_method):
current_state = meta.field.get_state(im_self)

return meta.has_transition(current_state) and meta.conditions_met(im_self, current_state)


def has_transition_perm(bound_method, user):
"""
Returns True if model in state allows to call bound_method and user have rights on it
"""
meta = bound_method._django_fsm
im_self = getattr(bound_method, 'im_self', getattr(bound_method, '__self__'))
current_state = meta.field.get_state(im_self)

return (meta.has_transition(current_state)
and meta.conditions_met(im_self, current_state)
and meta.has_transition_perm(im_self, current_state, user))
2 changes: 1 addition & 1 deletion tests/settings.py
@@ -1,5 +1,5 @@
PROJECT_APPS = ('django_fsm', 'testapp',)
INSTALLED_APPS = ('django_jenkins',) + PROJECT_APPS
INSTALLED_APPS = ('django.contrib.contenttypes', 'django.contrib.auth', 'django_jenkins',) + PROJECT_APPS
DATABASE_ENGINE = 'sqlite3'
SECRET_KEY = 'nokey'

Expand Down
12 changes: 10 additions & 2 deletions tests/testapp/models.py
Expand Up @@ -84,7 +84,8 @@ class BlogPost(models.Model):
"""
state = FSMField(default='new', protected=True)

@transition(field=state, source='new', target='published')
@transition(field=state, source='new', target='published',
permission='testapp.can_publish_post')
def publish(self):
pass

Expand All @@ -96,7 +97,8 @@ def notify_all(self):
def hide(self):
pass

@transition(field=state, source='new', target='removed')
@transition(field=state, source='new', target='removed',
permission=lambda u: u.has_perm('testapp.can_remove_post'))
def remove(self):
raise Exception('No rights to delete %s' % self)

Expand All @@ -107,3 +109,9 @@ def steal(self):
@transition(field=state, source='*', target='moderated')
def moderate(self):
pass

class Meta:
permissions = [
('can_publish_post', 'Can publish post'),
('can_remove_post', 'Can remove post'),
]
41 changes: 29 additions & 12 deletions tests/testapp/tests.py
@@ -1,16 +1,33 @@
"""
This file demonstrates writing tests using the unittest module. These will pass
when you run "manage.py test".
from django.contrib.auth.models import User, Permission
from django.test import TestCase

Replace this with more appropriate tests for your application.
"""
from django_fsm import has_transition_perm
from testapp.models import BlogPost

from django.test import TestCase

class PermissionFSMFieldTest(TestCase):
def setUp(self):
self.model = BlogPost()
self.unpriviledged = User.objects.create(username='unpriviledged')
self.priviledged = User.objects.create(username='priviledged')

self.priviledged.user_permissions.add(
Permission.objects.get_by_natural_key('can_publish_post', 'testapp', 'blogpost'))
self.priviledged.user_permissions.add(
Permission.objects.get_by_natural_key('can_remove_post', 'testapp', 'blogpost'))

def test_proviledged_access_succed(self):
self.assertTrue(has_transition_perm(self.model.publish, self.priviledged))
self.assertTrue(has_transition_perm(self.model.remove, self.priviledged))

transitions = self.model.get_available_user_state_transitions(self.priviledged)
self.assertEquals(set(['publish', 'remove', 'moderate']),
set(transition.name for transition in transitions))

def test_unpriviledged_access_prohibited(self):
self.assertFalse(has_transition_perm(self.model.publish, self.unpriviledged))
self.assertFalse(has_transition_perm(self.model.remove, self.unpriviledged))

class SimpleTest(TestCase):
def test_basic_addition(self):
"""
Tests that 1 + 1 always equals 2.
"""
self.assertEqual(1 + 1, 2)
transitions = self.model.get_available_user_state_transitions(self.unpriviledged)
self.assertEquals(set(['moderate']),
set(transition.name for transition in transitions))

0 comments on commit 2590588

Please sign in to comment.