Skip to content

Commit

Permalink
Merge pull request #307 from sartography/feature/standardize-task-exe…
Browse files Browse the repository at this point in the history
…cution

Feature/standardize task execution
  • Loading branch information
danfunk committed Mar 28, 2023
2 parents 3c3345c + a087d29 commit 62454c9
Show file tree
Hide file tree
Showing 75 changed files with 348 additions and 392 deletions.
5 changes: 3 additions & 2 deletions SpiffWorkflow/bpmn/specs/BpmnProcessSpec.py
Expand Up @@ -47,9 +47,10 @@ def _check_threshold_unstructured(self, my_task, force=False):

return force or len(waiting_tasks) == 0, waiting_tasks

def _on_complete_hook(self, my_task):
super(_EndJoin, self)._on_complete_hook(my_task)
def _run_hook(self, my_task):
result = super(_EndJoin, self)._run_hook(my_task)
my_task.workflow.data.update(my_task.data)
return result


class BpmnProcessSpec(WorkflowSpec):
Expand Down
3 changes: 2 additions & 1 deletion SpiffWorkflow/bpmn/specs/InclusiveGateway.py
Expand Up @@ -110,11 +110,12 @@ def check(spec):

return complete, waiting_tasks

def _on_complete_hook(self, my_task):
def _run_hook(self, my_task):
outputs = self._get_matching_outputs(my_task)
if len(outputs) == 0:
raise WorkflowTaskException(f'No conditions satisfied on gateway', task=my_task)
my_task._sync_children(outputs, TaskState.FUTURE)
return True

@property
def spec_type(self):
Expand Down
6 changes: 3 additions & 3 deletions SpiffWorkflow/bpmn/specs/ScriptTask.py
Expand Up @@ -29,14 +29,14 @@ def _execute(self, task):
"""Please override for specific Implementations, see ScriptTask below for an example"""
pass

def _on_ready_hook(self, task):
def _run_hook(self, task):
try:
self._execute(task)
super(ScriptEngineTask, self)._on_ready_hook(task)
super(ScriptEngineTask, self)._run_hook(task)
except Exception as exc:
task._set_state(TaskState.WAITING)
raise exc

return True

class ScriptTask(ScriptEngineTask):

Expand Down
3 changes: 0 additions & 3 deletions SpiffWorkflow/bpmn/specs/SubWorkflowTask.py
Expand Up @@ -25,9 +25,6 @@ def __init__(self, wf_spec, name, subworkflow_spec, transaction=False, **kwargs)
def spec_type(self):
return 'Subprocess'

def _on_ready_hook(self, my_task):
super()._on_ready_hook(my_task)

def _on_subworkflow_completed(self, subworkflow, my_task):
self.update_data(my_task, subworkflow)
my_task._set_state(TaskState.READY)
Expand Down
7 changes: 4 additions & 3 deletions SpiffWorkflow/bpmn/specs/events/IntermediateEvent.py
Expand Up @@ -18,7 +18,6 @@
# 02110-1301 USA

from .event_types import ThrowingEvent, CatchingEvent
from .event_definitions import CycleTimerEventDefinition
from ..BpmnSpecMixin import BpmnSpecMixin
from ....specs.Simple import Simple
from ....task import TaskState
Expand Down Expand Up @@ -67,13 +66,15 @@ def __init__(self, wf_spec, name, main_child_task_spec, **kwargs):
def spec_type(self):
return 'Boundary Event Parent'

def _on_ready_hook(self, my_task):
def _run_hook(self, my_task):

# Clear any events that our children might have received and
# wait for new events
for child in my_task.children:
if isinstance(child.task_spec, BoundaryEvent):
child.task_spec.event_definition.reset(child)
child._set_state(TaskState.WAITING)
return True

def _child_complete_hook(self, child_task):

Expand Down Expand Up @@ -123,7 +124,7 @@ def catch(self, my_task, event_definition):
super(BoundaryEvent, self).catch(my_task, event_definition)
# Would love to get rid of this statement and manage in the workflow
# However, it is not really compatible with how boundary events work.
my_task.complete()
my_task.run()


