In [14]:
# STEP 1: Import Libraries
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import rand, col

# STEP 2: Start Spark Session
spark = SparkSession.builder.appName("Loan_RF_3Class_BestFix").getOrCreate()

# STEP 3: Load Data
df = spark.read.csv("MC_loan_data.csv", header=True, inferSchema=True)

# STEP 4: Define Features and Target
safe_features = [
    "salary", "loan_term", "occupation", "contract_amount",
    "installment_amount", "interest_rate", "brand", "model_name",
    "fraud_alert_pm1", "contract_id"
]
target = "application_status"
df = df.select(safe_features + [target])

# STEP 5: Filter only rows with non-null target
df_clean = df.filter(col(target).isNotNull())

# STEP 6: Fit StringIndexer from full df to include all labels
label_indexer = StringIndexer(inputCol=target, outputCol="label")
label_model = label_indexer.fit(df)
print("Label mapping:", label_model.labels)

# STEP 7: Transform label column
data = label_model.transform(df_clean)

# STEP 8: Fill missing feature values to prevent label loss
# Numerical fill = 0, Categorical fill = 'unknown'
fill_dict = {
    "salary": 0,
    "loan_term": 0,
    "contract_amount": 0,
    "installment_amount": 0,
    "interest_rate": 0,
    "occupation": "unknown",
    "brand": "unknown",
    "model_name": "unknown",
    "fraud_alert_pm1": "unknown",
    "contract_id": "unknown"
}
data = data.fillna(fill_dict)

# STEP 9: Confirm all 3 labels exist
print("Labels after fillna():")
data.groupBy("label").count().orderBy("label").show()

# STEP 10: Oversample to balance all classes
labels = data.select("label").distinct().rdd.flatMap(lambda x: x).collect()
class_counts = data.groupBy("label").count().collect()
class_count_map = {row['label']: row['count'] for row in class_counts}
max_size = max(class_count_map.values())

samples = []
for l in labels:
    class_df = data.filter(col("label") == l)
    ratio = max_size / class_count_map[l]
    print(f"Oversampling label {l}: ratio = {ratio:.2f}")
    sampled_df = class_df.sample(withReplacement=True, fraction=ratio, seed=42)
    samples.append(sampled_df)

data_balanced = samples[0]
for s in samples[1:]:
    data_balanced = data_balanced.union(s)

# STEP 11: Confirm balanced dataset
print("Balanced class distribution:")
data_balanced.groupBy("label").count().orderBy("label").show()

# STEP 12: Feature Engineering
categorical_cols = ["occupation", "brand", "model_name", "fraud_alert_pm1", "contract_id"]
numeric_cols = ["salary", "loan_term", "contract_amount", "installment_amount", "interest_rate"]

indexers = [StringIndexer(inputCol=c, outputCol=c + "_idx", handleInvalid="keep") for c in categorical_cols]
encoders = [OneHotEncoder(inputCol=c + "_idx", outputCol=c + "_vec") for c in categorical_cols]
final_features = numeric_cols + [c + "_vec" for c in categorical_cols]
assembler = VectorAssembler(inputCols=final_features, outputCol="features")

# STEP 13: Train-Test Split
data_shuffled = data_balanced.orderBy(rand())
train_data, test_data = data_shuffled.randomSplit([0.7, 0.3], seed=2025)

# STEP 14: Train Random Forest
rf = RandomForestClassifier(labelCol="label", featuresCol="features", numTrees=100, maxDepth=10)

# STEP 15: Build and Train Pipeline
pipeline = Pipeline(stages=indexers + encoders + [assembler, rf])
model = pipeline.fit(train_data)

# STEP 16: Predict
predictions = model.transform(test_data)

# STEP 17: Evaluation
print("Evaluation metrics:")
metrics = ["accuracy", "f1", "weightedPrecision", "weightedRecall"]
for metric in metrics:
    evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName=metric)
    print(f"{metric}: {evaluator.evaluate(predictions):.2f}")

# STEP 18: Confusion Matrix
print("Confusion Matrix:")
predictions.groupBy("label", "prediction").count().orderBy("label", "prediction").show()


Label mapping: ['Approved with condition', 'Approved', 'Rejected']
Labels after fillna():
+-----+-----+
|label|count|
+-----+-----+
|  0.0| 2455|
|  1.0| 1295|
|  2.0| 1250|
+-----+-----+



                                                                                

Oversampling label 0.0: ratio = 1.00
Oversampling label 1.0: ratio = 1.90
Oversampling label 2.0: ratio = 1.96
Balanced class distribution:
+-----+-----+
|label|count|
+-----+-----+
|  0.0| 2544|
|  1.0| 2465|
|  2.0| 2481|
+-----+-----+



                                                                                

Evaluation metrics:
accuracy: 0.84
f1: 0.83
weightedPrecision: 0.89
weightedRecall: 0.84
Confusion Matrix:




+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|  0.0|       0.0|  437|
|  0.0|       1.0|  362|
|  1.0|       1.0|  726|
|  2.0|       2.0|  737|
+-----+----------+-----+



                                                                                

In [3]:
df.groupBy("application_status").count().show(truncate=False)

+-----------------------+-----+
|application_status     |count|
+-----------------------+-----+
|Approved               |1295 |
|Rejected               |1250 |
|Approved with condition|2455 |
+-----------------------+-----+

