This repository has been archived by the owner on Apr 16, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 312
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
0 parents
commit 9a6c527
Showing
7 changed files
with
153 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
Django friendly finite state machine support |
Empty file.
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
# -*- coding: utf-8 -*- | ||
from fsmfield import FSMField, transition | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,73 @@ | ||
# -*- coding: utf-8 -*- | ||
from collections import defaultdict | ||
from functools import wraps | ||
from django.db import models | ||
|
||
class FSMMeta(object): | ||
def __init__(self): | ||
self.transitions = defaultdict() | ||
|
||
def get_state_field(self, instance): | ||
fields = [field for field in instance._meta.fields if isinstance(field, FSMField)] | ||
found = len(fields) | ||
if found == 0: | ||
raise TypeError("No FSMField found in model") | ||
elif found > 1: | ||
raise TypeError("More than one FSMField found in model, please specify field name in transition decorator") | ||
return fields[0] | ||
|
||
def current_state(self, instance): | ||
field_name = self.get_state_field(instance).name | ||
return getattr(instance, field_name) | ||
|
||
def has_transition(self, instance): | ||
return self.transitions.has_key(self.current_state(instance)) | ||
|
||
def to_next_state(self, instance): | ||
field_name = self.get_state_field(instance).name | ||
curr_state = getattr(instance, field_name) | ||
setattr(instance, field_name, self.transitions[curr_state]) | ||
|
||
|
||
def transition(source='*', target=None, save=False): | ||
def inner_transition(func): | ||
if not hasattr(func, '_django_fsm'): | ||
setattr(func, '_django_fsm', FSMMeta()) | ||
|
||
func._django_fsm.transitions[source] = target | ||
|
||
@wraps(func) | ||
def _change_state(instance, *args, **kwargs): | ||
meta = func._django_fsm | ||
if not meta.has_transition(instance): | ||
raise NotImplementedError("Can't switch from state '%s' using method '%s'" % (meta.current_state(instance), func.func_name)) | ||
|
||
func(instance, *args, **kwargs) | ||
|
||
meta.to_next_state(instance) | ||
if save: | ||
instance.save() | ||
|
||
return _change_state | ||
|
||
if not target: | ||
raise ValueError("Result state not specified") | ||
|
||
return inner_transition | ||
|
||
|
||
|
||
class FSMField(models.Field): | ||
""" | ||
Enabels State Machine support for Django model | ||
""" | ||
__metaclass__ = models.SubfieldBase | ||
|
||
def __init__(self, initial_state = None, *args, **kwargs): | ||
kwargs['max_length'] = 50 | ||
super(FSMField, self).__init__(*args, **kwargs) | ||
|
||
def get_internal_type(self): | ||
return 'CharField' | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
#-*- coding: utf-8 -*- | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,74 @@ | ||
#-*- coding: utf-8 -*- | ||
from django.test import TestCase | ||
from django.db import models | ||
from django.contrib.contenttypes.models import ContentType | ||
|
||
from django_fsm.db.fields import FSMField, transition | ||
|
||
class BlogPost(models.Model): | ||
state = FSMField(default='new') | ||
|
||
@transition(source='new', target='published') | ||
def publish(self): | ||
pass | ||
|
||
@transition(source='published', target='hidden') | ||
def hide(self): | ||
pass | ||
|
||
@transition(source='new', target='removed') | ||
def remove(self): | ||
raise Exception('No rights to delete') | ||
|
||
|
||
class FSMFieldTest(TestCase): | ||
def setUp(self): | ||
self.model = BlogPost() | ||
|
||
def test_initial_state_instatiated(self): | ||
self.assertEqual(self.model.state, 'new') | ||
|
||
def test_known_transition_should_succeed(self): | ||
self.model.publish() | ||
self.assertEqual(self.model.state, 'published') | ||
|
||
self.model.hide() | ||
self.assertEqual(self.model.state, 'hidden') | ||
|
||
def test_unknow_transition_fails(self): | ||
self.assertRaises(NotImplementedError, self.model.hide) | ||
|
||
def test_state_non_changed_after_fail(self): | ||
self.assertRaises(Exception, self.model.remove) | ||
self.assertEqual(self.model.state, 'new') | ||
|
||
|
||
class InvalidModel(models.Model): | ||
state = FSMField(default='new') | ||
action = FSMField(default='no') | ||
|
||
@transition(source='new', target='no') | ||
def validate(): | ||
pass | ||
|
||
|
||
class InvalidModelTest(TestCase): | ||
def test_two_fsmfields_in_one_model_not_allowed(self): | ||
model = InvalidModel() | ||
self.assertRaises(TypeError, model.validate) | ||
|
||
|
||
class Document(models.Model): | ||
status = FSMField(default='new') | ||
|
||
@transition(source='new', target='published') | ||
def publish(self): | ||
pass | ||
|
||
|
||
class DocumentTest(TestCase): | ||
def test_any_state_field_name_allowed(self): | ||
model = Document() | ||
model.publish() | ||
self.assertEqual(model.status, 'published') | ||
|