# Initialize Spark session

In [1]:
from pyspark.sql import SparkSession

# Initialize Spark session with maximum configuration
spark = SparkSession.builder \
    .appName("loan_default") \
    .config("spark.executor.memory", "8g") \
    .config("spark.driver.memory", "8g") \
    .config("spark.executor.cores", "4") \
    .config("spark.num.executors", "4") \
    .config("spark.sql.shuffle.partitions", "200") \
    .config("spark.sql.autoBroadcastJoinThreshold", "-1") \
    .getOrCreate()


In [2]:
# Load data from CSV
df_filled = spark.read.csv("E:/Class/sem3/big data framework/Project/df_filled.csv", header=True, inferSchema=True)

# Show the DataFrame schema and a few rows to confirm successful loading
df_filled.printSchema()


root
 |-- SK_ID_CURR: double (nullable = true)
 |-- TARGET: integer (nullable = true)
 |-- NAME_CONTRACT_TYPE: string (nullable = true)
 |-- CODE_GENDER: string (nullable = true)
 |-- FLAG_OWN_CAR: string (nullable = true)
 |-- FLAG_OWN_REALTY: string (nullable = true)
 |-- CNT_CHILDREN: double (nullable = true)
 |-- AMT_INCOME_TOTAL: double (nullable = true)
 |-- AMT_CREDIT: double (nullable = true)
 |-- AMT_ANNUITY: double (nullable = true)
 |-- AMT_GOODS_PRICE: double (nullable = true)
 |-- NAME_TYPE_SUITE: string (nullable = true)
 |-- NAME_INCOME_TYPE: string (nullable = true)
 |-- NAME_EDUCATION_TYPE: string (nullable = true)
 |-- NAME_FAMILY_STATUS: string (nullable = true)
 |-- NAME_HOUSING_TYPE: string (nullable = true)
 |-- REGION_POPULATION_RELATIVE: double (nullable = true)
 |-- DAYS_BIRTH: double (nullable = true)
 |-- DAYS_EMPLOYED: double (nullable = true)
 |-- DAYS_REGISTRATION: double (nullable = true)
 |-- DAYS_ID_PUBLISH: double (nullable = true)
 |-- OWN_CAR_AGE: do

# Data cleaning

In [3]:
# Step 1: Separate features and target
target_col = "TARGET"
X = df_filled.drop(target_col)
y = df_filled.select("SK_ID_CURR", target_col)

In [4]:
# Identify categorical columns
categorical_columns = [field.name for field in X.schema.fields if field.dataType.simpleString() == 'string']

# Identify numerical columns
numerical_columns = [field.name for field in X.schema.fields if field.dataType.simpleString() in ['int', 'double']]


In [32]:
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Drop existing index columns if they exist to avoid conflicts
existing_columns = set(X.columns)
index_columns = [col + '_index' for col in categorical_columns]
columns_to_drop = [col for col in index_columns if col in existing_columns]

if columns_to_drop:
    X = X.drop(*columns_to_drop)

# Define indexers for categorical columns
indexers = [StringIndexer(inputCol=col, outputCol=col + '_index') for col in categorical_columns]

# Define the assembler with indexed and numerical columns
indexed_columns = [col + '_index' for col in categorical_columns]
assembler = VectorAssembler(inputCols=indexed_columns + numerical_columns, outputCol='features')

# Create and fit the pipeline
pipeline = Pipeline(stages=indexers + [assembler])

# Apply the pipeline
X_transformed = pipeline.fit(X).transform(X)


In [33]:

# `SK_ID_CURR` is a unique identifier in your original DataFrame
X_transformed = X_transformed.withColumn("SK_ID_CURR", X["SK_ID_CURR"])

# Now join the target column back into the transformed DataFrame
final_df = X_transformed.join(y.withColumnRenamed(target_col, "TARGET"), on="SK_ID_CURR", how="inner")


In [36]:
final_df

