<a href="https://www.kaggle.com/code/omveersinghgurjar/easy-pyspark-lending-club?scriptVersionId=144090920" target="_blank"><img align="left" alt="Kaggle" title="Open in Kaggle" src="https://kaggle.com/static/images/open-in-kaggle.svg"></a>

In [None]:
!pip install pyspark

In [None]:
!pip install numpy==1.23.0

In [None]:
import pandas as pd
import numpy as np
import zipfile

import matplotlib.pyplot as plt

from IPython.display import Image

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, isnan, when, count, regexp_extract
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.linalg import DenseVector
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.classification import  RandomForestClassifier
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.sql.types import FloatType, IntegerType
from pyspark.sql.functions import lit
from pyspark.sql import DataFrame

from imblearn.over_sampling import SMOTE

import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)

from sklearn.model_selection import train_test_split

# Functions

In [None]:
def one_hot_encode_column(df, input_col):
    """Replace a categorical column with a one-hot encoded vector column.

    The column is first mapped to numeric indices with ``StringIndexer``;
    those indices are then expanded with ``OneHotEncoder``.

    Args:
        df: DataFrame containing ``input_col``.
        input_col: Name of the categorical column to encode.

    Returns:
        DataFrame with a new ``<input_col>_encoded`` column. The original
        column and the intermediate ``<input_col>_indexed`` column are dropped.
    """
    index_col = input_col + '_indexed'
    encoded_col = input_col + '_encoded'

    with_index = StringIndexer(inputCol=input_col, outputCol=index_col).fit(df).transform(df)
    with_vector = (
        OneHotEncoder(inputCol=index_col, outputCol=encoded_col)
        .fit(with_index)
        .transform(with_index)
    )

    return with_vector.drop(input_col, index_col)

def scale_features(input_df: DataFrame, input_col: str, output_col: str) -> DataFrame:
    """
    Min-max scale a vector column, fitting the scaler on ``input_df`` itself.

    Note that the scaler is fitted on the same DataFrame it transforms, so
    the resulting scale depends only on the rows passed in.

    Args:
        input_df (DataFrame): The input DataFrame.
        input_col (str): The name of the feature column to scale.
        output_col (str): The name of the column that receives the scaled values.

    Returns:
        DataFrame: ``input_df`` with ``output_col`` added.
    """
    fitted_scaler = MinMaxScaler(inputCol=input_col, outputCol=output_col).fit(input_df)
    return fitted_scaler.transform(input_df)

def evaluate_model(model, model_name, train_data, validation_data, test_data):
    """Fit an estimator on the training split and score it on all three splits.

    Args:
        model: Unfitted estimator exposing ``fit``; the fitted result must
            expose ``transform`` and produce a ``prediction`` column.
        model_name: Label written to the ``Model`` column of the result.
        train_data: Split used for fitting (and also scored).
        validation_data: Validation split to score.
        test_data: Test split to score.

    Returns:
        A one-row pandas DataFrame with accuracy and the ``weightedFMeasure``
        metric (reported as "F1 Score") for each split, rounded to 3 decimals.
        The label column is expected to be named ``target``.
    """
    fitted = model.fit(train_data)

    accuracy_metric = MulticlassClassificationEvaluator(labelCol="target", predictionCol="prediction", metricName="accuracy")
    f_measure_metric = MulticlassClassificationEvaluator(metricName="weightedFMeasure", labelCol="target")

    # Order matters: it fixes the column order of the returned frame.
    scored_splits = [
        ("Train", fitted.transform(train_data)),
        ("Validation", fitted.transform(validation_data)),
        ("Test", fitted.transform(test_data)),
    ]

    row = {"Model": [model_name]}
    for split_name, scored in scored_splits:
        row[f"Accuracy ({split_name})"] = [round(accuracy_metric.evaluate(scored), 3)]
        row[f"F1 Score ({split_name})"] = [round(f_measure_metric.evaluate(scored), 3)]

    return pd.DataFrame(row)

# Load dataset

In [None]:
# Obtain the Spark session used by every cell below.
spark = (
    SparkSession.builder
    .appName("lending-club")
    .getOrCreate()
)

In [None]:
# Columns kept from the raw Lending Club file; all other columns are discarded.
selected_columns = [
    "id", "purpose", "term", "verification_status",
    "acc_now_delinq", "addr_state", "annual_inc", "application_type",
    "dti", "grade", "home_ownership", "initial_list_status",
    "installment", "int_rate", "loan_amnt", "loan_status",
    "tax_liens", "delinq_amnt", "policy_code",
    "last_fico_range_high", "last_fico_range_low",
    "recoveries", "collection_recovery_fee",
]

# Column names come from the header row of the gzipped CSV.
csv_path = '/kaggle/input/lending-club/accepted_2007_to_2018Q4.csv.gz'
df = spark.read.csv(csv_path, header=True).select(selected_columns)

# Preview the first rows.
df.limit(5).toPandas()

# EDA

## Missing Values

