# Rule Extraction

#### Dependencies

In [None]:
import pm4py
import numpy as np
import pandas as pd
import shap
from xgboost import XGBClassifier
import networkx as nx
import matplotlib.pyplot as plt
import seaborn as sns

#### Import Feature Sets

In [None]:
# Feature tables for the non-conforming traces.
# NOTE(review): the file names suggest these were clustered upstream; a later
# cell reads a "cluster" column from them -- confirm against the producer.
(non_conforming_numerical_features,
 non_conforming_categorical_features) = map(pd.read_pickle, (
    "clustered_non_comforming_numerical.pkl",
    "clustered_non_comforming_categorical.pkl",
))

# Feature tables for the conforming traces.
(conforming_numerical_features,
 conforming_categorical_features) = map(pd.read_pickle, (
    "conforming_numerical_features.pkl",
    "conforming_categorical_features.pkl",
))

In [None]:
# Remove the 'case:concept:name' column from both conforming tables
# (presumably the case identifier, which should not act as a feature -- confirm).
conforming_numerical_features = conforming_numerical_features.drop(
    'case:concept:name', axis=1
)
conforming_categorical_features = conforming_categorical_features.drop(
    'case:concept:name', axis=1
)

## Adding Cluster Value to Conforming Traces

In [None]:
# Every conforming trace gets the fixed label 3 in a "cluster" column.
# NOTE(review): assumes the non-conforming clusters use other labels -- confirm.
for _conforming_table in (conforming_numerical_features,
                          conforming_categorical_features):
    _conforming_table["cluster"] = 3

In [None]:
# Stack non-conforming and conforming numerical tables row-wise, splitting the
# "cluster" label (y_num) from the remaining feature columns (X_num).
_numerical_tables = [non_conforming_numerical_features,
                     conforming_numerical_features]

X_num = pd.concat([table.drop(columns="cluster") for table in _numerical_tables])

y_num = pd.concat([table["cluster"] for table in _numerical_tables])

In [None]:
# Categorical features of all traces (non-conforming first, then conforming),
# stacked row-wise; this table feeds the association-rule mining below.
df_rules = pd.concat(
    [
        non_conforming_categorical_features,
        conforming_categorical_features,
    ],
    axis="index",
)

In [None]:
# Fit a seeded XGBoost classifier that predicts the cluster label from the
# numerical features, then rank the features by the model's importances.
model = XGBClassifier(random_state=42)
model.fit(X_num, y_num)

# In-sample predictions (made on the same rows the model was fitted on).
y_pred = model.predict(X_num)

feature_names = X_num.columns
importances = model.feature_importances_

importance_df = (
    pd.DataFrame({'feature': feature_names, 'importance': importances})
    .sort_values('importance', ascending=False)
)

top_ten_features = importance_df.head(10)
print(top_ten_features)

In [None]:
# Horizontal bar chart of the 15 highest-ranked features.
# DataFrame.plot creates its own figure when no `ax` is given, so the size is
# passed to it directly; a separate plt.figure() call would leave a blank
# figure behind and its figsize would not apply to the chart.
ax = importance_df.head(15).plot(
    kind='barh',
    x='feature',
    y='importance',
    legend=False,
    figsize=(10, 6),
)
ax.set_title("Top 15 Most Important Features (XGBoost)")
# barh draws the first row at the bottom; flip so the first row is on top.
ax.invert_yaxis()
ax.figure.tight_layout()
plt.show()

In [None]:
# Initialise SHAP's JavaScript support so interactive SHAP plots can render in the notebook.
shap.initjs()

In [None]:
# Explain the fitted XGBoost model with SHAP's tree explainer, computing SHAP
# values on the same rows it was trained on.
explainer = shap.TreeExplainer(model)
shap_values = explainer.shap_values(X_num)

# Bar-style summary plot of the SHAP values.
shap.summary_plot(
    shap_values,
    features=X_num,
    plot_type="bar",
)

In [None]:
from sklearn.tree import DecisionTreeClassifier, export_text

# Depth-limited decision tree on the numerical features, printed as text rules.
# random_state is fixed (matching the seed used for the XGBoost model and the
# resampling step) so that ties between equally good splits resolve the same
# way on every run and the printed rules are reproducible.
tree = DecisionTreeClassifier(max_depth=4, random_state=42)
tree.fit(X_num, y_num)

print(export_text(tree, feature_names=list(X_num.columns)))

In [None]:
# Store the cluster label as a string; later cells compare it against '3'
# and one-hot encode it.
df_rules = df_rules.astype({"cluster": str})

In [None]:
from sklearn.utils import resample

