# grail.fairness.model.metrics

> This module contains functions for computing fairness metrics for models using prediction results.

In [None]:
# | default_exp fairness.model.metrics

In [None]:
# | hide
# from nbdev.showdoc import *

In [None]:
# import os
# os.chdir('./..')

In [None]:
pip install fairlearn==0.9.0

In [None]:
# # | export
# import pandas as pd
# from scipy.stats import entropy
# import numpy as np
# from grail.fairness.data.utils import remark_spiel_generator

In [None]:
import pandas as pd

# import matplotlib.pyplot as plt
# import seaborn as sns
from sklearn.metrics import accuracy_score, precision_score, recall_score
from fairlearn.metrics import (
    MetricFrame,
    count,
    false_negative_rate,
    false_positive_rate,
    selection_rate,
    true_negative_rate,
    true_positive_rate,
    # _balanced_root_mean_squared_error
)

from sklearn.metrics import confusion_matrix

# path = '/content/drive/MyDrive/data/ascore_with_protected_feats.csv'

# Load Test Dataset

In [None]:
df = pd.read_excel("./data/credit_card_default.xls", header=1)
df.rename(columns={"default payment next month": "target"}, inplace=True)
df = df.drop(columns=["ID"])
# Convert columns to object type
df["SEX"] = df["SEX"].astype("object")
df["EDUCATION"] = df["EDUCATION"].astype("object")
df["MARRIAGE"] = df["MARRIAGE"].astype("object")

print("Size of DataFrame: ", df.shape)
print("DataFrame Columns: ", df.columns)
display(df.head())
df.dtypes

Size of DataFrame:  (30000, 24)
DataFrame Columns:  Index(['LIMIT_BAL', 'SEX', 'EDUCATION', 'MARRIAGE', 'AGE', 'PAY_0', 'PAY_2',
       'PAY_3', 'PAY_4', 'PAY_5', 'PAY_6', 'BILL_AMT1', 'BILL_AMT2',
       'BILL_AMT3', 'BILL_AMT4', 'BILL_AMT5', 'BILL_AMT6', 'PAY_AMT1',
       'PAY_AMT2', 'PAY_AMT3', 'PAY_AMT4', 'PAY_AMT5', 'PAY_AMT6', 'target'],
      dtype='object')


Unnamed: 0,LIMIT_BAL,SEX,EDUCATION,MARRIAGE,AGE,PAY_0,PAY_2,PAY_3,PAY_4,PAY_5,...,BILL_AMT4,BILL_AMT5,BILL_AMT6,PAY_AMT1,PAY_AMT2,PAY_AMT3,PAY_AMT4,PAY_AMT5,PAY_AMT6,target
0,20000,2,2,1,24,2,2,-1,-1,-2,...,0,0,0,0,689,0,0,0,0,1
1,120000,2,2,2,26,-1,2,0,0,0,...,3272,3455,3261,0,1000,1000,1000,0,2000,1
2,90000,2,2,2,34,0,0,0,0,0,...,14331,14948,15549,1518,1500,1000,1000,1000,5000,0
3,50000,2,2,1,37,0,0,0,0,0,...,28314,28959,29547,2000,2019,1200,1100,1069,1000,0
4,50000,1,2,1,57,-1,0,-1,0,0,...,20940,19146,19131,2000,36681,10000,9000,689,679,0


LIMIT_BAL     int64
SEX          object
EDUCATION    object
MARRIAGE     object
AGE           int64
PAY_0         int64
PAY_2         int64
PAY_3         int64
PAY_4         int64
PAY_5         int64
PAY_6         int64
BILL_AMT1     int64
BILL_AMT2     int64
BILL_AMT3     int64
BILL_AMT4     int64
BILL_AMT5     int64
BILL_AMT6     int64
PAY_AMT1      int64
PAY_AMT2      int64
PAY_AMT3      int64
PAY_AMT4      int64
PAY_AMT5      int64
PAY_AMT6      int64
target        int64
dtype: object

# Train a simple ML model

