Skip to content

Commit

Permalink
Merge pull request #349 from sartography/improvement/task-and-spec-relations
Browse files Browse the repository at this point in the history

Improvement/task and spec relations
  • Loading branch information
essweine committed Sep 6, 2023
2 parents 3bd9e00 + a15dd17 commit 179d62c
Show file tree
Hide file tree
Showing 14 changed files with 148 additions and 173 deletions.
8 changes: 4 additions & 4 deletions SpiffWorkflow/bpmn/serializer/helpers/spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,8 @@ def get_default_attributes(self, spec):
'description': spec.description,
'manual': spec.manual,
'lookahead': spec.lookahead,
'inputs': [task.name for task in spec.inputs],
'outputs': [task.name for task in spec.outputs],
'inputs': spec._inputs,
'outputs': spec._outputs,
'bpmn_id': spec.bpmn_id,
'bpmn_name': spec.bpmn_name,
'lane': spec.lane,
Expand Down Expand Up @@ -206,8 +206,8 @@ def task_spec_from_dict(self, dct):
bpmn_id = dct.pop('bpmn_id')

spec = self.spec_class(wf_spec, name, **dct)
spec.inputs = inputs
spec.outputs = outputs
spec._inputs = inputs
spec._outputs = outputs

if issubclass(self.spec_class, BpmnSpecMixin) and bpmn_id != name:
# This is a hack for multiinstance tasks :( At least it is simple.
Expand Down
5 changes: 0 additions & 5 deletions SpiffWorkflow/bpmn/serializer/process_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,4 @@ def from_dict(self, dct):
spec.start = task_spec
self.restore_task_spec_extensions(task_dict, task_spec)

# Now we have to go back and fix all the circular references to everything
for task_spec in spec.task_specs.values():
task_spec.inputs = [ spec.get_task_spec_from_name(name) for name in task_spec.inputs ]
task_spec.outputs = [ spec.get_task_spec_from_name(name) for name in task_spec.outputs ]

return spec
99 changes: 44 additions & 55 deletions SpiffWorkflow/bpmn/serializer/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,28 +205,62 @@ def workflow_from_dict(self, dct):
workflow = self.wf_class(spec, subprocess_specs, deserializing=True)

# Restore any unretrieved messages
workflow.bpmn_events = [ self.event_from_dict(msg) for msg in dct.get('bpmn_events', []) ]
workflow.bpmn_events = [ self.event_from_dict(msg) for msg in dct_copy.get('bpmn_events', []) ]

workflow.correlations = dct_copy.pop('correlations', {})

# Restore the remainder of the workflow
workflow.data = self.data_converter.restore(dct_copy.pop('data'))
workflow.success = dct_copy.pop('success')
workflow.task_tree = self.task_tree_from_dict(dct_copy, dct_copy.pop('root'), None, workflow)
workflow.tasks = dict(
(UUID(task['id']), self.task_from_dict(task, workflow, workflow.spec))
for task in dct_copy['tasks'].values()
)
workflow.task_tree = workflow.tasks.get(UUID(dct_copy['root']))
if dct_copy['last_task'] is not None:
workflow.last_task = workflow.tasks.get(UUID(dct_copy['last_task']))

self.subprocesses_from_dict(dct_copy['subprocesses'], workflow)
return workflow

def subprocesses_from_dict(self, dct, workflow, top_workflow=None):
    """Restore the subprocesses of `workflow` from the serialized dict `dct`.

    Iterates the workflow's own tasks first so a parent workflow always
    exists before the subprocesses nested inside it, then recurses into
    each restored subprocess.  Consumed entries are popped from `dct`.
    """
    top_workflow = top_workflow or workflow
    for task in workflow.tasks.values():
        key = str(task.id)
        if not (isinstance(task.task_spec, SubWorkflowTask) and key in dct):
            continue
        subprocess = self.subworkflow_from_dict(dct.pop(key), task, top_workflow)
        top_workflow.subprocesses[task.id] = subprocess
        subprocess.completed_event.connect(task.task_spec._on_subworkflow_completed, task)
        if len(subprocess.spec.data_objects) > 0:
            # A subprocess with data objects shares its parent's data.
            subprocess.data = task.workflow.data
        self.subprocesses_from_dict(dct, subprocess, top_workflow)

