Improvement/task and spec relations #349

Merged · 3 commits · Sep 6, 2023
SpiffWorkflow/bpmn/serializer/helpers/spec.py (8 changes: 4 additions & 4 deletions)

@@ -135,8 +135,8 @@ def get_default_attributes(self, spec):
             'description': spec.description,
             'manual': spec.manual,
             'lookahead': spec.lookahead,
-            'inputs': [task.name for task in spec.inputs],
-            'outputs': [task.name for task in spec.outputs],
+            'inputs': spec._inputs,
+            'outputs': spec._outputs,
             'bpmn_id': spec.bpmn_id,
             'bpmn_name': spec.bpmn_name,
             'lane': spec.lane,
@@ -206,8 +206,8 @@ def task_spec_from_dict(self, dct):
         bpmn_id = dct.pop('bpmn_id')

         spec = self.spec_class(wf_spec, name, **dct)
-        spec.inputs = inputs
-        spec.outputs = outputs
+        spec._inputs = inputs
+        spec._outputs = outputs

         if issubclass(self.spec_class, BpmnSpecMixin) and bpmn_id != name:
             # This is a hack for multiinstance tasks :( At least it is simple.
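The core idea of the PR is visible in this file: a task spec's inputs and outputs are now stored as lists of spec names (`_inputs` / `_outputs`) rather than as resolved TaskSpec objects, so the serializer can copy them verbatim in both directions. A toy sketch of the pattern, with illustrative names only (these are not SpiffWorkflow's actual class definitions):

```python
# Toy model of the name-based connection pattern this PR adopts: specs keep
# the *names* of their inputs/outputs and resolve them through properties,
# so the serialized form is plain strings and no circular references exist.
class WorkflowSpec:
    def __init__(self):
        self.task_specs = {}

class TaskSpec:
    def __init__(self, wf_spec, name):
        self._wf_spec = wf_spec
        self.name = name
        self._inputs = []    # names of upstream specs
        self._outputs = []   # names of downstream specs
        wf_spec.task_specs[name] = self

    @property
    def inputs(self):
        # Resolved on access, not at deserialization time
        return [self._wf_spec.task_specs[n] for n in self._inputs]

    @property
    def outputs(self):
        return [self._wf_spec.task_specs[n] for n in self._outputs]

    def connect(self, other):
        self._outputs.append(other.name)
        other._inputs.append(self.name)

spec = WorkflowSpec()
a, b = TaskSpec(spec, 'a'), TaskSpec(spec, 'b')
a.connect(b)
assert b.inputs == [a]        # objects on demand
assert a._outputs == ['b']    # plain strings: serializes directly
```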
SpiffWorkflow/bpmn/serializer/process_spec.py (5 changes: 0 additions & 5 deletions)

@@ -86,9 +86,4 @@ def from_dict(self, dct):
                 spec.start = task_spec
             self.restore_task_spec_extensions(task_dict, task_spec)

-        # Now we have to go back and fix all the circular references to everything
-        for task_spec in spec.task_specs.values():
-            task_spec.inputs = [ spec.get_task_spec_from_name(name) for name in task_spec.inputs ]
-            task_spec.outputs = [ spec.get_task_spec_from_name(name) for name in task_spec.outputs ]
-
         return spec
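With names stored directly, the deleted back-patching loop has nothing left to do: restoring one spec never touches another spec object, so a single pass in any order suffices. A sketch of that one-pass restore, reusing the toy WorkflowSpec/TaskSpec from the sketch above (the dict shape here is assumed, not the library's exact schema):

```python
# One-pass restore: restoration order no longer matters because only
# name strings are assigned, never references to other spec objects.
def spec_from_dict(dct):
    spec = WorkflowSpec()
    for name, ts_state in dct['task_specs'].items():
        ts = TaskSpec(spec, name)
        ts._inputs = list(ts_state['inputs'])
        ts._outputs = list(ts_state['outputs'])
    return spec

restored = spec_from_dict({'task_specs': {
    'a': {'inputs': [], 'outputs': ['b']},
    'b': {'inputs': ['a'], 'outputs': []},
}})
assert restored.task_specs['b'].inputs == [restored.task_specs['a']]
```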
SpiffWorkflow/bpmn/serializer/workflow.py (102 changes: 46 additions & 56 deletions)

@@ -168,6 +168,7 @@ def workflow_to_dict(self, workflow):
         """
         # These properties are applicable to top level & subprocesses
         dct = self.process_to_dict(workflow)
+        dct['spec'] = self.spec_converter.convert(workflow.spec)
         # These are only used at the top-level
         dct['subprocess_specs'] = dict(
             (name, self.spec_converter.convert(spec)) for name, spec in workflow.subprocess_specs.items()
@@ -204,27 +205,62 @@ def workflow_from_dict(self, dct):
         workflow = self.wf_class(spec, subprocess_specs, deserializing=True)

         # Restore any unretrieve messages
-        workflow.bpmn_events = [ self.event_from_dict(msg) for msg in dct.get('bpmn_events', []) ]
+        workflow.bpmn_events = [ self.event_from_dict(msg) for msg in dct_copy.get('bpmn_events', []) ]

+        workflow.correlations = dct_copy.pop('correlations', {})
+
         # Restore the remainder of the workflow
         workflow.data = self.data_converter.restore(dct_copy.pop('data'))
         workflow.success = dct_copy.pop('success')
-        workflow.task_tree = self.task_tree_from_dict(dct_copy, dct_copy.pop('root'), None, workflow)
+        workflow.tasks = dict(
+            (UUID(task['id']), self.task_from_dict(task, workflow, workflow.spec))
+            for task in dct_copy['tasks'].values()
+        )
+        workflow.task_tree = workflow.tasks.get(UUID(dct_copy['root']))
+        if dct_copy['last_task'] is not None:
+            workflow.last_task = workflow.tasks.get(UUID(dct_copy['last_task']))
+
+        self.subprocesses_from_dict(dct_copy['subprocesses'], workflow)
         return workflow

+    def subprocesses_from_dict(self, dct, workflow, top_workflow=None):
+        # This ensures we create parent workflows before their children
+        top_workflow = top_workflow or workflow
+        for task in workflow.tasks.values():
+            if isinstance(task.task_spec, SubWorkflowTask) and str(task.id) in dct:
+                sp = self.subworkflow_from_dict(dct.pop(str(task.id)), task, top_workflow)
+                top_workflow.subprocesses[task.id] = sp
+                sp.completed_event.connect(task.task_spec._on_subworkflow_completed, task)
+                if len(sp.spec.data_objects) > 0:
+                    sp.data = task.workflow.data
+                self.subprocesses_from_dict(dct, sp, top_workflow)
+
+    def subworkflow_to_dict(self, workflow):
+        dct = self.process_to_dict(workflow)
+        dct['parent_task_id'] = str(workflow.parent_task_id)
+        dct['spec'] = workflow.spec.name
+        return dct
+
+    def subworkflow_from_dict(self, dct, task, top_workflow):
+        spec = top_workflow.subprocess_specs.get(task.task_spec.spec)
+        subprocess = self.sub_wf_class(spec, task.id, top_workflow, deserializing=True)
+        subprocess.correlations = dct.pop('correlations', {})
+        subprocess.tasks = dict(
+            (UUID(task['id']), self.task_from_dict(task, subprocess, spec))
+            for task in dct['tasks'].values()
+        )
+        subprocess.task_tree = subprocess.tasks.get(UUID(dct['root']))
+        if isinstance(dct['last_task'], str):
+            subprocess.last_task = subprocess.tasks.get(UUID(dct['last_task']))
+        subprocess.success = dct['success']
+        subprocess.data = self.data_converter.restore(dct['data'])
+        return subprocess
+
     def task_to_dict(self, task):
         return {
             'id': str(task.id),
-            'parent': str(task.parent.id) if task.parent is not None else None,
-            'children': [ str(child.id) for child in task.children ],
+            'parent': str(task._parent) if task.parent is not None else None,
+            'children': [ str(child) for child in task._children ],
             'last_state_change': task.last_state_change,
             'state': task.state,
             'task_spec': task.task_spec.name,
@@ -233,71 +269,25 @@ def task_to_dict(self, task):
             'data': self.data_converter.convert(task.data),
         }

-    def task_from_dict(self, dct, workflow, task_spec, parent):
+    def task_from_dict(self, dct, workflow, spec):

-        task = Task(workflow, task_spec, parent, dct['state'])
-        task.id = UUID(dct['id'])
+        task_spec = spec.task_specs.get(dct['task_spec'])
+        task = Task(workflow, task_spec, state=dct['state'], id=UUID(dct['id']))
+        task._parent = UUID(dct['parent']) if dct['parent'] is not None else None
+        task._children = [UUID(child) for child in dct['children']]
         task.last_state_change = dct['last_state_change']
         task.triggered = dct['triggered']
         task.internal_data = self.data_converter.restore(dct['internal_data'])
         task.data = self.data_converter.restore(dct['data'])
         return task

-    def task_tree_to_dict(self, root):
-
-        tasks = { }
-        def add_task(task):
-            dct = self.task_to_dict(task)
-            tasks[dct['id']] = dct
-            for child in task.children:
-                add_task(child)
-
-        add_task(root)
-        return tasks
-
-    def task_tree_from_dict(self, process_dct, task_id, parent_task, process, top_level_workflow=None, top_level_dct=None):
-
-        top = top_level_workflow or process
-        top_dct = top_level_dct or process_dct
-
-        task_dict = process_dct['tasks'][task_id]
-        task_spec = process.spec.task_specs[task_dict['task_spec']]
-        task = self.task_from_dict(task_dict, process, task_spec, parent_task)
-        if task_id == process_dct['last_task']:
-            process.last_task = task
-
-        if isinstance(task_spec, SubWorkflowTask) and task_id in top_dct.get('subprocesses', {}):
-            subprocess_spec = top.subprocess_specs[task_spec.spec]
-            subprocess = self.sub_wf_class(subprocess_spec, task.id, top_level_workflow, deserializing=True)
-            subprocess_dct = top_dct['subprocesses'].get(task_id, {})
-            subprocess.spec.data_objects.update(process.spec.data_objects)
-            if len(subprocess.spec.data_objects) > 0:
-                subprocess.data = process.data
-            else:
-                subprocess.data = self.data_converter.restore(subprocess_dct.pop('data'))
-            subprocess.success = subprocess_dct.pop('success')
-            subprocess.correlations = subprocess_dct.pop('correlations', {})
-            subprocess.task_tree = self.task_tree_from_dict(subprocess_dct, subprocess_dct.pop('root'), None, subprocess, top, top_dct)
-            subprocess.completed_event.connect(task_spec._on_subworkflow_completed, task)
-            top_level_workflow.subprocesses[task.id] = subprocess
-
-        for child_task_id in task_dict['children']:
-            if child_task_id in process_dct['tasks']:
-                process_dct['tasks'][child_task_id]
-                self.task_tree_from_dict(process_dct, child_task_id, task, process, top, top_dct)
-            else:
-                raise ValueError(f"Task {task_id} ({task_spec.name}) has child {child_task_id}, but no such task exists")
-
-        return task
-
     def process_to_dict(self, process):
         return {
-            'spec': self.spec_converter.convert(process.spec),
             'data': self.data_converter.convert(process.data),
             'correlations': process.correlations,
             'last_task': str(process.last_task.id) if process.last_task is not None else None,
             'success': process.success,
-            'tasks': self.task_tree_to_dict(process.task_tree),
+            'tasks': dict((str(task.id), self.task_to_dict(task)) for task in process.tasks.values()),
             'root': str(process.task_tree.id),
         }
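The task serializer changes shape here. The old `task_tree_to_dict` / `task_tree_from_dict` recursed over the tree and carried subprocess state along as it went; the new code stores a flat `tasks` map keyed by task id, with `parent` and `children` recorded as id strings, and restores subprocesses in a separate parent-first pass. One practical consequence is that restore no longer recurses over the tree, so very deep task trees cannot hit Python's recursion limit. A minimal sketch of the flat round trip, using toy dicts rather than the real Task class:

```python
import uuid

# Each task is stored exactly once, keyed by its id; relationships are
# kept as id strings rather than object references. (Illustrative only.)
def tasks_to_dict(tasks):
    return {
        str(t['id']): {
            'id': str(t['id']),
            'parent': str(t['parent']) if t['parent'] else None,
            'children': [str(c) for c in t['children']],
        }
        for t in tasks
    }

def tasks_from_dict(dct):
    # One flat pass is enough: a child can be restored before its parent
    # because only ids are recorded, never object references.
    return {
        uuid.UUID(key): {
            'id': uuid.UUID(t['id']),
            'parent': uuid.UUID(t['parent']) if t['parent'] else None,
            'children': [uuid.UUID(c) for c in t['children']],
        }
        for key, t in dct.items()
    }

root, child = uuid.uuid4(), uuid.uuid4()
tasks = [
    {'id': root, 'parent': None, 'children': [child]},
    {'id': child, 'parent': root, 'children': []},
]
assert tasks_from_dict(tasks_to_dict(tasks))[child]['parent'] == root
```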
SpiffWorkflow/bpmn/workflow.py (4 changes: 1 addition & 3 deletions)

@@ -335,10 +335,8 @@ def get_or_create_subprocess(task_spec, wf_spec):

             new = spec_class(self.spec, f'{wf_spec.name}_{len(self.subprocesses)}', wf_spec.name)
             self.spec.start.connect(new)
-            task = Task(self, new)
             start = self.get_tasks_from_spec_name('Start', workflow=self)[0]
-            start.children.append(task)
-            task.parent = start
+            task = Task(self, new, parent=start)
             # This (indirectly) calls create_subprocess
             task.task_spec._update(task)
             return self.subprocesses[task.id]
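Since `Task` now accepts its parent in the constructor, callers no longer wire `parent` and `children` by hand, so the two ends of the link cannot drift out of sync. Roughly how such constructor-side wiring can work; this is a toy sketch whose signature is only inferred from the call sites in this diff, not the library's actual class:

```python
import uuid

class Workflow:
    def __init__(self):
        self.tasks = {}   # single registry of tasks, keyed by id

class Task:
    # Passing parent= lets __init__ keep both ends of the parent/child
    # link consistent, instead of callers appending to children by hand.
    def __init__(self, workflow, task_spec, parent=None, state=1, id=None):
        self.workflow = workflow
        self.task_spec = task_spec
        self.state = state
        self.id = id or uuid.uuid4()
        self._children = []
        self._parent = parent.id if parent is not None else None
        workflow.tasks[self.id] = self
        if parent is not None:
            parent._children.append(self.id)   # link recorded as an id

wf = Workflow()
root = Task(wf, 'Start')
child = Task(wf, 'fire_event', parent=root)
assert child._parent == root.id and child.id in root._children
```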
SpiffWorkflow/serializer/dict.py (44 changes: 12 additions & 32 deletions)

@@ -16,12 +16,13 @@
 # 02110-1301 USA

 import json

 import pickle
+import warnings
 from base64 import b64encode, b64decode

 from ..workflow import Workflow
 from ..util.impl import get_class
-from ..task import Task, TaskState
+from ..task import Task
 from ..operators import Attrib, PathAttrib, Equal, NotEqual, Operator, GreaterThan, LessThan, Match
 from ..specs.base import TaskSpec
 from ..specs.AcquireMutex import AcquireMutex
@@ -46,7 +47,7 @@
 from ..specs.WorkflowSpec import WorkflowSpec
 from .base import Serializer
 from .exceptions import TaskNotSupportedError, MissingSpecError
-import warnings
+


 class DictionarySerializer(Serializer):
@@ -146,8 +147,8 @@ def serialize_task_spec(self, spec):
                                lookahead=spec.lookahead)
         module_name = spec.__class__.__module__
         s_state['class'] = module_name + '.' + spec.__class__.__name__
-        s_state['inputs'] = [t.name for t in spec.inputs]
-        s_state['outputs'] = [t.name for t in spec.outputs]
+        s_state['inputs'] = spec._inputs
+        s_state['outputs'] = spec._outputs
         s_state['data'] = self.serialize_dict(spec.data)
         s_state['defines'] = self.serialize_dict(spec.defines)
         s_state['pre_assign'] = self.serialize_list(spec.pre_assign)
@@ -164,10 +165,8 @@ def deserialize_task_spec(self, wf_spec, s_state, spec):
         spec.defines = self.deserialize_dict(s_state.get('defines', {}))
         spec.pre_assign = self.deserialize_list(s_state.get('pre_assign', []))
         spec.post_assign = self.deserialize_list(s_state.get('post_assign', []))
-        # We can't restore inputs and outputs yet because they may not be
-        # deserialized yet. So keep the names, and resolve them in the end.
-        spec.inputs = s_state.get('inputs', [])[:]
-        spec.outputs = s_state.get('outputs', [])[:]
+        spec._inputs = s_state.get('inputs', [])
+        spec._outputs = s_state.get('outputs', [])
         return spec
@@ -437,10 +436,6 @@ def serialize_workflow_spec(self, spec, **kwargs):
         )
         return s_state

-    def _deserialize_workflow_spec_task_spec(self, spec, task_spec, name):
-        task_spec.inputs = [spec.get_task_spec_from_name(t) for t in task_spec.inputs]
-        task_spec.outputs = [spec.get_task_spec_from_name(t) for t in task_spec.outputs]
-
     def deserialize_workflow_spec(self, s_state, **kwargs):
         spec = WorkflowSpec(s_state['name'], filename=s_state['file'])
         spec.description = s_state['description']
@@ -458,9 +453,6 @@ def deserialize_workflow_spec(self, s_state, **kwargs):
             task_spec = task_spec_cls.deserialize(self, spec, task_spec_state)
             spec.task_specs[name] = task_spec

-        for name, task_spec in list(spec.task_specs.items()):
-            self._deserialize_workflow_spec_task_spec(spec, task_spec, name)
-
         if s_state.get('end', None):
             spec.end = spec.get_task_spec_from_name(s_state['end'])

@@ -502,17 +494,6 @@ def deserialize_workflow(self, s_state, wf_class=Workflow, **kwargs):
         workflow.spec = wf_spec
         workflow.task_tree = self.deserialize_task(workflow, s_state['task_tree'], reset_specs)

-        # Re-connect parents and update states if necessary
-        update_state = workflow.task_tree.state != TaskState.COMPLETED
-        for task in workflow.get_tasks_iterator():
-            if task.parent is not None:
-                task.parent = workflow.get_task_from_id(task.parent)
-            if update_state:
-                if task.state == 32:
-                    task.state = TaskState.COMPLETED
-                elif task.state == 64:
-                    task.state = TaskState.CANCELLED
-
         if workflow.last_task is not None:
             workflow.last_task = workflow.get_task_from_id(s_state['last_task'])

@@ -545,12 +526,11 @@ def deserialize_task(self, workflow, s_state, ignored_specs=None):
         task_spec = workflow.spec.get_task_spec_from_name(old_spec_name)
         if task_spec is None:
             raise MissingSpecError("Unknown task spec: " + old_spec_name)
-        task = Task(workflow, task_spec)
+        task_id = s_state['id']
+        parent_id = s_state['parent']
+        parent = workflow.get_task_from_id(parent_id) if parent_id is not None else None
+        task = Task(workflow, task_spec, parent, id=task_id)

-        task.id = s_state['id']
-        # as the task_tree might not be complete yet
-        # keep the ids so they can be processed at the end
-        task.parent = s_state['parent']
         task.children = self._deserialize_task_children(task, s_state, ignored_specs)
         task._state = s_state['state']
         task.triggered = s_state['triggered']
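The deleted "re-connect parents" pass is unnecessary because this serializer restores the tree top-down: a parent task always exists by the time its children are built, so `get_task_from_id` can resolve it at construction time. A toy sketch of that invariant, using plain dicts with assumed shapes:

```python
import uuid

def restore_tree(s_state, tasks=None, parent=None):
    # Because a node is always created before its children are visited,
    # a child can resolve its parent immediately; no second pass needed.
    tasks = tasks if tasks is not None else {}
    task = {'id': uuid.UUID(s_state['id']), 'parent': parent, 'children': []}
    tasks[task['id']] = task
    if parent is not None:
        parent['children'].append(task)
    for child_state in s_state['children']:
        restore_tree(child_state, tasks, task)
    return tasks

root_id, child_id = str(uuid.uuid4()), str(uuid.uuid4())
state = {'id': root_id, 'children': [{'id': child_id, 'children': []}]}
tasks = restore_tree(state)
assert tasks[uuid.UUID(child_id)]['parent'] is tasks[uuid.UUID(root_id)]
```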
SpiffWorkflow/serializer/xml.py (46 changes: 16 additions & 30 deletions)

@@ -16,6 +16,8 @@
 # 02110-1301 USA

 import warnings
+from uuid import UUID
+
 from lxml import etree
 from lxml.etree import SubElement
 from ..workflow import Workflow
@@ -292,16 +294,12 @@ def serialize_task_spec(self, spec, elem):
         if spec.manual:
             SubElement(elem, 'manual')
         SubElement(elem, 'lookahead').text = str(spec.lookahead)
-        inputs = [t.name for t in spec.inputs]
-        outputs = [t.name for t in spec.outputs]
-        self.serialize_value_list(SubElement(elem, 'inputs'), inputs)
-        self.serialize_value_list(SubElement(elem, 'outputs'), outputs)
+        self.serialize_value_list(SubElement(elem, 'inputs'), spec._inputs)
+        self.serialize_value_list(SubElement(elem, 'outputs'), spec._outputs)
         self.serialize_value_map(SubElement(elem, 'data'), spec.data)
         self.serialize_value_map(SubElement(elem, 'defines'), spec.defines)
-        self.serialize_value_list(SubElement(elem, 'pre-assign'),
-                                  spec.pre_assign)
-        self.serialize_value_list(SubElement(elem, 'post-assign'),
-                                  spec.post_assign)
+        self.serialize_value_list(SubElement(elem, 'pre-assign'), spec.pre_assign)
+        self.serialize_value_list(SubElement(elem, 'post-assign'), spec.post_assign)

         # Note: Events are not serialized; this is documented in
         # the TaskSpec API docs.
@@ -327,12 +325,8 @@ def deserialize_task_spec(self, wf_spec, elem, spec_cls, **kwargs):
         post_assign_elem = elem.find('post-assign')
         if post_assign_elem is not None:
             spec.post_assign = self.deserialize_value_list(post_assign_elem)
-
-        # We can't restore inputs and outputs yet because they may not be
-        # deserialized yet. So keep the names, and resolve them in the
-        # workflowspec deserializer.
-        spec.inputs = self.deserialize_value_list(elem.find('inputs'))
-        spec.outputs = self.deserialize_value_list(elem.find('outputs'))
+        spec._inputs = self.deserialize_value_list(elem.find('inputs'))
+        spec._outputs = self.deserialize_value_list(elem.find('outputs'))

         return spec
@@ -634,13 +628,6 @@ def deserialize_workflow_spec(self, elem, **kwargs):
             task_spec = cls.deserialize(self, spec, task_elem)
             spec.task_specs[task_spec.name] = task_spec
         spec.start = spec.task_specs['Start']
-
-        # Connect the tasks.
-        for name, task_spec in list(spec.task_specs.items()):
-            task_spec.inputs = [spec.get_task_spec_from_name(t)
-                                for t in task_spec.inputs]
-            task_spec.outputs = [spec.get_task_spec_from_name(t)
-                                 for t in task_spec.outputs]
         return spec
@@ -674,11 +661,6 @@ def deserialize_workflow(self, elem, **kwargs):
         task_tree_elem = elem.find('task-tree')
         workflow.task_tree = self.deserialize_task(workflow, task_tree_elem[0])

-        # Re-connect parents
-        for task in workflow.get_tasks_iterator():
-            if task.parent is not None:
-                task.parent = workflow.get_task_from_id(task.parent)
-
         # last_task
         last_task = elem.findtext('last-task')
         if last_task is not None:
@@ -725,10 +707,14 @@ def deserialize_task(self, workflow, elem):

         task_spec_name = elem.findtext('spec')
         task_spec = workflow.spec.get_task_spec_from_name(task_spec_name)
-        task = Task(workflow, task_spec)
-        task.id = elem.findtext('id')
-        # The parent is later resolved by the workflow deserializer
-        task.parent = elem.findtext('parent')
+        task_id = elem.findtext('id')
+        if task_id is not None:
+            task_id = UUID(task_id)
+        # Deserialization is done by traversing the tree, the parent should already exist
+        # when children are deserialized
+        parent_id = elem.findtext('parent')
+        parent = workflow.tasks[UUID(parent_id)] if parent_id is not None else None
+        task = Task(workflow, task_spec, parent, id=task_id)

         for child_elem in elem.find('children'):
             child_task = self.deserialize_task(workflow, child_elem)
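Taken together, the serializer changes preserve the external round-trip behavior while dropping every fix-up pass. A hedged end-to-end sketch using the core (non-BPMN) API: the spec-building entry points (`WorkflowSpec`, `Simple`, `Workflow`) follow SpiffWorkflow's classic core API and may differ slightly in this release; the serializer method names come from the diffs above.

```python
# Assumed imports: classic SpiffWorkflow core API; paths may vary by version.
from SpiffWorkflow.specs import WorkflowSpec, Simple
from SpiffWorkflow.workflow import Workflow
from SpiffWorkflow.serializer.dict import DictionarySerializer

spec = WorkflowSpec('demo')
task_a = Simple(spec, 'a')
spec.start.connect(task_a)

serializer = DictionarySerializer()
s_state = serializer.serialize_workflow(Workflow(spec))

# After this PR there is no "re-connect parents" pass: deserialize_task
# resolves each parent as the tree is traversed, so the restored tasks
# should come back with their object references already intact.
workflow = serializer.deserialize_workflow(s_state)
assert workflow.task_tree.children[0].parent is workflow.task_tree
```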