class EventBasedGateway(CatchingEvent):
Expand Down
54 changes: 25 additions & 29 deletions SpiffWorkflow/bpmn/specs/events/event_definitions.py
Expand Up @@ -453,42 +453,38 @@ class CycleTimerEventDefinition(TimerEventDefinition):
def event_type(self):
return 'Cycle Timer'

def has_fired(self, my_task):

if not my_task._get_internal_data('event_fired'):
# Only check for the next cycle when the event has not fired to prevent cycles from being skipped.
event_value = my_task._get_internal_data('event_value')
if event_value is None:
expression = my_task.workflow.script_engine.evaluate(my_task, self.expression)
cycles, start, duration = TimerEventDefinition.parse_iso_recurring_interval(expression)
event_value = {'cycles': cycles, 'next': start.isoformat(), 'duration': duration.total_seconds()}

if event_value['cycles'] > 0:
next_event = datetime.fromisoformat(event_value['next'])
if next_event < datetime.now(timezone.utc):
my_task._set_internal_data(event_fired=True)
event_value['next'] = (next_event + timedelta(seconds=event_value['duration'])).isoformat()
def cycle_complete(self, my_task):

my_task._set_internal_data(event_value=event_value)
event_value = my_task._get_internal_data('event_value')
if event_value is None:
# Don't necessarily like this, but it's a lot more straightforward than trying to only create
# a child task on loop iterations after the first
my_task._drop_children()
expression = my_task.workflow.script_engine.evaluate(my_task, self.expression)
cycles, start, duration = TimerEventDefinition.parse_iso_recurring_interval(expression)
event_value = {'cycles': cycles, 'next': start.isoformat(), 'duration': duration.total_seconds()}

# When the next timer event passes, return True to allow the parent task to generate another child
# Use event fired to indicate that this timer has completed all cycles and the task can be completed
ready = False
if event_value['cycles'] != 0:
next_event = datetime.fromisoformat(event_value['next'])
if next_event < datetime.now(timezone.utc):
event_value['next'] = (next_event + timedelta(seconds=event_value['duration'])).isoformat()
event_value['cycles'] -= 1
ready = True
else:
my_task.internal_data.pop('event_value', None)
my_task.internal_data['event_fired'] = True

return my_task._get_internal_data('event_fired', False)
my_task._set_internal_data(event_value=event_value)
return ready

def timer_value(self, my_task):
event_value = my_task._get_internal_data('event_value')
if event_value is not None and event_value['cycles'] > 0:
if event_value is not None and event_value['cycles'] != 0:
return event_value['next']

def complete(self, my_task):
event_value = my_task._get_internal_data('event_value')
if event_value is not None and event_value['cycles'] == 0:
my_task.internal_data.pop('event_value')
return True

def complete_cycle(self, my_task):
# Only increment when the task completes
if my_task._get_internal_data('event_value') is not None:
my_task.internal_data['event_value']['cycles'] -= 1


class MultipleEventDefinition(EventDefinition):

Expand Down
24 changes: 13 additions & 11 deletions SpiffWorkflow/bpmn/specs/events/event_types.py
Expand Up @@ -57,21 +57,22 @@ def _update_hook(self, my_task):

if self.event_definition.has_fired(my_task):
return True
else:
elif isinstance(self.event_definition, CycleTimerEventDefinition):
if self.event_definition.cycle_complete(my_task):
for output in self.outputs:
child = my_task._add_child(output, TaskState.READY)
child.task_spec._predict(child, mask=TaskState.READY|TaskState.PREDICTED_MASK)
if my_task.state != TaskState.WAITING:
my_task._set_state(TaskState.WAITING)
elif my_task.state != TaskState.WAITING:
my_task._set_state(TaskState.WAITING)

def _on_complete_hook(self, my_task):
def _run_hook(self, my_task):

if isinstance(self.event_definition, MessageEventDefinition):
self.event_definition.update_task_data(my_task)
elif isinstance(self.event_definition, CycleTimerEventDefinition):
self.event_definition.complete_cycle(my_task)
if not self.event_definition.complete(my_task):
for output in self.outputs:
my_task._add_child(output)
my_task._set_state(TaskState.WAITING)
self.event_definition.reset(my_task)
super(CatchingEvent, self)._on_complete_hook(my_task)
return super(CatchingEvent, self)._run_hook(my_task)