In [None]:
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.decomposition import PCA, TruncatedSVD
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report

# Define the features and target
X = df.drop(columns=["target"])
y = df["target"]

# Define the preprocessing steps for numeric and categorical features
numeric_features = [
    "LIMIT_BAL",
    "AGE",
    "PAY_0",
    "PAY_2",
    "PAY_3",
    "PAY_4",
    "PAY_5",
    "PAY_6",
    "BILL_AMT1",
    "BILL_AMT2",
    "BILL_AMT3",
    "BILL_AMT4",
    "BILL_AMT5",
    "BILL_AMT6",
    "PAY_AMT1",
    "PAY_AMT2",
    "PAY_AMT3",
    "PAY_AMT4",
    "PAY_AMT5",
    "PAY_AMT6",
]
categorical_features = ["SEX", "EDUCATION", "MARRIAGE"]

# Create the preprocessing pipelines for both numeric and categorical data
numeric_transformer = Pipeline(
    steps=[("scaler", StandardScaler()), ("pca", PCA(n_components=0.95))]
)

categorical_transformer = Pipeline(
    steps=[("onehot", OneHotEncoder()), ("svd", TruncatedSVD(n_components=4))]
)

# Combine preprocessing steps
preprocessor = ColumnTransformer(
    transformers=[
        ("num", numeric_transformer, numeric_features),
        ("cat", categorical_transformer, categorical_features),
    ]
)

# Create the complete pipeline with a classifier
pipeline = Pipeline(
    steps=[("preprocessor", preprocessor), ("classifier", RandomForestClassifier())]
)

# Split the data into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

# Train the model
pipeline.fit(X_train, y_train)

# Make predictions
y_pred = pipeline.predict(X_test)

# Print the classification report
print(classification_report(y_test, y_pred))

              precision    recall  f1-score   support

           0       0.84      0.94      0.89      4687
           1       0.63      0.35      0.45      1313

    accuracy                           0.81      6000
   macro avg       0.73      0.64      0.67      6000
weighted avg       0.79      0.81      0.79      6000



# Predict for the Entire DataFrame

In [None]:
proba_class_1 = pipeline.predict_proba(df.drop(columns=["target"]))[:, 1]
proba_class_1

array([0.86, 0.79, 0.02, ..., 0.78, 0.74, 0.11], shape=(30000,))

In [None]:
y_pred = pipeline.predict(df.drop(columns=["target"]))
proba_class_1 = pipeline.predict_proba(df.drop(columns=["target"]))[:, 1]
df["pred_proba"] = proba_class_1
df["pred"] = y_pred
df

Unnamed: 0,LIMIT_BAL,SEX,EDUCATION,MARRIAGE,AGE,PAY_0,PAY_2,PAY_3,PAY_4,PAY_5,...,BILL_AMT6,PAY_AMT1,PAY_AMT2,PAY_AMT3,PAY_AMT4,PAY_AMT5,PAY_AMT6,target,pred,pred_proba
0,20000,2,2,1,24,2,2,-1,-1,-2,...,0,0,689,0,0,0,0,1,1,0.86
1,120000,2,2,2,26,-1,2,0,0,0,...,3261,0,1000,1000,1000,0,2000,1,1,0.79
2,90000,2,2,2,34,0,0,0,0,0,...,15549,1518,1500,1000,1000,1000,5000,0,0,0.02
3,50000,2,2,1,37,0,0,0,0,0,...,29547,2000,2019,1200,1100,1069,1000,0,0,0.01
4,50000,1,2,1,57,-1,0,-1,0,0,...,19131,2000,36681,10000,9000,689,679,0,0,0.06
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
29995,220000,1,3,1,39,0,0,0,0,0,...,15980,8500,20000,5003,3047,5000,1000,0,0,0.23
29996,150000,1,3,2,43,-1,-1,-1,-1,0,...,0,1837,3526,8998,129,0,0,0,0,0.11
29997,30000,1,2,2,37,4,3,2,-1,0,...,19357,0,0,22000,4200,2000,3100,1,1,0.78
29998,80000,1,3,1,41,1,-1,0,0,0,...,48944,85900,3409,1178,1926,52964,1804,1,1,0.74


