In [44]:
import pandas as pd
import pyspark.sql as sparksql
import pyspark.sql.functions as F
from pyspark.ml import Pipeline
from pyspark.ml.classification import (
    DecisionTreeClassifier,
    GBTClassifier,
    LogisticRegression,
    RandomForestClassifier,
)
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.sql import SparkSession

In [2]:
# Start (or reuse) the Spark session used by every cell below
spark = (
    SparkSession.builder
    .appName("Machine Learning Project - Loan Prediction")
    .getOrCreate()
)

In [3]:
# Load the training CSV: the first row holds the column names, types are inferred
df = spark.read.csv(
    "loan-prediction-train.csv",
    sep=',',
    header=True,
    inferSchema=True,
)
df.show(5)

+--------+------+-------+----------+------------+-------------+---------------+-----------------+----------+----------------+--------------+-------------+-----------+
| Loan_ID|Gender|Married|Dependents|   Education|Self_Employed|ApplicantIncome|CoapplicantIncome|LoanAmount|Loan_Amount_Term|Credit_History|Property_Area|Loan_Status|
+--------+------+-------+----------+------------+-------------+---------------+-----------------+----------+----------------+--------------+-------------+-----------+
|LP001002|  Male|     No|         0|    Graduate|           No|           5849|              0.0|      NULL|             360|             1|        Urban|          Y|
|LP001003|  Male|    Yes|         1|    Graduate|           No|           4583|           1508.0|       128|             360|             1|        Rural|          N|
|LP001005|  Male|    Yes|         0|    Graduate|          Yes|           3000|              0.0|        66|             360|             1|        Urban|          Y

In [9]:
# Print the inferred schema: column names, types and nullability
df.printSchema()

root
 |-- Loan_ID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Married: string (nullable = true)
 |-- Dependents: string (nullable = true)
 |-- Education: string (nullable = true)
 |-- Self_Employed: string (nullable = true)
 |-- ApplicantIncome: integer (nullable = true)
 |-- CoapplicantIncome: double (nullable = true)
 |-- LoanAmount: integer (nullable = true)
 |-- Loan_Amount_Term: integer (nullable = true)
 |-- Credit_History: integer (nullable = true)
 |-- Property_Area: string (nullable = true)
 |-- Loan_Status: string (nullable = true)



In [10]:
# Same type information as a list of (column name, type name) tuples
df.dtypes

[('Loan_ID', 'string'),
 ('Gender', 'string'),
 ('Married', 'string'),
 ('Dependents', 'string'),
 ('Education', 'string'),
 ('Self_Employed', 'string'),
 ('ApplicantIncome', 'int'),
 ('CoapplicantIncome', 'double'),
 ('LoanAmount', 'int'),
 ('Loan_Amount_Term', 'int'),
 ('Credit_History', 'int'),
 ('Property_Area', 'string'),
 ('Loan_Status', 'string')]

In [11]:
# Class balance: how many applications fall under each Loan_Status value
loan_status_counts = df.groupBy("Loan_Status").count()
loan_status_counts.show()

+-----------+-----+
|Loan_Status|count|
+-----------+-----+
|          Y|  422|
|          N|  192|
+-----------+-----+



In [12]:
# Mean Credit_History within each Loan_Status group
(
    df.select("Credit_History", "Loan_Status")
    .groupBy("Loan_Status")
    .agg(F.avg("Credit_History"))
    .show()
)

+-----------+-------------------+
|Loan_Status|avg(Credit_History)|
+-----------+-------------------+
|          Y| 0.9818181818181818|
|          N| 0.5418994413407822|
+-----------+-------------------+



In [42]:
# Loan outcome broken down by education level
(
    df.select("Education", "Loan_Status")
    .groupBy("Loan_Status", "Education")
    .count()
    .show()
)

+-----------+------------+-----+
|Loan_Status|   Education|count|
+-----------+------------+-----+
|          1|Not Graduate|   82|
|          1|    Graduate|  340|
|          0|Not Graduate|   52|
|          0|    Graduate|  140|
+-----------+------------+-----+



