In [3]:
import pandas as pd
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
import os

spark = SparkSession.builder \
    .appName("SparkSQLExample") \
    .getOrCreate()

folder_path = "data"
for f in os.listdir(folder_path):
    if f.endswith('.csv'):
        df = spark.read.csv(f"data/{f}", header=True, inferSchema=True)
        print(f)
        df.show()
        print(df.columns)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/20 08:01:29 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


features_attributes.csv
+-----------+-----------------+---+-----------+-------------+-------------+
|Customer_ID|             Name|Age|        SSN|   Occupation|snapshot_date|
+-----------+-----------------+---+-----------+-------------+-------------+
| CUS_0x1000|   Alistair Barrf| 18|913-74-1218|       Lawyer|   2023-05-01|
| CUS_0x1009|           Arunah| 26|063-67-6938|     Mechanic|   2025-01-01|
| CUS_0x100b|         Shirboni| 19|  #F%$D@*&8|Media_Manager|   2024-03-01|
| CUS_0x1011|        Schneyerh| 44|793-05-8223|       Doctor|   2023-11-01|
| CUS_0x1013|         Cameront| 44|930-49-9615|     Mechanic|   2023-12-01|
| CUS_0x1015|          Holtono| 27|810-97-7024|   Journalist|   2023-08-01|
| CUS_0x1018|      Felsenthalq| 15|731-19-8119|   Accountant|   2023-11-01|
| CUS_0x1026|          Josephv| 52|500-62-9044|      Manager|   2023-10-01|
| CUS_0x102d| Neil Chatterjeex| 31|692-71-7552| Entrepreneur|   2024-01-01|
| CUS_0x102e|            Rhysn| 26|  #F%$D@*&8|    Scientist|   

In [68]:
attributes = spark.read.csv("data/features_attributes.csv", header=True, inferSchema=True)
attributes.createOrReplaceTempView('attributes')

print(attributes.columns)

query = spark.sql(
    """
    SELECT * FROM attributes
    WHERE Customer_ID = "CUS_0x1000"
    ORDER BY Customer_ID, snapshot_date
    """
)

query.show()

['Customer_ID', 'Name', 'Age', 'SSN', 'Occupation', 'snapshot_date']
+-----------+--------------+---+-----------+----------+-------------+
|Customer_ID|          Name|Age|        SSN|Occupation|snapshot_date|
+-----------+--------------+---+-----------+----------+-------------+
| CUS_0x1000|Alistair Barrf| 18|913-74-1218|    Lawyer|   2023-05-01|
+-----------+--------------+---+-----------+----------+-------------+



In [7]:
clickstream = spark.read.csv("data/feature_clickstream.csv", header=True, inferSchema=True)
clickstream.createOrReplaceTempView('clickstream')


print(clickstream.columns)

query = spark.sql(
    """
    SELECT
        Customer_ID,
        snapshot_date,
        COUNT(snapshot_date) as distinct_entries
    FROM clickstream
    WHERE Customer_ID = "CUS_0x1000"
    GROUP BY Customer_ID, snapshot_date
    ORDER BY Customer_ID, snapshot_date
    """
)

query.show()

['fe_1', 'fe_2', 'fe_3', 'fe_4', 'fe_5', 'fe_6', 'fe_7', 'fe_8', 'fe_9', 'fe_10', 'fe_11', 'fe_12', 'fe_13', 'fe_14', 'fe_15', 'fe_16', 'fe_17', 'fe_18', 'fe_19', 'fe_20', 'Customer_ID', 'snapshot_date']
+-----------+-------------+----------------+
|Customer_ID|snapshot_date|distinct_entries|
+-----------+-------------+----------------+
| CUS_0x1000|   2023-01-01|               1|
| CUS_0x1000|   2023-02-01|               1|
| CUS_0x1000|   2023-03-01|               1|
| CUS_0x1000|   2023-04-01|               1|
| CUS_0x1000|   2023-05-01|               1|
| CUS_0x1000|   2023-06-01|               1|
| CUS_0x1000|   2023-07-01|               1|
| CUS_0x1000|   2023-08-01|               1|
| CUS_0x1000|   2023-09-01|               1|
| CUS_0x1000|   2023-10-01|               1|
| CUS_0x1000|   2023-11-01|               1|
| CUS_0x1000|   2023-12-01|               1|
| CUS_0x1000|   2024-01-01|               1|
| CUS_0x1000|   2024-02-01|               1|
| CUS_0x1000|   2024-03-01|    

In [67]:
financials = spark.read.csv("data/features_financials.csv", header=True, inferSchema=True)
financials.createOrReplaceTempView('financials')

# print(financials.columns)

query = spark.sql(
    """
    SELECT *
    FROM financials
    LIMIT 1
    """
)

query.show()

