<a href="https://colab.research.google.com/github/tzellas/predictive_process_mining_ml/blob/master/event_log_preprocessing.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Predictive Process Mining Diploma Thesis

**Description**


---
In-context learning (ICL) prompting of LLMs with retrieval-augmented generation (RAG) for Predictive Process Monitoring.




## Event Log Preprocessing


---
Here we will parse the XES event log, extract prefixes and store them as embeddings in a vector index.


In [1]:
from google.colab import drive
drive.mount('/content/drive')
import pandas as pd
import pm4py
import csv


# Column that identifies the trace (case) each event belongs to.
trace_identifier = "case:concept:name"

# Short log id -> XES file name inside the Drive data folder.
event_logs = {
  "2019": "BPI_Challenge_2019.xes",
  "2020": "BPI_Challenge_2020.xes",
}
def read_clean_log(event_log_id):
  """Load one XES log from Drive and return it as a cleaned event DataFrame.

  The log is looked up in ``event_logs`` by ``event_log_id``, converted to a
  DataFrame, sorted by trace id and then by ``time:timestamp``, and stripped
  of every ``case:`` column except the trace identifier itself.

  Args:
    event_log_id: Key into ``event_logs`` (e.g. "2020").

  Returns:
    A copy of the sorted DataFrame restricted to the kept columns.

  Raises:
    KeyError: If ``event_log_id`` is not a key of ``event_logs``.
  """
  xes_dir = "/content/drive/MyDrive/Predictive Process Monitoring /data/XES FILES/"
  log_file = xes_dir + event_logs[event_log_id]

  frame = pm4py.convert_to_dataframe(pm4py.read_xes(log_file))
  frame = frame.sort_values([trace_identifier, "time:timestamp"]).reset_index(drop=True)

  # Keep event-level columns plus the trace id; drop all other case-level columns.
  selected = []
  for column in frame.columns:
    if not column.startswith("case:") or column == trace_identifier:
      selected.append(column)

  return frame[selected].copy()



def build_prefixes(df_log: pd.DataFrame, base: int = 1, gap: int = 3):
  """Build "prefix - values - next activity" strings from an event log.

  Events are grouped by ``trace_identifier`` in their existing order. Traces
  with two or fewer events are skipped. Within a trace, a prefix ends at event
  index ``base``, ``base + gap``, ``base + 2 * gap``, ... (always leaving at
  least one later event), and the activity of the following event is used as
  the label.

  Each output string has the form
  ``"<a1,a2,...> - Values: Values: {<attrs>} - <next activity>"`` where:

  * ``<a1,a2,...>`` are the ``concept:name`` values of the prefix events.
  * ``<attrs>`` maps an abbreviated column name (first two characters of each
    ':'-separated part, e.g. ``org:role`` -> ``orro``) to ``str()`` of the most
    recently seen value of that column in the prefix. Distinct columns that
    abbreviate to the same key overwrite each other.

  A given activity sequence is emitted only once across the whole log; later
  traces producing the same sequence are ignored.

  Args:
    df_log: Event DataFrame containing ``trace_identifier`` and
      ``concept:name`` columns.
    base: Index of the last event of the first prefix in each trace.
    gap: Number of events added between consecutive prefixes.

  Returns:
    List of formatted prefix strings, in order of first appearance.
  """
  skipped_keys = {"concept:name", trace_identifier}
  seen_prefixes = set()
  prefixes = []
  short_keys = {}  # cache: full column name -> abbreviated key

  for _, df_trace in df_log.groupby(trace_identifier, sort=False):

    if len(df_trace) <= 2:
      continue

    df_trace = df_trace.reset_index(drop=True)

    activities = []
    values = {}

    for i in range(base, len(df_trace) - 1, gap):
      # The first prefix starts at event 0; later ones resume right after the
      # previous prefix end (i - gap).
      start = 0 if i == base else i - gap + 1

      for event_index in range(start, i + 1):
        event = df_trace.iloc[event_index]
        activities.append(event["concept:name"])

        for key, value in event.items():
          if key in skipped_keys:
            continue
          if key not in short_keys:
            short_keys[key] = ''.join(part[:2] for part in key.split(':'))
          values[short_keys[key]] = str(value)

      # Join once per prefix rather than once per appended event.
      cl_list = ','.join(activities)
      if cl_list in seen_prefixes:
        continue
      seen_prefixes.add(cl_list)

      # Looked up outside the f-string: reusing double quotes inside a
      # double-quoted f-string is a SyntaxError before Python 3.12.
      next_activity = df_trace.iloc[i + 1]["concept:name"]
      # NOTE(review): the doubled "Values: " label looks accidental, but it is
      # kept byte-for-byte because downstream lookups match on this exact text.
      prefixes.append(f"{cl_list} - Values: Values: {values} - {next_activity}")
  return prefixes


pref_list_2020 = build_prefixes(read_clean_log("2020"))
# Show the size and a small sample only; printing the whole list floods the
# cell output and bloats the saved notebook.
print(f"{len(pref_list_2020)} prefixes extracted; first 5:")
print(*pref_list_2020[:5], sep="\n")


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


ModuleNotFoundError: No module named 'pm4py'

