### Import Modules

In [1]:
import pyspark
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

import pandas as pd

import warnings
warnings.filterwarnings('ignore')

In [2]:
# Initialize the session
spark = SparkSession.builder.appName('loan_prediction').getOrCreate()

### Load the Dataset

In [3]:
df = spark.read.csv('Loan Prediction Dataset.csv', header=True, sep=',', inferSchema=True)

In [4]:
df.show(5)

+--------+------+-------+----------+------------+-------------+---------------+-----------------+----------+----------------+--------------+-------------+-----------+
| Loan_ID|Gender|Married|Dependents|   Education|Self_Employed|ApplicantIncome|CoapplicantIncome|LoanAmount|Loan_Amount_Term|Credit_History|Property_Area|Loan_Status|
+--------+------+-------+----------+------------+-------------+---------------+-----------------+----------+----------------+--------------+-------------+-----------+
|LP001002|  Male|     No|         0|    Graduate|           No|           5849|              0.0|      null|             360|             1|        Urban|          Y|
|LP001003|  Male|    Yes|         1|    Graduate|           No|           4583|           1508.0|       128|             360|             1|        Rural|          N|
|LP001005|  Male|    Yes|         0|    Graduate|          Yes|           3000|              0.0|        66|             360|             1|        Urban|          Y

In [5]:
df.head(5)

[Row(Loan_ID='LP001002', Gender='Male', Married='No', Dependents='0', Education='Graduate', Self_Employed='No', ApplicantIncome=5849, CoapplicantIncome=0.0, LoanAmount=None, Loan_Amount_Term=360, Credit_History=1, Property_Area='Urban', Loan_Status='Y'),
 Row(Loan_ID='LP001003', Gender='Male', Married='Yes', Dependents='1', Education='Graduate', Self_Employed='No', ApplicantIncome=4583, CoapplicantIncome=1508.0, LoanAmount=128, Loan_Amount_Term=360, Credit_History=1, Property_Area='Rural', Loan_Status='N'),
 Row(Loan_ID='LP001005', Gender='Male', Married='Yes', Dependents='0', Education='Graduate', Self_Employed='Yes', ApplicantIncome=3000, CoapplicantIncome=0.0, LoanAmount=66, Loan_Amount_Term=360, Credit_History=1, Property_Area='Urban', Loan_Status='Y'),
 Row(Loan_ID='LP001006', Gender='Male', Married='Yes', Dependents='0', Education='Not Graduate', Self_Employed='No', ApplicantIncome=2583, CoapplicantIncome=2358.0, LoanAmount=120, Loan_Amount_Term=360, Credit_History=1, Property_Ar

In [6]:
df.printSchema()

root
 |-- Loan_ID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Married: string (nullable = true)
 |-- Dependents: string (nullable = true)
 |-- Education: string (nullable = true)
 |-- Self_Employed: string (nullable = true)
 |-- ApplicantIncome: integer (nullable = true)
 |-- CoapplicantIncome: double (nullable = true)
 |-- LoanAmount: integer (nullable = true)
 |-- Loan_Amount_Term: integer (nullable = true)
 |-- Credit_History: integer (nullable = true)
 |-- Property_Area: string (nullable = true)
 |-- Loan_Status: string (nullable = true)



In [7]:
df.dtypes

[('Loan_ID', 'string'),
 ('Gender', 'string'),
 ('Married', 'string'),
 ('Dependents', 'string'),
 ('Education', 'string'),
 ('Self_Employed', 'string'),
 ('ApplicantIncome', 'int'),
 ('CoapplicantIncome', 'double'),
 ('LoanAmount', 'int'),
 ('Loan_Amount_Term', 'int'),
 ('Credit_History', 'int'),
 ('Property_Area', 'string'),
 ('Loan_Status', 'string')]

## Data Analysis

In [8]:
# Display count based on Loan status
df.groupBy('Loan_Status').count().show()

+-----------+-----+
|Loan_Status|count|
+-----------+-----+
|          Y|  422|
|          N|  192|
+-----------+-----+



In [9]:
df.select("Credit_History", "Loan_Status").groupBy("Loan_Status").agg(F.avg('Credit_History')).show()

+-----------+-------------------+
|Loan_Status|avg(Credit_History)|
+-----------+-------------------+
|          Y| 0.9818181818181818|
|          N| 0.5418994413407822|
+-----------+-------------------+



In [10]:
df.select("Gender", "Loan_Status").groupBy("Loan_Status", 'Gender').count().show()

+-----------+------+-----+
|Loan_Status|Gender|count|
+-----------+------+-----+
|          N|Female|   37|
|          Y|  null|    8|
|          Y|Female|   75|
|          N|  null|    5|
|          Y|  Male|  339|
|          N|  Male|  150|
+-----------+------+-----+



## Correlation Matrix

In [11]:
df.printSchema()

root
 |-- Loan_ID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Married: string (nullable = true)
 |-- Dependents: string (nullable = true)
 |-- Education: string (nullable = true)
 |-- Self_Employed: string (nullable = true)
 |-- ApplicantIncome: integer (nullable = true)
 |-- CoapplicantIncome: double (nullable = true)
 |-- LoanAmount: integer (nullable = true)
 |-- Loan_Amount_Term: integer (nullable = true)
 |-- Credit_History: integer (nullable = true)
 |-- Property_Area: string (nullable = true)
 |-- Loan_Status: string (nullable = true)



In [12]:
columns = ['ApplicantIncome', 'CoapplicantIncome', 'LoanAmount', 'Loan_Amount_Term', 'Credit_History']
corr_df = pd.DataFrame()
for i in columns:
    corr = []
    for j in columns:
        corr.append(round(df.stat.corr(i,j),2))
    corr_df = pd.concat([corr_df, pd.Series(corr)], axis=1)
corr_df.columns = columns
corr_df.insert(0,'',columns)
corr_df.set_index("")

Unnamed: 0,ApplicantIncome,CoapplicantIncome,LoanAmount,Loan_Amount_Term,Credit_History
,,,,,
ApplicantIncome,1.0,-0.12,0.54,-0.02,0.01
CoapplicantIncome,-0.12,1.0,0.19,-0.05,-0.06
LoanAmount,0.54,0.19,1.0,0.06,-0.03
Loan_Amount_Term,-0.02,-0.05,0.06,1.0,0.05
Credit_History,0.01,-0.06,-0.03,0.05,1.0


## Perform SQL Operations

In [13]:
import pyspark.sql as sparksql

In [14]:
df.createOrReplaceTempView('table')

In [15]:
spark.sql("select * from table").show()

+--------+------+-------+----------+------------+-------------+---------------+-----------------+----------+----------------+--------------+-------------+-----------+
| Loan_ID|Gender|Married|Dependents|   Education|Self_Employed|ApplicantIncome|CoapplicantIncome|LoanAmount|Loan_Amount_Term|Credit_History|Property_Area|Loan_Status|
+--------+------+-------+----------+------------+-------------+---------------+-----------------+----------+----------------+--------------+-------------+-----------+
|LP001002|  Male|     No|         0|    Graduate|           No|           5849|              0.0|      null|             360|             1|        Urban|          Y|
|LP001003|  Male|    Yes|         1|    Graduate|           No|           4583|           1508.0|       128|             360|             1|        Rural|          N|
|LP001005|  Male|    Yes|         0|    Graduate|          Yes|           3000|              0.0|        66|             360|             1|        Urban|          Y

In [16]:
spark.sql('select count(*) from table where Credit_History=1').show()

+--------+
|count(1)|
+--------+
|     475|
+--------+



## Data Cleaning

In [17]:
# Display null values
df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df.columns]).show()

