0 Define input parameters

In [None]:
import os
from Declare4Py.ProcessModels.DeclareModel import DeclareModel
from Declare4Py.D4PyEventLog import D4PyEventLog
from Declare4Py.ProcessMiningTasks.ConformanceChecking.MPDeclareAnalyzer import MPDeclareAnalyzer
from Declare4Py.ProcessMiningTasks.ConformanceChecking.MPDeclareResultsBrowser import MPDeclareResultsBrowser 

import random
import numpy as np
import pandas as pd

In [None]:
# Number of generated event logs to process; the loops below iterate over
# log ids 1..num_logs and build file names such as 'eventlog_model{h}.xes'
num_logs = 30

2 Conformance checking (log vs. normative model (deviations) & log vs. auditor model (anomalies))

In [None]:
# Results of the "state" metric of the conformance check, keyed by event-log id
df_norm_dict = {}   # log vs. normative model (j == 1)
df_audit_dict = {}  # log vs. auditor model (j == 2)

# Event logs converted to dataframes, keyed by event-log id
eventlog_dict = {}

# Model index j -> (dictionary that receives the result, prefix of the CSV export)
model_targets = {
    1: (df_norm_dict, 'CC_normative'),
    2: (df_audit_dict, 'CC_audit'),
}

# Loop through each event log
for h in range(1, num_logs + 1):
    source_file = f'eventlog_model{h}.xes'

    # Skip logs that were not generated instead of failing on the parse
    if not os.path.exists(source_file):
        print(f"Source file {source_file} does not exist. Skipping set {h}.")
        continue

    event_log = D4PyEventLog(case_name="case:concept:name")
    event_log.parse_xes_log(source_file)

    for j, (result_dict, csv_prefix) in model_targets.items():
        model_file = f'model{h}_{j}.decl'

        # A missing model only skips this model; the other one is still checked
        if not os.path.exists(model_file):
            print(f"Source file {model_file} does not exist. Skipping set {h}.")
            continue

        declare_model = DeclareModel().parse_from_file(model_file)

        basic_checker = MPDeclareAnalyzer(log=event_log, declare_model=declare_model, consider_vacuity=True)
        conf_check_res: MPDeclareResultsBrowser = basic_checker.run()

        # Keep the result in memory for the later steps and export it as CSV
        state_df = conf_check_res.get_metric(metric="state")
        result_dict[h] = state_df
        state_df.to_csv(f'{csv_prefix}_{h}.csv')

    # Transform the event log to a dataframe (needed in a later step);
    # done after the conformance checks, which receive the parsed log object
    event_log.to_dataframe()
    eventlog_dict[h] = event_log.get_log()

In [None]:
# Summary for event log 2 against the normative model.
# df_norm_dict[2] is the result dataframe itself (the dictionaries are keyed
# by log id only), so it must not be indexed a second time.
# Rows whose values are all 1 are counted as conformant, rows with at least
# one 0 as non-conformant.
norm_states = df_norm_dict[2]
print("Conformant cases with normative model: " + str(norm_states.loc[(norm_states == 1).all(axis=1)].shape[0]) +
      "\nNon-conformant cases with normative model: " + str(norm_states.loc[(norm_states == 0).any(axis=1)].shape[0]))

In [None]:
# Summary for event log 2 against the auditor model.
# df_audit_dict[2] is the result dataframe itself (the dictionaries are keyed
# by log id only), so it must not be indexed a second time.
# Rows whose values are all 1 are counted as conformant, rows with at least
# one 0 as non-conformant.
audit_states = df_audit_dict[2]
print("Conformant with auditor model: " + str(audit_states.loc[(audit_states == 1).all(axis=1)].shape[0]) +
      "\nNon-conformant with auditor model: " + str(audit_states.loc[(audit_states == 0).any(axis=1)].shape[0]))

3 Label deviations as Anomaly (1) or Exception (0)

In [None]:
# Target share of anomalies among the labeled deviations,
# i.e. anomalies / (anomalies + exceptions)
percent_anomalies = 0.05
# Seed Python's `random` module so the case sampling in the labeling loop is reproducible
random.seed(15)

In [None]:
# Labeled deviation logs keyed by event-log id. Only logs that contain at
# least one anomaly AND at least one exception get an entry.
deviations_dict = {}