# Class sizes before balancing.
cluster_counts = df_rules['cluster'].explode().value_counts()
print("Cluster distribution in consequents:", cluster_counts, sep="\n")

# Split the rows into cluster '3' and everything else.
_is_cluster_3 = df_rules['cluster'] == '3'
cluster_3_data = df_rules[_is_cluster_3]
other_clusters_data = df_rules[~_is_cluster_3]

# Cluster '3' is sampled down (without replacement, seeded) to the size of
# the smallest cluster.
desired_size = cluster_counts.min()
undersampled_cluster_3 = resample(
    cluster_3_data,
    replace=False,
    n_samples=desired_size,
    random_state=42,
)

balanced_df_rules = pd.concat([undersampled_cluster_3, other_clusters_data])

# Class sizes after balancing.
new_cluster_counts = balanced_df_rules['cluster'].explode().value_counts()
print("New cluster distribution in consequents:", new_cluster_counts, sep="\n")

In [None]:
# One-hot encode the balanced table; each resulting column is one "item".
df_encoded = pd.get_dummies(balanced_df_rules)

# Items present in fewer than this fraction of rows are dropped before mining.
min_support_threshold_for_filtering = 0.05

# Per-item support: column sum divided by the number of rows.
item_support = df_encoded.sum().div(len(df_encoded))

_is_frequent_enough = item_support >= min_support_threshold_for_filtering
items_to_keep = item_support[_is_frequent_enough].index
print(f"Original number of items: {df_encoded.shape[1]}")
print(f"Number of items after filtering: {len(items_to_keep)}")

df_filtered = df_encoded.loc[:, items_to_keep]

In [None]:
from mlxtend.frequent_patterns import fpgrowth

# Mine frequent itemsets with FP-Growth: minimum support 0.1, itemsets capped
# at three items, items reported by column name.
frequent_itemsets = fpgrowth(
    df_filtered,
    min_support=0.1,
    max_len=3,
    use_colnames=True,
)

In [None]:
from mlxtend.frequent_patterns import association_rules

# Derive association rules from the frequent itemsets, using confidence as
# the metric with a minimum threshold of 0.6.
rules = association_rules(
    frequent_itemsets,
    metric="confidence",
    min_threshold=0.6,
)

## Consequent-Based Association Rule Summarization

In [None]:
# Item names ending in any of these suffixes mark a rule for removal.
excluded_suffixes = ("absent", "Never")


def contains_excluded_suffix(itemset):
    """Return True if any string item in *itemset* ends with an excluded suffix.

    Falsy input (``None``, empty set) yields False; non-string items are
    ignored.
    """
    if not itemset:
        return False
    return any(
        isinstance(item, str) and item.endswith(excluded_suffixes)
        for item in itemset
    )

# A rule is removed when either side contains an item with an excluded suffix.
_bad_antecedents = rules['antecedents'].apply(contains_excluded_suffix)
_bad_consequents = rules['consequents'].apply(contains_excluded_suffix)
rows_to_remove_mask = _bad_antecedents | _bad_consequents

rules_filtered = rules[~rows_to_remove_mask]

print(f"Original number of rules: {len(rules)}")
print(f"Number of rules after filtering suffixes {excluded_suffixes}: {len(rules_filtered)}")

In [None]:
# Collect every distinct consequent item whose name mentions "cluster".
# This inspects the unfiltered `rules` table, not `rules_filtered`.
_cluster_item_sets = rules['consequents'].apply(
    lambda consequent_items: {
        item for item in consequent_items if "cluster" in str(item)
    }
)
# explode() yields one row per item (NaN for empty sets, dropped next).
unique_consequents = _cluster_item_sets.explode().dropna().unique()

print("Unique consequents containing 'cluster':")
for consequent in unique_consequents:
    print(consequent)

In [None]:
# Keep only rules whose consequent is exactly one item named 'cluster_*'.
_has_only_cluster_consequent = rules_filtered['consequents'].apply(
    lambda items: len(items) == 1 and next(iter(items)).startswith('cluster_')
)
rules_clusters = rules_filtered[_has_only_cluster_consequent].copy()

rules_clusters = rules_clusters.reset_index(drop=True)

print(f"Number of rules after filtering exclusive cluster consequents: {len(rules_clusters)}")

In [None]:
# Number of rules per distinct consequent (shown as the cell's output).
rules_clusters['consequents'].value_counts()

In [None]:
def is_single_cluster_consequent(x, cluster_name):
    """Return True if *x* has exactly one element and contains *cluster_name*."""
    if len(x) != 1:
        return False
    return cluster_name in x

