In [1]:
"""
P-BEAR-RL: A tool to repair anomalous traces in an event log.
"""
__author__ = "Jonghyeon Ko"
__version__ = "0.1"
__date__ = "03/10/2025"


# =============================================================================
# 1. SETUP & IMPORTS
# =============================================================================

# Ignore warnings to keep the output clean.
import warnings
warnings.filterwarnings(action='ignore')

# Standard-library (os, time, sys) and third-party (pandas, numpy) imports for OS interaction, data manipulation, and timing.
import os
import pandas as pd
import numpy as np
import time
import sys

# Resolve the location of the main script so relative paths (e.g. the "temp" folder)
# can be derived from it.
#   1. When run as a script, `__file__` is defined and is used directly.
#   2. Inside a notebook kernel `__file__` is undefined; the original developer-machine
#      location is kept when that directory actually exists, so existing setups behave
#      exactly as before.
#   3. Otherwise fall back to the current working directory.
# Without the fallback, the hard-coded Windows path breaks on POSIX: it contains no '/'
# separators, so os.path.dirname() returns '' and every derived path is wrong.
_legacy_org_path = 'c:\\Users\\user\\Desktop\\java\\PraeclarusPDQ.zip_expanded\\PraeclarusPDQ\\python\\main_pbear.py'
try:
    org_path = os.path.abspath(__file__)
except NameError:
    if os.path.isdir(os.path.dirname(_legacy_org_path)):
        org_path = _legacy_org_path
    else:
        org_path = os.path.join(os.getcwd(), 'main_pbear.py')
parent_path = os.path.dirname(org_path)
    

# Multiprocessing utilities for parallel execution.
from multiprocessing import Pool, cpu_count
from functools import partial

# --- Import custom utility functions ---
# These modules contain helper functions for Reinforcement Learning (RL) logic
# and data preprocessing specific to the P-BEAR algorithm.
from utils.func_RL import discover_NBG, process_case_id_wrapper
from utils.preprocess_pbear import *


# =============================================================================
# 2. PATH & PARAMETER CONFIGURATION
# =============================================================================


# --- Run parameters ---
# In the script version these came from the command line (see the trailing comments);
# in the notebook they are fixed here.
num_epi = 100  # int(sys.argv[1])    # Number of episodes, forwarded to the worker as p_num_epi.
alpha = 0.0    # float(sys.argv[2])  # Forwarded to the worker as alpha; presumably a learning-rate /
                                     # exploration weighting -- TODO confirm in utils.func_RL.

# --- File paths ---
# Temporary input/output files live in "<parent_path>/temp".
# os.path.join is used for both paths: joining with os.sep by hand turns an empty
# parent_path into the root-anchored "/temp" instead of a relative "temp".
temp_path = os.path.join(str(parent_path), "temp")
output_file_name = os.path.join(temp_path, "temp_output.csv")
dat_AL = 'temp_input.csv'     # File with anomalous event logs.
dat_NL = 'temp_NL_input.csv'  # File with normal (clean) event logs.


# =============================================================================
# 3. DATA LOADING & PREPROCESSING
# =============================================================================

# Source-column -> shared-column mappings for the two input files.
# The two CSVs use different headers for the case id and the ordering column.
_normal_log_columns = {
    "Case ID": "case:concept:name",
    "Activity": "concept:name",
    "Start.Timestamp": "time:timestamp",
}
_anomalous_log_columns = {
    "Case": "case:concept:name",
    "Activity": "concept:name",
    "Order": "time:timestamp",
}

# Read the normal (NL) log first, then the anomalous (AL) log, from the temp folder.
event_log = pd.read_csv(os.path.join(temp_path, dat_NL))
data = pd.read_csv(os.path.join(temp_path, dat_AL))

# Bring both logs onto the same column names.
event_log = event_log.rename(columns=_normal_log_columns, copy=False)
data = data.rename(columns=_anomalous_log_columns, copy=False)

