In [2]:
import xmltodict
from typing import OrderedDict, List
from collections import OrderedDict
import abc
import pandas as pd


In [3]:
class BPMNParser():
    """Load a BPMN XML file and normalise it for execution.

    ``load`` parses the file with ``xmltodict`` and then runs a series of
    structural checks.  While checking, every ``bpmn:outgoing`` flow id of
    start events, tasks and gateways is replaced by a small mapping
    ``{"@id", "@name", "@targetRef"}`` and every gateway gets an ``@opening``
    flag (``True`` for one incoming / several outgoing flows, ``False`` for
    several incoming / one outgoing flow).

    Validation failures are reported with ``assert`` and therefore raise
    ``AssertionError``.  NOTE: assertions are skipped when Python runs with
    ``-O``.
    """

    def simple_load(self, file_name:str)->OrderedDict:
        """Parse ``file_name`` and return the raw ``bpmn:process`` mapping.

        Returns an empty dict when the document has no ``bpmn:definitions``
        or no ``bpmn:process`` entry.
        """
        # Explicit encoding so the result does not depend on the platform default.
        with open(file_name, "r", encoding="utf-8") as file:
            process = xmltodict.parse(file.read()).get("bpmn:definitions", {}).get("bpmn:process", {})
        return process

    def load(self,file_name:str)->OrderedDict:
        """Parse ``file_name`` and run every structural check on the process.

        Returns the (mutated) process mapping; raises ``AssertionError`` as
        soon as one check fails.
        """
        process = self.simple_load(file_name)
        needed_checks = [
            self._check_start,
            self._check_end,
            self._check_task,
            self._check_exclusiveGateways,
            self._check_inclusiveGateways,
            self._check_parallelGateways,
        ]
        for check in needed_checks:
            process = check(process)
        return process

    def _describe_flow(self, flow_id, process):
        """Return the ``@id``/``@name``/``@targetRef`` mapping for one flow id."""
        flow = self.find_flow(process, flow_id)
        # Fail with a readable message instead of AttributeError on None.
        assert flow is not None, f"Sequence flow '{flow_id}' is referenced but not defined"
        return {"@id": flow_id, "@name":flow.get("@name", "no_name"),"@targetRef": flow.get("@targetRef", None)}

    def _transform_outgoing(self, outgoing, process):
        """Replace flow ids by flow descriptions.

        A single id (``str``) yields one mapping.  A list of ids is rewritten
        in place (the same list object is returned) with one mapping per id.
        """
        if isinstance(outgoing, str):
            return self._describe_flow(outgoing, process)
        # Slice assignment keeps the identity of the list stored in the process.
        outgoing[:] = [self._describe_flow(flow_id, process) for flow_id in outgoing]
        return outgoing

    def _check_start(self, process:OrderedDict)->OrderedDict:
        """Require exactly one start event with no incoming and one outgoing flow."""
        start = process.get("bpmn:startEvent", None)
        # isinstance accepts OrderedDict as well as plain dict mappings.
        assert isinstance(start, dict), "No Start Event found or multiple startevents"
        assert start.get("bpmn:incoming", None) is None, "startEvent can't have incoming flows"
        outgoing = start.get("bpmn:outgoing", None)
        assert isinstance(outgoing, str), "Start Event has mulitple Outgoing events or isn't connected"
        start["bpmn:outgoing"] = self._transform_outgoing(outgoing, process)
        return process

    def _check_end(self, process):
        """Require at least one end event; each needs exactly one incoming flow."""
        end = process.get("bpmn:endEvent", None)
        assert end is not None, "Process needs an EndEvent!"
        if isinstance(end, dict):
            end = [end]
        for el in end:
            assert (el.get("bpmn:outgoing", None)) is None, "EndEvents cant have outgoing flows"
            inc = el.get("bpmn:incoming", None)
            assert isinstance(inc, str), "EndEvents cant have multiple incoming flows please use a closing Gate"
        return process

    def _check_task(self, process:OrderedDict)->OrderedDict:
        """Validate every task and expand its outgoing flow.

        Each task needs a name and exactly one incoming and one outgoing
        flow.  A process without tasks is returned unchanged.
        """
        tasks = process.get("bpmn:task", None)
        if isinstance(tasks, dict):
            tasks = [tasks]
        elif not isinstance(tasks, list):
            return process
        for task in tasks:
            # The value itself is checked; ``type(value) is not None`` is always true.
            assert task.get("@name", None) is not None, "Task needs Name/Operation to be used in any meaningful way lol"
            assert isinstance(task.get("bpmn:incoming", None), str), "Tasks needs exactly one incoming flow"
            out = task.get("bpmn:outgoing", None)
            assert isinstance(out, str), "Task needs exactly one outgoing flow"
            task["bpmn:outgoing"] = self._transform_outgoing(out, process)
        return process

    def _check_gateway(self, gateway, process, default_op:str, check:bool)->None:
        """Validate one gateway mapping and annotate it in place."""
        inc = gateway.get("bpmn:incoming", None)
        out = gateway.get("bpmn:outgoing", None)
        assert (inc is not None or out is not None) and type(inc) != type(out), "gateways should be conncted properly"
        if isinstance(inc, list):
            # Several incoming flows: closing gate with a single outgoing flow.
            assert isinstance(out, str), "Closing Gate can't be also an Opening Gate"
            gateway["@opening"] = False
            gateway["bpmn:outgoing"] = self._transform_outgoing(out, process)
            # set default operation
            if gateway.get("@name", None) is None:
                gateway["@name"] = default_op
            return
        assert isinstance(inc, str), "Opening Gate can't be also a Closing Gate"
        assert isinstance(out, list), "Opening Gate needs multiple outgoing flows"
        gateway["@opening"] = True
        flows = self._transform_outgoing(out, process)
        gateway["bpmn:outgoing"] = flows
        if check:
            default = gateway.get("@default", None)
            for flow in flows:
                assert flow.get("@name") != "no_name" or flow.get("@id") == default, "Opening Gate needs all outgoing flows to have an Condition or be a default Flow"

    def _check_Gateways(self, process:OrderedDict, gate_type:str, default_op:str, check:bool)->OrderedDict:
        """Validate all gateways stored under ``gate_type``.

        ``default_op`` becomes the ``@name`` of unnamed closing gates.  When
        ``check`` is true, every outgoing flow of an opening gate must carry
        a name (condition) or be the gate's ``@default`` flow.
        """
        gateways = process.get(gate_type, None)
        if gateways is None:
            return process
        # One code path for both the single-gateway and the list form.
        if not isinstance(gateways, list):
            gateways = [gateways]
        for gateway in gateways:
            self._check_gateway(gateway, process, default_op, check)
        return process

    def _check_exclusiveGateways(self, process):
        """Check ``bpmn:exclusiveGateway`` elements (conditions required)."""
        return self._check_Gateways(process, "bpmn:exclusiveGateway", "passtrough", True)

    def _check_inclusiveGateways(self, process):
        """Check ``bpmn:inclusiveGateway`` elements (conditions required)."""
        return self._check_Gateways(process, "bpmn:inclusiveGateway", "concat", True)

    def _check_parallelGateways(self, process):
        """Check ``bpmn:parallelGateway`` elements (no conditions required)."""
        return self._check_Gateways(process, "bpmn:parallelGateway", "join", False)

    def find_flow(self,process, flow_id):
        """Return the sequence flow whose ``@id`` equals ``flow_id``, else ``None``."""
        flows = process.get("bpmn:sequenceFlow")
        if flows is None:
            return None
        # A lone sequence flow is stored as a mapping rather than a list.
        if isinstance(flows, dict):
            flows = [flows]
        for flow in flows:
            if flow.get("@id") == flow_id:
                return flow
        return None

    def find_element(self, process, element_id)->dict:
        """Find a non-flow element by id.

        Returns ``{"type": <process key>, "information": <element mapping>}``
        or ``None`` when no element carries ``element_id``.  Attribute keys
        (starting with ``@``) and sequence flows are not searched.
        """
        for key, value in process.items():
            if key.startswith("@") or key == "bpmn:sequenceFlow":
                continue
            candidates = value if isinstance(value, list) else [value]
            for item in candidates:
                # Entries without an "@id" cannot match and are skipped.
                if isinstance(item, dict) and "@id" in item and item["@id"] == element_id:
                    return {"type":key,
                            "information":item}
        return None
                                
            
