In [12]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, avg, count, abs as spark_abs
import time
spark = SparkSession.builder \
    .appName("CIRT Analysis") \
    .config("spark.driver.memory", "4g") \
    .getOrCreate()

df = spark.read.parquet("/opt/airflow/data/final/merged_data.parquet")

print(f"Total rows: {df.count():,}")
print(f"Total columns: {len(df.columns)}")

# Some basic analysis
df.select("Borrower Credit Score at Origination", "vs4_trimerge").describe().show()

Total rows: 1,295,846
Total columns: 117
+-------+------------------------------------+------------------+
|summary|Borrower Credit Score at Origination|      vs4_trimerge|
+-------+------------------------------------+------------------+
|  count|                             1295219|           1295846|
|   mean|                   751.9691835898022| 762.4120080626865|
| stddev|                   42.41599431136098|45.863278799638096|
|    min|                                 580|               485|
|    max|                                 840|               850|
+-------+------------------------------------+------------------+



In [10]:
import pandas as pd
pd.read_parquet("/opt/airflow/data/final/merged_data.parquet").info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1295846 entries, 0 to 1295845
Columns: 117 entries, Reference Pool ID to vs4_bimerge_highest
dtypes: float64(53), int32(17), object(47)
memory usage: 1.0+ GB


In [17]:
start = time.time()

df_unoptimized = df \
    .withColumn("score_diff", col("vs4_trimerge") - col("Borrower Credit Score at Origination")) \
    .withColumn("is_delinquent", when(col("Current Loan Delinquency Status") > 0, 1).otherwise(0))

df_unoptimized = df_unoptimized.filter(
    (col("Borrower Credit Score at Origination").between(300, 850)) &
    (col("vs4_trimerge").between(300, 850))
)

count_unoptimized = df_unoptimized.count()
time_unoptimized = time.time() - start
print(f"Rows: {count_unoptimized:,}")
print(f"Time: {time_unoptimized:.2f}s")

Rows: 1,295,219
Time: 0.20s


In [18]:
start = time.time()
df_optimized = df.filter(
    (col("Borrower Credit Score at Origination").between(300, 850)) &
    (col("vs4_trimerge").between(300, 850)) &
    (col("Current Loan Delinquency Status").isNotNull())
)

df_optimized = df_optimized \
    .withColumn("score_diff", col("vs4_trimerge") - col("Borrower Credit Score at Origination")) \
    .withColumn("is_delinquent", when(col("Current Loan Delinquency Status") > 0, 1).otherwise(0))

count_optimized = df_optimized.count()
time_optimized = time.time() - start

print(f"Rows: {count_optimized:,}")
print(f"Time: {time_optimized:.2f}s")

speedup = time_unoptimized / time_optimized
print(f"\n Speedup: {speedup:.2f}x faster")

Rows: 1,115,619
Time: 0.18s

 Speedup: 1.09x faster


In [25]:
delinq_analysis = df_optimized.groupBy("is_delinquent") \
    .agg(
        count("*").alias("total_loans"),
        avg("Borrower Credit Score at Origination").alias("avg_fico"),
        avg("vs4_trimerge").alias("avg_vs4"),
        avg(spark_abs("score_diff")).alias("avg_score_diff")
    )
delinq_analysis.explain(mode="formatted")
print("\nDelinquency Comparison:")
delinq_analysis.show()

== Physical Plan ==
AdaptiveSparkPlan (7)
+- HashAggregate (6)
   +- Exchange (5)
      +- HashAggregate (4)
         +- Project (3)
            +- Filter (2)
               +- Scan parquet  (1)