+-----------+-------------+---------------------+-----------------+---------------+-------------+-----------+--------------------+-------------------+----------------------+--------------------+--------------------+----------+----------------+------------------------+--------------------+---------------------+-------------------+-----------------------+--------------------+------------------+-------------+
|Customer_ID|Annual_Income|Monthly_Inhand_Salary|Num_Bank_Accounts|Num_Credit_Card|Interest_Rate|Num_of_Loan|        Type_of_Loan|Delay_from_due_date|Num_of_Delayed_Payment|Changed_Credit_Limit|Num_Credit_Inquiries|Credit_Mix|Outstanding_Debt|Credit_Utilization_Ratio|  Credit_History_Age|Payment_of_Min_Amount|Total_EMI_per_month|Amount_invested_monthly|   Payment_Behaviour|   Monthly_Balance|snapshot_date|
+-----------+-------------+---------------------+-----------------+---------------+-------------+-----------+--------------------+-------------------+----------------------+-------

In [55]:
import os

folder_path = "data"
csv_files = [f[:-4] for f in os.listdir(folder_path) if f.endswith('.csv')]

csv_files

['features_attributes',
 'features_financials',
 'feature_clickstream',
 'lms_loan_daily']

In [58]:
bronze_directory = "datamart/bronze/"

file_name = "features_attributes/features_attributes.csv"
filepath = bronze_directory + file_name
df = spark.read.csv(filepath, header=True, inferSchema=True)
df.show()

import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType

# SSN is assumed to have xxx-xx-xxxx format
ssn_pattern = r"^\d{3}-\d{2}-\d{4}$"
df_clean = df.withColumn(
    "SSN",
    F.when(F.col("SSN").rlike(ssn_pattern), F.col("SSN")).otherwise(None)
)

# Create ssn_valid boolean
df_clean = df_clean.withColumn("ssn_valid",F.when(F.col("SSN").isNull(), 0).otherwise(1))

# Blank Occupations are cleaned
invalid_occ = ["_______", "", None]
df_clean = df_clean.withColumn("Occupation_clean", F.trim(F.col("Occupation"))) \
    .withColumn(
        "Occupation_clean",
        F.when(
            F.col("Occupation_clean")
            .isin(invalid_occ), 
            None)
        .otherwise(F.col("Occupation_clean"))
    )

# Remove white spaces on Name column
df_clean = df_clean.withColumn("Name", F.trim(F.col("Name")))

# Clean Age - assumed reasonable age is 15 to 100 years old
df_clean = df_clean.withColumn("Age_int", F.col("Age").cast(IntegerType()))
df_clean = df_clean.filter((F.col("Age_int") >= 15) & (F.col("Age_int") <= 100))

df_clean = df.withColumn("ssn_valid", F.when(F.col("SSN").rlike(r"^\d{3}-\d{2}-\d{4}$"), 1).otherwise(0)) \
             .withColumn("occupation_known", F.when(F.col("Occupation").isin("_______", "", None), 0).otherwise(1)) \
             .withColumn("age_valid", F.when((F.col("Age").cast("int").isNotNull()) & (F.col("Age") >= 15), 1).otherwise(0))

name_counts = df_clean.groupBy("Name").agg(F.count("*").alias("name_shared_count"))
df_with_name_count = df_clean.join(name_counts, on="Name", how="left")
df_with_name_count = df_with_name_count.withColumn("is_name_shared", F.when(F.col("name_shared_count") > 1, 1).otherwise(0))
df_with_name_count = df_with_name_count.drop("Name", "SSN",)


+-----------+-----------------+---+-----------+-------------+-------------+
|Customer_ID|             Name|Age|        SSN|   Occupation|snapshot_date|
+-----------+-----------------+---+-----------+-------------+-------------+
| CUS_0x1000|   Alistair Barrf| 18|913-74-1218|       Lawyer|   2023-05-01|
| CUS_0x1009|           Arunah| 26|063-67-6938|     Mechanic|   2025-01-01|
| CUS_0x100b|         Shirboni| 19|  #F%$D@*&8|Media_Manager|   2024-03-01|
| CUS_0x1011|        Schneyerh| 44|793-05-8223|       Doctor|   2023-11-01|
| CUS_0x1013|         Cameront| 44|930-49-9615|     Mechanic|   2023-12-01|
| CUS_0x1015|          Holtono| 27|810-97-7024|   Journalist|   2023-08-01|
| CUS_0x1018|      Felsenthalq| 15|731-19-8119|   Accountant|   2023-11-01|
| CUS_0x1026|          Josephv| 52|500-62-9044|      Manager|   2023-10-01|
| CUS_0x102d| Neil Chatterjeex| 31|692-71-7552| Entrepreneur|   2024-01-01|
| CUS_0x102e|            Rhysn| 26|  #F%$D@*&8|    Scientist|   2024-04-01|
| CUS_0x1032

In [31]:
df_clean.groupBy("SSN").count().orderBy("count", ascending=False).show()

