Feature/cleanup task completion #263

Merged · 5 commits · Nov 16, 2022

Changes from all commits
102 changes: 37 additions & 65 deletions SpiffWorkflow/bpmn/specs/MultiInstanceTask.py
@@ -81,12 +81,6 @@ def __init__(self, wf_spec, name, times, **kwargs):

TaskSpec.__init__(self, wf_spec, name, **kwargs)


# DO NOT OVERRIDE THE SPEC TYPE.
# @property
# def spec_type(self):
# return 'MultiInstance Task'

def _find_my_task(self, task):
for thetask in task.workflow.task_tree:
if thetask.thread_id != task.thread_id:
@@ -113,17 +107,6 @@ def _on_trigger(self, task_spec):
new_task.triggered = True
output._predict(new_task)

def _check_inputs(self, my_task):
if self.collection is None:
return
# look for variable in context, if we don't find it, default to 1
variable = valueof(my_task, self.times, 1)
if self.times.name == self.collection.name and type(variable) == type([]):
raise WorkflowTaskExecException(my_task,
'If we are updating a collection,'
' then the collection must be a '
'dictionary.')

def _get_loop_completion(self,my_task):
if not self.completioncondition == None:
terminate = my_task.workflow.script_engine.evaluate(my_task,self.completioncondition)
@@ -154,17 +137,6 @@ def _get_count(self, my_task):
return len(variable.keys())
return 1 # we shouldn't ever get here, but just in case return a sane value.

def _get_current_var(self, my_task, pos):
variable = valueof(my_task, self.times, 1)
if is_number(variable):
return pos
if isinstance(variable,list) and len(variable) >= pos:
return variable[pos - 1]
elif isinstance(variable,dict) and len(list(variable.keys())) >= pos:
return variable[list(variable.keys())[pos - 1]]
else:
return pos

def _get_predicted_outputs(self, my_task):
split_n = self._get_count(my_task)

@@ -418,52 +390,60 @@ def _handle_special_cases(self, my_task):
if my_task.task_spec.prevtaskclass in classes.keys() and not terminate:
super()._on_complete_hook(my_task)

def _merge_element_variable(self,my_task,collect,runtimes,colvarname):
# if we are updating the same collection as was our loopcardinality
# then all the keys should be there and we can use the sorted keylist
# if not, we use an integer - we should be guaranteed that the
# collection is a dictionary
def _check_inputs(self, my_task):
if self.collection is None:
return
# look for variable in context, if we don't find it, default to 1
variable = valueof(my_task, self.times, 1)
if self.times.name == self.collection.name and type(variable) == type([]):
raise WorkflowTaskExecException(my_task,
'If we are updating a collection, then the collection must be a dictionary.')

def _get_current_var(self, my_task, pos):
variable = valueof(my_task, self.times, 1)
if is_number(variable):
return pos
if isinstance(variable,list) and len(variable) >= pos:
return variable[pos - 1]
elif isinstance(variable,dict) and len(list(variable.keys())) >= pos:
return variable[list(variable.keys())[pos - 1]]
else:
return pos

def _merge_element_variable(self, my_task, collect, runtimes):
if self.collection is not None and self.times.name == self.collection.name:
# Update an existing collection (we used the collection as the cardinality)
keys = list(collect.keys())
if len(keys) < runtimes:
msg = f"There is a mismatch between runtimes and the number " \
f"items in the collection, please check for empty " \
f"collection {self.collection.name}."
raise WorkflowTaskExecException(my_task, msg)

runtimesvar = keys[runtimes - 1]
else:
# Use an integer (for arrays)
runtimesvar = runtimes

if self.elementVar in my_task.data and isinstance(my_task.data[self.elementVar], dict):
collect[str(runtimesvar)] = DeepMerge.merge(collect.get(runtimesvar, {}),
copy.copy(my_task.data[self.elementVar]))

my_task.data = DeepMerge.merge(my_task.data,
gendict(colvarname.split('/'), collect))
collect[str(runtimesvar)] = DeepMerge.merge(
collect.get(runtimesvar, {}),
copy.copy(my_task.data[self.elementVar])
)

def _update_sibling_data(self, my_task, runtimes, runcount, colvarname, collect):

def _update_sibling_data(self,my_task,runtimes,runcount,colvarname,collect):
if (runtimes < runcount) and not my_task.terminate_current_loop and self.loopTask:
my_task._set_state(TaskState.READY)
my_task._set_internal_data(runtimes=runtimes + 1)
my_task.data[self.elementVar] = self._get_current_var(my_task, runtimes + 1)
element_var_data = None
else:
# The element var data should not be passed on to children
# but will add this back onto this task later.
element_var_data = my_task.data.pop(self.elementVar, None)

# if this is a parallel mi - then update all siblings with the
# current data
if not self.isSequential:
for task in my_task.parent.children:
task.data = DeepMerge.merge(
task.data,
gendict(colvarname.split('/'),
collect)
)
return element_var_data
my_task.data.pop(self.elementVar, None)

for task in my_task.parent.children:
task.data = DeepMerge.merge(
task.data,
gendict(colvarname.split('/'), collect)
)

def _on_complete_hook(self, my_task):
# do special stuff for non-user tasks
@@ -486,9 +466,9 @@ def __iteration_complete(self, my_task):

collect = valueof(my_task, self.collection, {})

self._merge_element_variable(my_task,collect,runtimes,colvarname)
self._merge_element_variable(my_task, collect, runtimes)

element_var_data = self._update_sibling_data(my_task,runtimes,runcount,colvarname,collect)
self._update_sibling_data(my_task, runtimes, runcount, colvarname, collect)

# please see MultiInstance code for previous version
outputs = []
@@ -497,14 +477,6 @@ def __iteration_complete(self, my_task):
if not isinstance(my_task.task_spec,SubWorkflowTask):
my_task._sync_children(outputs, TaskState.FUTURE)

for child in my_task.children:
child.task_spec._update(child)

# If removed, add the element_var_data back onto this task, after
# updating the children.
if(element_var_data):
my_task.data[self.elementVar] = element_var_data

def serialize(self, serializer):

return serializer.serialize_multi_instance(self)
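The refactored _merge_element_variable above now only merges the element variable into the collection and no longer takes colvarname; writing the collection back into task data through gendict(colvarname.split('/'), collect) is left to _update_sibling_data. A minimal, self-contained sketch of the kept merge step, using toy data and a simplified stand-in for DeepMerge.merge (illustration only, not SpiffWorkflow code):

import copy

def deep_merge(a, b):
    # Simplified stand-in for SpiffWorkflow's DeepMerge.merge:
    # merge b into a, recursing into nested dictionaries.
    result = copy.deepcopy(a)
    for key, value in b.items():
        if isinstance(value, dict) and isinstance(result.get(key), dict):
            result[key] = deep_merge(result[key], value)
        else:
            result[key] = value
    return result

collect = {"item1": {"qty": 1}, "item2": {}}  # collection also drives the cardinality
runtimes = 2
key = list(collect.keys())[runtimes - 1]      # iteration n uses the collection's n-th key
element_data = {"qty": 5}                     # what the task wrote to its element variable
collect[str(key)] = deep_merge(collect.get(key, {}), copy.copy(element_data))
print(collect)  # {'item1': {'qty': 1}, 'item2': {'qty': 5}}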
2 changes: 0 additions & 2 deletions SpiffWorkflow/bpmn/specs/SubWorkflowTask.py
@@ -66,8 +66,6 @@ def _predict_hook(self, my_task):

def _on_complete_hook(self, my_task):
BpmnSpecMixin._on_complete_hook(self, my_task)
for child in my_task.children:
child.task_spec._update(child)

def _on_cancel(self, my_task):
subworkflow = my_task.workflow.get_subprocess(my_task)
2 changes: 0 additions & 2 deletions SpiffWorkflow/bpmn/specs/UnstructuredJoin.py
@@ -146,8 +146,6 @@ def _do_join(self, my_task):

def _update_hook(self, my_task):

if my_task._is_predicted():
self._predict(my_task)
if not my_task.parent._is_finished():
return

2 changes: 0 additions & 2 deletions SpiffWorkflow/specs/ExclusiveChoice.py
@@ -88,8 +88,6 @@ def _on_complete_hook(self, my_task):
f'No conditions satisfied for {my_task.task_spec.name}')

my_task._sync_children([output], TaskState.FUTURE)
for child in my_task.children:
child.task_spec._update(child)

def serialize(self, serializer):
return serializer.serialize_exclusive_choice(self)
2 changes: 0 additions & 2 deletions SpiffWorkflow/specs/MultiChoice.py
@@ -134,8 +134,6 @@ def _on_complete_hook(self, my_task):
outputs.append(self._wf_spec.get_task_spec_from_name(output))

my_task._sync_children(outputs, TaskState.FUTURE)
for child in my_task.children:
child.task_spec._update(child)

def serialize(self, serializer):
return serializer.serialize_multi_choice(self)
2 changes: 0 additions & 2 deletions SpiffWorkflow/specs/MultiInstance.py
@@ -102,8 +102,6 @@ def _predict_hook(self, my_task):
def _on_complete_hook(self, my_task):
outputs = self._get_predicted_outputs(my_task)
my_task._sync_children(outputs, TaskState.FUTURE)
for child in my_task.children:
child.task_spec._update(child)

def serialize(self, serializer):
return serializer.serialize_multi_instance(self)
28 changes: 15 additions & 13 deletions SpiffWorkflow/specs/SubWorkflow.py
@@ -76,12 +76,14 @@ def test(self):
self, 'File does not exist: %s' % self.file)

def _predict_hook(self, my_task):
# Modifying the task spec is a TERRIBLE idea, but if we don't do it, sync_children won't work
outputs = [task.task_spec for task in my_task.children]
for output in self.outputs:
if output not in outputs:
outputs.insert(0, output)
if my_task._is_definite():
my_task._sync_children(outputs, TaskState.FUTURE)
# This prevents errors with sync children
my_task._sync_children(outputs, TaskState.LIKELY)
else:
my_task._sync_children(outputs, my_task.state)

@@ -107,10 +109,7 @@ def _on_ready_before_hook(self, my_task):

def _integrate_subworkflow_tree(self, my_task, subworkflow):
# Integrate the tree of the subworkflow into the tree of this workflow.
my_task._sync_children(self.outputs, TaskState.FUTURE)
for child in my_task.children:
child.task_spec._update(child)
child._inherit_data()
my_task._sync_children(self.outputs, TaskState.LIKELY)
for child in subworkflow.task_tree.children:
my_task.children.insert(0, child)
child.parent = my_task
@@ -121,10 +120,18 @@ def _on_ready_hook(self, my_task):
for child in subworkflow.task_tree.children:
for assignment in self.in_assign:
assignment.assign(my_task, child)

self._predict(my_task)
for child in subworkflow.task_tree.children:
child.task_spec._update(child)
# Instead of completing immediately, we'll wait for the subworkflow to complete
my_task._set_state(TaskState.WAITING)

def _update_hook(self, my_task):
subworkflow = my_task._get_internal_data('subworkflow')
if subworkflow is None:
# On the first update, we have to create the subworkflow
super()._update_hook(my_task)
elif subworkflow.is_completed():
# Then wait until it finishes to complete
my_task.complete()

def _on_subworkflow_completed(self, subworkflow, my_task):
# Assign variables, if so requested.
Expand All @@ -138,11 +145,6 @@ def _on_subworkflow_completed(self, subworkflow, my_task):
# Alright, abusing that hook is just evil but it works.
child.task_spec._update_hook(child)

def _on_complete_hook(self, my_task):
for child in my_task.children:
if isinstance(child.task_spec, StartTask):
child.task_spec._update(child)

def serialize(self, serializer):
return serializer.serialize_sub_workflow(self)

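SubWorkflow's completion model changes here from "complete as soon as the task executes" to "wait for the inner workflow": _on_ready_hook now parks the task in WAITING, and _update_hook creates the subworkflow on the first update and completes the task once is_completed() reports true. A toy sketch of that pattern (invented names, not the SpiffWorkflow API):

from enum import Enum

class State(Enum):
    READY = 1
    WAITING = 2
    COMPLETED = 3

class ToySubworkflow:
    def __init__(self):
        self.done = False
    def is_completed(self):
        return self.done

class ToySubworkflowTask:
    def __init__(self, subworkflow):
        self.subworkflow = subworkflow
        self.state = State.READY

    def on_ready(self):
        # start the inner workflow, then wait instead of completing immediately
        self.state = State.WAITING

    def update(self):
        # called whenever the workflow re-evaluates this task
        if self.state is State.WAITING and self.subworkflow.is_completed():
            self.state = State.COMPLETED

inner = ToySubworkflow()
task = ToySubworkflowTask(inner)
task.on_ready()
task.update()        # inner workflow not done yet, stays WAITING
inner.done = True
task.update()
print(task.state)    # State.COMPLETED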
2 changes: 0 additions & 2 deletions SpiffWorkflow/specs/ThreadSplit.py
@@ -133,8 +133,6 @@ def _on_complete_hook(self, my_task):
for i in range(split_n):
outputs.append(self.thread_starter)
my_task._sync_children(outputs, TaskState.FUTURE)
for child in my_task.children:
child.task_spec._update(child)

def serialize(self, serializer):
return serializer.serialize_thread_split(self)
47 changes: 17 additions & 30 deletions SpiffWorkflow/specs/base.py
@@ -242,24 +242,19 @@ def _predict(self, my_task, seen=None, looked_ahead=0):
:type looked_ahead: integer
:param looked_ahead: The depth of the predicted path so far.
"""
if my_task._is_finished():
return
if seen is None:
seen = []
elif self in seen:
return
if not my_task._is_finished():
self._predict_hook(my_task)

self._predict_hook(my_task)
if not my_task._is_definite():
if looked_ahead + 1 >= self.lookahead:
return
seen.append(self)
look_ahead = my_task._is_definite() or looked_ahead + 1 < self.lookahead
for child in my_task.children:
child.task_spec._predict(child, seen[:], looked_ahead + 1)
if not child._is_finished() and child not in seen and look_ahead:
child.task_spec._predict(child, seen[:], looked_ahead + 1)

def _predict_hook(self, my_task):
# If the task's status is not predicted, we default to FUTURE
        # for all its outputs.
        # If the task's status is not predicted, we default to FUTURE for all its outputs.
# Otherwise, copy my own state to the children.
if my_task._is_definite():
best_state = TaskState.FUTURE
@@ -278,6 +273,12 @@ def _update(self, my_task):
completes it makes sure to call this method so we can react.
"""
my_task._inherit_data()
# We were doing this in _update_hook, but to me that seems inconsistent with the spirit
# of the hook functions. Moving it here allows removal of some repeated calls (overridden
# hook methods still need to do these things)
if my_task._is_predicted():
self._predict(my_task)
self.entered_event.emit(my_task.workflow, my_task)
self._update_hook(my_task)

def _update_hook(self, my_task):
@@ -290,11 +291,8 @@ def _update_hook(self, my_task):
Returning non-False will cause the task to go into READY.
Returning any other value will cause no action.
"""
if my_task._is_predicted():
self._predict(my_task)
if not my_task.parent._is_finished():
return
self.entered_event.emit(my_task.workflow, my_task)
# If this actually did what the documentation said (returned a value indicating
# that the task was ready), then a lot of things might be easier.
my_task._ready()

def _on_ready(self, my_task):
@@ -387,21 +385,14 @@ def _on_complete(self, my_task):
"""
assert my_task is not None

if my_task.workflow.debug:
print("Executing %s: %s (%s)" % (
my_task.task_spec.__class__.__name__,
my_task.get_name(), my_task.get_description()))

# We have to set the last task here, because the on_complete_hook
# of a loopback task may overwrite what the last_task will be.
my_task.workflow.last_task = my_task
self._on_complete_hook(my_task)
for child in my_task.children:
child.task_spec._update(child)
my_task.workflow._task_completed_notify(my_task)

if my_task.workflow.debug:
if hasattr(my_task.workflow, "outer_workflow"):
my_task.workflow.outer_workflow.task_tree.dump()

self.completed_event.emit(my_task.workflow, my_task)
return True

@@ -414,9 +405,7 @@ def _on_complete_hook(self, my_task):
:rtype: bool
:returns: True on success, False otherwise.
"""
# If we have more than one output, implicitly split.
for child in my_task.children:
child.task_spec._update(child)
pass

@abstractmethod
def serialize(self, serializer, **kwargs):
@@ -478,8 +467,6 @@ def deserialize(cls, serializer, wf_spec, s_state, **kwargs):
:rtype: TaskSpec
:returns: The task specification instance.
"""
print(s_state)
print(wf_spec)
out = cls(wf_spec,s_state.get('name'))
out.id = s_state.get('id')
out.name = s_state.get('name')
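The base.py changes are the core of this PR: data inheritance, prediction, and the entered_event emission are consolidated in _update, the child-update loop is consolidated in _on_complete, and the default _on_complete_hook becomes a no-op, which is what lets the subclasses above drop their own copies of the loop. A rough sketch of the resulting template-method shape (toy names and simplified signatures, not the actual TaskSpec API):

class ToyTaskSpec:
    def _update(self, task):
        # Common work for every spec, done exactly once per update.
        task.inherit_data()
        if task.is_predicted():
            self._predict(task)        # moved here from _update_hook
        self.notify_entered(task)      # likewise moved from _update_hook
        self._update_hook(task)        # only the spec-specific part remains

    def _update_hook(self, task):
        task.make_ready()              # default behavior; subclasses override

    def _on_complete(self, task):
        self._on_complete_hook(task)           # spec-specific completion work
        for child in task.children:            # centralized child update, so
            child.task_spec._update(child)     # specs stop repeating this loop
        self.notify_completed(task)

    def _on_complete_hook(self, task):
        pass                           # base default is now a no-op

    # stubs so the sketch runs standalone
    def _predict(self, task): pass
    def notify_entered(self, task): pass
    def notify_completed(self, task): pass

class ToyTask:
    def __init__(self):
        self.children = []
        self.ready = False
    def inherit_data(self): pass
    def is_predicted(self): return False
    def make_ready(self): self.ready = True

task = ToyTask()
ToyTaskSpec()._update(task)
print(task.ready)  # True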
2 changes: 1 addition & 1 deletion SpiffWorkflow/spiff/specs/spiff_task.py
@@ -39,6 +39,6 @@ def _on_ready_hook(self, my_task):
self.execute_script(my_task, self.prescript)

def _on_complete_hook(self, my_task):
super()._on_complete_hook(my_task)
if self.postscript is not None:
self.execute_script(my_task, self.postscript)
super()._on_complete_hook(my_task)
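Moving the super() call below the postscript means the postscript now runs first during completion; this is safe because the base _on_complete_hook is now a no-op and child updates happen afterwards in _on_complete (see base.py above), so the postscript's data changes are in place before children are updated. A toy illustration of the resulting hook order (invented names, not library code):

class ToyBaseSpec:
    def _on_complete_hook(self, task):
        # stand-in for the base hook, which no longer updates children
        print("base hook runs last")

class ToySpiffTask(ToyBaseSpec):
    postscript = "x = 1"

    def _on_complete_hook(self, task):
        if self.postscript is not None:
            print("execute postscript:", self.postscript)
        super()._on_complete_hook(task)

ToySpiffTask()._on_complete_hook(task=None)
# execute postscript: x = 1
# base hook runs last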