In [14]:
# Loan outcome broken down by gender (missing Gender shows up as its own NULL group)
(
    df.select("Gender", "Loan_Status")
    .groupBy("Loan_Status", "Gender")
    .count()
    .show()
)

+-----------+------+-----+
|Loan_Status|Gender|count|
+-----------+------+-----+
|          N|Female|   37|
|          Y|  NULL|    8|
|          Y|Female|   75|
|          N|  NULL|    5|
|          Y|  Male|  339|
|          N|  Male|  150|
+-----------+------+-----+



In [15]:
# Correlation matrix prep: print the schema to see which columns are numeric
# (the matrix itself is computed in the next cell)
df.printSchema()

root
 |-- Loan_ID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Married: string (nullable = true)
 |-- Dependents: string (nullable = true)
 |-- Education: string (nullable = true)
 |-- Self_Employed: string (nullable = true)
 |-- ApplicantIncome: integer (nullable = true)
 |-- CoapplicantIncome: double (nullable = true)
 |-- LoanAmount: integer (nullable = true)
 |-- Loan_Amount_Term: integer (nullable = true)
 |-- Credit_History: integer (nullable = true)
 |-- Property_Area: string (nullable = true)
 |-- Loan_Status: string (nullable = true)



In [16]:
# Pairwise Pearson correlation between the numeric columns, rounded to 2 decimals
columns = ['ApplicantIncome', 'CoapplicantIncome', 'LoanAmount', 'Loan_Amount_Term', 'Credit_History']

# Every df.stat.corr call launches a Spark job, so compute each unordered pair once
# and mirror the value (Pearson correlation is symmetric).
corr_values = {}
for row_pos, row_col in enumerate(columns):
    for other_col in columns[row_pos:]:
        value = round(df.stat.corr(row_col, other_col), 2)
        corr_values[(row_col, other_col)] = value
        corr_values[(other_col, row_col)] = value

# Build the pandas frame in one go instead of growing it with pd.concat inside the
# loop; the blank-named index reproduces the table layout shown previously.
corr_plot_df = pd.DataFrame(
    [[corr_values[(row_col, other_col)] for other_col in columns] for row_col in columns],
    index=pd.Index(columns, name=''),
    columns=columns,
)
corr_plot_df

Unnamed: 0,ApplicantIncome,CoapplicantIncome,LoanAmount,Loan_Amount_Term,Credit_History
,,,,,
ApplicantIncome,1.0,-0.12,0.54,-0.02,0.01
CoapplicantIncome,-0.12,1.0,0.19,-0.05,-0.06
LoanAmount,0.54,0.19,1.0,0.06,-0.03
Loan_Amount_Term,-0.02,-0.05,0.06,1.0,0.05
Credit_History,0.01,-0.06,-0.03,0.05,1.0


In [18]:
# Register the DataFrame as a temporary view named "database" so it can be queried with spark.sql
df.createOrReplaceTempView("database")

In [22]:
# Preview the first five rows of the registered view through Spark SQL
preview_query = """
        SELECT * FROM database LIMIT 5
"""
spark.sql(preview_query).show()

+--------+------+-------+----------+------------+-------------+---------------+-----------------+----------+----------------+--------------+-------------+-----------+
| Loan_ID|Gender|Married|Dependents|   Education|Self_Employed|ApplicantIncome|CoapplicantIncome|LoanAmount|Loan_Amount_Term|Credit_History|Property_Area|Loan_Status|
+--------+------+-------+----------+------------+-------------+---------------+-----------------+----------+----------------+--------------+-------------+-----------+
|LP001002|  Male|     No|         0|    Graduate|           No|           5849|              0.0|      NULL|             360|             1|        Urban|          Y|
|LP001003|  Male|    Yes|         1|    Graduate|           No|           4583|           1508.0|       128|             360|             1|        Rural|          N|
|LP001005|  Male|    Yes|         0|    Graduate|          Yes|           3000|              0.0|        66|             360|             1|        Urban|          Y

In [24]:
# Loan IDs of the applicants whose Credit_History equals 1
credit_history_query = """
    SELECT Loan_ID
    FROM database 
    WHERE Credit_History = 1
"""
spark.sql(credit_history_query).show()