# Manual check: parse and validate a sample diagram.  The resulting process
# mapping is kept in ``test`` and reused further down (StartEvent demo).
parser = BPMNParser()
test = parser.load("all_elemts.bpmn")



In [99]:
from typing import List
import re
class FunctionParser():
    def __init__(self):
        self.regex_base = re.compile(r'(\w+)(\([$a-zA-Z0-9_:\[\]=, "".;]*\))')
        self.regex_empty_brackets = re.compile(r"(\(\s*\))")
        self.regex_string = re.compile(r'("[\w\d\s.?!,]*")+')
        self.regex_int = re.compile(r"(\d*)")
        self.regex_float = re.compile(r'(\d*\.\d*)')
    
    def determine_par_type(self, parameter:str)->dict:
        if self.regex_string.fullmatch(parameter) is not None:
            return {"parameter":parameter[1:-1], "type":"str", "string":parameter}
        elif self.regex_float.fullmatch(parameter) is not None:
            return {"parameter":float(parameter), "type":"float", "string":parameter}
        elif self.regex_int.fullmatch(parameter) is not None:
            return {"parameter":int(parameter), "type":"int", "string":parameter}
        else:
            return {"parameter":None, "type":"not supported Datatype", "string":parameter}

    def cleansplit_param(self, parameter:str)->List[str]|None:
        #check if the string are empty brackets
        if self.regex_empty_brackets.fullmatch(parameter) is not None:return None
        return [self.determine_par_type(s.strip()) for s in parameter[1:-1].split(";")]

    def parse(self,string:str)->dict|None:
        if m := self.regex_base.fullmatch(string):
            return {
                "name":m.group(1),
                "parameters":self.cleansplit_param(m.group(2))
            }
        elif string.startswith("."):
            return{
                "name":"dotOperation",
                "parameters":[{"parameter":string, "type":"code"}]
            }
        else:
            return None


