Skip to content

Commit

Permalink
Merge pull request #292 from sartography/feature/multiinstance-refactor
Browse files Browse the repository at this point in the history
Feature/multiinstance refactor
  • Loading branch information
essweine committed Feb 15, 2023
2 parents 11e4b4f + 5374900 commit 590903f
Show file tree
Hide file tree
Showing 102 changed files with 4,257 additions and 3,414 deletions.
3 changes: 2 additions & 1 deletion SpiffWorkflow/bpmn/parser/BpmnParser.py
Expand Up @@ -21,7 +21,7 @@
import os

from lxml import etree
from lxml.etree import DocumentInvalid, LxmlError
from lxml.etree import LxmlError

from SpiffWorkflow.bpmn.specs.events.event_definitions import NoneEventDefinition

Expand Down Expand Up @@ -62,6 +62,7 @@
)



XSD_PATH = os.path.join(os.path.dirname(__file__), 'schema', 'BPMN20.xsd')

class BpmnValidator:
Expand Down
34 changes: 11 additions & 23 deletions SpiffWorkflow/bpmn/parser/ProcessParser.py
Expand Up @@ -18,7 +18,8 @@
# 02110-1301 USA

from .ValidationException import ValidationException
from ..specs.BpmnProcessSpec import BpmnProcessSpec, BpmnDataSpecification
from ..specs.BpmnProcessSpec import BpmnProcessSpec
from ..specs.data_spec import DataObject
from .node_parser import NodeParser
from .util import first

Expand Down Expand Up @@ -106,21 +107,22 @@ def _parse(self):
raise ValidationException("No start event found", node=self.node, file_name=self.filename)
self.spec = BpmnProcessSpec(name=self.get_id(), description=self.get_name(), filename=self.filename)

# Check for an IO Specification.
io_spec = first(self.xpath('./bpmn:ioSpecification'))
if io_spec is not None:
data_parser = DataSpecificationParser(io_spec, filename=self.filename)
self.spec.data_inputs, self.spec.data_outputs = data_parser.parse_io_spec()

# Get the data objects
for obj in self.xpath('./bpmn:dataObject'):
data_parser = DataSpecificationParser(obj, filename=self.filename)
data_object = data_parser.parse_data_object()
data_object = self.parse_data_object(obj)
self.spec.data_objects[data_object.name] = data_object

# Check for an IO Specification.
io_spec = first(self.xpath('./bpmn:ioSpecification'))
if io_spec is not None:
self.spec.io_specification = self.parse_io_spec()

for node in start_node_list:
self.parse_node(node)

def parse_data_object(self, obj):
    """
    Build a `DataObject` from a single element.

    :param obj: the element to read; its `id` and `name` attributes are
        passed, in that order, to the `DataObject` constructor
    :returns: the newly created `DataObject`
    """
    identifier = obj.get('id')
    label = obj.get('name')
    return DataObject(identifier, label)

