# Assignment: Predicting Loan Default Using PySpark MLlib
An end-to-end pipeline that keeps shuffles to a minimum and runs on Colab, Dataproc, or a local machine.

In [None]:

# If running on Colab, install PySpark
import sys, os, subprocess, pathlib, numpy as np, pandas as pd
# On Colab only: install PySpark if it cannot be imported yet.
if 'google.colab' in sys.modules:
    try:
        import pyspark  # noqa: F401  (probe only: is the package installed?)
    except ImportError:
        # Only a genuinely missing package triggers the install; any other
        # error raised while importing pyspark propagates instead of being
        # masked by a reinstall attempt.
        print("Installing PySpark...")
        subprocess.run([sys.executable, "-m", "pip", "install", "-q", "pyspark==3.5.1"], check=True)

# Generate a synthetic loan dataset at DATA_PATH unless that file already
# exists (an existing CSV is reused as-is and never regenerated).
#
# NOTE: every random draw below comes from one seeded generator, so the ORDER
# of the statements determines the generated values -- reordering or adding
# draws changes the dataset.
DATA_PATH = "loan_data.csv"
if not os.path.exists(DATA_PATH):
    # Fixed seed -> the same CSV on every fresh run.
    rng = np.random.default_rng(42)
    N = 5000
    def weighted_choice(options, probs):
        # Draw N values from `options` with the given probabilities.
        return rng.choice(options, size=N, p=probs)

    # Generator.integers excludes the high bound: ages are 21..68.
    age = rng.integers(21, 69, size=N)
    gender = weighted_choice(["Male", "Female"], [0.55, 0.45])
    # Normally distributed income, clipped to [15000, 200000].
    income = np.round(rng.normal(loc=60000, scale=20000, size=N).clip(15000, 200000), 2)
    # Loan size is 10-60% of income plus N(0, 3000) noise, clipped to [1000, 300000].
    loan_amount = np.round((income * rng.uniform(0.1, 0.6, size=N)) + rng.normal(0, 3000, size=N), 2).clip(1000, 300000)
    # High bound excluded here too: terms are 12..359.
    loan_term = rng.integers(12, 360, size=N)
    credit_score = np.round(np.clip(rng.normal(680, 60, size=N), 300, 850), 1)
    employment_status = weighted_choice(["Employed", "Unemployed", "Self-employed", "Student"], [0.65, 0.08, 0.22, 0.05])
    marital_status = weighted_choice(["Single", "Married", "Divorced"], [0.45, 0.45, 0.10])
    # Debt-to-income ratio. The 1e-6 is a defensive epsilon; income is already
    # clipped to >= 15000 above.
    dti = loan_amount / (income + 1e-6)
    # Log-odds of default: rises as credit score falls below 680, as DTI rises
    # above 0.35, and for borrowers under 25 (+0.1) or over 60 (+0.05).
    # NOTE(review): with the 1.5 intercept the average default probability is
    # roughly sigmoid(1.5) ~ 0.8, i.e. "Yes" is the majority class -- confirm
    # that this class balance is intended.
    base = 1.5 - (credit_score - 680) / 120 + 0.35 * (dti - 0.35) + 0.1 * ((age < 25) * 1) + 0.05 * ((age > 60) * 1)
    # Employment shift: Unemployed +0.6, Student +0.3, Self-employed +0.1, Employed 0.
    emp_adj = np.select([employment_status == "Unemployed", employment_status == "Student", employment_status == "Self-employed"],
                        [0.6, 0.3, 0.1], default=0.0)
    # Married borrowers -0.05, everyone else +0.05.
    mar_adj = np.where(marital_status == "Married", -0.05, 0.05)
    logit = base + emp_adj + mar_adj + rng.normal(0, 0.25, size=N)
    # Logistic link: per-row probability of default.
    pd_default = 1 / (1 + np.exp(-logit))
    # One Bernoulli draw per row turns the probability into a Yes/No label.
    default = np.where(rng.random(N) < pd_default, "Yes", "No")
    customerID = [f"C{100000 + i}" for i in range(N)]
    df = pd.DataFrame({
        "customerID": customerID,
        "age": age.astype(int),
        "gender": gender,
        "income": income.astype(float),
        "loan_amount": loan_amount.astype(float),
        "loan_term": loan_term.astype(int),
        "credit_score": credit_score.astype(float),
        "employment_status": employment_status,
        "marital_status": marital_status,
        "default": default
    })
    df.to_csv(DATA_PATH, index=False)
    print(f"Wrote {DATA_PATH} with shape", df.shape)
else:
    print("Found existing", DATA_PATH)


In [None]:

import sys
from pyspark.sql import SparkSession, functions as F, types as T
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.classification import RandomForestClassificationModel
from pyspark.ml.pipeline import PipelineModel