+-----------+-----+
|        SSN|count|
+-----------+-----+
|  #F%$D@*&8|  644|
|706-59-9144|    1|
|508-88-3060|    1|
|564-87-3414|    1|
|118-02-3131|    1|
|808-03-9422|    1|
|595-90-1107|    1|
|571-04-7703|    1|
|154-09-5858|    1|
|942-65-9443|    1|
|218-40-1399|    1|
|296-46-9959|    1|
|579-02-1738|    1|
|738-19-2481|    1|
|136-53-7387|    1|
|368-07-2548|    1|
|916-37-8123|    1|
|945-83-2116|    1|
|347-65-8366|    1|
|214-44-4143|    1|
+-----------+-----+
only showing top 20 rows



In [45]:
df.where(F.col("Name") == "Wahbap").show()

+-----------+------+---+-----------+----------+-------------+
|Customer_ID|  Name|Age|        SSN|Occupation|snapshot_date|
+-----------+------+---+-----------+----------+-------------+
| CUS_0x1032|Wahbap|40_|620-58-8045|    Lawyer|   2023-08-01|
| CUS_0x508c|Wahbap| 49|212-99-0909|  Engineer|   2024-07-01|
| CUS_0x6f81|Wahbap| 17|075-05-3919|    Doctor|   2025-01-01|
+-----------+------+---+-----------+----------+-------------+



In [53]:
df_clean.groupBy("Name").count().orderBy("count", ascending=False).show(20)

NameError: name 'df_clean' is not defined

In [5]:
df = spark.read.parquet("datamart/silver/silver_attributes_cleaned.parquet")
df.show()

+-----------+----+-------------+-------------+---------+----------------+---------+-----------------+--------------+
|Customer_ID| Age|   Occupation|snapshot_date|ssn_valid|occupation_known|age_valid|name_shared_count|is_name_shared|
+-----------+----+-------------+-------------+---------+----------------+---------+-----------------+--------------+
| CUS_0x1000|  18|       Lawyer|   2023-05-01|        1|               1|        1|                2|             1|
| CUS_0x1009|  26|     Mechanic|   2025-01-01|        1|               1|        1|                2|             1|
| CUS_0x100b|  19|Media_Manager|   2024-03-01|        0|               1|        1|                1|             0|
| CUS_0x1011|  44|       Doctor|   2023-11-01|        1|               1|        1|                2|             1|
| CUS_0x1013|  44|     Mechanic|   2023-12-01|        1|               1|        1|                1|             0|
| CUS_0x1015|  27|   Journalist|   2023-08-01|        1|        

In [15]:
df = spark.read.csv(f"datamart/bronze/feature_clickstream/feature_clickstream_2023_02_01.csv", header=True, inferSchema=True)

df.show()

+----+----+----+----+----+----+----+----+----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----------+-------------+
|fe_1|fe_2|fe_3|fe_4|fe_5|fe_6|fe_7|fe_8|fe_9|fe_10|fe_11|fe_12|fe_13|fe_14|fe_15|fe_16|fe_17|fe_18|fe_19|fe_20|Customer_ID|snapshot_date|
+----+----+----+----+----+----+----+----+----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----------+-------------+
|  55|  96|  74| 171| -69|-103|  86| 178|  42|  113|  100|  183|   39|  179|  161|   71|  208|   -7|  129|  206| CUS_0x1037|   2023-02-01|
| 100|-115|  50| 246| 140| 160| 122| 112|  14|  221|  -82|  284|   57|  146|  120|   67|   -4|  139|  -26|  -51| CUS_0x1069|   2023-02-01|
| 142| 106| 143| 155| 153| 121|  64|  17|  -2|  208|   44|  -17|  118|  -55|  -47|   93|   50|  173|   50|  274| CUS_0x114a|   2023-02-01|
| 130| 171| 155| 101|  41| 127|   0| 150|  65|   28|  131|  123|  157|   56|  267|  127|  190|    8|   20|  214| CUS_0x1184|   2023-02-01|
|  60| -18| 113| 166| 112| 

In [35]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType
import pyspark.sql.functions as F

# Start Spark session
spark = SparkSession.builder.appName("ClickstreamCleaning").getOrCreate()

# 1. Define schema
schema = StructType(
    [StructField(f"fe_{i}", IntegerType(), True) for i in range(1, 21)] +
    [StructField("Customer_ID", StringType(), False),
     StructField("snapshot_date", DateType(), False)]
)

# 2. Read raw CSV file with schema
df_raw = spark.read.csv(
    "datamart/bronze/feature_clickstream/feature_clickstream_2023_01_01.csv",
    header=True
)

# 3. Recast and sanitize feature columns
for i in range(1, 21):
    col_name = f"fe_{i}"
    df_raw = df_raw.withColumn(
        col_name,
        when(F.col(col_name).cast("int").isNotNull(), F.col(col_name).cast("int")).otherwise(None)
    )
    