def get_spec(self):
"""
Parse this process (if it has not already been parsed), and return the
Expand All @@ -129,17 +131,3 @@ def get_spec(self):
if self.spec is None:
self._parse()
return self.spec


class DataSpecificationParser(NodeParser):
    """
    Node parser that turns data elements into `BpmnDataSpecification` objects.
    """

    def parse_io_spec(self):
        """
        Collect the `bpmn:dataInput` and `bpmn:dataOutput` children of this node.

        :returns: a tuple `(inputs, outputs)` of lists, each holding one
            `BpmnDataSpecification` per matching child, built from the
            child's `id` and `name` attributes
        """
        inputs = [
            BpmnDataSpecification(child.get('id'), child.get('name'))
            for child in self.xpath('./bpmn:dataInput')
        ]
        outputs = [
            BpmnDataSpecification(child.get('id'), child.get('name'))
            for child in self.xpath('./bpmn:dataOutput')
        ]
        return inputs, outputs

    def parse_data_object(self):
        """
        Build a `BpmnDataSpecification` from the `id` and `name` attributes
        of this parser's own node.
        """
        node = self.node
        return BpmnDataSpecification(node.get('id'), node.get('name'))
196 changes: 115 additions & 81 deletions SpiffWorkflow/bpmn/parser/TaskParser.py
Expand Up @@ -17,24 +17,17 @@
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301 USA

import sys
import traceback
from .ValidationException import ValidationException
from ..specs.NoneTask import NoneTask
from ..specs.ScriptTask import ScriptTask
from ..specs.UserTask import UserTask
from ..specs.events.IntermediateEvent import _BoundaryEventParent
from ..specs.events.event_definitions import CancelEventDefinition
from ..specs.MultiInstanceTask import getDynamicMIClass
from ..specs.SubWorkflowTask import CallActivity, TransactionSubprocess, SubWorkflowTask
from ..specs.MultiInstanceTask import StandardLoopTask, SequentialMultiInstanceTask, ParallelMultiInstanceTask
from ..specs.SubWorkflowTask import TransactionSubprocess
from ..specs.ExclusiveGateway import ExclusiveGateway
from ..specs.InclusiveGateway import InclusiveGateway
from ...dmn.specs.BusinessRuleTask import BusinessRuleTask
from ...operators import Attrib, PathAttrib
from .util import one, first
from .node_parser import NodeParser
from ..specs.data_spec import TaskDataReference

STANDARDLOOPCOUNT = '25'
from .util import one
from .node_parser import NodeParser

CAMUNDA_MODEL_NS = 'http://camunda.org/schema/1.0/bpmn'

Expand All @@ -48,6 +41,12 @@ class TaskParser(NodeParser):
outgoing transitions, once the child tasks have all been parsed.
"""

# I hate myself for this. I wanted to at least relegate it to the top-level
# parser where the rest of the similar nonsense is, but it's inaccessible here.
STANDARD_LOOP_CLASS = StandardLoopTask
PARALLEL_MI_CLASS = ParallelMultiInstanceTask
SEQUENTIAL_MI_CLASS = SequentialMultiInstanceTask

