# BiasX: Analyzing Gender Bias in Face Classification

## 1. Environment setup

### 1.1. Install required packages

In [None]:
# !pip install mediapipe numpy==1.26.4

### 1.2. Import required packages

In [None]:
import os
import warnings

# Set before TensorFlow is imported further down so its native logging sees the value.
# NOTE(review): level "3" is understood to hide INFO/WARNING/ERROR output — confirm against the TF docs.
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3"
# Silence every Python warning for the rest of the session (this also hides deprecation notices).
warnings.filterwarnings("ignore")

In [None]:
from io import BytesIO

import matplotlib.pyplot as plt
import mediapipe as mp
import numpy as np
import pandas as pd
import seaborn as sns
import tensorflow as tf
from huggingface_hub import hf_hub_download
from mediapipe.tasks.python.core.base_options import BaseOptions
from mediapipe.tasks.python.vision.face_landmarker import FaceLandmarker, FaceLandmarkerOptions
from PIL import Image
from sklearn.model_selection import train_test_split
from tqdm import tqdm

from biasx import BiasAnalyzer

# Register tqdm with pandas so that `progress_apply` (used when decoding and masking images) is available.
tqdm.pandas()

### 1.3. Define global variables

In [None]:
# Seed shared by stratum sampling, shuffling, and the train/val/test splits.
RANDOM_SEED = 42

# Dataset name; also the suffix of the Hugging Face repo id (`rixmape/<DATASET_NAME>`).
DATASET_NAME = "utkface"
# Target number of rows after sampling (per-stratum rounding can make the real total differ slightly).
DATASET_SIZE = 5000
# Share of DATASET_SIZE drawn from rows with gender == 0.
DATASET_MALE_RATIO = 0.5
# Share of DATASET_SIZE drawn from rows with gender == 1.
DATASET_FEMALE_RATIO = 0.5
# Model input shape as (height, width, channels).
DATASET_IMAGE_SHAPE = (96, 96, 1)
# Validation and test shares, both expressed as fractions of the full sampled dataset.
DATASET_VALIDATION_RATIO = 0.2
DATASET_TEST_RATIO = 0.1

# Masking is skipped entirely when either target below is None.
# Gender id (value of the "gender" column) whose training images get masked.
MASKING_TARGET_GENDER = None
# Key of the landmark map (e.g. "nose", "lips") naming the facial feature to zero out.
MASKING_TARGET_FEATURE = None
# Extra pixels added on every side of the feature's bounding box.
MASKING_PADDING = 0

MODEL_EPOCHS = 10
MODEL_BATCH_SIZE = 32
# NOTE: despite the name this is a file path — the checkpoint written during training
# and the model file later handed to the bias analyzer.
MODEL_DIRECTORY = "./tmp/gender_classifier.keras"

## 2. Data preparation

### 2.1. Download dataset from Hugging Face

In [None]:
def load_dataset() -> pd.DataFrame:
    """Download the dataset parquet file from Hugging Face and decode its images.

    Returns:
        DataFrame with "image", "gender", "race" and "age" columns, where every
        "image" entry has been decoded into a NumPy array.
    """

    def _decode(record) -> np.ndarray:
        # The encoded image is stored under the record's "bytes" key.
        return np.array(Image.open(BytesIO(record["bytes"])))

    parquet_path = hf_hub_download(
        repo_id=f"rixmape/{DATASET_NAME}",
        filename="data/train-00000-of-00001.parquet",
        repo_type="dataset",
    )
    frame = pd.read_parquet(parquet_path, columns=["image", "gender", "race", "age"])
    frame["image"] = frame["image"].progress_apply(_decode)
    return frame


# Load the full dataset into memory and preview its first rows.
dataset = load_dataset()
dataset.head()

### 2.2. Sample dataset with specific gender distribution

