In [4]:
import pandas as pd
import torch
import numpy as np
import random
from itertools import product
import math

In [5]:
def load_dataset(path="data/diabetes.csv"):
    """
    Load a dataset from a CSV file into a DataFrame.

    Parameters
    ----------
    path : str or path-like, optional
        Location of the CSV file. Defaults to "data/diabetes.csv", the file
        this notebook analyses, so existing zero-argument calls keep working.

    Returns
    -------
    pandas.DataFrame
        The parsed file contents, exactly as `pd.read_csv` returns them.
    """
    return pd.read_csv(path)

In [6]:
# Load the raw dataset and preview the first rows to check the column layout
data = load_dataset()
data.head()

Unnamed: 0,Pregnancies,Glucose,BloodPressure,SkinThickness,Insulin,BMI,DiabetesPedigreeFunction,Age,Outcome
0,6,148,72,35,0,33.6,0.627,50,1
1,1,85,66,29,0,26.6,0.351,31,0
2,8,183,64,0,0,23.3,0.672,32,1
3,1,89,66,23,94,28.1,0.167,21,0
4,0,137,40,35,168,43.1,2.288,33,1


In [7]:
# Seed the RNG so the synthetic probabilities (and every subgroup found from
# them further down) are identical on each Restart & Run All.
random.seed(42)

# Rename the label column. `rename` silently ignores a missing key, so
# re-running this cell after "Outcome" has already been renamed is harmless.
data = data.rename(columns={"Outcome": "Probability"})

# Replace the labels with random probabilities drawn uniformly from [0, 1)
data["Probability"] = [random.random() for _ in range(len(data))]

# Calculate Uncertainty: 1 when Probability is 0.5, falling linearly to 0 as
# Probability approaches 0 or 1
data["Uncertainty"] = 1 - 2 * np.abs(data["Probability"] - 0.5)

data.head()

Unnamed: 0,Pregnancies,Glucose,BloodPressure,SkinThickness,Insulin,BMI,DiabetesPedigreeFunction,Age,Probability,Uncertainty
0,6,148,72,35,0,33.6,0.627,50,0.849596,0.300809
1,1,85,66,29,0,26.6,0.351,31,0.221136,0.442272
2,8,183,64,0,0,23.3,0.672,32,0.424408,0.848815
3,1,89,66,23,94,28.1,0.167,21,0.809959,0.380081
4,0,137,40,35,168,43.1,2.288,33,0.685729,0.628541


## Quality Measures

In [49]:
def mean_uncertainty_deviation(subgroup, overall_mean):
    """
    Return the subgroup's mean "Uncertainty" minus `overall_mean`.

    A positive result means the subgroup is on average more uncertain than the
    reference mean; a negative result means it is less uncertain.
    """
    return subgroup["Uncertainty"].mean() - overall_mean


# def z_score_uncertainty(subgroup, overall_mean, overall_std):
#     """
#     Calculate the Z-Score of the subgroup's uncertainty.
#     """
#     subgroup_mean = subgroup["Uncertainty"].mean()
#     z_score = (subgroup_mean - overall_mean) / overall_std
#     return z_score # TODO Standard error in the subgroup


def z_score_uncertainty(subgroup, overall_mean):
    """
    Z-score of the subgroup's mean "Uncertainty" relative to `overall_mean`,
    scaled by the subgroup's own standard error (sample std / sqrt(n)).

    Returns NaN when the score is undefined: fewer than two rows, or a
    standard error that is not strictly positive.
    """
    size = len(subgroup)
    if size <= 1:
        # With at most one data point the sample standard deviation is undefined
        return np.nan

    uncertainty = subgroup["Uncertainty"]
    standard_error = uncertainty.std(ddof=1) / np.sqrt(size)  # ddof=1: sample std
    if not standard_error > 0:
        # No spread in the subgroup (or NaN): dividing would be meaningless
        return np.nan

    return (uncertainty.mean() - overall_mean) / standard_error


# def wracc_uncertainty(subgroup, overall_mean):
#     """
#     Calculate the Weighted Relative Accuracy (WRAcc) using uncertainty.
#     """
#     p_s = len(subgroup) / len(data)
#     subgroup_mean = subgroup["Uncertainty"].mean()
#     wracc_value = p_s * (subgroup_mean - overall_mean)
#     return wracc_value  # TODO Weight by the entropy of the subgroup


def entropy(probabilities):
    """
    Shannon entropy, in bits, of an iterable of probabilities.

    Entries that are not strictly positive are skipped, so zero-probability
    outcomes contribute nothing instead of producing log2(0).
    """
    terms = []
    for prob in probabilities:
        if prob > 0:
            terms.append(prob * np.log2(prob))
    return -np.sum(terms)