def __init__(self, process_parser, spec_class, node, nsmap=None, lane=None):
"""
Constructor.
Expand All @@ -63,64 +62,97 @@ def __init__(self, process_parser, spec_class, node, nsmap=None, lane=None):
self.spec_class = spec_class
self.spec = self.process_parser.spec

def _set_multiinstance_attributes(self, is_sequential, expanded, loop_count,
                                  loop_task=False, element_var=None, collection=None, completion_condition=None):
    """
    Attach loop / multi-instance settings to the current task and swap its class.

    The settings are stored on `self.task` as `loopTask`, `isSequential`,
    `expanded`, `times`, `elementVar`, `collection` and `completioncondition`.
    The task's previous class is recorded as a dotted path in `prevtaskclass`
    and the task's `__class__` is replaced with whatever `getDynamicMIClass`
    returns for this node's id and the previous class.

    :param is_sequential: stored as `isSequential`
    :param expanded: stored as `expanded`
    :param loop_count: string naming the loop count; wrapped in `PathAttrib`
        (dots replaced by slashes) when it contains a dot after its first
        character, otherwise wrapped in `Attrib`
    :param loop_task: stored as `loopTask`
    :param element_var: stored as `elementVar`
    :param collection: stored as `collection`
    :param completion_condition: stored as `completioncondition`
    :raises ValidationException: if the task is not an instance of one of the
        supported task classes
    """
    # This should be replaced with its own task parser (though it is unclear
    # how feasible that is given the current parser architecture). Separate
    # classes for loop vs multiinstance should also be considered, because
    # having all these optional attributes is a nightmare.
    task = self.task
    supported = (NoneTask, UserTask, BusinessRuleTask, ScriptTask, CallActivity, SubWorkflowTask)
    if not isinstance(task, supported):
        raise ValidationException(
            f'Unsupported MultiInstance Task: {self.task.__class__}',
            node=self.node,
            file_name=self.filename)

    task.loopTask = loop_task
    task.isSequential = is_sequential
    task.expanded = expanded
    # Make dot notation compatible with BPMN path notation. A dot in the very
    # first position does not count (`find` must return an index above zero).
    if loop_count.find('.') > 0:
        task.times = PathAttrib(loop_count.replace('.', '/'))
    else:
        task.times = Attrib(loop_count)
    task.elementVar = element_var
    task.collection = collection
    task.completioncondition = completion_condition

    # Remember where the task came from before its class is replaced.
    previous_class = task.__class__
    task.prevtaskclass = task.__module__ + "." + previous_class.__name__
    task.__class__ = getDynamicMIClass(self.get_id(), previous_class)

def _detect_multiinstance(self):

multiinstance_element = first(self.xpath('./bpmn:multiInstanceLoopCharacteristics'))
if multiinstance_element is not None:
is_sequential = multiinstance_element.get('isSequential') == 'true'

element_var_text = multiinstance_element.attrib.get('{' + CAMUNDA_MODEL_NS + '}elementVariable')
collection_text = multiinstance_element.attrib.get('{' + CAMUNDA_MODEL_NS + '}collection')

loop_cardinality = first(self.xpath('./bpmn:multiInstanceLoopCharacteristics/bpmn:loopCardinality'))
if loop_cardinality is not None:
loop_count = loop_cardinality.text
elif collection_text is not None:
loop_count = collection_text
else:
loop_count = '1'

if collection_text is not None:
collection = PathAttrib(collection_text.replace('.', '/')) if collection_text.find('.') > 0 else Attrib(collection_text)
def _copy_task_attrs(self, original):
    """
    Make `self.task` take over the place of `original` in the process spec.

    The connections, IO specification, data associations and description of
    `original` are handed to `self.task`. `original` is then reset so that its
    only input is `self.task`, renamed with a ` [child]` suffix, recorded as
    `self.task.task_spec`, and registered in `self.spec.task_specs` under the
    new name.

    :param original: the task spec being wrapped; it is modified in place
    """
    wrapper = self.task
    transferred = (
        'inputs',
        'outputs',
        'io_specification',
        'data_input_associations',
        'data_output_associations',
        'description',
    )
    for attr in transferred:
        setattr(wrapper, attr, getattr(original, attr))

    # The original now hangs off the wrapper and keeps no wiring of its own.
    original.inputs = [wrapper]
    original.outputs = []
    original.io_specification = None
    original.data_input_associations = []
    original.data_output_associations = []

    # Rename first: the wrapper and the spec registry both use the new name.
    original.name = f'{original.name} [child]'
    wrapper.task_spec = original.name
    self.spec.task_specs[original.name] = original

def _add_loop_task(self, loop_characteristics):
    """
    Replace the current task with a standard loop task that wraps it.

    The loop settings are read from the document: the `loopMaximum` and
    `testBefore` attributes of `loop_characteristics`, and the text of the
    `bpmn:loopCondition` child of this node's `bpmn:standardLoopCharacteristics`.
    The current task spec is removed from `self.spec.task_specs`, a
    `STANDARD_LOOP_CLASS` instance is created under the same name, and
    `_copy_task_attrs` is called with the removed spec.

    :param loop_characteristics: the `standardLoopCharacteristics` element
    :raises: whatever `raise_validation_exception` raises, when `loopMaximum`
        is not an integer or when neither a `loopMaximum` nor a
        `loopCondition` is given
    """
    maximum = loop_characteristics.attrib.get('loopMaximum')
    if maximum is not None:
        try:
            maximum = int(maximum)
        except ValueError:
            # Report malformed documents the same way as every other parse
            # problem, so the error carries the node and file name.
            self.raise_validation_exception(f'The loopMaximum must be an integer (found {maximum!r})')

    conditions = self.xpath('./bpmn:standardLoopCharacteristics/bpmn:loopCondition')
    condition = conditions[0].text if len(conditions) > 0 else None
    # Anything other than the literal string 'true' counts as false.
    test_before = loop_characteristics.get('testBefore', 'false') == 'true'
    if maximum is None and condition is None:
        self.raise_validation_exception('A loopMaximum or loopCondition must be specified for Loop Tasks')

    original = self.spec.task_specs.pop(self.task.name)
    # The third argument is passed as an empty string here; presumably it is
    # the wrapped task's name, which _copy_task_attrs assigns afterwards as
    # `task_spec` -- confirm against the loop task class.
    self.task = self.STANDARD_LOOP_CLASS(self.spec, original.name, '', maximum, condition, test_before)
    self._copy_task_attrs(original)

def _add_multiinstance_task(self, loop_characteristics):

sequential = loop_characteristics.get('isSequential') == 'true'
prefix = 'bpmn:multiInstanceLoopCharacteristics'
cardinality = self.xpath(f'./{prefix}/bpmn:loopCardinality')
loop_input = self.xpath(f'./{prefix}/bpmn:loopDataInputRef')
if len(cardinality) == 0 and len(loop_input) == 0:
self.raise_validation_exception("A multiinstance task must specify a cardinality or a loop input data reference")
elif len(cardinality) > 0 and len(loop_input) > 0:
self.raise_validation_exception("A multiinstance task must specify exactly one of cardinality or loop input data reference")
cardinality = cardinality[0].text if len(cardinality) > 0 else None

loop_input = loop_input[0].text if len(loop_input) > 0 else None
if loop_input is not None:
if self.task.io_specification is not None:
try:
loop_input = [v for v in self.task.io_specification.data_inputs if v.name == loop_input][0]
except:
self.raise_validation_exception('The loop input data reference is missing from the IO specification')
else:
collection = None
loop_input = TaskDataReference(loop_input)

completion_condition = first(self.xpath('./bpmn:multiInstanceLoopCharacteristics/bpmn:completionCondition'))
if completion_condition is not None:
completion_condition = completion_condition.text
input_item = self.xpath(f'./{prefix}/bpmn:inputDataItem')
input_item = self.create_data_spec(input_item[0], TaskDataReference) if len(input_item) > 0 else None

self._set_multiinstance_attributes(is_sequential, 1, loop_count,
element_var=element_var_text,
collection=collection,
completion_condition=completion_condition)

elif len(self.xpath('./bpmn:standardLoopCharacteristics')) > 0:
self._set_multiinstance_attributes(True, 25, STANDARDLOOPCOUNT, loop_task=True)
loop_output = self.xpath(f'./{prefix}/bpmn:loopDataOutputRef')
loop_output = loop_output[0].text if len(loop_output) > 0 else None
if loop_output is not None:
if self.task.io_specification is not None:
try:
refs = set(self.task.io_specification.data_inputs + self.task.io_specification.data_outputs)
loop_output = [v for v in refs if v.name == loop_output][0]
except:
self.raise_validation_exception('The loop output data reference is missing from the IO specification')
else:
loop_output = TaskDataReference(loop_output)

output_item = self.xpath(f'./{prefix}/bpmn:outputDataItem')
output_item = self.create_data_spec(output_item[0], TaskDataReference) if len(output_item) > 0 else None

condition = self.xpath(f'./{prefix}/bpmn:completionCondition')
condition = condition[0].text if len(condition) > 0 else None

original = self.spec.task_specs.pop(self.task.name)
params = {
'task_spec': '',
'cardinality': cardinality,
'data_input': loop_input,
'data_output':loop_output,
'input_item': input_item,
'output_item': output_item,
'condition': condition,
}
if sequential:
self.task = self.SEQUENTIAL_MI_CLASS(self.spec, original.name, **params)
else:
self.task = self.PARALLEL_MI_CLASS(self.spec, original.name, **params)
self._copy_task_attrs(original)

def _add_boundary_event(self, children):

Expand All @@ -133,9 +165,7 @@ def _add_boundary_event(self, children):
child = self.process_parser.parse_node(event)
if isinstance(child.event_definition, CancelEventDefinition) \
and not isinstance(self.task, TransactionSubprocess):
raise ValidationException('Cancel Events may only be used with transactions',
node=self.node,
file_name=self.filename)
self.raise_validation_exception('Cancel Events may only be used with transactions')
parent.connect(child)
return parent

Expand All @@ -153,7 +183,17 @@ def parse_node(self):
self.task.data_input_associations = self.parse_incoming_data_references()
self.task.data_output_associations = self.parse_outgoing_data_references()

self._detect_multiinstance()
io_spec = self.xpath('./bpmn:ioSpecification')
if len(io_spec) > 0:
self.task.io_specification = self.parse_io_spec()

loop_characteristics = self.xpath('./bpmn:standardLoopCharacteristics')
if len(loop_characteristics) > 0:
self._add_loop_task(loop_characteristics[0])

mi_loop_characteristics = self.xpath('./bpmn:multiInstanceLoopCharacteristics')
if len(mi_loop_characteristics) > 0:
self._add_multiinstance_task(mi_loop_characteristics[0])

boundary_event_nodes = self.doc_xpath('.//bpmn:boundaryEvent[@attachedToRef="%s"]' % self.get_id())
if boundary_event_nodes:
Expand All @@ -164,21 +204,14 @@ def parse_node(self):
children = []
outgoing = self.doc_xpath('.//bpmn:sequenceFlow[@sourceRef="%s"]' % self.get_id())
if len(outgoing) > 1 and not self.handles_multiple_outgoing():
raise ValidationException(
'Multiple outgoing flows are not supported for '
'tasks of type',
node=self.node,
file_name=self.filename)
self.raise_validation_exception('Multiple outgoing flows are not supported for tasks of type')
for sequence_flow in outgoing:
target_ref = sequence_flow.get('targetRef')
try:
target_node = one(self.doc_xpath('.//bpmn:*[@id="%s"]'% target_ref))
except:
raise ValidationException(
'When looking for a task spec, we found two items, '
'perhaps a form has the same ID? (%s)' % target_ref,
node=self.node,
file_name=self.filename)
self.raise_validation_exception('When looking for a task spec, we found two items, '
'perhaps a form has the same ID? (%s)' % target_ref)

c = self.process_parser.parse_node(target_node)
position = c.position
Expand Down Expand Up @@ -233,3 +266,4 @@ def handles_multiple_outgoing(self):
outgoing sequence flows.
"""
return False