In [None]:
def _sample_by_stratum(data: pd.DataFrame, total_sample_size: int) -> list:
    """Sample rows proportionally from every (race, age) stratum.

    Each stratum contributes ``round(total_sample_size * stratum_share)`` rows,
    so the combined size can differ slightly from ``total_sample_size`` because
    of per-stratum rounding. Strata whose quota rounds to zero are skipped.

    Args:
        data: Frame with "race" and "age" columns. It is NOT modified.
        total_sample_size: Desired number of rows across all strata.

    Returns:
        List with one sampled DataFrame per contributing stratum.
    """
    # Group by an external key instead of writing a temporary "strata" column,
    # so the caller's frame is never mutated as a side effect.
    strata_key = (data["race"].astype(str) + "_" + data["age"].astype(str)).to_numpy()
    population_size = len(data)
    strat_samples = []

    for _, group in data.groupby(strata_key):
        group_size = len(group)
        group_ratio = group_size / population_size
        stratum_sample_size = round(total_sample_size * group_ratio)
        if stratum_sample_size <= 0:
            continue
        # Sample with replacement only when the stratum is smaller than its quota.
        needs_replacement = group_size < stratum_sample_size
        strat_samples.append(group.sample(n=stratum_sample_size, random_state=RANDOM_SEED, replace=needs_replacement))

    return strat_samples


def sample_with_gender_ratio(data: pd.DataFrame) -> pd.DataFrame:
    """Draw a shuffled sample that follows the configured gender ratios.

    Rows of each gender (0 uses DATASET_MALE_RATIO, 1 uses DATASET_FEMALE_RATIO)
    are sampled separately, stratified by race and age, then concatenated,
    shuffled with RANDOM_SEED and re-indexed from zero.
    """
    target_ratios = {0: DATASET_MALE_RATIO, 1: DATASET_FEMALE_RATIO}
    per_gender = []

    for gender_id, ratio in target_ratios.items():
        quota = round(DATASET_SIZE * ratio)
        subset = data[data["gender"] == gender_id].copy(deep=True)
        if subset.empty:
            # No rows of this gender at all: nothing to sample.
            continue
        parts = _sample_by_stratum(subset, quota)
        if not parts:
            continue
        per_gender.append(pd.concat(parts))

    combined = pd.concat(per_gender)
    shuffled = combined.sample(frac=1, random_state=RANDOM_SEED)
    return shuffled.reset_index(drop=True)


# Replace the full dataset with the gender-balanced sample and print its column summary.
dataset = sample_with_gender_ratio(dataset)
dataset.info()

In [None]:
def plot_demographics(data: pd.DataFrame) -> plt.Figure:
    """Draw side-by-side count plots of the gender, age and race columns.

    Returns:
        The matplotlib figure holding the three panels.
    """
    fig, axes = plt.subplots(1, 3, figsize=(12, 4))

    for axis, column in zip(axes, ("gender", "age", "race")):
        sns.countplot(data=data, x=column, ax=axis)
        axis.set_title(f"{column.capitalize()} Distribution")
        axis.tick_params(axis="x")
        # The y axis always shows counts, so its label is left blank.
        axis.set_ylabel("")

    plt.tight_layout()
    return fig


# Show the demographic make-up of the sampled dataset.
fig = plot_demographics(dataset)

### 2.3. Split dataset into training, validation, and test sets

In [None]:
def split_dataset(df: pd.DataFrame) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """Split a frame into gender-stratified train, validation and test frames.

    DATASET_TEST_RATIO and DATASET_VALIDATION_RATIO are both fractions of the
    whole frame, so the validation share is rescaled for the second split.

    Returns:
        ``(train, val, test)`` frames.
    """
    # The test rows are carved off first; what remains is 1 - DATASET_TEST_RATIO
    # of the data, hence the rescaling of the validation fraction.
    val_fraction = DATASET_VALIDATION_RATIO / (1 - DATASET_TEST_RATIO)

    remainder, test = train_test_split(
        df,
        test_size=DATASET_TEST_RATIO,
        random_state=RANDOM_SEED,
        stratify=df["gender"],
    )
    train, val = train_test_split(
        remainder,
        test_size=val_fraction,
        random_state=RANDOM_SEED,
        stratify=remainder["gender"],
    )
    return train, val, test


