In [1]:
!pip install pyspark




In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("SmartLoan").getOrCreate()
print(spark.version)


3.5.1


In [54]:
from google.colab import files

uploaded = files.upload()

Saving synthetic_loans_dataset.csv to synthetic_loans_dataset.csv


In [55]:
import pandas as pd

df = pd.read_csv("synthetic_loans_dataset.csv")

# Show first 5 rows
df.head()


Unnamed: 0,loan_id,customer_age,loan_amount,loan_tenure_months,interest_rate,monthly_income,credit_score,default_within_3m
0,1,59,475606,36,13.4,2860,537,1
1,2,49,247932,60,8.5,15265,713,0
2,3,35,167030,60,7.3,10685,695,0
3,4,63,534815,12,15.9,4433,493,1
4,5,28,414819,12,11.1,6117,396,1


In [56]:
# Cell 1 — Start Spark and read your uploaded CSV
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("SmartLoan").getOrCreate()

# read the CSV you uploaded earlier
csv_path = "synthetic_loans_dataset.csv"   # make sure this file is in Colab environment (use files.upload if needed)
loan_df = spark.read.option("header", True).option("inferSchema", True).csv(csv_path)

print("Rows:", loan_df.count())
loan_df.printSchema()
loan_df.show(5, truncate=False)



Rows: 241
root
 |-- loan_id: integer (nullable = true)
 |-- customer_age: integer (nullable = true)
 |-- loan_amount: integer (nullable = true)
 |-- loan_tenure_months: integer (nullable = true)
 |-- interest_rate: double (nullable = true)
 |-- monthly_income: integer (nullable = true)
 |-- credit_score: integer (nullable = true)
 |-- default_within_3m: integer (nullable = true)

+-------+------------+-----------+------------------+-------------+--------------+------------+-----------------+
|loan_id|customer_age|loan_amount|loan_tenure_months|interest_rate|monthly_income|credit_score|default_within_3m|
+-------+------------+-----------+------------------+-------------+--------------+------------+-----------------+
|1      |59          |475606     |36                |13.4         |2860          |537         |1                |
|2      |49          |247932     |60                |8.5          |15265         |713         |0                |
|3      |35          |167030     |60           

In [57]:
#Basic data tests
from pyspark.sql.functions import isnan, when, count

def basic_data_checks(df):
    print("Total rows:", df.count())
    # null counts per column
    df.select([count(when(col(c).isNull() | isnan(col(c)), c)).alias(c) for c in df.columns]).show(truncate=False)
    # sensible ranges
    print("Credit score min/max:")
    df.selectExpr("min(credit_score) as min_cs", "max(credit_score) as max_cs").show()
    print("Age min/max:")
    df.selectExpr("min(customer_age) as min_age", "max(customer_age) as max_age").show()

basic_data_checks(loan_df)


Total rows: 241
+-------+------------+-----------+------------------+-------------+--------------+------------+-----------------+
|loan_id|customer_age|loan_amount|loan_tenure_months|interest_rate|monthly_income|credit_score|default_within_3m|
+-------+------------+-----------+------------------+-------------+--------------+------------+-----------------+
|0      |0           |0          |0                 |0            |0             |0           |0                |
+-------+------------+-----------+------------------+-------------+--------------+------------+-----------------+

Credit score min/max:
+------+------+
|min_cs|max_cs|
+------+------+
|   396|   809|
+------+------+

Age min/max:
+-------+-------+
|min_age|max_age|
+-------+-------+
|     21|     65|
+-------+-------+



In [59]:
# Simple features
from pyspark.sql.functions import expr, round as spark_round, when

# estimate emi as loan_amount / tenure (simple proxy)
loan_df2 = loan_df.withColumn("emi_est", spark_round(col("loan_amount") / col("loan_tenure_months")))

# income to emi ratio (how many EMIs monthly income covers)
loan_df2 = loan_df2.withColumn("income_to_emi", spark_round(col("monthly_income") / (col("emi_est") + 1), 2))

# simple low-credit flag (business rule)
loan_df2 = loan_df2.withColumn("low_credit_flag", when(col("credit_score") < 600, 1).otherwise(0))

loan_df2.select("loan_id","loan_amount","loan_tenure_months","emi_est","monthly_income","income_to_emi","credit_score","low_credit_flag").show(8, truncate=False)



+-------+-----------+------------------+-------+--------------+-------------+------------+---------------+
|loan_id|loan_amount|loan_tenure_months|emi_est|monthly_income|income_to_emi|credit_score|low_credit_flag|
+-------+-----------+------------------+-------+--------------+-------------+------------+---------------+
|1      |475606     |36                |13211.0|2860          |0.22         |537         |1              |
|2      |247932     |60                |4132.0 |15265         |3.69         |713         |0              |
|3      |167030     |60                |2784.0 |10685         |3.84         |695         |0              |
|4      |534815     |12                |44568.0|4433          |0.1          |493         |1              |
|5      |414819     |12                |34568.0|6117          |0.18         |396         |1              |
|6      |438716     |36                |12187.0|4747          |0.39         |509         |1              |
|7      |610769     |60              

In [60]:
# Save cleaned/preprocessed data (parquet)
clean_path = "cleaned_loans.parquet"
loan_df2.write.mode("overwrite").parquet(clean_path)
print("Saved cleaned parquet to:", clean_path)



Saved cleaned parquet to: cleaned_loans.parquet


In [61]:
#Prepare ML features
from pyspark.ml.feature import VectorAssembler