In [None]:
def convert_to_csv(prefix_list, event_log_id):
  """Write prefix strings to a two-column CSV (``prefix``, ``prediction``).

  Each input string is split on the first two ``' - '`` separators: the first
  two pieces (re-joined with ``' - '``) form the ``prefix`` column and the
  remainder forms the ``prediction`` column. The file is written to the
  CSV RESULTS folder, named after the log's XES file with the ``xes`` suffix
  replaced by ``csv``.

  NOTE(review): the split assumes neither the activity list nor the values
  text contains ``' - '``; otherwise the columns are cut in the wrong place.

  Args:
    prefix_list: Iterable of strings containing at least two ``' - '``
      separators.
    event_log_id: Key into ``event_logs`` selecting the output file name.

  Raises:
    IndexError: If a string contains fewer than two ``' - '`` separators.
    KeyError: If ``event_log_id`` is not a key of ``event_logs``.
  """
  rows = []
  for row in prefix_list:
    parts = row.split(' - ', 2)  # split once, reuse for both columns
    rows.append((' - '.join(parts[:2]).strip(), parts[2].strip()))

  # Single quotes inside the f-string keep this valid on Python < 3.12.
  csv_results_path = f"/content/drive/MyDrive/Predictive Process Monitoring /data/CSV RESULTS/{event_logs[event_log_id].removesuffix('xes')}csv"

  with open(csv_results_path, "w", newline="", encoding="utf-8") as f:
    w = csv.writer(f)
    w.writerow(["prefix", "prediction"])
    w.writerows(rows)

# Write the 2020 prefixes and their predictions to the CSV RESULTS folder on Drive.
convert_to_csv(pref_list_2020, "2020")


def reduced_csv(event_log_id, max_rows: int = 300):
    """Copy the header and the first ``max_rows`` data rows of a result CSV.

    Reads the CSV for ``event_log_id`` from the CSV RESULTS folder and writes
    the truncated copy next to it with a ``reduced_`` file-name prefix.

    Args:
        event_log_id: Key into ``event_logs`` selecting the CSV file.
        max_rows: Maximum number of data rows (header excluded) to keep.

    Raises:
        KeyError: If ``event_log_id`` is not a key of ``event_logs``.
        StopIteration: If the source CSV is empty (no header row).
    """
    # Single quotes inside the f-strings keep them valid on Python < 3.12.
    csv_results_path = f"/content/drive/MyDrive/Predictive Process Monitoring /data/CSV RESULTS/{event_logs[event_log_id].removesuffix('xes')}csv"
    reduced_path = f"/content/drive/MyDrive/Predictive Process Monitoring /data/CSV RESULTS/reduced_{event_logs[event_log_id].removesuffix('xes')}csv"

    with open(csv_results_path, "r", encoding="utf-8", newline="") as f:
        reader = csv.reader(f)
        header = next(reader)
        rows = []
        for row in reader:
            if len(rows) >= max_rows:
                break
            rows.append(row)

    with open(reduced_path, "w", encoding="utf-8", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(header)
        writer.writerows(rows)

# Create the "reduced_" copy of the 2020 CSV (default: first 300 data rows).
reduced_csv("2020")

In [None]:
def search_csv_column(
    event_log_id,
    column_name: str,
    search_string: str
):
    """Return the rows of a result CSV whose column contains a substring.

    Args:
        event_log_id: Key into ``event_logs`` selecting the CSV file in the
            CSV RESULTS folder.
        column_name: Header name of the column to search.
        search_string: Text that must occur somewhere in the cell value.

    Returns:
        List of matching rows as dicts keyed by the CSV header.

    Raises:
        KeyError: If ``event_log_id`` is not a key of ``event_logs``.
        ValueError: If ``column_name`` is not in the CSV header (including the
            case of an empty file, which has no header at all).
    """
    csv_results_path = f"/content/drive/MyDrive/Predictive Process Monitoring /data/CSV RESULTS/{event_logs[event_log_id].removesuffix('xes')}csv"

    with open(csv_results_path, newline="", encoding="utf-8") as f:
        reader = csv.DictReader(f)

        # fieldnames is None for an empty file; treat that as "no columns"
        # so the caller gets the ValueError instead of a TypeError.
        if column_name not in (reader.fieldnames or []):
            raise ValueError(f"Column '{column_name}' not found")

        # Short rows yield None for missing cells; those can never match.
        return [
            row for row in reader
            if search_string in (row[column_name] or "")
        ]


In [None]:
# Look up rows whose "prefix" column contains this prefix text. The match is a
# substring test, so the string may stop partway through the values section.
search_csv_column("2020", "prefix", "Permit SUBMITTED by EMPLOYEE,Permit APPROVED by ADMINISTRATION,Permit REJECTED by SUPERVISOR,Permit REJECTED by EMPLOYEE,Permit SUBMITTED by EMPLOYEE,Permit APPROVED by ADMINISTRATION,Permit APPROVED by BUDGET OWNER,Start trip,Permit FINAL_APPROVED by SUPERVISOR,End trip,Declaration SUBMITTED by EMPLOYEE,Declaration REJECTED by ADMINISTRATION,Declaration REJECTED by EMPLOYEE,Declaration SUBMITTED by EMPLOYEE - Values: Values: {'id': 'st_step 27246_0', 'orre': 'STAFF MEMBER', 'titi': '2018-11-28")

[{'prefix': "Permit SUBMITTED by EMPLOYEE,Permit APPROVED by ADMINISTRATION,Permit REJECTED by SUPERVISOR,Permit REJECTED by EMPLOYEE,Permit SUBMITTED by EMPLOYEE,Permit APPROVED by ADMINISTRATION,Permit APPROVED by BUDGET OWNER,Start trip,Permit FINAL_APPROVED by SUPERVISOR,End trip,Declaration SUBMITTED by EMPLOYEE,Declaration REJECTED by ADMINISTRATION,Declaration REJECTED by EMPLOYEE,Declaration SUBMITTED by EMPLOYEE - Values: Values: {'id': 'st_step 27246_0', 'orre': 'STAFF MEMBER', 'titi': '2018-11-28 15:49:59+00:00', 'orro': 'EMPLOYEE'}",
  'prediction': 'Declaration APPROVED by ADMINISTRATION'}]