diff --git a/requirements.txt b/requirements.txt index ee70de8..4b6151e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,4 @@ django-mptt factory-boy -mock \ No newline at end of file +mock +pyhamcrest \ No newline at end of file diff --git a/river/core/classworkflowobject.py b/river/core/classworkflowobject.py index ca4821c..0514311 100644 --- a/river/core/classworkflowobject.py +++ b/river/core/classworkflowobject.py @@ -1,8 +1,6 @@ from django.contrib.contenttypes.models import ContentType from river.models import State, TransitionApprovalMeta, Workflow -from river.utils.error_code import ErrorCode -from river.utils.exceptions import RiverException class ClassWorkflowObject(object): @@ -23,30 +21,23 @@ def workflow(self): def _content_type(self): return ContentType.objects.get_for_model(self.workflow_class) - def get_on_approval_objects(self, as_user): - object_pks = [] + def get_available_approvals(self, as_user): + transition_approvals = None for workflow_object in self.workflow_class.objects.all(): instance_workflow = getattr(workflow_object.river, self.name) - transition_approvals = instance_workflow.get_available_approvals(as_user=as_user) - if transition_approvals.count(): - object_pks.append(workflow_object.pk) - return self.workflow_class.objects.filter(pk__in=object_pks) + available_approvals = instance_workflow.get_available_approvals(as_user=as_user) + transition_approvals = available_approvals.union(available_approvals) if transition_approvals else available_approvals + + return transition_approvals + + def get_on_approval_objects(self, as_user): + available_approvals = self.get_available_approvals(as_user) + return self.workflow_class.objects.filter(pk__in=available_approvals.values_list("object_id", flat=True)) @property def initial_state(self): - initial_states = State.objects.filter( - pk__in=TransitionApprovalMeta.objects.filter( - workflow=self.workflow, - parents__isnull=True - ).values_list("source_state", flat=True) - ) - if initial_states.count() == 0: - raise RiverException(ErrorCode.NO_AVAILABLE_INITIAL_STATE, 'There is no available initial state for the content type %s. ' % self._content_type) - elif initial_states.count() > 1: - raise RiverException(ErrorCode.MULTIPLE_INITIAL_STATE, - 'There are multiple initial state for the content type %s. Have only one initial state' % self._content_type) - - return initial_states[0] + workflow = Workflow.objects.filter(content_type=self._content_type, field_name=self.name).first() + return workflow.initial_state if workflow else None @property def final_states(self): diff --git a/river/core/instanceworkflowobject.py b/river/core/instanceworkflowobject.py index a612dd2..a790044 100644 --- a/river/core/instanceworkflowobject.py +++ b/river/core/instanceworkflowobject.py @@ -154,7 +154,7 @@ def process(action, next_state=None, god_mod=False): available_transition_approvals = self.get_available_approvals(as_user=as_user, god_mod=god_mod) c = available_transition_approvals.count() if c == 0: - raise RiverException(ErrorCode.NO_AVAILABLE_NEXT_STATE_FOR_USER, "There is no available state for destination for the user.") + raise RiverException(ErrorCode.NO_AVAILABLE_NEXT_STATE_FOR_USER, "There is no available approval for the user") if c > 1: if next_state: available_transition_approvals = available_transition_approvals.filter(destination_state=next_state) @@ -188,7 +188,8 @@ def process(action, next_state=None, god_mod=False): transition_status = True # Next states should be PENDING back again if there is circle. - self._cycle_proceedings() + if self._check_if_it_cycled(transition_approval.destination_state): + self._re_create_cycled_path(transition_approval.destination_state) # ProceedingService.get_next_proceedings(workflow_object).update(status=PENDING) with ProceedingSignal(self.workflow_object, self.field_name, transition_approval), \ @@ -224,33 +225,38 @@ def _get_next_approvals(self, transition_approval_pks=None, current_states=None, return proceedings - @atomic - def _cycle_proceedings(self): - """ - Finds next proceedings and clone them for cycling if it exists. - """ - next_approvals = self._get_next_approvals().exclude( - status=PENDING).exclude(cloned=True) - for ta in next_approvals: - clone_transition_approval, c = TransitionApproval.objects.get_or_create( - source_state=ta.source_state, - destination_state=ta.destination_state, - content_type=ta.content_type, - object_id=ta.object_id, + def _check_if_it_cycled(self, new_state): + return TransitionApproval.objects.filter( + workflow_object=self.workflow_object, + workflow=self.class_workflow.workflow, + source_state=new_state, + status=APPROVED + ).count() > 0 + + def _re_create_cycled_path(self, from_state): + approvals = TransitionApproval.objects.filter(workflow_object=self.workflow_object, workflow=self.class_workflow.workflow, source_state=from_state) + cycle_ended = False + while not cycle_ended: + for old_approval in approvals: + if old_approval.enabled: + TransitionApproval.objects.get_or_create( + source_state=old_approval.source_state, + destination_state=old_approval.destination_state, + workflow=old_approval.workflow, + object_id=old_approval.workflow_object.pk, + content_type=old_approval.content_type, + skip=False, + priority=old_approval.priority, + enabled=True, + status=PENDING, + meta=old_approval.meta + ) + approvals = TransitionApproval.objects.filter( + workflow_object=self.workflow_object, workflow=self.class_workflow.workflow, - skip=ta.skip, - priority=ta.priority, - enabled=ta.enabled, - status=PENDING, - meta=ta.meta + source_state__in=approvals.values_list("destination_state", flat=TransitionApproval) ) - - if c: - clone_transition_approval.permissions.add(*ta.permissions.all()) - clone_transition_approval.groups.add(*ta.groups.all()) - next_approvals.update(cloned=True) - - return True if next_approvals.count() else False + cycle_ended = approvals.filter(source_state=from_state).count() > 0 def get_state(self): return getattr(self.workflow_object, self.field_name) diff --git a/river/tests/base_test.py b/river/tests/base_test.py deleted file mode 100644 index 19f0256..0000000 --- a/river/tests/base_test.py +++ /dev/null @@ -1,160 +0,0 @@ -from django.contrib.contenttypes.models import ContentType -from django.test import TestCase - -from river.models.factories import StateObjectFactory, TransitionApprovalMetaFactory, PermissionObjectFactory, UserObjectFactory, WorkflowFactory -from river.tests.models import BasicTestModel - -__author__ = 'ahmetdal' - - -class BaseTestCase(TestCase): - def initialize_standard_scenario(self): - TransitionApprovalMetaFactory.reset_sequence(0) - StateObjectFactory.reset_sequence(0) - - content_type = ContentType.objects.get_for_model(BasicTestModel) - permissions = PermissionObjectFactory.create_batch(4) - self.user1 = UserObjectFactory(user_permissions=[permissions[0]]) - self.user2 = UserObjectFactory(user_permissions=[permissions[1]]) - self.user3 = UserObjectFactory(user_permissions=[permissions[2]]) - self.user4 = UserObjectFactory(user_permissions=[permissions[3]]) - - self.state1 = StateObjectFactory(label="state1") - self.state2 = StateObjectFactory(label="state2") - self.state3 = StateObjectFactory(label="state3") - self.state4 = StateObjectFactory(label="state4") - self.state5 = StateObjectFactory(label="state5") - - self.state41 = StateObjectFactory(label="state4.1") - self.state42 = StateObjectFactory(label="state4.2") - - self.state51 = StateObjectFactory(label="state5.1") - self.state52 = StateObjectFactory(label="state5.2") - - self.workflow = WorkflowFactory(field_name="my_field", content_type=content_type, initial_state=self.state1) - - t1 = TransitionApprovalMetaFactory.create( - workflow=self.workflow, - source_state=self.state1, - destination_state=self.state2, - priority=0 - ) - t1.permissions.add(permissions[0]) - - t2 = TransitionApprovalMetaFactory.create( - workflow=self.workflow, - source_state=self.state2, - destination_state=self.state3, - priority=0 - ) - t2.permissions.add(permissions[1]) - - t3 = TransitionApprovalMetaFactory.create( - workflow=self.workflow, - source_state=self.state2, - destination_state=self.state3, - priority=1 - ) - t3.permissions.add(permissions[2]) - - t4 = TransitionApprovalMetaFactory.create( - workflow=self.workflow, - source_state=self.state3, - destination_state=self.state4, - priority=0 - ) - t4.permissions.add(permissions[3]) - - t5 = TransitionApprovalMetaFactory.create( - workflow=self.workflow, - source_state=self.state3, - destination_state=self.state5, - priority=0 - ) - t5.permissions.add(permissions[3]) - - t6 = TransitionApprovalMetaFactory.create( - workflow=self.workflow, - source_state=self.state4, - destination_state=self.state41, - priority=0 - ) - t6.permissions.add(permissions[3]) - - t7 = TransitionApprovalMetaFactory.create( - workflow=self.workflow, - source_state=self.state4, - destination_state=self.state42, - priority=0 - ) - t7.permissions.add(permissions[3]) - - t8 = TransitionApprovalMetaFactory.create( - workflow=self.workflow, - source_state=self.state5, - destination_state=self.state51, - priority=0 - ) - t8.permissions.add(permissions[3]) - - t9 = TransitionApprovalMetaFactory.create( - workflow=self.workflow, - source_state=self.state5, - destination_state=self.state52, - priority=0 - ) - t9.permissions.add(permissions[3]) - - def initialize_circular_scenario(self): - StateObjectFactory.reset_sequence(0) - TransitionApprovalMetaFactory.reset_sequence(0) - - content_type = ContentType.objects.get_for_model(BasicTestModel) - permissions = PermissionObjectFactory.create_batch(4) - self.user1 = UserObjectFactory(user_permissions=[permissions[0]]) - self.user2 = UserObjectFactory(user_permissions=[permissions[1]]) - self.user3 = UserObjectFactory(user_permissions=[permissions[2]]) - self.user4 = UserObjectFactory(user_permissions=[permissions[3]]) - - self.open_state = StateObjectFactory(label='open') - self.in_progress_state = StateObjectFactory(label='in-progress') - self.resolved_state = StateObjectFactory(label='resolved') - self.re_opened_state = StateObjectFactory(label='re-opened') - self.closed_state = StateObjectFactory(label='closed') - - self.workflow = WorkflowFactory(field_name="my_field", content_type=content_type, initial_state=self.open_state) - - t1 = TransitionApprovalMetaFactory.create( - workflow=self.workflow, - source_state=self.open_state, - destination_state=self.in_progress_state, - ) - t1.permissions.add(permissions[0]) - - t2 = TransitionApprovalMetaFactory.create( - workflow=self.workflow, - source_state=self.in_progress_state, - destination_state=self.resolved_state, - ) - t2.permissions.add(permissions[1]) - - t3 = TransitionApprovalMetaFactory.create( - workflow=self.workflow, - source_state=self.resolved_state, - destination_state=self.re_opened_state, - ) - t3.permissions.add(permissions[2]) - - t4 = TransitionApprovalMetaFactory.create( - workflow=self.workflow, - source_state=self.resolved_state, - destination_state=self.closed_state, - ) - t4.permissions.add(permissions[3]) - - t5 = TransitionApprovalMetaFactory.create( - workflow=self.workflow, - source_state=self.re_opened_state, - destination_state=self.in_progress_state, - ) - t5.permissions.add(permissions[0]) diff --git a/river/tests/transient/__init__.py b/river/tests/core/__init__.py similarity index 100% rename from river/tests/transient/__init__.py rename to river/tests/core/__init__.py diff --git a/river/tests/core/test__class_api.py b/river/tests/core/test__class_api.py new file mode 100644 index 0000000..327235e --- /dev/null +++ b/river/tests/core/test__class_api.py @@ -0,0 +1,224 @@ +from datetime import datetime, timedelta +from unittest import skip + +from django.contrib.contenttypes.models import ContentType +from django.test import TestCase +from hamcrest import assert_that, equal_to, has_item, all_of, has_property, less_than, has_items, has_length + +from river.models import TransitionApproval +from river.models.factories import PermissionObjectFactory, UserObjectFactory, StateObjectFactory, TransitionApprovalMetaFactory, GroupObjectFactory, WorkflowFactory +from river.tests.models import BasicTestModel +from river.tests.models.factories import BasicTestModelObjectFactory + + +# noinspection PyMethodMayBeStatic,DuplicatedCode +class ClassApiTest(TestCase): + + def __init__(self, *args, **kwargs): + super(ClassApiTest, self).__init__(*args, **kwargs) + self.content_type = ContentType.objects.get_for_model(BasicTestModel) + + def test_shouldReturnNoApprovalWhenUserIsUnAuthorized(self): + unauthorized_user = UserObjectFactory() + + state1 = StateObjectFactory(label="state1") + state2 = StateObjectFactory(label="state2") + + workflow = WorkflowFactory(initial_state=state1, content_type=self.content_type, field_name="my_field") + authorized_permission = PermissionObjectFactory() + TransitionApprovalMetaFactory.create( + workflow=workflow, + source_state=state1, + destination_state=state2, + priority=0, + permissions=[authorized_permission] + ) + + BasicTestModelObjectFactory() + + available_approvals = BasicTestModel.river.my_field.get_available_approvals(as_user=unauthorized_user) + assert_that(available_approvals, has_length(0)) + + def test_shouldReturnAnApprovalWhenUserIsAuthorizedWithAPermission(self): + authorized_permission = PermissionObjectFactory() + authorized_user = UserObjectFactory(user_permissions=[authorized_permission]) + + state1 = StateObjectFactory(label="state1") + state2 = StateObjectFactory(label="state2") + + workflow = WorkflowFactory(initial_state=state1, content_type=self.content_type, field_name="my_field") + + TransitionApprovalMetaFactory.create( + workflow=workflow, + source_state=state1, + destination_state=state2, + priority=0, + permissions=[authorized_permission] + ) + + workflow_object = BasicTestModelObjectFactory() + + available_approvals = BasicTestModel.river.my_field.get_available_approvals(as_user=authorized_user) + assert_that(available_approvals, has_length(1)) + assert_that(list(available_approvals), has_item( + all_of( + has_property("workflow_object", workflow_object.model), + has_property("workflow", workflow), + has_property("source_state", state1), + has_property("destination_state", state2) + ) + )) + + def test_shouldReturnAnApprovalWhenUserIsAuthorizedWithAUserGroup(self): + authorized_user_group = GroupObjectFactory() + authorized_user = UserObjectFactory(groups=[authorized_user_group]) + + state1 = StateObjectFactory(label="state1") + state2 = StateObjectFactory(label="state2") + + workflow = WorkflowFactory(initial_state=state1, content_type=self.content_type, field_name="my_field") + + approval_meta = TransitionApprovalMetaFactory.create( + workflow=workflow, + source_state=state1, + destination_state=state2, + priority=0 + ) + approval_meta.groups.add(authorized_user_group) + + workflow_object = BasicTestModelObjectFactory() + + available_approvals = BasicTestModel.river.my_field.get_available_approvals(as_user=authorized_user) + assert_that(available_approvals, has_length(1)) + assert_that(list(available_approvals), has_item( + all_of( + has_property("workflow_object", workflow_object.model), + has_property("workflow", workflow), + has_property("source_state", state1), + has_property("destination_state", state2) + ) + )) + + def test_shouldReturnAnApprovalWhenUserIsAuthorizedAsTransactioner(self): + authorized_user = UserObjectFactory() + + state1 = StateObjectFactory(label="state1") + state2 = StateObjectFactory(label="state2") + + workflow = WorkflowFactory(initial_state=state1, content_type=self.content_type, field_name="my_field") + + TransitionApprovalMetaFactory.create( + workflow=workflow, + source_state=state1, + destination_state=state2, + priority=0 + ) + + workflow_object = BasicTestModelObjectFactory() + + TransitionApproval.objects.filter(workflow_object=workflow_object.model).update(transactioner=authorized_user) + + available_approvals = BasicTestModel.river.my_field.get_available_approvals(as_user=authorized_user) + assert_that(available_approvals, has_length(1)) + assert_that(list(available_approvals), has_item( + all_of( + has_property("workflow_object", workflow_object.model), + has_property("workflow", workflow), + has_property("destination_state", state2) + ) + )) + + @skip("This test proves the implementation is terrible at scale") + def test__shouldReturnApprovalsOnTimeWhenTooManyWorkflowObject(self): + authorized_permission = PermissionObjectFactory() + authorized_user = UserObjectFactory(user_permissions=[authorized_permission]) + + state1 = StateObjectFactory(label="state1") + state2 = StateObjectFactory(label="state2") + + workflow = WorkflowFactory(initial_state=state1, content_type=self.content_type, field_name="my_field") + + TransitionApprovalMetaFactory.create( + workflow=workflow, + source_state=state1, + destination_state=state2, + priority=0, + permissions=[authorized_permission] + ) + + self.objects = BasicTestModelObjectFactory.create_batch(250) + before = datetime.now() + BasicTestModel.river.my_field.get_on_approval_objects(as_user=authorized_user) + after = datetime.now() + assert_that(after - before, less_than(timedelta(milliseconds=100))) + print("Time taken %s" % str(after - before)) + + def test_shouldAssesInitialStateProperly(self): + state1 = StateObjectFactory(label="state1") + state2 = StateObjectFactory(label="state2") + + workflow = WorkflowFactory(initial_state=state1, content_type=self.content_type, field_name="my_field") + + TransitionApprovalMetaFactory.create( + workflow=workflow, + source_state=state1, + destination_state=state2, + priority=0 + ) + + assert_that(BasicTestModel.river.my_field.initial_state, equal_to(state1)) + + def test_shouldAssesFinalStateProperlyWhenThereIsOnlyOne(self): + state1 = StateObjectFactory(label="state1") + state2 = StateObjectFactory(label="state2") + + workflow = WorkflowFactory(initial_state=state1, content_type=self.content_type, field_name="my_field") + + TransitionApprovalMetaFactory.create( + workflow=workflow, + source_state=state1, + destination_state=state2, + priority=0 + ) + assert_that(BasicTestModel.river.my_field.final_states, has_length(1)) + assert_that(list(BasicTestModel.river.my_field.final_states), has_item(state2)) + + def test_shouldAssesFinalStateProperlyWhenThereAreMultiple(self): + state1 = StateObjectFactory(label="state1") + state21 = StateObjectFactory(label="state21") + state22 = StateObjectFactory(label="state22") + state31 = StateObjectFactory(label="state31") + state32 = StateObjectFactory(label="state32") + + workflow = WorkflowFactory(initial_state=state1, content_type=self.content_type, field_name="my_field") + + TransitionApprovalMetaFactory.create( + workflow=workflow, + source_state=state1, + destination_state=state21, + priority=0 + ) + + TransitionApprovalMetaFactory.create( + workflow=workflow, + source_state=state1, + destination_state=state22, + priority=0 + ) + + TransitionApprovalMetaFactory.create( + workflow=workflow, + source_state=state1, + destination_state=state31, + priority=0 + ) + + TransitionApprovalMetaFactory.create( + workflow=workflow, + source_state=state1, + destination_state=state32, + priority=0 + ) + + assert_that(BasicTestModel.river.my_field.final_states, has_length(4)) + assert_that(list(BasicTestModel.river.my_field.final_states), has_items(state21, state22, state31, state32)) diff --git a/river/tests/core/test__instance_api.py b/river/tests/core/test__instance_api.py new file mode 100644 index 0000000..fb020d5 --- /dev/null +++ b/river/tests/core/test__instance_api.py @@ -0,0 +1,359 @@ +from django.contrib.contenttypes.models import ContentType +from django.test import TestCase +from hamcrest import assert_that, equal_to, has_item, has_property, raises, calling, has_length, is_not + +from river.models import TransitionApproval +from river.models.factories import UserObjectFactory, StateObjectFactory, TransitionApprovalMetaFactory, PermissionObjectFactory, WorkflowFactory +from river.tests.models import BasicTestModel +from river.tests.models.factories import BasicTestModelObjectFactory +from river.utils.exceptions import RiverException + + +# noinspection PyMethodMayBeStatic,DuplicatedCode +class InstanceApiTest(TestCase): + + def __init__(self, *args, **kwargs): + super(InstanceApiTest, self).__init__(*args, **kwargs) + self.content_type = ContentType.objects.get_for_model(BasicTestModel) + + def test_shouldNotReturnOtherObjectsApprovalsForTheAuthorizedUser(self): + authorized_permission = PermissionObjectFactory() + authorized_user = UserObjectFactory(user_permissions=[authorized_permission]) + + state1 = StateObjectFactory(label="state1") + state2 = StateObjectFactory(label="state2") + workflow = WorkflowFactory(initial_state=state1, content_type=self.content_type, field_name="my_field") + TransitionApprovalMetaFactory.create( + workflow=workflow, + source_state=state1, + destination_state=state2, + priority=0, + permissions=[authorized_permission] + + ) + + workflow_object1 = BasicTestModelObjectFactory() + workflow_object2 = BasicTestModelObjectFactory() + + available_approvals = workflow_object1.model.river.my_field.get_available_approvals(as_user=authorized_user) + assert_that(available_approvals, has_length(1)) + assert_that(list(available_approvals), has_item( + has_property("workflow_object", workflow_object1.model) + )) + assert_that(list(available_approvals), has_item( + is_not(has_property("workflow_object", workflow_object2.model)) + )) + + def test_shouldNotAllowUnauthorizedUserToProceedToNextState(self): + unauthorized_user = UserObjectFactory() + + state1 = StateObjectFactory(label="state1") + state2 = StateObjectFactory(label="state2") + authorized_permission = PermissionObjectFactory() + + workflow = WorkflowFactory(initial_state=state1, content_type=self.content_type, field_name="my_field") + TransitionApprovalMetaFactory.create( + workflow=workflow, + source_state=state1, + destination_state=state2, + priority=0, + permissions=[authorized_permission] + ) + + workflow_object = BasicTestModelObjectFactory() + + assert_that( + calling(workflow_object.model.river.my_field.approve).with_args(as_user=unauthorized_user), + raises(RiverException, "There is no available approval for the user") + ) + + def test_shouldAllowAuthorizedUserToProceedToNextState(self): + authorized_permission = PermissionObjectFactory() + + authorized_user = UserObjectFactory(user_permissions=[authorized_permission]) + + state1 = StateObjectFactory(label="state1") + state2 = StateObjectFactory(label="state2") + + workflow = WorkflowFactory(initial_state=state1, content_type=self.content_type, field_name="my_field") + TransitionApprovalMetaFactory.create( + workflow=workflow, + source_state=state1, + destination_state=state2, + priority=0, + permissions=[authorized_permission] + ) + + workflow_object = BasicTestModelObjectFactory() + + assert_that(workflow_object.model.my_field, equal_to(state1)) + workflow_object.model.river.my_field.approve(as_user=authorized_user) + assert_that(workflow_object.model.my_field, equal_to(state2)) + + def test_shouldNotLetUserWhosePriorityComesLaterApproveProceed(self): + manager_permission = PermissionObjectFactory() + team_leader_permission = PermissionObjectFactory() + + manager = UserObjectFactory(user_permissions=[manager_permission]) + + state1 = StateObjectFactory(label="state1") + state2 = StateObjectFactory(label="state2") + + workflow = WorkflowFactory(initial_state=state1, content_type=self.content_type, field_name="my_field") + TransitionApprovalMetaFactory.create( + workflow=workflow, + source_state=state1, + destination_state=state2, + priority=1, + permissions=[manager_permission] + + ) + + TransitionApprovalMetaFactory.create( + workflow=workflow, + source_state=state1, + destination_state=state2, + priority=0, + permissions=[team_leader_permission] + ) + + workflow_object = BasicTestModelObjectFactory() + + assert_that( + calling(workflow_object.model.river.my_field.approve).with_args(as_user=manager), + raises(RiverException, "There is no available approval for the user") + ) + + def test_shouldNotTransitToNextStateWhenThereAreMultipleApprovalsToBeApproved(self): + manager_permission = PermissionObjectFactory() + team_leader_permission = PermissionObjectFactory() + + team_leader = UserObjectFactory(user_permissions=[team_leader_permission]) + + state1 = StateObjectFactory(label="state1") + state2 = StateObjectFactory(label="state2") + + workflow = WorkflowFactory(initial_state=state1, content_type=self.content_type, field_name="my_field") + TransitionApprovalMetaFactory.create( + workflow=workflow, + source_state=state1, + destination_state=state2, + priority=1, + permissions=[manager_permission] + ) + + TransitionApprovalMetaFactory.create( + workflow=workflow, + source_state=state1, + destination_state=state2, + priority=0, + permissions=[team_leader_permission] + ) + + workflow_object = BasicTestModelObjectFactory() + + assert_that(workflow_object.model.my_field, equal_to(state1)) + workflow_object.model.river.my_field.approve(team_leader) + assert_that(workflow_object.model.my_field, equal_to(state1)) + + def test_shouldTransitToNextStateWhenAppTheApprovalsAreApprovedBeApproved(self): + manager_permission = PermissionObjectFactory() + team_leader_permission = PermissionObjectFactory() + + manager = UserObjectFactory(user_permissions=[manager_permission]) + team_leader = UserObjectFactory(user_permissions=[team_leader_permission]) + + state1 = StateObjectFactory(label="state1") + state2 = StateObjectFactory(label="state2") + + workflow = WorkflowFactory(initial_state=state1, content_type=self.content_type, field_name="my_field") + TransitionApprovalMetaFactory.create( + workflow=workflow, + source_state=state1, + destination_state=state2, + priority=1, + permissions=[manager_permission] + + ) + + TransitionApprovalMetaFactory.create( + workflow=workflow, + source_state=state1, + destination_state=state2, + priority=0, + permissions=[team_leader_permission] + ) + + workflow_object = BasicTestModelObjectFactory() + + assert_that(workflow_object.model.my_field, equal_to(state1)) + workflow_object.model.river.my_field.approve(team_leader) + assert_that(workflow_object.model.my_field, equal_to(state1)) + + assert_that(workflow_object.model.my_field, equal_to(state1)) + workflow_object.model.river.my_field.approve(manager) + assert_that(workflow_object.model.my_field, equal_to(state2)) + + def test_shouldDictatePassingNextStateWhenThereAreMultiple(self): + authorized_permission = PermissionObjectFactory() + + authorized_user = UserObjectFactory(user_permissions=[authorized_permission]) + + state1 = StateObjectFactory(label="state1") + state2 = StateObjectFactory(label="state2") + state3 = StateObjectFactory(label="state3") + + workflow = WorkflowFactory(initial_state=state1, content_type=self.content_type, field_name="my_field") + TransitionApprovalMetaFactory.create( + workflow=workflow, + source_state=state1, + destination_state=state2, + priority=0, + permissions=[authorized_permission] + ) + + TransitionApprovalMetaFactory.create( + workflow=workflow, + source_state=state1, + destination_state=state3, + priority=0, + permissions=[authorized_permission] + ) + + workflow_object = BasicTestModelObjectFactory() + + assert_that( + calling(workflow_object.model.river.my_field.approve).with_args(as_user=authorized_user), + raises(RiverException, "State must be given when there are multiple states for destination") + ) + + def test_shouldTransitToTheGivenNextStateWhenThereAreMultipleNextStates(self): + authorized_permission = PermissionObjectFactory() + + authorized_user = UserObjectFactory(user_permissions=[authorized_permission]) + + state1 = StateObjectFactory(label="state1") + state2 = StateObjectFactory(label="state2") + state3 = StateObjectFactory(label="state3") + + workflow = WorkflowFactory(initial_state=state1, content_type=self.content_type, field_name="my_field") + TransitionApprovalMetaFactory.create( + workflow=workflow, + source_state=state1, + destination_state=state2, + priority=0, + permissions=[authorized_permission] + ) + + TransitionApprovalMetaFactory.create( + workflow=workflow, + source_state=state1, + destination_state=state3, + priority=0, + permissions=[authorized_permission] + ) + + workflow_object = BasicTestModelObjectFactory() + + assert_that(workflow_object.model.my_field, equal_to(state1)) + workflow_object.model.river.my_field.approve(as_user=authorized_user, next_state=state3) + assert_that(workflow_object.model.my_field, equal_to(state3)) + + def test_shouldNotAcceptANextStateWhichIsNotAmongPossibleNextStates(self): + authorized_permission = PermissionObjectFactory() + + authorized_user = UserObjectFactory(user_permissions=[authorized_permission]) + + state1 = StateObjectFactory(label="state1") + state2 = StateObjectFactory(label="state2") + state3 = StateObjectFactory(label="state3") + invalid_state = StateObjectFactory(label="state4") + + workflow = WorkflowFactory(initial_state=state1, content_type=self.content_type, field_name="my_field") + TransitionApprovalMetaFactory.create( + workflow=workflow, + source_state=state1, + destination_state=state2, + priority=0, + permissions=[authorized_permission] + ) + + TransitionApprovalMetaFactory.create( + workflow=workflow, + source_state=state1, + destination_state=state3, + priority=0, + permissions=[authorized_permission] + ) + + workflow_object = BasicTestModelObjectFactory() + + assert_that( + calling(workflow_object.model.river.my_field.approve).with_args(as_user=authorized_user, next_state=invalid_state), + raises(RiverException, + "Invalid state is given\(%s\). Valid states is\(are\) (%s|%s)" % ( + invalid_state.label, + ",".join([state2.label, state3.label]), + ",".join([state3.label, state2.label])) + ) + ) + + def test_shouldAllowCyclicTransitions(self): + authorized_permission = PermissionObjectFactory() + + authorized_user = UserObjectFactory(user_permissions=[authorized_permission]) + + cycle_state_1 = StateObjectFactory(label="cycle_state_1") + cycle_state_2 = StateObjectFactory(label="cycle_state_2") + cycle_state_3 = StateObjectFactory(label="cycle_state_3") + off_the_cycle_state = StateObjectFactory(label="off_the_cycle_state") + + workflow = WorkflowFactory(initial_state=cycle_state_1, content_type=self.content_type, field_name="my_field") + TransitionApprovalMetaFactory.create( + workflow=workflow, + source_state=cycle_state_1, + destination_state=cycle_state_2, + priority=0, + permissions=[authorized_permission] + ) + + TransitionApprovalMetaFactory.create( + workflow=workflow, + source_state=cycle_state_2, + destination_state=cycle_state_3, + priority=0, + permissions=[authorized_permission] + ) + + TransitionApprovalMetaFactory.create( + workflow=workflow, + source_state=cycle_state_3, + destination_state=cycle_state_1, + priority=0, + permissions=[authorized_permission] + ) + + TransitionApprovalMetaFactory.create( + workflow=workflow, + source_state=cycle_state_3, + destination_state=off_the_cycle_state, + priority=0, + permissions=[authorized_permission] + ) + + workflow_object = BasicTestModelObjectFactory() + + assert_that(workflow_object.model.my_field, equal_to(cycle_state_1)) + workflow_object.model.river.my_field.approve(as_user=authorized_user) + assert_that(workflow_object.model.my_field, equal_to(cycle_state_2)) + workflow_object.model.river.my_field.approve(as_user=authorized_user) + assert_that(workflow_object.model.my_field, equal_to(cycle_state_3)) + + approvals = TransitionApproval.objects.filter(workflow=workflow, workflow_object=workflow_object.model) + assert_that(approvals, has_length(4)) + + workflow_object.model.river.my_field.approve(as_user=authorized_user, next_state=cycle_state_1) + assert_that(workflow_object.model.my_field, equal_to(cycle_state_1)) + + approvals = TransitionApproval.objects.filter(workflow=workflow, workflow_object=workflow_object.model) + assert_that(approvals, has_length(7)) diff --git a/river/tests/hooking/backends/test__database_hooking_backend.py b/river/tests/hooking/backends/test__database_hooking_backend.py index 7ba9b48..a24c8c9 100644 --- a/river/tests/hooking/backends/test__database_hooking_backend.py +++ b/river/tests/hooking/backends/test__database_hooking_backend.py @@ -1,8 +1,13 @@ +from django.contrib.contenttypes.models import ContentType +from django.test import TestCase +from hamcrest import is_not, assert_that, has_key, has_property, has_value, has_length, has_item + from river.config import app_config from river.hooking.backends.loader import load_callback_backend from river.hooking.transition import PostTransitionHooking from river.models.callback import Callback -from river.tests.base_test import BaseTestCase +from river.models.factories import WorkflowFactory, TransitionApprovalMetaFactory, StateObjectFactory, PermissionObjectFactory +from river.tests.models import BasicTestModel from river.tests.models.factories import BasicTestModelObjectFactory __author__ = 'ahmetdal' @@ -12,42 +17,57 @@ def test_callback(*args, **kwargs): pass -class DatabaseHookingBackendTest(BaseTestCase): +# noinspection DuplicatedCode +class DatabaseHookingBackendTest(TestCase): def setUp(self): - super(DatabaseHookingBackendTest, self).setUp() self.field_name = "my_field" - self.initialize_standard_scenario() + authorized_permission = PermissionObjectFactory() + + state1 = StateObjectFactory(label="state1") + state2 = StateObjectFactory(label="state2") + + content_type = ContentType.objects.get_for_model(BasicTestModel) + workflow = WorkflowFactory(initial_state=state1, content_type=content_type, field_name="my_field") + TransitionApprovalMetaFactory.create( + workflow=workflow, + source_state=state1, + destination_state=state2, + priority=0, + permissions=[authorized_permission] + ) + app_config.HOOKING_BACKEND_CLASS = 'river.hooking.backends.database.DatabaseHookingBackend' self.handler_backend = load_callback_backend() self.handler_backend.callbacks = {} - def test_register(self): - objects = BasicTestModelObjectFactory.create_batch(2) + def test_shouldRegisterAHooking(self): + workflow_objects = BasicTestModelObjectFactory.create_batch(2) + + hooking_hash = '%s.%s_object%s_field_name%s' % (PostTransitionHooking.__module__, PostTransitionHooking.__name__, workflow_objects[1].pk, self.field_name) - self.assertEqual(0, Callback.objects.count()) - self.assertFalse('%s.%s_object%sfield%s' % ( - PostTransitionHooking.__module__, PostTransitionHooking.__name__, objects[1].pk, - 'my_field') in self.handler_backend.callbacks) + assert_that(self.handler_backend.callbacks, is_not(has_key(hooking_hash))) - self.handler_backend.register(PostTransitionHooking, test_callback, objects[1], self.field_name) + self.handler_backend.register(PostTransitionHooking, test_callback, workflow_objects[1], self.field_name) - self.assertEqual(1, Callback.objects.filter(hash='%s.%s_object%s_field_name%s' % ( - PostTransitionHooking.__module__, PostTransitionHooking.__name__, objects[1].pk, self.field_name)).count()) - self.assertTrue('%s.%s_object%s_field_name%s' % ( - PostTransitionHooking.__module__, PostTransitionHooking.__name__, objects[1].pk, self.field_name) in self.handler_backend.callbacks) - self.assertEqual(test_callback.__name__, list(self.handler_backend.callbacks.values())[0].__name__) + assert_that(self.handler_backend.callbacks, has_key(hooking_hash)) + assert_that(self.handler_backend.callbacks, has_value(has_property("__name__", test_callback.__name__))) - def test_register_in_multiprocessing(self): - objects = BasicTestModelObjectFactory.create_batch(2) + self.handler_backend.register(PostTransitionHooking, test_callback, workflow_objects[1], self.field_name) + + def test_shouldRegisterAHookingResilientlyToMultiProcessing(self): + workflow_objects = BasicTestModelObjectFactory.create_batch(2) from multiprocessing import Process, Queue - self.handler_backend.register(PostTransitionHooking, test_callback, objects[1], self.field_name) - self.assertEqual(1, Callback.objects.count()) + assert_that(Callback.objects.all(), has_length(0)) + + self.handler_backend.register(PostTransitionHooking, test_callback, workflow_objects[1], self.field_name) + + assert_that(Callback.objects.all(), has_length(1)) def worker2(q): second_handler_backend = load_callback_backend() - handlers = second_handler_backend.get_callbacks(PostTransitionHooking, objects[1], self.field_name) + handlers = second_handler_backend.get_callbacks(PostTransitionHooking, workflow_objects[1], self.field_name) q.put([f.__name__ for f in handlers]) q = Queue() @@ -56,25 +76,25 @@ def worker2(q): p2.start() handlers = q.get(timeout=1) - self.assertEqual(1, len(handlers)) - self.assertEqual(test_callback.__name__, handlers[0]) - def test_get_callbacks(self): - objects = BasicTestModelObjectFactory.create_batch(2) + assert_that(handlers, has_length(1)) + assert_that(handlers, has_item(test_callback.__name__)) - self.handler_backend.register(PostTransitionHooking, test_callback, objects[1], self.field_name) - handlers = self.handler_backend.get_callbacks(PostTransitionHooking, objects[1], self.field_name) - self.assertEqual(1, len(handlers)) + def test_shouldReturnTheRegisteredHooking(self): + workflow_objects = BasicTestModelObjectFactory.create_batch(2) - self.assertEqual(test_callback.__name__, handlers[0].__name__) + self.handler_backend.register(PostTransitionHooking, test_callback, workflow_objects[1], self.field_name) + handlers = self.handler_backend.get_callbacks(PostTransitionHooking, workflow_objects[1], self.field_name) + assert_that(handlers, has_length(1)) + assert_that(handlers, has_item(has_property("__name__", test_callback.__name__))) def test_get_handlers_in_multiprocessing(self): - objects = BasicTestModelObjectFactory.create_batch(2) + workflow_objects = BasicTestModelObjectFactory.create_batch(2) from multiprocessing import Process, Queue Callback.objects.update_or_create( - hash='%s.%s_object%s_field_name%s' % (PostTransitionHooking.__module__, PostTransitionHooking.__name__, objects[1].pk, self.field_name), + hash='%s.%s_object%s_field_name%s' % (PostTransitionHooking.__module__, PostTransitionHooking.__name__, workflow_objects[1].pk, self.field_name), defaults={ 'method': '%s.%s' % (test_callback.__module__, test_callback.__name__), 'hooking_cls': '%s.%s' % (PostTransitionHooking.__module__, PostTransitionHooking.__name__), @@ -82,7 +102,7 @@ def test_get_handlers_in_multiprocessing(self): ) def worker2(q): - handlers = self.handler_backend.get_callbacks(PostTransitionHooking, objects[1], self.field_name) + handlers = self.handler_backend.get_callbacks(PostTransitionHooking, workflow_objects[1], self.field_name) q.put([f.__name__ for f in handlers]) q = Queue() @@ -91,5 +111,5 @@ def worker2(q): p2.start() handlers = q.get(timeout=1) - self.assertEqual(1, len(handlers)) - self.assertEqual(test_callback.__name__, handlers[0]) + assert_that(handlers, has_length(1)) + assert_that(handlers, has_item(test_callback.__name__)) diff --git a/river/tests/hooking/backends/test__memory_handler_backend.py b/river/tests/hooking/backends/test__memory_handler_backend.py index 5ea2623..32e7ce6 100644 --- a/river/tests/hooking/backends/test__memory_handler_backend.py +++ b/river/tests/hooking/backends/test__memory_handler_backend.py @@ -1,7 +1,12 @@ +from django.contrib.contenttypes.models import ContentType +from django.test import TestCase +from hamcrest import assert_that, has_value, is_not, has_key, has_property, has_length, has_item + from river.config import app_config from river.hooking.backends.loader import load_callback_backend from river.hooking.transition import PostTransitionHooking -from river.tests.base_test import BaseTestCase +from river.models.factories import PermissionObjectFactory, StateObjectFactory, WorkflowFactory, TransitionApprovalMetaFactory +from river.tests.models import BasicTestModel from river.tests.models.factories import BasicTestModelObjectFactory __author__ = 'ahmetdal' @@ -11,30 +16,46 @@ def test_handler(*args, **kwargs): pass -class MemoryHookingBackendTest(BaseTestCase): +# noinspection DuplicatedCode +class MemoryHookingBackendTest(TestCase): def setUp(self): - super(MemoryHookingBackendTest, self).setUp() self.field_name = "my_field" - self.initialize_standard_scenario() + authorized_permission = PermissionObjectFactory() + + state1 = StateObjectFactory(label="state1") + state2 = StateObjectFactory(label="state2") + + content_type = ContentType.objects.get_for_model(BasicTestModel) + workflow = WorkflowFactory(initial_state=state1, content_type=content_type, field_name="my_field") + TransitionApprovalMetaFactory.create( + workflow=workflow, + source_state=state1, + destination_state=state2, + priority=0, + permissions=[authorized_permission] + ) + app_config.HOOKING_BACKEND_CLASS = 'river.hooking.backends.memory.MemoryHookingBackend' self.handler_backend = load_callback_backend() self.handler_backend.callbacks = {} - def test_register(self): - objects = BasicTestModelObjectFactory.create_batch(2) + def test_shouldRegisterAHooking(self): + workflow_object = BasicTestModelObjectFactory.create_batch(2) + + hooking_hash = '%s.%s_object%s_field_name%s' % (PostTransitionHooking.__module__, PostTransitionHooking.__name__, workflow_object[1].pk, self.field_name) + + assert_that(self.handler_backend.callbacks, is_not(has_key(hooking_hash))) + + self.handler_backend.register(PostTransitionHooking, test_handler, workflow_object[1], self.field_name) - self.assertFalse('%s.%s_object%s_field_name%s' % ( - PostTransitionHooking.__module__, PostTransitionHooking.__name__, objects[1].pk, self.field_name) in self.handler_backend.callbacks) - self.handler_backend.register(PostTransitionHooking, test_handler, objects[1], self.field_name) - self.assertTrue( - '%s.%s_object%s_field_name%s' % (PostTransitionHooking.__module__, PostTransitionHooking.__name__, objects[1].pk, self.field_name) in self.handler_backend.callbacks) - self.assertEqual(test_handler.__name__, list(self.handler_backend.callbacks.values())[0].__name__) + assert_that(self.handler_backend.callbacks, has_key(hooking_hash)) + assert_that(self.handler_backend.callbacks, has_value(has_property("__name__", test_handler.__name__))) - def test_get_handlers(self): - object = BasicTestModelObjectFactory.create_batch(1)[0] + def test_shouldReturnTheRegisteredHooking(self): + workflow_object = BasicTestModelObjectFactory.create_batch(1)[0] - self.handler_backend.register(PostTransitionHooking, test_handler, object, self.field_name) - callbacks = self.handler_backend.get_callbacks(PostTransitionHooking, object, self.field_name) - self.assertEqual(1, len(callbacks)) + self.handler_backend.register(PostTransitionHooking, test_handler, workflow_object, self.field_name) + callbacks = self.handler_backend.get_callbacks(PostTransitionHooking, workflow_object, self.field_name) - self.assertEqual(test_handler.__name__, callbacks[0].__name__) + assert_that(callbacks, has_length(1)) + assert_that(callbacks, has_item(has_property("__name__", test_handler.__name__))) diff --git a/river/tests/hooking/test__completed_hooking.py b/river/tests/hooking/test__completed_hooking.py index 420e73f..377bc32 100644 --- a/river/tests/hooking/test__completed_hooking.py +++ b/river/tests/hooking/test__completed_hooking.py @@ -1,17 +1,46 @@ -from river.hooking.completed import PostCompletedHooking -from river.tests.base_test import BaseTestCase +from django.contrib.contenttypes.models import ContentType +from django.test import TestCase +from hamcrest import assert_that, equal_to, none + +from river.models.factories import PermissionObjectFactory, StateObjectFactory, WorkflowFactory, TransitionApprovalMetaFactory, UserObjectFactory +from river.tests.models import BasicTestModel from river.tests.models.factories import BasicTestModelObjectFactory __author__ = 'ahmetdal' -class CompletedHookingTest(BaseTestCase): +# noinspection DuplicatedCode +class CompletedHookingTest(TestCase): def setUp(self): super(CompletedHookingTest, self).setUp() - self.initialize_standard_scenario() - def test_register_for_an_object(self): - objects = BasicTestModelObjectFactory.create_batch(2) + def test_shouldInvokeTheRegisteredCallBackWhenFlowIsCompleteForTheObject(self): + authorized_permission = PermissionObjectFactory() + authorized_user = UserObjectFactory(user_permissions=[authorized_permission]) + + state1 = StateObjectFactory(label="state1") + state2 = StateObjectFactory(label="state2") + state3 = StateObjectFactory(label="state3") + + content_type = ContentType.objects.get_for_model(BasicTestModel) + workflow = WorkflowFactory(initial_state=state1, content_type=content_type, field_name="my_field") + TransitionApprovalMetaFactory.create( + workflow=workflow, + source_state=state1, + destination_state=state2, + priority=0, + permissions=[authorized_permission] + ) + + TransitionApprovalMetaFactory.create( + workflow=workflow, + source_state=state2, + destination_state=state3, + priority=0, + permissions=[authorized_permission] + ) + + workflow_object = BasicTestModelObjectFactory() self.test_args = None self.test_kwargs = None @@ -20,32 +49,17 @@ def test_callback(*args, **kwargs): self.test_args = args self.test_kwargs = kwargs - objects[0].river.my_field.hook_post_complete(test_callback) - - self.assertIsNone(self.test_args) - self.assertIsNone(self.test_kwargs) - - objects[0].river.my_field.approve(as_user=self.user1) - - self.assertIsNone(self.test_args) - self.assertIsNone(self.test_kwargs) - - # Proceeded but no transition - objects[0].river.my_field.approve(as_user=self.user2) - - self.assertIsNone(self.test_args) - self.assertIsNone(self.test_kwargs) - - objects[0].river.my_field.approve(as_user=self.user3) + workflow_object.model.river.my_field.hook_post_complete(test_callback) - self.assertIsNone(self.test_args) - self.assertIsNone(self.test_kwargs) + assert_that(self.test_args, none()) - objects[0].river.my_field.approve(as_user=self.user4, next_state=self.state4) + assert_that(workflow_object.model.my_field, equal_to(state1)) + workflow_object.model.river.my_field.approve(as_user=authorized_user) + assert_that(workflow_object.model.my_field, equal_to(state2)) - self.assertIsNone(self.test_args) - self.assertIsNone(self.test_kwargs) + assert_that(self.test_args, none()) - objects[0].river.my_field.approve(as_user=self.user4, next_state=self.state41) + workflow_object.model.river.my_field.approve(as_user=authorized_user) + assert_that(workflow_object.model.my_field, equal_to(state3)) - self.assertEqual((objects[0], "my_field"), self.test_args) + assert_that(self.test_args, equal_to((workflow_object.model, "my_field"))) diff --git a/river/tests/hooking/test__transition_hooking.py b/river/tests/hooking/test__transition_hooking.py index 63b2c3b..8aa77a6 100644 --- a/river/tests/hooking/test__transition_hooking.py +++ b/river/tests/hooking/test__transition_hooking.py @@ -1,16 +1,19 @@ -from django.contrib.auth.models import Permission +from django.contrib.contenttypes.models import ContentType +from django.test import TestCase +from hamcrest import equal_to, assert_that, none, has_entry from river.models import TransitionApproval -from river.tests.base_test import BaseTestCase +from river.models.factories import PermissionObjectFactory, UserObjectFactory, StateObjectFactory, WorkflowFactory, TransitionApprovalMetaFactory +from river.tests.models import BasicTestModel from river.tests.models.factories import BasicTestModelObjectFactory __author__ = 'ahmetdal' -class TransitionHooking(BaseTestCase): +# noinspection DuplicatedCode +class TransitionHooking(TestCase): - def test_register_for_an_object(self): - self.initialize_standard_scenario() + def test_shouldInvokeTheRegisteredCallBackWhenATransitionHappens(self): self.test_args = None self.test_kwargs = None @@ -18,35 +21,50 @@ def test_callback(*args, **kwargs): self.test_args = args self.test_kwargs = kwargs - objects = BasicTestModelObjectFactory.create_batch(2) + authorized_permission = PermissionObjectFactory() + authorized_user = UserObjectFactory(user_permissions=[authorized_permission]) - objects[1].river.my_field.hook_post_transition(test_callback) + state1 = StateObjectFactory(label="state1") + state2 = StateObjectFactory(label="state2") - objects[0].river.my_field.approve(as_user=self.user1) + content_type = ContentType.objects.get_for_model(BasicTestModel) + workflow = WorkflowFactory(initial_state=state1, content_type=content_type, field_name="my_field") + TransitionApprovalMetaFactory.create( + workflow=workflow, + source_state=state1, + destination_state=state2, + priority=0, + permissions=[authorized_permission] + ) - self.assertIsNone(self.test_args) - self.assertIsNone(self.test_kwargs) + TransitionApprovalMetaFactory.create( + workflow=workflow, + source_state=state1, + destination_state=state2, + priority=1, + permissions=[authorized_permission] + ) - objects[0].river.my_field.approve(as_user=self.user2) + workflow_object = BasicTestModelObjectFactory() - self.assertIsNone(self.test_args) - self.assertIsNone(self.test_kwargs) + workflow_object.model.river.my_field.hook_post_transition(test_callback) - objects[0].river.my_field.hook_post_transition(test_callback) + assert_that(self.test_args, none()) - objects[0].river.my_field.approve(as_user=self.user3) + assert_that(workflow_object.model.my_field, equal_to(state1)) + workflow_object.model.river.my_field.approve(as_user=authorized_user) + assert_that(workflow_object.model.my_field, equal_to(state1)) - self.assertEqual((objects[0], "my_field"), self.test_args) + assert_that(self.test_args, none()) - self.assertDictEqual( - { - 'transition_approval': TransitionApproval.objects.get(object_id=objects[0].pk, source_state=self.state2, destination_state=self.state3, - permissions__in=Permission.objects.filter(user=self.user3)) - }, self.test_kwargs) + workflow_object.model.river.my_field.approve(as_user=authorized_user) + assert_that(workflow_object.model.my_field, equal_to(state2)) + assert_that(self.test_args, equal_to((workflow_object.model, "my_field"))) - def test_register_for_a_transition(self): - self.initialize_standard_scenario() + last_approval = TransitionApproval.objects.get(object_id=workflow_object.model.pk, source_state=state1, destination_state=state2, priority=1) + assert_that(self.test_kwargs, has_entry(equal_to("transition_approval"), equal_to(last_approval))) + def test_shouldInvokeTheRegisteredCallBackWhenASpecificTransitionHappens(self): self.test_args = None self.test_kwargs = None @@ -54,27 +72,46 @@ def test_callback(*args, **kwargs): self.test_args = args self.test_kwargs = kwargs - objects = BasicTestModelObjectFactory.create_batch(2) - objects[0].river.my_field.hook_post_transition(test_callback, source_state=self.state2, destination_state=self.state3) + authorized_permission = PermissionObjectFactory() + authorized_user = UserObjectFactory(user_permissions=[authorized_permission]) - self.assertIsNone(self.test_args) - self.assertIsNone(self.test_kwargs) + state1 = StateObjectFactory(label="state1") + state2 = StateObjectFactory(label="state2") + state3 = StateObjectFactory(label="state3") - objects[0].river.my_field.approve(as_user=self.user1) + content_type = ContentType.objects.get_for_model(BasicTestModel) + workflow = WorkflowFactory(initial_state=state1, content_type=content_type, field_name="my_field") + TransitionApprovalMetaFactory.create( + workflow=workflow, + source_state=state1, + destination_state=state2, + priority=0, + permissions=[authorized_permission] + ) - self.assertIsNone(self.test_args) - self.assertIsNone(self.test_kwargs) + TransitionApprovalMetaFactory.create( + workflow=workflow, + source_state=state2, + destination_state=state3, + priority=0, + permissions=[authorized_permission] + ) - objects[0].river.my_field.approve(as_user=self.user2) + workflow_object = BasicTestModelObjectFactory() - self.assertIsNone(self.test_args) - self.assertIsNone(self.test_kwargs) + workflow_object.model.river.my_field.hook_post_transition(test_callback, source_state=state2, destination_state=state3) - objects[0].river.my_field.approve(as_user=self.user3) + assert_that(self.test_args, none()) - self.assertEqual((objects[0], "my_field"), self.test_args) - self.assertDictEqual( - { - 'transition_approval': TransitionApproval.objects.get(object_id=objects[0].pk, source_state=self.state2, destination_state=self.state3, - permissions__in=Permission.objects.filter(user=self.user3)) - }, self.test_kwargs) + assert_that(workflow_object.model.my_field, equal_to(state1)) + workflow_object.model.river.my_field.approve(as_user=authorized_user) + assert_that(workflow_object.model.my_field, equal_to(state2)) + + assert_that(self.test_args, none()) + + workflow_object.model.river.my_field.approve(as_user=authorized_user) + assert_that(workflow_object.model.my_field, equal_to(state3)) + assert_that(self.test_args, equal_to((workflow_object.model, "my_field"))) + + last_approval = TransitionApproval.objects.get(object_id=workflow_object.model.pk, source_state=state2, destination_state=state3) + assert_that(self.test_kwargs, has_entry(equal_to("transition_approval"), equal_to(last_approval))) diff --git a/river/tests/test__river.py b/river/tests/test__river.py deleted file mode 100644 index af06ef9..0000000 --- a/river/tests/test__river.py +++ /dev/null @@ -1,405 +0,0 @@ -from datetime import datetime, timedelta - -from river.models import TransitionApproval, APPROVED, TransitionApprovalMeta -from river.tests.base_test import BaseTestCase -from river.tests.models.factories import BasicTestModelObjectFactory -from river.tests.models import BasicTestModel -from river.utils.error_code import ErrorCode -from river.utils.exceptions import RiverException - -__author__ = 'ahmetdal' - - -class RiverTest(BaseTestCase): - - def test_get_on_approval_objects(self): - self.initialize_standard_scenario() - objects = BasicTestModelObjectFactory.create_batch(2) - - on_approval_objects = BasicTestModel.river.my_field.get_on_approval_objects(as_user=self.user1) - self.assertEqual(2, on_approval_objects.count()) - self.assertEqual(objects[0], on_approval_objects[0]) - - on_approval_objects = BasicTestModel.river.my_field.get_on_approval_objects(as_user=self.user2) - self.assertEqual(0, on_approval_objects.count()) - - on_approval_objects = BasicTestModel.river.my_field.get_on_approval_objects(as_user=self.user3) - self.assertEqual(0, on_approval_objects.count()) - - on_approval_objects = BasicTestModel.river.my_field.get_on_approval_objects(as_user=self.user4) - self.assertEqual(0, on_approval_objects.count()) - - def test_get_available_states(self): - self.initialize_standard_scenario() - object = BasicTestModelObjectFactory.create_batch(1)[0] - available_states = object.river.my_field.get_available_states() - self.assertEqual(1, available_states.count()) - self.assertEqual(self.state2, available_states[0]) - - available_states = object.river.my_field.get_available_states(as_user=self.user1) - self.assertEqual(1, available_states.count()) - self.assertEqual(self.state2, available_states[0]) - - available_states = object.river.my_field.get_available_states(as_user=self.user2) - self.assertEqual(0, available_states.count()) - - available_states = object.river.my_field.get_available_states(as_user=self.user3) - self.assertEqual(0, available_states.count()) - - available_states = object.river.my_field.get_available_states(as_user=self.user4) - self.assertEqual(0, available_states.count()) - - def test_get_initial_state(self): - self.initialize_standard_scenario() - self.assertEqual(self.state1, BasicTestModel.river.my_field.initial_state) - - def test_get_final_states(self): - self.initialize_standard_scenario() - self.assertListEqual([self.state41, self.state42, self.state51, self.state52], list(BasicTestModel.river.my_field.final_states)) - - def test_get_waiting_transition_approvals_without_skip(self): - self.initialize_standard_scenario() - object = BasicTestModelObjectFactory.create_batch(1)[0] - - transition_approvals = object.river.my_field.get_available_approvals(as_user=self.user1) - self.assertEqual(1, transition_approvals.count()) - - transition_approvals = object.river.my_field.get_available_approvals(as_user=self.user2) - self.assertEqual(0, transition_approvals.count()) - - transition_approvals = object.river.my_field.get_available_approvals(as_user=self.user3) - self.assertEqual(0, transition_approvals.count()) - - transition_approvals = object.river.my_field.get_available_approvals(as_user=self.user4) - self.assertEqual(0, transition_approvals.count()) - - def test_get_waiting_transition_approvals_with_skip(self): - self.initialize_standard_scenario() - object = BasicTestModelObjectFactory.create_batch(1)[0] - - transition_approvals = object.river.my_field.get_available_approvals(as_user=self.user1) - self.assertEqual(1, transition_approvals.count()) - self.assertEqual(self.state2, transition_approvals[0].destination_state) - - TransitionApproval.objects.filter( - workflow_object=object, - workflow=self.workflow, - destination_state=self.state2 - ).update(skip=True) - - transition_approvals = object.river.my_field.get_available_approvals(as_user=self.user2) - self.assertEqual(1, transition_approvals.count()) - self.assertEqual(self.state3, transition_approvals[0].destination_state) - - TransitionApproval.objects.filter( - workflow_object=object, - workflow=self.workflow, - destination_state=self.state3 - ).update(skip=True) - - transition_approvals = object.river.my_field.get_available_approvals(as_user=self.user4) - self.assertEqual(2, transition_approvals.count()) - self.assertEqual(self.state4, transition_approvals[0].destination_state) - self.assertEqual(self.state5, transition_approvals[1].destination_state) - - TransitionApproval.objects.filter( - workflow_object=object, - workflow=self.workflow, - destination_state=self.state4 - ).update(skip=True) - - transition_approvals = object.river.my_field.get_available_approvals(as_user=self.user4) - self.assertEqual(3, transition_approvals.count()) - self.assertEqual(self.state5, transition_approvals[0].destination_state) - self.assertEqual(self.state41, transition_approvals[1].destination_state) - self.assertEqual(self.state42, transition_approvals[2].destination_state) - - TransitionApproval.objects.filter( - workflow_object=object, - workflow=self.workflow, - destination_state=self.state4 - ).update(skip=False) - - TransitionApproval.objects.filter( - workflow_object=object, - workflow=self.workflow, - destination_state=self.state5 - ).update(skip=True) - - transition_approvals = object.river.my_field.get_available_approvals(as_user=self.user4) - self.assertEqual(3, transition_approvals.count()) - self.assertEqual(self.state4, transition_approvals[0].destination_state) - self.assertEqual(self.state51, transition_approvals[1].destination_state) - self.assertEqual(self.state52, transition_approvals[2].destination_state) - - TransitionApproval.objects.filter( - workflow_object=object, - workflow=self.workflow, - destination_state__in=[self.state4, self.state5] - ).update(skip=True) - - transition_approvals = object.river.my_field.get_available_approvals(as_user=self.user4) - self.assertEqual(4, transition_approvals.count()) - self.assertEqual(self.state41, transition_approvals[0].destination_state) - self.assertEqual(self.state42, transition_approvals[1].destination_state) - self.assertEqual(self.state51, transition_approvals[2].destination_state) - self.assertEqual(self.state52, transition_approvals[3].destination_state) - - TransitionApproval.objects.filter( - workflow_object=object, - workflow=self.workflow, - destination_state__in=[self.state41, self.state51] - ).update(skip=True) - - transition_approvals = object.river.my_field.get_available_approvals(as_user=self.user4) - self.assertEqual(2, transition_approvals.count()) - self.assertEqual(self.state42, transition_approvals[0].destination_state) - self.assertEqual(self.state52, transition_approvals[1].destination_state) - - def test_proceed(self): - self.initialize_standard_scenario() - object = BasicTestModelObjectFactory.create_batch(1)[0] - - # #################### - # STATE 1 - STATE 2 - # Only User1(2001) can proceed and after his proceed state must be changed to STATE 2 - # ################### - - # Proceeded by user has no required permission for this transition - - try: - object.river.my_field.approve(as_user=self.user2) - self.fail('Exception was expected') - except RiverException as e: - self.assertEqual(str(e), 'There is no available state for destination for the user.') - self.assertEqual(ErrorCode.NO_AVAILABLE_NEXT_STATE_FOR_USER, e.code) - - try: - object.river.my_field.approve(as_user=self.user3) - self.fail('Exception was expected') - except RiverException as e: - self.assertEqual(str(e), 'There is no available state for destination for the user.') - self.assertEqual(ErrorCode.NO_AVAILABLE_NEXT_STATE_FOR_USER, e.code) - - try: - object.river.my_field.approve(as_user=self.user4) - self.fail('Exception was expected') - except RiverException as e: - self.assertEqual(str(e), 'There is no available state for destination for the user.') - self.assertEqual(ErrorCode.NO_AVAILABLE_NEXT_STATE_FOR_USER, e.code) - - self.assertEqual(self.state1, object.my_field) - - object.river.my_field.approve(as_user=self.user1) - - self.assertEqual(self.state2, object.my_field) - - transition_approvals = TransitionApproval.objects.filter( - workflow_object=object, - status=APPROVED, - source_state=self.state1, - destination_state=self.state2 - ) - self.assertEqual(1, transition_approvals.count()) - self.assertIsNotNone(transition_approvals[0].transactioner) - self.assertEqual(self.user1, transition_approvals[0].transactioner) - self.assertIsNotNone(transition_approvals[0].transaction_date) - - # #################### - # STATE 2 - STATE 3 - # User2(2002) is first proceeder and User3(2003) is second proceeder. This must be done with turn. After proceeding is done, state is gonna be changed to STATE 3 - # #################### - - # Proceeded by user has no required permission for this transition - try: - object.river.my_field.approve(as_user=self.user1) - self.fail('Exception was expected') - except RiverException as e: - self.assertEqual(str(e), 'There is no available state for destination for the user.') - self.assertEqual(ErrorCode.NO_AVAILABLE_NEXT_STATE_FOR_USER, e.code) - - try: - object.river.my_field.approve(as_user=self.user4) - self.fail('Exception was expected') - except RiverException as e: - self.assertEqual(str(e), 'There is no available state for destination for the user.') - self.assertEqual(ErrorCode.NO_AVAILABLE_NEXT_STATE_FOR_USER, e.code) - - # Turn is User2(2002)s, not User3(2003)s. After User2(2002) proceeded, User3(2003) can proceed. - try: - object.river.my_field.approve(as_user=self.user3) - self.fail('Exception was expected') - except RiverException as e: - self.assertEqual(str(e), 'There is no available state for destination for the user.') - self.assertEqual(ErrorCode.NO_AVAILABLE_NEXT_STATE_FOR_USER, e.code) - - # Proceeded by two user has required permission for this transition to get next state (order is user2(2002),user3(2003)). - object.river.my_field.approve(as_user=self.user2) - self.assertEqual(self.state2, object.my_field) - - transition_approvals = TransitionApproval.objects.filter( - workflow_object=object, - status=APPROVED, - source_state=self.state2, - destination_state=self.state3 - ) - self.assertEqual(1, transition_approvals.count()) - self.assertIsNotNone(transition_approvals[0].transactioner) - self.assertEqual(self.user2, transition_approvals[0].transactioner) - self.assertIsNotNone(transition_approvals[0].transaction_date) - - try: - object.river.my_field.approve(as_user=self.user2) - self.fail('Exception was expected') - except RiverException as e: - self.assertEqual(str(e), 'There is no available state for destination for the user.') - self.assertEqual(ErrorCode.NO_AVAILABLE_NEXT_STATE_FOR_USER, e.code) - - object.river.my_field.approve(as_user=self.user3) - self.assertEqual(self.state3, object.my_field) - - transition_approvals = TransitionApproval.objects.filter( - workflow_object=object, - status=APPROVED, - source_state=self.state2, - destination_state=self.state3 - ) - self.assertEqual(2, transition_approvals.count()) - self.assertIsNotNone(transition_approvals[0].transactioner) - self.assertIsNotNone(transition_approvals[1].transactioner) - self.assertEqual(self.user2, transition_approvals[0].transactioner) - self.assertEqual(self.user3, transition_approvals[1].transactioner) - self.assertIsNotNone(transition_approvals[0].transaction_date) - self.assertIsNotNone(transition_approvals[1].transaction_date) - - # #################### - # STATE 3 - STATE 4 or STATE 5 - # Only User4(2004) can proceed by giving the exact next state and after his proceed with his state must be changed to STATE 4 or STATE 5 - # ################### - - # Proceeded by user has no required permission for this transition - try: - object.river.my_field.approve(as_user=self.user1) - self.fail('Exception was expected') - except RiverException as e: - self.assertEqual(str(e), 'There is no available state for destination for the user.') - self.assertEqual(ErrorCode.NO_AVAILABLE_NEXT_STATE_FOR_USER, e.code) - - # Proceeded by user has no required permission for this transition - try: - object.river.my_field.approve(as_user=self.user2) - self.fail('Exception was expected') - except RiverException as e: - self.assertEqual(str(e), 'There is no available state for destination for the user.') - self.assertEqual(ErrorCode.NO_AVAILABLE_NEXT_STATE_FOR_USER, e.code) - - # Proceeded by user has no required permission for this transition - try: - object.river.my_field.approve(as_user=self.user3) - self.fail('Exception was expected') - except RiverException as e: - self.assertEqual(str(e), 'There is no available state for destination for the user.') - self.assertEqual(ErrorCode.NO_AVAILABLE_NEXT_STATE_FOR_USER, e.code) - - # There are STATE 4 and STATE 5 as next. State must be given to switch - try: - object.river.my_field.approve(as_user=self.user4) - self.fail('Exception was expected') - except RiverException as e: - self.assertEqual(str(e), 'State must be given when there are multiple states for destination') - self.assertEqual(ErrorCode.NEXT_STATE_IS_REQUIRED, e.code) - - # There are STATE 4 and STATE 5 as next. State among STATE 4 and STATE 5 must be given to switch, not other state - try: - object.river.my_field.approve(as_user=self.user4, next_state=self.state3) - self.fail('Exception was expected') - except RiverException as e: - self.assertEqual(str(e), - "Invalid state is given(%s). Valid states is(are) %s" % ( - self.state3.__str__(), ','.join([ast.__str__() for ast in [self.state4, self.state5]]))) - self.assertEqual(ErrorCode.INVALID_NEXT_STATE_FOR_USER, e.code) - - # There are STATE 4 and STATE 5 as next. After one of them is given to proceeding, the state must be switch to it immediately. - object.river.my_field.approve(as_user=self.user4, next_state=self.state5) - self.assertEqual(self.state5, object.my_field) - - transition_approvals = TransitionApproval.objects.filter( - workflow_object=object, - status=APPROVED, - source_state=self.state3, - destination_state=self.state5 - ) - self.assertEqual(1, transition_approvals.count()) - self.assertIsNotNone(transition_approvals[0].transactioner) - self.assertEqual(self.user4, transition_approvals[0].transactioner) - self.assertIsNotNone(transition_approvals[0].transaction_date) - - def test_cycle_proceedings(self): - self.initialize_circular_scenario() - object = BasicTestModelObjectFactory.create_batch(1)[0] - - # No Cycle - self.assertFalse(object.river.my_field._cycle_proceedings()) - object.river.my_field.approve(as_user=self.user1, next_state=self.in_progress_state, god_mod=True) - self.assertEqual(5, TransitionApproval.objects.filter(object_id=object.pk).count()) - - # No Cycle - self.assertFalse(object.river.my_field._cycle_proceedings()) - object.river.my_field.approve(as_user=self.user2, next_state=self.resolved_state, god_mod=True) - self.assertEqual(5, TransitionApproval.objects.filter(object_id=object.pk).count()) - - # State is re-opened and cycle is detected. Transition in-progress to resolved proceeding is cloned - self.assertFalse(object.river.my_field._cycle_proceedings()) - object.river.my_field.approve(as_user=self.user3, next_state=self.re_opened_state, god_mod=True) - self.assertEqual(6, TransitionApproval.objects.filter(object_id=object.pk).count()) - - self.assertEqual(TransitionApprovalMeta.objects.get(source_state=self.in_progress_state, destination_state=self.resolved_state), - TransitionApproval.objects.filter(object_id=object.pk).latest('date_created').meta) - - # There will be no cycling even if the method is invoked. Because cycling is done in proceeding. - self.assertFalse(object.river.my_field._cycle_proceedings()) - self.assertEqual(6, TransitionApproval.objects.filter(object_id=object.pk).count()) - - # State is in-progress and cycle is detected. Transition resolved to re-opened proceeding is cloned - object.river.my_field.approve(as_user=self.user3, next_state=self.in_progress_state, god_mod=True) - self.assertEqual(7, TransitionApproval.objects.filter(object_id=object.pk).count()) - self.assertEqual(TransitionApprovalMeta.objects.get(source_state=self.resolved_state, destination_state=self.re_opened_state), - TransitionApproval.objects.filter(object_id=object.pk).latest('date_created').meta) - - # State is resolved and cycle is detected. Transition re-opened to in-progress proceeding is cloned - object.river.my_field.approve(as_user=self.user3, next_state=self.resolved_state, god_mod=True) - self.assertEqual(8, TransitionApproval.objects.filter(object_id=object.pk).count()) - self.assertEqual(TransitionApprovalMeta.objects.get(source_state=self.re_opened_state, destination_state=self.in_progress_state), - TransitionApproval.objects.filter(object_id=object.pk).latest('date_created').meta) - - # State is re-opened and cycle is detected. Transition in-progress to resolved proceeding is cloned - self.assertFalse(object.river.my_field._cycle_proceedings()) - object.river.my_field.approve(as_user=self.user3, next_state=self.re_opened_state, god_mod=True) - self.assertEqual(9, TransitionApproval.objects.filter(object_id=object.pk).count()) - self.assertEqual(TransitionApprovalMeta.objects.get(source_state=self.in_progress_state, destination_state=self.resolved_state), - TransitionApproval.objects.filter(object_id=object.pk).latest('date_created').meta) - - # State is in-progress and cycle is detected. Transition resolved to re-opened proceeding is cloned - object.river.my_field.approve(as_user=self.user3, next_state=self.in_progress_state, god_mod=True) - self.assertEqual(10, TransitionApproval.objects.filter(object_id=object.pk).count()) - self.assertEqual(TransitionApprovalMeta.objects.get(source_state=self.resolved_state, destination_state=self.re_opened_state), - TransitionApproval.objects.filter(object_id=object.pk).latest('date_created').meta) - - # State is resolved and cycle is detected. Transition re-opened to in-progress proceeding is cloned - object.river.my_field.approve(as_user=self.user3, next_state=self.resolved_state, god_mod=True) - self.assertEqual(11, TransitionApproval.objects.filter(object_id=object.pk).count()) - self.assertEqual(TransitionApprovalMeta.objects.get(source_state=self.re_opened_state, destination_state=self.in_progress_state), - TransitionApproval.objects.filter(object_id=object.pk).latest('date_created').meta) - - # No Cycle for closed state. - object.river.my_field.approve(as_user=self.user4, next_state=self.closed_state, god_mod=True) - self.assertEqual(11, TransitionApproval.objects.filter(object_id=object.pk).count()) - - def test_get_waiting_approvals_slowness_test(self): - self.initialize_standard_scenario() - self.objects = BasicTestModelObjectFactory.create_batch(100) - before = datetime.now() - for o in self.objects: - o.river.my_field.get_available_states(as_user=self.user1) - after = datetime.now() - self.assertLess(after - before, timedelta(seconds=2)) diff --git a/river/tests/test__state_field.py b/river/tests/test__state_field.py index 1f7d164..1ee41ad 100644 --- a/river/tests/test__state_field.py +++ b/river/tests/test__state_field.py @@ -1,22 +1,23 @@ from django.contrib.contenttypes.models import ContentType +from django.db.models import QuerySet +from django.test import TestCase +from hamcrest import assert_that, has_property, is_, instance_of -from river.models.factories import StateObjectFactory, TransitionApprovalMetaFactory, WorkflowFactory -from river.core.riverobject import RiverObject -from river.core.instanceworkflowobject import InstanceWorkflowObject from river.core.classworkflowobject import ClassWorkflowObject -from river.tests.base_test import BaseTestCase +from river.core.instanceworkflowobject import InstanceWorkflowObject +from river.core.riverobject import RiverObject +from river.models import State +from river.models.factories import StateObjectFactory, TransitionApprovalMetaFactory, WorkflowFactory from river.tests.models import BasicTestModel __author__ = 'ahmetdal' -class StateFieldTest(BaseTestCase): +class StateFieldTest(TestCase): - def test_injections(self): - self.assertTrue(hasattr(BasicTestModel, 'river')) - self.assertIsInstance(BasicTestModel.river, RiverObject) - self.assertTrue(hasattr(BasicTestModel.river, "my_field")) - self.assertIsInstance(BasicTestModel.river.my_field, ClassWorkflowObject) + def test_shouldInjectTheField(self): # pylint: disable=no-self-use + assert_that(BasicTestModel, has_property('river', is_(instance_of(RiverObject)))) + assert_that(BasicTestModel.river, has_property('my_field', is_(instance_of(ClassWorkflowObject)))) content_type = ContentType.objects.get_for_model(BasicTestModel) @@ -32,17 +33,11 @@ def test_injections(self): priority=0 ) test_model = BasicTestModel.objects.create() - self.assertTrue(hasattr(test_model, "river")) - self.assertIsInstance(test_model.river, RiverObject) - self.assertTrue(hasattr(test_model.river, "my_field")) - self.assertIsInstance(test_model.river.my_field, InstanceWorkflowObject) - - self.assertTrue(hasattr(test_model.river.my_field, "approve")) - self.assertTrue(callable(test_model.river.my_field.approve)) - - self.assertTrue(test_model.river.my_field.on_initial_state) - self.assertFalse(test_model.river.my_field.on_final_state) - - self.assertEqual(state1, BasicTestModel.river.my_field.initial_state) - self.assertEqual(1, BasicTestModel.river.my_field.final_states.count()) - self.assertEqual(state2, BasicTestModel.river.my_field.final_states[0]) + assert_that(test_model, has_property('river', is_(instance_of(RiverObject)))) + assert_that(test_model.river, has_property('my_field', is_(instance_of(InstanceWorkflowObject)))) + assert_that(BasicTestModel.river.my_field, has_property('initial_state', is_(instance_of(State)))) + assert_that(BasicTestModel.river.my_field, has_property('final_states', is_(instance_of(QuerySet)))) + + assert_that(test_model.river.my_field, has_property('approve', has_property("__call__"))) + assert_that(test_model.river.my_field, has_property('on_initial_state', is_(instance_of(bool)))) + assert_that(test_model.river.my_field, has_property('on_final_state', is_(instance_of(bool)))) diff --git a/river/tests/tmigrations/test__migrations.py b/river/tests/tmigrations/test__migrations.py index c45d3f2..60d03ce 100644 --- a/river/tests/tmigrations/test__migrations.py +++ b/river/tests/tmigrations/test__migrations.py @@ -2,6 +2,7 @@ import sys from django.test.utils import override_settings +from hamcrest import assert_that, equal_to, has_length try: from StringIO import StringIO @@ -15,13 +16,13 @@ def clean_migrations(): - for f in os.listdir("river/tests/transient/river/"): + for f in os.listdir("river/tests/volatile/river/"): if f != "__init__.py" and f != "__pycache__": - os.remove(os.path.join("river/tests/transient/river/", f)) + os.remove(os.path.join("river/tests/volatile/river/", f)) - for f in os.listdir("river/tests/transient/river_tests/"): + for f in os.listdir("river/tests/volatile/river_tests/"): if f != "__init__.py" and f != "__pycache__": - os.remove(os.path.join("river/tests/transient/river_tests/", f)) + os.remove(os.path.join("river/tests/volatile/river_tests/", f)) class MigrationTests(TestCase): @@ -44,38 +45,36 @@ def tearDown(self): """ clean_migrations() - @override_settings(MIGRATION_MODULES={"river": "river.tests.transient.river"}) - def test_should_not_be_missing_migrations(self): + @override_settings(MIGRATION_MODULES={"river": "river.tests.volatile.river"}) + def test_shouldCreateAllMigrations(self): for f in os.listdir("river/migrations"): if f != "__init__.py" and f != "__pycache__" and not f.endswith(".pyc"): - open(os.path.join("river/tests/transient/river/", f), 'wb').write(open(os.path.join("river/migrations", f), 'rb').read()) + open(os.path.join("river/tests/volatile/river/", f), 'wb').write(open(os.path.join("river/migrations", f), 'rb').read()) - self.migrations_before = list(filter(lambda f: f.endswith('.py') and f != '__init__.py', os.listdir('river/tests/transient/river/'))) + self.migrations_before = list(filter(lambda f: f.endswith('.py') and f != '__init__.py', os.listdir('river/tests/volatile/river/'))) out = StringIO() sys.stout = out call_command('makemigrations', 'river', stdout=out) - self.migrations_after = list(filter(lambda f: f.endswith('.py') and f != '__init__.py', os.listdir('river/tests/transient/river/'))) + self.migrations_after = list(filter(lambda f: f.endswith('.py') and f != '__init__.py', os.listdir('river/tests/volatile/river/'))) - self.assertEqual("No changes detected in app 'river'\n", out.getvalue()) + assert_that(out.getvalue(), equal_to("No changes detected in app 'river'\n")) + assert_that(self.migrations_after, has_length(len(self.migrations_before))) - self.assertEqual(len(self.migrations_before), len(self.migrations_after)) - - @override_settings(MIGRATION_MODULES={"tests": "river.tests.transient.river_tests"}) - def test__should_not_keep_recreating_migrations_when_no_change(self): + @override_settings(MIGRATION_MODULES={"tests": "river.tests.volatile.river_tests"}) + def test__shouldNotKeepRecreatingMigrationsWhenNoChange(self): call_command('makemigrations', 'tests') - self.migrations_before = list(filter(lambda f: f.endswith('.py') and f != '__init__.py', os.listdir('river/tests/transient/river_tests/'))) + self.migrations_before = list(filter(lambda f: f.endswith('.py') and f != '__init__.py', os.listdir('river/tests/volatile/river_tests/'))) out = StringIO() sys.stout = out call_command('makemigrations', 'tests', stdout=out) - self.migrations_after = list(filter(lambda f: f.endswith('.py') and f != '__init__.py', os.listdir('river/tests/transient/river_tests/'))) - - self.assertEqual("No changes detected in app 'tests'\n", out.getvalue()) + self.migrations_after = list(filter(lambda f: f.endswith('.py') and f != '__init__.py', os.listdir('river/tests/volatile/river_tests/'))) - self.assertEqual(len(self.migrations_before), len(self.migrations_after)) + assert_that(out.getvalue(), equal_to("No changes detected in app 'tests'\n")) + assert_that(self.migrations_after, has_length(len(self.migrations_before))) diff --git a/river/tests/transient/river/__init__.py b/river/tests/volatile/__init__.py similarity index 100% rename from river/tests/transient/river/__init__.py rename to river/tests/volatile/__init__.py diff --git a/river/tests/transient/river_tests/__init__.py b/river/tests/volatile/river/__init__.py similarity index 100% rename from river/tests/transient/river_tests/__init__.py rename to river/tests/volatile/river/__init__.py diff --git a/river/tests/volatile/river_tests/__init__.py b/river/tests/volatile/river_tests/__init__.py new file mode 100644 index 0000000..e69de29