# 4. Log null counts per feature column (useful for QA)
null_counts = df_raw.select([
    count(when(F.col(f"fe_{i}").isNull(), 1)).alias(f"fe_{i}_nulls") for i in range(1, 21)
])
null_counts.show()

df.clean = spark.createDataFrame(df_raw.rdd, schema = schema)
# 6. Final inspection
print('df_raw')
df_raw.printSchema()
print('df_clean')
df_clean.printSchema()
df_raw.show()


+----------+----------+----------+----------+----------+----------+----------+----------+----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+
|fe_1_nulls|fe_2_nulls|fe_3_nulls|fe_4_nulls|fe_5_nulls|fe_6_nulls|fe_7_nulls|fe_8_nulls|fe_9_nulls|fe_10_nulls|fe_11_nulls|fe_12_nulls|fe_13_nulls|fe_14_nulls|fe_15_nulls|fe_16_nulls|fe_17_nulls|fe_18_nulls|fe_19_nulls|fe_20_nulls|
+----------+----------+----------+----------+----------+----------+----------+----------+----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+
|         0|         0|         0|         0|         0|         0|         0|         0|         0|          0|          0|          0|          0|          0|          0|          0|          0|          0|          0|          0|
+----------+----------+----------+----------+----------+----------+-

In [39]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType
import pyspark.sql.functions as F

# Define schema with nullable=False for Customer_ID and snapshot_date
schema = StructType(
    [StructField(f"fe_{i}", IntegerType(), True) for i in range(1, 21)] +
    [StructField("Customer_ID", StringType(), False),
     StructField("snapshot_date", DateType(), False)]
)

# Read raw CSV with inferred schema (nullable=True)
df_raw = spark.read.csv(
    "datamart/bronze/feature_clickstream/feature_clickstream_2023_01_01.csv",
    header=True,
    inferSchema=True
)

# 1. Filter out rows where Customer_ID or snapshot_date is null
df_no_nulls = df_raw.filter(
    F.col("Customer_ID").isNotNull() & F.col("snapshot_date").isNotNull()
)

# 2. Convert snapshot_date string to DateType using to_date function
df_no_nulls = df_no_nulls.withColumn(
    "snapshot_date",
    F.to_date(F.col("snapshot_date"), "yyyy-MM-dd")
)

# 3. Create clean DataFrame with enforced schema from RDD of cleaned data
df_clean = spark.createDataFrame(df_no_nulls.rdd, schema=schema)

# 4. Verify the schema
df_clean.printSchema()


root
 |-- fe_1: integer (nullable = true)
 |-- fe_2: integer (nullable = true)
 |-- fe_3: integer (nullable = true)
 |-- fe_4: integer (nullable = true)
 |-- fe_5: integer (nullable = true)
 |-- fe_6: integer (nullable = true)
 |-- fe_7: integer (nullable = true)
 |-- fe_8: integer (nullable = true)
 |-- fe_9: integer (nullable = true)
 |-- fe_10: integer (nullable = true)
 |-- fe_11: integer (nullable = true)
 |-- fe_12: integer (nullable = true)
 |-- fe_13: integer (nullable = true)
 |-- fe_14: integer (nullable = true)
 |-- fe_15: integer (nullable = true)
 |-- fe_16: integer (nullable = true)
 |-- fe_17: integer (nullable = true)
 |-- fe_18: integer (nullable = true)
 |-- fe_19: integer (nullable = true)
 |-- fe_20: integer (nullable = true)
 |-- Customer_ID: string (nullable = false)
 |-- snapshot_date: date (nullable = false)



In [76]:
bronze_directory = "datamart/bronze/"
file_name = "features_financials/features_financials.csv"
filepath = bronze_directory + file_name

df = spark.read.csv(filepath, header=True, inferSchema=True)

# Remove non-numeric characters from Annual_Income and cast
df = df.withColumn('Annual_Income', F.regexp_replace('Annual_Income', '[^0-9.]', '').cast(FloatType()))

# Cast numeric columns explicitly
numeric_cols = ['Monthly_Inhand_Salary', 'Interest_Rate', 'Delay_from_due_date', 'Num_of_Delayed_Payment', 
                'Changed_Credit_Limit', 'Num_Credit_Inquiries', 'Outstanding_Debt', 
                'Credit_Utilization_Ratio', 'Total_EMI_per_month', 'Amount_invested_monthly', 'Monthly_Balance']
for col in numeric_cols:
    df = df.withColumn(col, F.col(col).cast(FloatType()))

count_cols = ['Num_Bank_Accounts', 'Num_Credit_Card', 'Num_of_Loan']
for col in count_cols:
    df = df.withColumn(col, F.col(col).cast(IntegerType()))

# Convert Payment_of_Min_Amount to boolean/int
df = df.withColumn('Payment_of_Min_Amount', F.when(F.col('Payment_of_Min_Amount') == 'Yes', 1).otherwise(0))