In [None]:
# One aggregate per column: count the rows whose value is null or NaN.
missing_value_exprs = [
    count(when(col(c).isNull() | isnan(c), c)).alias(c)
    for c in df.columns
]
null_counts = df.agg(*missing_value_exprs)
null_counts.show()

In [None]:
# Discard every row that has a null in any of the selected columns.
df = df.dropna()

## 'purpose'

In [None]:
# Number of loans per purpose; reused below to find rare purposes.
df_with_count = df.groupby('purpose').count()
df_with_count.show()

In [None]:
# Collapse rare purposes: a purpose that occurs fewer than 300 times is
# relabelled "other"; every other purpose keeps its original value.
# The helper 'count' column brought in by the join is dropped afterwards.
purpose_or_other = when(col("count") < 300, "other").otherwise(col("purpose"))

df = (
    df
    .join(df_with_count, on='purpose', how='left')
    .withColumn("purpose", purpose_or_other)
    .drop('count')
)

In [None]:
# Purposes that remain after the rare ones were merged.
unique_purposes = df.select("purpose").dropDuplicates()
unique_purposes.show()

## 'term'

In [None]:
# Distribution of the raw 'term' values.
df.groupBy('term').count().show()

In [None]:
# Keep only the first run of digits found in 'term' and store it as an integer.
term_as_int = regexp_extract(col("term"), r'(\d+)', 0).cast("int")

df = df.withColumn("term", term_as_int)

## 'verification_status'

In [None]:
# Distribution of the raw 'verification_status' values.
df.groupBy('verification_status').count().show()

In [None]:
# Binary flag 'verification_status_encoded':
#   0 -> status is "Verified" or "Source Verified"
#   1 -> anything else
# The raw 'verification_status' column is dropped afterwards.
is_verified = col("verification_status").isin(["Verified", "Source Verified"])

df = (
    df
    .withColumn("verification_status_encoded", when(is_verified, 0).otherwise(1))
    .drop("verification_status")
)

## 'acc_now_delinq'

In [None]:
# Distribution of the raw 'acc_now_delinq' values.
df.groupBy('acc_now_delinq').count().show()

In [None]:
# Whitelist applied after capping.
valid_values = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

# 'acc_now_delinq' is cast to int and every value >= 4 is capped at 4.
delinq_as_int = col('acc_now_delinq').cast('int')
delinq_capped = when(delinq_as_int >= 4, 4).otherwise(delinq_as_int)

# Because the cap runs first, only 0..4 can actually match the whitelist;
# the filter therefore removes rows whose value is null or negative.
df = (
    df
    .withColumn('acc_now_delinq', delinq_capped)
    .filter(col('acc_now_delinq').isin(valid_values))
)

In [None]:
# Distribution of 'acc_now_delinq' after capping and filtering.
df.groupBy('acc_now_delinq').count().show()

## 'application_type'

In [None]:
# Distribution of the raw 'application_type' values.
df.groupBy('application_type').count().show()

In [None]:
# The two application types that are kept.
valid_values = ['Joint App', 'Individual']

# 'Joint App' -> 0, 'Individual' -> 1, anything else -> null.
application_type_code = (
    when(col('application_type') == 'Joint App', 0)
    .when(col('application_type') == 'Individual', 1)
    .otherwise(None)
)

# Rows that mapped to null (unknown application type) are removed, then the
# column is stored as an integer.
df = (
    df
    .withColumn('application_type', application_type_code)
    .filter(col('application_type').isNotNull())
    .withColumn('application_type', col('application_type').cast('int'))
)

## 'grade'

In [None]:
# Distribution of the raw 'grade' values.
df.groupBy('grade').count().show()

In [None]:
# Map 'grade' to a numeric 'grade_index'. Labels are ordered alphabetically
# (ascending), so the index follows the alphabetical order of the grade labels.
grade_indexer = StringIndexer(inputCol="grade", outputCol="grade_index", stringOrderType="alphabetAsc")
grade_indexer_model = grade_indexer.fit(df)

# The raw string column is no longer needed once the index exists.
df = grade_indexer_model.transform(df).drop('grade')

## One Hot Encoder

In [None]:
# Categorical columns that are replaced by '<name>_encoded' vector columns.
columns_to_encode = ['purpose', 'addr_state', 'home_ownership', 'initial_list_status']

for categorical_col in columns_to_encode:
    df = one_hot_encode_column(df, categorical_col)

## Cast to float type

In [None]:
# Numeric columns that are converted to float below.
columns_to_cast = [
    'installment', 'int_rate', 'loan_amnt', 'annual_inc', 'dti',
    'tax_liens', 'delinq_amnt', 'policy_code',
    'last_fico_range_high', 'last_fico_range_low',
    'recoveries', 'collection_recovery_fee',
]

# Overwrite each listed column in place with its float version.
for numeric_col in columns_to_cast:
    df = df.withColumn(numeric_col, df[numeric_col].cast(FloatType()))

In [None]:
# Display the (column name, type) pairs of the current DataFrame.
df.dtypes

## 'loan_status' -> target

In [None]:
# Distribution of the raw 'loan_status' values.
df.groupBy('loan_status').count().show()