# One independent copy of the rule table per cluster consequent
# ('cluster_0', 'cluster_1', 'cluster_2'), built in that order.
rules_cluster_0, rules_cluster_1, rules_cluster_2 = (
    rules_clusters[
        rules_clusters['consequents'].apply(
            lambda x, name=_cluster_label: is_single_cluster_consequent(x, name)
        )
    ].copy()
    for _cluster_label in ('cluster_0', 'cluster_1', 'cluster_2')
)

## Visualization

In [None]:
from collections import Counter
import itertools

# Plot configuration.
TOP_N_RULES_FOR_PLOTS = 50       # rules kept per cluster after sorting
TOP_M_FEATURES_FOR_BAR = 15      # features shown in the frequency bar chart
TOP_M_FEATURES_FOR_HEATMAP = 10  # features shown in the co-occurrence heatmap
SORT_BY_METRIC = 'lift'          # rule metric used for ranking

try:
    # Probe only: raises NameError when the per-cluster rule tables have not
    # been created by the earlier cells.
    rules_cluster_0.head()
    rules_cluster_1.head()
    rules_cluster_2.head()
except NameError:
    print("Creating dummy rules DataFrames for demonstration.")
    # All unordered pairs of feature_0 .. feature_5 (15 antecedents).
    dummy_antecedents = [
        frozenset({f'feature_{i}', f'feature_{j}'})
        for i, j in itertools.combinations(range(6), 2)
    ]
    # Every column must have as many rows as there are antecedents; a
    # hard-coded row count that disagrees makes pd.DataFrame raise ValueError.
    n_dummy = len(dummy_antecedents)
    dummy_data = {
        'antecedents': dummy_antecedents,
        'consequents': [frozenset({'Cluster_X'})] * n_dummy,
        'support': np.random.rand(n_dummy) * 0.1,
        'confidence': np.random.rand(n_dummy) * 0.4 + 0.6,
        'lift': np.random.rand(n_dummy) * 3 + 1,
    }
    rules_cluster_0 = pd.DataFrame(dummy_data)
    rules_cluster_0['consequents'] = rules_cluster_0['consequents'].apply(lambda x: frozenset({'Cluster_0'}))
    rules_cluster_0['cluster'] = 'Cluster_0'

    rules_cluster_1 = pd.DataFrame(dummy_data)
    rules_cluster_1['antecedents'] = rules_cluster_1['antecedents'].apply(lambda s: frozenset({item.replace('feature_', 'feat_') for item in s}))
    rules_cluster_1['consequents'] = rules_cluster_1['consequents'].apply(lambda x: frozenset({'Cluster_1'}))
    rules_cluster_1['confidence'] = np.random.rand(n_dummy) * 0.3 + 0.5
    rules_cluster_1['lift'] = np.random.rand(n_dummy) * 2 + 1.5
    rules_cluster_1['cluster'] = 'Cluster_1'

    rules_cluster_2 = pd.DataFrame(dummy_data)
    rules_cluster_2['antecedents'] = rules_cluster_2['antecedents'].apply(lambda s: frozenset({item.replace('feature_', 'item_') for item in s}))
    rules_cluster_2['consequents'] = rules_cluster_2['consequents'].apply(lambda x: frozenset({'Cluster_2'}))
    rules_cluster_2['support'] = np.random.rand(n_dummy) * 0.05
    rules_cluster_2['lift'] = np.random.rand(n_dummy) * 1 + 1
    rules_cluster_2['cluster'] = 'Cluster_2'
else:
    print("Using existing rules DataFrames.")

    # The comparison plots group by a 'cluster' column; add it when missing.
    if 'cluster' not in rules_cluster_0.columns:
        rules_cluster_0 = rules_cluster_0.assign(cluster='Cluster_0')
    if 'cluster' not in rules_cluster_1.columns:
        rules_cluster_1 = rules_cluster_1.assign(cluster='Cluster_1')
    if 'cluster' not in rules_cluster_2.columns:
        rules_cluster_2 = rules_cluster_2.assign(cluster='Cluster_2')


# Display name -> rule table, iterated by the plotting loop.
cluster_rules_dfs = {
    "Cluster_0": rules_cluster_0,
    "Cluster_1": rules_cluster_1,
    "Cluster_2": rules_cluster_2,
}

