In [0]:
# Make `credit_risk` the current database so the unqualified table name below resolves against it
spark.sql("USE credit_risk")

# Raw application data; the later stages of this notebook start from this DataFrame
df = spark.table("application_train")

# Preview the first 10 rows
display(df.limit(10))

In [0]:
# COMMAND ----------
# MAGIC %md
# MAGIC ## Missing Values Analysis

# COMMAND ----------

from pyspark.sql.functions import col, count, when, lit

# Get total row count
total_rows = df.count()

# Count the nulls of every column in one aggregation (a single Spark job)
# instead of running a separate filter().count() job per column.
# when() without otherwise() yields NULL for non-matching rows and count()
# skips NULLs, so each aggregate is exactly that column's null count.
null_counts = df.select(
    [count(when(col(column).isNull(), lit(1))).alias(column) for column in df.columns]
).first()

# Keep only the columns that actually have missing values. The `if` filter is
# evaluated before the percentage expression, so an empty table
# (total_rows == 0, every null count 0) never reaches the division.
missing_data = [
    (column, null_count, (null_count / total_rows) * 100)
    for column, null_count in zip(df.columns, null_counts)
    if null_count > 0
]

# Convert to DataFrame. The schema is spelled out because createDataFrame
# cannot infer column types from an empty list, which is what a table with
# no missing values produces.
missing_df = spark.createDataFrame(
    missing_data,
    "Column_Name string, Missing_Count long, Missing_Percentage double"
)

# Sort by missing percentage (highest first)
missing_df = missing_df.orderBy(col("Missing_Percentage").desc())

# Display results. len(missing_data) equals the row count of missing_df and
# avoids launching another Spark job just to count a driver-side list.
print(f"=== Missing Values Report ===")
print(f"Total Rows: {total_rows:,}")
print(f"Columns with Missing Values: {len(missing_data)} out of {len(df.columns)}")
print()

display(missing_df)

# COMMAND ----------

# Optional: bucket the report rows into severity tiers by missing percentage
print("=== Missing Values by Severity ===\n")

# Shared column expression for the tier boundaries (lower bound inclusive, upper bound exclusive)
missing_pct = col("Missing_Percentage")

critical = missing_df.where(missing_pct >= 70)
high = missing_df.where((missing_pct >= 40) & (missing_pct < 70))
medium = missing_df.where((missing_pct >= 10) & (missing_pct < 40))
low = missing_df.where(missing_pct < 10)

# Each tier: headline with its column count, followed by the tier's rows
critical_total = critical.count()
print(f"üî¥ CRITICAL (‚â•70% missing): {critical_total} columns - RECOMMEND DROP")
display(critical)

high_total = high.count()
print(f"\nüü† HIGH (40-69% missing): {high_total} columns - CONSIDER DROP OR CAREFUL IMPUTATION")
display(high)

medium_total = medium.count()
print(f"\nüü° MEDIUM (10-39% missing): {medium_total} columns - IMPUTE")
display(medium)

low_total = low.count()
print(f"\nüü¢ LOW (<10% missing): {low_total} columns - IMPUTE")
display(low)

# COMMAND ----------

In [0]:
# COMMAND ----------
# MAGIC %md
# MAGIC # Data Cleaning and Imputation - Complete Pipeline
# MAGIC 
# MAGIC This notebook handles all missing values for the Home Credit Default Risk dataset:
# MAGIC 1. Drop high missing percentage columns (>40%)
# MAGIC 2. Impute low missing percentage columns (<10%)
# MAGIC 3. Impute medium missing percentage columns (10-40%)

# COMMAND ----------

from pyspark.sql.functions import col, count, lit, when

# Begin from the raw data: later steps rebind df_staging to new DataFrames,
# so `df` keeps referring to the original table contents
df_staging = df

print("=== Starting Data Cleaning Pipeline ===")
initial_row_total = df_staging.count()
initial_column_total = len(df_staging.columns)
print(f"Initial shape: {initial_row_total:,} rows √ó {initial_column_total} columns\n")

# COMMAND ----------
# MAGIC %md
# MAGIC ## Step 1: Drop High Missing Percentage Columns (>40%)

# COMMAND ----------

