In [7]:
import numpy as np
import pandas as pd
import cv2
import os
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, confusion_matrix, precision_score, recall_score
import tensorflow as tf
from tensorflow.keras.models import Sequential # type: ignore
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout # type: ignore
from tensorflow.keras.callbacks import EarlyStopping # type: ignore
from aif360.datasets import BinaryLabelDataset
from aif360.metrics import BinaryLabelDatasetMetric, ClassificationMetric
from aif360.algorithms.preprocessing import Reweighing

In [8]:
#Limit VRAM usage: enable memory growth on every GPU TensorFlow can see,
#instead of letting TensorFlow reserve memory up front
gpus = tf.config.experimental.list_physical_devices('GPU')
for device in gpus:
    tf.config.experimental.set_memory_growth(device, True)

In [9]:
#Load the training set: walk each class sub-folder and record every image path
#together with the name of the folder it was found in (used as the label)
dataset_path = "datasets/dataset2celebA/Train/"
images = []
labels = []
for class_name in os.listdir(dataset_path):
    class_dir = os.path.join(dataset_path, class_name)
    #Skip stray files sitting next to the class folders
    if not os.path.isdir(class_dir):
        continue
    for file_name in os.listdir(class_dir):
        if file_name.endswith((".jpg", ".jpeg", ".png")):
            images.append(os.path.join(class_dir, file_name))
            labels.append(class_name)
df = pd.DataFrame({"image": images, "label": labels})

In [None]:
#Load the validation set and create a dataframe with image paths and labels
#NOTE: the walk must use validation_path; iterating dataset_path here would
#silently reload the training images as the "validation" set.
validation_path = "datasets/dataset2celebA/Validation/"
images = []
labels = []
for class_name in os.listdir(validation_path):
    class_dir = os.path.join(validation_path, class_name)
    #Skip stray files sitting next to the class folders
    if not os.path.isdir(class_dir):
        continue
    for file_name in os.listdir(class_dir):
        #Same extension filter as the training loader so the splits are built consistently
        if file_name.endswith((".jpg", ".jpeg", ".png")):
            images.append(os.path.join(class_dir, file_name))
            labels.append(class_name)
vf = pd.DataFrame({"image": images, "label": labels})

In [None]:
#Load the test set and create a dataframe with image paths and labels
#NOTE: the walk must use test_path; iterating dataset_path here would make the
#"test" set identical to the training set and leak training data into the evaluation.
test_path = "datasets/dataset2celebA/Test/"
images = []
labels = []
for class_name in os.listdir(test_path):
    class_dir = os.path.join(test_path, class_name)
    #Skip stray files sitting next to the class folders
    if not os.path.isdir(class_dir):
        continue
    for file_name in os.listdir(class_dir):
        #Same extension filter as the training loader so the splits are built consistently
        if file_name.endswith((".jpg", ".jpeg", ".png")):
            images.append(os.path.join(class_dir, file_name))
            labels.append(class_name)
testf = pd.DataFrame({"image": images, "label": labels})

In [None]:
#Define a function to preprocess the images: resize, grayscale, histogram equalization
def preprocess_image(image_path, size=(64, 64)):
    """Load one image from disk and turn it into a single-channel model input.

    Steps: read with OpenCV, resize to ``size``, convert BGR to grayscale,
    equalize the histogram, divide by 255.0, and append a trailing channel axis.

    Args:
        image_path: Path of the image file to read.
        size: Target size handed to ``cv2.resize``. Defaults to (64, 64),
            which is what the rest of the notebook assumes.

    Returns:
        The preprocessed image as a float array with a trailing axis of length 1.

    Raises:
        OSError: If OpenCV could not read the file (``cv2.imread`` returned None).
    """
    image = cv2.imread(image_path)
    #cv2.imread signals failure by returning None rather than raising, so fail
    #here with the offending path instead of an opaque error inside cv2.resize
    if image is None:
        raise OSError(f"Could not read image file: {image_path}")
    image = cv2.resize(image, size)
    image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    image = cv2.equalizeHist(image)
    image = image / 255.0
    #Add the channel axis the Conv2D input layer expects
    image = np.expand_dims(image, axis=2)
    return image