# Parse Credit_History_Age to total months
df = df.withColumn(
    'Credit_History_Years', F.regexp_extract('Credit_History_Age', r'(\d+) Years', 1).cast(IntegerType())
).withColumn(
    'Credit_History_Months', F.regexp_extract('Credit_History_Age', r'(\d+) Months', 1).cast(IntegerType())
).withColumn(
    'Credit_History_Total_Months', F.col('Credit_History_Years') * 12 + F.coalesce(F.col('Credit_History_Months'), F.lit(0))
)

# Loan Types
loan_types = (
    df
    .filter(F.col("Type_of_Loan").isNotNull())
    .select(F.explode(F.split(F.col("Type_of_Loan"), ",\\s*")).alias("Loan"))
    .select(
        F.trim(F.regexp_replace(F.col("Loan"), ",", "")).alias("Loan")  # remove commas inside the loan string
    )
    .filter(~F.col("Loan").rlike("(?i)^and\\b.*"))  # exclude entries starting with 'and'
    .distinct()
    .orderBy("Loan")
    .rdd.flatMap(lambda x: x)
    .collect()
)

for loan in loan_types:
    col_name = loan.replace(" ", "_").replace("-", "_").lower()  # safe column name
    df = df.withColumn(
        col_name,
        F.when(F.col("Type_of_Loan").contains(loan), F.lit(1)).otherwise(F.lit(0))
    )

# Handle missing/unknown in Credit_Mix
df = df.withColumn('Credit_Mix', F.when(F.col('Credit_Mix') == '_', None).otherwise(F.col('Credit_Mix')))

# One Hot Encoding for Credit_Mix
df = df.withColumn("credit_mix_good",     F.when(F.col("Credit_Mix") == "Good", 1).otherwise(0)) \
        .withColumn("credit_mix_bad",      F.when(F.col("Credit_Mix") == "Bad", 1).otherwise(0)) \
        .withColumn("credit_mix_standard", F.when(F.col("Credit_Mix") == "Standard", 1).otherwise(0)) \
        .withColumn("valid_credit_mix",    F.when(F.col("Credit_Mix").isin(['Good','Standard','Bad']), 1).otherwise(0))

# Handle Type_of_Loan NULL or Not Specified
df = df.withColumn('Type_of_Loan', F.when(F.col('Type_of_Loan').isin(['NULL', 'Not Specified']), None).otherwise(F.col('Type_of_Loan')))

# Payment Behaviour
df = df.withColumn(
    "Payment_Behaviour",
    F.lower(F.col("Payment_Behaviour"))
)

df = df.withColumn(
    "Payment_Behaviour",
    F.when(F.col("Payment_Behaviour") == "!@9#%8", None).otherwise(F.col("Payment_Behaviour"))
)

df = df.withColumn(
    "has_valid_payment_behavior",
    F.col("Payment_Behaviour").isNotNull().cast("int")
)
    
payment_behaviours = (
    df
    .select("Payment_Behaviour")
    .filter(F.col("Payment_Behaviour").isNotNull())
    .distinct()
    .orderBy("Payment_Behaviour")
    .rdd.flatMap(lambda x: x)
    .collect()
)

for behaviour in payment_behaviours:
    df = df.withColumn(
        f"pb_{behaviour}",
        (F.col("Payment_Behaviour") == behaviour).cast("int")
    )

df = df.toDF(*[col.lower() for col in df.columns])
df = df.drop("credit_history_age","type_of_loan")

df.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- annual_income: float (nullable = true)
 |-- monthly_inhand_salary: float (nullable = true)
 |-- num_bank_accounts: integer (nullable = true)
 |-- num_credit_card: integer (nullable = true)
 |-- interest_rate: float (nullable = true)
 |-- num_of_loan: integer (nullable = true)
 |-- delay_from_due_date: float (nullable = true)
 |-- num_of_delayed_payment: float (nullable = true)
 |-- changed_credit_limit: float (nullable = true)
 |-- num_credit_inquiries: float (nullable = true)
 |-- credit_mix: string (nullable = true)
 |-- outstanding_debt: float (nullable = true)
 |-- credit_utilization_ratio: float (nullable = true)
 |-- payment_of_min_amount: integer (nullable = false)
 |-- total_emi_per_month: float (nullable = true)
 |-- amount_invested_monthly: float (nullable = true)
 |-- payment_behaviour: string (nullable = true)
 |-- monthly_balance: float (nullable = true)
 |-- snapshot_date: date (nullable = true)
 |-- credit_history_year

In [50]:
csv_file = "datamart/bronze/features_financials/features_financials.csv"

# Read the CSV
df = spark.read.option("header", True).csv(csv_file)

# Extract unique loan types
loan_types = (
    df
    .filter(F.col("Type_of_Loan").isNotNull())
    .select(F.explode(F.split(F.col("Type_of_Loan"), ",\\s*")).alias("Loan"))
    .select(
        F.trim(F.regexp_replace(F.col("Loan"), ",", "")).alias("Loan")  # remove commas inside the loan string
    )
    .filter(~F.col("Loan").rlike("(?i)^and\\b.*"))  # exclude entries starting with 'and'
    .distinct()
    .orderBy("Loan")
    .rdd.flatMap(lambda x: x)
    .collect()
)