train_set, val_set, test_set = split_dataset(dataset)

# HACK: Drop the reference to the full `dataset` now that the splits exist, to lower
# memory pressure and reduce the risk of model training crashing.
del dataset

### 2.4. Apply zero masking on a facial region

In [None]:
def _load_landmarker() -> FaceLandmarker:
    """Download the MediaPipe landmarker asset and build a detector from it."""
    asset_path = hf_hub_download(
        repo_id="rixmape/biasx-models",
        filename="mediapipe_landmarker.task",
        repo_type="model",
    )
    base_options = BaseOptions(model_asset_path=asset_path)
    landmarker_options = FaceLandmarkerOptions(base_options=base_options)
    return FaceLandmarker.create_from_options(landmarker_options)


def _load_landmark_map() -> dict[str, list[int]]:
    """Return the facial-feature name to landmark-index mapping.

    The index lists follow the MediaPipe documentation, as stated by the
    original author. Keys are the feature names accepted by
    MASKING_TARGET_FEATURE.
    """
    mapping: dict[str, list[int]] = {}

    # Eyes
    mapping["left_eye"] = [249, 263, 362, 373, 374, 380, 381, 382, 384, 385, 386, 387, 388, 390, 398, 466]
    mapping["right_eye"] = [7, 33, 133, 144, 145, 153, 154, 155, 157, 158, 159, 160, 161, 163, 173, 246]

    # Central features
    mapping["nose"] = [1, 2, 4, 5, 6, 19, 45, 48, 64, 94, 97, 98, 115, 168, 195, 197, 220, 275, 278, 294, 326, 327, 344, 440]
    mapping["lips"] = [0, 13, 14, 17, 37, 39, 40, 61, 78, 80, 81, 82, 84, 87, 88, 91, 95, 146, 178, 181, 185, 191, 267, 269, 270, 291, 308, 310, 311, 312, 314, 317, 318, 321, 324, 375, 402, 405, 409, 415]

    # Cheeks
    mapping["left_cheek"] = [454, 447, 345, 346, 347, 330, 425, 427, 434, 416, 435, 288, 361, 323, 280, 352, 366, 411, 376, 401, 433]
    mapping["right_cheek"] = [234, 227, 116, 117, 118, 101, 205, 207, 214, 192, 215, 58, 132, 93, 127, 50, 123, 137, 177, 147, 187, 213]

    # Lower and upper face
    mapping["chin"] = [202, 210, 169, 150, 149, 176, 148, 152, 377, 400, 378, 379, 394, 430, 422, 211, 32, 208, 199, 428, 262, 431, 170, 140, 171, 175, 396, 369, 395]
    mapping["forehead"] = [54, 71, 68, 104, 69, 109, 151, 337, 299, 333, 298, 301, 284, 332, 297, 338, 10, 67, 103]

    # Eyebrows
    mapping["left_eyebrow"] = [276, 282, 283, 285, 293, 295, 296, 300, 334, 336]
    mapping["right_eyebrow"] = [46, 52, 53, 55, 63, 65, 66, 70, 105, 107]

    return mapping


def _detect_landmarks(image: np.ndarray, landmarker: FaceLandmarker) -> list:
    """Run the MediaPipe landmarker on a NumPy image.

    Returns:
        The landmarks of the first detected face, or ``None`` when the
        detector reports no face.
    """
    frame = mp.Image(image_format=mp.ImageFormat.SRGB, data=image)
    detection = landmarker.detect(frame)
    if not detection.face_landmarks:
        return None
    # Only the first detected face is used.
    return detection.face_landmarks[0]


