In [2]:
!pip install holisticai scikit-learn pandas numpy matplotlib seaborn jax -q

from holisticai.datasets import load_dataset
import numpy as np
from collections import Counter
from holisticai.bias.metrics import average_odds_diff, equal_opportunity_diff
import pandas as pd
from sklearn.metrics import recall_score, confusion_matrix

# dataset = load_dataset("compas_two_year_recid", protected_attribute="sex")
# split_dataset = dataset.train_test_split(test_size=0.2, random_state=42)
# train = split_dataset['train']
# test = split_dataset['test']

# bank_dataset = load_dataset("bank_marketing", protected_attribute="marital")
# split_dataset2 = bank_dataset.train_test_split(test_size=0.2, random_state=42)
# train2 = split_dataset2['train']
# test2 = split_dataset2['test']

# wage_dataset = load_dataset("mw_small", protected_attribute="race")
# split_dataset3 = wage_dataset.train_test_split(test_size=0.2, random_state=42)
# train3 = split_dataset3['train']
# test3 = split_dataset3['test']

### Pre-Pruning Method

In [18]:
import numpy as np
from collections import Counter
# from holisticai.bias.metrics import average_odds_diff
import pandas as pd
from sklearn.metrics import recall_score, confusion_matrix

class PrePruneDecisionTreeClassifier:
    def __init__(self, max_depth=5, min_samples_split=2, fairness_weight=0.5):
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.fairness_weight = fairness_weight
        self.tree_ = None

    def _calculate_fairness_deviation(self, group_a, group_b, y_pred, y_true):
        """Safe calculation of fairness deviation with error handling"""

       # NOTE: the odds diff and equal opportunity diff are 0 a lot
        deviation =  1.0 - np.mean(
            [
                np.abs(average_odds_diff(group_a, group_b, y_pred, y_true)+1e-6),
                np.abs(equal_opportunity_diff(group_a, group_b, y_pred, y_true)+1e-6)
            ]
        )
        return deviation

    def _calculate_information_gain(self, X, y, groups, feature_idx, threshold):
        left_idxs, right_idxs = self._split(X[:, feature_idx], threshold)
        if len(left_idxs) == 0 or len(right_idxs) == 0:
            return 0, float('inf')

        parent_entropy = self._entropy(y)
        n = len(y)
        n_left, n_right = len(left_idxs), len(right_idxs)
        entropy_left, entropy_right = self._entropy(y[left_idxs]), self._entropy(y[right_idxs])
        child_entropy = (n_left / n) * entropy_left + (n_right / n) * entropy_right

        # Calculate fairness metrics safely
        group_a_left = groups[left_idxs] == 0
        group_b_left = groups[left_idxs] == 1
        group_a_right = groups[right_idxs] == 0
        group_b_right = groups[right_idxs] == 1

        # Use the most common label in each split as the prediction
        y_pred_left = np.full_like(y[left_idxs], self._most_common_label(y[left_idxs]))
        y_pred_right = np.full_like(y[right_idxs], self._most_common_label(y[right_idxs]))

        # Calculate fairness deviations safely
        fairness_deviation_left = self._calculate_fairness_deviation(
            group_a_left, group_b_left, y_pred_left, y[left_idxs])
        fairness_deviation_right = self._calculate_fairness_deviation(
            group_a_right, group_b_right, y_pred_right, y[right_idxs])

        # Average the fairness deviations
        fairness_deviation = (fairness_deviation_left + fairness_deviation_right) / 2

        information_gain = parent_entropy - child_entropy
        return information_gain, fairness_deviation

    def fit(self, X, y, groups):
        if isinstance(X, pd.DataFrame):
            X = X.values
        if isinstance(y, pd.Series):
            y = y.values
        if isinstance(groups, pd.Series):
            groups = groups.values
        self.tree_ = self._grow_tree(X, y, groups, depth=0)

    # Rest of the CustomDecisionTreeClassifier methods remain the same...
    def _grow_tree(self, X, y, groups, depth):
        num_samples, num_features = X.shape
        if (depth >= self.max_depth or num_samples < self.min_samples_split or len(set(y)) == 1):
            return {'label': self._most_common_label(y)}

        best_split = self._find_best_split(X, y, groups)
        if not best_split:
            return {'label': self._most_common_label(y)}

        left_idxs, right_idxs = self._split(X[:, best_split['feature_idx']], best_split['threshold'])

        left_subtree = self._grow_tree(X[left_idxs], y[left_idxs], groups[left_idxs], depth + 1)
        right_subtree = self._grow_tree(X[right_idxs], y[right_idxs], groups[right_idxs], depth + 1)

        return {'feature_idx': best_split['feature_idx'], 'threshold': best_split['threshold'],
                'left': left_subtree, 'right': right_subtree}


    def _find_best_split(self, X, y, groups):
        num_samples, num_features = X.shape
        best_split = {}
        best_gain = -float('inf')

        for feature_idx in range(num_features):
            thresholds = np.unique(X[:, feature_idx])
            for threshold in thresholds:
                gain, fairness_deviation = self._calculate_information_gain(X, y, groups, feature_idx, threshold)
                
                fairness_penalty = self.fairness_weight * fairness_deviation
                #print(f"fairness pen: {fairness_penalty}, fairness deviation: {fairness_deviation}")
                adjusted_gain = gain - fairness_penalty

                if adjusted_gain > best_gain:
                    best_gain = adjusted_gain
                    best_split = {'feature_idx': feature_idx, 'threshold': threshold}

        return best_split if best_gain != -float('inf') else None

    def _split(self, feature_column, threshold):
        left_idxs = np.argwhere(feature_column <= threshold).flatten()
        right_idxs = np.argwhere(feature_column > threshold).flatten()
        return left_idxs, right_idxs

    def _entropy(self, y):
        hist = np.bincount(y)
        ps = hist / len(y)
        return -np.sum([p * np.log2(p) for p in ps if p > 0])

    def _most_common_label(self, y):
        return Counter(y).most_common(1)[0][0]

    def predict(self, X):
        if isinstance(X, pd.DataFrame):
            X = X.values
        return np.array([self._traverse_tree(x, self.tree_) for x in X])

    def _traverse_tree(self, x, node):
        if 'label' in node:
            return node['label']
        if x[node['feature_idx']] <= node['threshold']:
            return self._traverse_tree(x, node['left'])
        return self._traverse_tree(x, node['right'])

class PrePruneRandomForestClassifier:
    def __init__(self, n_estimators=10, max_depth=5, min_samples_split=2, fairness_weight=0.5, random_state=None):
        self.n_estimators = n_estimators
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.fairness_weight = fairness_weight
        self.random_state = random_state
        self.trees = []

    def fit(self, X, y, groups):
        if isinstance(X, pd.DataFrame):
            X = X.values
        if isinstance(y, pd.Series):
            y = y.values
        if isinstance(groups, pd.Series):
            groups = groups.values

        if self.random_state is not None:
            np.random.seed(self.random_state)
        
        self.trees = []
        for _ in range(self.n_estimators):
            idxs = np.random.choice(len(X), len(X), replace=True)
            X_sample, y_sample = X[idxs], y[idxs]
            group_sample = groups[idxs]

            tree = PrePruneDecisionTreeClassifier(
                max_depth=self.max_depth,
                min_samples_split=self.min_samples_split,
                fairness_weight=self.fairness_weight
            )
            tree.fit(X_sample, y_sample, group_sample)
            self.trees.append(tree)

    def predict(self, X):
        if isinstance(X, pd.DataFrame):
            X = X.values
        tree_preds = np.array([tree.predict(X) for tree in self.trees])
        return np.round(tree_preds.mean(axis=0)).astype(int)
    