DataFrame[SK_ID_CURR: double, NAME_CONTRACT_TYPE: string, CODE_GENDER: string, FLAG_OWN_CAR: string, FLAG_OWN_REALTY: string, CNT_CHILDREN: double, AMT_INCOME_TOTAL: double, AMT_CREDIT: double, AMT_ANNUITY: double, AMT_GOODS_PRICE: double, NAME_TYPE_SUITE: string, NAME_INCOME_TYPE: string, NAME_EDUCATION_TYPE: string, NAME_FAMILY_STATUS: string, NAME_HOUSING_TYPE: string, REGION_POPULATION_RELATIVE: double, DAYS_BIRTH: double, DAYS_EMPLOYED: double, DAYS_REGISTRATION: double, DAYS_ID_PUBLISH: double, OWN_CAR_AGE: double, FLAG_MOBIL: double, FLAG_EMP_PHONE: double, FLAG_WORK_PHONE: double, FLAG_CONT_MOBILE: double, FLAG_PHONE: double, FLAG_EMAIL: double, OCCUPATION_TYPE: string, CNT_FAM_MEMBERS: double, REGION_RATING_CLIENT: double, REGION_RATING_CLIENT_W_CITY: double, WEEKDAY_APPR_PROCESS_START: string, HOUR_APPR_PROCESS_START: double, REG_REGION_NOT_LIVE_REGION: double, REG_REGION_NOT_WORK_REGION: double, LIVE_REGION_NOT_WORK_REGION: double, REG_CITY_NOT_LIVE_CITY: double, REG_CITY_NO

In [34]:
#  Split the data into training and testing sets
train_df, test_df = final_df.randomSplit([0.8, 0.2], seed=42)


## Feature selection

In [38]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.sql import functions as F


#  Create a Random Forest model
rf = RandomForestClassifier(labelCol=target_col, featuresCol='features', numTrees=100, maxBins=64)

# Fit the model on the training data
rf_model = rf.fit(train_df)

# Get the feature importances
importances = rf_model.featureImportances.toArray()  # Convert to a numpy array and then to a list

#  Extract feature names from the assembler (assuming you created a feature vector with an assembler)
# If you used an assembler, you may have saved the feature names in a list; use that.
feature_names = numerical_columns + [f"{col}_index" for col in categorical_columns]

# Create a DataFrame of features and their importances
# Convert importances to a list of floats for compatibility with PySpark
importance_list = [float(importance) for importance in importances]

# Create DataFrame using the feature names and importances
feature_importance_df = spark.createDataFrame(zip(feature_names, importance_list), schema=['feature', 'importance'])

#  Sort by importance and select the top 10 features
top_features = feature_importance_df.orderBy(F.col("importance").desc()).limit(10)

# Show the top features with their importance values
top_features.show()

# Print out the column names of the top features
top_feature_names = [row.feature for row in top_features.collect()]
print("Top 10 Features:")
for name in top_feature_names:
    print(name)


+--------------------+--------------------+
|             feature|          importance|
+--------------------+--------------------+
|   BASEMENTAREA_MODE|  0.1713663968709113|
|   NONLIVINGAREA_AVG|  0.1276040331901643|
|     APARTMENTS_MODE| 0.11905245612042939|
|LIVE_REGION_NOT_W...| 0.03503646182159677|
|   CODE_GENDER_index| 0.03479505163720913|
|REGION_POPULATION...| 0.03395462150746875|
|    FLAG_DOCUMENT_17| 0.03235152254686104|
|       ELEVATORS_AVG|0.030672904157936512|
|     AMT_GOODS_PRICE|0.029840263483708922|
|        EXT_SOURCE_2|0.026322980955377488|
+--------------------+--------------------+

Top 10 Features:
BASEMENTAREA_MODE
NONLIVINGAREA_AVG
APARTMENTS_MODE
LIVE_REGION_NOT_WORK_REGION
CODE_GENDER_index
REGION_POPULATION_RELATIVE
FLAG_DOCUMENT_17
ELEVATORS_AVG
AMT_GOODS_PRICE
EXT_SOURCE_2


## Logistic Regression model without handling unbalanced dataset

In [39]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator



# Create a new VectorAssembler with the top features
top_feature_assembler = VectorAssembler(inputCols=top_feature_names, outputCol='top_features')

#  Create a Logistic Regression model
lr_top_features = LogisticRegression(labelCol=target_col, featuresCol='top_features')

#  Create a pipeline
pipeline_top_features = Pipeline(stages=[top_feature_assembler, lr_top_features])

# Fit the model on the training data
model_top_features = pipeline_top_features.fit(train_df)

# Make predictions on the test data
predictions_top_features = model_top_features.transform(test_df)

# Evaluate the model
evaluator = BinaryClassificationEvaluator(labelCol=target_col, rawPredictionCol="prediction")
accuracy = evaluator.evaluate(predictions_top_features)
print(f"Accuracy with Top Features: {accuracy}")

# Optional: Show some predictions along with actual values
predictions_top_features.select(target_col, "prediction", "probability").show(5)


Accuracy with Top Features: 0.5
+------+----------+--------------------+
|TARGET|prediction|         probability|
+------+----------+--------------------+
|     1|       0.0|[0.89622115646501...|
|     0|       0.0|[0.94429391902422...|
|     0|       0.0|[0.93334262100710...|
|     0|       0.0|[0.96083280393971...|
|     0|       0.0|[0.9344689514534,...|
+------+----------+--------------------+
only showing top 5 rows



In [40]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Assuming `predictions_top_features` contains the predictions from the model
evaluator = MulticlassClassificationEvaluator(labelCol=target_col, predictionCol="prediction", metricName="f1")

# Calculate F1 score
f1_score = evaluator.evaluate(predictions_top_features)
print(f"F1 Score: {f1_score}")


F1 Score: 0.8804876564778787


## Handling unbalanced dataset

In [45]:
from pyspark.sql.functions import col, count, when

# Count the number of instances in each class
count_positive = train_df.filter(col("TARGET") == 1).count()
count_negative = train_df.filter(col("TARGET") == 0).count()

# Print counts for debugging
print(f"Count of loan default: {count_positive}")
print(f"Count of paying back the loan: {count_negative}")

# Create DataFrames for each class
df_positive = train_df.filter(col("TARGET") == 1)
df_negative = train_df.filter(col("TARGET") == 0)

# Undersample the majority class (negative instances)
df_negative_sampled = df_negative.sample(fraction=count_positive / count_negative, seed=42)

# Combine the sampled negative instances with the positive instances
balanced_train_df = df_positive.union(df_negative_sampled)

# Verify the counts after undersampling
print(f"Balanced train count of loan default: {balanced_train_df.filter(col('TARGET') == 1).count()}")
print(f"Balanced train count of paying back the loan: {balanced_train_df.filter(col('TARGET') == 0).count()}")




Count of loan default: 19887
Count of paying back the loan: 226517
Balanced train count of loan default: 19999
Balanced train count of paying back the loan: 19868


# Logistic Regression after handling the imbalnced data

In [43]:
#  Create a Logistic Regression model using the top features
lr_top_features = LogisticRegression(labelCol=target_col, featuresCol='top_features')

#  Create a pipeline with the top feature assembler and the logistic regression model
pipeline_top_features = Pipeline(stages=[top_feature_assembler, lr_top_features])

#  Fit the model on the balanced training data
model_top_features = pipeline_top_features.fit(balanced_train_df)

#  Make predictions on the test data
predictions_top_features = model_top_features.transform(test_df)

# Evaluate the model
evaluator = BinaryClassificationEvaluator(labelCol=target_col, rawPredictionCol="prediction")
accuracy = evaluator.evaluate(predictions_top_features)
print(f"Accuracy with Undersampled Data and Top Features: {accuracy}")

# Show some predictions along with actual values
predictions_top_features.select(target_col, "prediction", "probability").show(5)

Accuracy with Undersampled Data and Top Features: 0.6190141723617585
+------+----------+--------------------+
|TARGET|prediction|         probability|
+------+----------+--------------------+
|     1|       1.0|[0.43540010487094...|
|     0|       0.0|[0.59697974186368...|
|     0|       0.0|[0.54713208450799...|
|     0|       0.0|[0.67120078431392...|
|     0|       0.0|[0.55213074659830...|
+------+----------+--------------------+
only showing top 5 rows



In [44]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

#  Calculate the F1 score
evaluator_f1 = MulticlassClassificationEvaluator(labelCol=target_col, predictionCol="prediction", metricName="f1")
f1_score = evaluator_f1.evaluate(predictions_top_features)
print(f"F1 Score: {f1_score}")


F1 Score: 0.7300812616046058


In [46]:


# Save the model to a specified path
model_path = r"E:\Class\sem3\big data framework\Project\lr_top_features_model"  
lr_top_features.save(model_path)

print(f"Logistic Regression model saved at: {model_path}")



Logistic Regression model saved at: E:\Class\sem3\big data framework\Project\lr_top_features_model


## Gradient Boosted Trees Classifier model

In [52]:
from pyspark.ml.classification import GBTClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Create a GradientBoostedTreesClassifier model using the top features
gbt_top_features = GBTClassifier(labelCol=target_col, featuresCol='top_features')

# Create a pipeline with the top feature assembler and the GradientBoostedTrees model
pipeline_top_features_gbt = Pipeline(stages=[top_feature_assembler, gbt_top_features])

# Fit the model on the balanced training data
model_top_features_gbt = pipeline_top_features_gbt.fit(balanced_train_df)

# Make predictions on the test data
predictions_top_features_gbt = model_top_features_gbt.transform(test_df)

# Evaluate the model
evaluator = BinaryClassificationEvaluator(labelCol=target_col, rawPredictionCol="prediction")
accuracy = evaluator.evaluate(predictions_top_features_gbt)
print(f"Accuracy with Undersampled Data and Top Features (GBT): {accuracy}")

# Show some predictions along with actual values
predictions_top_features_gbt.select(target_col, "prediction", "probability").show(5)


Accuracy with Undersampled Data and Top Features (GBT): 0.6303089517624157
+------+----------+--------------------+
|TARGET|prediction|         probability|
+------+----------+--------------------+
|     1|       1.0|[0.43127460403293...|
|     0|       0.0|[0.59570018322394...|
|     0|       0.0|[0.55864029942773...|
|     0|       0.0|[0.56720905562465...|
|     0|       1.0|[0.48154104339152...|
+------+----------+--------------------+
only showing top 5 rows



In [54]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

#  Calculate the F1 score
evaluator_f1 = MulticlassClassificationEvaluator(labelCol=target_col, predictionCol="prediction", metricName="f1")
f1_score = evaluator_f1.evaluate(predictions_top_features_gbt)
print(f"F1 Score GBT: {f1_score}")


F1 Score GBT: 0.7096900767524533


## Random Forest Classifier model

In [55]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Create a RandomForestClassifier model using the top features
rf_top_features = RandomForestClassifier(labelCol=target_col, featuresCol='top_features')

# Create a pipeline with the top feature assembler and the RandomForest model
pipeline_top_features_rf = Pipeline(stages=[top_feature_assembler, rf_top_features])

# Fit the model on the balanced training data
model_top_features_rf = pipeline_top_features_rf.fit(balanced_train_df)

# Make predictions on the test data
predictions_top_features_rf = model_top_features_rf.transform(test_df)

# Evaluate the model
evaluator = BinaryClassificationEvaluator(labelCol=target_col, rawPredictionCol="prediction")
accuracy = evaluator.evaluate(predictions_top_features_rf)
print(f"Accuracy with Undersampled Data and Top Features (RandomForest): {accuracy}")

# Show some predictions along with actual values
predictions_top_features_rf.select(target_col, "prediction", "probability").show(5)


Accuracy with Undersampled Data and Top Features (RandomForest): 0.6250871702125639
+------+----------+--------------------+
|TARGET|prediction|         probability|
+------+----------+--------------------+
|     1|       1.0|[0.47267737724594...|
|     0|       0.0|[0.61325089572656...|
|     0|       0.0|[0.61071776933007...|
|     0|       0.0|[0.57414863286663...|
|     0|       0.0|[0.55399393756250...|
+------+----------+--------------------+
only showing top 5 rows



In [56]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

#  Calculate the F1 score
evaluator_f1 = MulticlassClassificationEvaluator(labelCol=target_col, predictionCol="prediction", metricName="f1")
f1_score = evaluator_f1.evaluate(predictions_top_features_rf)
print(f"F1 Score RandomForest: {f1_score}")


F1 Score RandomForest: 0.7201549208474419


# Saving the model

In [67]:

# Save the pipeline model
pipeline_top_features_rf.write().overwrite().save(r"E:\Class\sem3\big data framework\Project\pipeline_model")

# Save the RandomForest model
model_top_features_rf.stages[-1].write().overwrite().save(r"E:\Class\sem3\big data framework\Project\rf_model")