In [None]:
# Encode 'loan_status' into 'target':
#   0 - fully paid
#   1 - late (including "In Grace Period")
#   2 - charged off (including "Default")
# Any other status maps to null and the row is removed.
late_statuses = ["Late (16-30 days)", "Late (31-120 days)", "In Grace Period"]
charged_off_statuses = ["Charged Off", 'Default']

target_code = (
    when(col("loan_status") == "Fully Paid", 0)
    .when(col("loan_status").isin(late_statuses), 1)
    .when(col("loan_status").isin(charged_off_statuses), 2)
    .otherwise(None)
)

df = df.withColumn("target", target_code).drop("loan_status")
df = df.filter(col("target").isNotNull())

In [None]:
# Class balance of the encoded target.
df.groupBy('target').count().show()

# Downsampling

In [None]:
# 'id' is dropped so it does not end up among the model features.
df = df.drop('id')

# Downsample only the majority class (target == 0) to ~30% of its rows and
# keep every row of the other classes. The seed makes the sample repeatable,
# in line with the seeded train/validation/test split used later; without it
# every run would train and evaluate on a different subset.
majority_sample = df.filter(col("target") == 0).sampleBy("target", fractions={0: 0.3}, seed=42)
other_classes = df.filter(col("target") != 0)

df_downsampled = majority_sample.union(other_classes)
df_downsampled = df_downsampled.na.drop()

In [None]:
# Class counts before and after downsampling, collected to pandas for plotting.
loan_status_count_original = df.groupby('target').count().toPandas()
loan_status_count_downsampled = df_downsampled.groupby('target').count().toPandas()

plt.figure(figsize=(15, 6))

# Two pies side by side. The grid is 1x2 because exactly two charts are drawn;
# a 1x3 grid would leave an empty third panel.

# Subplot 1 - Target Distribution before downsampling
plt.subplot(1, 2, 1)
plt.pie(loan_status_count_original['count'], labels=loan_status_count_original['target'], autopct='%1.1f%%')
plt.title('Target Distribution - Original Dataset')

# Subplot 2 - Target Distribution after downsampling
plt.subplot(1, 2, 2)
plt.pie(loan_status_count_downsampled['count'], labels=loan_status_count_downsampled['target'], autopct='%1.1f%%')
plt.title('Target Distribution - After Downsampling')

plt.show()

# VectorAssembler

In [None]:
all_columns = df_downsampled.columns
target = ['target']

# Every column except the label becomes a model feature.
feature_cols = [name for name in all_columns if name not in target]

# Pack the feature columns into a single vector column; it is scaled later.
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features_to_scale")
df_downsampled = assembler.transform(df_downsampled)

df_downsampled.select('features_to_scale', 'target').show(5)

# Train/validation/test split and MinMaxScaler

In [None]:
# 80% of the rows go to training; the remaining 20% is split evenly into
# test and validation. Both splits use a fixed seed.
train_data, temp_data = df_downsampled.randomSplit(weights=[0.8, 0.2], seed=42)
test_data, val_data = temp_data.randomSplit(weights=[0.5, 0.5], seed=42)

In [None]:
# Fit the min-max scaler on the TRAINING split only and reuse that fitted
# model for validation and test. Fitting a separate scaler per split would map
# each split to [0, 1] using its own min/max, so the same raw value would be
# encoded differently in train and in evaluation data, and held-out statistics
# would leak into preprocessing.
feature_scaler_model = MinMaxScaler(inputCol="features_to_scale", outputCol="features").fit(train_data)

train_data = feature_scaler_model.transform(train_data)
val_data = feature_scaler_model.transform(val_data)
test_data = feature_scaler_model.transform(test_data)

# Models

In [None]:
# Logistic regression on the scaled features.
lr = LogisticRegression(labelCol='target', featuresCol='features')
results_df_lr = evaluate_model(
    model=lr,
    model_name='LogisticRegression',
    train_data=train_data,
    validation_data=val_data,
    test_data=test_data,
)

In [None]:
# Random forest on the scaled features.
rf = RandomForestClassifier(labelCol='target', featuresCol='features')
results_df_rf = evaluate_model(
    model=rf,
    model_name='Random Forest',
    train_data=train_data,
    validation_data=val_data,
    test_data=test_data,
)

In [None]:
# The input layer must match the length of the 'features' vector exactly.
# Read it from the data instead of hard-coding it: the length depends on how
# many categories the one-hot encoded columns produced, so a fixed number
# breaks as soon as the data or the feature list changes.
num_features = len(train_data.select("features").first()["features"])

# Two hidden layers (64 and 32 units) and 3 output units, one per target
# class (0, 1, 2).
layers = [num_features, 64, 32, 3]
mlp = MultilayerPerceptronClassifier(layers=layers, labelCol="target", featuresCol="features", maxIter=100, seed=123)
results_df_mlp = evaluate_model(mlp, 'Neural Network', train_data, val_data, test_data)

In [None]:
# Stack the one-row result frames of all models into a single comparison table.
per_model_results = [results_df_lr, results_df_rf, results_df_mlp]
final_results_df = pd.concat(per_model_results, ignore_index=True)
final_results_df