# Defining Fairness Metrics

In [None]:
from typing import List, Optional


def negative_predictive_value(
    y_true: List[int],
    y_pred: List[int],
    sample_weight: Optional[List[float]] = None,
    pos_label: Optional[int] = None,
) -> float:
    """
    Calculate the Negative Predictive Value (NPV).

    Args:
        y_true (List[int]): True labels.
        y_pred (List[int]): Predicted labels.
        sample_weight (Optional[List[float]]): Sample weights.
        pos_label (Optional[int]): The label of the positive class.

    Returns:
        float: Negative Predictive Value.
    """
    tn, fp, fn, tp = confusion_matrix(
        y_true, y_pred, sample_weight=sample_weight
    ).ravel()
    return tn / (tn + fn)


def false_discovery_rate(
    y_true: List[int],
    y_pred: List[int],
    sample_weight: Optional[List[float]] = None,
    pos_label: Optional[int] = None,
) -> float:
    """
    Calculate the False Discovery Rate (FDR).

    Args:
        y_true (List[int]): True labels.
        y_pred (List[int]): Predicted labels.
        sample_weight (Optional[List[float]]): Sample weights.
        pos_label (Optional[int]): The label of the positive class.

    Returns:
        float: False Discovery Rate.
    """
    tn, fp, fn, tp = confusion_matrix(
        y_true, y_pred, sample_weight=sample_weight
    ).ravel()
    return fp / (fp + tp)


def false_omission_rate(
    y_true: List[int],
    y_pred: List[int],
    sample_weight: Optional[List[float]] = None,
    pos_label: Optional[int] = None,
) -> float:
    """
    Calculate the False Omission Rate (FOR).

    Args:
        y_true (List[int]): True labels.
        y_pred (List[int]): Predicted labels.
        sample_weight (Optional[List[float]]): Sample weights.
        pos_label (Optional[int]): The label of the positive class.

    Returns:
        float: False Omission Rate.
    """
    tn, fp, fn, tp = confusion_matrix(
        y_true, y_pred, sample_weight=sample_weight
    ).ravel()
    return fn / (fn + tn)


def positive_predictive_value(
    y_true: List[int],
    y_pred: List[int],
    sample_weight: Optional[List[float]] = None,
    pos_label: Optional[int] = None,
) -> float:
    """
    Calculate the Positive Predictive Value (PPV).

    Args:
        y_true (List[int]): True labels.
        y_pred (List[int]): Predicted labels.
        sample_weight (Optional[List[float]]): Sample weights.
        pos_label (Optional[int]): The label of the positive class.

    Returns:
        float: Positive Predictive Value.
    """
    tn, fp, fn, tp = confusion_matrix(
        y_true, y_pred, sample_weight=sample_weight
    ).ravel()
    return tp / (tp + fp)


def recall_parity_ratio(
    y_true: List[int],
    y_pred: List[int],
    *,
    sensitive_features: List[int],
    method: str = "between_groups",
    sample_weight: Optional[List[float]] = None,
) -> float:
    """
    Calculate the Recall Parity Ratio.

    Args:
        y_true (List[int]): True labels.
        y_pred (List[int]): Predicted labels.
        sensitive_features (List[int]): Sensitive features.
        method (str): Method to calculate the ratio.
        sample_weight (Optional[List[float]]): Sample weights.

    Returns:
        float: Recall Parity Ratio.
    """
    mf = MetricFrame(
        metrics={"tpr": true_positive_rate},
        y_true=y_true,
        y_pred=y_pred,
        sensitive_features=sensitive_features,
    )
    return min(mf.ratio(method=method))


