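# Loan Approval Prediction

Binary classification of `loan_status` on the Kaggle Playground Series S4E10 data (`playground-series-s4e10.zip`) with a PySpark random forest, evaluated by ROC AUC and per-class F1.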

1. Initialize and import dependencies

In [1]:
import random
import pyspark.sql.functions as F

from pyspark.sql import SparkSession
from pyspark.ml.feature import (
    StandardScaler,
    VectorAssembler,
    StringIndexer,
    OneHotEncoder,
)
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.ml import Pipeline
from pyspark.sql.types import (
    StructType,
    StructField,
    IntegerType,
    FloatType,
    StringType,
)

from lib.data.kaggle import unzip_file

from src.definitions import EXTERNAL_DATA_FOLDER

In [2]:
# One seed shared by every stochastic step in this notebook: Python's `random`
# module here, plus the train/test split and the random forest further down.
random_seed: int = 42
random.seed(random_seed)

2. Unzip dataset

In [3]:
# Extract the competition archive; the returned path is used below as the
# folder that `train.csv` is read from.
archive_path = EXTERNAL_DATA_FOLDER / "playground-series-s4e10.zip"
ds_path = unzip_file(archive_path)

3. Create Spark Session

In [4]:
# Spark session for loading, preprocessing and training
# (getOrCreate reuses an already-running session if there is one).
spark = (
    SparkSession.builder
    .appName("LoanApprovalPrediction")
    .getOrCreate()
)

24/12/30 12:18:10 WARN Utils: Your hostname, Alexanders-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.0.145 instead (on interface en0)
24/12/30 12:18:10 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/12/30 12:18:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 53694)
Traceback (most recent call last):
  File "/Users/alexandermelashchenko/.pyenv/versions/3.11.10/lib/python3.11/socketserver.py", line 317, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/Users/alexandermelashchenko/.pyenv/versions/3.11.10/lib/python3.11/socketserver.py", line 348, in process_request
    self.finish_request(request, client_address)
  File "/Users/alexandermelashchenko/.pyenv/versions/3.11.10/lib/python3.11/socketserver.py", line 361, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/Users/alexandermelashchenko/.pyenv/versions/3.11.10/lib/python3.11/socketserver.py", line 755, in __init__
    self.handle()
  File "/Users/alexandermelashchenko/Workspace/playground-series-s4e10/.venv/lib/python3.11/site-packages/pyspark/accumulators.py", line 295, in handle
    poll(accum_updates)
  Fil

4. Read dataset

In [5]:
# Explicit column types for train.csv. Without a schema (and without
# inferSchema) Spark's CSV reader would load every column as a string.
column_types = [
    ("id", IntegerType()),
    ("person_age", IntegerType()),
    ("person_income", IntegerType()),
    ("person_home_ownership", StringType()),
    ("person_emp_length", FloatType()),
    ("loan_intent", StringType()),
    ("loan_grade", StringType()),
    ("loan_amnt", IntegerType()),
    ("loan_int_rate", FloatType()),
    ("loan_percent_income", FloatType()),
    ("cb_person_default_on_file", StringType()),
    ("cb_person_cred_hist_length", IntegerType()),
    ("loan_status", IntegerType()),  # binary target label
]
# Every column is declared nullable.
cols_schema = [StructField(name, dtype, True) for name, dtype in column_types]
schema = StructType(cols_schema)

train_csv_path = str(ds_path / "train.csv")
df = spark.read.csv(train_csv_path, header=True, schema=schema)

df.show()

+---+----------+-------------+---------------------+-----------------+-----------------+----------+---------+-------------+-------------------+-------------------------+--------------------------+-----------+
| id|person_age|person_income|person_home_ownership|person_emp_length|      loan_intent|loan_grade|loan_amnt|loan_int_rate|loan_percent_income|cb_person_default_on_file|cb_person_cred_hist_length|loan_status|
+---+----------+-------------+---------------------+-----------------+-----------------+----------+---------+-------------+-------------------+-------------------------+--------------------------+-----------+
|  0|        37|        35000|                 RENT|              0.0|        EDUCATION|         B|     6000|        11.49|               0.17|                        N|                        14|          0|
|  1|        22|        56000|                  OWN|              6.0|          MEDICAL|         C|     4000|        13.35|               0.07|                     

5. Preprocess data

In [6]:
# Feature preprocessing pipeline:
#   categorical strings -> StringIndexer indices -> one-hot vectors,
#   then all features -> one assembled vector -> StandardScaler.
# NOTE(review): the pipeline is fit on the full dataset, *before* the
# train/test split in the next section, so the StringIndexer vocabularies and
# StandardScaler statistics also see test rows (mild leakage). Fitting on the
# training split only would give a cleaner evaluation.
# NOTE(review): tree ensembles such as RandomForest are generally insensitive
# to feature scaling, so StandardScaler is likely unnecessary here.
enum_cols = [
    "person_home_ownership",
    "loan_intent",
    "loan_grade",
    "cb_person_default_on_file",
]

index_cols = [it + "_index" for it in enum_cols]
one_hot_cols = [it + "_vec" for it in enum_cols]

# One StringIndexer per categorical column ...
stages = [
    StringIndexer(inputCol=src_col, outputCol=idx_col)
    for src_col, idx_col in zip(enum_cols, index_cols)
]
# ... followed by a single multi-column OneHotEncoder over all indices.
stages.append(OneHotEncoder(inputCols=index_cols, outputCols=one_hot_cols))

# Numeric columns plus the one-hot (`*_vec`) columns produced above.
features = [
    "person_age",
    "person_income",
    "person_home_ownership_vec",
    "person_emp_length",
    "loan_intent_vec",
    "loan_grade_vec",
    "loan_amnt",
    "loan_int_rate",
    "loan_percent_income",
    "cb_person_default_on_file_vec",
    "cb_person_cred_hist_length",
]

stages.append(VectorAssembler(inputCols=features, outputCol="features"))
stages.append(StandardScaler(inputCol="features", outputCol="features_scaled"))

pipeline = Pipeline(stages=stages)

# Drop any columns produced by a previous execution of this cell, so re-running
# it does not fail with "Output column ... already exists". On a fresh `df`
# this is a no-op, because DataFrame.drop ignores column names that are absent.
raw_df = df.drop(*index_cols, *one_hot_cols, "features", "features_scaled")
preprocess = pipeline.fit(raw_df)
df = preprocess.transform(raw_df)

5.1. Split dataset into train and test sets

In [7]:
# 70% train / 30% test, seeded so the split is reproducible.
split_weights = [0.7, 0.3]
train_df, test_df = df.randomSplit(split_weights, seed=random_seed)

6. Train model

In [8]:
# Random forest trained on the scaled feature vector from the preprocessing
# pipeline; the shared seed keeps training reproducible.
rfc = RandomForestClassifier(
    featuresCol="features_scaled",
    labelCol="loan_status",
    seed=random_seed,
)
model = rfc.fit(train_df)

7. Evaluate model

In [16]:
# Score the held-out split and report ROC AUC. The rawPredictionCol and
# metricName values below are the evaluator's defaults, spelled out for clarity.
predictions = model.transform(test_df)

evaluator = BinaryClassificationEvaluator(
    labelCol="loan_status",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC",
)
area_under_curve = evaluator.evaluate(predictions)

print(f"Area under ROC curve: {area_under_curve}")

Area under ROC curve: 0.9063583846454847


In [22]:
# Confusion matrix and per-class F1 on the held-out predictions.
# MulticlassMetrics (RDD-based mllib API) takes an RDD of (prediction, label)
# tuples; both are cast to float because `loan_status` is an integer column.
casted_pred = predictions.select(
    F.col("prediction").cast(FloatType()).alias("y_hat"),
    F.col("loan_status").cast(FloatType()).alias("y"),
)

metrics = MulticlassMetrics(casted_pred.rdd.map(tuple))

# Rows are true labels and columns are predicted labels, both in ascending
# label order (0.0, 1.0).
print(metrics.confusionMatrix().toArray())
# NOTE(review): the printed names assume loan_status 0 = approved and
# 1 = rejected — confirm against the dataset description.
print("F1 approved: " + str(metrics.fMeasure(0.0, 1.0)))
print("F1 rejected: " + str(metrics.fMeasure(1.0, 1.0)))



[[15003.   192.]
 [  813.  1608.]]
F1 approved: 0.9675921447228403
F1 rejected: 0.7619047619047619


24/12/30 19:19:33 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 956759 ms exceeds timeout 120000 ms
24/12/30 19:19:33 WARN SparkContext: Killing executors is not supported by current scheduler.
24/12/30 19:19:42 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$