In [12]:
#%pip -q install pm4py pandas numpy

import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import uuid

import pm4py


In [13]:
def uid(prefix):
    return f"{prefix}{uuid.uuid4().hex[:10]}"

def contains_any(text, keywords):
    t = str(text).lower()
    return any(k in t for k in keywords)

DOC_KEYWORDS = ["document", "upload", "submit doc", "provide proof", "attach", "certificate", "income", "rental", "form"]
RES_KEYWORDS = ["resubmission", "missing", "resubmit", "correction", "incorrect", "additional proof"]
NOT_KEYWORDS = ["notify", "notification", "send", "mail", "email", "letter"]

ASSESS_KEYWORDS = ["assess", "review", "check", "validate", "evaluate"]
ACCEPT_KEYWORDS = ["accept", "approve", "granted"]
REJECT_KEYWORDS = ["reject", "decline", "denied"]


In [14]:
def read_bpmn_and_playout(bpmn_path: str, n_traces=200, seed=7):
    # --- Load BPMN
    try:
        bpmn = pm4py.read_bpmn(bpmn_path)
    except Exception:
        from pm4py.objects.bpmn.importer import importer as bpmn_importer
        bpmn = bpmn_importer.apply(bpmn_path)

    # --- Convert BPMN -> Petri net
    try:
        net, im, fm = pm4py.convert_to_petri_net(bpmn)
    except Exception:
        from pm4py.objects.conversion.bpmn import converter as bpmn_converter
        net, im, fm = bpmn_converter.apply(bpmn)

    # --- Playout (simulate traces)
    try:
        log = pm4py.play_out(net, im, fm, variant="basic_playout",
                             parameters={"noTraces": n_traces, "seed": seed})
    except Exception:
        from pm4py.algo.simulation.playout.petri_net import algorithm as playout
        log = playout.apply(net, im, fm, parameters={"noTraces": n_traces, "seed": seed})

    # --- Convert log -> dataframe
    df = pm4py.convert_to_dataframe(log)

    # Standardize column names
    col_case = "case:concept:name" if "case:concept:name" in df.columns else "case_id"
    col_act  = "concept:name" if "concept:name" in df.columns else "activity"
    col_ts   = "time:timestamp" if "time:timestamp" in df.columns else "timestamp"

    out = df[[col_case, col_act, col_ts]].copy()
    out.columns = ["case_id", "activity", "timestamp"]
    out["timestamp"] = pd.to_datetime(out["timestamp"], errors="coerce")

    # If timestamps missing (common in basic playout), synthesize monotonic timestamps per case
    if out["timestamp"].isna().all():
        base = datetime(2025, 11, 1, 8, 0)
        out = out.sort_values(["case_id"]).reset_index(drop=True)
        out["timestamp"] = out.groupby("case_id").cumcount().apply(lambda i: base + timedelta(minutes=5*i))

    # Ensure sorting
    out = out.sort_values(["case_id", "timestamp"]).reset_index(drop=True)
    return out