# List of columns to drop (49 total: 47 building features + EXT_SOURCE_1 + OWN_CAR_AGE)
columns_to_drop = [
    'COMMONAREA_AVG', 'COMMONAREA_MODE', 'COMMONAREA_MEDI',
    'NONLIVINGAPARTMENTS_AVG', 'NONLIVINGAPARTMENTS_MODE', 'NONLIVINGAPARTMENTS_MEDI',
    'FONDKAPREMONT_MODE',
    'LIVINGAPARTMENTS_AVG', 'LIVINGAPARTMENTS_MODE', 'LIVINGAPARTMENTS_MEDI',
    'FLOORSMIN_AVG', 'FLOORSMIN_MODE', 'FLOORSMIN_MEDI',
    'YEARS_BUILD_AVG', 'YEARS_BUILD_MODE', 'YEARS_BUILD_MEDI',
    'OWN_CAR_AGE',
    'LANDAREA_AVG', 'LANDAREA_MODE', 'LANDAREA_MEDI',
    'BASEMENTAREA_AVG', 'BASEMENTAREA_MODE', 'BASEMENTAREA_MEDI',
    'EXT_SOURCE_1',
    'NONLIVINGAREA_AVG', 'NONLIVINGAREA_MODE', 'NONLIVINGAREA_MEDI',
    'ELEVATORS_AVG', 'ELEVATORS_MODE', 'ELEVATORS_MEDI',
    'WALLSMATERIAL_MODE',
    'APARTMENTS_AVG', 'APARTMENTS_MODE', 'APARTMENTS_MEDI',
    'ENTRANCES_AVG', 'ENTRANCES_MODE', 'ENTRANCES_MEDI',
    'LIVINGAREA_AVG', 'LIVINGAREA_MODE', 'LIVINGAREA_MEDI',
    'HOUSETYPE_MODE',
    'FLOORSMAX_AVG', 'FLOORSMAX_MODE', 'FLOORSMAX_MEDI',
    'YEARS_BEGINEXPLUATATION_AVG', 'YEARS_BEGINEXPLUATATION_MODE', 'YEARS_BEGINEXPLUATATION_MEDI',
    'TOTALAREA_MODE',
    'EMERGENCYSTATE_MODE'
]

# DataFrame.drop() is a no-op for names that are not in the schema, so a typo
# or an already-removed column would otherwise go unnoticed. Surface them
# before reporting the drop.
absent_columns = [name for name in columns_to_drop if name not in df_staging.columns]
if absent_columns:
    print(f"Warning: {len(absent_columns)} listed columns are not in the data and were not dropped: {absent_columns}")

# Drop the columns
df_staging = df_staging.drop(*columns_to_drop)

print(f"‚úÖ Dropped {len(columns_to_drop)} columns with high missing percentages")
print(f"New shape: {df_staging.count():,} rows √ó {len(df_staging.columns)} columns\n")

# COMMAND ----------
# MAGIC %md
# MAGIC ## Step 2: Calculate Medians for Numerical Columns

# COMMAND ----------

# Columns imputed with their median: the first four belong to the low
# missing % group, EXT_SOURCE_3 to the medium missing % group.
median_columns = [
    "DAYS_LAST_PHONE_CHANGE",
    "AMT_ANNUITY",
    "AMT_GOODS_PRICE",
    "EXT_SOURCE_2",
    "EXT_SOURCE_3",
]

# One approxQuantile call covers every column (relative error 0.01) instead of
# five separate calls; the result holds one list of quantiles per column, in
# the order of median_columns.
median_results = df_staging.approxQuantile(median_columns, [0.5], 0.01)

# A column without any non-null value yields an empty list. Report it by name
# rather than failing with a bare IndexError on `[0]`.
columns_without_median = [
    name for name, quantiles in zip(median_columns, median_results) if not quantiles
]
if columns_without_median:
    raise ValueError(f"Cannot compute a median for all-null columns: {columns_without_median}")

# Unpack into the names used by the imputation steps
(
    median_days_phone,
    median_annuity,
    median_goods_price,
    median_ext_source_2,
    median_ext_source_3,
) = (quantiles[0] for quantiles in median_results)

print("=== Calculated Medians ===")
print(f"  DAYS_LAST_PHONE_CHANGE: {median_days_phone}")
print(f"  AMT_ANNUITY: {median_annuity}")
print(f"  AMT_GOODS_PRICE: {median_goods_price}")
print(f"  EXT_SOURCE_2: {median_ext_source_2}")
print(f"  EXT_SOURCE_3: {median_ext_source_3}")
print()

# COMMAND ----------
# MAGIC %md
# MAGIC ## Step 3: Impute Low Missing Percentage Columns (<10%)

# COMMAND ----------

print("=== Imputing Low Missing % Columns ===\n")

# Numeric columns whose nulls take the previously computed column median
median_fills = {
    "DAYS_LAST_PHONE_CHANGE": median_days_phone,
    "AMT_ANNUITY": median_annuity,
    "AMT_GOODS_PRICE": median_goods_price,
    "EXT_SOURCE_2": median_ext_source_2,
}
for column_name, median_value in median_fills.items():
    current = col(column_name)
    df_staging = df_staging.withColumn(
        column_name,
        when(current.isNull(), median_value).otherwise(current)
    )

print("‚úÖ Median imputation complete for 4 columns")

# Missing family size becomes 1
family_members = col("CNT_FAM_MEMBERS")
df_staging = df_staging.withColumn(
    "CNT_FAM_MEMBERS",
    when(family_members.isNull(), 1).otherwise(family_members)
)

print("‚úÖ CNT_FAM_MEMBERS imputed with 1")

