In [1]:
xmlNs = {"cmmn": "http://www.omg.org/spec/CMMN/20151109/MODEL"}
xmlPath = "/Users/wvw/Dropbox/research/projects/Montfort PM/cmmn/cap.cmmn"

from rdflib import Namespace
modelNs = Namespace("http://ontario.org/qbp/cap#")
ttlPath = 'ttl/cap.ttl'

# Parse

In [2]:
import xml.etree.ElementTree as ET
import pprint

def parse(path, ns):
    tree = ET.parse(path)
    for prefix, uri in ns.items():
        ET.register_namespace(prefix, uri)
    root = tree.getroot()
    
    itemObjs = {}
    planItems = root.findall(".//cmmn:planItem", namespaces=ns)

    for planItem in planItems:
        defRef = planItem.attrib['definitionRef']
        
        itemObj = {
            'sentries': {
                'entry': [],
                'exit': []
            }
        }
        itemObjs[defRef] = itemObj
        
        reqRules = planItem.findall(".//cmmn:itemControl/cmmn:requiredRule", namespaces=ns)
        mandatory = len(reqRules) > 0
        itemObj['mandatory'] = mandatory    
        
        label = None; type = None
        if defRef.startswith("Task"):
            labelNode = root.findall(f".//cmmn:task[@id='{defRef}']", namespaces=ns)[0]
            label = labelNode.attrib['name']
            type = 'Task'
        elif defRef.startswith("Stage"):
            stage = root.findall(f".//cmmn:stage[@id='{defRef}']", namespaces=ns)[0]
            label = stage.attrib['name']
            itemObj['children'] = []
            for childPlanItem in stage.findall(".//cmmn:planItem", namespaces=ns):
                itemObj['children'].append(childPlanItem.attrib['definitionRef'])
            type = 'Stage'
        elif defRef.startswith("Milestone"):
            milestone = root.findall(f".//cmmn:milestone[@id='{defRef}']", namespaces=ns)[0]
            label = milestone.attrib['name']
            type = 'Milestone'
        else:
            continue
            
        itemObj['label'] = label
        itemObj['type'] = type
        
        itemObj['states'] = [ ( 'Inactive', 'init', 0 ) ]
            
        # print(">", label, ("(mandatory)" if mandatory else ""))
        
        for entryCrit in planItem.findall("cmmn:entryCriterion", namespaces=ns): 
            # print("- sentry")              
            sentry = root.findall(f".//cmmn:sentry[@id='{entryCrit.attrib['sentryRef']}']", namespaces=ns)[0]
            
            sentryObj = {
                'id': sentry.attrib['id'],
                'items': [],
                'conditions': []
            }
            itemObj['sentries']['entry'].append(sentryObj)
            
            itemParts = sentry.findall(".//cmmn:planItemOnPart", namespaces=ns)
            for itemPart in itemParts:
                sentryItemObj = { 'id': itemPart.attrib['id'] }
                sourcePlanItem = root.findall(f".//cmmn:planItem[@id='{itemPart.attrib['sourceRef']}']", namespaces=ns)[0]
                sourceDefRef = sourcePlanItem.attrib['definitionRef']
                sentryItemObj['source'] = sourceDefRef
                # print("source:", sourceDefRef)
                
                events = itemPart.findall(".//cmmn:standardEvent", namespaces=ns)
                if len(events) > 0:
                    eventLabel = events[0].text
                    sentryItemObj['event'] = eventLabel
                    # print("event:", eventLabel)
            
                sentryObj['items'].append(sentryItemObj)
            
            associations = root.findall(f".//cmmn:association[@sourceRef='{entryCrit.attrib['id']}']", namespaces=ns)
            for association in associations:
                condItemObj = { 'id': association.attrib['targetRef'] }
                textAnnotations = root.findall(f".//cmmn:textAnnotation[@id='{association.attrib['targetRef']}']", namespaces=ns)
                if len(textAnnotations) > 0:
                    textAnnotation = textAnnotations[0].findall(".//cmmn:text", namespaces=ns)[0].text
                    # print("condition:", textAnnotation)
                    condItemObj['text'] = textAnnotation
                    sentryObj['conditions'].append(condItemObj)
                    
        # print("")
    
    return itemObjs