def wracc_uncertainty(subgroup, overall_mean, num_bins=5):
    """
    Entropy-weighted deviation of the subgroup's mean "Uncertainty".

    The subgroup's "Uncertainty" values are discretized into `num_bins`
    equal-width bins spanning the subgroup's own value range. The Shannon
    entropy of the resulting bin distribution is then multiplied by
    (subgroup mean - `overall_mean`). Note that, unlike classical WRAcc, no
    coverage term (subgroup size / dataset size) enters the result.

    Parameters
    ----------
    subgroup : pandas.DataFrame
        Rows of the subgroup; must contain an "Uncertainty" column.
    overall_mean : float
        Reference mean the subgroup mean is compared against.
    num_bins : int, optional
        Number of equal-width bins used for the entropy estimate (default 5).

    Returns
    -------
    float
        The weighted deviation, or NaN when the subgroup holds no non-missing
        "Uncertainty" values (`pd.cut` cannot bin an empty input).
    """
    # Missing values are ignored by both the mean and the binning, so dropping
    # them up front changes nothing except making the empty case detectable.
    uncertainty = subgroup["Uncertainty"].dropna()
    if len(uncertainty) == 0:
        return np.nan

    # labels=False yields integer bin codes. Binning the Series directly avoids
    # copying the whole subgroup frame just to hold a temporary column.
    bin_codes = pd.cut(uncertainty, bins=num_bins, labels=False)

    # Relative frequency of each occupied bin
    bin_probabilities = bin_codes.value_counts(normalize=True).values

    entropy_value = entropy(bin_probabilities)
    return entropy_value * (uncertainty.mean() - overall_mean)


def lift(subgroup, overall_mean):
    """
    Return the ratio of the subgroup's mean "Uncertainty" to `overall_mean`.

    Values above 1 mean the subgroup is on average more uncertain than the
    reference mean; values below 1 mean it is less uncertain.
    """
    return subgroup["Uncertainty"].mean() / overall_mean

## Beam Search