+--------+
| Loan_ID|
+--------+
|LP001002|
|LP001003|
|LP001005|
|LP001006|
|LP001008|
|LP001011|
|LP001013|
|LP001018|
|LP001020|
|LP001024|
|LP001027|
|LP001028|
|LP001029|
|LP001030|
|LP001032|
|LP001038|
|LP001041|
|LP001046|
|LP001066|
|LP001068|
+--------+
only showing top 20 rows



In [25]:
# Count the missing values in every column

null_count_exprs = [
    F.count(F.when(F.col(name).isNull(), name)).alias(name)
    for name in df.columns
]
df.select(null_count_exprs).show()

+-------+------+-------+----------+---------+-------------+---------------+-----------------+----------+----------------+--------------+-------------+-----------+
|Loan_ID|Gender|Married|Dependents|Education|Self_Employed|ApplicantIncome|CoapplicantIncome|LoanAmount|Loan_Amount_Term|Credit_History|Property_Area|Loan_Status|
+-------+------+-------+----------+---------+-------------+---------------+-----------------+----------+----------------+--------------+-------------+-----------+
|      0|    13|      3|        15|        0|           32|              0|                0|        22|              14|            50|            0|          0|
+-------+------+-------+----------+---------+-------------+---------------+-----------------+----------+----------------+--------------+-------------+-----------+



In [33]:
# Mean of LoanAmount, pulled out of the single-row aggregate result

mean = df.select(F.mean(df['LoanAmount'])).first()[0]
mean

146.3973941368078

In [29]:
# Impute missing LoanAmount values with the column mean.
# LoanAmount is an integer column, so the fractional mean ends up stored as a whole number.

df = df.fillna(mean, subset=["LoanAmount"])

In [31]:
# Most frequent Gender value (the mode): first row after sorting group sizes descending

gender_counts = df.groupBy('Gender').count()
gender_counts.orderBy(F.col("count").desc()).first()[0]

'Male'

In [35]:
# Columns that still need imputation, grouped by the strategy the following cells apply

# imputed with the column mean
numeric_columns = [
    'LoanAmount',
    'Loan_Amount_Term',
]
# imputed with the most frequent value
categorical_columns = [
    'Gender',
    'Married',
    'Dependents',
    'Self_Employed',
    'Credit_History',
]

In [36]:
# Mean-impute every numeric column, one column at a time
for column_name in numeric_columns:
    column_mean = df.agg(F.mean(column_name)).first()[0]
    df = df.fillna(column_mean, subset=[column_name])

In [37]:
# Mode-impute every categorical column.
for col in categorical_columns:
    # Exclude nulls before ranking: otherwise NULL itself can be the largest group,
    # the "mode" becomes None and na.fill rejects it.
    mode_row = (
        df.where(F.col(col).isNotNull())
        .groupBy(col)
        .count()
        .orderBy(F.col("count").desc())
        .first()
    )
    if mode_row is None:
        # The column has no non-null value at all, so there is nothing to impute with.
        continue
    mode = mode_row[0]
    df = df.na.fill(mode, [col])

In [38]:
# Re-count missing values per column to confirm the imputation left none behind

remaining_null_exprs = [
    F.count(F.when(F.col(name).isNull(), name)).alias(name)
    for name in df.columns
]
df.select(remaining_null_exprs).show()

+-------+------+-------+----------+---------+-------------+---------------+-----------------+----------+----------------+--------------+-------------+-----------+
|Loan_ID|Gender|Married|Dependents|Education|Self_Employed|ApplicantIncome|CoapplicantIncome|LoanAmount|Loan_Amount_Term|Credit_History|Property_Area|Loan_Status|
+-------+------+-------+----------+---------+-------------+---------------+-----------------+----------+----------------+--------------+-------------+-----------+
|      0|     0|      0|         0|        0|            0|              0|                0|         0|               0|             0|            0|          0|
+-------+------+-------+----------+---------+-------------+---------------+-----------------+----------+----------------+--------------+-------------+-----------+