25 changes: 25 additions & 0 deletions SpiffWorkflow/bpmn/parser/node_parser.py
@@ -1,4 +1,5 @@
from SpiffWorkflow.bpmn.parser.ValidationException import ValidationException
from SpiffWorkflow.bpmn.specs.data_spec import TaskDataReference, BpmnIoSpecification
from .util import first

DEFAULT_NSMAP = {
Expand Down Expand Up @@ -59,6 +60,27 @@ def parse_outgoing_data_references(self):
raise ValidationException(f'Cannot resolve dataOutputAssociation {name}', self.node, self.filename)
return specs

def parse_io_spec(self):
    """
    Build a `BpmnIoSpecification` from this node's `bpmn:ioSpecification`.

    Every `bpmn:dataInput` and `bpmn:dataOutput` is first turned into a
    `TaskDataReference` and indexed by its `name`. The input and output lists
    are then assembled from the text of the `bpmn:dataInputRefs` and
    `bpmn:dataOutputRefs` entries; entries that match no declared reference
    are skipped.

    :returns: `BpmnIoSpecification(inputs, outputs)`
    """
    declared = {}
    declaration_paths = (
        './bpmn:ioSpecification/bpmn:dataInput',
        './bpmn:ioSpecification/bpmn:dataOutput',
    )
    for path in declaration_paths:
        for elem in self.xpath(path):
            spec = self.create_data_spec(elem, TaskDataReference)
            declared[spec.name] = spec

    inputs = [
        declared[ref.text]
        for ref in self.xpath('./bpmn:ioSpecification/bpmn:inputSet/bpmn:dataInputRefs')
        if ref.text in declared
    ]
    outputs = [
        declared[ref.text]
        for ref in self.xpath('./bpmn:ioSpecification/bpmn:outputSet/bpmn:dataOutputRefs')
        if ref.text in declared
    ]
    return BpmnIoSpecification(inputs, outputs)

def create_data_spec(self, item, cls):
    """
    Instantiate `cls` from the `id` and `name` attributes of `item`.

    :param item: element whose `attrib` mapping is read; a missing attribute
        is passed on as None
    :param cls: callable invoked as `cls(id, name)`
    :returns: whatever `cls` returns
    """
    attributes = item.attrib
    identifier = attributes.get('id')
    label = attributes.get('name')
    return cls(identifier, label)

def parse_extensions(self, node=None):
extensions = {}
extra_ns = {'camunda': CAMUNDA_MODEL_NS}
Expand All @@ -84,3 +106,6 @@ def _xpath(self, node, xpath, extra_ns=None):
else:
nsmap = self.nsmap
return node.xpath(xpath, namespaces=nsmap)

def raise_validation_exception(self, message):
    """
    Raise a `ValidationException` for this parser's node.

    :param message: the error text
    :raises ValidationException: always, built from `message`, `self.node`
        and `self.filename`
    """
    error = ValidationException(message, self.node, self.filename)
    raise error

0 comments on commit 590903f

Please sign in to comment.