In [50]:
class BeamSearchEMM:
    """
    Beam search over conjunctions of threshold conditions, ranking subgroups
    by a quality measure computed on the "Uncertainty" column.

    Candidate conditions have the form "<attribute> <op> <threshold>", where
    <op> is "<=" or ">" and the threshold is the 25th, 50th or 75th percentile
    of the attribute over the full dataset.

    Parameters
    ----------
    data : pandas.DataFrame
        Must contain an "Uncertainty" column and every column in `attributes`.
    attributes : list of str
        Columns to build conditions on (expected to be numeric, since
        percentiles are taken over them).
    beam_width : int
        Number of best candidates kept at each depth level.
    max_depth : int
        Maximum number of conditions in a subgroup description.
    min_support : float
        Minimum fraction of the rows of `data` a subgroup must cover.
    """

    # Comparison operators understood by `apply_conditions`
    _COMPARATORS = {
        "<=": lambda column, value: column <= value,
        "<": lambda column, value: column < value,
        ">=": lambda column, value: column >= value,
        ">": lambda column, value: column > value,
        "==": lambda column, value: column == value,
        "!=": lambda column, value: column != value,
    }

    def __init__(self, data, attributes, beam_width=5, max_depth=3, min_support=0.1):
        self.data = data
        self.attributes = attributes
        self.beam_width = beam_width
        self.max_depth = max_depth
        self.min_support = min_support
        # Results of the most recent `find_subgroups` call
        self.subgroups = []

    def _candidate_conditions(self):
        """Build every single-attribute condition string, in generation order."""
        percentiles = [25, 50, 75]
        operators = ["<=", ">"]
        generated = []
        for attr in self.attributes:
            thresholds = np.percentile(self.data[attr], percentiles)
            # Skewed columns often share percentiles (e.g. many zeros); repeated
            # thresholds would only yield identical conditions. Non-finite
            # thresholds cannot be turned into a usable condition.
            thresholds = np.unique(thresholds[np.isfinite(thresholds)])
            for op, val in product(operators, thresholds):
                generated.append(f"{attr} {op} {val}")
        return generated

    @staticmethod
    def _parse_condition(condition):
        """Split "<attribute> <op> <value>" into (attribute, op, value)."""
        import ast  # local import: only this helper needs it

        # Split from the right so attribute names containing spaces survive
        parts = condition.strip().rsplit(None, 2)
        if len(parts) != 3:
            raise ValueError(f"Malformed condition: {condition!r}")
        attribute, op, raw_value = parts
        if op not in BeamSearchEMM._COMPARATORS:
            raise ValueError(f"Unsupported operator in condition: {condition!r}")
        # literal_eval accepts plain literals only, unlike eval which would
        # execute arbitrary expressions embedded in the condition string
        return attribute, op, ast.literal_eval(raw_value)

    def find_subgroups(self, quality_measure):
        """
        Run the beam search and return the subgroups found.

        Parameters
        ----------
        quality_measure : str
            Name of the measure, as accepted by `calculate_quality`.

        Returns
        -------
        list of dict
            One dict per retained subgroup with keys "conditions" (list of
            condition strings), "quality" and "size", sorted by descending
            quality. The same list is stored on `self.subgroups`.
        """
        # Start from a clean slate so repeated calls (e.g. with different
        # quality measures) do not mix their results.
        self.subgroups = []

        total_rows = len(self.data)
        if total_rows == 0:
            # Nothing to search, and the support ratio would divide by zero
            return self.subgroups

        overall_mean = self.data["Uncertainty"].mean()
        overall_std = self.data["Uncertainty"].std()

        # Thresholds depend only on the full dataset, so build them once
        candidate_conditions = self._candidate_conditions()

        # Initialize beam with empty condition
        beam = [({"conditions": [], "depth": 0}, self.data)]

        for depth in range(1, self.max_depth + 1):
            candidates = []
            # A conjunction is order-independent: "A AND B" and "B AND A"
            # describe the same subgroup and must not occupy two beam slots.
            seen = set()

            for parent_cond, parent_data in beam:
                for new_condition in candidate_conditions:
                    if new_condition in parent_cond["conditions"]:
                        continue  # Skip if condition already exists

                    conditions = parent_cond["conditions"] + [new_condition]
                    key = frozenset(conditions)
                    if key in seen:
                        continue
                    seen.add(key)

                    # The parent rows already satisfy the parent conditions, so
                    # only the new condition needs to be applied.
                    subgroup_data = self.apply_conditions(parent_data, [new_condition])

                    # Check if subgroup meets minimum support
                    if len(subgroup_data) / total_rows < self.min_support:
                        continue

                    qm_value = self.calculate_quality(
                        subgroup_data,
                        quality_measure,
                        overall_mean,
                        overall_std,
                    )
                    # An undefined quality cannot be ranked, and NaN makes the
                    # sort order below arbitrary.
                    if qm_value is None or np.isnan(qm_value):
                        continue

                    candidates.append(
                        (
                            ({"conditions": conditions, "depth": depth}, subgroup_data),
                            qm_value,
                        )
                    )

            if not candidates:
                break  # No refinement met the constraints; deeper levels cannot either

            # Keep top candidates based on quality measure
            candidates.sort(key=lambda item: item[1], reverse=True)
            best = candidates[: self.beam_width]
            beam = [candidate for candidate, _ in best]

            # Store the best subgroups
            for (description, subgroup_data), qm_value in best:
                self.subgroups.append(
                    {
                        "conditions": description["conditions"],
                        "quality": qm_value,
                        "size": len(subgroup_data),
                    }
                )

        # Return all subgroups found, sorted by quality
        self.subgroups.sort(key=lambda info: info["quality"], reverse=True)
        return self.subgroups

    def apply_conditions(self, data, conditions):
        """
        Apply a list of conditions to filter the data.

        Each condition is a string "<attribute> <op> <value>", where <op> is
        one of <=, <, >=, >, ==, != and <value> is a Python literal. All
        conditions must hold (logical AND). An empty list selects every row.

        Raises
        ------
        ValueError
            If a condition is malformed or uses an unsupported operator.
        """
        mask = None
        for condition in conditions:
            attribute, op, value = self._parse_condition(condition)
            matches = self._COMPARATORS[op](data[attribute], value)
            mask = matches if mask is None else mask & matches

        if mask is None:
            return data.copy()
        return data[mask]

    def calculate_quality(
        self, subgroup_data, quality_measure, overall_mean, overall_std
    ):
        """
        Calculate the quality of a subgroup using the specified quality measure.

        `overall_std` is not used by any current measure; it is kept so the
        call signature stays stable.

        Raises
        ------
        ValueError
            If `quality_measure` is not a known measure name.
        """
        if quality_measure == "mean_uncertainty_deviation":
            return mean_uncertainty_deviation(subgroup_data, overall_mean)
        elif quality_measure == "z_score_uncertainty":
            return z_score_uncertainty(subgroup_data, overall_mean)
        elif quality_measure == "wracc_uncertainty":
            # The entropy-weighted measure is defined as `wracc_uncertainty`
            return wracc_uncertainty(subgroup_data, overall_mean)
        elif quality_measure == "lift":
            return lift(subgroup_data, overall_mean)
        else:
            raise ValueError(f"Unknown quality measure: {quality_measure}")

In [51]:
# Columns the beam search may build threshold conditions on. The target
# columns ("Probability", "Uncertainty") are not listed here.
attributes = [
    "Pregnancies",
    "Glucose",
    "BloodPressure",
    "SkinThickness",
    "Insulin",
    "BMI",
    "DiabetesPedigreeFunction",
    "Age",
]

In [52]:
# Configure the beam search: keep 3 candidates per level, allow at most 2
# conditions per subgroup, and require each subgroup to cover 5% of the rows
beam_search_emm = BeamSearchEMM(
    data,
    attributes,
    beam_width=3,
    max_depth=2,
    min_support=0.05,
)

# Rank subgroups by how far their mean uncertainty exceeds the overall mean
subgroups_mud = beam_search_emm.find_subgroups("mean_uncertainty_deviation")

# Report every subgroup found, best first
print("Subgroups based on Mean Uncertainty Deviation:")
for idx, subgroup in enumerate(subgroups_mud, start=1):
    report_lines = [
        f"Subgroup {idx}:",
        f"  Conditions: {' AND '.join(subgroup['conditions'])}",
        f"  Quality (Mean Uncertainty Deviation): {subgroup['quality']:.4f}",
        f"  Subgroup Size: {subgroup['size']}\n",
    ]
    print("\n".join(report_lines))