In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m2.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=456e22358cacf4b3cd191ae80212c6297327d35309ce87b69f173805053ee31f
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


## Import Modules

In [2]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

import pandas as pd
import warnings
warnings.filterwarnings('ignore')

In [3]:
spark = SparkSession.builder.appName('loan_prediction').getOrCreate()

## Load the Dataset

In [72]:
df = spark.read.csv('/content/loan_approval_dataset.csv', header=True, sep=',', inferSchema=True)
df.show(5)

+-------+----------------+-------------+-------------+------------+-----------+---------+-----------+------------------------+-----------------------+-------------------+----------------+-----------+
|loan_id|no_of_dependents|    education|self_employed|income_annum|loan_amount|loan_term|cibil_score|residential_assets_value|commercial_assets_value|luxury_assets_value|bank_asset_value|loan_status|
+-------+----------------+-------------+-------------+------------+-----------+---------+-----------+------------------------+-----------------------+-------------------+----------------+-----------+
|      1|               2|     Graduate|           No|     9600000|   29900000|       12|        778|                 2400000|               17600000|           22700000|         8000000|   Approved|
|      2|               0| Not Graduate|          Yes|     4100000|   12200000|        8|        417|                 2700000|                2200000|            8800000|         3300000|   Rejected|


In [73]:
df.printSchema()

root
 |-- loan_id: integer (nullable = true)
 |-- no_of_dependents: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- self_employed: string (nullable = true)
 |-- income_annum: integer (nullable = true)
 |-- loan_amount: integer (nullable = true)
 |-- loan_term: integer (nullable = true)
 |-- cibil_score: integer (nullable = true)
 |-- residential_assets_value: integer (nullable = true)
 |-- commercial_assets_value: integer (nullable = true)
 |-- luxury_assets_value: integer (nullable = true)
 |-- bank_asset_value: integer (nullable = true)
 |-- loan_status: string (nullable = true)



In [74]:
df.dtypes

[('loan_id', 'int'),
 ('no_of_dependents', 'int'),
 ('education', 'string'),
 ('self_employed', 'string'),
 ('income_annum', 'int'),
 ('loan_amount', 'int'),
 ('loan_term', 'int'),
 ('cibil_score', 'int'),
 ('residential_assets_value', 'int'),
 ('commercial_assets_value', 'int'),
 ('luxury_assets_value', 'int'),
 ('bank_asset_value', 'int'),
 ('loan_status', 'string')]

In [75]:
# convert spark dataframe to pandas
pandas_df = df.toPandas()
pandas_df.head()

Unnamed: 0,loan_id,no_of_dependents,education,self_employed,income_annum,loan_amount,loan_term,cibil_score,residential_assets_value,commercial_assets_value,luxury_assets_value,bank_asset_value,loan_status
0,1,2,Graduate,No,9600000,29900000,12,778,2400000,17600000,22700000,8000000,Approved
1,2,0,Not Graduate,Yes,4100000,12200000,8,417,2700000,2200000,8800000,3300000,Rejected
2,3,3,Graduate,No,9100000,29700000,20,506,7100000,4500000,33300000,12800000,Rejected
3,4,3,Graduate,No,8200000,30700000,8,467,18200000,3300000,23300000,7900000,Rejected
4,5,5,Not Graduate,Yes,9800000,24200000,20,382,12400000,8200000,29400000,5000000,Rejected


In [76]:
df.columns

['loan_id',
 'no_of_dependents',
 'education',
 'self_employed',
 'income_annum',
 'loan_amount',
 'loan_term',
 'cibil_score',
 'residential_assets_value',
 'commercial_assets_value',
 'luxury_assets_value',
 'bank_asset_value',
 'loan_status']

## Data Analysis

In [77]:
df.groupBy('loan_Status').count().show()

+-----------+-----+
|loan_Status|count|
+-----------+-----+
|   Approved| 2656|
|   Rejected| 1613|
+-----------+-----+



In [78]:
df.select('education', 'loan_Status').groupBy('loan_Status', 'education').count().show()

+-----------+-------------+-----+
|loan_Status|    education|count|
+-----------+-------------+-----+
|   Approved| Not Graduate| 1317|
|   Rejected| Not Graduate|  808|
|   Rejected|     Graduate|  805|
|   Approved|     Graduate| 1339|
+-----------+-------------+-----+



## Correlation Matrix

In [79]:
columns = ['income_annum',
 'loan_amount',
 'loan_term',
 'cibil_score',
 'residential_assets_value',
 'commercial_assets_value',
 'luxury_assets_value',
 'bank_asset_value']
corr_df = pd.DataFrame()
for i in columns:
    corr = []
    for j in columns:
        corr.append(round(df.stat.corr(i, j), 2))
    corr_df = pd.concat([corr_df, pd.Series(corr)], axis=1)