# Manual check of FunctionParser: parse a handful of sample operation strings
# (valid calls, a call without parameters, and malformed input) and print
# each result.
fp = FunctionParser()

strings = ['loadExcel("test.xlsx")',
          'saveExcel("output.xlsx")',
          'addColumn("Hallo"; "wow")',
          "doNothing()",
          "addColumn",
          "test(1,2", 
          'addColumne("no_name"; 2.2)',
          'test("hi"; 2.2;2;"du bastard,")']

for s in strings:print(fp.parse(s))

{'name': 'loadExcel', 'parameters': [{'parameter': 'test.xlsx', 'type': 'str', 'string': '"test.xlsx"'}]}
{'name': 'saveExcel', 'parameters': [{'parameter': 'output.xlsx', 'type': 'str', 'string': '"output.xlsx"'}]}
{'name': 'addColumn', 'parameters': [{'parameter': 'Hallo', 'type': 'str', 'string': '"Hallo"'}, {'parameter': 'wow', 'type': 'str', 'string': '"wow"'}]}
{'name': 'doNothing', 'parameters': None}
None
None
{'name': 'addColumne', 'parameters': [{'parameter': 'no_name', 'type': 'str', 'string': '"no_name"'}, {'parameter': 2.2, 'type': 'float', 'string': '2.2'}]}
{'name': 'test', 'parameters': [{'parameter': 'hi', 'type': 'str', 'string': '"hi"'}, {'parameter': 2.2, 'type': 'float', 'string': '2.2'}, {'parameter': 2, 'type': 'int', 'string': '2'}, {'parameter': 'du bastard,', 'type': 'str', 'string': '"du bastard,"'}]}


In [100]:
class TransformationStrategy(abc.ABC):
    """Interface for one data transformation step.

    Concrete strategies implement ``transform``.  The class is a real
    abstract base class, so instantiating it directly (or a subclass that
    does not implement ``transform``) raises ``TypeError``.
    """

    @abc.abstractmethod
    def transform(self, df:pd.DataFrame)->pd.DataFrame:
        """Return the data frame that results from applying this step to ``df``."""

class DoNothingStrategy(TransformationStrategy):
    """Strategy that passes the data through untouched."""

    def transform(self, df:pd.DataFrame)->pd.DataFrame:
        """Return ``df`` itself (no copy is made)."""
        unchanged = df
        return unchanged


class LoadExcelStrategy(TransformationStrategy):
    """Strategy that replaces the current data with the content of an Excel file."""

    def __init__(self, file_name:str):
        # Path handed to ``pd.read_excel`` on every ``transform`` call.
        self.file_name = file_name

    def transform(self, df:pd.DataFrame)->pd.DataFrame:
        """Read ``self.file_name`` and return it; the incoming ``df`` is ignored."""
        loaded = pd.read_excel(self.file_name)
        return loaded