# Missing social circle counts become 0
social_circle_columns = (
    "OBS_30_CNT_SOCIAL_CIRCLE",
    "DEF_30_CNT_SOCIAL_CIRCLE",
    "OBS_60_CNT_SOCIAL_CIRCLE",
    "DEF_60_CNT_SOCIAL_CIRCLE",
)
for column_name in social_circle_columns:
    current = col(column_name)
    df_staging = df_staging.withColumn(
        column_name,
        when(current.isNull(), 0).otherwise(current)
    )

print("‚úÖ Social circle columns imputed with 0")

# Missing NAME_TYPE_SUITE becomes the "Unaccompanied" category
type_suite = col("NAME_TYPE_SUITE")
df_staging = df_staging.withColumn(
    "NAME_TYPE_SUITE",
    when(type_suite.isNull(), "Unaccompanied").otherwise(type_suite)
)

print("‚úÖ NAME_TYPE_SUITE imputed with 'Unaccompanied'")
print("‚úÖ Low missing % imputation complete (10 columns)\n")

# COMMAND ----------
# MAGIC %md
# MAGIC ## Step 4: Impute Medium Missing Percentage Columns (10-40%)

# COMMAND ----------

print("=== Imputing Medium Missing % Columns ===\n")

# Missing occupation becomes its own explicit "Unknown" category
occupation = col("OCCUPATION_TYPE")
df_staging = df_staging.withColumn(
    "OCCUPATION_TYPE",
    when(occupation.isNull(), "Unknown").otherwise(occupation)
)

print("‚úÖ OCCUPATION_TYPE imputed with 'Unknown'")

# Missing EXT_SOURCE_3 takes the column median computed earlier
ext_source_3 = col("EXT_SOURCE_3")
df_staging = df_staging.withColumn(
    "EXT_SOURCE_3",
    when(ext_source_3.isNull(), median_ext_source_3).otherwise(ext_source_3)
)

print("‚úÖ EXT_SOURCE_3 imputed with median")

# Missing credit bureau inquiry counts become 0
credit_bureau_columns = [
    'AMT_REQ_CREDIT_BUREAU_HOUR',
    'AMT_REQ_CREDIT_BUREAU_DAY',
    'AMT_REQ_CREDIT_BUREAU_WEEK',
    'AMT_REQ_CREDIT_BUREAU_MON',
    'AMT_REQ_CREDIT_BUREAU_QRT',
    'AMT_REQ_CREDIT_BUREAU_YEAR'
]

for bureau_column in credit_bureau_columns:
    inquiries = col(bureau_column)
    df_staging = df_staging.withColumn(
        bureau_column,
        when(inquiries.isNull(), 0).otherwise(inquiries)
    )

print(f"‚úÖ Credit bureau inquiry columns imputed with 0 ({len(credit_bureau_columns)} columns)")
print("‚úÖ Medium missing % imputation complete (8 columns)\n")

# COMMAND ----------
# MAGIC %md
# MAGIC ## Step 5: Verification - Check All Imputed Columns

# COMMAND ----------

# List all columns that were imputed
all_imputed_columns = [
    # Low missing %
    "DAYS_LAST_PHONE_CHANGE",
    "CNT_FAM_MEMBERS",
    "AMT_ANNUITY",
    "AMT_GOODS_PRICE",
    "EXT_SOURCE_2",
    "NAME_TYPE_SUITE",
    "OBS_30_CNT_SOCIAL_CIRCLE",
    "DEF_30_CNT_SOCIAL_CIRCLE",
    "OBS_60_CNT_SOCIAL_CIRCLE",
    "DEF_60_CNT_SOCIAL_CIRCLE",
    # Medium missing %
    "OCCUPATION_TYPE",
    "EXT_SOURCE_3",
    "AMT_REQ_CREDIT_BUREAU_HOUR",
    "AMT_REQ_CREDIT_BUREAU_DAY",
    "AMT_REQ_CREDIT_BUREAU_WEEK",
    "AMT_REQ_CREDIT_BUREAU_MON",
    "AMT_REQ_CREDIT_BUREAU_QRT",
    "AMT_REQ_CREDIT_BUREAU_YEAR"
]

print("=== Verification: Missing Values After Imputation ===")

# One aggregation returns the null count of every imputed column, replacing
# one filter().count() Spark job per column. when() without otherwise() gives
# NULL for non-null inputs and count() skips NULLs, so each aggregate is the
# number of rows where that column is still null.
remaining_nulls = df_staging.select(
    [count(when(col(column).isNull(), lit(1))).alias(column) for column in all_imputed_columns]
).first()

any_missing = False
for column, missing_count in zip(all_imputed_columns, remaining_nulls):
    if missing_count > 0:
        print(f"  ‚ö†Ô∏è  {column}: {missing_count} missing values")
        any_missing = True
    else:
        print(f"  ‚úÖ {column}: 0 missing values")

