In [36]:
from pyspark.sql.functions import (
    coalesce, col, count, isnan, mean, stddev, to_date, when,
)

# Silver layer - Loans: read the Bronze CSV, keep and type the needed columns,
# drop incomplete / duplicate rows and persist the result as parquet.
# `to_date` is imported here so the cell runs on a fresh kernel instead of
# relying on an import executed by another cell.

# Step 1: Read raw data from Bronze (no schema inference is requested, so every
# column is read as a string and is cast explicitly below)
bronze_path = "Files/Bronze/Loans.csv"
df = spark.read.option("header", True).csv(bronze_path)

# Step 2: Keep only the needed columns, rename & cast types.
# ProductID and RiskScore are not selected, so they are dropped here without a
# separate drop() step.
df = df.select(
    col("LoanID"),
    col("CustomerID"),
    col("BranchID"),
    # DisbursedDate is parsed as dd-MM-yyyy and exposed as StartDate.
    to_date(col("DisbursedDate"), "dd-MM-yyyy").alias("StartDate"),
    col("LoanAmount").cast("double"),
    col("TenureMonths").cast("int"),
    col("InterestRate").cast("double"),
    col("Status"),
    col("LoanType"),
    # A plain cast only understands ISO-style dates (yyyy-MM-dd). Fall back to
    # the dd-MM-yyyy pattern used for DisbursedDate so such values are not
    # silently turned into NULL.
    # NOTE(review): assumes ClosureDate may share DisbursedDate's format - confirm against the CSV.
    coalesce(
        col("ClosureDate").cast("date"),
        to_date(col("ClosureDate"), "dd-MM-yyyy"),
    ).alias("ClosureDate"),
    col("EarlyClosureFlag").cast("boolean")
)

# Step 3: Drop rows with nulls in critical fields (values that failed a cast or
# date parse above are NULL at this point and are removed as well)
critical_fields = ["LoanID", "CustomerID", "LoanType", "LoanAmount", "StartDate", "TenureMonths", "BranchID"]
df = df.dropna(subset=critical_fields)

# Step 4: Remove duplicate LoanIDs (if any); which of the duplicate rows is kept
# is not specified
df = df.dropDuplicates(["LoanID"])

# Step 5: Save cleaned data to Silver, replacing any previous output
silver_path = "Files/Silver/Loans"
df.write.mode("overwrite").format("parquet").save(silver_path)

# Preview cleaned result
df.show(5)


StatementMeta(, f6ecba0f-a1d8-4a2d-993d-8aa10a2790ca, 38, Finished, Available, Finished)

+------+----------+--------+----------+----------+------------+------------+------+----------+-----------+----------------+
|LoanID|CustomerID|BranchID| StartDate|LoanAmount|TenureMonths|InterestRate|Status|  LoanType|ClosureDate|EarlyClosureFlag|
+------+----------+--------+----------+----------+------------+------------+------+----------+-----------+----------------+
|L00001|    C00861|    B020|2022-10-12|1383910.76|          48|       12.49|Active|        2W|       NULL|            NULL|
|L00002|    C01295|    B041|2020-10-14| 775199.57|          18|       14.27|Active|        2W|       NULL|            NULL|
|L00003|    C01131|    B016|2020-12-20|1254935.95|          24|        9.52|Active|Commercial|       NULL|            NULL|
|L00004|    C01096|    B009|2020-12-08| 510216.12|          24|        9.58|Active|        2W|       NULL|            NULL|
|L00005|    C01639|    B030|2020-05-28|1458536.12|          18|        NULL|Active|Commercial|       NULL|            NULL|
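+------+----------+--------+----------+----------+------------+------------+------+----------+-----------+----------------+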

In [30]:
from pyspark.sql.functions import coalesce, col, to_date

# Silver layer - EMIs: read the Bronze CSV, type the columns, drop incomplete /
# duplicate rows and persist the result as parquet.

# Step 1: Read raw data from Bronze (no schema inference is requested, so every
# column is read as a string and is cast explicitly below)
bronze_path = "Files/Bronze/EMIs.csv"
df = spark.read.option("header", True).csv(bronze_path)


# Step 2: Cast to appropriate data types
df = df.select(
    col("EMIID"),
    col("LoanID"),
    col("EMINumber").cast("int"),
    to_date(col("DueDate"), "dd-MM-yyyy").alias("DueDate"),
    col("EMIAmount").cast("double"),
    col("AmountPaid").cast("double"),
    # The original yyyy-MM-dd pattern is tried first; values it cannot parse
    # fall back to the dd-MM-yyyy pattern used for DueDate instead of becoming
    # NULL.
    # NOTE(review): the earlier preview showed NULL PaymentDate on "Paid" rows,
    # which suggests the CSV does not use yyyy-MM-dd - confirm the real format.
    coalesce(
        to_date(col("PaymentDate"), "yyyy-MM-dd"),
        to_date(col("PaymentDate"), "dd-MM-yyyy"),
    ).alias("PaymentDate"),
    col("Status")
)

# Step 3: Drop nulls in critical fields (PaymentDate and AmountPaid are not
# critical, so rows without a payment are kept)
critical_fields = ["EMIID", "LoanID", "DueDate", "EMIAmount"]
df = df.dropna(subset=critical_fields)

# Step 4: Remove duplicates; which of the duplicate EMIID rows is kept is not specified
df = df.dropDuplicates(["EMIID"])

# Step 5: Save cleaned data to Silver, replacing any previous output
silver_path = "Files/Silver/EMIs"
df.write.mode("overwrite").format("parquet").save(silver_path)

# Preview cleaned result
df.show(5)


StatementMeta(, f6ecba0f-a1d8-4a2d-993d-8aa10a2790ca, 32, Finished, Available, Finished)

+---------+------+---------+----------+---------+----------+-----------+-------+
|    EMIID|LoanID|EMINumber|   DueDate|EMIAmount|AmountPaid|PaymentDate| Status|
+---------+------+---------+----------+---------+----------+-----------+-------+
|E00001_05|L00001|        5|2023-03-11| 32432.53|  32432.53|       NULL|   Paid|
|E00001_22|L00001|       22|2024-08-02| 32432.53|  32432.53|       NULL|   Paid|
|E00001_23|L00001|       23|2024-09-01| 32432.53|  32432.53|       NULL|   Paid|
|E00001_32|L00001|       32|2025-05-29| 32432.53|  32432.53|       NULL|   Paid|
|E00001_42|L00001|       42|2026-03-25| 32432.53|       0.0|       NULL|Overdue|
+---------+------+---------+----------+---------+----------+-----------+-------+
only showing top 5 rows



In [31]:
from pyspark.sql.functions import col, to_date

# Silver layer - Customers: Bronze CSV -> cleaned parquet.
bronze_path = "Files/Bronze/Customers.csv"
silver_path = "Files/Silver/Customers"

# A row is discarded when any of these columns is NULL (DOB is NULL when it
# does not parse as dd-MM-yyyy).
critical_fields = ["CustomerID", "CustomerName", "Gender", "DOB", "PAN"]

# Read with the first line as header, discard BranchID, keep the customer
# attributes (only DOB is converted; everything else stays a string), then
# remove incomplete rows and repeated CustomerIDs.
# NOTE(review): the preview prints PhoneNumber as e.g. 9.18808E+11. The column
# is read and written as a plain string, so that notation is already present in
# the Bronze file and the full number cannot be recovered in this cell.
df = (
    spark.read.csv(bronze_path, header=True)
    .drop("BranchID")
    .select(
        "CustomerID",
        "CustomerName",
        "Gender",
        to_date(col("DOB"), "dd-MM-yyyy").alias("DOB"),
        "City",
        "PhoneNumber",
        "Email",
        "PAN",
        "KYCStatus",
    )
    .dropna(subset=critical_fields)
    .dropDuplicates(["CustomerID"])
)

# Persist to Silver, replacing any previous output
df.write.parquet(silver_path, mode="overwrite")

# Preview cleaned result
df.show(5)

StatementMeta(, f6ecba0f-a1d8-4a2d-993d-8aa10a2790ca, 33, Finished, Available, Finished)

+----------+----------------+------+----------+---------+-----------+-----------------+----------+---------+
|CustomerID|    CustomerName|Gender|       DOB|     City|PhoneNumber|            Email|       PAN|KYCStatus|
+----------+----------------+------+----------+---------+-----------+-----------------+----------+---------+
|    C00001|Xhftrxc Nacgposs|Female|1994-07-06|Hyderabad|9.18808E+11|user1@example.com|WEQYI5851K| Verified|
|    C00002|Nfpvaus Khftcjji|  Male|1982-12-25|    Delhi|9.16505E+11|user2@example.com|AJDZD0234O|  Pending|
|    C00003|Iusnzjo Uqwpsbfh|  Male|1978-11-26|Hyderabad|9.19969E+11|user3@example.com|ZZWVP3996C| Verified|
|    C00004|Cwwjlve Lfgyqpes|Female|1987-11-20|Hyderabad|9.17176E+11|user4@example.com|XLHFJ1828G| Verified|
|    C00005|Fmhyrfi Yufaigfy|Female|1986-03-21|    Delhi|9.16061E+11|user5@example.com|JPZUP3947W| Verified|
+----------+----------------+------+----------+---------+-----------+-----------------+----------+---------+
only showing top 5 rows

In [32]:
from pyspark.sql.functions import col

# Silver layer - Branches: Bronze CSV -> cleaned parquet.
bronze_path = "Files/Bronze/Branches.csv"
silver_path = "Files/Silver/Branches"

# A row is discarded when any of these columns is NULL.
critical_fields = ["BranchID", "BranchName", "Region"]

# Read with the first line as header and keep the five branch columns. No cast
# is applied, so every column stays a string as read from the CSV. Incomplete
# rows and repeated BranchIDs are then removed.
df = (
    spark.read.csv(bronze_path, header=True)
    .select("BranchID", "BranchName", "Region", "Manager", "ContactNumber")
    .dropna(subset=critical_fields)
    .dropDuplicates(["BranchID"])
)

# Persist to Silver, replacing any previous output
df.write.parquet(silver_path, mode="overwrite")

# Preview cleaned result
df.show(5)

StatementMeta(, f6ecba0f-a1d8-4a2d-993d-8aa10a2790ca, 34, Finished, Available, Finished)

+--------+-------------+------+---------------+-------------+
|BranchID|   BranchName|Region|        Manager|ContactNumber|
+--------+-------------+------+---------------+-------------+
|    B001|     CredSure| North|Ecqkdvl Zpsxvms| 918588211764|
|    B002|    LoanMetro|  East|Loziooh Hkdvewy| 916666493577|
|    B003|ElevateCredit| North|Uwqccry Bxvqeim| 918209280975|
|    B004| Zenith Loans|  West|Hierzub Ytrdezu| 919826998727|
|    B005|    MoneyTree| South|Xmrnabq Hjfqizn| 918228754332|
+--------+-------------+------+---------------+-------------+
only showing top 5 rows



In [38]:
from pyspark.sql.functions import col

# Silver layer - RiskScore: read the Bronze CSV, type the score, keep only
# valid scores, de-duplicate per customer and persist the result as parquet.

# Step 1: Read raw data from Bronze (no schema inference is requested, so every
# column is read as a string)
bronze_path = "Files/Bronze/Risk_Score.csv"
df = spark.read.option("header", True).csv(bronze_path)

# Step 2: Select and cast necessary columns (a RiskScore that cannot be cast to
# int becomes NULL and is removed in the next step)
df = df.select(
    col("CustomerID"),
    col("RiskScore").cast("int")
)

# Step 3: Drop nulls in critical fields
df = df.dropna(subset=["CustomerID", "RiskScore"])

# Step 4: Discard out-of-range risk scores (rows are removed, not clamped);
# valid scores are 0..100 inclusive.
# This runs BEFORE de-duplication: dropDuplicates keeps an unspecified row per
# CustomerID, so filtering afterwards could drop a customer entirely when the
# row that happened to be kept was the invalid one although a valid one existed.
df = df.filter(col("RiskScore").between(0, 100))

# Step 5: Drop duplicates - one row per CustomerID (assumes a single valid
# entry per customer; otherwise the surviving row is arbitrary)
df = df.dropDuplicates(["CustomerID"])

# Step 6: Save cleaned data to Silver, replacing any previous output
silver_path = "Files/Silver/RiskScore"
df.write.mode("overwrite").format("parquet").save(silver_path)

# Preview cleaned result
df.show(5)

StatementMeta(, f6ecba0f-a1d8-4a2d-993d-8aa10a2790ca, 40, Finished, Available, Finished)

+----------+---------+
|CustomerID|RiskScore|
+----------+---------+
|    C02599|       35|
|    C00929|       61|
|    C01720|       65|
|    C00126|       60|
|    C02501|       54|
+----------+---------+
only showing top 5 rows



In [None]:
# Silver Layer: Create ForecastData Parquet from CSV

from pyspark.sql.functions import col
from pyspark.sql.types import DoubleType, IntegerType, StringType

# Silver layer - ForecastData: Bronze CSV -> typed parquet.
# Read with the first line as header and give each forecast column an explicit
# type. Unlike the other Silver cells in this notebook, no null filtering or
# de-duplication is applied here.
df_forecast = (
    spark.read.csv("Files/Bronze/ForecastData.csv", header=True)
    .select(
        "LoanID",
        col("Forecast_OverdueAmount").cast(DoubleType()),
        col("Forecast_NPA_Percent").cast(DoubleType()),
        # Values that cannot be cast to an integer come out as NULL.
        col("PredictedRiskScore").cast(IntegerType()),
        col("PriorityRecoveryFlag").cast(StringType()),
    )
)

# Persist to Silver, replacing any previous output
df_forecast.write.format("parquet").mode("overwrite").save("Files/Silver/ForecastData/")

# Preview cleaned result
df_forecast.show(5)

StatementMeta(, 930649ea-1c06-47c0-8f94-c23c616c8d34, 5, Finished, Available, Finished)

+------+----------------------+--------------------+------------------+--------------------+
|LoanID|Forecast_OverdueAmount|Forecast_NPA_Percent|PredictedRiskScore|PriorityRecoveryFlag|
+------+----------------------+--------------------+------------------+--------------------+
|L00001|     487902.9500000002|               35.26|                91|                 Yes|
|L00002|                2011.0|                0.26|                83|                  No|
|L00003|                2868.0|                0.23|                82|                  No|
|L00004|                4189.0|                0.82|                64|                  No|
|L00005|                4599.0|                0.32|              NULL|                  No|
+------+----------------------+--------------------+------------------+--------------------+
only showing top 5 rows