class SaveExcelStrategy(TransformationStrategy):
    """Strategy that writes the current data to an Excel file."""

    def __init__(self, file_name:str):
        # Destination path handed to ``DataFrame.to_excel``.
        self.file_name = file_name

    def transform(self, df:pd.DataFrame)->pd.DataFrame:
        """Write ``df`` (without the index) to ``self.file_name`` and return ``df``."""
        target_path = self.file_name
        df.to_excel(target_path, index=False)
        return df

class dotStrategy(TransformationStrategy):
    """Strategy that applies a raw "dot operation" (e.g. ``.head(2)``) to the data.

    SECURITY NOTE(review): ``func_string`` is executed as Python code via
    ``exec``.  It must only ever come from trusted process definitions.
    """

    def __init__(self, func_string:str):
        # Code fragment appended to ``df``; expected to start with ".".
        self.func_string = func_string

    def transform(self, df:pd.DataFrame)->pd.DataFrame:
        """Execute ``df = df<func_string>`` and return the resulting ``df``.

        The statement runs with the module globals for name lookup and a
        private local namespace that holds ``df``, so the function argument
        is actually visible to the executed code and module globals are not
        overwritten.
        """
        exec_string = f'df = df{self.func_string}'
        scope = {"df": df}
        exec(exec_string, globals(), scope)
        # Hand the transformed frame back to the caller (Token stores it).
        return scope["df"]

In [101]:
class BAF():
    def __init__(self):
        self.parser = FunctionParser()
    
    def get_strategy(self, function_string:str)->TransformationStrategy|None:
        function_def = self.parser.parse(function_string)
        print(function_def)
        match function_def:
            case {"name":"doNothing", "parameters":None}:
                return DoNothingStrategy()
            case {"name":"loadExcel", "parameters":[{"parameter":x, "type":"str"}]}:
                return LoadExcelStrategy(x)
            case {"name":"saveExcel", "parameters":[{"parameter":x, "type":"str"}]}:
                return SaveExcelStrategy(x)
            case {"name":"dotOperation", "parameters":[{"parameter":x, "type":"code"}]}:
                return SaveExcelStrategy(x)

            case _:
                return None

# Manual check: resolve a loadExcel operation and print the file name the
# returned strategy was built with.
b = BAF()
print(b.get_strategy('loadExcel("test.xslx")').file_name)



{'name': 'loadExcel', 'parameters': [{'parameter': 'test.xslx', 'type': 'str', 'string': '"test.xslx"'}]}
test.xslx


In [102]:
from uuid import uuid4
from time import time, strftime, gmtime
class Token():
    """Unit of work that travels through the process.

    A token carries the data frame being transformed (``data``), a textual
    trail of visited elements (``context``), the timestamp of the last trail
    entry (``cur_time``), the number of parallel paths it was split into
    (``taken_paths``) and a random ``id``.
    """

    def __init__(self, context:str):
        self.data = pd.DataFrame()
        self.taken_paths = 1
        self.id = str(uuid4())
        self.cur_time = time()
        # The trail starts with the creation time (UTC) and the first element.
        self.context:str = f"{strftime('%d-%m-%Y %H:%M:%S', gmtime())} : {context}"

    def add_context(self,element:str)->None:
        """Append ``element`` to the trail.

        The entry is prefixed with the minutes elapsed since the previous
        entry (rounded to three decimals) and the current UTC time.
        """
        now = time()
        elapsed_minutes = round((now - self.cur_time)/60, 3)
        self.context = f"{self.context}--{elapsed_minutes}-->{strftime('%d-%m-%Y %H:%M:%S', gmtime())} : {element}"
        self.cur_time = now

    def __repr__(self):
        return f"Token:{self.id}\nPath taken:{self.context}\nData:{self.data.head()}"

    def __deepcopy__(self, memo):
        """Return an independent token with the same trail and a copy of the data.

        The copy gets a fresh ``id``.  ``context``, ``cur_time`` and
        ``taken_paths`` are carried over verbatim.
        """
        copied = Token(self.context)
        # __init__ prepends a new timestamp; restore the original trail.
        copied.context = self.context
        copied.cur_time = self.cur_time
        copied.taken_paths = self.taken_paths
        copied.data = self.data.copy()
        if memo is not None:
            memo[id(self)] = copied
        return copied

    def transform(self, strategy:"TransformationStrategy"):
        """Replace ``data`` with the result of ``strategy.transform(data)``."""
        self.data = strategy.transform(self.data)
        