def subworkflow_to_dict(self, workflow):
    """Serialize a subprocess workflow.

    Produces the common process state plus the id of the task that spawned
    the subprocess and the name of its spec.
    """
    serialized = self.process_to_dict(workflow)
    serialized.update(
        parent_task_id=str(workflow.parent_task_id),
        spec=workflow.spec.name,
    )
    return serialized

def subworkflow_from_dict(self, dct, task, top_workflow):
    """Rebuild one subprocess workflow attached to `task`.

    The subprocess spec is looked up by name on the top-level workflow;
    tasks are restored into a dict keyed by UUID.
    """
    spec = top_workflow.subprocess_specs.get(task.task_spec.spec)
    subprocess = self.sub_wf_class(spec, task.id, top_workflow, deserializing=True)
    subprocess.correlations = dct.pop('correlations', {})
    subprocess.tasks = {
        UUID(entry['id']): self.task_from_dict(entry, subprocess, spec)
        for entry in dct['tasks'].values()
    }
    subprocess.task_tree = subprocess.tasks.get(UUID(dct['root']))
    if isinstance(dct['last_task'], str):
        subprocess.last_task = subprocess.tasks.get(UUID(dct['last_task']))
    subprocess.success = dct['success']
    subprocess.data = self.data_converter.restore(dct['data'])
    return subprocess

def task_to_dict(self, task):
return {
'id': str(task.id),
'parent': str(task.parent.id) if task.parent is not None else None,
'children': [ str(child.id) for child in task.children ],
'parent': str(task._parent) if task.parent is not None else None,
'children': [ str(child) for child in task._children ],
'last_state_change': task.last_state_change,
'state': task.state,
'task_spec': task.task_spec.name,
Expand All @@ -235,70 +269,25 @@ def task_to_dict(self, task):
'data': self.data_converter.convert(task.data),
}

def task_from_dict(self, dct, workflow, spec):
    """Recreate a Task from its serialized form.

    The task spec is resolved by name against `spec`; parent and children
    are kept as UUIDs on `_parent`/`_children` rather than as Task
    references.  Fix: the diff extraction left the pre-change signature and
    constructor lines interleaved with the current ones; only the current
    version is kept here.
    """
    task_spec = spec.task_specs.get(dct['task_spec'])
    task = Task(workflow, task_spec, state=dct['state'], id=UUID(dct['id']))
    task._parent = UUID(dct['parent']) if dct['parent'] is not None else None
    task._children = [UUID(child) for child in dct['children']]
    task.last_state_change = dct['last_state_change']
    task.triggered = dct['triggered']
    task.internal_data = self.data_converter.restore(dct['internal_data'])
    task.data = self.data_converter.restore(dct['data'])
    return task

def task_tree_to_dict(self, root):
    """Flatten the task tree rooted at `root` into a dict keyed by task id."""
    serialized = {}

    def visit(task):
        # Depth-first, pre-order: a parent's entry precedes its children's.
        entry = self.task_to_dict(task)
        serialized[entry['id']] = entry
        for child in task.children:
            visit(child)

    visit(root)
    return serialized