In [40]:
# Feature engineering: TotalIncome = ApplicantIncome + CoapplicantIncome

total_income = F.col("ApplicantIncome") + F.col("CoapplicantIncome")
df = df.withColumn("TotalIncome", total_income)
df.show(5)

+--------+------+-------+----------+------------+-------------+---------------+-----------------+----------+----------------+--------------+-------------+-----------+-----------+
| Loan_ID|Gender|Married|Dependents|   Education|Self_Employed|ApplicantIncome|CoapplicantIncome|LoanAmount|Loan_Amount_Term|Credit_History|Property_Area|Loan_Status|TotalIncome|
+--------+------+-------+----------+------------+-------------+---------------+-----------------+----------+----------------+--------------+-------------+-----------+-----------+
|LP001002|  Male|     No|         0|    Graduate|           No|           5849|              0.0|       146|             360|             1|        Urban|          Y|     5849.0|
|LP001003|  Male|    Yes|         1|    Graduate|           No|           4583|           1508.0|       128|             360|             1|        Rural|          N|     6091.0|
|LP001005|  Male|    Yes|         0|    Graduate|          Yes|           3000|              0.0|        

In [41]:
# Encode the label: 'Y' -> 1, everything else -> 0.
#
# The comparison is done on the string form and also accepts '1', which makes the
# cell idempotent. Previously a second run compared the already-encoded integer
# column with 'Y', matched nothing, and silently turned every label into 0.

df = df.withColumn(
    'Loan_Status',
    F.when(F.col('Loan_Status').cast('string').isin('Y', '1'), 1).otherwise(0),
)
df.show(5)

+--------+------+-------+----------+------------+-------------+---------------+-----------------+----------+----------------+--------------+-------------+-----------+-----------+
| Loan_ID|Gender|Married|Dependents|   Education|Self_Employed|ApplicantIncome|CoapplicantIncome|LoanAmount|Loan_Amount_Term|Credit_History|Property_Area|Loan_Status|TotalIncome|
+--------+------+-------+----------+------------+-------------+---------------+-----------------+----------+----------------+--------------+-------------+-----------+-----------+
|LP001002|  Male|     No|         0|    Graduate|           No|           5849|              0.0|       146|             360|             1|        Urban|          1|     5849.0|
|LP001003|  Male|    Yes|         1|    Graduate|           No|           4583|           1508.0|       128|             360|             1|        Rural|          0|     6091.0|
|LP001005|  Male|    Yes|         0|    Graduate|          Yes|           3000|              0.0|        

In [45]:
# Check the schema after imputation and feature engineering:
# Loan_Status should now be an integer and TotalIncome should be present
df.printSchema()

root
 |-- Loan_ID: string (nullable = true)
 |-- Gender: string (nullable = false)
 |-- Married: string (nullable = false)
 |-- Dependents: string (nullable = false)
 |-- Education: string (nullable = true)
 |-- Self_Employed: string (nullable = false)
 |-- ApplicantIncome: integer (nullable = true)
 |-- CoapplicantIncome: double (nullable = true)
 |-- LoanAmount: integer (nullable = true)
 |-- Loan_Amount_Term: integer (nullable = true)
 |-- Credit_History: integer (nullable = true)
 |-- Property_Area: string (nullable = true)
 |-- Loan_Status: integer (nullable = false)
 |-- TotalIncome: double (nullable = true)



In [90]:
# Columns that get indexed and one-hot encoded, and columns passed through as numbers
categorical_cols = [
    'Gender', 'Married', 'Dependents', 'Education',
    'Self_Employed', 'Credit_History', 'Property_Area',
]
numerical_cols = [
    'ApplicantIncome', 'CoapplicantIncome', 'LoanAmount',
    'Loan_Amount_Term', 'TotalIncome',
]

# Build one StringIndexer -> OneHotEncoder pair per categorical column
indexers = []
encoders = []
for name in categorical_cols:
    index_col = "{0}_index".format(name)
    encoded_col = "{0}_encoded".format(index_col)
    indexers.append(StringIndexer(inputCol=name, outputCol=index_col))
    encoders.append(OneHotEncoder(dropLast=False, inputCol=index_col, outputCol=encoded_col))