print("✅ Unique Loan Types:")
print(loan_types)
for loan in loan_types:
    print("-", loan)

✅ Unique Loan Types:
['Auto Loan', 'Credit-Builder Loan', 'Debt Consolidation Loan', 'Home Equity Loan', 'Mortgage Loan', 'Not Specified', 'Payday Loan', 'Personal Loan', 'Student Loan']
- Auto Loan
- Credit-Builder Loan
- Debt Consolidation Loan
- Home Equity Loan
- Mortgage Loan
- Not Specified
- Payday Loan
- Personal Loan
- Student Loan


In [None]:
# Credit
# 0 -5 Good
# 6-11 Standard
# 12 and Up Bad

In [63]:
df = df.withColumn(
    "Payment_Behaviour",
    F.lower(F.col("Payment_Behaviour"))
)

df = df.withColumn(
    "Payment_Behaviour",
    F.when(F.col("Payment_Behaviour") == "!@9#%8", None).otherwise(F.col("Payment_Behaviour"))
)

df = df.withColumn(
    "has_valid_payment_behavior",
    F.col("Payment_Behaviour").isNotNull().cast("int")
)
    
payment_behaviours = (
    df
    .select("Payment_Behaviour")
    .filter(F.col("Payment_Behaviour").isNotNull())
    .distinct()
    .orderBy("Payment_Behaviour")
    .rdd.flatMap(lambda x: x)
    .collect()
)

for behaviour in payment_behaviours:
    df = df.withColumn(
        f"pb_{behaviour}",
        (F.col("Payment_Behaviour") == behaviour).cast("int")
    )


In [57]:
import os
for f in os.listdir("data"):
    if f.endswith('.csv'):
        df = spark.read.csv(f"data/{f}", header=True, inferSchema=True)
        df.show(3)

        attributes = spark.read.csv(f"data/{f}", header=True, inferSchema=True)
        attributes.createOrReplaceTempView('table')
        
        print(attributes.columns)
        
        query = spark.sql(
            f"""
            SELECT COUNT(*) FROM table
            """
        )
        
        query.show(3)

+-----------+--------------+---+-----------+-------------+-------------+
|Customer_ID|          Name|Age|        SSN|   Occupation|snapshot_date|
+-----------+--------------+---+-----------+-------------+-------------+
| CUS_0x1000|Alistair Barrf| 18|913-74-1218|       Lawyer|   2023-05-01|
| CUS_0x1009|        Arunah| 26|063-67-6938|     Mechanic|   2025-01-01|
| CUS_0x100b|      Shirboni| 19|  #F%$D@*&8|Media_Manager|   2024-03-01|
+-----------+--------------+---+-----------+-------------+-------------+
only showing top 3 rows

['Customer_ID', 'Name', 'Age', 'SSN', 'Occupation', 'snapshot_date']
+--------+
|count(1)|
+--------+
|   12500|
+--------+

+-----------+------------------+---------------------+-----------------+---------------+-------------+-----------+--------------------+-------------------+----------------------+--------------------+--------------------+----------+----------------+------------------------+--------------------+---------------------+-------------------+---

In [74]:
from pyspark.sql.window import Window
import pyspark.sql.functions as F

# Finalizing loan data for gold
df = spark.read.parquet("datamart/silver/lms_loan_daily/*.parquet")

loan_aggregates = df.groupBy("loan_id").agg(
    F.sum("paid_amt").alias("total_paid_amt"),
    F.sum(F.when(F.col("overdue_amt") > 0, 1).otherwise(0)).alias("num_months_overdue"),
    F.max("snapshot_date").alias("latest_snapshot_date")
)

w = Window.partitionBy("loan_id").orderBy(F.col("snapshot_date").desc())
# Add row number to identify latest per loan_id
df = df.withColumn("row_num", F.row_number().over(w)) \
                 .filter(F.col("row_num") == 1) \
                 .drop("row_num")

loan_df = df.join(loan_aggregates, on="loan_id", how="left")

# Get features based on snapshot_date
latest_snapshot = loan_df.groupBy("loan_id").agg(
    F.max("snapshot_date").alias("latest_snapshot_date"),
    F.first("loan_start_date").alias("loan_start_date")  # should be same for all rows
)

loan_start_features = latest_snapshot.select(
    "loan_id",
    "loan_start_date",
    "latest_snapshot_date",
    
    F.when(F.months_between("latest_snapshot_date", "loan_start_date") < 3, "new")
     .when(F.months_between("latest_snapshot_date", "loan_start_date") < 6, "mid")
     .otherwise("old").alias("loan_vintage"),

    F.month("loan_start_date").alias("loan_start_month"),
    F.year("loan_start_date").alias("loan_start_year")
)

