In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, skewness, count, mean, stddev, isnan, lit, col,sum as spark_sum
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.stat import Correlation
import pyspark.sql.functions as F

# 1. Initialize Spark
spark = SparkSession.builder.appName("HMDA_Preprocessing").getOrCreate()


# 2. Load data with 5000 sample rows
df = spark.read.csv("hmda_2016_sample.csv", header=True, inferSchema=True)
print("Initial data shape:", (df.count(), len(df.columns)))


# 3. Target variable transformation  "action_taken_name" 

df = df.withColumn("target", when(col("action_taken_name") == "Loan originated", 1).otherwise(0))
df = df.drop("action_taken_name")

# 4. Handle nulls 

numeric_cols = [c for c, t in df.dtypes if t in ('int', 'double', 'float')]
for c in numeric_cols:
    df = df.withColumn(c, when(isnan(col(c)) | col(c).isNull(), None).otherwise(col(c)))

# Compute skewness for numeric columns
skew_data = df.select([skewness(col(c)).alias(c) for c in numeric_cols]).collect()[0].asDict()
print("Skewness:", skew_data)

# Impute based on skewness
# For right-skewed columns → median, for left-skewed → mean
for c in numeric_cols:
    col_skew = skew_data[c]
    if col_skew is not None:
        if abs(col_skew) > 1:
            # Median imputation for skewed data
            median_val = df.approxQuantile(c, [0.5], 0.01)[0]
            df = df.na.fill({c: median_val})
        else:
            # Mean imputation otherwise
            mean_val = df.select(F.mean(col(c))).first()[0]
            df = df.na.fill({c: mean_val})


# 5. Remove outliers (using IQR)

for c in numeric_cols:
    try:
        quantiles = df.approxQuantile(c, [0.25, 0.75], 0.05)
        if len(quantiles) < 2:
            print(f"Skipping {c} - insufficient data for IQR")
            continue

        q1, q3 = quantiles
        iqr = q3 - q1
        lower = q1 - 1.5 * iqr
        upper = q3 + 1.5 * iqr
        df = df.filter((col(c) >= lower) & (col(c) <= upper))
    except Exception as e:
        print(f"Skipping {c} due to error: {e}")
        continue

# 6. Remove zero-variance and near-zero variance columns

stats = df.select([(F.variance(col(c)).alias(c)) for c in numeric_cols]).collect()[0].asDict()
low_var_cols = [c for c, v in stats.items() if v is None or v < 1e-5]
df = df.drop(*low_var_cols)
print("Removed low variance columns:", low_var_cols)


# 7. Remove highly correlated attributes

remaining_numeric = [c for c in df.columns if c in numeric_cols and c not in low_var_cols]

assembler = VectorAssembler(inputCols=remaining_numeric, outputCol="features")
df_vec = assembler.transform(df).select("features")

corr_matrix = Correlation.corr(df_vec, "features").head()[0].toArray()

# Drop one of each pair of highly correlated columns
to_drop = set()
for i in range(len(remaining_numeric)):
    for j in range(i + 1, len(remaining_numeric)):
        if abs(corr_matrix[i][j]) > 0.9:
            to_drop.add(remaining_numeric[j])
df = df.drop(*list(to_drop))
print("Dropped highly correlated columns:", list(to_drop))


print("Data shape:", (df.count(), len(df.columns)))
df.show(5)


Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/10/25 11:55:09 WARN Utils: Your hostname, MacBook-Air.local, resolves to a loopback address: 127.0.0.1; using 10.0.0.165 instead (on interface en0)
25/10/25 11:55:09 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/10/25 11:55:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Initial data shape: (5000, 78)


25/10/25 11:55:15 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