# Feature layout: encoded categorical columns first, then the raw numeric columns
input_columns = [encoder.getOutputCol() for encoder in encoders] + numerical_cols

# Assemble everything into a single vector column named "feature"
assembler = VectorAssembler(inputCols=input_columns, outputCol="feature")

In [91]:
# Chain the stages in execution order: all indexers, then all encoders, then the assembler
pipeline = Pipeline(stages=[*indexers, *encoders, assembler])

In [92]:
# Fit the pipeline stages on the full prepared DataFrame.
# NOTE(review): this happens before the train/test split, so the fitted stages see every row.
data_model = pipeline.fit(df)

In [93]:
# Apply the fitted pipeline; this appends the *_index, *_index_encoded and "feature" columns
transformed_df = data_model.transform(df)

In [94]:
# Preview the transformed rows, including the assembled "feature" vector column
transformed_df.show(5)

+--------+------+-------+----------+------------+-------------+---------------+-----------------+----------+----------------+--------------+-------------+-----------+-----------+------------+-------------+----------------+---------------+-------------------+--------------------+-------------------+--------------------+---------------------+------------------------+-----------------------+---------------------------+----------------------------+---------------------------+--------------------+
| Loan_ID|Gender|Married|Dependents|   Education|Self_Employed|ApplicantIncome|CoapplicantIncome|LoanAmount|Loan_Amount_Term|Credit_History|Property_Area|Loan_Status|TotalIncome|Gender_index|Married_index|Dependents_index|Education_index|Self_Employed_index|Credit_History_index|Property_Area_index|Gender_index_encoded|Married_index_encoded|Dependents_index_encoded|Education_index_encoded|Self_Employed_index_encoded|Credit_History_index_encoded|Property_Area_index_encoded|             feature|
+---

In [95]:
# Keep only what the models consume: the feature vector and the label
model_columns = ["feature", "Loan_Status"]
transformed_df = transformed_df.select(*model_columns)

In [96]:
# 80/20 train/test split with a fixed seed
splits = transformed_df.randomSplit([0.8, 0.2], seed=1234)
train, test = splits

In [97]:
# Preview the training split: "feature" vectors alongside the 0/1 Loan_Status label
train.show(5)