In [51]:
!pip install codecarbon

from codecarbon import EmissionsTracker
tracker = EmissionsTracker()

Collecting codecarbon
  Downloading codecarbon-2.7.4-py3-none-any.whl.metadata (8.7 kB)
Collecting arrow (from codecarbon)
  Using cached arrow-1.3.0-py3-none-any.whl.metadata (7.5 kB)
Collecting click (from codecarbon)
  Using cached click-8.1.7-py3-none-any.whl.metadata (3.0 kB)
Collecting fief-client[cli] (from codecarbon)
  Downloading fief_client-0.20.0-py3-none-any.whl.metadata (2.1 kB)
Collecting prometheus-client (from codecarbon)
  Downloading prometheus_client-0.21.0-py3-none-any.whl.metadata (1.8 kB)
Collecting py-cpuinfo (from codecarbon)
  Downloading py_cpuinfo-9.0.0-py3-none-any.whl.metadata (794 bytes)
Collecting pynvml (from codecarbon)
  Downloading pynvml-11.5.3-py3-none-any.whl.metadata (8.8 kB)
Collecting questionary (from codecarbon)
  Downloading questionary-2.0.1-py3-none-any.whl.metadata (5.4 kB)
Collecting rapidfuzz (from codecarbon)
  Downloading rapidfuzz-3.10.1-cp312-cp312-macosx_11_0_arm64.whl.metadata (11 kB)
Collecting typer (from codecarbon)
  Downloadi

[codecarbon INFO @ 13:08:16] [setup] RAM Tracking...
[codecarbon INFO @ 13:08:16] [setup] GPU Tracking...
[codecarbon INFO @ 13:08:16] No GPU found.
[codecarbon INFO @ 13:08:16] [setup] CPU Tracking...
 Mac OS and ARM processor detected: Please enable PowerMetrics sudo to measure CPU

[codecarbon INFO @ 13:08:16] CPU Model on constant consumption mode: Apple M1 Pro
[codecarbon INFO @ 13:08:16] >>> Tracker's metadata:
[codecarbon INFO @ 13:08:16]   Platform system: macOS-15.1-arm64-arm-64bit
[codecarbon INFO @ 13:08:16]   Python version: 3.12.3
[codecarbon INFO @ 13:08:16]   CodeCarbon version: 2.7.4
[codecarbon INFO @ 13:08:16]   Available RAM : 16.000 GB
[codecarbon INFO @ 13:08:16]   CPU count: 8
[codecarbon INFO @ 13:08:16]   CPU model: Apple M1 Pro
[codecarbon INFO @ 13:08:16]   GPU count: None
[codecarbon INFO @ 13:08:16]   GPU model: None
[codecarbon INFO @ 13:08:17] Saving emissions data to file /Users/rishi/Documents/holistic-hack/emissions.csv


In [10]:
import numpy as np
from collections import Counter
import pandas as pd
from sklearn.metrics import recall_score, confusion_matrix

class Node:
    """Helper class to store tree node information similar to sklearn's _tree.Tree"""
    def __init__(self, n_node_samples=0, value=None):
        self.children_left = -1  # -1 indicates leaf node
        self.children_right = -1
        self.feature = -2  # -2 indicates uninitialized, -1 for leaf
        self.threshold = float('nan')
        self.n_node_samples = n_node_samples
        self.value = value if value is not None else []
        self.label = None  # Added to store the prediction label for leaf nodes

class Tree:
    """Helper class to mimic sklearn's Tree structure"""
    def __init__(self):
        self.node_count = 0
        self.nodes = []
        self.n_classes = []
        self.max_depth = 0
        
    def add_node(self, node):
        self.nodes.append(node)
        self.node_count += 1
        return self.node_count - 1  # Return index of added node