def _normalize_landmarks(landmarks: list, image_size: tuple[int, int]) -> list[tuple[int, int]]:
    """Scale normalized landmark coordinates to integer pixel positions.

    Args:
        landmarks: Objects exposing normalized ``x`` and ``y`` attributes.
        image_size: Image shape; only its first two entries (height, width)
            are used, so a full ``ndarray.shape`` may be passed.

    Returns:
        One ``(x_pixel, y_pixel)`` tuple per landmark, truncated with ``int``.
    """
    rows, cols = image_size[:2]
    pixel_points = []
    for landmark in landmarks:
        x_pixel = int(landmark.x * cols)
        y_pixel = int(landmark.y * rows)
        pixel_points.append((x_pixel, y_pixel))
    return pixel_points


def _get_feature_box(landmarks: list[tuple[int, int]], landmark_map: dict[str, list[int]]) -> tuple[int, int, int, int]:
    """Compute the padded bounding box of the feature named by MASKING_TARGET_FEATURE.

    Args:
        landmarks: Pixel-space ``(x, y)`` landmark positions, indexable by
            landmark id.
        landmark_map: Mapping of feature name to landmark ids.

    Returns:
        ``(min_x, min_y, max_x, max_y)`` with MASKING_PADDING applied on every
        side. All four values are clamped to be non-negative so they are safe
        to use as slice bounds.

    Raises:
        KeyError: If MASKING_TARGET_FEATURE is not a key of ``landmark_map``.
    """
    feature_points = [landmarks[i] for i in landmark_map[MASKING_TARGET_FEATURE]]
    xs = [x for x, _ in feature_points]
    ys = [y for _, y in feature_points]

    min_x = max(0, min(xs) - MASKING_PADDING)
    min_y = max(0, min(ys) - MASKING_PADDING)
    # Landmarks can fall outside the image. A negative upper bound would be read
    # by NumPy slicing as "count from the end" and mask the wrong region, so the
    # maxima are clamped at zero as well (yielding an empty slice instead).
    max_x = max(0, max(xs) + MASKING_PADDING)
    max_y = max(0, max(ys) + MASKING_PADDING)
    return (int(min_x), int(min_y), int(max_x), int(max_y))


def _apply_zero_mask(image: np.ndarray, landmarker: FaceLandmarker, landmark_map: dict[str, list[int]]) -> np.ndarray:
    """Zero out the target facial feature's bounding box in an image.

    Returns:
        A masked copy of ``image``; when no face landmarks are detected the
        input array itself is returned unchanged.
    """
    detected = _detect_landmarks(image, landmarker)
    if detected:
        points = _normalize_landmarks(detected, image.shape)
        # Work on a copy so the caller's array is left untouched.
        masked = image.copy()
        left, top, right, bottom = _get_feature_box(points, landmark_map)
        masked[top:bottom, left:right] = 0
        return masked
    return image


def apply_feature_masking(data: pd.DataFrame) -> pd.DataFrame:
    """Zero-mask the configured facial feature on images of the target gender.

    Masking is controlled by MASKING_TARGET_GENDER and MASKING_TARGET_FEATURE;
    when either is ``None`` the input frame is returned as-is.

    Args:
        data: Frame with "gender" and "image" columns.

    Returns:
        The input frame itself when masking is disabled, otherwise a copy whose
        "image" entries for the target gender have been masked.
    """
    if MASKING_TARGET_GENDER is None or MASKING_TARGET_FEATURE is None:
        return data

    landmark_map = _load_landmark_map()
    result = data.copy()
    gender_mask = result["gender"] == MASKING_TARGET_GENDER

    # The landmarker holds native resources; the context manager closes it
    # once masking is done, even if detection raises.
    with _load_landmarker() as landmarker:
        result.loc[gender_mask, "image"] = result.loc[gender_mask, "image"].progress_apply(lambda img: _apply_zero_mask(img, landmarker, landmark_map))
    return result


# Masking is applied to the training split only; validation and test images stay unmasked.
train_set = apply_feature_masking(train_set)

