# Feature Engineering with Apache Spark

In [1]:
import zipfile
import os
import pandas as pd
import numpy as np

from pathlib import Path
from PIL import Image

## Loading the Dataset

In [3]:
# Source archives for the two FairFace crops (file names carry the margin: 025 and 125)
zip_025, zip_125 = (
    '../data/fairface-img-margin025-trainval.zip',
    '../data/fairface-img-margin125-trainval.zip',
)

# Destination directory for each archive once unpacked
extract_dir_025, extract_dir_125 = (
    '../data/fairface_025',
    '../data/fairface_125',
)

In [4]:
def extract_zip(zip_path, extract_to):
    """Unpack a zip archive into ``extract_to`` unless that directory already exists.

    The archive is first extracted into a sibling ``<name>.partial`` staging
    directory and only renamed to ``extract_to`` once extraction has finished.
    An interrupted run therefore never leaves a half-populated ``extract_to``
    behind that a later run would mistake for a complete extraction.

    Args:
        zip_path: Path (str or ``Path``) of the zip archive to unpack.
        extract_to: Path (str or ``Path``) of the destination directory.

    Raises:
        FileNotFoundError: If ``zip_path`` does not exist.
        zipfile.BadZipFile: If ``zip_path`` is not a valid zip archive.
    """
    import shutil

    zip_path = Path(zip_path)
    extract_to = Path(extract_to)

    if extract_to.exists():
        print(f"{extract_to} already exists, skipping extraction.")
        return

    # Leftovers from a previously interrupted run are discarded and redone.
    staging_dir = extract_to.with_name(extract_to.name + ".partial")
    if staging_dir.exists():
        shutil.rmtree(staging_dir)

    print(f"Extracting {zip_path.name}...")
    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        # Create the directory explicitly so that an empty archive still
        # produces a destination to rename.
        staging_dir.mkdir(parents=True, exist_ok=True)
        zip_ref.extractall(staging_dir)

    # Publish the result only after every member has been written.
    staging_dir.rename(extract_to)
    print(f"Extracted to {extract_to}")

# Unpack each archive into its matching directory (skipped when already present)
for archive, destination in (
    (zip_025, extract_dir_025),
    (zip_125, extract_dir_125),
):
    extract_zip(archive, destination)

..\data\fairface_025 already exists, skipping extraction.
..\data\fairface_125 already exists, skipping extraction.


In [5]:
# Read the train and validation label CSVs into pandas DataFrames
label_csvs = (
    '../data/fairface_label_train.csv',
    '../data/fairface_label_val.csv',
)
train_labels, val_labels = map(pd.read_csv, label_csvs)

In [7]:
# Combine train and validation labels using Apache Spark
from pyspark.sql import SparkSession

# getOrCreate() returns the already-running session when there is one, so
# re-running this cell does not start a second Spark application.
spark = (
    SparkSession.builder
    .appName("FacialRecognitionBiasMitigation")
    .config("spark.driver.maxResultSize", "2g")
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")
    .getOrCreate()
)

train_df = spark.createDataFrame(train_labels)
val_df = spark.createDataFrame(val_labels)

# unionByName matches columns by name. Plain union() matches by position and
# would silently mix columns if the two CSVs ever differ in column order.
combined_df = train_df.unionByName(val_df)

# cache() only marks the DataFrame for caching; count() below is the first
# action that runs against it.
combined_df.cache()
print(f"Total records: {combined_df.count()}")
combined_df.show(5)

Total records: 97698
+-----------+-----+------+----------+------------+
|       file|  age|gender|      race|service_test|
+-----------+-----+------+----------+------------+
|train/1.jpg|50-59|  Male|East Asian|        true|
|train/2.jpg|30-39|Female|    Indian|       false|
|train/3.jpg|  3-9|Female|     Black|       false|
|train/4.jpg|20-29|Female|    Indian|        true|
|train/5.jpg|20-29|Female|    Indian|        true|
+-----------+-----+------+----------+------------+
only showing top 5 rows



In [9]:
# Show data types: print each column's name, Spark type and nullability
combined_df.printSchema()

root
 |-- file: string (nullable = true)
 |-- age: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- race: string (nullable = true)
 |-- service_test: boolean (nullable = true)



In [8]:
# Count null entries per column: each null contributes 1 to that column's sum
from pyspark.sql.functions import col, sum as spark_sum

null_count_exprs = [
    spark_sum(col(name).isNull().cast("int")).alias(name)
    for name in combined_df.columns
]
missing_counts = combined_df.select(null_count_exprs)
missing_counts.show()

+----+---+------+----+------------+
|file|age|gender|race|service_test|
+----+---+------+----+------------+
|   0|  0|     0|   0|           0|
+----+---+------+----+------------+



In [12]:
# Show unique values in categorical columns
categorical_columns = ['age', 'gender', 'race']

# The loop variable is deliberately not named `col`: that would rebind the
# imported pyspark.sql.functions.col to a plain string and break every later
# `col(...)` call until the import is run again.
for column_name in categorical_columns:
    unique_values = combined_df.select(column_name).distinct().collect()
    unique_values_list = [row[column_name] for row in unique_values]
    print(f"Unique values in '{column_name}': {unique_values_list}")