if not any_missing:
    print("\n‚úÖ All imputed columns verified - no missing values!\n")
else:
    print("\n‚ö†Ô∏è  Warning: Some columns still have missing values\n")

# COMMAND ----------
# MAGIC %md
# MAGIC ## Step 6: Final Summary

# COMMAND ----------

# Gather the figures first, then print the report
original_rows = df.count()
original_columns = len(df.columns)
cleaned_rows = df_staging.count()
cleaned_columns = len(df_staging.columns)
columns_removed = original_columns - cleaned_columns

print("=== FINAL DATA CLEANING SUMMARY ===\n")
print(f"Original dataset:")
print(f"  Rows: {original_rows:,}")
print(f"  Columns: {original_columns}")

print(f"\nCleaned dataset (df_staging):")
print(f"  Rows: {cleaned_rows:,}")
print(f"  Columns: {cleaned_columns}")

print(f"\nChanges made:")
print(f"  ‚úÖ Dropped: {len(columns_to_drop)} columns (high missing %)")
print(f"  ‚úÖ Imputed: {len(all_imputed_columns)} columns")
print(f"     - Low missing %: 10 columns")
print(f"     - Medium missing %: 8 columns")

print(f"\nColumn reduction: {original_columns} ‚Üí {cleaned_columns} ({columns_removed} columns removed)")

# COMMAND ----------
# MAGIC %md
# MAGIC ## Step 7: Check Remaining Missing Values

# COMMAND ----------

# Check if there are any other columns with missing values
print("=== Checking for Any Remaining Missing Values ===\n")

# The row count is the same for every column: compute it once instead of once
# per column that still has nulls.
staging_row_total = df_staging.count()

# Null count of every column from a single aggregation (one Spark job) rather
# than one filter().count() job per column. count() skips the NULLs that
# when() produces for non-null inputs.
staging_null_counts = df_staging.select(
    [count(when(col(column).isNull(), lit(1))).alias(column) for column in df_staging.columns]
).first()

# The `if` filter runs before the percentage expression, so an empty table
# never reaches the division.
missing_data = [
    (column, null_count, (null_count / staging_row_total) * 100)
    for column, null_count in zip(df_staging.columns, staging_null_counts)
    if null_count > 0
]

if missing_data:
    missing_df = spark.createDataFrame(missing_data, ["Column", "Missing_Count", "Missing_Percentage"])
    missing_df = missing_df.orderBy(col("Missing_Percentage").desc())
    
    print(f"‚ö†Ô∏è  Found {len(missing_data)} columns still with missing values:")
    display(missing_df)
else:
    print("‚úÖ No remaining missing values in the dataset!")

# COMMAND ----------
# MAGIC %md
# MAGIC ## Step 8: Save Staging Dataset

# COMMAND ----------

# Persist the cleaned DataFrame as the staging table, replacing any existing contents
staging_writer = df_staging.write.mode("overwrite")
staging_writer.saveAsTable("credit_risk.application_train_staging")

print("‚úÖ Cleaned dataset saved as 'credit_risk.application_train_staging'")
print("\nüéâ Data cleaning pipeline complete!")

# COMMAND ----------

# Preview 20 rows of the cleaned data
print("\n=== Sample of Cleaned Data ===")
cleaned_sample = df_staging.limit(20)
display(cleaned_sample)

# COMMAND ----------

In [0]:
# COMMAND ----------
# MAGIC %md
# MAGIC ## Identify Categorical Columns and Their Values

# COMMAND ----------

# COMMAND ----------

# Read the staging table back as a DataFrame
df_staging = spark.table("credit_risk.application_train_staging")

loaded_rows = df_staging.count()
loaded_columns = len(df_staging.columns)
print(f"‚úÖ Data loaded: {loaded_rows:,} rows √ó {loaded_columns} columns")

# COMMAND ----------

from pyspark.sql.functions import col, count, countDistinct

print("=== Identifying Categorical Columns ===\n")

# COMMAND ----------
# MAGIC %md
# MAGIC ### Step 1: Separate Columns by Data Type

# COMMAND ----------

# Spark type names that decide which list a column lands in; a column whose
# type name is in neither set (any other Spark type) is left out of both lists
string_type_names = {'string'}
numeric_type_names = {'integer', 'long', 'float', 'double'}

# (column name, Spark type name) pairs in schema order
field_type_names = [
    (field.name, field.dataType.typeName())
    for field in df_staging.schema.fields
]

categorical_cols = [
    name for name, type_name in field_type_names if type_name in string_type_names
]
numerical_cols = [
    name for name, type_name in field_type_names if type_name in numeric_type_names
]

print(f"Total columns: {len(df_staging.columns)}")
print(f"Numerical columns: {len(numerical_cols)}")
print(f"Categorical columns: {len(categorical_cols)}")
print()

# COMMAND ----------
# MAGIC %md
# MAGIC ### Step 2: Show All Categorical Columns