# Run both logs through the project helper add_startend; judging by its name and the
# 'Start'/'End' filter below, it brackets every case with 'Start' and 'End' activities.
clean = add_startend(event_log)
anomaly = add_startend(data)

# --- Reference information derived from the clean log ---
# All distinct activities, including the artificial boundary activities.
actset = clean['concept:name'].unique()
# The same activities without the 'Start' / 'End' boundary markers.
filtered_actset = [activity for activity in actset if activity not in ('Start', 'End')]
# Activity frequencies in the clean log, computed by the project helper freq_act.
act_freq = freq_act(clean, filtered_actset)

# Timer started after loading/preprocessing, right before graph discovery.
start_time = time.time()

# Build the NBGs (Neighboring Behavior Graphs) from the clean log; they are handed
# to every worker as the model of normal behavior.
NBGs = discover_NBG(clean)



In [2]:
# Timer for the variant grouping + parallel repair stage only.
processing_start_time = time.time()
print('Start training P-BEAR-RL (Parallelized)')

# Work on the three columns the repair step needs.
anomaly_nolabel = anomaly[['case:concept:name', 'concept:name', 'order']]
caseids = anomaly_nolabel['case:concept:name'].unique().tolist()

# --- Optimization: collapse identical traces into variants ---
# Identical traces would be repaired identically, so only one case per variant is
# processed and its result is copied back to the other cases afterwards.

# Stage 1: one row per case; the variant key is the case's activities joined by '>>'.
case_variants = (
    anomaly_nolabel
    .groupby('case:concept:name')['concept:name']
    .apply(lambda activities: '>>'.join(activities))
    .rename_axis('case:concept:name')
    .reset_index(name='variant_id')
)

# Stage 2: one row per variant key; 'caseids' lists every case sharing that key.
traces = (
    case_variants
    .groupby('variant_id', observed=True)['case:concept:name']
    .apply(lambda members: list(members))
    .rename_axis('variant_id')
    .reset_index(name='caseids')
)


Start training P-BEAR-RL (Parallelized)


In [3]:
# One representative per variant: the first case ID in each variant's member list.
casezip = [member_ids[0] for member_ids in traces['caseids']]

In [4]:
# Keep only the events that belong to the representative cases; this reduced log is
# what gets shipped to the worker processes.
is_representative_event = anomaly_nolabel['case:concept:name'].isin(casezip)
anomaly_nolabel_zip = anomaly_nolabel[is_representative_event].reset_index(drop=True)

In [5]:

# --- Build the single-argument callable for the worker pool ---
# Everything except the case ID is identical for every task, so those arguments are
# collected once and bound with functools.partial; the pool then only has to supply
# the case ID as the remaining positional argument.
shared_worker_kwargs = {
    'p_anomaly_nolabel': anomaly_nolabel_zip,
    'p_filtered_actset': filtered_actset,
    'p_act_freq': act_freq,
    'p_actset': actset,
    'p_NBGs': NBGs,
    'p_num_epi': num_epi,
    'alpha': alpha,
}
func_for_pool = partial(process_case_id_wrapper, **shared_worker_kwargs)

# One worker process per available CPU core.
num_processes = cpu_count()
print(f"Using {num_processes} processes for parallel execution.")

# --- Run the repair in parallel ---
# pool.map hands the representative case IDs to the workers and returns the results
# as a list in the same order as `casezip`; the context manager shuts the pool down
# when the block exits.
with Pool(processes=num_processes) as pool:
    list_of_repaired_dfs_for_cases = pool.map(func_for_pool, casezip)

# =============================================================================
# 5. RESULT AGGREGATION & OUTPUT
# =============================================================================

# Stack the per-variant results returned by the workers into a single dataframe.
# pd.concat raises ValueError on an empty list, so the "nothing to repair" case gets
# an empty frame that still carries the two columns this cell reads.
if list_of_repaired_dfs_for_cases:
    final_repaired_df_zip = pd.concat(list_of_repaired_dfs_for_cases, ignore_index=True)