In [None]:
def plot_image_grid(data: pd.DataFrame, rows: int = 4, cols: int = 16, title: str = None) -> plt.Figure:
    """Show the first ``rows * cols`` images of a frame in a grid.

    Args:
        data: Frame with an "image" column of array-like images. It may be
            empty, in which case a blank grid is drawn.
        rows: Number of grid rows.
        cols: Number of grid columns.
        title: Optional figure title.

    Returns:
        The matplotlib figure containing the grid.
    """
    # squeeze=False keeps `axes` a 2-D array even for a 1x1 grid, so flatten() always works.
    fig, axes = plt.subplots(rows, cols, figsize=(cols, rows), squeeze=False)
    axes = axes.flatten()

    shown = data["image"].iloc[: len(axes)]
    for ax, image in zip(axes, shown):
        pixels = np.asarray(image)
        # Single-channel images may be stored as (H, W) or (H, W, 1); both get the gray colormap.
        single_channel = pixels.ndim == 2 or pixels.shape[2] == 1
        ax.imshow(pixels, cmap="gray" if single_channel else None)

    # Every cell hides its axes, whether or not it holds an image.
    for ax in axes:
        ax.axis("off")

    if title:
        fig.suptitle(title, fontsize=16)
    plt.tight_layout()
    return fig


# Preview training images after the (optional) masking step, before preprocessing.
fig = plot_image_grid(train_set)

### 2.5. Preprocess image for model input

In [None]:
def _preprocess_image(image: np.ndarray) -> np.ndarray:
    """Convert one image to a normalized grayscale array of DATASET_IMAGE_SHAPE.

    The image is converted to 8-bit grayscale, resized to the configured
    height and width, scaled to the [0, 1] range and reshaped to
    DATASET_IMAGE_SHAPE.
    """
    height, width = DATASET_IMAGE_SHAPE[:2]
    # PIL's resize expects (width, height) — the reverse of the (height, width)
    # order used by DATASET_IMAGE_SHAPE. Passing the shape directly would
    # scramble non-square targets when the result is reshaped.
    resized = Image.fromarray(image).convert("L").resize((width, height))
    image_array = np.array(resized)
    return (image_array / 255.0).reshape(DATASET_IMAGE_SHAPE)