# COMMAND ----------

# Numbered listing (starting at 1) of the categorical column names
print("=== CATEGORICAL COLUMNS ===")
for position, categorical_name in enumerate(categorical_cols, start=1):
    print(f"{position}. {categorical_name}")
print()

# COMMAND ----------
# MAGIC %md
# MAGIC ### Step 3: Analyze Each Categorical Column

# COMMAND ----------

print("=== DETAILED ANALYSIS OF EACH CATEGORICAL COLUMN ===\n")

categorical_summary = []

# Total row count does not depend on the column being analysed, so it is
# computed once here rather than as a full-table count on every iteration.
total_count = df_staging.count()

for col_name in categorical_cols:
    # Get unique value count
    unique_count = df_staging.select(col_name).distinct().count()
    
    # Store for summary: (column name, distinct values, total rows)
    categorical_summary.append((col_name, unique_count, total_count))
    
    print(f"{'='*80}")
    print(f"Column: {col_name}")
    print(f"Unique Values: {unique_count}")
    print(f"{'='*80}")
    
    # Get value counts, most frequent value first
    value_counts = df_staging.groupBy(col_name).count().orderBy(col("count").desc())
    
    # Show all values with their counts
    print(f"\nValue distribution:")
    display(value_counts)
    
    print("\n")

# COMMAND ----------
# MAGIC %md
# MAGIC ### Step 4: Summary Table - Cardinality Overview

# COMMAND ----------

# Create summary DataFrame. The schema is given explicitly because
# createDataFrame cannot infer column types from an empty list, which is what
# categorical_summary is when the table has no string columns.
summary_df = spark.createDataFrame(
    categorical_summary,
    "Column_Name string, Unique_Values long, Total_Rows long"
)

# Add cardinality percentage: distinct values as a share of all rows
summary_df = summary_df.withColumn(
    "Cardinality_Percent",
    (col("Unique_Values") / col("Total_Rows") * 100)
)

# Sort by number of unique values (highest first)
summary_df = summary_df.orderBy(col("Unique_Values").desc())

print("=== CATEGORICAL COLUMNS SUMMARY (Sorted by Cardinality) ===")
print("Cardinality = Number of unique values\n")
display(summary_df)

# COMMAND ----------
# MAGIC %md
# MAGIC ### Step 5: Categorize by Cardinality Level

# COMMAND ----------

print("=== COLUMNS GROUPED BY CARDINALITY ===\n")

# Categorize columns (thresholds as implemented below)
low_cardinality = []      # up to 5 unique values
medium_cardinality = []   # 6-20 unique values
high_cardinality = []     # more than 20 unique values

for col_name, unique_count, _ in categorical_summary:
    if unique_count <= 5:
        low_cardinality.append((col_name, unique_count))
    elif unique_count <= 20:
        medium_cardinality.append((col_name, unique_count))
    else:
        high_cardinality.append((col_name, unique_count))

# The loop variable is named n_unique rather than `count` so that it does not
# overwrite the `count` function imported from pyspark.sql.functions.
print(f"üü¢ LOW CARDINALITY (2-5 unique values): {len(low_cardinality)} columns")
print("   ‚Üí Easy to one-hot encode")
for col_name, n_unique in sorted(low_cardinality, key=lambda x: x[1]):
    print(f"     ‚Ä¢ {col_name}: {n_unique} values")

print(f"\nüü° MEDIUM CARDINALITY (6-20 unique values): {len(medium_cardinality)} columns")
print("   ‚Üí Good for one-hot encoding")
for col_name, n_unique in sorted(medium_cardinality, key=lambda x: x[1]):
    print(f"     ‚Ä¢ {col_name}: {n_unique} values")

# High-cardinality columns are listed from most to fewest unique values
print(f"\nüî¥ HIGH CARDINALITY (20+ unique values): {len(high_cardinality)} columns")
print("   ‚Üí Consider grouping rare categories or alternative encoding")
for col_name, n_unique in sorted(high_cardinality, key=lambda x: x[1], reverse=True):
    print(f"     ‚Ä¢ {col_name}: {n_unique} values")

# COMMAND ----------
# MAGIC %md
# MAGIC ### Step 6: Check for Rare Categories (Optional)

# COMMAND ----------

print("\n=== CHECKING FOR RARE CATEGORIES ===")
print("(Categories that appear in <1% of rows)\n")

threshold = 0.01  # 1% threshold
total_rows = df_staging.count()

# The threshold expressed in percent; constant across all columns and values
rare_cutoff_pct = threshold * 100