feature_cols = ["customer_age", "loan_amount", "loan_tenure_months", "interest_rate", "monthly_income", "credit_score", "income_to_emi", "low_credit_flag"]
# drop any pre-existing 'features' col if present
if "features" in loan_df2.columns:
    loan_df2 = loan_df2.drop("features")

assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
ml_df = assembler.transform(loan_df2).select(*feature_cols, "features", "default_within_3m", "loan_id")
print("Prepared ML Data:")
ml_df.show(5, truncate=False)


Prepared ML Data:
+------------+-----------+------------------+-------------+--------------+------------+-------------+---------------+-----------------------------------------------+-----------------+-------+
|customer_age|loan_amount|loan_tenure_months|interest_rate|monthly_income|credit_score|income_to_emi|low_credit_flag|features                                       |default_within_3m|loan_id|
+------------+-----------+------------------+-------------+--------------+------------+-------------+---------------+-----------------------------------------------+-----------------+-------+
|59          |475606     |36                |13.4         |2860          |537         |0.22         |1              |[59.0,475606.0,36.0,13.4,2860.0,537.0,0.22,1.0]|1                |1      |
|49          |247932     |60                |8.5          |15265         |713         |3.69         |0              |[49.0,247932.0,60.0,8.5,15265.0,713.0,3.69,0.0]|0                |2      |
|35          |167030  

In [62]:
#Train/test split & logistic regression
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

train_df, test_df = ml_df.randomSplit([0.8, 0.2], seed=42)
print("Train rows:", train_df.count(), "Test rows:", test_df.count())

lr = LogisticRegression(featuresCol="features", labelCol="default_within_3m", maxIter=20)
lr_model = lr.fit(train_df)

# predictions on test
pred = lr_model.transform(test_df)
pred.select("loan_id","default_within_3m","prediction").show(10)


Train rows: 205 Test rows: 36
+-------+-----------------+----------+
|loan_id|default_within_3m|prediction|
+-------+-----------------+----------+
|     90|                1|       1.0|
|    201|                0|       0.0|
|    210|                1|       1.0|
|    202|                1|       1.0|
|     36|                0|       0.0|
|    216|                1|       0.0|
|    221|                1|       1.0|
|    181|                1|       1.0|
|     97|                1|       1.0|
|      5|                1|       1.0|
+-------+-----------------+----------+
only showing top 10 rows



In [63]:
#Evaluation: AUC and confusion-like counts
evaluator = BinaryClassificationEvaluator(labelCol="default_within_3m", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
auc = evaluator.evaluate(pred)
print(f"Test AUC: {auc:.3f}")

# Confusion-like counts: TP, FP, TN, FN
pred_counts = pred.groupBy("default_within_3m", "prediction").count().toPandas()
print("\nConfusion counts (default_vs_prediction):")
print(pred_counts)


Test AUC: 0.898

Confusion counts (default_vs_prediction):
   default_within_3m  prediction  count
0                  1         0.0      5
1                  0         0.0     11
2                  1         1.0     20


In [64]:
import pandas as pd

df = pd.read_csv("synthetic_loans_dataset.csv")

req = ["loan_id","credit_score","customer_age","loan_amount","loan_tenure_months",
       "interest_rate","monthly_income","default_within_3m"]

# fail fast if required columns missing
miss = [c for c in req if c not in df.columns]
if miss:
    print("FAIL: missing columns:", miss); raise SystemExit

dup_count = df['loan_id'].duplicated(keep=False).sum()
bad_credit_count = ((df['credit_score']<300)|(df['credit_score']>850)|df['credit_score'].isna()).sum()
missing_over_5 = [c for c in req if df[c].isna().mean() > 0.05]

print("loan_id duplicates:", "FAIL" if dup_count>0 else "PASS", f"({dup_count})")
print("credit_score range:", "FAIL" if bad_credit_count>0 else "PASS", f"({bad_credit_count})")
print("missingness >5%:", "FAIL" if missing_over_5 else "PASS", missing_over_5)

if dup_count==0 and bad_credit_count==0 and not missing_over_5:
    print("DATA QUALITY: PASS")
else:
    print("DATA QUALITY: FAIL — inspect above outputs")


loan_id duplicates: PASS (0)
credit_score range: PASS (0)
missingness >5%: PASS []
DATA QUALITY: PASS


In [67]:
from pyspark.sql import functions as F

# filter predicted positives, then show all of them
pos = pred.filter(F.col("prediction") == 1.0)
n = pos.count()
pos.select("loan_id","default_within_3m","probability","prediction") \
   .show(n, truncate=False)


+-------+-----------------+----------------------------------------+----------+
|loan_id|default_within_3m|probability                             |prediction|
+-------+-----------------+----------------------------------------+----------+
|90     |1                |[0.20381679687827556,0.7961832031217244]|1.0       |
|210    |1                |[0.23587843725919042,0.7641215627408096]|1.0       |
|202    |1                |[0.1577410783317988,0.8422589216682013] |1.0       |
|221    |1                |[0.1817583783356102,0.8182416216643897] |1.0       |
|181    |1                |[0.10416324974231062,0.8958367502576894]|1.0       |
|97     |1                |[0.2548331784415525,0.7451668215584475] |1.0       |
|5      |1                |[0.22138714304483884,0.7786128569551611]|1.0       |
|182    |1                |[0.1988768831518489,0.8011231168481511] |1.0       |
|217    |1                |[0.1850512577583378,0.8149487422416621] |1.0       |
|239    |1                |[0.1919913481