DATA_PATH = "loan_data.csv"
SEED = 42

# Shared Spark session. Shuffle partitions are lowered from Spark's default of
# 200 to 64 (NOTE(review): presumably sized for the small synthetic dataset).
spark = SparkSession.builder.appName("LoanDefault_PySpark_MLlib").config(
    "spark.sql.shuffle.partitions", "64"
).getOrCreate()
spark.sparkContext.setLogLevel("WARN")

# Read the CSV using its header row and let Spark infer column types.
df = spark.read.csv(DATA_PATH, header=True, inferSchema=True)

# Normalise every header in one step: strip, lower-case, spaces -> underscores
# (so "customerID" becomes "customerid").
df = df.toDF(*[name.strip().lower().replace(" ", "_") for name in df.columns])

# In a single projection, trim whitespace from string columns, then cast the
# known numeric columns to their expected types. Column order is unchanged.
string_cols = [fld.name for fld in df.schema.fields if isinstance(fld.dataType, T.StringType)]
cast_map = {
    "age": T.IntegerType(),
    "income": T.DoubleType(),
    "loan_amount": T.DoubleType(),
    "loan_term": T.IntegerType(),
    "credit_score": T.DoubleType(),
}
projection = []
for name in df.columns:
    expr = F.trim(F.col(name)) if name in string_cols else F.col(name)
    if name in cast_map:
        expr = expr.cast(cast_map[name])
    projection.append(expr.alias(name))
df = df.select(*projection)

# Fail fast, naming the first expected column that is absent.
required = ["customerid", "age", "gender", "income", "loan_amount", "loan_term",
            "credit_score", "employment_status", "marital_status", "default"]
missing = [name for name in required if name not in df.columns]
if missing:
    raise ValueError("Missing column: " + missing[0])

# Remove exact duplicate rows, then any row with a null in a required column.
df = df.dropDuplicates()
row_complete = F.lit(True)
for name in required:
    row_complete = row_complete & F.col(name).isNotNull()
df = df.filter(row_complete)

# Keep the cleaned frame in memory for reuse; count() materialises the cache.
df = df.cache()
print("Rows:", df.count())

# --- Features, label, models -------------------------------------------------
# Categorical predictors are string-indexed, then one-hot encoded.
cat_cols = ["gender", "employment_status", "marital_status"]
idx_cols = [c + "_idx" for c in cat_cols]
ohe_cols = [c + "_ohe" for c in cat_cols]

# Feature indexers/encoder use handleInvalid="keep": a category unseen during
# fit is mapped to an extra bucket instead of failing at transform time.
indexers = [StringIndexer(inputCol=c, outputCol=c + "_idx", handleInvalid="keep") for c in cat_cols]
encoder = OneHotEncoder(inputCols=idx_cols, outputCols=ohe_cols, handleInvalid="keep")

# The label must be binary, with a fixed meaning:
#  * stringOrderType="alphabetAsc" pins "No" -> 0.0 and "Yes" -> 1.0. The
#    positive class (label 1.0, scored by BinaryClassificationEvaluator and
#    shown as y=1 in the confusion matrix) is therefore always "default = Yes".
#    The default "frequencyDesc" order gives index 0 to whichever value is most
#    common, so the mapping flips with the class balance.
#  * handleInvalid="error" rather than "keep": "keep" adds an extra
#    "__unknown" value to the label column's metadata. Classifiers then infer
#    3 classes, and LogisticRegression switches to a multinomial fit.
label_indexer = StringIndexer(inputCol="default", outputCol="label",
                              stringOrderType="alphabetAsc", handleInvalid="error")
num_cols = ["age", "income", "loan_amount", "loan_term", "credit_score"]
# Vector slot order = num_cols first, then each one-hot block in ohe_cols order.
assembler = VectorAssembler(inputCols=num_cols + ohe_cols, outputCol="features", handleInvalid="keep")

# Reproducible 70/30 train/test split.
train, test = df.randomSplit([0.7, 0.3], seed=SEED)

lr = LogisticRegression(featuresCol="features", labelCol="label", maxIter=50)
rf = RandomForestClassifier(featuresCol="features", labelCol="label", seed=SEED, numTrees=200, maxDepth=8, subsamplingRate=0.8)

pipeline_lr = Pipeline(stages=indexers + [encoder, label_indexer, assembler, lr])
pipeline_rf = Pipeline(stages=indexers + [encoder, label_indexer, assembler, rf])

model_lr = pipeline_lr.fit(train)
model_rf = pipeline_rf.fit(train)

# Cached because each prediction set is scanned by several evaluators.
pred_lr = model_lr.transform(test).cache()
pred_rf = model_rf.transform(test).cache()

bin_eval = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
acc_eval = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
# metricName="f1" is the label-weighted F1 over both classes, not the F1 of
# the "Yes" class alone.
f1_eval = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")