In [3]:
itemObjs = parse(xmlPath, xmlNs)
pprint.pprint(itemObjs)

{'Milestone_0gl7st9': {'label': 'hospital admission',
                       'mandatory': False,
                       'sentries': {'entry': [], 'exit': []},
                       'states': [('Inactive', 'init', 0)],
                       'type': 'Milestone'},
 'Stage_1sveclw': {'children': ['Task_0kvjrmn',
                                'Task_0hsxzhz',
                                'Task_0rwjaka',
                                'Task_0akqm8t',
                                'Task_022k641'],
                   'label': '',
                   'mandatory': False,
                   'sentries': {'entry': [{'conditions': [],
                                           'id': 'Sentry_1a74myc',
                                           'items': [{'event': 'occur',
                                                      'id': 'PlanItemOnPart_1y7o923',
                                                      'source': 'Milestone_0gl7st9'}]}],
                                'exit': []},
    

# Convert

In [4]:
from rdflib import Literal, Graph, BNode, RDF, RDFS, XSD, URIRef
from rdflib.collection import Collection
from util import rdf_coll
from convert_base import str_to_uri

CM = Namespace("http://rdf.org/cmmn#")
ST = Namespace("http://rdf.org/state#")

def convert(itemObjs, modelNs, dest):
    g = Graph()

    for defRef, itemObj in itemObjs.items():
        planItemUri = str_to_uri(defRef, modelNs)

        if itemObj['label'].strip() != "":
            g.add((planItemUri, RDFS['label'], Literal(itemObj['label'])))
        
        g.add((planItemUri, CM['isMandatory'], Literal(itemObj['mandatory'], datatype=XSD['boolean'])))

        g.add((planItemUri, RDF['type'], CM['PlanItem']))    
        g.add((planItemUri, RDF['type'], CM[itemObj['type']]))
        if itemObj['type'] == 'Stage':
            for child in itemObj['children']:
                g.add((planItemUri, CM['hasChild'], str_to_uri(child, modelNs)))
        
        curState = itemObj['states'][-1]
        g.add((planItemUri, ST['in'], rdf_coll(g, ST[curState[0]], Literal(curState[1]), Literal(curState[2]))))
                
        for sentry in itemObj['sentries']['entry']:
            sentryUri = str_to_uri(sentry['id'], modelNs)
            g.add((planItemUri, CM['hasSentry'], sentryUri))
            g.add((sentryUri, RDF['type'], CM['Sentry']))
            g.add((sentryUri, RDF['type'], CM['EntrySentry']))
            
            for item in sentry['items']:
                planItemPartUri = str_to_uri(item['id'], modelNs)
                g.add((sentryUri, CM['hasPlanItemPart'], planItemPartUri))
                
                sourceUri = str_to_uri(item['source'], modelNs)
                g.add((planItemPartUri, CM['hasSource'], sourceUri))
                
                eventLabel = item['event']
                g.add((planItemPartUri, CM['hasEvent'], Literal(eventLabel)))
                
            for condition in sentry['conditions']:
                conditionUri = str_to_uri(condition['id'], modelNs)
                g.add((sentryUri, CM['hasCondition'], conditionUri))
                
                g.add((conditionUri, RDFS['comment'], Literal(condition['text'])))
                
    g.serialize(format="n3", destination=dest)

# Reason

In [21]:
def run(itemObjs, modelNs, dataPath):
    from util import run
    out = run(['eye', 'n3/state.n3', 'n3/workflow.n3', dataPath, '--nope', '--pass-only-new'])

    import re
    nsStr = modelNs.__str__()
    for defRef, state, reason, id in re.findall(f"<{nsStr}(.*?)> state:in \(\s*state:(.*?) \"(.*?)\" (\d+)\s*\)", out):
        itemObjs[defRef]['states'].append((state, reason, id))

In [6]:
def reason(itemObjs, modelNs, dataPath, nr):
    dataPath2 = dataPath[0:dataPath.index(".")] + str(nr) + dataPath[dataPath.index("."):]
        
    convert(itemObjs, modelNs, dataPath2)
    run(itemObjs, modelNs, dataPath2)

    for defRef, itemObj in itemObjs.items():
        print(itemObj['label'], f"({defRef})", itemObj['states'])

In [10]:
# initialize

import copy
itemObjs2 = copy.deepcopy(itemObjs)

reason(itemObjs2, modelNs, ttlPath, 0)

x-ray (Task_0u6b4ee) [('Inactive', 'init', 0), ('Ready', 'notInStage', '1'), ('Active', 'noSentries', '2')]
antibiotics (Task_10t4i0s) [('Inactive', 'init', 0), ('Ready', 'notInStage', '1')]
 (Stage_1sveclw) [('Inactive', 'init', 0), ('Ready', 'notInStage', '1')]
hospital admission (Milestone_0gl7st9) [('Inactive', 'init', 0), ('Ready', 'notInStage', '1'), ('Active', 'noSentries', '2')]
urine antigen testing (Task_1dy8o18) [('Inactive', 'init', 0), ('Ready', 'notInStage', '1')]
PCR for legionella (Task_1vk3nm4) [('Inactive', 'init', 0), ('Ready', 'notInStage', '1')]
Blood cultures (Task_1hoczvg) [('Inactive', 'init', 0), ('Ready', 'notInStage', '1')]
sputum culture (Task_0kvjrmn) [('Inactive', 'init', 0)]
gram staining (Task_0hsxzhz) [('Inactive', 'init', 0)]
Blood gases, lactate, liver enzyme and liver function tests (Task_0rwjaka) [('Inactive', 'init', 0)]
routine bloodwork: CBC, electrolytes, renal function (Task_0akqm8t) [('Inactive', 'init', 0)]
oxygen saturation (Task_022k641) [(

In [11]:
# first observation
states = itemObjs2['Milestone_0gl7st9']['states']
states.append(('Occurred', 'observation', len(states)))

In [22]:
reason(itemObjs2, modelNs, ttlPath, 1)

x-ray (Task_0u6b4ee) [('Inactive', 'init', 0), ('Ready', 'notInStage', '1'), ('Active', 'noSentries', '2')]
antibiotics (Task_10t4i0s) [('Inactive', 'init', 0), ('Ready', 'notInStage', '1')]
 (Stage_1sveclw) [('Inactive', 'init', 0), ('Ready', 'notInStage', '1'), ('Active', 'sentryFired', '2')]
hospital admission (Milestone_0gl7st9) [('Inactive', 'init', 0), ('Ready', 'notInStage', '1'), ('Active', 'noSentries', '2'), ('Occurred', 'observation', 3)]
urine antigen testing (Task_1dy8o18) [('Inactive', 'init', 0), ('Ready', 'notInStage', '1')]
PCR for legionella (Task_1vk3nm4) [('Inactive', 'init', 0), ('Ready', 'notInStage', '1')]
Blood cultures (Task_1hoczvg) [('Inactive', 'init', 0), ('Ready', 'notInStage', '1')]
sputum culture (Task_0kvjrmn) [('Inactive', 'init', 0), ('Ready', 'inActiveStage', '1')]
gram staining (Task_0hsxzhz) [('Inactive', 'init', 0), ('Ready', 'inActiveStage', '1')]
Blood gases, lactate, liver enzyme and liver function tests (Task_0rwjaka) [('Inactive', 'init', 0),