Skewness: {'as_of_year': None, 'agency_code': -1.133557610752084, 'loan_type': 1.545307862548966, 'property_type': 6.477759409275815, 'loan_purpose': -0.06631160538001363, 'owner_occupancy': 3.2692081888329367, 'loan_amount_000s': 13.246193188807139, 'preapproval': -2.704683294662515, 'action_taken': 0.7926353514882726, 'msamd': -0.1740281799089542, 'state_code': 0.1789081233380773, 'county_code': 3.2814795595246715, 'census_tract_number': 1.367942936743142, 'applicant_ethnicity': 1.0546734106604552, 'co_applicant_ethnicity': -0.44057157346257925, 'applicant_race_1': -0.8732381845762832, 'applicant_race_2': -1.1860104552658883, 'applicant_race_3': None, 'co_applicant_race_1': -0.9055591729925622, 'co_applicant_race_2': -1.9388768742861848, 'applicant_sex': 1.2990363212262155, 'co_applicant_sex': -0.45824476121158225, 'applicant_income_000s': 10.299430074770887, 'purchaser_type': 1.617201818252887, 'denial_reason_1': 0.6015422614842352, 'denial_reason_2': 0.9020591378654562, 'denial_rea

25/10/25 11:55:30 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
                                                                                

Dropped highly correlated columns: ['co_applicant_sex', 'target', 'co_applicant_race_1']
Final shape: (120, 58)
+-------------+--------------------+-----------+-----------+--------------+---------+--------------------+-----------------+------------+--------------------+----------------+----------------+------------+--------------------+-----------------+----------+----------+----------+-----------------+-----------+-------------------+------------------------+---------------------------+----------------------+---------------------+---------------------+---------------------+---------------------+----------------+---------------------+----------------+------------------------+------------------------+------------------------+-------------------+------------------------+-------------------+------------------------+-------------------+------------------+-------------+---------------------+---------------------+--------------------+--------------+--------------------+--------------------+-

In [2]:

# Get null counts for each column
null_counts = []
for column_name in df.columns:
    null_count = df.filter(col(column_name).isNull()).count()
    null_counts.append((column_name, null_count))

# Display results
print("Null/Missing Values Count by Column:")
print("-" * 50)
for column_name, count in null_counts:
    print(f"{column_name}: {count}")

# Also check for NaN values in numeric columns
print("\nNaN Values Count by Column:")
print("-" * 50)
for column_name, count in null_counts:
    if column_name in [c for c, t in df.dtypes if t in ('int', 'double', 'float')]:
        nan_count = df.filter(isnan(col(column_name))).count()
        if nan_count > 0:
            print(f"{column_name}: {nan_count}")

# Summary statistics
total_nulls = sum([count for _, count in null_counts])
print(f"\nTotal null/missing values across all columns: {total_nulls}")
# print(f"Columns with nulls: {len([name for name, count in null_counts if count > 0])}")
# print(f"Columns without nulls: {len([name for name, count in null_counts if count == 0])}")


Null/Missing Values Count by Column:
--------------------------------------------------
respondent_id: 0
agency_name: 0
agency_abbr: 0
agency_code: 0
loan_type_name: 0
loan_type: 0
property_type_name: 0
loan_purpose_name: 0
loan_purpose: 0
owner_occupancy_name: 0
loan_amount_000s: 0
preapproval_name: 0
action_taken: 0
msamd_name: 8
msamd: 0
state_name: 2
state_abbr: 2
state_code: 0
county_name: 2
county_code: 0
census_tract_number: 0
applicant_ethnicity_name: 0
co_applicant_ethnicity_name: 0
co_applicant_ethnicity: 0
applicant_race_name_1: 0
applicant_race_name_2: 120
applicant_race_name_3: 120
applicant_race_name_4: 120
applicant_race_4: 120
applicant_race_name_5: 120
applicant_race_5: 120
co_applicant_race_name_1: 0
co_applicant_race_name_2: 120
co_applicant_race_name_3: 120
co_applicant_race_3: 120
co_applicant_race_name_4: 120
co_applicant_race_4: 120
co_applicant_race_name_5: 120
co_applicant_race_5: 120
applicant_sex_name: 0
applicant_sex: 0
co_applicant_sex_name: 0
applicant_inc

In [None]:
# Calculate total number of rows
total_rows = df.count()
print(f"Total rows in dataset: {total_rows}")

# Define threshold for majority (50% or more)
threshold = 0.7
majority_threshold = total_rows * threshold

print(f"Majority threshold (70% of {total_rows} rows): {majority_threshold}")

# Find columns with majority null/missing values
columns_to_drop = []
null_analysis = []

for column_name in df.columns:
    # Count null values
    null_count = df.filter(col(column_name).isNull()).count()
    
    # Count NaN values for numeric columns
    nan_count = 0
    if column_name in [c for c, t in df.dtypes if t in ('int', 'double', 'float')]:
        nan_count = df.filter(isnan(col(column_name))).count()
    
    # Total missing values
    total_missing = null_count + nan_count
    missing_percentage = (total_missing / total_rows) * 100
    
    null_analysis.append({
        'column': column_name,
        'null_count': null_count,
        'nan_count': nan_count,
        'total_missing': total_missing,
        'missing_percentage': missing_percentage
    })
    
    # Check if majority is missing
    if total_missing >= majority_threshold:
        columns_to_drop.append(column_name)

# Display analysis
print("\nMissing Values Analysis:")
print("-" * 80)
print(f"{'Column':<30} {'Nulls':<8} {'NaNs':<8} {'Total':<8} {'% Missing':<10} {'Action'}")
print("-" * 80)

for analysis in null_analysis:
    action = "DROP" if analysis['column'] in columns_to_drop else "KEEP"
    print(f"{analysis['column']:<30} {analysis['null_count']:<8} {analysis['nan_count']:<8} {analysis['total_missing']:<8} {analysis['missing_percentage']:<10.2f} {action}")

# Drop columns with majority missing values
if columns_to_drop:
    print(f"\nDropping {len(columns_to_drop)} columns with majority missing values:")
    for col in columns_to_drop:
        print(f"  - {col}")
    
    df = df.drop(*columns_to_drop)
    print(f"\nDataset shape after dropping columns: {df.count()} rows, {len(df.columns)} columns")
else:
    print("\nNo columns found with majority missing values.")

# Show remaining columns
# print(f"\nRemaining columns ({len(df.columns)}):")
# for i, col in enumerate(df.columns, 1):
#     print(f"{i:2d}. {col}")


Total rows in dataset: 120
Majority threshold (70% of 120 rows): 84.0

Missing Values Analysis:
--------------------------------------------------------------------------------
Column                         Nulls    NaNs     Total    % Missing  Action
--------------------------------------------------------------------------------
respondent_id                  0        0        0        0.00       KEEP
agency_name                    0        0        0        0.00       KEEP
agency_abbr                    0        0        0        0.00       KEEP
agency_code                    0        0        0        0.00       KEEP
loan_type_name                 0        0        0        0.00       KEEP
loan_type                      0        0        0        0.00       KEEP
property_type_name             0        0        0        0.00       KEEP
loan_purpose_name              0        0        0        0.00       KEEP
loan_purpose                   0        0        0        0.00       KEEP


In [5]:
print("Final shape:", (df.count(), len(df.columns)))
df.show(5)

Final shape: (120, 42)
+-------------+--------------------+-----------+-----------+--------------+---------+--------------------+-----------------+------------+--------------------+----------------+----------------+------------+--------------------+-----------------+----------+----------+----------+-----------------+-----------+-------------------+------------------------+---------------------------+----------------------+---------------------+------------------------+------------------+-------------+---------------------+---------------------+--------------------+--------------+-----------------+--------------------+--------------------+---------------+----------+-------------------+------------------------+---------------------+------------------------------+-----------------------------+
|respondent_id|         agency_name|agency_abbr|agency_code|loan_type_name|loan_type|  property_type_name|loan_purpose_name|loan_purpose|owner_occupancy_name|loan_amount_000s|preapproval_name|action