In [None]:
#Preprocess every image of each split and encode the labels numerically:
#the folder name "male" maps to 0, any other folder name maps to 1
X_train = np.array(list(map(preprocess_image, df["image"])))
y_train = np.array([int(label != "male") for label in df["label"]])

X_val = np.array(list(map(preprocess_image, vf["image"])))
y_val = np.array([int(label != "male") for label in vf["label"]])

X_test = np.array(list(map(preprocess_image, testf["image"])))
y_test = np.array([int(label != "male") for label in testf["label"]])

In [None]:
#Create a binary label dataset from the train set for aif360
#Each image is flattened into one row of 64*64 pixel columns
train_df = pd.DataFrame(X_train.reshape(-1, 64*64))
#Target column: the numeric labels built from the folder names
train_df["label"] = y_train
#Protected attribute column; every row is assigned the same value 0
#NOTE(review): later cells compare unprivileged gender == 0 against privileged gender == 1.
#With a constant 0 here the privileged group has no rows, so the group-comparison
#metrics are presumably undefined - confirm and supply a real protected attribute.
train_df["gender"] = 0
train_dataset = BinaryLabelDataset(favorable_label=1, unfavorable_label=0, df=train_df, label_names=["label"], protected_attribute_names=["gender"], unprivileged_protected_attributes=[0])

In [None]:
#Measure bias in the raw train set: disparate impact and statistical parity
#difference between the unprivileged (gender == 0) and privileged (gender == 1) groups
unprivileged_groups = [{"gender": 0}]
privileged_groups = [{"gender": 1}]
metric_dataset = BinaryLabelDatasetMetric(
    train_dataset,
    unprivileged_groups=unprivileged_groups,
    privileged_groups=privileged_groups,
)
print("Disparate impact:", metric_dataset.disparate_impact())
print("Statistical parity difference:", metric_dataset.statistical_parity_difference())

In [None]:
#Mitigate bias in the train set with the aif360 Reweighing preprocessor
unprivileged_groups = [{"gender": 0}]
privileged_groups = [{"gender": 1}]
RW = Reweighing(unprivileged_groups=unprivileged_groups, privileged_groups=privileged_groups)
train_dataset_rw = RW.fit_transform(train_dataset)

#Recompute the same two dataset metrics on the reweighted train set
metric_dataset_rw = BinaryLabelDatasetMetric(
    train_dataset_rw,
    unprivileged_groups=unprivileged_groups,
    privileged_groups=privileged_groups,
)
print("Disparate impact after reweighing:", metric_dataset_rw.disparate_impact())
print("Statistical parity difference after reweighing:", metric_dataset_rw.statistical_parity_difference())


In [None]:
#Sanity check: dimensions and total element count of the reweighted feature matrix
features_rw = train_dataset_rw.features
print(features_rw.shape)
print(features_rw.size)

In [None]:
#Extract the features and labels from the reweighted train set
#The last feature column is dropped before reshaping back to 64x64x1 images
#(presumably that trailing column is the "gender" attribute - confirm with the shape printed above)
features_rw = train_dataset_rw.features
pixel_columns = features_rw[:, :-1]
X_train_rw = pixel_columns.reshape(-1, 64, 64, 1)
#Flatten the label column into a 1-D vector
y_train_rw = train_dataset_rw.labels.reshape(-1)

In [None]:
#Build the CNN: three convolution + max-pooling stages, then a dense head
#with dropout and a single sigmoid unit for the binary label
model = Sequential([
    Conv2D(32, (3, 3), activation="relu", input_shape=(64, 64, 1)),
    MaxPooling2D((2, 2)),
    Conv2D(64, (3, 3), activation="relu"),
    MaxPooling2D((2, 2)),
    Conv2D(128, (3, 3), activation="relu"),
    MaxPooling2D((2, 2)),
    Flatten(),
    Dense(256, activation="relu"),
    Dropout(0.5),
    Dense(1, activation="sigmoid"),
])
model.summary()