final_loan_df = loan_df.join(loan_start_features, on="loan_id", how="left")

# One Hot for loan vintage
final_df = final_loan_df.withColumn("loan_vintage_old", F.when(F.col("loan_vintage") == "old", 1).otherwise(0)) \
                        .withColumn("loan_vintage_mid", F.when(F.col("loan_vintage") == "mid", 1).otherwise(0)) \
                        .withColumn("loan_vintage_new", F.when(F.col("loan_vintage") == "new", 1).otherwise(0))

# Create binary flags for overdue amount and missed payment
final_df = final_df.withColumn("has_overdue_amt", F.when(F.col("overdue_amt") > 0, 1).otherwise(0))
final_df = final_df.withColumn("has_missed_payment", F.when(F.col("first_missed_date").isNotNull(), 1).otherwise(0))

# Drop redundant fields
final_df = final_df.drop("due_amt", "latest_snapshot_date", "loan_start_date", "paid_amt", "loan_vintage", "first_missed_date")
final_df.show(truncate = False)

                                                                                

+---------------------+-----------+------+---------------+--------+-----------+-------+-------------+---+-------------------+---+--------------+------------------+----------------+---------------+----------------+----------------+----------------+---------------+------------------+
|loan_id              |Customer_ID|tenure|installment_num|loan_amt|overdue_amt|balance|snapshot_date|mob|installments_missed|dpd|total_paid_amt|num_months_overdue|loan_start_month|loan_start_year|loan_vintage_old|loan_vintage_mid|loan_vintage_new|has_overdue_amt|has_missed_payment|
+---------------------+-----------+------+---------------+--------+-----------+-------+-------------+---+-------------------+---+--------------+------------------+----------------+---------------+----------------+----------------+----------------+---------------+------------------+
|CUS_0x100b_2024_03_01|CUS_0x100b |10    |9              |10000.0 |0.0        |1000.0 |2024-12-01   |9  |0                  |0  |9000.0        |0      

In [71]:
attributes_df = spark.read.parquet("datamart/silver/features_attributes/silver_attributes_cleaned.parquet").drop("snapshot_date")
# df.where(F.col("Age").isNotNull()).count()
attributes_df.show(5)
# 11863 Non-null Ages
final_df = final_df.join(attributes_df, on="Customer_ID", how="left")
final_df.show(5)

+-----------+----+-------------+---------+----------------+---------+-----------------+--------------+
|Customer_ID| Age|   Occupation|ssn_valid|occupation_known|age_valid|name_shared_count|is_name_shared|
+-----------+----+-------------+---------+----------------+---------+-----------------+--------------+
| CUS_0x1000|  18|       Lawyer|        1|               1|        1|                2|             1|
| CUS_0x1009|  26|     Mechanic|        1|               1|        1|                2|             1|
| CUS_0x100b|  19|Media_Manager|        0|               1|        1|                1|             0|
| CUS_0x1011|  44|       Doctor|        1|               1|        1|                2|             1|
| CUS_0x1013|  44|     Mechanic|        1|               1|        1|                1|             0|
| CUS_0x1015|  27|   Journalist|        1|               1|        1|                1|             0|
| CUS_0x1018|  15|   Accountant|        1|               1|        1|    

                                                                                

+-----------+--------------------+------+---------------+--------+-----------+-------+-------------+---+-------------------+---+--------------+------------------+----------------+---------------+----------------+----------------+----------------+---------------+------------------+---+-------------+---------+----------------+---------+-----------------+--------------+---+-------------+---------+----------------+---------+-----------------+--------------+---+-------------+---------+----------------+---------+-----------------+--------------+
|Customer_ID|             loan_id|tenure|installment_num|loan_amt|overdue_amt|balance|snapshot_date|mob|installments_missed|dpd|total_paid_amt|num_months_overdue|loan_start_month|loan_start_year|loan_vintage_old|loan_vintage_mid|loan_vintage_new|has_overdue_amt|has_missed_payment|Age|   Occupation|ssn_valid|occupation_known|age_valid|name_shared_count|is_name_shared|Age|   Occupation|ssn_valid|occupation_known|age_valid|name_shared_count|is_name_shar

In [78]:
financials_df = spark.read.parquet("datamart/silver/features_financials/silver_financials_cleaned.parquet").drop("credit_mix", "payment_behaviour","credit_history_years","credit_history_months")
financials_df.show(5)
final_df = final_df.join(financials_df, on="Customer_ID", how="left")
final_df.show(5)

+-----------+-------------+---------------------+-----------------+---------------+-------------+-----------+-------------------+----------------------+--------------------+--------------------+----------------+------------------------+---------------------+-------------------+-----------------------+---------------+-------------+---------------------------+---------+-------------------+-----------------------+----------------+-------------+-------------+-----------+-------------+------------+---------------+--------------+-------------------+----------------+--------------------------+----------------------------------+-----------------------------------+----------------------------------+---------------------------------+----------------------------------+---------------------------------+
|customer_id|annual_income|monthly_inhand_salary|num_bank_accounts|num_credit_card|interest_rate|num_of_loan|delay_from_due_date|num_of_delayed_payment|changed_credit_limit|num_credit_inquiries|out

                                                                                