for col_name in categorical_cols:
    # One row per distinct value of the column, with its frequency
    value_counts = df_staging.groupBy(col_name).count().collect()
    
    rare_categories = []
    for row in value_counts:
        value = row[col_name]
        # Named value_count rather than `count` so the `count` function
        # imported from pyspark.sql.functions is not overwritten.
        value_count = row['count']
        percentage = (value_count / total_rows) * 100
        
        if percentage < rare_cutoff_pct:
            rare_categories.append((value, value_count, percentage))
    
    # Report only columns that have at least one rare category, rarest first
    if rare_categories:
        print(f"\n{col_name}: {len(rare_categories)} rare categories")
        for value, value_count, pct in sorted(rare_categories, key=lambda x: x[1]):
            print(f"  ‚Ä¢ '{value}': {value_count} ({pct:.2f}%)")

# COMMAND ----------
# MAGIC %md
# MAGIC ### Step 7: Export Categorical Column List

# COMMAND ----------

# Save list of categorical columns for later use (a copy, independent of categorical_cols)
categorical_columns_list = categorical_cols.copy()

# Print the list as a ready-to-paste Python literal.
# The loop variable must not be called `col`: a module-level loop would rebind
# the `col` function imported from pyspark.sql.functions to a plain string and
# break any later col(...) call made before the function is re-imported.
print("=== CATEGORICAL COLUMNS LIST (for encoding) ===")
print(f"Total: {len(categorical_columns_list)} columns\n")
print("categorical_columns_list = [")
for categorical_column in categorical_columns_list:
    print(f"    '{categorical_column}',")
print("]")

# COMMAND ----------

In [0]:
# COMMAND ----------
# MAGIC %md
# MAGIC ## Clean Up Rare and Suspicious Categorical Values

# COMMAND ----------

from pyspark.sql.functions import col, when

# Read the staging table back as the starting point for the categorical cleanup
df_staging = spark.table("credit_risk.application_train_staging")

print("=== Starting Categorical Cleanup ===\n")

# Baseline row count; later compared with the post-cleanup count
initial_count = df_staging.count()
print(f"Initial row count: {initial_count:,}")

# COMMAND ----------
# MAGIC %md
# MAGIC ### Step 1: Replace CODE_GENDER "XNA" with "F"

# COMMAND ----------

gender = col("CODE_GENDER")
is_xna = gender == "XNA"

# Number of rows carrying the "XNA" placeholder before the replacement
xna_count = df_staging.where(is_xna).count()
print(f"CODE_GENDER 'XNA' values found: {xna_count}")

# Rewrite "XNA" to "F"; every other value is kept as it is
df_staging = df_staging.withColumn("CODE_GENDER", when(is_xna, "F").otherwise(gender))

# Show the resulting distribution, most frequent value first
print("\nCODE_GENDER distribution after cleanup:")
gender_distribution = df_staging.groupBy("CODE_GENDER").count()
gender_distribution.orderBy("count", ascending=False).show()

print("‚úÖ CODE_GENDER: XNA replaced with F\n")

# COMMAND ----------
# MAGIC %md
# MAGIC ### Step 2: Combine Other_A and Other_B into "Other"

# COMMAND ----------

suite = col("NAME_TYPE_SUITE")

# Counts of the "Other*" variants before they are merged
print("NAME_TYPE_SUITE before combining:")
other_variants = df_staging.where(suite.isin(["Other_A", "Other_B", "Other"]))
other_variants.groupBy("NAME_TYPE_SUITE").count().show()

other_a_count = df_staging.where(suite == "Other_A").count()
other_b_count = df_staging.where(suite == "Other_B").count()
print(f"Other_A count: {other_a_count}")
print(f"Other_B count: {other_b_count}")
print(f"Combined will be: {other_a_count + other_b_count}\n")

# Fold Other_A and Other_B into the single category "Other"
df_staging = df_staging.withColumn(
    "NAME_TYPE_SUITE",
    when(suite.isin(["Other_A", "Other_B"]), "Other").otherwise(suite)
)

# Show the resulting distribution, most frequent value first
print("NAME_TYPE_SUITE distribution after combining:")
suite_distribution = df_staging.groupBy("NAME_TYPE_SUITE").count()
suite_distribution.orderBy("count", ascending=False).show()

print("‚úÖ NAME_TYPE_SUITE: Other_A and Other_B combined into 'Other'\n")

# COMMAND ----------
# MAGIC %md
# MAGIC ### Step 3: Drop Rows with Unknown Family Status

# COMMAND ----------

family_status = col("NAME_FAMILY_STATUS")

# Check how many Unknown values exist
unknown_count = df_staging.filter(family_status == "Unknown").count()
print(f"NAME_FAMILY_STATUS 'Unknown' values found: {unknown_count}")

# Drop rows where NAME_FAMILY_STATUS is "Unknown".
# In Spark SQL `NULL != 'Unknown'` evaluates to NULL and filter() discards
# rows whose predicate is NULL, so a bare inequality would also remove every
# row with a missing status without counting it above. The explicit isNull()
# branch keeps those rows so that only the literal "Unknown" rows are dropped.
df_staging = df_staging.filter(family_status.isNull() | (family_status != "Unknown"))

# Check new row count
final_count = df_staging.count()
rows_dropped = initial_count - final_count