else:
    final_repaired_df_zip = pd.DataFrame(columns=['case:concept:name', 'concept:name'])

# --- "Unzip" the results ---
# Split the repaired rows by representative case ID once, instead of re-scanning the
# whole dataframe with a boolean mask for every variant.
repaired_by_case = {
    case_id: case_rows
    for case_id, case_rows in final_repaired_df_zip.groupby('case:concept:name', sort=False)
}

# Collect the expanded pieces in a list and concatenate once at the end; growing a
# dataframe with pd.concat inside the loop copies all previous rows on every iteration.
expanded_parts = []
for representative, member_ids in zip(casezip, traces['caseids']):
    repaired_trace = repaired_by_case.get(representative)
    if repaired_trace is None:
        # The worker produced no rows for this representative: nothing to copy.
        continue
    rows_per_case = len(repaired_trace)

    # Repeat the repaired trace once per case that shares this variant ...
    expanded = pd.concat([repaired_trace] * len(member_ids), ignore_index=True)
    # ... and label each copy with its own case ID (each ID repeated for every row of
    # the trace, matching the copy order produced by the concat above).
    expanded['case:concept:name'] = np.repeat(member_ids, rows_per_case)
    expanded_parts.append(expanded)

if expanded_parts:
    final_repaired_df = pd.concat(expanded_parts, ignore_index=True)
else:
    # Keep the column layout so the filter below still works on an empty result.
    final_repaired_df = final_repaired_df_zip.iloc[0:0].copy()

# --- Final formatting ---
# Drop the artificial 'Start' and 'End' activities from the final output.
# NOTE(review): `output_file_name` is defined in the configuration section, but nothing
# in this cell writes the result to it -- confirm whether saving happens elsewhere.
final_repaired_df = final_repaired_df.loc[~final_repaired_df["concept:name"].isin(['Start', 'End'])].reset_index(drop=True)
print(final_repaired_df.head(10))

Using 12 processes for parallel execution.
  case:concept:name       concept:name  order  \
0        anony16123  Receipt_Treatment      0   
1        anony16123    Start_Treatment      0   
2        anony16123   Finish_Treatment      1   
3        anony44312            Payment      2   
4        anony44312  Receipt_Treatment      5   
5        anony44312    Start_Treatment      6   
6        anony44312   Finish_Treatment      7   
7        anony44312            Payment      8   
8        anony44312    Print_Prescript      9   
9        anony25240            Payment      1   

                                    predict_patterns  
0  [{'pattern': 'skip', 'loc': 1, 'activity': 'Re...  
1  [{'pattern': 'skip', 'loc': 1, 'activity': 'Re...  
2  [{'pattern': 'skip', 'loc': 1, 'activity': 'Re...  
3  [{'pattern': 'insert', 'loc': 4, 'activity': '...  
4  [{'pattern': 'insert', 'loc': 4, 'activity': '...  
5  [{'pattern': 'insert', 'loc': 4, 'activity': '...  
6  [{'pattern': 'insert', 'loc':

In [13]:
# Debug inspection of the raw worker output. Only the first few per-variant dataframes
# are shown; displaying the whole list dumps one dataframe per variant into the notebook.
list_of_repaired_dfs_for_cases[:3]

[  case:concept:name      concept:name  order predict_patterns
 0        anony16123             Start      0                 
 1        anony16123  Finish_Treatment      1                 
 2        anony16123               End      2                 ,
    case:concept:name        concept:name  order predict_patterns
 0         anony44312               Start      0                 
 1         anony44312             Payment      1                 
 2         anony44312             Payment      2                 
 3         anony44312  Finish_Examination      3                 
 4         anony44312   Start_Examination      4                 
 5         anony44312   Receipt_Treatment      5                 
 6         anony44312     Start_Treatment      6                 
 7         anony44312    Finish_Treatment      7                 
 8         anony44312             Payment      8                 
 9         anony44312     Print_Prescript      9                 
 10        anony44312