def task_tree_from_dict(self, process_dct, task_id, parent_task, process, top_level_workflow=None, top_level_dct=None):
    """Recursively rebuild the task tree of `process` starting at `task_id`.

    Returns the Task created for `task_id`; children are attached by the
    recursive calls.  `top_level_workflow`/`top_level_dct` carry the
    outermost workflow and its serialized form so nested subprocesses can
    be registered on it.  Fix: removed a no-op expression statement
    (`process_dct['tasks'][child_task_id]`) in the child loop.
    """
    top = top_level_workflow or process
    top_dct = top_level_dct or process_dct

    task_dict = process_dct['tasks'][task_id]
    task_spec = process.spec.task_specs[task_dict['task_spec']]
    task = self.task_from_dict(task_dict, process, task_spec, parent_task)
    if task_id == process_dct['last_task']:
        process.last_task = task

    if isinstance(task_spec, SubWorkflowTask) and task_id in top_dct.get('subprocesses', {}):
        subprocess_spec = top.subprocess_specs[task_spec.spec]
        # NOTE(review): `top_level_workflow` (not `top`) is passed and indexed
        # below, and it is None on the outermost call -- confirm subprocess
        # tasks can only occur below the top level.
        subprocess = self.sub_wf_class(subprocess_spec, task.id, top_level_workflow, deserializing=True)
        subprocess_dct = top_dct['subprocesses'].get(task_id, {})
        subprocess.spec.data_objects.update(process.spec.data_objects)
        if len(subprocess.spec.data_objects) > 0:
            # Subprocesses with data objects share the parent process data.
            subprocess.data = process.data
        else:
            subprocess.data = self.data_converter.restore(subprocess_dct.pop('data'))
        subprocess.success = subprocess_dct.pop('success')
        subprocess.correlations = subprocess_dct.pop('correlations', {})
        subprocess.task_tree = self.task_tree_from_dict(subprocess_dct, subprocess_dct.pop('root'), None, subprocess, top, top_dct)
        subprocess.completed_event.connect(task_spec._on_subworkflow_completed, task)
        top_level_workflow.subprocesses[task.id] = subprocess

    for child_task_id in task_dict['children']:
        if child_task_id in process_dct['tasks']:
            self.task_tree_from_dict(process_dct, child_task_id, task, process, top, top_dct)
        else:
            raise ValueError(f"Task {task_id} ({task_spec.name}) has child {child_task_id}, but no such task exists")

    return task

def process_to_dict(self, process):
    """Serialize the state shared by top-level workflows and subprocesses.

    Fix: the diff extraction left both the old `'tasks'` line (built from
    the task tree) and the new one (built from `process.tasks`) in the dict
    literal -- a duplicate key where the stale entry was silently
    overridden.  Only the current mapping is kept.
    """
    return {
        'data': self.data_converter.convert(process.data),
        'correlations': process.correlations,
        'last_task': str(process.last_task.id) if process.last_task is not None else None,
        'success': process.success,
        'tasks': {str(task.id): self.task_to_dict(task) for task in process.tasks.values()},
        'root': str(process.task_tree.id),
    }

Expand Down
44 changes: 12 additions & 32 deletions SpiffWorkflow/serializer/dict.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,13 @@
# 02110-1301 USA

import json

import pickle
import warnings
from base64 import b64encode, b64decode

from ..workflow import Workflow
from ..util.impl import get_class
from ..task import Task, TaskState
from ..task import Task
from ..operators import Attrib, PathAttrib, Equal, NotEqual, Operator, GreaterThan, LessThan, Match
from ..specs.base import TaskSpec
from ..specs.AcquireMutex import AcquireMutex
Expand All @@ -46,7 +47,7 @@
from ..specs.WorkflowSpec import WorkflowSpec
from .base import Serializer
from .exceptions import TaskNotSupportedError, MissingSpecError
import warnings


class DictionarySerializer(Serializer):

Expand Down Expand Up @@ -146,8 +147,8 @@ def serialize_task_spec(self, spec):
lookahead=spec.lookahead)
module_name = spec.__class__.__module__
s_state['class'] = module_name + '.' + spec.__class__.__name__
s_state['inputs'] = [t.name for t in spec.inputs]
s_state['outputs'] = [t.name for t in spec.outputs]
s_state['inputs'] = spec._inputs
s_state['outputs'] = spec._outputs
s_state['data'] = self.serialize_dict(spec.data)
s_state['defines'] = self.serialize_dict(spec.defines)
s_state['pre_assign'] = self.serialize_list(spec.pre_assign)
Expand All @@ -164,10 +165,8 @@ def deserialize_task_spec(self, wf_spec, s_state, spec):
spec.defines = self.deserialize_dict(s_state.get('defines', {}))
spec.pre_assign = self.deserialize_list(s_state.get('pre_assign', []))
spec.post_assign = self.deserialize_list(s_state.get('post_assign', []))
# We can't restore inputs and outputs yet because they may not be
# deserialized yet. So keep the names, and resolve them in the end.
spec.inputs = s_state.get('inputs', [])[:]
spec.outputs = s_state.get('outputs', [])[:]
spec._inputs = s_state.get('inputs', [])
spec._outputs = s_state.get('outputs', [])
return spec