class PrePruneDecisionTreeClassifier:
    def __init__(self, max_depth=5, min_samples_split=2, fairness_weight=0.5):
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.fairness_weight = fairness_weight
        self.tree_ = None
        self._original_tree = None  # Store original tree structure for prediction

    def _calculate_fairness_deviation(self, group_a, group_b, y_pred, y_true):
        """Safe calculation of fairness deviation with error handling"""
        deviation = 1.0 - np.mean([
            np.abs(average_odds_diff(group_a, group_b, y_pred, y_true)+1e-6),
            np.abs(equal_opportunity_diff(group_a, group_b, y_pred, y_true)+1e-6)
        ])
        return deviation

    def _build_sklearn_tree(self, node, parent_idx=-1, is_left=True):
        if not self._sklearn_tree:
            self._sklearn_tree = Tree()
            
        if 'label' in node:
            # Leaf node
            sklearn_node = Node(
                n_node_samples=len(node.get('samples', [])),
                value=[[0, 0] if node['label'] == 0 else [0, 1]]
            )
            sklearn_node.feature = -1  # Indicates leaf
            sklearn_node.label = node['label']  # Store the prediction label
            node_idx = self._sklearn_tree.add_node(sklearn_node)
        else:
            # Internal node
            sklearn_node = Node(
                n_node_samples=len(node.get('samples', [])),
                value=[[0, 0]]  # Placeholder, would normally contain class distribution
            )
            sklearn_node.feature = node['feature_idx']
            sklearn_node.threshold = node['threshold']
            node_idx = self._sklearn_tree.add_node(sklearn_node)
            
            # Recursively build left and right subtrees
            left_idx = self._build_sklearn_tree(node['left'], node_idx, True)
            right_idx = self._build_sklearn_tree(node['right'], node_idx, False)
            
            # Update parent node with children indices
            sklearn_node.children_left = left_idx
            sklearn_node.children_right = right_idx
        
        # Update parent's child index if this isn't the root
        if parent_idx >= 0:
            parent_node = self._sklearn_tree.nodes[parent_idx]
            if is_left:
                parent_node.children_left = node_idx
            else:
                parent_node.children_right = node_idx
                
        return node_idx
    
    def fit(self, X, y, groups):
        if isinstance(X, pd.DataFrame):
            X = X.values
        if isinstance(y, pd.Series):
            y = y.values
        if isinstance(groups, pd.Series):
            groups = groups.values
            
        # Store original tree structure
        self._original_tree = self._grow_tree(X, y, groups, depth=0)
        
        # Build sklearn-style tree structure
        self._sklearn_tree = None
        self._build_sklearn_tree(self._original_tree)
        
        # Make tree_ attribute point to sklearn-style tree
        self.tree_ = self._sklearn_tree


    def _traverse_tree_sklearn(self, x, tree, node_id=0):
        """Traverse the sklearn-style tree structure"""
        node = tree.nodes[node_id]
        
        if node.feature == -1:  # Leaf node
            return node.label
            
        if x[node.feature] <= node.threshold:
            return self._traverse_tree_sklearn(x, tree, node.children_left)
        return self._traverse_tree_sklearn(x, tree, node.children_right)


    def _calculate_information_gain(self, X, y, groups, feature_idx, threshold):
        left_idxs, right_idxs = self._split(X[:, feature_idx], threshold)
        if len(left_idxs) == 0 or len(right_idxs) == 0:
            return 0, float('inf')

        parent_entropy = self._entropy(y)
        n = len(y)
        n_left, n_right = len(left_idxs), len(right_idxs)
        entropy_left, entropy_right = self._entropy(y[left_idxs]), self._entropy(y[right_idxs])
        child_entropy = (n_left / n) * entropy_left + (n_right / n) * entropy_right

        # Calculate fairness metrics safely
        group_a_left = groups[left_idxs] == 0
        group_b_left = groups[left_idxs] == 1
        group_a_right = groups[right_idxs] == 0
        group_b_right = groups[right_idxs] == 1

        # Use the most common label in each split as the prediction
        y_pred_left = np.full_like(y[left_idxs], self._most_common_label(y[left_idxs]))
        y_pred_right = np.full_like(y[right_idxs], self._most_common_label(y[right_idxs]))

        # Calculate fairness deviations safely
        fairness_deviation_left = self._calculate_fairness_deviation(
            group_a_left, group_b_left, y_pred_left, y[left_idxs])
        fairness_deviation_right = self._calculate_fairness_deviation(
            group_a_right, group_b_right, y_pred_right, y[right_idxs])

        # Average the fairness deviations
        fairness_deviation = (fairness_deviation_left + fairness_deviation_right) / 2

        information_gain = parent_entropy - child_entropy
        return information_gain, fairness_deviation

    def predict(self, X):
        """Make predictions using the sklearn-style tree structure"""
        if isinstance(X, pd.DataFrame):
            X = X.values
        return np.array([self._traverse_tree_sklearn(x, self.tree_) for x in X])

    def _grow_tree(self, X, y, groups, depth):
        num_samples, num_features = X.shape
        if (depth >= self.max_depth or num_samples < self.min_samples_split or len(set(y)) == 1):
            return {'label': self._most_common_label(y), 'num_samples': num_samples, 'depth': depth}

        best_split = self._find_best_split(X, y, groups)
        if not best_split:
            return {'label': self._most_common_label(y), 'num_samples': num_samples, 'depth': depth}

        left_idxs, right_idxs = self._split(X[:, best_split['feature_idx']], best_split['threshold'])

        left_subtree = self._grow_tree(X[left_idxs], y[left_idxs], groups[left_idxs], depth + 1)
        right_subtree = self._grow_tree(X[right_idxs], y[right_idxs], groups[right_idxs], depth + 1)

        return {'feature_idx': best_split['feature_idx'], 'threshold': best_split['threshold'],
                'left': left_subtree, 'right': right_subtree, 'num_samples': num_samples, 'depth': depth}


    # [Rest of the methods remain the same...]
    def compute_weighted_average_depth(self):
        if not self.tree_:
            return 0

        total_depth = 0
        total_samples = 0

        def traverse(node):
            nonlocal total_depth, total_samples
            if 'label' in node:
                # Leaf node
                total_depth += node['depth'] * node['num_samples']
                total_samples += node['num_samples']
                return

            # Traverse left and right subtrees
            traverse(node['left'])
            traverse(node['right'])

        traverse(self.tree_)
        return total_depth / total_samples if total_samples != 0 else 0

    def _find_best_split(self, X, y, groups):
        num_samples, num_features = X.shape
        best_split = {}
        best_gain = -float('inf')

        for feature_idx in range(num_features):
            thresholds = np.unique(X[:, feature_idx])
            for threshold in thresholds:
                gain, fairness_deviation = self._calculate_information_gain(X, y, groups, feature_idx, threshold)
                
                fairness_penalty = self.fairness_weight * fairness_deviation
                #print(f"fairness pen: {fairness_penalty}, fairness deviation: {fairness_deviation}")
                adjusted_gain = gain - fairness_penalty

                if adjusted_gain > best_gain:
                    best_gain = adjusted_gain
                    best_split = {'feature_idx': feature_idx, 'threshold': threshold}

        return best_split if best_gain != -float('inf') else None

    def _split(self, feature_column, threshold):
        left_idxs = np.argwhere(feature_column <= threshold).flatten()
        right_idxs = np.argwhere(feature_column > threshold).flatten()
        return left_idxs, right_idxs

    def _entropy(self, y):
        hist = np.bincount(y)
        ps = hist / len(y)
        return -np.sum([p * np.log2(p) for p in ps if p > 0])

    def _most_common_label(self, y):
        return Counter(y).most_common(1)[0][0]

    def _traverse_tree(self, x, node):
        if 'label' in node:
            return node['label']
        if x[node['feature_idx']] <= node['threshold']:
            return self._traverse_tree(x, node['left'])
        return self._traverse_tree(x, node['right'])
    
    def tree_depth(self):
        """Compute the depth of the tree."""
        return self._compute_depth(self.tree_)

    def _compute_depth(self, node):
        if isinstance(node, dict) and 'label' in node:
            return 0
        left_depth = self._compute_depth(node['left']) if isinstance(node, dict) and 'left' in node else 0
        right_depth = self._compute_depth(node['right']) if isinstance(node, dict) and 'right' in node else 0
        return 1 + max(left_depth, right_depth)



class PrePruneRandomForestClassifier:
    def __init__(self, n_estimators=10, max_depth=5, min_samples_split=2, fairness_weight=0.5, random_state=None):
        self.n_estimators = n_estimators
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.fairness_weight = fairness_weight
        self.random_state = random_state
        self.trees = []

    def fit(self, X, y, groups):
        if isinstance(X, pd.DataFrame):
            X = X.values
        if isinstance(y, pd.Series):
            y = y.values
        if isinstance(groups, pd.Series):
            groups = groups.values

        if self.random_state is not None:
            np.random.seed(self.random_state)
        
        self.trees = []
        for _ in range(self.n_estimators):
            idxs = np.random.choice(len(X), len(X), replace=True)
            X_sample, y_sample = X[idxs], y[idxs]
            group_sample = groups[idxs]

            tree = PrePruneDecisionTreeClassifier(
                max_depth=self.max_depth,
                min_samples_split=self.min_samples_split,
                fairness_weight=self.fairness_weight
            )
            tree.fit(X_sample, y_sample, group_sample)
            self.trees.append(tree)

    def predict(self, X):
        if isinstance(X, pd.DataFrame):
            X = X.values
        tree_preds = np.array([tree.predict(X) for tree in self.trees])
        return np.round(tree_preds.mean(axis=0)).astype(int)
    
    def average_tree_depth(self):
        """
        Compute the average depth of all trees in the forest.
        """
        if not self.trees:
            raise ValueError("The forest has not been trained yet. Please fit the model first.")
        
        total_depth = sum(tree.tree_depth() for tree in self.trees)
        return total_depth / len(self.trees)