# Manual check: create a token and print its id, trail and (empty) data.
t = Token("test")
print(t)


Token:44f87075-e09b-4835-b057-89c848434355
Path taken:03-04-2022 22:22:24 : test
Data:Empty DataFrame
Columns: []
Index: []


In [103]:
class BPMNComponent(abc.ABC):
    """Base class of all executable process elements.

    Stores the ``@id``, ``@name`` (``"no_name"`` when absent) and the
    ``bpmn:incoming`` / ``bpmn:outgoing`` entries of an element definition.
    Subclasses must implement ``execute``; the base class itself cannot be
    instantiated.
    """

    def __init__(self,process_definition:OrderedDict):
        self.id = process_definition.get("@id")
        self.name = process_definition.get("@name", "no_name")
        self.incoming = process_definition.get("bpmn:incoming", None)
        self.outgoing = process_definition.get("bpmn:outgoing", None)

    @abc.abstractmethod
    def execute(self):
        """Run the element and return a step description for the engine."""

    def __lt__(self, other):
        # Components are stored in a heap as (priority, component) tuples;
        # this makes components with equal priority comparable.
        # here it needs to be definied which task should be done first lol
        return True

    def __repr__(self):
        """Return ``<ClassName>: <id>`` plus ``(<name>)`` when a name is set."""
        suffix = '('+self.name+')' if self.name != 'no_name' else ''
        return f"{self.__class__.__name__}: {self.id}{suffix}"

    # str() and repr() render identically.
    __str__ = __repr__

In [104]:
class StartEvent(BPMNComponent):
    """Entry point of a process: creates the token and schedules the first element."""

    def execute(self):
        """Create a new token, print it and return an "add" step for the target element."""
        fresh_token = Token(str(self))
        flow = self.outgoing
        print(fresh_token)
        assert flow["@targetRef"] is not None, "missing refernce!!"
        scheduled = {"id":flow["@targetRef"], "token":fresh_token}
        return {"operation":"add", "elements":[scheduled]}
# Manual check: run the start event of the process loaded into ``test``
# earlier in the file; in a notebook the returned step dict is displayed.
st = StartEvent(test.get("bpmn:startEvent"))
st.execute()

Token:2df8b6d6-15fc-453c-9f23-187f1d0bb4d8
Path taken:03-04-2022 22:22:24 : StartEvent: Event_1m2uki7(Start)
Data:Empty DataFrame
Columns: []
Index: []


{'operation': 'add',
 'elements': [{'id': 'Activity_09ybf93',
   'token': Token:2df8b6d6-15fc-453c-9f23-187f1d0bb4d8
   Path taken:03-04-2022 22:22:24 : StartEvent: Event_1m2uki7(Start)
   Data:Empty DataFrame
   Columns: []
   Index: []}]}

In [105]:
class EndEvent(BPMNComponent):
    """Terminal element: records itself on the token and ends the path."""

    def __init__(self,process_definition:OrderedDict, token:Token):
        # The token is stored before the base class reads the definition.
        self.token = token
        super().__init__(process_definition)

    def execute(self):
        """Append this element to the token trail, print the token and return an "end" step."""
        finished = self.token
        finished.add_context(str(self))
        print(finished)
        return {"operation":"end"}

In [106]:
class Task(BPMNComponent):
    """Process element that applies one transformation to its token.

    The task's ``name`` is the operation string; ``factory`` resolves it to
    a strategy.
    """

    def __init__(self,process_definition:OrderedDict, token:Token, factory:BAF):
        self.token = token
        self.factory = factory
        super().__init__(process_definition)

    def execute(self):
        """Record the task on the token, transform its data and schedule the next element.

        Raises:
            ValueError: if the task name does not resolve to a strategy.
        """
        self.token.add_context(str(self))
        print(self.token)
        strategy = self.factory.get_strategy(self.name)
        if strategy is None:
            # The factory returns None for unknown operations; fail with a
            # readable message instead of an AttributeError on None.
            raise ValueError(f"{self}: task name is not a supported operation")
        self.token.transform(strategy)
        target = self.outgoing
        return {
        "operation":"add",
        "elements" : [{"id":target["@targetRef"], "token":self.token}]
        }