def equalized_odds_ratio_modified(
    y_true: List[int],
    y_pred: List[int],
    *,
    sensitive_features: List[int],
    method: str = "between_groups",
    sample_weight: Optional[List[float]] = None,
) -> float:
    """
    Calculate the Equalized Odds Ratio.

    Args:
        y_true (List[int]): True labels.
        y_pred (List[int]): Predicted labels.
        sensitive_features (List[int]): Sensitive features.
        method (str): Method to calculate the ratio.
        sample_weight (Optional[List[float]]): Sample weights.

    Returns:
        float: Equalized Odds Ratio.
    """
    mf = MetricFrame(
        metrics={"tpr": true_positive_rate, "fpr": false_positive_rate},
        y_true=y_true,
        y_pred=y_pred,
        sensitive_features=sensitive_features,
    )
    return min(mf.ratio(method=method).loc[lambda x: x > 0])


def false_discovery_rate_parity_ratio(
    y_true: List[int],
    y_pred: List[int],
    *,
    sensitive_features: List[int],
    method: str = "between_groups",
    sample_weight: Optional[List[float]] = None,
) -> float:
    """
    Calculate the False Discovery Rate Parity Ratio.

    Args:
        y_true (List[int]): True labels.
        y_pred (List[int]): Predicted labels.
        sensitive_features (List[int]): Sensitive features.
        method (str): Method to calculate the ratio.
        sample_weight (Optional[List[float]]): Sample weights.

    Returns:
        float: False Discovery Rate Parity Ratio.
    """
    mf = MetricFrame(
        metrics={"fdr": false_discovery_rate},
        y_true=y_true,
        y_pred=y_pred,
        sensitive_features=sensitive_features,
    )
    return min(mf.ratio(method=method))


def false_omission_rate_parity_ratio(
    y_true: List[int],
    y_pred: List[int],
    *,
    sensitive_features: List[int],
    method: str = "between_groups",
    sample_weight: Optional[List[float]] = None,
) -> float:
    """
    Calculate the False Omission Rate Parity Ratio.

    Args:
        y_true (List[int]): True labels.
        y_pred (List[int]): Predicted labels.
        sensitive_features (List[int]): Sensitive features.
        method (str): Method to calculate the ratio.
        sample_weight (Optional[List[float]]): Sample weights.

    Returns:
        float: False Omission Rate Parity Ratio.
    """
    mf = MetricFrame(
        metrics={"for": false_omission_rate},
        y_true=y_true,
        y_pred=y_pred,
        sensitive_features=sensitive_features,
    )
    return min(mf.ratio(method=method))


def precision_parity_ratio(
    y_true: List[int],
    y_pred: List[int],
    *,
    sensitive_features: List[int],
    method: str = "between_groups",
    sample_weight: Optional[List[float]] = None,
) -> float:
    """
    Calculate the Precision Parity Ratio.

    Args:
        y_true (List[int]): True labels.
        y_pred (List[int]): Predicted labels.
        sensitive_features (List[int]): Sensitive features.
        method (str): Method to calculate the ratio.
        sample_weight (Optional[List[float]]): Sample weights.

    Returns:
        float: Precision Parity Ratio.
    """
    mf = MetricFrame(
        metrics={"ppv": positive_predictive_value},
        y_true=y_true,
        y_pred=y_pred,
        sensitive_features=sensitive_features,
    )
    return min(mf.ratio(method=method))


def true_negative_rate_parity(
    y_true: List[int],
    y_pred: List[int],
    *,
    sensitive_features: List[int],
    method: str = "between_groups",
    sample_weight: Optional[List[float]] = None,
) -> float:
    """
    Calculate the True Negative Rate Parity.

    Args:
        y_true (List[int]): True labels.
        y_pred (List[int]): Predicted labels.
        sensitive_features (List[int]): Sensitive features.
        method (str): Method to calculate the ratio.
        sample_weight (Optional[List[float]]): Sample weights.

    Returns:
        float: True Negative Rate Parity.
    """
    mf = MetricFrame(
        metrics={"tnr": true_negative_rate},
        y_true=y_true,
        y_pred=y_pred,
        sensitive_features=sensitive_features,
    )
    return min(mf.ratio(method=method))