In [11]:
from codecarbon import EmissionsTracker

# Example usage
X = compas_train['X']
y = compas_train['y']
groups = np.where(compas_train['p_attrs']["sex"].to_numpy() == "Male", 1, 0)

# Create and fit the model
pre_prune_clf = PrePruneRandomForestClassifier(
    n_estimators=5,
    max_depth=16,
    fairness_weight=0.5,
    random_state=42
)

# Initialize the emissions tracker
tracker = EmissionsTracker()
tracker.start()

pre_prune_clf.fit(X, y, groups)

# Stop the tracker and get emissions
emissions = tracker.stop()

# Output total emissions
print(f"Total carbon emissions during training: {emissions:.4f} kg CO2eq")

# Make predictions
predictions = pre_prune_clf.predict(compas_test['X'])

# Compute accuracy
print("accuracy", np.count_nonzero(predictions == compas_test['y']) / len(compas_test['y']))

# Compute fairness metrics
print("equal opportunity diff", equal_opportunity_diff(compas_test['group_a'], compas_test['group_b'], predictions, compas_test['y']))
print("average odds diff", average_odds_diff(compas_test['group_a'], compas_test['group_b'], predictions, compas_test['y']))

# Compute average tree depth
average_depth = pre_prune_clf.average_tree_depth()
print("Average tree depth in the random forest:", average_depth)


[codecarbon INFO @ 15:06:00] [setup] RAM Tracking...
[codecarbon INFO @ 15:06:00] [setup] GPU Tracking...
[codecarbon INFO @ 15:06:00] No GPU found.
[codecarbon INFO @ 15:06:00] [setup] CPU Tracking...
 Mac OS and ARM processor detected: Please enable PowerMetrics sudo to measure CPU

[codecarbon INFO @ 15:06:00] CPU Model on constant consumption mode: Apple M1 Pro
[codecarbon INFO @ 15:06:00] >>> Tracker's metadata:
[codecarbon INFO @ 15:06:00]   Platform system: macOS-15.1-arm64-arm-64bit
[codecarbon INFO @ 15:06:00]   Python version: 3.12.3
[codecarbon INFO @ 15:06:00]   CodeCarbon version: 2.7.4
[codecarbon INFO @ 15:06:00]   Available RAM : 16.000 GB
[codecarbon INFO @ 15:06:00]   CPU count: 8
[codecarbon INFO @ 15:06:00]   CPU model: Apple M1 Pro
[codecarbon INFO @ 15:06:00]   GPU count: None
[codecarbon INFO @ 15:06:00]   GPU model: None
[codecarbon INFO @ 15:06:01] Saving emissions data to file /Users/rishi/Documents/holistic-hack/emissions.csv
[codecarbon INFO @ 15:06:16] Ener

Total carbon emissions during training: 0.0001 kg CO2eq
accuracy 0.6550607287449393
equal opportunity diff -0.00521607691545245
average odds diff -0.0657650549769987
Average tree depth in the random forest: 1.0


In [27]:
# Example usage
X = compas_train['X']
y = compas_train['y']
groups = np.where(compas_train['p_attrs']["sex"].to_numpy() == "Male", 1, 0)
# Create and fit the model
pre_prune_clf = PrePruneRandomForestClassifier(
    n_estimators=5,
    max_depth=16,
    fairness_weight=0.5,
    random_state=42
)
pre_prune_clf.fit(X, y, groups)

# Make predictions
predictions = pre_prune_clf.predict(compas_test['X'])

print("accuracy", np.count_nonzero(predictions == compas_test['y'])/len(compas_test['y']))
print("equal opportunity diff", equal_opportunity_diff(compas_test['group_a'], compas_test['group_b'], predictions, compas_test['y']))
print("average odds diff", average_odds_diff(compas_test['group_a'], compas_test['group_b'], predictions, compas_test['y']))

from holisticai.explainability.metrics import weighted_average_depth
weighted_avg_depth = weighted_average_depth(pre_prune_clf.tree_)
print(f"Weighted Average Depth: {weighted_avg_depth}")


accuracy 0.6550607287449393
equal opportunity diff -0.00521607691545245
average odds diff -0.0657650549769987


AttributeError: 'PrePruneRandomForestClassifier' object has no attribute 'tree_'

In [21]:

def compute_weighted_average_depth(model):
    if not model.tree_:
        return 0

    total_depth = 0
    total_samples = 0

    def traverse(node):
        nonlocal total_depth, total_samples
        if 'label' in node:
            # Leaf node
            total_depth += node['depth'] * node['num_samples']
            total_samples += node['num_samples']
            return

        # Traverse left and right subtrees
        traverse(node['left'])
        traverse(node['right'])

    traverse(model.tree_)
    return total_depth / total_samples if total_samples != 0 else 0

weighted_avg_depth = compute_weighted_average_depth(pre_prune_clf)
print(f"Weighted Average Depth: {weighted_avg_depth}")

AttributeError: 'PrePruneRandomForestClassifier' object has no attribute 'tree_'

In [9]:
weighted_average_explainability_score(pre_prune_clf.tree_)


AttributeError: 'PrePruneRandomForestClassifier' object has no attribute 'tree_'

In [15]:
def weighted_average_explainability_score(tree):
    # This function will traverse the tree and compute an explainability score.
    
    def traverse_and_score(node, depth=0):
        # If the node is a leaf, return its contribution to explainability
        if 'label' in node:
            return 1 / (depth + 1)  # The deeper the leaf, the lower the score (more complex path)
        
        # If the node is internal, traverse the left and right branches
        left_score = traverse_and_score(node['left'], depth + 1)
        right_score = traverse_and_score(node['right'], depth + 1)
        
        # The score contribution of this node will be influenced by its depth and both branches
        return (left_score + right_score) / 2
    
    # Start traversal from the root node
    total_score = traverse_and_score(tree)
    return total_score

# Example usage

# Calculate the weighted average explainability score using clf.tree_
explainability_score = weighted_average_explainability_score(pre_prune_clf.tree_)
print("Weighted Average Explainability Score:", explainability_score)


AttributeError: 'PrePruneRandomForestClassifier' object has no attribute 'tree_'

In [10]:
# Loop through each tree and calculate the explainability score
explainability_scores = []
for idx, tree in enumerate(pre_prune_clf.trees):
    try:
        score = weighted_average_explainability_score(tree.tree_)
        explainability_scores.append(score)
    except AttributeError:
        print(f"Tree {idx} does not have a valid tree_ structure.")

# Compute the average explainability score across all trees
if explainability_scores:
    avg_explainability_score = np.mean(explainability_scores)
    print("Weighted Average Explainability Score:", avg_explainability_score)
else:
    print("No valid explainability scores were computed.")

Tree 0 does not have a valid tree_ structure.
Tree 1 does not have a valid tree_ structure.
Tree 2 does not have a valid tree_ structure.
Tree 3 does not have a valid tree_ structure.
Tree 4 does not have a valid tree_ structure.
No valid explainability scores were computed.


