Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/receive message correlations #401

Merged
merged 7 commits into from
Apr 25, 2024
7 changes: 7 additions & 0 deletions SpiffWorkflow/bpmn/parser/BpmnParser.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ def __init__(self, xsd_path=XSD_PATH, imports=None):
self.validator = etree.XMLSchema(schema)

def validate(self, bpmn, filename=None):
self.preprocess(bpmn)
try:
self.validator.assertValid(bpmn)
except ValidationException as ve:
Expand All @@ -102,6 +103,12 @@ def validate(self, bpmn, filename=None):
raise ValidationException(last_error.message, file_name=filename,
line_number=last_error.line)

def preprocess(self, bpmn):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice job working around this.

# BPMN js creates invalid XML for message correlation properties and it is apparently too difficult to change
# therefore, I'll just preprocess the XML and replace the tag in order to continue validating the XML
for expr in bpmn.xpath('.//bpmn:correlationPropertyRetrievalExpression/bpmn:formalExpression', namespaces=DEFAULT_NSMAP):
expr.tag = '{' + DEFAULT_NSMAP['bpmn'] + '}messagePath'

class BpmnParser(object):
"""
The BpmnParser class is a pluggable base class that manages the parsing of
Expand Down
6 changes: 5 additions & 1 deletion SpiffWorkflow/bpmn/specs/event_definitions/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,12 @@ def throw(self, my_task):
event = BpmnEvent(self)
my_task.workflow.top_workflow.catch(event)

def update_task(self, my_task):
"""This method allows events to implement update behavior for the task"""
pass

def update_task_data(self, my_task):
"""This method allows events with payloads mrege them into the task"""
"""This method allows events with payloads to merge them into the task"""
pass

def reset(self, my_task):
Expand Down
20 changes: 16 additions & 4 deletions SpiffWorkflow/bpmn/specs/event_definitions/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,17 +52,29 @@ def update_task_data(self, my_task):
if payload is not None:
my_task.set_data(**payload)

def get_correlations(self, task, payload):
def get_correlations(self, my_task, payload):
return self.calculate_correlations(
my_task.workflow.script_engine,
self.correlation_properties,
payload
)

def calculate_correlations(self, script_engine, cp, ctx):
correlations = {}
for property in self.correlation_properties:
for key in property.correlation_keys:
for prop in cp:
value = script_engine.environment.evaluate(prop.retrieval_expression, ctx)
for key in prop.correlation_keys:
if key not in correlations:
correlations[key] = {}
try:
correlations[key][property.name] = task.workflow.script_engine.environment.evaluate(property.retrieval_expression, payload)
correlations[key][prop.name] = value
except WorkflowException:
# Just ignore missing keys. The dictionaries have to match exactly
pass
if len(prop.correlation_keys) == 0:
if self.name not in correlations:
correlations[self.name] = {}
correlations[self.name][prop.name] = value
return correlations

def details(self, my_task):
Expand Down
8 changes: 8 additions & 0 deletions SpiffWorkflow/bpmn/specs/event_definitions/timer.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from calendar import monthrange
from time import timezone as tzoffset, altzone as dstoffset, struct_time, localtime

from SpiffWorkflow.util.task import TaskState
from SpiffWorkflow.bpmn.util import PendingBpmnEvent
from .base import EventDefinition

Expand Down Expand Up @@ -200,6 +201,13 @@ def cycle_complete(self, my_task):
my_task._set_internal_data(event_value=event_value)
return ready

def update_task(self, my_task):
if self.cycle_complete(my_task):
for output in my_task.task_spec.outputs:
child = my_task._add_child(output, TaskState.FUTURE)
child.task_spec._predict(child, mask=TaskState.NOT_FINISHED_MASK)
child.task_spec._update(child)

def details(self, my_task):
event_value = my_task._get_internal_data('event_value')
if event_value is not None and event_value['cycles'] != 0:
Expand Down
8 changes: 1 addition & 7 deletions SpiffWorkflow/bpmn/specs/mixins/events/event_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,7 @@ def _update_hook(self, my_task):
elif my_task.state != TaskState.WAITING:
my_task._set_state(TaskState.WAITING)

if isinstance(self.event_definition, CycleTimerEventDefinition):
if self.event_definition.cycle_complete(my_task):
for output in self.outputs:
child = my_task._add_child(output, TaskState.FUTURE)
child.task_spec._predict(child, mask=TaskState.NOT_FINISHED_MASK)
child.task_spec._update(child)

self.event_definition.update_task(my_task)

def _run_hook(self, my_task):

Expand Down
44 changes: 35 additions & 9 deletions SpiffWorkflow/spiff/parser/event_parsers.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301 USA

import warnings

from SpiffWorkflow.bpmn.parser.event_parsers import EventDefinitionParser, ReceiveTaskParser
from SpiffWorkflow.bpmn.parser.event_parsers import (
StartEventParser,
Expand All @@ -32,12 +34,30 @@
ErrorEventDefinition,
EscalationEventDefinition,
)
from SpiffWorkflow.bpmn.parser.util import one
from SpiffWorkflow.spiff.parser.task_spec import SpiffTaskParser
from SpiffWorkflow.bpmn.parser.util import one, first
from SpiffWorkflow.bpmn.specs.event_definitions.message import CorrelationProperty
from SpiffWorkflow.spiff.parser.task_spec import SpiffTaskParser, SPIFFWORKFLOW_NSMAP


class SpiffEventDefinitionParser(SpiffTaskParser, EventDefinitionParser):

def parse_message_extensions(self, node):
expression = first(node.xpath('.//spiffworkflow:messagePayload', namespaces=SPIFFWORKFLOW_NSMAP))
variable = first(node.xpath('.//spiffworkflow:messageVariable', namespaces=SPIFFWORKFLOW_NSMAP))
if expression is not None:
expression = expression.text
if variable is not None:
variable = variable.text
return expression, variable

def parse_process_correlations(self, node):
correlations = []
for prop in node.xpath('.//spiffworkflow:processVariableCorrelation', namespaces=SPIFFWORKFLOW_NSMAP):
key = one(prop.xpath('./spiffworkflow:propertyId', namespaces=SPIFFWORKFLOW_NSMAP))
expression = one(prop.xpath('./spiffworkflow:expression', namespaces=SPIFFWORKFLOW_NSMAP))
correlations.append(CorrelationProperty(key.text, expression.text, []))
return correlations

def parse_message_event(self, message_event):
"""Parse a Spiff message event."""

Expand All @@ -48,17 +68,23 @@ def parse_message_event(self, message_event):
except Exception:
self.raise_validation_error('Expected a Message node', node=message_event)
name = message.get('name')
extensions = self.parse_extensions(message)
expression, variable = self.parse_message_extensions(message)
if expression is not None or variable is not None:
warnings.warn(
'spiffworkflow:messagePayload and spiffworkflow:messageVariable have been moved to the bpmn:messageDefinition element',
DeprecationWarning,
stacklevel=2,
)
else:
expression, variable = self.parse_message_extensions(message_event)
correlations = self.get_message_correlations(message_ref)
process_correlations = self.parse_process_correlations(message_event)
event_def = MessageEventDefinition(name, correlations, expression, variable, process_correlations)
else:
name = message_event.getparent().get('name')
extensions = {}
correlations = []
event_def = MessageEventDefinition(name)

return MessageEventDefinition(name, correlations,
expression=extensions.get('messagePayload'),
message_var=extensions.get('messageVariable')
)
return event_def

def parse_signal_event(self, signal_event):
"""Parse a Spiff signal event"""
Expand Down
15 changes: 6 additions & 9 deletions SpiffWorkflow/spiff/parser/task_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@
BusinessRuleTask
)

SPIFFWORKFLOW_MODEL_NS = 'http://spiffworkflow.org/bpmn/schema/1.0/core'
SPIFFWORKFLOW_MODEL_PREFIX = 'spiffworkflow'
SPIFFWORKFLOW_NSMAP = {'spiffworkflow': 'http://spiffworkflow.org/bpmn/schema/1.0/core'}


class SpiffTaskParser(TaskParser):
Expand All @@ -50,9 +49,8 @@ def _parse_extensions(node):
# Too bad doing this works in such a stupid way.
# We should set a namespace and automatically do this.
extensions = {}
extra_ns = {SPIFFWORKFLOW_MODEL_PREFIX: SPIFFWORKFLOW_MODEL_NS}
xpath = xpath_eval(node, extra_ns)
extension_nodes = xpath(f'./bpmn:extensionElements/{SPIFFWORKFLOW_MODEL_PREFIX}:*')
xpath = xpath_eval(node, SPIFFWORKFLOW_NSMAP)
extension_nodes = xpath(f'./bpmn:extensionElements/spiffworkflow:*')
for node in extension_nodes:
name = etree.QName(node).localname
if name == 'properties':
Expand All @@ -68,7 +66,7 @@ def _parse_extensions(node):
@classmethod
def _node_children_by_tag_name(cls, node, tag_name):
xpath = cls._spiffworkflow_ready_xpath_for_node(node)
return xpath(f'.//{SPIFFWORKFLOW_MODEL_PREFIX}:{tag_name}')
return xpath(f'.//spiffworkflow:{tag_name}')

@classmethod
def _parse_properties(cls, node):
Expand All @@ -80,8 +78,7 @@ def _parse_properties(cls, node):

@staticmethod
def _spiffworkflow_ready_xpath_for_node(node):
extra_ns = {SPIFFWORKFLOW_MODEL_PREFIX: SPIFFWORKFLOW_MODEL_NS}
return xpath_eval(node, extra_ns)
return xpath_eval(node, SPIFFWORKFLOW_NSMAP)

@classmethod
def _parse_script_unit_tests(cls, node):
Expand Down Expand Up @@ -113,7 +110,7 @@ def _parse_servicetask_operator(cls, node):
def _copy_task_attrs(self, original, loop_characteristics):
# I am so disappointed I have to do this.
super()._copy_task_attrs(original)
if loop_characteristics.attrib.get('{' + SPIFFWORKFLOW_MODEL_NS + '}' + 'scriptsOnInstances') != 'true':
if loop_characteristics.xpath('@spiffworkflow:scriptsOnInstances', namespaces=SPIFFWORKFLOW_NSMAP) != ['true']:
self.task.prescript = original.prescript
self.task.postscript = original.postscript
original.prescript = None
Expand Down
2 changes: 2 additions & 0 deletions SpiffWorkflow/spiff/serializer/event_definition.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@ def to_dict(self, event_definition):
dct['correlation_properties'] = self.correlation_properties_to_dict(event_definition.correlation_properties)
dct['expression'] = event_definition.expression
dct['message_var'] = event_definition.message_var
dct['process_correlations'] = self.correlation_properties_to_dict(event_definition.process_correlations)
return dct

def from_dict(self, dct):
dct['correlation_properties'] = self.correlation_properties_from_dict(dct['correlation_properties'])
dct['process_correlations'] = self.correlation_properties_from_dict(dct['process_correlations'])
event_definition = super().from_dict(dct)
return event_definition

Expand Down
11 changes: 10 additions & 1 deletion SpiffWorkflow/spiff/specs/event_definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,11 @@

class MessageEventDefinition(MessageEventDefinition):

def __init__(self, name, correlation_properties=None, expression=None, message_var=None, **kwargs):
def __init__(self, name, correlation_properties=None, expression=None, message_var=None, process_correlations=None, **kwargs):
super(MessageEventDefinition, self).__init__(name, correlation_properties, **kwargs)
self.expression = expression
self.message_var = message_var
self.process_correlations = process_correlations or []

def throw(self, my_task):
payload = my_task.workflow.script_engine.evaluate(my_task, self.expression)
Expand All @@ -40,6 +41,14 @@ def throw(self, my_task):
my_task.workflow.correlations.update(correlations)
my_task.workflow.top_workflow.catch(event)

def update_task(self, my_task):
correlations = self.calculate_correlations(
my_task.workflow.script_engine,
self.process_correlations,
my_task.data
)
my_task.workflow.correlations.update(correlations)

def update_task_data(self, my_task):
if self.message_var is not None:
my_task.data[self.message_var] = my_task.internal_data.pop(self.name)
Expand Down
4 changes: 2 additions & 2 deletions tests/SpiffWorkflow/spiff/BaseTestCase.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ def load_workflow_spec(self, filename, process_name, dmn_filename=None, validate

def load_collaboration(self, filename, collaboration_name):
f = os.path.join(os.path.dirname(__file__), 'data', filename)
parser = SpiffBpmnParser()
parser = SpiffBpmnParser(validator=VALIDATOR)
parser.add_bpmn_files_by_glob(f)
return parser.get_collaboration(collaboration_name)

def get_all_specs(self, filename):
f = os.path.join(os.path.dirname(__file__), 'data', filename)
parser = SpiffBpmnParser()
parser = SpiffBpmnParser(validator=VALIDATOR)
parser.add_bpmn_files_by_glob(f)
return parser.find_all_specs()
36 changes: 35 additions & 1 deletion tests/SpiffWorkflow/spiff/CorrelationTest.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from SpiffWorkflow.bpmn.workflow import BpmnWorkflow
from SpiffWorkflow.bpmn import BpmnWorkflow, BpmnEvent
from SpiffWorkflow import TaskState

from .BaseTestCase import BaseTestCase

Expand Down Expand Up @@ -53,3 +54,36 @@ def testTwoCorrelatonKeys(self):
self.assertNotIn('message_correlation_key_one', messages[1].correlations)
self.assertIn('message_correlation_key_two', messages[1].correlations)
self.assertNotIn('message_correlation_key_two', messages[0].correlations)


class ReceiveCorrelationTest(BaseTestCase):

def testReceiveCorrelations(self):
self.actual_test()

def testReceiveCorrelationsSaveRestore(self):
self.actual_test(True)

def actual_test(self, save_restore=False):
spec, subprocesses = self.load_workflow_spec('receive_correlations.bpmn', 'correlation-test')
self.workflow = BpmnWorkflow(spec, subprocesses)
if save_restore:
self.save_restore()
self.workflow.do_engine_steps()
task = self.workflow.get_next_task(state=TaskState.READY)
task.data.update(value_1='a', value_2='b')
task.run()
self.workflow.do_engine_steps()
self.assertEqual(self.workflow.correlations, {'message': {'prop_1': 'a', 'prop_2': 'b'}})
waiting_task = self.workflow.get_next_task(state=TaskState.WAITING)
event_def = waiting_task.task_spec.event_definition
payload = {'msg_value_1': 'a', 'msg_value_2': 'b'}
correlations = event_def.calculate_correlations(
waiting_task.workflow.script_engine,
event_def.correlation_properties,
payload
)
event = BpmnEvent(event_def, payload, correlations)
self.workflow.catch(event)
self.workflow.do_engine_steps()
self.assertTrue(self.workflow.is_completed)
32 changes: 18 additions & 14 deletions tests/SpiffWorkflow/spiff/data/correlation.bpmn
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,8 @@
<bpmn:messagePath>init_id</bpmn:messagePath>
</bpmn:correlationPropertyRetrievalExpression>
</bpmn:correlationProperty>
<bpmn:message id="Message_19nm5f5" name="init_proc_2">
<bpmn:extensionElements>
<spiffworkflow:messagePayload>{'num': task_num, 'name': task_name}</spiffworkflow:messagePayload>
<spiffworkflow:messageVariable>source_task</spiffworkflow:messageVariable>
</bpmn:extensionElements>
</bpmn:message>
<bpmn:message id="Message_0fc1gu7" name="proc_2_response">
<bpmn:extensionElements>
<spiffworkflow:messagePayload>{'init_id': source_task['num'], 'response': response}</spiffworkflow:messagePayload>
<spiffworkflow:messageVariable>response</spiffworkflow:messageVariable>
</bpmn:extensionElements>
</bpmn:message>
<bpmn:message id="Message_19nm5f5" name="init_proc_2" />
<bpmn:message id="Message_0fc1gu7" name="proc_2_response" />
<bpmn:process id="proc_1" name="Process 1" isExecutable="true">
<bpmn:sequenceFlow id="Flow_0lrjj2a" sourceRef="StartEvent_1" targetRef="subprocess" />
<bpmn:sequenceFlow id="Flow_0gp7t8p" sourceRef="subprocess" targetRef="Event_0qga5tr" />
Expand All @@ -53,11 +43,17 @@
<bpmn:sequenceFlow id="Flow_02xt17l" sourceRef="configure" targetRef="start_proc_2" />
<bpmn:sequenceFlow id="Flow_0ts36fv" sourceRef="start_proc_2" targetRef="get_response" />
<bpmn:sendTask id="start_proc_2" name="Start Process 2" messageRef="Message_19nm5f5">
<bpmn:extensionElements>
<spiffworkflow:messagePayload>{'num': task_num, 'name': task_name}</spiffworkflow:messagePayload>
</bpmn:extensionElements>
<bpmn:incoming>Flow_02xt17l</bpmn:incoming>
<bpmn:outgoing>Flow_0ts36fv</bpmn:outgoing>
</bpmn:sendTask>
<bpmn:sequenceFlow id="Flow_17cd3h6" sourceRef="get_response" targetRef="subprocess_end" />
<bpmn:receiveTask id="get_response" name="Await Response" messageRef="Message_0fc1gu7">
<bpmn:extensionElements>
<spiffworkflow:messageVariable>response</spiffworkflow:messageVariable>
</bpmn:extensionElements>
<bpmn:incoming>Flow_0ts36fv</bpmn:incoming>
<bpmn:outgoing>Flow_17cd3h6</bpmn:outgoing>
</bpmn:receiveTask>
Expand All @@ -69,7 +65,11 @@
<bpmn:process id="proc_2" name="Process 2" isExecutable="true">
<bpmn:startEvent id="message_start" name="Message Start">
<bpmn:outgoing>Flow_0qafvbe</bpmn:outgoing>
<bpmn:messageEventDefinition id="MessageEventDefinition_12ck2a4" messageRef="Message_19nm5f5" />
<bpmn:messageEventDefinition id="MessageEventDefinition_12ck2a4" messageRef="Message_19nm5f5">
<bpmn:extensionElements>
<spiffworkflow:messageVariable>source_task</spiffworkflow:messageVariable>
</bpmn:extensionElements>
</bpmn:messageEventDefinition>
</bpmn:startEvent>
<bpmn:sequenceFlow id="Flow_0k7rc31" sourceRef="respond" targetRef="Event_01itene" />
<bpmn:sequenceFlow id="Flow_0qafvbe" sourceRef="message_start" targetRef="prepare_response" />
Expand All @@ -81,7 +81,11 @@
<bpmn:intermediateThrowEvent id="respond" name="Respond">
<bpmn:incoming>Flow_12j0ayf</bpmn:incoming>
<bpmn:outgoing>Flow_0k7rc31</bpmn:outgoing>
<bpmn:messageEventDefinition id="MessageEventDefinition_0z73w20" messageRef="Message_0fc1gu7" />
<bpmn:messageEventDefinition id="MessageEventDefinition_0z73w20" messageRef="Message_0fc1gu7">
<bpmn:extensionElements>
<spiffworkflow:messagePayload>{'init_id': source_task['num'], 'response': response}</spiffworkflow:messagePayload>
</bpmn:extensionElements>
</bpmn:messageEventDefinition>
</bpmn:intermediateThrowEvent>
<bpmn:endEvent id="Event_01itene">
<bpmn:incoming>Flow_0k7rc31</bpmn:incoming>
Expand Down
Loading
Loading