In [107]:
class ExclusiveGateway(BPMNComponent):
    """Exclusive gateway.

    An opening gate forwards its first token along the flow whose ``@id``
    equals the gate's ``@default``; flow conditions are not evaluated here.
    A closing gate forwards its first token along its single outgoing flow.
    """

    def __init__(self, process_definition:OrderedDict, token:Token):
        # Tokens are kept in a list so further arrivals can be appended.
        self.token = [token]
        self.opening = process_definition.get("@opening", False)
        self.default = process_definition.get("@default", None)
        super().__init__(process_definition)

    def execute(self):
        """Record the gate on every held token and return the next "add" step.

        Raises:
            RuntimeError: if an opening gate has no outgoing flow matching
                its default flow (previously this silently returned ``None``).
        """
        for token in self.token:
            token.add_context(str(self))
            print(token)
        if not self.opening:
            #here you could customize the behaviour with some keywords
            new_token = self.token[0]
            return {
                    "operation":"add",
                    "elements" : [{"id":self.outgoing["@targetRef"], "token":new_token}]
                    }
        for el in self.outgoing:
            if el.get("@id", None) == self.default:
                return {
                        "operation":"add",
                        "elements" : [{"id":el["@targetRef"], "token":self.token[0]}]
                        }
        raise RuntimeError(f"{self}: no outgoing flow matches the default flow {self.default!r}")
        

In [108]:
import copy 
class ParallelGateway(BPMNComponent):
    """Parallel gateway: splits a token into one copy per outgoing flow, or joins them again."""

    def __init__(self, process_definition:OrderedDict, token: Token):
        # Held tokens; more may be appended while the gate waits.
        self.token = [token]
        self.opening = process_definition.get("@opening", False)
        super().__init__(process_definition)

    def execute(self):
        """Record the gate on every held token, then split or join.

        Opening gate: returns an "add" step with a deep copy of the first
        token per outgoing flow, each marked with the number of branches.
        Closing gate: once as many tokens are held as the first token's
        ``taken_paths``, forwards the first token (reset to one path);
        otherwise returns a "repush" step.
        """
        for held in self.token:
            held.add_context(str(self))
            print(held)

        if self.opening:
            branch_count = len(self.outgoing)
            branches = []
            for flow in self.outgoing:
                target_id = flow["@targetRef"]
                clone = copy.deepcopy(self.token[0])
                branches.append({"id":target_id, "token":clone})
            for branch in branches:
                branch["token"].taken_paths = branch_count
            return {"operation":"add", "elements":branches}

        first = self.token[0]
        if len(self.token) != first.taken_paths:
            # Not every branch has arrived yet.
            return {"operation":"repush"}
        #do stuff
        first.taken_paths = 1
        return {
                "operation":"add",
                "elements" : [{"id":self.outgoing["@targetRef"], "token":first}]
                }
            
            

In [109]:
import copy 
class InclusiveGateway(BPMNComponent):
    """Inclusive gateway: forwards token copies along the default flow, or joins tokens again."""

    def __init__(self, process_definition:OrderedDict, token: Token):
        # Held tokens; more may be appended while the gate waits.
        self.token = [token]
        self.opening = process_definition.get("@opening", False)
        self.default = process_definition.get("@default", None)
        super().__init__(process_definition)

    def execute(self):
        """Record the gate on every held token, then split or join.

        Opening gate: returns an "add" step with a deep copy of the first
        token for each outgoing flow whose ``@id`` equals the default flow;
        each copy is marked with the number of selected flows.
        Closing gate: once as many tokens are held as the first token's
        ``taken_paths``, forwards the first token (reset to one path);
        otherwise returns a "repush" step.
        """
        for held in self.token:
            held.add_context(str(self))
            print(held)

        if self.opening:
            branches = []
            for flow in self.outgoing:
                if flow["@id"] != self.default:
                    continue
                target_id = flow["@targetRef"]
                branches.append({"id":target_id, "token":copy.deepcopy(self.token[0])})
            selected = len(branches)
            for branch in branches:
                branch["token"].taken_paths = selected
            return {"operation":"add", "elements":branches}

        first = self.token[0]
        if len(self.token) != first.taken_paths:
            # Not every branch has arrived yet.
            return {"operation":"repush"}
        #do stuff
        first.taken_paths = 1
        return {
                "operation":"add",
                "elements" : [{"id":self.outgoing["@targetRef"], "token":first}]
                }

In [110]:
import heapq 