In [4]:
print("accuracy", np.count_nonzero(predictions == test['y'])/len(test['y']))
print("equal opportunity diff", equal_opportunity_diff(test['group_a'], test['group_b'], predictions, test['y']))
print("average odds diff", average_odds_diff(test['group_a'], test['group_b'], predictions, test['y']))

accuracy 0.6550607287449393
equal opportunity diff -0.00521607691545245
average odds diff -0.0657650549769987


In [5]:
# 1. why is fairness deviation = 0
# 2. Add terms for race and gender bias (?) + model needs to trade off
# 3. Fix hyperparmaeter optimization to get the best model + scores]


# Limitation: protected characterstic needs to be binary 
# Extension: pairwise bias and how to mitigate that 

### Post-Pruning Method

In [7]:
import numpy as np
from collections import Counter
import pandas as pd
from sklearn.metrics import recall_score, accuracy_score
from holisticai.bias.metrics import average_odds_diff

class PostPruneDecisionTreeClassifier:
    def __init__(self, max_depth=5, min_samples_split=2, verbose=False):
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.tree_ = None
        self.verbose = verbose

    def fit(self, X, y):
        if isinstance(X, pd.DataFrame):
            X = X.values
        if isinstance(y, pd.Series):
            y = y.values
        self.tree_ = self._grow_tree(X, y, depth=0)

    def _grow_tree(self, X, y, depth):
        num_samples, num_features = X.shape
        if (depth >= self.max_depth or num_samples < self.min_samples_split or len(set(y)) == 1):
            return {'label': self._most_common_label(y)}

        best_split = self._find_best_split(X, y)
        
        if not best_split:
            return {'label': self._most_common_label(y)}

        left_idxs, right_idxs = self._split(X[:, best_split['feature_idx']], best_split['threshold'])

        # If either split is empty, return the most common label
        if len(left_idxs) == 0 or len(right_idxs) == 0:
            return {'label': self._most_common_label(y)}

        left_subtree = self._grow_tree(X[left_idxs], y[left_idxs], depth + 1)
        right_subtree = self._grow_tree(X[right_idxs], y[right_idxs], depth + 1)

        return {'feature_idx': best_split['feature_idx'], 'threshold': best_split['threshold'],
                'left': left_subtree, 'right': right_subtree}

    def _find_best_split(self, X, y):
        num_samples, num_features = X.shape
        best_split = {}
        best_gain = -float('inf')

        for feature_idx in range(num_features):
            thresholds = np.unique(X[:, feature_idx])
            for threshold in thresholds:
                gain = self._calculate_information_gain(X, y, feature_idx, threshold)
                if gain > best_gain:
                    best_gain = gain
                    best_split = {'feature_idx': feature_idx, 'threshold': threshold}

        return best_split if best_gain != -float('inf') else None
    

    def _calculate_fairness_deviation(self, group_a, group_b, y_pred, y_true):
        """Safe calculation of fairness deviation with error handling"""

       # NOTE: the odds diff and equal opportunity diff are 0 a lot
        deviation =  1.0 - np.mean(
            [
                np.abs(average_odds_diff(group_a, group_b, y_pred, y_true)+1e-6),
                np.abs(equal_opportunity_diff(group_a, group_b, y_pred, y_true)+1e-6)
            ]
        )
        return deviation

    def _calculate_information_gain(self, X, y, feature_idx, threshold):
        left_idxs, right_idxs = self._split(X[:, feature_idx], threshold)
        if len(left_idxs) == 0 or len(right_idxs) == 0:
            return 0

        parent_entropy = self._entropy(y)
        n = len(y)
        n_left, n_right = len(left_idxs), len(right_idxs)
        entropy_left, entropy_right = self._entropy(y[left_idxs]), self._entropy(y[right_idxs])
        child_entropy = (n_left / n) * entropy_left + (n_right / n) * entropy_right

        information_gain = parent_entropy - child_entropy
        return information_gain

    def _split(self, feature_column, threshold):
        left_idxs = np.argwhere(feature_column <= threshold).flatten()
        right_idxs = np.argwhere(feature_column > threshold).flatten()
        return left_idxs, right_idxs

    def _entropy(self, y):
        if len(y) == 0:
            return 0
        hist = np.bincount(y)
        ps = hist / len(y)
        return -np.sum([p * np.log2(p) for p in ps if p > 0])

    def _most_common_label(self, y):
        if len(y) == 0:
            return 0  # Default to label 0 if y is empty
        return Counter(y).most_common(1)[0][0]

    def predict(self, X):
        if isinstance(X, pd.DataFrame):
            X = X.values
        return np.array([self._traverse_tree(x, self.tree_) for x in X])

    def _traverse_tree(self, x, node):
        if 'label' in node:
            return node['label']
        if x[node['feature_idx']] <= node['threshold']:
            return self._traverse_tree(x, node['left'])
        return self._traverse_tree(x, node['right'])

    def prune_for_fairness_and_accuracy(self, X, y, groups):
        """
        Post-training pruning to improve fairness and maintain accuracy.
        Start pruning from the leaves to avoid removing too much of the tree at once.
        """
        group_a = np.where(groups == 1, 1, 0)
        group_b = np.where(groups == 0, 1, 0)
        initial_leaves_count = self._count_leaves(self.tree_)
        initial_fairness = self._calculate_fairness_deviation(group_a, group_b, self.predict(X), y)#1 - average_odds_diff(group_a, group_b, self.predict(X), y)
        initial_accuracy = accuracy_score(y, self.predict(X))

        self._prune_tree(self.tree_, X, y, groups)

        final_leaves_count = self._count_leaves(self.tree_)
        final_fairness = self._calculate_fairness_deviation(group_a, group_b, self.predict(X), y) #1 - average_odds_diff(groups, groups, self.predict(X), y)
        final_accuracy = accuracy_score(y, self.predict(X))
        if self.verbose == True:
            print(f"Initial number of leaves: {initial_leaves_count}")
            print(f"Final number of leaves: {final_leaves_count}")
            print(f"Difference in number of leaves: {initial_leaves_count - final_leaves_count}")
            print(f"Fairness before pruning: {initial_fairness}")
            print(f"Fairness after pruning: {final_fairness}")
            print(f"Difference in fairness: {final_fairness - initial_fairness}")
            print(f"Accuracy before pruning: {initial_accuracy}")
            print(f"Accuracy after pruning: {final_accuracy}")
            print(f"Difference in accuracy: {final_accuracy - initial_accuracy}")

    def _prune_tree(self, node, X, y, groups):
        if 'label' in node:
            return

        left_idxs, right_idxs = self._split(X[:, node['feature_idx']], node['threshold'])
        
        # Check if split produces empty nodes
        if len(left_idxs) == 0 or len(right_idxs) == 0:
            node.clear()
            node['label'] = self._most_common_label(y)
            return

        # Recursively prune left and right subtrees first (bottom-up approach)
        self._prune_tree(node['left'], X[left_idxs], y[left_idxs], groups[left_idxs])
        self._prune_tree(node['right'], X[right_idxs], y[right_idxs], groups[right_idxs])

        # Only consider pruning if both children are leaves
        if 'label' not in node['left'] or 'label' not in node['right']:
            return

        # Calculate metrics before pruning
        group_a = np.where(groups == 1, 1, 0)
        group_b = np.where(groups == 0, 1, 0)


        y_pred = self.predict(X)
        accuracy_before = accuracy_score(y, y_pred)

        fairness_before = self._calculate_fairness_deviation(group_a, group_b, y_pred, y)#1 - average_odds_diff(group_a, group_b, y_pred, y)


        # Temporarily prune the node
        original_node = node.copy()
        node.clear()
        node['label'] = self._most_common_label(y)

        # Calculate metrics after pruning
        y_pred_pruned = self.predict(X)
        accuracy_after = accuracy_score(y, y_pred_pruned)
        fairness_after = self._calculate_fairness_deviation(group_a, group_b, y_pred_pruned, y)#1 - average_odds_diff(group_a, group_b, y_pred_pruned, y)

        # Define minimum acceptable changes
        MIN_ACCURACY_DROP = 0.05  # Allow maximum 2% accuracy drop
        MIN_FAIRNESS_IMPROVEMENT = 0.025  # Require at least 5% fairness improvement

        # Restore the node if:
        # 1. Accuracy drops more than threshold OR
        # 2. Fairness doesn't improve enough
        if (accuracy_before - accuracy_after > MIN_ACCURACY_DROP or 
            fairness_after - fairness_before < MIN_FAIRNESS_IMPROVEMENT):
            node.clear()
            node.update(original_node)
        
    def _count_leaves(self, node):
        if 'label' in node:
            return 1
        return self._count_leaves(node['left']) + self._count_leaves(node['right'])


    def tree_depth(self):
        """Compute the depth of the tree."""
        return self._compute_depth(self.tree_)

    def _compute_depth(self, node):
        if isinstance(node, dict) and 'label' in node:
            return 0
        left_depth = self._compute_depth(node['left']) if isinstance(node, dict) and 'left' in node else 0
        right_depth = self._compute_depth(node['right']) if isinstance(node, dict) and 'right' in node else 0
        return 1 + max(left_depth, right_depth)