print(f"\nRows dropped: {rows_dropped}")
print(f"Final row count: {final_count:,}")

# Verify the change
print("\nNAME_FAMILY_STATUS distribution after dropping Unknown:")
df_staging.groupBy("NAME_FAMILY_STATUS").count().orderBy(col("count").desc()).show()

print("‚úÖ NAME_FAMILY_STATUS: Unknown rows dropped\n")

# COMMAND ----------
# MAGIC %md
# MAGIC ### Step 4: Verification Summary

# COMMAND ----------

# Share of rows removed; guarded so an empty staging table reports 0% instead
# of raising ZeroDivisionError.
rows_dropped_pct = (rows_dropped / initial_count) * 100 if initial_count else 0.0

print("=== CLEANUP SUMMARY ===\n")
print(f"Initial rows: {initial_count:,}")
print(f"Final rows: {final_count:,}")
print(f"Rows removed: {rows_dropped} ({rows_dropped_pct:.4f}%)")
print()

# The XNA figure comes from xna_count, measured before the replacement,
# instead of a hard-coded number that would be wrong for any other data.
print("Changes made:")
print(f"  ‚úÖ CODE_GENDER: XNA ({xna_count} values) ‚Üí F")
print("  ‚úÖ NAME_TYPE_SUITE: Other_A + Other_B ‚Üí Other")
print(f"  ‚úÖ NAME_FAMILY_STATUS: Dropped {unknown_count} rows with 'Unknown'")
print()

# Quick check on the three columns: number of distinct values left in each
print("Verification - Unique values in cleaned columns:")
print(f"  CODE_GENDER: {df_staging.select('CODE_GENDER').distinct().count()} unique values")
print(f"  NAME_TYPE_SUITE: {df_staging.select('NAME_TYPE_SUITE').distinct().count()} unique values")
print(f"  NAME_FAMILY_STATUS: {df_staging.select('NAME_FAMILY_STATUS').distinct().count()} unique values")

# COMMAND ----------
# MAGIC %md
# MAGIC ### Step 5: Save Cleaned Dataset

# COMMAND ----------

# Write the cleaned data back over the staging table.
# NOTE(review): df_staging was read from this same table at the start of this
# cell; overwriting a table that is also the query's source is presumably only
# supported by some table formats (e.g. Delta) - confirm the table format.
cleanup_writer = df_staging.write.mode("overwrite")
cleanup_writer.saveAsTable("credit_risk.application_train_staging")

print("‚úÖ Cleaned dataset saved back to 'credit_risk.application_train_staging'")
print("\nüéâ Categorical cleanup complete!")

# COMMAND ----------

# Preview 20 rows of the three cleaned columns
print("\n=== Sample of Cleaned Data ===")
cleaned_columns_sample = df_staging.select("CODE_GENDER", "NAME_TYPE_SUITE", "NAME_FAMILY_STATUS").limit(20)
display(cleaned_columns_sample)

# COMMAND ----------

In [0]:
# COMMAND ----------
# MAGIC %md
# MAGIC ## One-Hot Encoding - Simplified Approach (Using Pandas)

# COMMAND ----------

from pyspark.sql.functions import col
import pandas as pd

print("=== Starting One-Hot Encoding (Pandas Approach) ===\n")

# COMMAND ----------
# MAGIC %md
# MAGIC ### Step 1: Identify Categorical Columns

# COMMAND ----------

# Every string-typed column of df_staging is treated as categorical
categorical_cols = [
    field.name
    for field in df_staging.schema.fields
    if field.dataType.typeName() == 'string'
]

# Numbered listing with the number of distinct values per column
print(f"Found {len(categorical_cols)} categorical columns:\n")
for position, categorical_name in enumerate(categorical_cols, start=1):
    distinct_values = df_staging.select(categorical_name).distinct().count()
    print(f"  {position}. {categorical_name} ({distinct_values} unique values)")

print()

# COMMAND ----------
# MAGIC %md
# MAGIC ### Step 2: Convert to Pandas and One-Hot Encode

# COMMAND ----------

print("Converting Spark DataFrame to Pandas...")
print("(This may take 2-3 minutes for ~300K rows)")

# Collect the whole Spark DataFrame into a pandas DataFrame
df_pandas = df_staging.toPandas()

pandas_rows, pandas_columns = df_pandas.shape
print(f"‚úÖ Converted to Pandas: {pandas_rows:,} rows √ó {pandas_columns} columns\n")

# COMMAND ----------

print("Applying one-hot encoding...")

# Encode every categorical column in a single get_dummies call
encoding_options = {
    "columns": categorical_cols,
    "drop_first": True,   # drop the first level of each column (avoids the dummy variable trap)
    "dtype": float,       # indicator columns are stored as floats
}
df_encoded = pd.get_dummies(df_pandas, **encoding_options)