def metrics(pred):
    """Score a predictions DataFrame with the shared evaluators.

    Args:
        pred: output of a fitted pipeline's ``transform``; it must carry the
            ``label``, ``prediction`` and ``rawPrediction`` columns.

    Returns:
        ``(auc, accuracy, f1, cm)``. The first three are floats from
        ``bin_eval``, ``acc_eval`` and ``f1_eval``. ``cm`` is a lazy
        DataFrame of ``(y, p, count)`` rows: true label vs. predicted label,
        both cast to int.
    """
    auc, acc, f1 = (evaluator.evaluate(pred) for evaluator in (bin_eval, acc_eval, f1_eval))
    pairs = pred.select(
        F.col("label").cast("int").alias("y"),
        F.col("prediction").cast("int").alias("p"),
    )
    confusion = pairs.groupBy("y", "p").count()
    return auc, acc, f1, confusion

# --- Hold-out evaluation of both models ---------------------------------------
auc_lr, acc_lr, f1_lr, cm_lr = metrics(pred_lr)
auc_rf, acc_rf, f1_rf, cm_rf = metrics(pred_rf)

# "\n" is a real newline (an escaped "\\n" would print a literal backslash-n).
print("\n=== Logistic Regression ===")
print(f"AUC={auc_lr:.4f}  ACC={acc_lr:.4f}  F1={f1_lr:.4f}")
cm_lr.orderBy("y", "p").show()

print("\n=== Random Forest ===")
print(f"AUC={auc_rf:.4f}  ACC={acc_rf:.4f}  F1={f1_rf:.4f}")
cm_rf.orderBy("y", "p").show()

# --- 3-fold cross-validation over LR regularisation (tight 3x3 grid) ---------
param_grid_lr = (ParamGridBuilder()
                 .addGrid(lr.regParam, [0.0, 0.01, 0.1])
                 .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
                 .build())
cv_lr = CrossValidator(estimator=pipeline_lr, estimatorParamMaps=param_grid_lr,
                       evaluator=bin_eval, numFolds=3, seed=SEED, parallelism=2)

cv_lr_model = cv_lr.fit(train)
# avgMetrics[i] is the mean validation AUC of param_grid_lr[i] across the
# folds. AUC is larger-is-better, so the selected model's score is the max.
best_cv_auc = max(cv_lr_model.avgMetrics)
# Separate from the CV score: the AUC of the refit best model on the hold-out set.
cv_auc = bin_eval.evaluate(cv_lr_model.transform(test))
best_lr = cv_lr_model.bestModel.stages[-1]
print(f"\nCV LR best mean CV AUC={best_cv_auc:.4f}  test AUC={cv_auc:.4f}")
print(f"  best regParam={best_lr.getRegParam()}  elasticNetParam={best_lr.getElasticNetParam()}")

# --- Random-forest feature importances ----------------------------------------
rf_fitted = [s for s in model_rf.stages if isinstance(s, RandomForestClassificationModel)][0]
importances = rf_fitted.featureImportances
feature_cols = num_cols + ohe_cols  # assembler input columns, in vector order

# The assembled "features" column carries per-slot attributes (idx, name) in
# its "ml_attr" metadata. Use them to name every slot, including the
# individual one-hot categories; fall back to "f<idx>" if metadata is absent.
attr_groups = pred_rf.schema["features"].metadata.get("ml_attr", {}).get("attrs", {})
slot_names = {a["idx"]: a.get("name", f"f{a['idx']}") for group in attr_groups.values() for a in group}
ranked = sorted(enumerate(importances.toArray()), key=lambda kv: kv[1], reverse=True)
print("\nTop feature importances (RF):")
for slot, score in ranked[:15]:
    slot_name = slot_names.get(slot, f"f{slot}")
    print(f"{slot_name:30s}: {score:.6f}")

# --- Save & reload the fitted pipeline -----------------------------------------
SAVE_PATH = "models/rf_default_pipeline"
import shutil, os
# overwrite() replaces existing output through Spark's own filesystem layer,
# so it works whatever filesystem SAVE_PATH resolves to (local, HDFS, ...).
# A local os.path.exists/shutil.rmtree pre-clean would only ever look at the
# driver's local disk.
model_rf.write().overwrite().save(SAVE_PATH)
reloaded = PipelineModel.load(SAVE_PATH)

# Round-trip check: the reloaded pipeline must reproduce the in-memory model's
# test predictions. The two sides are compared as multisets of
# (customerid, prediction) rows.
reload_pred = reloaded.transform(test).select("customerid", "prediction")
n_mismatch = pred_rf.select("customerid", "prediction").exceptAll(reload_pred).count()
print("Reload OK:", n_mismatch == 0, f"(mismatched predictions: {n_mismatch})")