In [15]:
def build_ocel_from_bpmn_df(df_classic: pd.DataFrame, seed=7):
    rng = np.random.default_rng(seed)

    objects = []
    events = []
    rel = []

    # create a clerk pool
    clerk_oids = [f"CLK_{i+1}" for i in range(8)]
    for i, clk in enumerate(clerk_oids):
        objects.append({
            "ocel:oid": clk,
            "ocel:type": "BafoegClerk",
            "clerkId": str(i+1),
            "name": f"Clerk {i+1}",
            "office": rng.choice(["HH-Mitte", "HH-Nord", "HH-Altona"])
        })

    # per-case state
    state = {}

    for case_id, g in df_classic.groupby("case_id", sort=False):
        app_oid = f"APP_{case_id}"
        stu_oid = f"STU_{case_id}"

        # UML core objects
        objects.append({"ocel:oid": app_oid, "ocel:type": "Application", "applicationId": str(case_id), "status": "Started"})
        objects.append({"ocel:oid": stu_oid, "ocel:type": "Student", "studentId": str(case_id), "name": f"Student {case_id}"})

        # 0..2 parents (synthetic)
        par_oids = []
        for _ in range(int(rng.integers(0, 3))):
            poid = uid("PAR_")
            par_oids.append(poid)
            objects.append({
                "ocel:oid": poid,
                "ocel:type": "Parent",
                "parentId": poid.replace("PAR_", ""),
                "role": rng.choice(["Mother", "Father", "Guardian"])
            })

        # assign one clerk to this application
        clk_oid = rng.choice(clerk_oids)

        state[case_id] = {
            "app_oid": app_oid,
            "stu_oid": stu_oid,
            "par_oids": par_oids,
            "clk_oid": clk_oid,
            "open_resub_oid": None
        }

        # iterate events in order
        g = g.sort_values("timestamp")
        for _, row in g.iterrows():
            act = row["activity"]
            ts = row["timestamp"]
            eid = uid("E_")

            # event row
            ev = {"ocel:eid": eid, "ocel:activity": act, "ocel:timestamp": ts}
            # add simple decision attribute if keywords match
            if contains_any(act, ACCEPT_KEYWORDS):
                ev["decision"] = "Accepted"
            elif contains_any(act, REJECT_KEYWORDS):
                ev["decision"] = "Rejected"
            events.append(ev)

            # Always link to Application
            rel.append({"ocel:eid": eid, "ocel:oid": app_oid, "ocel:qualifier": "concerns"})

            # Submit-like -> submittedBy Student
            if contains_any(act, ["submit", "apply", "application", "start"]):
                rel.append({"ocel:eid": eid, "ocel:oid": stu_oid, "ocel:qualifier": "submittedBy"})

            # Handling events -> performedBy Clerk
            if contains_any(act, ASSESS_KEYWORDS + ["receive", "accept", "reject", "approve", "handle"]):
                rel.append({"ocel:eid": eid, "ocel:oid": clk_oid, "ocel:qualifier": "performedBy"})

            # Document events -> create Document object
            if contains_any(act, DOC_KEYWORDS):
                doc_oid = uid("DOC_")
                objects.append({
                    "ocel:oid": doc_oid,
                    "ocel:type": "Document",
                    "documentId": doc_oid.replace("DOC_", ""),
                    "type": rng.choice(["FormSheet","ProofOfIncome","CertificateOfEnrollment","RentalAgreement","Other"]),
                    "isComplete": bool(rng.choice([True, False], p=[0.85, 0.15])),
                    "uploadedAt": ts
                })
                rel.append({"ocel:eid": eid, "ocel:oid": doc_oid, "ocel:qualifier": "concerns"})

                if state[case_id]["open_resub_oid"] is None:
                    rel.append({"ocel:eid": eid, "ocel:oid": doc_oid, "ocel:qualifier": "initialSubmission"})
                else:
                    rel.append({"ocel:eid": eid, "ocel:oid": state[case_id]["open_resub_oid"], "ocel:qualifier": "concerns"})
                    rel.append({"ocel:eid": eid, "ocel:oid": doc_oid, "ocel:qualifier": "fulfilledBy"})

            # Resubmission request -> create Resubmission object
            if contains_any(act, ["request"]) and contains_any(act, RES_KEYWORDS):
                res_oid = uid("RES_")
                state[case_id]["open_resub_oid"] = res_oid
                objects.append({
                    "ocel:oid": res_oid,
                    "ocel:type": "Resubmission",
                    "resubmissionId": res_oid.replace("RES_", ""),
                    "type": rng.choice(["MissingDocument","IncorrectDocument","AdditionalProof"], p=[.5,.35,.15]),
                    "status": "Requested",
                    "requestedAt": ts
                })
                rel.append({"ocel:eid": eid, "ocel:oid": res_oid, "ocel:qualifier": "triggers"})
                rel.append({"ocel:eid": eid, "ocel:oid": stu_oid, "ocel:qualifier": "requestedFrom"})

            # Notification -> create Notification object
            if contains_any(act, NOT_KEYWORDS):
                not_oid = uid("NOT_")
                objects.append({
                    "ocel:oid": not_oid,
                    "ocel:type": "Notification",
                    "notificationId": not_oid.replace("NOT_", ""),
                    "type": rng.choice(["Confirmation","Rejection","Mail"]),
                    "sentAt": ts
                })
                rel.append({"ocel:eid": eid, "ocel:oid": not_oid, "ocel:qualifier": "emits"})
                if state[case_id]["open_resub_oid"] is not None:
                    rel.append({"ocel:eid": eid, "ocel:oid": state[case_id]["open_resub_oid"], "ocel:qualifier": "concerns"})

    events_df = pd.DataFrame(events)
    objects_df = pd.DataFrame(objects).drop_duplicates(subset=["ocel:oid"]).reset_index(drop=True)
    relations_df = pd.DataFrame(rel).drop_duplicates().reset_index(drop=True)

    # types
    events_df["ocel:timestamp"] = pd.to_datetime(events_df["ocel:timestamp"], errors="coerce")
    for col in ["uploadedAt", "requestedAt", "sentAt"]:
        if col in objects_df.columns:
            objects_df[col] = pd.to_datetime(objects_df[col], errors="coerce")

    return events_df, objects_df, relations_df