Unique values in 'age': ['30-39', '20-29', '60-69', '3-9', 'more than 70', '10-19', '40-49', '0-2', '50-59']
Unique values in 'gender': ['Female', 'Male']
Unique values in 'race': ['Indian', 'Latino_Hispanic', 'Southeast Asian', 'White', 'Middle Eastern', 'Black', 'East Asian']


## Feature Engineering

In [None]:
# Compute brightness as a UDF
from pyspark.sql.functions import col, udf
from pyspark.sql.types import FloatType

def compute_brightness(image_path):
    """Return the mean grayscale intensity of an image.

    Args:
        image_path: Path of the image file, as accepted by ``Image.open``.

    Returns:
        The mean pixel value of the image after conversion to grayscale
        (mode ``'L'``) as a float, or ``None`` when the path is ``None`` or
        the file cannot be opened or decoded.
    """
    # Image statistics live in the separate PIL.ImageStat module; PIL.Image has
    # no `Stat` attribute. Imported locally so the function is self-contained.
    from PIL import ImageStat

    if image_path is None:
        return None

    try:
        with Image.open(image_path) as img:
            grayscale = img.convert('L')  # Convert to grayscale
            return float(ImageStat.Stat(grayscale).mean[0])
    except (OSError, ValueError):
        # Best effort for missing or unreadable files only. Programming errors
        # are left to propagate instead of silently yielding an all-null column.
        return None

brightness_udf = udf(compute_brightness, FloatType())

# Compute contrast as a UDF
def compute_contrast(image_path):
    """Return the standard deviation of an image's grayscale intensities.

    Args:
        image_path: Path of the image file, as accepted by ``Image.open``.

    Returns:
        The standard deviation of the pixel values after conversion to
        grayscale (mode ``'L'``) as a float, or ``None`` when the path is
        ``None`` or the file cannot be opened or decoded.
    """
    # Image statistics live in the separate PIL.ImageStat module; PIL.Image has
    # no `Stat` attribute. Imported locally so the function is self-contained.
    from PIL import ImageStat

    if image_path is None:
        return None

    try:
        with Image.open(image_path) as img:
            grayscale = img.convert('L')  # Convert to grayscale
            return float(ImageStat.Stat(grayscale).stddev[0])
    except (OSError, ValueError):
        # Best effort for missing or unreadable files only. Programming errors
        # are left to propagate instead of silently yielding an all-null column.
        return None

contrast_udf = udf(compute_contrast, FloatType())

# Compute sharpness as a UDF
def compute_sharpness(image_path):
    """Return the grayscale intensity range (max - min) of an image.

    This is the quantity the notebook calls "sharpness". It measures the
    dynamic range of the pixel values, not edge sharpness.

    Args:
        image_path: Path of the image file, as accepted by ``Image.open``.

    Returns:
        The difference between the largest and smallest pixel value after
        conversion to grayscale (mode ``'L'``) as a float, or ``None`` when the
        path is ``None`` or the file cannot be opened or decoded.
    """
    # Image statistics live in the separate PIL.ImageStat module; PIL.Image has
    # no `Stat` attribute. Imported locally so the function is self-contained.
    from PIL import ImageStat

    if image_path is None:
        return None

    try:
        with Image.open(image_path) as img:
            grayscale = img.convert('L')  # Convert to grayscale
            # `extrema` holds one (min, max) tuple per band; a mode-'L' image
            # has a single band, so the pair is at index 0.
            lowest, highest = ImageStat.Stat(grayscale).extrema[0]
            return float(highest - lowest)
    except (OSError, ValueError):
        # Best effort for missing or unreadable files only. Programming errors
        # are left to propagate instead of silently yielding an all-null column.
        return None

sharpness_udf = udf(compute_sharpness, FloatType())

In [27]:
# Apply UDFs to compute brightness, contrast and sharpness
# Imported here so the cell does not depend on `col` still being bound to the
# Spark function by an earlier cell.
from pyspark.sql.functions import col, concat, lit

# The `file` column holds paths such as "train/1.jpg", which are relative to the
# extraction directory rather than to the notebook. Prefix them with an absolute
# root so the UDFs can open the files.
# NOTE(review): assumes the archive unpacks to <extract_dir>/train and
# <extract_dir>/val, and uses the margin-0.25 crops. Point this at
# extract_dir_125 if the wider crops are intended.
image_root = Path(extract_dir_025).resolve().as_posix()
image_path_col = concat(lit(image_root + '/'), col('file'))

combined_df = combined_df.withColumn('brightness', brightness_udf(image_path_col))
combined_df = combined_df.withColumn('contrast', contrast_udf(image_path_col))
combined_df = combined_df.withColumn('sharpness', sharpness_udf(image_path_col))

In [28]:
# Show schema after adding new features (brightness, contrast, sharpness)
# NOTE(review): the saved output below lists `brightness` twice, which suggests
# it predates the current cells. Re-run after Restart Kernel -> Run All to confirm.
combined_df.printSchema()

root
 |-- file: string (nullable = true)
 |-- age: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- race: string (nullable = true)
 |-- service_test: boolean (nullable = true)
 |-- brightness: float (nullable = true)
 |-- brightness: float (nullable = true)
 |-- contrast: float (nullable = true)
 |-- sharpness: float (nullable = true)