# This fixes the problem of boundary events remaining cancelled if the task is reused.
# It pains me to add these methods, but unless we can get rid of the loop reset task we're stuck
Expand All @@ -95,6 +96,7 @@ def __init__(self, wf_spec, name, event_definition, **kwargs):
super(ThrowingEvent, self).__init__(wf_spec, name, **kwargs)
self.event_definition = event_definition

def _on_complete_hook(self, my_task):
super(ThrowingEvent, self)._on_complete_hook(my_task)
def _run_hook(self, my_task):
super(ThrowingEvent, self)._run_hook(my_task)
self.event_definition.throw(my_task)
return True
2 changes: 1 addition & 1 deletion SpiffWorkflow/bpmn/workflow.py
Expand Up @@ -251,7 +251,7 @@ def do_engine_steps(self, exit_at = None, will_complete_task=None, did_complete_
for task in engine_steps:
if will_complete_task is not None:
will_complete_task(task)
task.complete()
task.run()
if did_complete_task is not None:
did_complete_task(task)
if task.task_spec.name == exit_at:
Expand Down
6 changes: 3 additions & 3 deletions SpiffWorkflow/dmn/specs/BusinessRuleTask.py
Expand Up @@ -23,15 +23,15 @@ def __init__(self, wf_spec, name, dmnEngine, **kwargs):
def spec_class(self):
return 'Business Rule Task'

def _on_ready_hook(self, my_task):
def _run_hook(self, my_task):
try:
my_task.data = DeepMerge.merge(my_task.data, self.dmnEngine.result(my_task))
super(BusinessRuleTask, self)._on_ready_hook(my_task)
super(BusinessRuleTask, self)._run_hook(my_task)
except SpiffWorkflowException as we:
we.add_note(f"Business Rule Task '{my_task.task_spec.description}'.")
raise we
except Exception as e:
error = WorkflowTaskException(str(e), task=my_task)
error.add_note(f"Business Rule Task '{my_task.task_spec.description}'.")
raise error

return True
4 changes: 2 additions & 2 deletions SpiffWorkflow/specs/Cancel.py
Expand Up @@ -55,9 +55,9 @@ def test(self):
if len(self.outputs) > 0:
raise WorkflowException('Cancel with an output.', task_spec=self)

def _on_complete_hook(self, my_task):
def _run_hook(self, my_task):
my_task.workflow.cancel(self.cancel_successfully)
TaskSpec._on_complete_hook(self, my_task)
return True

def serialize(self, serializer):
return serializer.serialize_cancel(self)
Expand Down
5 changes: 2 additions & 3 deletions SpiffWorkflow/specs/CancelTask.py
Expand Up @@ -16,7 +16,6 @@
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301 USA
from .base import TaskSpec
from .Trigger import Trigger


Expand All @@ -30,12 +29,12 @@ class CancelTask(Trigger):
parallel split.
"""

def _on_complete_hook(self, my_task):
def _run_hook(self, my_task):
for task_name in self.context:
cancel_tasks = my_task.workflow.get_task_spec_from_name(task_name)
for cancel_task in my_task._get_root()._find_any(cancel_tasks):
cancel_task.cancel()
TaskSpec._on_complete_hook(self, my_task)
return True

def serialize(self, serializer):
return serializer.serialize_cancel_task(self)
Expand Down
4 changes: 2 additions & 2 deletions SpiffWorkflow/specs/Choose.py
Expand Up @@ -55,7 +55,7 @@ def __init__(self, wf_spec, name, context, choice=None, **kwargs):
self.context = context
self.choice = choice is not None and choice or []

def _on_complete_hook(self, my_task):
def _run_hook(self, my_task):
context = my_task.workflow.get_task_spec_from_name(self.context)
triggered = []
for task in my_task.workflow.task_tree:
Expand All @@ -66,7 +66,7 @@ def _on_complete_hook(self, my_task):
triggered.append(task)
for task in triggered:
context._predict(task)
TaskSpec._on_complete_hook(self, my_task)
return True

def serialize(self, serializer):
return serializer.serialize_choose(self)
Expand Down
15 changes: 5 additions & 10 deletions SpiffWorkflow/specs/ExclusiveChoice.py
Expand Up @@ -61,16 +61,7 @@ def test(self):
if self.default_task_spec is None:
raise WorkflowException('A default output is required.', task_spec=self)

def _predict_hook(self, my_task):
# If the task's status is not predicted, we default to MAYBE
# for all it's outputs except the default choice, which is
# LIKELY.
# Otherwise, copy my own state to the children.
my_task._sync_children(self.outputs)
spec = self._wf_spec.get_task_spec_from_name(self.default_task_spec)
my_task._set_likely_task(spec)

def _on_complete_hook(self, my_task):
def _run_hook(self, my_task):

output = self._wf_spec.get_task_spec_from_name(self.default_task_spec)
for condition, spec_name in self.cond_task_specs:
Expand All @@ -82,6 +73,10 @@ def _on_complete_hook(self, my_task):
raise WorkflowException(f'No conditions satisfied for {my_task.task_spec.name}', task_spec=self)

my_task._sync_children([output], TaskState.FUTURE)
for child in my_task.children:
child.task_spec._predict(child, mask=TaskState.FUTURE|TaskState.PREDICTED_MASK)

return True

def serialize(self, serializer):
return serializer.serialize_exclusive_choice(self)
Expand Down
3 changes: 1 addition & 2 deletions SpiffWorkflow/specs/Join.py
Expand Up @@ -120,8 +120,7 @@ def _branch_may_merge_at(self, task):
# If the task is predicted with less outputs than he has
# children, that means the prediction may be incomplete (for
# example, because a prediction is not yet possible at this time).
if not child._is_definite() \
and len(child.task_spec.outputs) > len(child.children):
if child._is_predicted() and len(child.task_spec.outputs) > len(child.children):
return True
return False

Expand Down
44 changes: 15 additions & 29 deletions SpiffWorkflow/specs/MultiChoice.py
Expand Up @@ -89,32 +89,18 @@ def _on_trigger(self, my_task, choice):
# The caller needs to make sure that predict() is called.

def _predict_hook(self, my_task):
if self.choice:
outputs = [self._wf_spec.get_task_spec_from_name(o)
for o in self.choice]
else:
outputs = self.outputs

# Default to MAYBE for all conditional outputs, default to LIKELY
# for unconditional ones. We can not default to FUTURE, because
# a call to trigger() may override the unconditional paths.
my_task._sync_children(outputs)
if not my_task._is_definite():
best_state = my_task.state
else:
best_state = TaskState.LIKELY

# Collect a list of all unconditional outputs.
outputs = []
conditional, unconditional = [], []
for condition, output in self.cond_task_specs:
if condition is None:
outputs.append(self._wf_spec.get_task_spec_from_name(output))

for child in my_task.children:
if child._is_definite():
if self.choice is not None and output not in self.choice:
continue
if child.task_spec in outputs:
child._set_state(best_state)
if condition is None:
unconditional.append(self._wf_spec.get_task_spec_from_name(output))
else:
conditional.append(self._wf_spec.get_task_spec_from_name(output))
state = TaskState.MAYBE if my_task.state == TaskState.MAYBE else TaskState.LIKELY
my_task._sync_children(unconditional, state)
for spec in conditional:
my_task._add_child(spec, TaskState.MAYBE)

def _get_matching_outputs(self, my_task):
outputs = []
Expand All @@ -125,12 +111,12 @@ def _get_matching_outputs(self, my_task):
outputs.append(self._wf_spec.get_task_spec_from_name(output))
return outputs

def _on_complete_hook(self, my_task):
"""
Runs the task. Should not be called directly.
Returns True if completed, False otherwise.
"""
def _run_hook(self, my_task):
"""Runs the task. Should not be called directly."""
my_task._sync_children(self._get_matching_outputs(my_task), TaskState.FUTURE)
for child in my_task.children:
child.task_spec._predict(child, mask=TaskState.FUTURE|TaskState.PREDICTED_MASK)
return True

def serialize(self, serializer):
return serializer.serialize_multi_choice(self)
Expand Down

0 comments on commit 62454c9

Please sign in to comment.