encoded_rows, encoded_columns = df_encoded.shape
print(f"‚úÖ One-hot encoding complete!")
print(f"Final shape: {encoded_rows:,} rows √ó {encoded_columns} columns")
print(f"Added {encoded_columns - df_pandas.shape[1]} new columns")

# COMMAND ----------
# MAGIC %md
# MAGIC ### Step 2.5: Clean Column Names for Delta Table

# COMMAND ----------

print("Cleaning column names for Delta table compatibility...")

# Per-character rewrite table: space, '/' and ':' become '_';
# ',', ';', '{', '}', '(', ')', newline, tab and '=' are removed.
_COLUMN_NAME_TRANSLATION = str.maketrans(
    {
        ' ': '_',
        '/': '_',
        ':': '_',
        ',': None,
        ';': None,
        '{': None,
        '}': None,
        '(': None,
        ')': None,
        '\n': None,
        '\t': None,
        '=': None,
    }
)


def clean_column_name(col_name):
    """Return ``col_name`` with problematic characters rewritten.

    Spaces, ``/`` and ``:`` are turned into underscores; ``, ; { } ( ) =``,
    newlines and tabs are deleted. All other characters are kept unchanged.

    Args:
        col_name: The original column name (a string).

    Returns:
        The cleaned column name.
    """
    return col_name.translate(_COLUMN_NAME_TRANSLATION)

# Rename all columns
cleaned_names = [clean_column_name(original_name) for original_name in df_encoded.columns]

# Removing characters can collapse two different names into the same string;
# stop here with the offending names rather than carrying ambiguous column
# names into the Spark conversion and table write.
seen_names = set()
duplicate_names = set()
for cleaned_name in cleaned_names:
    if cleaned_name in seen_names:
        duplicate_names.add(cleaned_name)
    seen_names.add(cleaned_name)
if duplicate_names:
    raise ValueError(f"Column name cleaning produced duplicate names: {sorted(duplicate_names)}")

df_encoded.columns = cleaned_names

# The loop variable must not be called `col`: a module-level loop would rebind
# the `col` function imported from pyspark.sql.functions to a plain string.
print(f"‚úÖ Column names cleaned!")
print(f"\nSample of cleaned column names:")
for column_name in df_encoded.columns[:20]:
    print(f"  ‚Ä¢ {column_name}")

# COMMAND ----------

# COMMAND ----------
# MAGIC %md
# MAGIC ### Step 3: Verification

# COMMAND ----------

print("\n=== ENCODING VERIFICATION ===\n")

# How many columns there are of each dtype
print("Data types in encoded dataset:")
dtype_breakdown = df_encoded.dtypes.value_counts()
print(dtype_breakdown)
print()

# Missing values: per-column totals first, then the grand total
missing_cols = df_encoded.isnull().sum()
missing_count = missing_cols.sum()
print(f"Total missing values: {missing_count}")

if missing_count > 0:
    print("\nColumns with missing values:")
    print(missing_cols[missing_cols > 0])
else:
    print("‚úÖ No missing values!")

print()

# Preview the top-left corner of the encoded frame
print("Sample of encoded data (first 5 rows, first 10 columns):")
encoded_preview = df_encoded.iloc[:5, :10]
display(encoded_preview)

# COMMAND ----------
# MAGIC %md
# MAGIC ### Step 4: Convert Back to Spark and Save

# COMMAND ----------

print("Converting back to Spark DataFrame...")

# Build a Spark DataFrame from the encoded pandas frame
df_final_spark = spark.createDataFrame(df_encoded)

print(f"‚úÖ Converted back to Spark")

# Persist as the encoded table, replacing any existing contents
print("Saving to table...")
encoded_writer = df_final_spark.write.mode("overwrite")
encoded_writer.saveAsTable("credit_risk.application_train_encoded")

print("‚úÖ Encoded dataset saved as 'credit_risk.application_train_encoded'")

# COMMAND ----------
# MAGIC %md
# MAGIC ### Step 5: Summary

# COMMAND ----------

# Column bookkeeping: each encoded categorical column disappears and is
# replaced by its indicator columns, so
#   indicators created = net change + number of categorical columns
original_column_total = len(df_staging.columns)
final_column_total = df_encoded.shape[1]
net_column_change = final_column_total - original_column_total
categorical_total = len(categorical_cols)

print("\n=== FINAL SUMMARY ===\n")
print(f"Original columns: {original_column_total}")
print(f"Final columns: {final_column_total}")
print(f"Net change: +{net_column_change} columns")
print()
print(f"Original categorical columns: {categorical_total}")
print(f"New binary columns created: {net_column_change + categorical_total}")
print()
print("‚úÖ Dataset is now fully numerical and ready for:")
print("  ‚Ä¢ Feature scaling")
print("  ‚Ä¢ Correlation analysis")
print("  ‚Ä¢ Logistic regression modeling")
print()
print("üéâ One-Hot Encoding Complete!")

# COMMAND ----------