def negative_predictive_value_parity(
    y_true: List[int],
    y_pred: List[int],
    *,
    sensitive_features: List[int],
    method: str = "between_groups",
    sample_weight: Optional[List[float]] = None,
) -> float:
    """
    Calculate the Negative Predictive Value Parity.

    Args:
        y_true (List[int]): True labels.
        y_pred (List[int]): Predicted labels.
        sensitive_features (List[int]): Sensitive features.
        method (str): Method to calculate the ratio.
        sample_weight (Optional[List[float]]): Sample weights.

    Returns:
        float: Negative Predictive Value Parity.
    """
    mf = MetricFrame(
        metrics={"npv": negative_predictive_value},
        y_true=y_true,
        y_pred=y_pred,
        sensitive_features=sensitive_features,
    )
    return min(mf.ratio(method=method))


def positive_predictive_value_parity(
    y_true: List[int],
    y_pred: List[int],
    *,
    sensitive_features: List[int],
    method: str = "between_groups",
    sample_weight: Optional[List[float]] = None,
) -> float:
    """
    Calculate the Positive Predictive Value Parity.

    Args:
        y_true (List[int]): True labels.
        y_pred (List[int]): Predicted labels.
        sensitive_features (List[int]): Sensitive features.
        method (str): Method to calculate the ratio.
        sample_weight (Optional[List[float]]): Sample weights.

    Returns:
        float: Positive Predictive Value Parity.
    """
    mf = MetricFrame(
        metrics={"ppv": positive_predictive_value},
        y_true=y_true,
        y_pred=y_pred,
        sensitive_features=sensitive_features,
    )
    return min(mf.ratio(method=method))

In [None]:
from typing import List, Optional, Union
import pandas as pd


def get_data_subset(
    df: pd.DataFrame,
    y_true_col: str,
    y_pred_col: str,
    y_prob_col: str,
    sf_cols: List[str] = [],
    impute_strategy: str = "drop",
) -> pd.DataFrame:
    """
    Get a subset of the dataframe with specified columns and handle missing values.

    Args:
        df (pd.DataFrame): The input dataframe.
        y_true_col (str): The column name for true labels.
        y_pred_col (str): The column name for predicted labels.
        y_prob_col (str): The column name for predicted probabilities.
        sf_cols (List[str], optional): List of sensitive feature columns. Defaults to [].
        impute_strategy (str, optional): Strategy to handle missing values. Defaults to 'drop'.

    Returns:
        pd.DataFrame: The subset of the dataframe with specified columns and handled missing values.
    """
    columns = [y_true_col, y_pred_col, y_prob_col] + sf_cols
    if impute_strategy == "drop":
        return df[columns].dropna(subset=sf_cols)
    else:
        return df[columns].dropna(subset=sf_cols)


def generate_metric_frame(
    df: pd.DataFrame,
    y_true_col: str,
    y_pred_col: str,
    y_prob_col: str,
    sf_cols: List[str] = [],
    aggregate: Optional[str] = None,
    **mfkwargs,
) -> Union[MetricFrame, pd.DataFrame, None]:
    """
    Generate a MetricFrame for fairness evaluation.

    Args:
        df (pd.DataFrame): The input dataframe.
        y_true_col (str): The column name for true labels.
        y_pred_col (str): The column name for predicted labels.
        y_prob_col (str): The column name for predicted probabilities.
        sf_cols (List[str], optional): List of sensitive feature columns. Defaults to [].
        aggregate (Optional[str], optional): Aggregation method for the metrics. Defaults to None.
        **mfkwargs: Additional keyword arguments for MetricFrame.

    Returns:
        Union[MetricFrame, pd.DataFrame, None]: The MetricFrame or aggregated metrics.
    """
    metrics = {
        "accuracy_score": accuracy_score,
        "precision_score": precision_score,
        "recall_score": recall_score,
        "selection_rate": selection_rate,
        "true_negative_rate": true_negative_rate,
        "false_positive_rate": false_positive_rate,
        "false_negative_rate": false_negative_rate,
        "true_positive_rate": true_positive_rate,
        "negative_predictive_value": negative_predictive_value,
        "false_discovery_rate": false_discovery_rate,
        "false_omission_rate": false_omission_rate,
        "positive_predictive_value": positive_predictive_value,
        "count": count,
    }

    s_df = get_data_subset(df, y_true_col, y_pred_col, y_prob_col, sf_cols)

    mf = MetricFrame(
        metrics=metrics,
        y_true=s_df[y_true_col],
        y_pred=s_df[y_pred_col],
        sensitive_features=s_df[sf_cols],
        **mfkwargs,
    )

    if not aggregate:
        return mf
    elif aggregate == "by_group":
        return mf.by_group
    elif aggregate == "ratio":
        return mf.ratio()
    else:
        return None