In [None]:
#Compile and fit the model on the reweighted train set
model.compile(optimizer="adam", loss="binary_crossentropy", metrics=["accuracy"])
#Stop once the validation loss has not improved for 5 consecutive epochs
early_stopping = EarlyStopping(monitor="val_loss", patience=5)
#Reweighing leaves features and labels untouched and only adjusts the per-instance
#weights, so those weights must be passed as sample_weight; otherwise training is
#identical to fitting on the original, unmitigated data.
history = model.fit(
    X_train_rw,
    y_train_rw,
    sample_weight=train_dataset_rw.instance_weights,
    batch_size=32,
    epochs=50,
    validation_data=(X_val, y_val),
    callbacks=[early_stopping],
)

In [None]:
#Plot the training and validation curves side by side: accuracy on the left, loss on the right
fig, (acc_ax, loss_ax) = plt.subplots(1, 2, figsize=(12, 6))
curves = history.history

acc_ax.plot(curves["accuracy"], label="train accuracy")
acc_ax.plot(curves["val_accuracy"], label="validation accuracy")
acc_ax.set_xlabel("Epoch")
acc_ax.set_ylabel("Accuracy")
acc_ax.legend()

loss_ax.plot(curves["loss"], label="train loss")
loss_ax.plot(curves["val_loss"], label="validation loss")
loss_ax.set_xlabel("Epoch")
loss_ax.set_ylabel("Loss")
loss_ax.legend()

plt.show()

In [None]:
#Evaluate the model on the test set
#The model outputs are rounded to hard 0/1 predictions and flattened to 1-D
probabilities = model.predict(X_test)
y_pred = probabilities.round().ravel()

#Score the predictions against the ground-truth test labels
test_cm = confusion_matrix(y_test, y_pred)
recall = recall_score(y_test, y_pred)
precision = precision_score(y_test, y_pred)
test_acc = accuracy_score(y_test, y_pred)

print("Test accuracy:", test_acc)
print("Test Precision:", precision)
print("Test Recall:", recall)
print("Test confusion matrix:")
print(test_cm)

In [None]:
#Create a binary label dataset from the test set for aif360
#Each image is flattened into one row of 64*64 pixel columns
test_df = pd.DataFrame(X_test.reshape(-1, 64*64))
#Target column: the ground-truth numeric test labels
test_df["label"] = y_test
#Protected attribute column; every row is assigned the same value 0
#NOTE(review): the metrics cell compares unprivileged gender == 0 against privileged
#gender == 1. With a constant 0 here the privileged group has no rows, so the
#between-group differences are presumably undefined - confirm and supply a real
#protected attribute.
test_df["gender"] = 0
test_dataset = BinaryLabelDataset(favorable_label=1, unfavorable_label=0, df=test_df, label_names=["label"], protected_attribute_names=["gender"], unprivileged_protected_attributes=[0])

In [None]:
#Compute the classification metrics for the test set
#ClassificationMetric compares a ground-truth dataset against a dataset holding the
#classifier's predictions. Passing an unmodified copy of the ground truth would score
#the labels against themselves (accuracy 1.0), so the predictions from the evaluation
#cell are written into the copy's labels first.
test_dataset_pred = test_dataset.copy(deepcopy=True)
test_dataset_pred.labels = np.asarray(y_pred, dtype=np.float64).reshape(-1, 1)

metric_classifier = ClassificationMetric(test_dataset,
                                         test_dataset_pred,
                                         unprivileged_groups=[{"gender": 0}],
                                         privileged_groups=[{"gender": 1}])
print("Accuracy:", metric_classifier.accuracy())
#print("Balanced accuracy:", metric_classifier.balanced_accuracy())
print("Equal opportunity difference:", metric_classifier.equal_opportunity_difference())
print("Average odds difference:", metric_classifier.average_odds_difference())
print("Theil index:", metric_classifier.theil_index())