class PostPruneRandomForestClassifier:
    def __init__(self, n_estimators=10, max_depth=5, min_samples_split=2, random_state=None):
        self.n_estimators = n_estimators
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.random_state = random_state
        self.trees = []

    def fit(self, X, y, groups):
        if isinstance(X, pd.DataFrame):
            X = X.values
        if isinstance(y, pd.Series):
            y = y.values
        if isinstance(groups, pd.Series):
            groups = groups.values

        if self.random_state is not None:
            np.random.seed(self.random_state)
        
        self.trees = []
        for _ in range(self.n_estimators):
            idxs = np.random.choice(len(X), len(X), replace=True)
            X_sample, y_sample = X[idxs], y[idxs]
            group_sample = groups[idxs]

            tree = PostPruneDecisionTreeClassifier(
                max_depth=self.max_depth,
                min_samples_split=self.min_samples_split
            )
            tree.fit(X_sample, y_sample)
            tree.prune_for_fairness_and_accuracy(X_sample, y_sample, group_sample)
            self.trees.append(tree)

    def predict(self, X):
        if isinstance(X, pd.DataFrame):
            X = X.values
        tree_preds = np.array([tree.predict(X) for tree in self.trees])
        return np.round(tree_preds.mean(axis=0)).astype(int)

    def average_tree_depth(self):
        """
        Compute the average depth of all trees in the forest.
        """
        if not self.trees:
            raise ValueError("The forest has not been trained yet. Please fit the model first.")
        
        total_depth = sum(tree.tree_depth() for tree in self.trees)
        return total_depth / len(self.trees)



In [8]:
# Example usage
X = compas_train['X']
y = compas_train['y']
groups = np.where(compas_train['p_attrs']["sex"].to_numpy() == "Male", 1, 0)

# Create and fit the model
post_prune_clf = PostPruneRandomForestClassifier(
    n_estimators=5,
    max_depth=50,
    random_state=42
)
post_prune_clf.fit(X, y, groups)

# Make predictions
predictions = post_prune_clf.predict(compas_test['X'])

# Compute average tree depth
post_average_depth = post_prune_clf.average_tree_depth()
print("Average tree depth in the post-pruned random forest:", post_average_depth)



Average tree depth in the post-pruned random forest: 27.4


In [48]:
print("accuracy", np.count_nonzero(predictions == compas_test['y'])/len(compas_test['y']))
print("equal opportunity diff", equal_opportunity_diff(compas_test['group_a'], compas_test['group_b'], predictions, compas_test['y']))
print("average odds diff", average_odds_diff(compas_test['group_a'], compas_test['group_b'], predictions, compas_test['y']))

accuracy 0.637246963562753
equal opportunity diff -0.02783972643473087
average odds diff -0.08915048942413048


### Baseline RandomForest Classifier

In [37]:
import numpy as np
from collections import Counter
import pandas as pd
from sklearn.metrics import recall_score, accuracy_score