+-------+------+-------+----------+---------+-------------+---------------+-----------------+----------+----------------+--------------+-------------+-----------+
|Loan_ID|Gender|Married|Dependents|Education|Self_Employed|ApplicantIncome|CoapplicantIncome|LoanAmount|Loan_Amount_Term|Credit_History|Property_Area|Loan_Status|
+-------+------+-------+----------+---------+-------------+---------------+-----------------+----------+----------------+--------------+-------------+-----------+
|      0|    13|      3|        15|        0|           32|              0|                0|        22|              14|            50|            0|          0|
+-------+------+-------+----------+---------+-------------+---------------+-----------------+----------+----------------+--------------+-------------+-----------+



In [18]:
# Get mean value of column
mean = df.select(F.mean(df['LoanAmount'])).collect()[0][0]
mean

146.41216216216216

In [19]:
# Fill LoanAmount null values
df = df.na.fill(mean, ['LoanAmount'])

In [20]:
# Get mode value of column
mode = df.groupBy('Gender').count().orderBy("count", ascending=False).first()[0]
mode

'Male'

In [21]:
# Fill null values for all the columns
numerical_cols = ['LoanAmount', 'Loan_Amount_Term']
categorical_cols = ['Gender', 'Married', 'Dependents', 'Self_Employed', 'Credit_History']

for col in numerical_cols:
    mean = df.select(F.mean(df[col])).collect()[0][0]
    df = df.na.fill(mean, [col])

for col in categorical_cols:
    mode = df.groupBy(col).count().orderBy("count", ascending=False).first()[0]
    df = df.na.fill(mode, [col])

In [22]:
df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df.columns]).show()