+--------------------+-----------+
|             feature|Loan_Status|
+--------------------+-----------+
|(22,[0,2,4,8,10,1...|          1|
|(22,[0,2,4,8,10,1...|          1|
|(22,[0,2,4,8,10,1...|          0|
|(22,[0,2,4,8,10,1...|          1|
|(22,[0,2,4,8,10,1...|          1|
+--------------------+-----------+
only showing top 5 rows



In [104]:
# Baseline model: logistic regression trained on the assembled feature vector.
# The classifier and evaluator imports live in the notebook's top import cell.
lr = LogisticRegression(labelCol="Loan_Status", featuresCol="feature")
lr_model = lr.fit(train)

In [99]:
# Predict on the test data; transform() appends the rawPrediction, probability
# and prediction columns to the feature/label columns
predictions = lr_model.transform(test)
predictions.show()

+--------------------+-----------+--------------------+--------------------+----------+
|             feature|Loan_Status|       rawPrediction|         probability|prediction|
+--------------------+-----------+--------------------+--------------------+----------+
|(22,[0,2,4,8,10,1...|          1|[-2.4519984303768...|[0.07929253060708...|       1.0|
|(22,[0,2,4,8,10,1...|          1|[-2.3364129758419...|[0.08815181805644...|       1.0|
|(22,[0,2,4,8,10,1...|          1|[-2.2727610788110...|[0.09340414198478...|       1.0|
|(22,[0,2,4,8,10,1...|          1|[-2.1954897469157...|[0.10015624312078...|       1.0|
|(22,[0,2,4,8,10,1...|          1|[-2.4059467253095...|[0.08272035154753...|       1.0|
|(22,[0,2,4,8,10,1...|          1|[-2.1365047329703...|[0.10559905534338...|       1.0|
|(22,[0,2,4,8,10,1...|          1|[-2.0268979647883...|[0.11640760825750...|       1.0|
|(22,[0,2,4,8,10,1...|          1|[-2.4896034215725...|[0.07659024032486...|       1.0|
|(22,[0,2,4,8,10,1...|          

In [100]:
# Score the test split with the logistic regression model and report the area under the ROC curve
predictions = lr_model.transform(test)
auc = BinaryClassificationEvaluator(
    labelCol="Loan_Status",
    metricName="areaUnderROC",  # the evaluator's default, spelled out for the reader
).evaluate(predictions)

# Fixed-precision formatting: round(auc, 2) * 100 on its own can print float noise
# such as 56.99999999999999 for some AUC values.
print(f"AUC: {round(auc, 2) * 100:.1f}%")

AUC: 73.0%


In [101]:
# Random forest with default hyperparameters, trained on the same split
rf = RandomForestClassifier(
    featuresCol="feature",
    labelCol="Loan_Status",
)
rf_model = rf.fit(train)

In [102]:
# Score the test split with the random forest model and report the area under the ROC curve
predictions = rf_model.transform(test)
auc = BinaryClassificationEvaluator(
    labelCol="Loan_Status",
    metricName="areaUnderROC",  # the evaluator's default, spelled out for the reader
).evaluate(predictions)

# Fixed-precision formatting: round(auc, 2) * 100 on its own can print float noise
# such as 56.99999999999999 for some AUC values.
print(f"AUC: {round(auc, 2) * 100:.1f}%")

AUC: 78.0%


In [105]:
# Single decision tree with default hyperparameters, trained on the same split
dtc = DecisionTreeClassifier(
    featuresCol="feature",
    labelCol="Loan_Status",
)
dtc_model = dtc.fit(train)

In [106]:
# Score the test split with the decision tree model and report the area under the ROC curve
predictions = dtc_model.transform(test)
auc = BinaryClassificationEvaluator(
    labelCol="Loan_Status",
    metricName="areaUnderROC",  # the evaluator's default, spelled out for the reader
).evaluate(predictions)

# Fixed-precision formatting: round(auc, 2) * 100 on its own can print float noise
# such as 56.99999999999999 for some AUC values.
print(f"AUC: {round(auc, 2) * 100:.1f}%")

AUC: 67.0%


In [107]:
# Gradient-boosted trees with default hyperparameters, trained on the same split
gbt = GBTClassifier(
    featuresCol="feature",
    labelCol="Loan_Status",
)
gbt_model = gbt.fit(train)

In [108]:
# Score the test split with the gradient-boosted trees model and report the area under the ROC curve
predictions = gbt_model.transform(test)
auc = BinaryClassificationEvaluator(
    labelCol="Loan_Status",
    metricName="areaUnderROC",  # the evaluator's default, spelled out for the reader
).evaluate(predictions)

# Fixed-precision formatting: round(auc, 2) * 100 on its own can print float noise
# such as 56.99999999999999 for some AUC values.
print(f"AUC: {round(auc, 2) * 100:.1f}%")

AUC: 80.0%


In [89]:
# Convert a small preview of the Spark DataFrame to pandas.
# limit() runs in Spark first, so only five rows are collected to the driver instead
# of materialising the whole DataFrame just to display its head.

df.limit(5).toPandas()

Unnamed: 0,Loan_ID,Gender,Married,Dependents,Education,Self_Employed,ApplicantIncome,CoapplicantIncome,LoanAmount,Loan_Amount_Term,Credit_History,Property_Area,Loan_Status,TotalIncome
0,LP001002,Male,No,0,Graduate,No,5849,0.0,146,360,1,Urban,1,5849.0
1,LP001003,Male,Yes,1,Graduate,No,4583,1508.0,128,360,1,Rural,0,6091.0
2,LP001005,Male,Yes,0,Graduate,Yes,3000,0.0,66,360,1,Urban,1,3000.0
3,LP001006,Male,Yes,0,Not Graduate,No,2583,2358.0,120,360,1,Urban,1,4941.0
4,LP001008,Male,No,0,Graduate,No,6000,0.0,141,360,1,Urban,1,6000.0