def serialize_acquire_mutex(self, spec):
Expand Down Expand Up @@ -437,10 +436,6 @@ def serialize_workflow_spec(self, spec, **kwargs):
)
return s_state

def _deserialize_workflow_spec_task_spec(self, spec, task_spec, name):
task_spec.inputs = [spec.get_task_spec_from_name(t) for t in task_spec.inputs]
task_spec.outputs = [spec.get_task_spec_from_name(t) for t in task_spec.outputs]

def deserialize_workflow_spec(self, s_state, **kwargs):
spec = WorkflowSpec(s_state['name'], filename=s_state['file'])
spec.description = s_state['description']
Expand All @@ -458,9 +453,6 @@ def deserialize_workflow_spec(self, s_state, **kwargs):
task_spec = task_spec_cls.deserialize(self, spec, task_spec_state)
spec.task_specs[name] = task_spec

for name, task_spec in list(spec.task_specs.items()):
self._deserialize_workflow_spec_task_spec(spec, task_spec, name)

if s_state.get('end', None):
spec.end = spec.get_task_spec_from_name(s_state['end'])

Expand Down Expand Up @@ -502,17 +494,6 @@ def deserialize_workflow(self, s_state, wf_class=Workflow, **kwargs):
workflow.spec = wf_spec
workflow.task_tree = self.deserialize_task(workflow, s_state['task_tree'], reset_specs)

# Re-connect parents and update states if necessary
update_state = workflow.task_tree.state != TaskState.COMPLETED
for task in workflow.get_tasks_iterator():
if task.parent is not None:
task.parent = workflow.get_task_from_id(task.parent)
if update_state:
if task.state == 32:
task.state = TaskState.COMPLETED
elif task.state == 64:
task.state = TaskState.CANCELLED

if workflow.last_task is not None:
workflow.last_task = workflow.get_task_from_id(s_state['last_task'])

Expand Down Expand Up @@ -545,12 +526,11 @@ def deserialize_task(self, workflow, s_state, ignored_specs=None):
task_spec = workflow.spec.get_task_spec_from_name(old_spec_name)
if task_spec is None:
raise MissingSpecError("Unknown task spec: " + old_spec_name)
task = Task(workflow, task_spec)
task_id = s_state['id']
parent_id = s_state['parent']
parent = workflow.get_task_from_id(parent_id) if parent_id is not None else None
task = Task(workflow, task_spec, parent, id=task_id)

task.id = s_state['id']
# as the task_tree might not be complete yet
# keep the ids so they can be processed at the end
task.parent = s_state['parent']
task.children = self._deserialize_task_children(task, s_state, ignored_specs)
task._state = s_state['state']
task.triggered = s_state['triggered']
Expand Down
46 changes: 16 additions & 30 deletions SpiffWorkflow/serializer/xml.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
# 02110-1301 USA

import warnings
from uuid import UUID

from lxml import etree
from lxml.etree import SubElement
from ..workflow import Workflow
Expand Down Expand Up @@ -292,16 +294,12 @@ def serialize_task_spec(self, spec, elem):
if spec.manual:
SubElement(elem, 'manual')
SubElement(elem, 'lookahead').text = str(spec.lookahead)
inputs = [t.name for t in spec.inputs]
outputs = [t.name for t in spec.outputs]
self.serialize_value_list(SubElement(elem, 'inputs'), inputs)
self.serialize_value_list(SubElement(elem, 'outputs'), outputs)
self.serialize_value_list(SubElement(elem, 'inputs'), spec._inputs)
self.serialize_value_list(SubElement(elem, 'outputs'), spec._outputs)
self.serialize_value_map(SubElement(elem, 'data'), spec.data)
self.serialize_value_map(SubElement(elem, 'defines'), spec.defines)
self.serialize_value_list(SubElement(elem, 'pre-assign'),
spec.pre_assign)
self.serialize_value_list(SubElement(elem, 'post-assign'),
spec.post_assign)
self.serialize_value_list(SubElement(elem, 'pre-assign'), spec.pre_assign)
self.serialize_value_list(SubElement(elem, 'post-assign'), spec.post_assign)