def prepare_dataset(data: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of the frame whose "image" column holds model-ready arrays."""
    # assign() works on a copy, so the caller's frame keeps its raw images.
    return data.assign(image=data["image"].apply(_preprocess_image))


# Preprocess all three splits identically so validation/test inputs match the training inputs.
train_set = prepare_dataset(train_set)
val_set = prepare_dataset(val_set)
test_set = prepare_dataset(test_set)

# Preview the training images again, now in their preprocessed form.
fig = plot_image_grid(train_set)

## 3. Model training

### 3.1. Build model instance

In [None]:
def build_model() -> tf.keras.Model:
    """Build and compile the convolutional gender classifier.

    The network is three convolutional blocks (64, 128 and 256 filters), each
    closed by 2x2 max pooling, followed by a 512-unit dense layer with dropout
    and a 2-way softmax output. It is compiled with Adam (learning rate 0.001)
    and sparse categorical cross-entropy.
    """
    layers = tf.keras.layers
    # (filters, number of conv layers) for block1, block2, block3.
    block_specs = ((64, 2), (128, 2), (256, 3))

    stack = []
    for block_no, (filters, depth) in enumerate(block_specs, start=1):
        for conv_no in range(1, depth + 1):
            # Only the very first layer declares the input shape.
            extra = {"input_shape": DATASET_IMAGE_SHAPE} if not stack else {}
            stack.append(layers.Conv2D(filters, (3, 3), activation="relu", padding="same", name=f"block{block_no}_conv{conv_no}", **extra))
        stack.append(layers.MaxPooling2D(pool_size=(2, 2), strides=(2, 2), name=f"block{block_no}_pool"))

    stack.extend(
        [
            layers.Flatten(name="flatten"),
            layers.Dense(512, activation="relu", name="dense_1"),
            layers.Dropout(0.5, name="dropout"),
            layers.Dense(2, activation="softmax", name="dense_output"),
        ]
    )

    model = tf.keras.Sequential(stack, name="gender_classifier")
    model.compile(
        optimizer=tf.keras.optimizers.Adam(0.001),
        loss="sparse_categorical_crossentropy",
        metrics=["accuracy"],
    )
    return model


# Instantiate the (still untrained) classifier.
model = build_model()

In [None]:
# Print the layer-by-layer summary of the model.
model.summary()

### 3.2. Train model

In [None]:
def extract_data(df: pd.DataFrame) -> tuple[np.ndarray, np.ndarray]:
    """Turn a frame into a stacked image array and a label array.

    Returns:
        ``(features, labels)`` where ``features`` stacks every "image" entry
        along a new first axis and ``labels`` holds the "gender" values.
    """
    image_column = df["image"]
    gender_column = df["gender"]
    return np.stack(image_column.values), gender_column.values


def train_model(model: tf.keras.Model, train_df: pd.DataFrame, val_df: pd.DataFrame) -> dict[str, list[float]]:
    """Fit the model on the training frame and return the Keras history dict.

    Training uses early stopping and learning-rate reduction on ``val_loss``,
    and saves the checkpoint with the best ``val_accuracy`` to MODEL_DIRECTORY.
    """
    x_train, y_train = extract_data(train_df)
    x_val, y_val = extract_data(val_df)

    stop_early = tf.keras.callbacks.EarlyStopping(monitor="val_loss", patience=3, restore_best_weights=True)
    shrink_lr = tf.keras.callbacks.ReduceLROnPlateau(monitor="val_loss", factor=0.1, patience=2)
    # NOTE(review): the checkpoint tracks val_accuracy while early stopping tracks
    # val_loss, so the saved file and the in-memory weights can come from different epochs.
    save_best = tf.keras.callbacks.ModelCheckpoint(MODEL_DIRECTORY, monitor="val_accuracy", mode="max", save_best_only=True, verbose=1)

    fit_result = model.fit(
        x_train,
        y_train,
        validation_data=(x_val, y_val),
        batch_size=MODEL_BATCH_SIZE,
        epochs=MODEL_EPOCHS,
        callbacks=[stop_early, shrink_lr, save_best],
        verbose=1,
        shuffle=True,
    )
    return fit_result.history


# Train the classifier; `history` maps each metric name to its per-epoch values.
history = train_model(model, train_set, val_set)

### 3.3. Evaluate model

In [None]:
def evaluate_model(model: tf.keras.Model, test_df: pd.DataFrame) -> dict[str, float]:
    """Score the model on the test frame.

    Returns:
        Dict with the "loss" and "accuracy" reported by ``model.evaluate``.
    """
    features, labels = extract_data(test_df)
    metrics = model.evaluate(features, labels, batch_size=MODEL_BATCH_SIZE, verbose=0)
    # evaluate() yields the loss followed by the single compiled metric.
    loss, accuracy = metrics
    return dict(loss=loss, accuracy=accuracy)


# Evaluate the in-memory model on the held-out test split.
evaluation_data = evaluate_model(model, test_set)

In [None]:
def plot_history_evaluation(history: dict[str, list[float]], eval_data: dict[str, float]) -> plt.Figure:
    """Plot training/validation curves with the test-set result marked.

    Args:
        history: Per-epoch values for "accuracy", "val_accuracy", "loss" and
            "val_loss".
        eval_data: Test-set "accuracy" and "loss".

    Returns:
        Figure with an accuracy panel (left) and a loss panel (right).
    """
    fig, axes = plt.subplots(1, 2, figsize=(12, 4))
    hist_data = pd.DataFrame(history)
    # The evaluation marker sits one step past the last recorded epoch index.
    marker_x = len(hist_data)

    panels = (("accuracy", "Accuracy"), ("loss", "Loss"))
    for axis, (metric, label) in zip(axes, panels):
        curves = hist_data[[metric, f"val_{metric}"]]
        sns.lineplot(data=curves, ax=axis, dashes=[(None, None), (2, 2)])
        axis.plot(marker_x, eval_data[metric], "ro", markersize=8, label="Evaluation")
        axis.set_title(f"Model {label}")
        axis.set_xlabel("Epoch")
        axis.set_ylabel(label)
        axis.legend()

    plt.tight_layout()
    return fig


# Plot the learning curves together with the test-set evaluation point.
fig = plot_history_evaluation(history, evaluation_data)

In [None]:
# Drop the references to the three splits; the cells below do not use them.
del train_set, val_set, test_set

## 4. Bias analysis

In [None]:
# Configuration passed to BiasAnalyzer.
config = {
    "model": {
        # The checkpoint file written by the ModelCheckpoint callback during training.
        "path": MODEL_DIRECTORY,
    },
    "dataset": {
        "source": "utkface",
        # DATASET_IMAGE_SHAPE is (height, width, channels), hence index 1 for width and 0 for height.
        "image_width": DATASET_IMAGE_SHAPE[1],
        "image_height": DATASET_IMAGE_SHAPE[0],
        # NOTE(review): presumably caps how many images the analyzer loads — confirm against the biasx docs.
        "max_samples": 500,
    },
}


# Run the bias analysis; `result` is consumed by the visualisation cells below.
result = BiasAnalyzer(config).analyze()

In [None]:
def visualize_disparity_scores(analysis_result):
    """Draw a bar chart of the analysis' BiasX and equalized-odds scores.

    Args:
        analysis_result: Object exposing ``disparity_scores.biasx`` and
            ``disparity_scores.equalized_odds``.

    Returns:
        The matplotlib figure holding the chart.
    """
    scores = analysis_result.disparity_scores
    rows = [
        ("BiasX Score", scores.biasx),
        ("Equalized Odds", scores.equalized_odds),
    ]
    disparity_df = pd.DataFrame(rows, columns=["Metric", "Value"])

    fig, ax = plt.subplots(figsize=(6, 4))
    sns.barplot(x="Metric", y="Value", data=disparity_df, palette="Blues_d", ax=ax)
    ax.set_title("Bias Scores")
    # The y axis is fixed to the 0..1 range.
    ax.set_ylim(0, 1)

    plt.tight_layout()
    return fig


# Chart the overall disparity scores of the analysis.
fig = visualize_disparity_scores(result)

In [None]:
def visualize_feature_probabilities(analysis_result):
    """Plot per-feature male/female probabilities, annotated with bias scores.

    Args:
        analysis_result: Object whose ``feature_analyses`` maps a feature (with
            a ``value`` attribute) to an analysis exposing ``male_probability``,
            ``female_probability`` and ``bias_score``.

    Returns:
        The matplotlib figure holding the grouped bar chart.
    """
    records = []
    for feature, analysis in analysis_result.feature_analyses.items():
        records.append(
            {
                "Feature": feature.value,
                "Male Probability": analysis.male_probability,
                "Female Probability": analysis.female_probability,
                "Bias Score": analysis.bias_score,
            }
        )
    feature_df = pd.DataFrame(records)

    # Long format: one row per (feature, gender) pair, as needed for hue grouping.
    melted_df = feature_df.melt(
        id_vars=["Feature"],
        value_vars=["Male Probability", "Female Probability"],
        var_name="Gender",
        value_name="Probability",
    )

    fig, ax = plt.subplots(figsize=(12, 4))
    sns.barplot(x="Feature", y="Probability", hue="Gender", data=melted_df, ax=ax)

    # Write each feature's bias score just above its taller bar.
    annotations = zip(feature_df["Male Probability"], feature_df["Female Probability"], feature_df["Bias Score"])
    for position, (male, female, bias) in enumerate(annotations):
        ax.text(position, max(male, female) + 0.03, f"B: {bias:.3f}", ha="center")

    ax.set_ylim(0, 1.1)
    ax.set_title("Feature Probabilities by Gender")
    plt.tight_layout()
    return fig




# Chart the per-feature gender probabilities and bias scores.
fig = visualize_feature_probabilities(result)