In [16]:
def sanity_checks(events_df, objects_df, relations_df):
    assert events_df["ocel:eid"].is_unique, "Event IDs are not unique"
    assert objects_df["ocel:oid"].is_unique, "Object IDs are not unique"

    linked_eids = set(relations_df["ocel:eid"].unique())
    orphan_events = events_df[~events_df["ocel:eid"].isin(linked_eids)]
    print("Orphan events:", len(orphan_events))

    linked_oids = set(relations_df["ocel:oid"].unique())
    orphan_objects = objects_df[~objects_df["ocel:oid"].isin(linked_oids)]
    print("Orphan objects:", len(orphan_objects))

    print("\nEvents:", len(events_df))
    print("Objects:", len(objects_df))
    print("Relations:", len(relations_df))
    print("\nObjects by type:\n", objects_df["ocel:type"].value_counts())


# âœ… REPLACE ONLY THIS FUNCTION
def build_and_export_ocel(events_df, objects_df, relations_df, out_path="bafoeg_from_bpmn_uml.jsonocel"):
    from pm4py.objects.ocel.obj import OCEL

    # build OCEL object (works across versions)
    try:
        ocel = OCEL(events=events_df, objects=objects_df, relations=relations_df)
    except TypeError:
        ocel = OCEL(events_df, objects_df, relations_df)

    # export to jsonocel (version-safe)
    try:
        pm4py.write_ocel(ocel, out_path)
    except Exception:
        from pm4py.objects.ocel.exporter.jsonocel import exporter as jsonocel_exporter
        jsonocel_exporter.apply(ocel, out_path)

    return ocel, out_path


In [None]:
import sys
from pathlib import Path

# if your notebook is in /notebooks
PROJECT_ROOT = Path.cwd().parent
sys.path.insert(0, str(PROJECT_ROOT))

from config.config import get_asset, XES_DIR


BPMNAsset = get_asset("group10_ocel")
BPMN_PATH = BPMNAsset.bpmn_path  


df_classic = read_bpmn_and_playout(BPMN_PATH, n_traces=200, seed=7)
display(df_classic.head(10))

events_df, objects_df, relations_df = build_ocel_from_bpmn_df(df_classic, seed=7)

sanity_checks(events_df, objects_df, relations_df)

ocel, path = build_and_export_ocel(events_df, objects_df, relations_df, out_path="bafoeg_from_bpmn_uml.jsonocel")
print("Exported:", path)


Unnamed: 0,case_id,activity,timestamp
0,0,Request Parent Documents,1970-04-26 19:46:40+00:00
1,0,Receive Parent Documents,1970-04-26 19:46:41+00:00
2,0,Generate Application Mail,1970-04-26 19:46:42+00:00
3,0,Send Application Mail,1970-04-26 19:46:43+00:00
4,0,Receive Application,1970-04-26 19:46:44+00:00
5,0,Review Documents,1970-04-26 19:46:45+00:00
6,0,Assess Application,1970-04-26 19:46:46+00:00
7,0,Generate Rejection,1970-04-26 19:46:47+00:00
8,0,SendRejection,1970-04-26 19:46:48+00:00
9,1,Generate Application Mail,1970-04-26 19:46:49+00:00


Orphan events: 0
Orphan objects: 189

Events: 2272
Objects: 2075
Relations: 6966

Objects by type:
 ocel:type
Notification    692
Document        592
Application     200
Student         200
Resubmission    194
Parent          189
BafoegClerk       8
Name: count, dtype: int64
Exported: bafoeg_from_bpmn_uml.jsonocel


In [18]:
import pm4py

ocel = pm4py.read_ocel("bafoeg_from_bpmn_uml.jsonocel")

# basic info
print(pm4py.ocel.statistics.get_object_types(ocel))
print(pm4py.ocel.statistics.get_number_of_events(ocel))
print(pm4py.ocel.statistics.get_number_of_objects(ocel))


AttributeError: module 'pm4py.ocel' has no attribute 'statistics'

In [None]:
ocel_app = pm4py.ocel.filtering.filter_ocel_object_types(
    ocel, ["Application"]
)


In [19]:
events_df.to_csv("celonis_events.csv", index=False)
objects_df.to_csv("celonis_objects.csv", index=False)
relations_df.to_csv("celonis_relations.csv", index=False)


In [None]:
from pathlib import Path
from collections import defaultdict, deque
from lxml import etree