+-------+------+-------+----------+---------+-------------+---------------+-----------------+----------+----------------+--------------+-------------+-----------+
|Loan_ID|Gender|Married|Dependents|Education|Self_Employed|ApplicantIncome|CoapplicantIncome|LoanAmount|Loan_Amount_Term|Credit_History|Property_Area|Loan_Status|
+-------+------+-------+----------+---------+-------------+---------------+-----------------+----------+----------------+--------------+-------------+-----------+
|      0|     0|      0|         0|        0|            0|              0|                0|         0|               0|             0|            0|          0|
+-------+------+-------+----------+---------+-------------+---------------+-----------------+----------+----------------+--------------+-------------+-----------+



In [23]:
# Create new feature column
df = df.withColumn('TotalIncome', F.col('ApplicantIncome') + F.col('CoapplicantIncome'))
df.show(2)

+--------+------+-------+----------+---------+-------------+---------------+-----------------+----------+----------------+--------------+-------------+-----------+-----------+
| Loan_ID|Gender|Married|Dependents|Education|Self_Employed|ApplicantIncome|CoapplicantIncome|LoanAmount|Loan_Amount_Term|Credit_History|Property_Area|Loan_Status|TotalIncome|
+--------+------+-------+----------+---------+-------------+---------------+-----------------+----------+----------------+--------------+-------------+-----------+-----------+
|LP001002|  Male|     No|         0| Graduate|           No|           5849|              0.0|       146|             360|             1|        Urban|          Y|     5849.0|
|LP001003|  Male|    Yes|         1| Graduate|           No|           4583|           1508.0|       128|             360|             1|        Rural|          N|     6091.0|
+--------+------+-------+----------+---------+-------------+---------------+-----------------+----------+---------------

In [24]:
# How to find and replac values
df = df.withColumn('Loan_Status', F.when(df['Loan_Status']=='Y', 1).otherwise(0))
df.show(2)

+--------+------+-------+----------+---------+-------------+---------------+-----------------+----------+----------------+--------------+-------------+-----------+-----------+
| Loan_ID|Gender|Married|Dependents|Education|Self_Employed|ApplicantIncome|CoapplicantIncome|LoanAmount|Loan_Amount_Term|Credit_History|Property_Area|Loan_Status|TotalIncome|
+--------+------+-------+----------+---------+-------------+---------------+-----------------+----------+----------------+--------------+-------------+-----------+-----------+
|LP001002|  Male|     No|         0| Graduate|           No|           5849|              0.0|       146|             360|             1|        Urban|          1|     5849.0|
|LP001003|  Male|    Yes|         1| Graduate|           No|           4583|           1508.0|       128|             360|             1|        Rural|          0|     6091.0|
+--------+------+-------+----------+---------+-------------+---------------+-----------------+----------+---------------

## Feature Engineering

In [25]:
df.printSchema()

root
 |-- Loan_ID: string (nullable = true)
 |-- Gender: string (nullable = false)
 |-- Married: string (nullable = false)
 |-- Dependents: string (nullable = false)
 |-- Education: string (nullable = true)
 |-- Self_Employed: string (nullable = false)
 |-- ApplicantIncome: integer (nullable = true)
 |-- CoapplicantIncome: double (nullable = true)
 |-- LoanAmount: integer (nullable = true)
 |-- Loan_Amount_Term: integer (nullable = true)
 |-- Credit_History: integer (nullable = true)
 |-- Property_Area: string (nullable = true)
 |-- Loan_Status: integer (nullable = false)
 |-- TotalIncome: double (nullable = true)



In [26]:
from pyspark.ml.feature import VectorAssembler, OneHotEncoder, StringIndexer
from pyspark.ml import Pipeline

In [27]:
categorical_columns = ['Gender', 'Married', 'Dependents', 'Education', 'Self_Employed', 'Property_Area', 'Credit_History']
numerical_columns = ['ApplicantIncome', 'CoapplicantIncome', 'LoanAmount', 'Loan_Amount_Term', 'TotalIncome']

# Index the string columns
indexers = [StringIndexer(inputCol=col, outputCol="{0}_index".format(col)) for col in categorical_columns]

# Encode the indexed values
encoders = [OneHotEncoder(dropLast=False, inputCol=indexer.getOutputCol(), outputCol="{0}_encoded".format(indexer.getOutputCol()))
            for indexer in indexers]

input_columns = [encoder.getOutputCol() for encoder in encoders] + numerical_columns

# Vectorize the encoded values
assembler = VectorAssembler(inputCols=input_columns, outputCol="feature")

In [28]:
# Create the pipeline to transform the data
pipeline = Pipeline(stages= indexers + encoders + [assembler])

In [29]:
data_model = pipeline.fit(df)

In [30]:
transformed_df = data_model.transform(df)

In [31]:
transformed_df.show(1)