# Note: Events are not serialized; this is documented in
# the TaskSpec API docs.
Expand All @@ -327,12 +325,8 @@ def deserialize_task_spec(self, wf_spec, elem, spec_cls, **kwargs):
post_assign_elem = elem.find('post-assign')
if post_assign_elem is not None:
spec.post_assign = self.deserialize_value_list(post_assign_elem)

# We can't restore inputs and outputs yet because they may not be
# deserialized yet. So keep the names, and resolve them in the
# workflowspec deserializer.
spec.inputs = self.deserialize_value_list(elem.find('inputs'))
spec.outputs = self.deserialize_value_list(elem.find('outputs'))
spec._inputs = self.deserialize_value_list(elem.find('inputs'))
spec._outputs = self.deserialize_value_list(elem.find('outputs'))

return spec

Expand Down Expand Up @@ -634,13 +628,6 @@ def deserialize_workflow_spec(self, elem, **kwargs):
task_spec = cls.deserialize(self, spec, task_elem)
spec.task_specs[task_spec.name] = task_spec
spec.start = spec.task_specs['Start']

# Connect the tasks.
for name, task_spec in list(spec.task_specs.items()):
task_spec.inputs = [spec.get_task_spec_from_name(t)
for t in task_spec.inputs]
task_spec.outputs = [spec.get_task_spec_from_name(t)
for t in task_spec.outputs]
return spec

def serialize_workflow(self, workflow, **kwargs):
Expand Down Expand Up @@ -674,11 +661,6 @@ def deserialize_workflow(self, elem, **kwargs):
task_tree_elem = elem.find('task-tree')
workflow.task_tree = self.deserialize_task(workflow, task_tree_elem[0])

# Re-connect parents
for task in workflow.get_tasks_iterator():
if task.parent is not None:
task.parent = workflow.get_task_from_id(task.parent)

# last_task
last_task = elem.findtext('last-task')
if last_task is not None:
Expand Down Expand Up @@ -725,10 +707,14 @@ def deserialize_task(self, workflow, elem):

task_spec_name = elem.findtext('spec')
task_spec = workflow.spec.get_task_spec_from_name(task_spec_name)
task = Task(workflow, task_spec)
task.id = elem.findtext('id')
# The parent is later resolved by the workflow deserializer
task.parent = elem.findtext('parent')
task_id = elem.findtext('id')
if task_id is not None:
task_id = UUID(task_id)
# Deserialization is done by traversing the tree, the parent should already exist
# when children are deserialized
parent_id = elem.findtext('parent')
parent = workflow.tasks[UUID(parent_id)] if parent_id is not None else None
task = Task(workflow, task_spec, parent, id=task_id)

for child_elem in elem.find('children'):
child_task = self.deserialize_task(workflow, child_elem)
Expand Down
2 changes: 1 addition & 1 deletion SpiffWorkflow/specs/MultiChoice.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def connect_if(self, condition, task_spec):
taskspec -- the conditional task spec
"""
assert task_spec is not None
self.outputs.append(task_spec)
self._outputs.append(task_spec.name)
self.cond_task_specs.append((condition, task_spec.name))
task_spec._connect_notify(self)

Expand Down
6 changes: 5 additions & 1 deletion SpiffWorkflow/specs/SubWorkflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,12 @@ def _create_subworkflow(self, my_task):
wf_spec = WorkflowSpec.deserialize(serializer, xml, filename=file_name)
subworkflow = Workflow(wf_spec)
my_task._sync_children(self.outputs, TaskState.FUTURE)
# I don't necessarily like this, but I can't say I like anything about how subprocesses work here
for task in subworkflow.task_tree:
my_task.workflow.tasks[task.id] = task
subworkflow.tasks[my_task.id] = my_task
subworkflow.task_tree.parent = my_task
my_task.children.insert(0, subworkflow.task_tree)
my_task._children.insert(0, subworkflow.task_tree.id)
subworkflow.completed_event.connect(self._on_subworkflow_completed, my_task)
my_task._set_internal_data(subworkflow=subworkflow)
my_task._set_state(TaskState.WAITING)
Expand Down
Loading

0 comments on commit 179d62c

Please sign in to comment.