class BPMNEngine():
    """Executes a BPMN process with a priority queue of pending elements.

    Queue entries are ``(priority, component)`` tuples; lower priorities run
    first (tasks 1, gateways 5, end events 10).
    """

    def __init__(self,file_name:str):
        self.parser = BPMNParser()
        self.strat_fact = BAF()
        self.process = self.parser.load(file_name)
        self.elements = []
        self.find_start()

    def find_start(self):
        """Queue the process's start event with priority 1."""
        start = self.process.get("bpmn:startEvent")
        heapq.heappush(self.elements, (1,StartEvent(start)))

    def run(self):
        """Pop and execute elements until the queue is empty.

        Each element returns a step dict: ``"add"`` schedules follow-up
        elements, ``"repush"`` re-queues the element with a lower urgency
        (priority + 1), ``"end"`` finishes a path.  Other operations are
        ignored.
        """
        while self.elements:
            prio,cur = heapq.heappop(self.elements)
            print(prio, cur)
            next_step = cur.execute()
            operation = next_step["operation"]
            if operation == "add":
                for element in next_step["elements"]:
                    self.add(element)
            elif operation == "repush":
                print(f"{cur} was repushed with prio: {prio+1}")
                heapq.heappush(self.elements, (prio+1, cur))
            elif operation == "end":
                if not self.elements:
                    print("Process ended")

    def add(self, next_element):
        """Schedule the element referenced by ``next_element``.

        ``next_element`` is ``{"id": <element id>, "token": <Token>}``.  If
        the element is already queued, the token is appended to it when it
        is a gateway and nothing else happens.  Element types without a
        component class are ignored.

        Raises:
            ValueError: if no element with the given id exists in the process.
        """
        #check if element is already in queue
        for _, queued in self.elements:
            if queued.id == next_element["id"]:
                if isinstance(queued, (ParallelGateway, ExclusiveGateway, InclusiveGateway)):
                    queued.token.append(next_element["token"])
                return
        element = self.parser.find_element(self.process, next_element["id"])
        if element is None:
            # find_element returns None for unknown ids; report it clearly
            # instead of failing on a None subscript.
            raise ValueError(f"Flow targets unknown element id {next_element['id']!r}")
        kind = element["type"]
        info = element["information"]
        token = next_element["token"]
        if kind == "bpmn:task":
            entry = (1, Task(info, token, self.strat_fact))
        elif kind == "bpmn:exclusiveGateway":
            entry = (5, ExclusiveGateway(info, token))
        elif kind == "bpmn:parallelGateway":
            entry = (5, ParallelGateway(info, token))
        elif kind == "bpmn:inclusiveGateway":
            entry = (5, InclusiveGateway(info, token))
        elif kind == "bpmn:endEvent":
            entry = (10, EndEvent(info, token))
        else:
            # Unsupported element types are skipped, as before.
            return
        heapq.heappush(self.elements, entry)
        
        
        
        
# End-to-end run: load the sample diagram and execute it until the queue is empty.
engine = BPMNEngine("loadingandsavingtest.bpmn")   
engine.run()

1 StartEvent: StartEvent_1
Token:73e20fde-bbc9-42cd-86a1-ad0cdd42475a
Path taken:03-04-2022 22:22:25 : StartEvent: StartEvent_1
Data:Empty DataFrame
Columns: []
Index: []
1 Task: Activity_1cos1pv(loadExcel("sample.xlsx"))
Token:73e20fde-bbc9-42cd-86a1-ad0cdd42475a
Path taken:03-04-2022 22:22:25 : StartEvent: StartEvent_1--0.0-->03-04-2022 22:22:25 : Task: Activity_1cos1pv(loadExcel("sample.xlsx"))
Data:Empty DataFrame
Columns: []
Index: []
{'name': 'loadExcel', 'parameters': [{'parameter': 'sample.xlsx', 'type': 'str', 'string': '"sample.xlsx"'}]}
1 Task: Activity_1f8kpks(doNothing())
Token:73e20fde-bbc9-42cd-86a1-ad0cdd42475a
Path taken:03-04-2022 22:22:25 : StartEvent: StartEvent_1--0.0-->03-04-2022 22:22:25 : Task: Activity_1cos1pv(loadExcel("sample.xlsx"))--0.0-->03-04-2022 22:22:25 : Task: Activity_1f8kpks(doNothing())
Data:    Column1   Column2   Column3
0  0.573518  0.645940  0.333805
1  0.130156  0.524058  0.431410
2  0.564023  0.037091  0.541656
3  0.724978  0.996789  0.210123