class BaselineDecisionTreeClassifier:
    def __init__(self, max_depth=5, min_samples_split=2):
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.tree_ = None

    def fit(self, X, y):
        if isinstance(X, pd.DataFrame):
            X = X.values
        if isinstance(y, pd.Series):
            y = y.values
        if len(y) == 0:
            raise ValueError("Cannot train on empty dataset")
        self.tree_ = self._grow_tree(X, y, depth=0)

    def _grow_tree(self, X, y, depth):
        num_samples, num_features = X.shape
        
        # Check for empty arrays
        if num_samples == 0:
            return {'label': 0}  # Default label for empty node
            
        if (depth >= self.max_depth or 
            num_samples < self.min_samples_split or 
            len(set(y)) == 1):
            return {'label': self._most_common_label(y)}

        best_split = self._find_best_split(X, y)
        if not best_split:
            return {'label': self._most_common_label(y)}

        feature_idx = best_split['feature_idx']
        threshold = best_split['threshold']
        
        # Get the split indices
        left_idxs, right_idxs = self._split(X[:, feature_idx], threshold)
        
        # Check if split is valid
        if len(left_idxs) == 0 or len(right_idxs) == 0:
            return {'label': self._most_common_label(y)}

        left_subtree = self._grow_tree(X[left_idxs], y[left_idxs], depth + 1)
        right_subtree = self._grow_tree(X[right_idxs], y[right_idxs], depth + 1)

        return {
            'feature_idx': feature_idx,
            'threshold': threshold,
            'left': left_subtree,
            'right': right_subtree
        }

    def _find_best_split(self, X, y):
        num_samples, num_features = X.shape
        if num_samples < 2:  # Need at least 2 samples to split
            return None
            
        best_split = {}
        best_gain = -float('inf')

        for feature_idx in range(num_features):
            thresholds = np.unique(X[:, feature_idx])
            for threshold in thresholds:
                gain = self._calculate_information_gain(X, y, feature_idx, threshold)
                if gain > best_gain:
                    best_gain = gain
                    best_split = {'feature_idx': feature_idx, 'threshold': threshold}

        return best_split if best_gain > -float('inf') else None

    def _calculate_information_gain(self, X, y, feature_idx, threshold):
        left_idxs, right_idxs = self._split(X[:, feature_idx], threshold)
        if len(left_idxs) == 0 or len(right_idxs) == 0:
            return -float('inf')

        parent_entropy = self._entropy(y)
        n = len(y)
        n_left, n_right = len(left_idxs), len(right_idxs)
        entropy_left, entropy_right = self._entropy(y[left_idxs]), self._entropy(y[right_idxs])
        child_entropy = (n_left / n) * entropy_left + (n_right / n) * entropy_right

        information_gain = parent_entropy - child_entropy
        return information_gain

    def _split(self, feature_column, threshold):
        left_idxs = np.argwhere(feature_column <= threshold).flatten()
        right_idxs = np.argwhere(feature_column > threshold).flatten()
        return left_idxs, right_idxs

    def _entropy(self, y):
        if len(y) == 0:
            return 0
        hist = np.bincount(y)
        ps = hist / len(y)
        return -np.sum([p * np.log2(p) for p in ps if p > 0])

    def _most_common_label(self, y):
        if len(y) == 0:
            return 0  # Default label for empty array
        counter = Counter(y)
        if not counter:
            return 0  # Default label for empty counter
        return counter.most_common(1)[0][0]

    def predict(self, X):
        if isinstance(X, pd.DataFrame):
            X = X.values
        if not self.tree_:
            raise ValueError("Tree not fitted. Call fit() first.")
        return np.array([self._traverse_tree(x, self.tree_) for x in X])

    def _traverse_tree(self, x, node):
        if 'label' in node:
            return node['label']
        if x[node['feature_idx']] <= node['threshold']:
            return self._traverse_tree(x, node['left'])
        return self._traverse_tree(x, node['right'])
    

    def tree_depth(self):
        """Compute the depth of the tree."""
        return self._compute_depth(self.tree_)

    def _compute_depth(self, node):
        if isinstance(node, dict) and 'label' in node:
            return 0
        left_depth = self._compute_depth(node['left']) if isinstance(node, dict) and 'left' in node else 0
        right_depth = self._compute_depth(node['right']) if isinstance(node, dict) and 'right' in node else 0
        return 1 + max(left_depth, right_depth)



class BaselineRandomForestClassifier:
    def __init__(self, n_estimators=10, max_depth=5, min_samples_split=2, random_state=None):
        self.n_estimators = n_estimators
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.random_state = random_state
        self.trees = []

    def fit(self, X, y):
        if isinstance(X, pd.DataFrame):
            X = X.values
        if isinstance(y, pd.Series):
            y = y.values
        if len(y) == 0:
            raise ValueError("Cannot train on empty dataset")

        if self.random_state is not None:
            np.random.seed(self.random_state)
        
        self.trees = []
        for _ in range(self.n_estimators):
            idxs = np.random.choice(len(X), len(X), replace=True)
            X_sample, y_sample = X[idxs], y[idxs]

            tree = BaselineDecisionTreeClassifier(
                max_depth=self.max_depth,
                min_samples_split=self.min_samples_split
            )
            tree.fit(X_sample, y_sample)
            self.trees.append(tree)

    def predict(self, X):
        if isinstance(X, pd.DataFrame):
            X = X.values
        if not self.trees:
            raise ValueError("Model not fitted. Call fit() first.")
        tree_preds = np.array([tree.predict(X) for tree in self.trees])
        return np.round(tree_preds.mean(axis=0)).astype(int)

    def average_tree_depth(self):
        """
        Compute the average depth of all trees in the forest.
        """
        if not self.trees:
            raise ValueError("The forest has not been trained yet. Please fit the model first.")
        
        total_depth = sum(tree.tree_depth() for tree in self.trees)
        return total_depth / len(self.trees)



In [38]:
# Example usage
def get_results(train_set, test_set):
    X = train_set['X']
    y = train_set['y']

    # Create and fit the model
    baseline_clf = BaselineRandomForestClassifier(
        n_estimators=5,
        max_depth=16,
        random_state=42
    )
    baseline_clf.fit(X, y)

    # Make predictions
    predictions = baseline_clf.predict(test_set['X'])

    print("accuracy", np.count_nonzero(predictions == test_set['y'])/len(test_set['y']))
    print("equal opportunity diff", equal_opportunity_diff(test_set['group_a'], test_set['group_b'], predictions, test_set['y']))
    print("average odds diff", average_odds_diff(test_set['group_a'], test_set['group_b'], predictions, test_set['y']))

    # Compute average tree depth
    baseline_average_depth = baseline_clf.average_tree_depth()
    print("Average tree depth in the post-pruned random forest:", baseline_average_depth)


get_results(compas_train, compas_test)



accuracy 0.6502024291497975
equal opportunity diff -0.05227227673704038
average odds diff -0.10671824606559538
Average tree depth in the post-pruned random forest: 16.0


In [13]:
get_results(wage_train, wage_test)

accuracy 0.938
equal opportunity diff -0.01157299722371985
average odds diff -0.0640901519704925


In [1]:
# load the datasets

compas_dataset = load_dataset("compas_two_year_recid", protected_attribute="sex")
compas_split_dataset = compas_dataset.train_test_split(test_size=0.2, random_state=42)
compas_train = compas_split_dataset['train']
compas_test = compas_split_dataset['test']

bank_dataset = load_dataset("bank_marketing", protected_attribute="marital")
bank_split_dataset = bank_dataset.train_test_split(test_size=0.2, random_state=42)
bank_train = bank_split_dataset['train']
bank_test = bank_split_dataset['test']

wage_dataset = load_dataset("mw_small", protected_attribute="race")
wage_split_dataset = wage_dataset.train_test_split(test_size=0.2, random_state=42)
wage_train = wage_split_dataset['train']
wage_test = wage_split_dataset['test']

compas_groups = np.where(compas_train['p_attrs']['sex'].to_numpy() == 'Male', 1, 0)
bank_groups = np.where(bank_train['p_attrs']['marital'].to_numpy() == 1, 1, 0)
wage_groups = np.where(wage_train['p_attrs']["race"].to_numpy() == "White", 1, 0)

# Example usage
X = train['X']
y = train['y']

# Create and fit the model
baseline_clf = BaselineRandomForestClassifier(
    n_estimators=3,
    max_depth=3,
    random_state=42
)
baseline_clf.fit(X, y)

# Make predictions
predictions = baseline_clf.predict(test['X'])

print("accuracy", np.count_nonzero(predictions == test['y'])/len(test['y']))
print("equal opportunity diff", equal_opportunity_diff(test['group_a'], test['group_b'], predictions, test['y']))
print("average odds diff", average_odds_diff(test['group_a'], test['group_b'], predictions, test['y']))


pre_prune_clf = PrePruneRandomForestClassifier(
    n_estimators=5,
    max_depth=16,
    fairness_weight=0.5,
    random_state=42
)
pre_prune_clf.fit(X, y, groups)