In [None]:
df

Unnamed: 0,LIMIT_BAL,SEX,EDUCATION,MARRIAGE,AGE,PAY_0,PAY_2,PAY_3,PAY_4,PAY_5,...,BILL_AMT4,BILL_AMT5,BILL_AMT6,PAY_AMT1,PAY_AMT2,PAY_AMT3,PAY_AMT4,PAY_AMT5,PAY_AMT6,target
0,20000,2,2,1,24,2,2,-1,-1,-2,...,0,0,0,0,689,0,0,0,0,1
1,120000,2,2,2,26,-1,2,0,0,0,...,3272,3455,3261,0,1000,1000,1000,0,2000,1
2,90000,2,2,2,34,0,0,0,0,0,...,14331,14948,15549,1518,1500,1000,1000,1000,5000,0
3,50000,2,2,1,37,0,0,0,0,0,...,28314,28959,29547,2000,2019,1200,1100,1069,1000,0
4,50000,1,2,1,57,-1,0,-1,0,0,...,20940,19146,19131,2000,36681,10000,9000,689,679,0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
29995,220000,1,3,1,39,0,0,0,0,0,...,88004,31237,15980,8500,20000,5003,3047,5000,1000,0
29996,150000,1,3,2,43,-1,-1,-1,-1,0,...,8979,5190,0,1837,3526,8998,129,0,0,0
29997,30000,1,2,2,37,4,3,2,-1,0,...,20878,20582,19357,0,0,22000,4200,2000,3100,1
29998,80000,1,3,1,41,1,-1,0,0,0,...,52774,11855,48944,85900,3409,1178,1926,52964,1804,1


In [None]:
data_subset = get_data_subset(
    df,
    y_true_col="target",
    y_pred_col="pred",
    y_prob_col="pred_proba",
    sf_cols=["SEX"],
    impute_strategy="drop",
)
data_subset

Unnamed: 0,target,pred,pred_proba,SEX
0,1,1,0.86,2
1,1,1,0.79,2
2,0,0,0.02,2
3,0,0,0.01,2
4,0,0,0.06,1
...,...,...,...,...
29995,0,0,0.23,1
29996,0,0,0.11,1
29997,1,1,0.78,1
29998,1,1,0.74,1


In [None]:
mf = generate_metric_frame(
    df, y_true_col="target", y_pred_col="pred", y_prob_col="pred_proba", sf_cols=["SEX"]
)

mf.by_group

  mf = mf.applymap(lambda x: x if np.isscalar(x) else np.nan)
  mf = mf.applymap(lambda x: x if np.isscalar(x) else np.nan)
  mf = mf.applymap(lambda x: x if np.isscalar(x) else np.nan)


Unnamed: 0_level_0,accuracy_score,precision_score,recall_score,selection_rate,true_negative_rate,false_positive_rate,false_negative_rate,true_positive_rate,negative_predictive_value,false_discovery_rate,false_omission_rate,positive_predictive_value,count
SEX,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1
1,0.957436,0.951201,0.86843,0.220643,0.985801,0.014199,0.13157,0.86843,0.959201,0.048799,0.040799,0.951201,11888.0
2,0.964775,0.955938,0.870582,0.189212,0.989477,0.010523,0.129418,0.870582,0.966837,0.044062,0.033163,0.955938,18112.0