+-----------+--------------------+------+---------------+--------+-----------+-------+-------------+---+-------------------+---+--------------+------------------+----------------+---------------+----------------+----------------+----------------+---------------+------------------+-------------+---------------------+-----------------+---------------+-------------+-----------+-------------------+----------------------+--------------------+--------------------+----------------+------------------------+---------------------+-------------------+-----------------------+---------------+-------------+---------------------------+---------+-------------------+-----------------------+----------------+-------------+-------------+-----------+-------------+------------+---------------+--------------+-------------------+----------------+--------------------------+----------------------------------+-----------------------------------+----------------------------------+---------------------------------

In [83]:
from pyspark.sql.window import Window
import pyspark.sql.functions as F

# Finalizing loan data for gold
df = spark.read.parquet("datamart/silver/feature_clickstream/*.parquet")


df.where(F.col("Customer_ID") == "CUS_0x1037").orderBy(F.col("snapshot_date")).show(50)

+----+----+----+----+----+----+----+----+----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----------+-------------+
|fe_1|fe_2|fe_3|fe_4|fe_5|fe_6|fe_7|fe_8|fe_9|fe_10|fe_11|fe_12|fe_13|fe_14|fe_15|fe_16|fe_17|fe_18|fe_19|fe_20|Customer_ID|snapshot_date|
+----+----+----+----+----+----+----+----+----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----------+-------------+
|  63| 118|  80| 121|  55| 193| 111| 112|-101|   83|  164|  105|  -16|  -81| -126|  114|   35|   85|  -73|   76| CUS_0x1037|   2023-01-01|
|  55|  96|  74| 171| -69|-103|  86| 178|  42|  113|  100|  183|   39|  179|  161|   71|  208|   -7|  129|  206| CUS_0x1037|   2023-02-01|
| 232| 203| -71|  86| 243|  12|  79| 122| 103|  178|  273|   96|  201|   37|   96|   63|   98|   69|  257|    3| CUS_0x1037|   2023-03-01|
|   9| -29|  -8| 116| 198|  68|  79|  61| 113|   91|   36|  179|   17|  219|   89|  136|  -25|  180|  109|  118| CUS_0x1037|   2023-04-01|
| 203|  75| 223|  88| 188| 

In [96]:
df = spark.read.parquet("datamart/silver/feature_clickstream/*.parquet")

# List of feature columns
features = [f"fe_{i}" for i in range(1, 21)]

# Aggregate: mean, stddev, min, max per customer
agg_exprs = []
for feat in features:
    agg_exprs += [
        F.mean(feat).alias(f"{feat}_mean"),
        F.stddev(feat).alias(f"{feat}_std"),
        F.min(feat).alias(f"{feat}_min"),
        F.max(feat).alias(f"{feat}_max"),
    ]

agg_df = df.groupBy("Customer_ID").agg(*agg_exprs)

# Create window to get first and last values per customer ordered by snapshot_date
w = Window.partitionBy("Customer_ID").orderBy("snapshot_date")
w_desc = Window.partitionBy("Customer_ID").orderBy(F.col("snapshot_date").desc())

# Add first and last values of each feature
for feat in features:
    df = df.withColumn(f"{feat}_first", F.first(feat).over(w))
    df = df.withColumn(f"{feat}_last", F.first(feat).over(w_desc))
    df = df.withColumn(f"{feat}_delta", F.col(f"{feat}_last") - F.col(f"{feat}_first"))

# # Select only the last row per customer (to avoid duplicates in the deltas)
df_deltas = df.withColumn("rn", F.row_number().over(w_desc)) \
              .filter(F.col("rn") == 1) \
              .select("Customer_ID", *[f"{feat}_delta" for feat in features])

clickstreams = agg_df.join(df_deltas, on = "Customer_ID", how = "left")
clickstreams.where(F.col("Customer_ID") == "CUS_0x6ba9").show()

clickstreams.count()

                                                                                

+-----------+------------------+-----------------+--------+--------+-----------------+------------------+--------+--------+---------+-----------------+--------+--------+---------+------------------+--------+--------+-----------------+----------------+--------+--------+---------+-----------------+--------+--------+-----------------+------------------+--------+--------+---------+-----------------+--------+--------+------------------+------------------+--------+--------+------------------+------------------+---------+---------+----------+-----------------+---------+---------+----------+------------------+---------+---------+----------+------------------+---------+---------+----------+----------------+---------+---------+-----------------+----------------+---------+---------+-----------------+-----------------+---------+---------+-----------------+-----------------+---------+---------+------------------+------------------+---------+---------+-----------------+------------------+---------+-

                                                                                

8974