# Make predictions
predictions = pre_prune_clf.predict(test['X'])

print("accuracy", np.count_nonzero(predictions == test['y'])/len(test['y']))
print("equal opportunity diff", equal_opportunity_diff(test['group_a'], test['group_b'], predictions, test['y']))
print("average odds diff", average_odds_diff(test['group_a'], test['group_b'], predictions, test['y']))

# Example usage
X = train['X']
y = train['y']

# Create and fit the model
post_prune_clf = PostPruneRandomForestClassifier(
    n_estimators=10,
    max_depth=50,
    random_state=42
)
post_prune_clf.fit(X, y, groups)

# Make predictions
predictions = post_prune_clf.predict(test['X'])


NameError: name 'load_dataset' is not defined

In [5]:
from tqdm import tqdm
import numpy as np

# Load the datasets
compas_dataset = load_dataset("compas_two_year_recid", protected_attribute="sex")
compas_split_dataset = compas_dataset.train_test_split(test_size=0.2, random_state=42)
compas_train = compas_split_dataset['train']
compas_test = compas_split_dataset['test']

bank_dataset = load_dataset("bank_marketing", protected_attribute="marital")
bank_split_dataset = bank_dataset.train_test_split(test_size=0.2, random_state=42)
bank_train = bank_split_dataset['train']
bank_test = bank_split_dataset['test']

wage_dataset = load_dataset("mw_small", protected_attribute="race")
wage_split_dataset = wage_dataset.train_test_split(test_size=0.2, random_state=42)
wage_train = wage_split_dataset['train']
wage_test = wage_split_dataset['test']

# Define groups for each dataset
compas_groups_train = np.where(compas_train['p_attrs']['sex'].to_numpy() == 'Male', 1, 0)
compas_groups_test = np.where(compas_test['p_attrs']['sex'].to_numpy() == 'Male', 1, 0)

bank_groups_train = np.where(bank_train['p_attrs']['marital'].to_numpy() == 1, 1, 0)
bank_groups_test = np.where(bank_test['p_attrs']['marital'].to_numpy() == 1, 1, 0)

wage_groups_train = np.where(wage_train['p_attrs']['race'].to_numpy() == "White", 1, 0)
wage_groups_test = np.where(wage_test['p_attrs']['race'].to_numpy() == "White", 1, 0)


In [19]:

def get_results_post(train, test, groups):
    # Example usage
    X = train['X']
    y = train['y']
    #groups = np.where(train['p_attrs']["sex"].to_numpy() == "Male", 1, 0)

    # Create and fit the model
    post_prune_clf = PostPruneRandomForestClassifier(
        n_estimators=5,
        max_depth=16,
        random_state=42
    )
    post_prune_clf.fit(X, y, groups)

    # Make predictions
    predictions = post_prune_clf.predict(test['X'])
    print("accuracy", np.count_nonzero(predictions == test['y'])/len(test['y']))
    print("equal opportunity diff", equal_opportunity_diff(test['group_a'], test['group_b'], predictions, test['y']))
    print("average odds diff", average_odds_diff(test['group_a'], test['group_b'], predictions, test['y']))


get_results_post(compas_train, compas_test, compas_groups_train)

accuracy 0.648582995951417
equal opportunity diff -0.05603875507978995
average odds diff -0.10860148523697016


In [20]:
get_results_post(wage_train, wage_test, wage_groups_train)

accuracy 0.938
equal opportunity diff -0.011390914193858181
average odds diff -0.06002925183074248


In [21]:
get_results_post(bank_train, bank_test, bank_groups_train)

accuracy 0.8942828707287405
equal opportunity diff 0.004921584282305824
average odds diff -0.021952119592195396


In [6]:
def get_results_pre(train, test, groups):

    # Example usage
    X = train['X']
    y = train['y']
    #groups = np.where(train['p_attrs']["sex"].to_numpy() == "Male", 1, 0)
    # Create and fit the model
    pre_prune_clf = PrePruneRandomForestClassifier(
        n_estimators=5,
        max_depth=16,
        fairness_weight=0.5,
        random_state=42
    )
    pre_prune_clf.fit(X, y, groups)

    # Make predictions
    predictions = pre_prune_clf.predict(test['X'])

    print("accuracy", np.count_nonzero(predictions == test['y'])/len(test['y']))
    print("equal opportunity diff", equal_opportunity_diff(test['group_a'], test['group_b'], predictions, test['y']))
    print("average odds diff", average_odds_diff(test['group_a'], test['group_b'], predictions, test['y']))
    return pre_prune_clf, predictions

pp_clf, preds = get_results_pre(compas_train, compas_test, compas_groups_train)

#get_results_pre(compas_train, compas_test, compas_groups_train)
#get_results_pre(wage_train, wage_train, wage_groups_train)
#get_results_pre(bank_train, bank_test, bank_groups_train)

accuracy 0.6550607287449393
equal opportunity diff -0.00521607691545245
average odds diff -0.0657650549769987


In [7]:
from holisticai.explainability.metrics import weighted_average_depth, tree_depth_variance, weighted_average_depth, weighted_average_explainability_score

tree_depth_variance(pp_clf.tree_)

AttributeError: 'PrePruneRandomForestClassifier' object has no attribute 'tree_'

In [6]:
print(results)

[{'model': 'BaselineRandomForestClassifier', 'dataset': 'COMPAS', 'accuracy': 0.6518218623481782, 'equal_opportunity_diff': np.float64(-0.059706115571414364), 'average_odds_diff': np.float64(-0.11079272549327074)}, {'model': 'PrePruneRandomForestClassifier', 'dataset': 'COMPAS', 'accuracy': 0.6526315789473685, 'equal_opportunity_diff': np.float64(-0.05212359996035276), 'average_odds_diff': np.float64(-0.09738310340560127)}, {'model': 'PostPruneRandomForestClassifier', 'dataset': 'COMPAS', 'accuracy': 0.6477732793522267, 'equal_opportunity_diff': np.float64(-0.06630984240261673), 'average_odds_diff': np.float64(-0.11076928081132958)}, {'model': 'BaselineRandomForestClassifier', 'dataset': 'Bank Marketing', 'accuracy': 0.8993696782041358, 'equal_opportunity_diff': np.float64(0.005837640478291495), 'average_odds_diff': np.float64(-0.017994007323841765)}]


In [7]:
train_and_evaluate_models(bank_train, bank_test, bank_groups_train, bank_groups_test, "Bank Marketing")
print(results)
#train_and_evaluate_models(wage_train, wage_test, wage_groups_train, wage_groups_test, "Wage")

KeyboardInterrupt: 

In [None]:
train_and_evaluate_models(wage_train, wage_test, wage_groups_train, wage_groups_test, "Wage")
print(results)

In [None]:
for result in results:
    print(f"\nModel: {result['model']} | Dataset: {result['dataset']}")
    print(f"Accuracy: {result['accuracy']:.4f}")
    print(f"Equal Opportunity Diff: {result['equal_opportunity_diff']:.4f}")
    print(f"Average Odds Diff: {result['average_odds_diff']:.4f}")

KeyboardInterrupt: 