+--------+------+-------+----------+---------+-------------+---------------+-----------------+----------+----------------+--------------+-------------+-----------+-----------+------------+-------------+----------------+---------------+-------------------+-------------------+--------------------+--------------------+---------------------+------------------------+-----------------------+---------------------------+---------------------------+----------------------------+--------------------+
| Loan_ID|Gender|Married|Dependents|Education|Self_Employed|ApplicantIncome|CoapplicantIncome|LoanAmount|Loan_Amount_Term|Credit_History|Property_Area|Loan_Status|TotalIncome|Gender_index|Married_index|Dependents_index|Education_index|Self_Employed_index|Property_Area_index|Credit_History_index|Gender_index_encoded|Married_index_encoded|Dependents_index_encoded|Education_index_encoded|Self_Employed_index_encoded|Property_Area_index_encoded|Credit_History_index_encoded|             feature|
+--------+

In [32]:
# Get input feature and output columns
transformed_df = transformed_df.select(['feature', 'Loan_Status'])

In [33]:
# Split the data for train and test
train_data, test_data = transformed_df.randomSplit([0.8, 0.2], seed=42)

In [34]:
train_data.show(5)

+--------------------+-----------+
|             feature|Loan_Status|
+--------------------+-----------+
|(22,[0,2,4,8,10,1...|          1|
|(22,[0,2,4,8,10,1...|          1|
|(22,[0,2,4,8,10,1...|          0|
|(22,[0,2,4,8,10,1...|          1|
|(22,[0,2,4,8,10,1...|          1|
+--------------------+-----------+
only showing top 5 rows



## Model Training and Testing

In [35]:
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator

lr = LogisticRegression(featuresCol='feature', labelCol='Loan_Status')
lr_model = lr.fit(train_data)

In [36]:
# Predict on test data
predictions = lr_model.transform(test_data)
predictions.show(10)

+--------------------+-----------+--------------------+--------------------+----------+
|             feature|Loan_Status|       rawPrediction|         probability|prediction|
+--------------------+-----------+--------------------+--------------------+----------+
|(22,[0,2,4,8,10,1...|          1|[-2.1309019654107...|[0.10612939546952...|       1.0|
|(22,[0,2,4,8,10,1...|          1|[-2.1965880165663...|[0.10005730505861...|       1.0|
|(22,[0,2,4,8,10,1...|          1|[-2.1183870711017...|[0.10732249829704...|       1.0|
|(22,[0,2,4,8,10,1...|          1|[-2.1420673697694...|[0.10507482746799...|       1.0|
|(22,[0,2,4,8,10,1...|          1|[-2.1120272450757...|[0.10793332086955...|       1.0|
|(22,[0,2,4,8,10,1...|          1|[-1.8438581820741...|[0.13659563082672...|       1.0|
|(22,[0,2,4,8,10,1...|          0|[-1.9917058610226...|[0.12007650752153...|       1.0|
|(22,[0,2,4,8,10,1...|          1|[-1.9866327570695...|[0.12061355619972...|       1.0|
|(22,[0,2,4,8,10,1...|          

In [37]:
auc = BinaryClassificationEvaluator().setLabelCol('Loan_Status')
print('AUC:', str(auc.evaluate(predictions)))

AUC: 0.782010582010582


In [38]:
rfc = RandomForestClassifier(featuresCol='feature', labelCol='Loan_Status')
rfc_model = rfc.fit(train_data)

In [39]:
predictions2 = rfc_model.transform(test_data)
predictions2.show(10)

+--------------------+-----------+--------------------+--------------------+----------+
|             feature|Loan_Status|       rawPrediction|         probability|prediction|
+--------------------+-----------+--------------------+--------------------+----------+
|(22,[0,2,4,8,10,1...|          1|[2.65747826707479...|[0.13287391335373...|       1.0|
|(22,[0,2,4,8,10,1...|          1|[2.63347961911130...|[0.13167398095556...|       1.0|
|(22,[0,2,4,8,10,1...|          1|[2.90830250608218...|[0.14541512530410...|       1.0|
|(22,[0,2,4,8,10,1...|          1|[2.88083450757739...|[0.14404172537886...|       1.0|
|(22,[0,2,4,8,10,1...|          1|[3.50402613552576...|[0.17520130677628...|       1.0|
|(22,[0,2,4,8,10,1...|          1|[4.29509610339021...|[0.21475480516951...|       1.0|
|(22,[0,2,4,8,10,1...|          0|[3.79164518314481...|[0.18958225915724...|       1.0|
|(22,[0,2,4,8,10,1...|          1|[6.43973596601742...|[0.32198679830087...|       1.0|
|(22,[0,2,4,8,10,1...|          

In [40]:
auc2 = BinaryClassificationEvaluator().setLabelCol('Loan_Status')
print('AUC:', str(auc.evaluate(predictions2)))

AUC: 0.8301587301587302