(1) Scan parquet 
Output [3]: [Borrower Credit Score at Origination#4507, Current Loan Delinquency Status#4523, vs4_trimerge#4597]
Batched: true
Location: InMemoryFileIndex [file:/opt/airflow/data/final/merged_data.parquet]
PushedFilters: [IsNotNull(`Borrower Credit Score at Origination`), IsNotNull(vs4_trimerge), GreaterThanOrEqual(`Borrower Credit Score at Origination`,300), LessThanOrEqual(`Borrower Credit Score at Origination`,850), GreaterThanOrEqual(vs4_trimerge,300), LessThanOrEqual(vs4_trimerge,850), IsNotNull(`Current Loan Delinquency Status`)]
ReadSchema: struct<Borrower Credit Score at Origination:int,Current Loan Delinquency Status:int,vs4_trimerge:int>

(2) Filter
Input [3]: [Borrower Credit Score at Origination#4507, Current Loan Delinquency Status#4523, vs4_trimerge#4597]
Condi

In [26]:
df_optimized.createOrReplaceTempView("loans")

#SQL Query 1 - Delinquency by Score Range
query1 = """
SELECT 
    CASE 
        WHEN vs4_trimerge >= 700 THEN 'High'
        WHEN vs4_trimerge >= 600 THEN 'Medium'
        ELSE 'Low'
    END as score_range,
    COUNT(*) as loans,
    ROUND(AVG(is_delinquent) * 100, 2) as delinq_rate
FROM loans
GROUP BY 
    CASE 
        WHEN vs4_trimerge >= 700 THEN 'High'
        WHEN vs4_trimerge >= 600 THEN 'Medium'
        ELSE 'Low'
    END
ORDER BY delinq_rate
"""
spark.sql(query1).explain(mode="formatted")
spark.sql(query1).show()

== Physical Plan ==
AdaptiveSparkPlan (9)
+- Sort (8)
   +- Exchange (7)
      +- HashAggregate (6)
         +- Exchange (5)
            +- HashAggregate (4)
               +- Project (3)
                  +- Filter (2)
                     +- Scan parquet  (1)


(1) Scan parquet 
Output [3]: [Borrower Credit Score at Origination#4507, Current Loan Delinquency Status#4523, vs4_trimerge#4597]
Batched: true
Location: InMemoryFileIndex [file:/opt/airflow/data/final/merged_data.parquet]
PushedFilters: [IsNotNull(`Borrower Credit Score at Origination`), IsNotNull(vs4_trimerge), GreaterThanOrEqual(`Borrower Credit Score at Origination`,300), LessThanOrEqual(`Borrower Credit Score at Origination`,850), GreaterThanOrEqual(vs4_trimerge,300), LessThanOrEqual(vs4_trimerge,850), IsNotNull(`Current Loan Delinquency Status`)]
ReadSchema: struct<Borrower Credit Score at Origination:int,Current Loan Delinquency Status:int,vs4_trimerge:int>

(2) Filter
Input [3]: [Borrower Credit Score at Origination#4

In [28]:
# SQL Query 2 - VS4 vs FICO Comparison:
query2 = """
SELECT 
    CASE WHEN score_diff > 0 THEN 'VS4 Higher' ELSE 'FICO Higher' END as winner,
    COUNT(*) as loans,
    ROUND(AVG(score_diff), 2) as avg_diff,
    ROUND(AVG(is_delinquent) * 100, 2) as delinq_rate
FROM loans
GROUP BY CASE WHEN score_diff > 0 THEN 'VS4 Higher' ELSE 'FICO Higher' END
"""
query2_df = spark.sql(query2)
query2_df.show()

+-----------+------+--------+-----------+
|     winner| loans|avg_diff|delinq_rate|
+-----------+------+--------+-----------+
| VS4 Higher|701121|   30.38|       1.82|
|FICO Higher|414498|  -22.95|       2.37|
+-----------+------+--------+-----------+



In [29]:
output_path = "/opt/airflow/data/final/fico_vs_vs4_analysis.parquet"

df_optimized.select(
    "Borrower Credit Score at Origination",
    "vs4_trimerge",
    "score_diff",
    "is_delinquent"
).write \
    .mode("overwrite") \
    .partitionBy("is_delinquent") \
    .parquet(output_path)

                                                                                