for h in range(1, num_logs + 1):

    # Logs or models that were skipped during conformance checking have no
    # entry in the dictionaries and cannot be labeled
    if h not in eventlog_dict or h not in df_norm_dict or h not in df_audit_dict:
        print(f"Missing event log or conformance results for log {h}. Skipping.")
        continue

    # Work on a copy so the stored event log stays untouched
    eventlog = eventlog_dict[h].copy(deep=True)

    # Format case_id such that only the integer value is kept ('case_12' -> 12)
    eventlog['case:concept:name'] = (
        eventlog['case:concept:name'].str.replace('case_', '', regex=False).astype(int)
    )

    # Conformance results that belong to this event log
    df_norm = df_norm_dict[h]
    df_audit = df_audit_dict[h]

    # Row labels with at least one 0 in the normative / auditor results.
    # NOTE(review): these row labels are compared with the integer case ids
    # below, which assumes the result index lines up with the case numbering
    # - confirm.
    deviations = df_norm.index[(df_norm == 0).any(axis='columns')].tolist()
    anomalies = df_audit.index[(df_audit == 0).any(axis='columns')].tolist()
    # Deviations that are not anomalies (unique, sorted values)
    exceptions = np.setdiff1d(deviations, anomalies)

    # Keep only the events of deviating cases; copy so that adding a column
    # does not write into a slice of the full log
    deviation_log = eventlog[eventlog['case:concept:name'].isin(deviations)].copy()

    # Label: 1 = anomaly, 0 = exception
    deviation_log['anomaly'] = np.where(deviation_log['case:concept:name'].isin(anomalies), 1, 0)

    num_anomalies = len(anomalies)
    num_exceptions = len(exceptions)

    # Both classes are needed for a labeled log. The former condition
    # `num_anomalies != 0 ^ num_exceptions != 0` was parsed as
    # `num_anomalies != num_exceptions and num_exceptions != 0` because `^`
    # binds tighter than `!=`.
    if num_anomalies == 0 or num_exceptions == 0:
        continue

    # Down-sample one class so that anomalies make up `percent_anomalies` of
    # the labeled log. The min() clamps guarantee that random.sample is never
    # asked for more cases than exist (rounding can overshoot by one).
    if num_anomalies >= (percent_anomalies * num_exceptions):
        adjusted_num_anomalies = min(
            num_anomalies,
            round((percent_anomalies * num_exceptions) / (1 - percent_anomalies)),
        )
        adjusted_num_exceptions = num_exceptions
    else:
        adjusted_num_anomalies = num_anomalies
        adjusted_num_exceptions = min(
            num_exceptions,
            round((num_anomalies / percent_anomalies) - num_anomalies),
        )

    # Draw the cases to keep (anomalies first, then exceptions, so the
    # sequence of random draws stays the same as before).
    # NOTE(review): `anomalies` may contain cases that are not deviations;
    # such cases are not in `deviation_log` and reduce the number of
    # anomalies that end up in the output - confirm the auditor model only
    # flags deviating cases.
    adjusted_anomalies = random.sample(sorted(anomalies), adjusted_num_anomalies)
    adjusted_exceptions = random.sample(sorted(exceptions), adjusted_num_exceptions)

    case_ids = deviation_log['case:concept:name']
    final_anomalies = deviation_log[(deviation_log['anomaly'] == 1) & case_ids.isin(adjusted_anomalies)]
    final_exceptions = deviation_log[(deviation_log['anomaly'] == 0) & case_ids.isin(adjusted_exceptions)]

    # Labeled log of deviations: anomaly events followed by exception events
    labeled_deviations = pd.concat([final_anomalies, final_exceptions])

    deviations_dict[h] = labeled_deviations

    # Save file
    labeled_deviations.to_csv(f'labeled_deviations_{h}.csv')

4 Transform event log to trace log

In [None]:
# Trace logs (one row per case) merged with the auditor conformance results,
# keyed by event-log id
traces_dict = {}

# Loop over the logs; only logs with a labeled deviation log are transformed
for h in range(1, num_logs + 1):

    if h not in deviations_dict:
        continue

    deviations = deviations_dict[h]
    df_audit = df_audit_dict[h]

    # Convert 'time:timestamp' to datetime (written back to the stored frame,
    # as before, so later steps see the converted column)
    deviations['time:timestamp'] = pd.to_datetime(deviations['time:timestamp'], format="ISO8601")

    # Order the events per case by time. A single sort on both keys replaces
    # the per-group `groupby(...).apply(sort_values)`, which is deprecated for
    # frames that still contain the grouping column.
    deviations_sorted = (
        deviations
        .sort_values(['case:concept:name', 'time:timestamp'])
        .reset_index(drop=True)
    )

    # Transform event log to trace log: one row per case with the activity
    # sequence rendered as "<a, b, c>" and the case label
    traces = deviations_sorted.groupby('case:concept:name').agg({
        'concept:name': lambda x: f"<{', '.join(x)}>",
        'anomaly': 'first'
        }).reset_index()
    traces.columns = ['case:concept:name', 'trace', 'label']

    # Merge trace log and auditor conformance results: the case id on the
    # left is matched against the index of df_audit on the right
    labeled_traces = pd.merge(left=traces, right=df_audit, how="left", left_on="case:concept:name", right_index=True)

    traces_dict[h] = labeled_traces

    # Save file
    labeled_traces.to_csv(f'labeled_traces_{h}.csv', index=False)