corr_df.columns = columns
corr_df.insert(0, '', columns)
corr_df.set_index('')

Unnamed: 0,income_annum,loan_amount,loan_term,cibil_score,residential_assets_value,commercial_assets_value,luxury_assets_value,bank_asset_value
,,,,,,,,
income_annum,1.0,0.93,0.01,-0.02,0.64,0.64,0.93,0.85
loan_amount,0.93,1.0,0.01,-0.02,0.59,0.6,0.86,0.79
loan_term,0.01,0.01,1.0,0.01,0.01,-0.01,0.01,0.02
cibil_score,-0.02,-0.02,0.01,1.0,-0.02,-0.0,-0.03,-0.02
residential_assets_value,0.64,0.59,0.01,-0.02,1.0,0.41,0.59,0.53
commercial_assets_value,0.64,0.6,-0.01,-0.0,0.41,1.0,0.59,0.55
luxury_assets_value,0.93,0.86,0.01,-0.03,0.59,0.59,1.0,0.79
bank_asset_value,0.85,0.79,0.02,-0.02,0.53,0.55,0.79,1.0


## Perform SQL Operations

In [80]:
import pyspark.sql as sparksql

In [81]:
df.createOrReplaceTempView('table')

In [82]:
spark.sql("select * from table limit 5").show()

+-------+----------------+-------------+-------------+------------+-----------+---------+-----------+------------------------+-----------------------+-------------------+----------------+-----------+
|loan_id|no_of_dependents|    education|self_employed|income_annum|loan_amount|loan_term|cibil_score|residential_assets_value|commercial_assets_value|luxury_assets_value|bank_asset_value|loan_status|
+-------+----------------+-------------+-------------+------------+-----------+---------+-----------+------------------------+-----------------------+-------------------+----------------+-----------+
|      1|               2|     Graduate|           No|     9600000|   29900000|       12|        778|                 2400000|               17600000|           22700000|         8000000|   Approved|
|      2|               0| Not Graduate|          Yes|     4100000|   12200000|        8|        417|                 2700000|                2200000|            8800000|         3300000|   Rejected|


## Data Cleaning

In [83]:
df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df.columns]).show()

+-------+----------------+---------+-------------+------------+-----------+---------+-----------+------------------------+-----------------------+-------------------+----------------+-----------+
|loan_id|no_of_dependents|education|self_employed|income_annum|loan_amount|loan_term|cibil_score|residential_assets_value|commercial_assets_value|luxury_assets_value|bank_asset_value|loan_status|
+-------+----------------+---------+-------------+------------+-----------+---------+-----------+------------------------+-----------------------+-------------------+----------------+-----------+
|      0|               0|        0|            0|           0|          0|        0|          0|                       0|                      0|                  0|               0|          0|
+-------+----------------+---------+-------------+------------+-----------+---------+-----------+------------------------+-----------------------+-------------------+----------------+-----------+



In [109]:
from pyspark.ml.feature import VectorAssembler, OneHotEncoder, StringIndexer
ind=StringIndexer(inputCol='loan_status',outputCol='loan_status_index')
indx=ind.fit(df).transform(df)

## Feature Engineering

In [84]:
df.printSchema()

root
 |-- loan_id: integer (nullable = true)
 |-- no_of_dependents: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- self_employed: string (nullable = true)
 |-- income_annum: integer (nullable = true)
 |-- loan_amount: integer (nullable = true)
 |-- loan_term: integer (nullable = true)
 |-- cibil_score: integer (nullable = true)
 |-- residential_assets_value: integer (nullable = true)
 |-- commercial_assets_value: integer (nullable = true)
 |-- luxury_assets_value: integer (nullable = true)
 |-- bank_asset_value: integer (nullable = true)
 |-- loan_status: string (nullable = true)



In [110]:
from pyspark.ml.feature import VectorAssembler, OneHotEncoder, StringIndexer
from pyspark.ml import Pipeline

In [111]:
categorical_columns = ['education','self_employed']
numerical_columns = ['no_of_dependents','income_annum',
 'loan_amount',
 'loan_term',
 'cibil_score',
 'residential_assets_value',
 'commercial_assets_value',
 'luxury_assets_value',
 'bank_asset_value']

# index the string columns
indexers = [StringIndexer(inputCol=col, outputCol="{0}_index".format(col)) for col in categorical_columns]

# encode the indexed values
#encoders = [OneHotEncoder(dropLast=False, inputCol=indexer.getOutputCol(), outputCol="{0}_encoded".format(indexer.getOutputCol()))


input_columns = [indexer.getOutputCol()  for indexer in indexers] + numerical_columns