def plot_feature_frequency(rules_df, cluster_name, top_n_rules, top_m_features, sort_by):
    """Plot a bar chart of the most frequent items in rule antecedents.

    Args:
        rules_df: Rule table with an 'antecedents' column of iterables and a
            column named by *sort_by*.
        cluster_name: Label used in messages and in the plot title.
        top_n_rules: Maximum number of rules (after sorting) to inspect.
        top_m_features: Maximum number of items to show.
        sort_by: Column used to rank the rules, highest first.

    Returns:
        None. Prints a message and returns early when there is nothing to plot.
    """
    if rules_df.empty:
        print(f"No rules for {cluster_name} to plot feature frequency.")
        return

    top_rules = rules_df.sort_values(by=sort_by, ascending=False).head(top_n_rules)
    if top_rules.empty:
        print(f"Not enough rules for {cluster_name} after sorting/selecting top {top_n_rules}.")
        return

    # Count how many of the selected rules mention each antecedent item.
    item_counts = Counter(itertools.chain.from_iterable(top_rules['antecedents']))
    if not item_counts:
        print(f"No antecedent items found in the top rules for {cluster_name}.")
        return

    # most_common already returns the items sorted by descending count.
    features_df = pd.DataFrame(
        item_counts.most_common(top_m_features),
        columns=['feature', 'count'],
    )

    plt.figure(figsize=(10, max(5, len(features_df) * 0.4)))
    sns.barplot(x='count', y='feature', data=features_df, hue='feature', palette='viridis', legend=False)
    # Report the number of rules actually used, which can be smaller than
    # top_n_rules when the cluster has few rules.
    plt.title(
        f'Top {len(features_df)} Most Frequent Features in Antecedents\n'
        f'(Top {len(top_rules)} Rules for {cluster_name}, sorted by {sort_by})'
    )
    plt.xlabel('Frequency in Top Rule Antecedents')
    plt.ylabel('Feature')
    plt.tight_layout()
    plt.show()

def plot_rule_metrics_scatter(rules_df, cluster_name, top_n_rules, sort_by):
    """Scatter the top rules by support (x) and confidence (y).

    Point colour and point size both encode lift. Prints a message and
    returns without plotting when no rules are available.
    """
    if rules_df.empty:
        print(f"No rules for {cluster_name} to plot scatter metrics.")
        return

    ranked_rules = rules_df.sort_values(by=sort_by, ascending=False)
    top_rules = ranked_rules.head(top_n_rules)
    if top_rules.empty:
        print(f"Not enough rules for {cluster_name} after sorting/selecting top {top_n_rules}.")
        return

    fig, ax = plt.subplots(figsize=(10, 6))
    sns.scatterplot(
        data=top_rules,
        x='support',
        y='confidence',
        hue='lift',
        size='lift',
        palette='magma',
        sizes=(20, 200),
        legend='auto',
        ax=ax,
    )
    ax.set_title(f'Rule Metrics for {cluster_name}\n(Top {len(top_rules)} Rules sorted by {sort_by})')
    ax.set_xlabel('Support')
    ax.set_ylabel('Confidence')
    # The legend sits outside the axes; the layout rect leaves room for it.
    ax.legend(title='Lift', bbox_to_anchor=(1.05, 1), loc='upper left')
    fig.tight_layout(rect=[0, 0, 0.85, 1])
    plt.show()

def display_top_rules_table(rules_df, cluster_name, top_n_rules, sort_by):
    """Print the highest-ranked rules of one cluster as a plain-text table.

    The table shows antecedents, consequents, support, confidence and lift
    for up to *top_n_rules* rules, ranked by *sort_by* in descending order.
    """
    if rules_df.empty:
        print(f"No rules for {cluster_name} to display.")
        return

    print(f"\n--- Top {top_n_rules} Rules for {cluster_name} (Sorted by {sort_by}) ---")
    ranked_rules = rules_df.sort_values(by=sort_by, ascending=False)
    top_rules = ranked_rules.head(top_n_rules)

    if top_rules.empty:
        print(f"Not enough rules found after sorting/selecting top {top_n_rules}.")
        return

    shown_columns = ['antecedents', 'consequents', 'support', 'confidence', 'lift']
    print(top_rules[shown_columns].to_string())

    # Closing separator whose width depends on the cluster name.
    separator_width = len(cluster_name) + 30
    print("-" * separator_width)