def fix_cpn_layout(input_cpn: str | Path, output_cpn: str | Path,
                   x_step: int = 180, y_step: int = 120) -> None:
    input_cpn = Path(input_cpn)
    output_cpn = Path(output_cpn)

    parser = etree.XMLParser(resolve_entities=False, recover=True)
    tree = etree.parse(str(input_cpn), parser)
    docinfo = tree.docinfo

    root = tree.getroot()
    cpnet = root.find("cpnet")
    page = cpnet.find("page")

    places = page.findall("place")
    trans = page.findall("trans")
    arcs = page.findall("arc")

    nodes = set()
    node_type = {}
    for p in places:
        nid = p.get("id")
        nodes.add(nid)
        node_type[nid] = "place"
    for t in trans:
        nid = t.get("id")
        nodes.add(nid)
        node_type[nid] = "trans"

    def arc_endpoints(arc):
        orient = arc.get("orientation")
        tid = arc.find("transend").get("idref")
        pid = arc.find("placeend").get("idref")
        if orient == "PtoT":
            return pid, tid
        if orient == "TtoP":
            return tid, pid
        return pid, tid

    out_edges = defaultdict(list)
    in_edges = defaultdict(list)
    for a in arcs:
        u, v = arc_endpoints(a)
        out_edges[u].append(v)
        in_edges[v].append(u)

    sources = [n for n in nodes if len(in_edges[n]) == 0] or [next(iter(nodes))]

    # BFS levels
    level = {n: None for n in nodes}
    q = deque()
    for s in sources:
        level[s] = 0
        q.append(s)

    while q:
        u = q.popleft()
        for v in out_edges[u]:
            cand = level[u] + 1
            if level[v] is None or cand < level[v]:
                level[v] = cand
                q.append(v)

    by_level = defaultdict(list)
    for n, l in level.items():
        by_level[0 if l is None else l].append(n)

    def sort_key(n):
        return (0 if node_type[n] == "place" else 1, n)

    coords = {}
    for l in sorted(by_level):
        group = sorted(by_level[l], key=sort_key)
        for idx, n in enumerate(group):
            x = l * x_step
            y = (idx - (len(group) - 1) / 2) * y_step
            coords[n] = (x, y)

    def set_pos(elem, x, y):
        pos = elem.find("posattr")
        if pos is None:
            pos = etree.SubElement(elem, "posattr")
        pos.set("x", f"{x:.6f}")
        pos.set("y", f"{y:.6f}")

    def set_nested(elem, tag, x, y, dx=20, dy=-20):
        child = elem.find(tag)
        if child is not None:
            pos = child.find("posattr")
            if pos is not None:
                pos.set("x", f"{(x + dx):.6f}")
                pos.set("y", f"{(y + dy):.6f}")

    # move nodes
    for p in places:
        nid = p.get("id")
        x, y = coords[nid]
        set_pos(p, x, y)
        set_nested(p, "type", x, y, dx=20, dy=-20)
        set_nested(p, "initmark", x, y, dx=20, dy=25)

    for t in trans:
        nid = t.get("id")
        x, y = coords[nid]
        set_pos(t, x, y)

    # move arc annotations to midpoints
    for a in arcs:
        u, v = arc_endpoints(a)
        x1, y1 = coords.get(u, (0, 0))
        x2, y2 = coords.get(v, (0, 0))
        mx, my = (x1 + x2) / 2, (y1 + y2) / 2
        ann = a.find("annot")
        if ann is not None:
            set_pos(ann, mx, my)

    tree.write(
        str(output_cpn),
        encoding=docinfo.encoding,
        xml_declaration=True,
        doctype=docinfo.doctype,
        pretty_print=True
    )

# Example:
# fix_cpn_layout("cpnxml_import_test_V2.cpn", "cpnxml_import_test_V2_layout_fixed.cpn")


In [20]:
from pathlib import Path
from lxml import etree

def sanitize_cpn_for_simulation(inp: Path, out: Path):
    parser = etree.XMLParser(recover=True)
    tree = etree.parse(str(inp), parser)
    root = tree.getroot()

    # enforce UNIT everywhere
    for place in root.findall(".//place"):
        t = place.find("type")
        if t is not None:
            t.text = "UNIT"

        im = place.find("initmark")
        if im is not None and im.text and "1`" in im.text:
            im.text = "1`()"

    # fix arc inscriptions
    for arc in root.findall(".//arc"):
        annot = arc.find("annot")
        if annot is None:
            annot = etree.SubElement(arc, "annot")
        annot.text = "()"

    # remove guards (critical)
    for guard in root.findall(".//guard"):
        guard.getparent().remove(guard)

    tree.write(
        str(out),
        pretty_print=True,
        xml_declaration=True,
        encoding="utf-8"
    )


In [21]:
sanitize_cpn_for_simulation(XES_DIR / "cpnxml_import_test_V2_layout_fixed.cpn",
                           XES_DIR / "group10_ocel_simulation_sanitized.cpn")