# vectorize the encoded values
assembler = VectorAssembler(inputCols=input_columns, outputCol="feature")

In [112]:
pipeline = Pipeline(stages = indexers + [assembler])

In [113]:
data_model = pipeline.fit(indx)

In [114]:
transformed_df = data_model.transform(indx)

In [115]:
transformed_df.show(1)

+-------+----------------+---------+-------------+------------+-----------+---------+-----------+------------------------+-----------------------+-------------------+----------------+-----------+-----------------+---------------+-------------------+--------------------+
|loan_id|no_of_dependents|education|self_employed|income_annum|loan_amount|loan_term|cibil_score|residential_assets_value|commercial_assets_value|luxury_assets_value|bank_asset_value|loan_status|loan_status_index|education_index|self_employed_index|             feature|
+-------+----------------+---------+-------------+------------+-----------+---------+-----------+------------------------+-----------------------+-------------------+----------------+-----------+-----------------+---------------+-------------------+--------------------+
|      1|               2| Graduate|           No|     9600000|   29900000|       12|        778|                 2400000|               17600000|           22700000|         8000000|   A

In [116]:
transformed_df = transformed_df.select(['feature', 'loan_status_index'])

In [117]:
train_data, test_data = transformed_df.randomSplit([0.8, 0.2], seed=42)

In [118]:
train_data.show(truncate=False)

+-----------------------------------------------------------------------------------+-----------------+
|feature                                                                            |loan_status_index|
+-----------------------------------------------------------------------------------+-----------------+
|[0.0,0.0,0.0,200000.0,300000.0,18.0,520.0,100000.0,300000.0,600000.0,100000.0]     |1.0              |
|[0.0,0.0,0.0,200000.0,600000.0,8.0,852.0,500000.0,0.0,400000.0,200000.0]           |0.0              |
|[0.0,0.0,0.0,400000.0,900000.0,2.0,855.0,-100000.0,500000.0,1300000.0,400000.0]    |0.0              |
|[0.0,0.0,0.0,400000.0,1200000.0,14.0,889.0,1100000.0,200000.0,700000.0,200000.0]   |0.0              |
|[0.0,0.0,0.0,500000.0,1100000.0,18.0,479.0,1500000.0,500000.0,1800000.0,600000.0]  |1.0              |
|[0.0,0.0,0.0,600000.0,2300000.0,4.0,319.0,1600000.0,800000.0,2300000.0,700000.0]   |0.0              |
|[0.0,0.0,0.0,700000.0,1800000.0,12.0,400.0,1500000.0,1100000.0,

## Model Training & Testing

In [128]:
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator,MulticlassClassificationEvaluator

In [123]:
lr = LogisticRegression(featuresCol='feature', labelCol='loan_status_index')
lr_model = lr.fit(train_data)

In [121]:
predictions = lr_model.transform(test_data)
predictions.show(5)

+--------------------+-----------------+--------------------+-----------+----------+
|             feature|loan_status_index|       rawPrediction|probability|prediction|
+--------------------+-----------------+--------------------+-----------+----------+
|[0.0,0.0,0.0,2000...|              1.0|[Infinity,-Infinity]|  [1.0,0.0]|       0.0|
|[0.0,0.0,0.0,5000...|              0.0|[Infinity,-Infinity]|  [1.0,0.0]|       0.0|
|[0.0,0.0,0.0,7000...|              0.0|[Infinity,-Infinity]|  [1.0,0.0]|       0.0|
|[0.0,0.0,0.0,8000...|              0.0|[Infinity,-Infinity]|  [1.0,0.0]|       0.0|
|[0.0,0.0,0.0,1000...|              1.0|[Infinity,-Infinity]|  [1.0,0.0]|       0.0|
+--------------------+-----------------+--------------------+-----------+----------+
only showing top 5 rows



In [131]:
predictions = lr_model.transform(test_data)
auc = BinaryClassificationEvaluator().setLabelCol('loan_status_index')
print('AUC:', auc.evaluate(predictions))
mul= MulticlassClassificationEvaluator(labelCol='loan_status_index',metricName='accuracy')
print('Accuracy: ' ,mul.evaluate(predictions))

AUC: 0.970590514068777
Accuracy:  0.900373599003736


In [126]:
rf = RandomForestClassifier(featuresCol='feature', labelCol='loan_status_index')
rf_model = rf.fit(train_data)

In [132]:
predictions = rf_model.transform(test_data)
auc = BinaryClassificationEvaluator().setLabelCol('loan_status_index')
print('AUC:', auc.evaluate(predictions))
mul= MulticlassClassificationEvaluator(labelCol='loan_status_index',metricName='accuracy')
print('Accuracy: ' ,mul.evaluate(predictions))

AUC: 0.9933045802611018
Accuracy:  0.962640099626401