def plot_metric_distributions(all_rules_list, cluster_names):
    """Draw side-by-side box plots of confidence and lift per cluster.

    Args:
        all_rules_list: List of rule DataFrames; each is expected to carry
            'cluster', 'confidence' and 'lift' columns.
        cluster_names: Cluster labels, in the order they should appear on
            the x axis.

    Returns:
        None. Prints a message and returns early when there is no data.
    """
    if not all_rules_list:
        print("No rules data provided for distribution plotting.")
        return

    combined_rules = pd.concat(all_rules_list, ignore_index=True)

    if combined_rules.empty:
        print("Combined rules DataFrame is empty. Cannot plot distributions.")
        return

    fig, axes = plt.subplots(1, 2, figsize=(12, 5))

    for ax, (metric, label) in zip(axes, (('confidence', 'Confidence'), ('lift', 'Lift'))):
        # Passing `palette` without `hue` is deprecated in seaborn 0.13;
        # mapping hue to the x variable (legend off) keeps one colour per
        # cluster, matching the convention used by plot_feature_frequency.
        sns.boxplot(
            x='cluster',
            y=metric,
            data=combined_rules,
            hue='cluster',
            order=cluster_names,
            hue_order=cluster_names,
            palette='Set2',
            legend=False,
            ax=ax,
        )
        ax.set_title(f'{label} Distribution by Cluster')
        ax.set_xlabel('Cluster')
        ax.set_ylabel(label)
        plt.setp(ax.get_xticklabels(), rotation=15, ha='right')

    fig.suptitle('Comparison of Rule Metric Distributions')
    # Leave headroom at the top for the figure-level title.
    fig.tight_layout(rect=[0, 0, 1, 0.96])
    plt.show()

def plot_feature_cooccurrence_heatmap(rules_df, cluster_name, top_n_rules, top_m_features, sort_by):
    """Plot how often the most frequent antecedent items appear together.

    This function is called by the per-cluster loop below, which otherwise
    fails with NameError because no definition exists earlier in the file.

    Args:
        rules_df: Rule table with an 'antecedents' column of iterables and a
            column named by *sort_by*.
        cluster_name: Label used in messages and in the plot title.
        top_n_rules: Maximum number of rules (after sorting) to inspect.
        top_m_features: Maximum number of items shown on each heatmap axis.
        sort_by: Column used to rank the rules, highest first.

    Returns:
        None. Prints a message and returns early when there is nothing to plot.
    """
    if rules_df.empty:
        print(f"No rules for {cluster_name} to plot co-occurrence heatmap.")
        return

    top_rules = rules_df.sort_values(by=sort_by, ascending=False).head(top_n_rules)
    if top_rules.empty:
        print(f"Not enough rules for {cluster_name} after sorting/selecting top {top_n_rules}.")
        return

    item_counts = Counter(itertools.chain.from_iterable(top_rules['antecedents']))
    top_features = [feature for feature, _ in item_counts.most_common(top_m_features)]
    if not top_features:
        print(f"No antecedent items found in the top rules for {cluster_name}.")
        return

    # One row per rule, one 0/1 column per feature; indicator^T . indicator
    # counts the rules in which each pair of features appears together
    # (the diagonal holds each feature's own rule count).
    indicator = pd.DataFrame(
        [[int(feature in antecedent) for feature in top_features]
         for antecedent in top_rules['antecedents']],
        columns=top_features,
    )
    cooccurrence = indicator.T.dot(indicator)

    side = max(6, len(top_features) * 0.8)
    plt.figure(figsize=(side + 2, side))
    sns.heatmap(cooccurrence, annot=True, fmt='d', cmap='viridis')
    plt.title(
        f'Feature Co-occurrence in Antecedents\n'
        f'(Top {len(top_rules)} Rules for {cluster_name}, sorted by {sort_by})'
    )
    plt.xticks(rotation=45, ha='right')
    plt.tight_layout()
    plt.show()


# Per-cluster output: text table, item frequency, metric scatter, co-occurrence.
print("\nGenerating visualizations per cluster...")
for cluster_name, rules_df in cluster_rules_dfs.items():
    print(f"\n--- {cluster_name} ---")
    display_top_rules_table(rules_df, cluster_name, TOP_N_RULES_FOR_PLOTS, SORT_BY_METRIC)
    plot_feature_frequency(rules_df, cluster_name, TOP_N_RULES_FOR_PLOTS, TOP_M_FEATURES_FOR_BAR, SORT_BY_METRIC)
    plot_rule_metrics_scatter(rules_df, cluster_name, TOP_N_RULES_FOR_PLOTS, SORT_BY_METRIC)
    plot_feature_cooccurrence_heatmap(rules_df, cluster_name, TOP_N_RULES_FOR_PLOTS, TOP_M_FEATURES_FOR_HEATMAP, SORT_BY_METRIC)

# Cross-cluster comparison uses only the clusters that actually have rules.
print("\nGenerating comparison visualizations...")
all_rules_list = [df for df in cluster_rules_dfs.values() if not df.empty]
cluster_names_list = [name for name, df in cluster_rules_dfs.items() if not df.empty]

plot_metric_distributions(all_rules_list, cluster_names_list)