## Installing PySpark

In [6]:
# Prerequisites (set up once, outside this notebook):
#   1. Install a Java runtime -- PySpark starts a JVM when the SparkSession is created.
#      The "Java gateway process exited before sending its port number" error captured
#      under the session cell below typically means Java could not be found/started.
#   2. Install Apache Spark (a build packaged with Hadoop).
#   3. Set the environment variables Spark needs (commonly JAVA_HOME, and SPARK_HOME for a
#      standalone install) -- confirm against your platform's setup guide.
# Uncomment to install / upgrade the PySpark package from inside the notebook:
#!pip install pyspark
#!pip install --upgrade pyspark



## Import Modules

In [1]:
import pyspark
from pyspark.sql import SparkSession
import pyspark.sql.functions as F  # column expressions used below: F.col, F.mean, F.when, F.count, ...

import pandas as pd
import warnings
# Silences every warning for the rest of the session.
# NOTE(review): this also hides deprecation warnings from PySpark/pandas -- consider narrowing it.
warnings.filterwarnings('ignore')

In [2]:
# Start (or attach to an already running) Spark session named 'loan_prediction'.
# NOTE: the RuntimeError captured below this cell ("Java gateway process exited ...")
# means the JVM could not be launched -- commonly Java is missing or JAVA_HOME is unset.
spark = (
    SparkSession.builder
    .appName('loan_prediction')
    .getOrCreate()
)

RuntimeError: Java gateway process exited before sending its port number

## Load the Dataset

In [None]:
# Read the CSV (first line is the header; Spark infers each column's type),
# then preview the first five rows.
df = (
    spark.read
    .options(header=True, sep=',', inferSchema=True)
    .csv('Loan Prediction Dataset.csv')
)
df.show(5)

In [None]:
# Print the column names and the types Spark inferred for them.
df.printSchema()

In [None]:
# List of (column name, Spark type string) pairs -- displayed as the cell's output.
df.dtypes

In [None]:
# Convert the Spark DataFrame to pandas. toPandas() brings every row to the driver,
# so this is only suitable while the dataset fits in local memory.
pandas_df = df.toPandas()
pandas_df.head()

## Data Analysis

In [None]:
# Number of applications per Loan_Status value.
(
    df.groupBy('Loan_Status')
    .count()
    .show()
)

In [None]:
# Average Credit_History per Loan_Status (groupBy only needs the two columns it touches).
df.groupBy('Loan_Status').agg(F.avg('Credit_History')).show()

In [None]:
# Row counts for every (Loan_Status, Gender) combination.
df.groupBy('Loan_Status', 'Gender').count().show()

## Correlation Matrix

In [None]:
# Pairwise Pearson correlations between the numeric columns, rounded to 2 decimals.
# Every df.stat.corr call runs a Spark job and corr(a, b) == corr(b, a), so only the
# upper triangle (diagonal included) is computed and then mirrored.
# corr_df itself is the labelled square matrix (index and columns are the column names),
# so it can be reused directly, e.g. for a heatmap.
columns = ['ApplicantIncome', 'CoapplicantIncome', 'LoanAmount', 'Loan_Amount_Term', 'Credit_History']
corr_df = pd.DataFrame(index=columns, columns=columns, dtype=float)
for i, col_a in enumerate(columns):
    for col_b in columns[i:]:
        value = round(df.stat.corr(col_a, col_b), 2)
        corr_df.loc[col_a, col_b] = value
        corr_df.loc[col_b, col_a] = value
corr_df

## Perform SQL Operations

In [None]:
import pyspark.sql as sparksql

In [None]:
# Register df (as it is at this point, i.e. before the cleaning steps) as a temporary
# SQL view named 'table' so spark.sql() can query it.
# NOTE(review): TABLE is an SQL keyword; it must be quoted as `table` in queries when
# ANSI reserved-keyword enforcement is on. A non-keyword name would be safer.
df.createOrReplaceTempView('table')

In [None]:
# Display the first rows of the view. The view name is backtick-quoted because TABLE is
# an SQL keyword (reserved when spark.sql.ansi.enforceReservedKeywords is enabled).
spark.sql("select * from `table` limit 5").show()

In [None]:
# Loan IDs of applicants with Credit_History = 1. `table` is backtick-quoted because
# TABLE is an SQL keyword (reserved under ANSI reserved-keyword enforcement).
spark.sql('select Loan_ID from `table` where Credit_History=1').show()

## Data Cleaning

In [None]:
# Null count per column: when() yields a value only for null rows, and count()
# counts non-null values, so each aggregate equals that column's number of nulls.
df.select(*(
    F.count(F.when(F.col(name).isNull(), name)).alias(name)
    for name in df.columns
)).show()

In [None]:
# Mean of LoanAmount (SQL aggregates skip nulls); shown as the cell output.
mean = df.agg(F.mean('LoanAmount')).first()[0]
mean

In [None]:
# Replace nulls in LoanAmount with the mean computed above.
# (If Spark inferred an integer type for LoanAmount, the fill value is cast to it.)
df = df.fillna(mean, subset=['LoanAmount'])

In [None]:
# Mode (most frequent non-null value) of Gender, displayed as the cell output.
# Nulls are filtered out first because groupBy() keeps null as its own group, which
# would otherwise be reported as the mode if it were the largest. Ties are broken by
# value so the answer does not depend on Spark's row ordering.
(
    df.where(F.col('Gender').isNotNull())
    .groupBy('Gender')
    .count()
    .orderBy(F.desc('count'), F.asc('Gender'))
    .first()[0]
)

In [None]:
# Columns to impute (not every column): numerical ones get the column mean,
# categorical ones the most frequent value (mode). Credit_History holds numbers
# (it is averaged and correlated above) but is imputed with its mode here.
numerical_cols = ['LoanAmount', 'Loan_Amount_Term']
categorical_cols = ['Gender', 'Married', 'Dependents', 'Self_Employed', 'Credit_History']

In [None]:
# Mean-impute each numerical column in turn.
for col in numerical_cols:
    mean = df.agg(F.mean(col)).first()[0]
    df = df.fillna(mean, subset=[col])

In [None]:
# Mode-impute each categorical column.
# Nulls are filtered out before counting: groupBy() keeps null as its own group, and if
# that group were the largest the "mode" would be None, which na.fill() rejects.
# Ties are broken by value so the chosen fill value is deterministic.
for col in categorical_cols:
    top = (
        df.where(F.col(col).isNotNull())
        .groupBy(col)
        .count()
        .orderBy(F.desc('count'), F.asc(col))
        .first()
    )
    if top is None:  # column is entirely null: no value to impute from
        continue
    mode = top[0]
    df = df.na.fill(mode, [col])

In [None]:
# Null count per column after imputation (when() is non-null only for null rows,
# and count() counts non-null values).
null_count_exprs = (
    F.count(F.when(F.col(name).isNull(), name)).alias(name)
    for name in df.columns
)
df.select(*null_count_exprs).show()

In [None]:
# New feature: applicant + co-applicant income (null if either input is null).
df = df.withColumn('TotalIncome', df['ApplicantIncome'] + df['CoapplicantIncome'])
df.show(2)

In [None]:
# Encode the target as an integer label: 'Y' -> 1, any other value (including null) -> 0.
df = df.withColumn(
    'Loan_Status',
    F.when(F.col('Loan_Status') == 'Y', 1).otherwise(0),
)
df.show(2)

## Feature Engineering

In [None]:
# Re-check the schema after cleaning: TotalIncome was added and Loan_Status is now an integer label.
df.printSchema()

In [None]:
from pyspark.ml.feature import VectorAssembler, OneHotEncoder, StringIndexer
from pyspark.ml import Pipeline

In [None]:
categorical_columns = ['Gender', 'Married', 'Dependents', 'Education', 'Self_Employed', 'Property_Area', 'Credit_History']
# NOTE(review): TotalIncome is ApplicantIncome + CoapplicantIncome, so these three
# features are linearly dependent -- consider dropping one for the linear model.
numerical_columns = ['ApplicantIncome', 'CoapplicantIncome', 'LoanAmount', 'Loan_Amount_Term', 'TotalIncome']

# Stage 1: map each categorical column's values to numeric indices -> '<name>_index'.
indexers = [
    StringIndexer(inputCol=name, outputCol=f"{name}_index")
    for name in categorical_columns
]

# Stage 2: one-hot encode every index column -> '<name>_index_encoded';
# dropLast=False keeps one slot per category.
encoders = [
    OneHotEncoder(
        dropLast=False,
        inputCol=indexer.getOutputCol(),
        outputCol=f"{indexer.getOutputCol()}_encoded",
    )
    for indexer in indexers
]

# Stage 3: concatenate the one-hot vectors and the numerical columns into a single
# vector column named 'feature'.
input_columns = [encoder.getOutputCol() for encoder in encoders] + numerical_columns
assembler = VectorAssembler(inputCols=input_columns, outputCol="feature")

In [None]:
# Chain the stages in order: all indexers, then all encoders, then the assembler.
pipeline = Pipeline(stages=[*indexers, *encoders, assembler])

In [None]:
# Fit the pipeline stages on df (e.g. the StringIndexers learn their category mappings).
# NOTE(review): this is fitted on the full dataset before the train/test split below.
data_model = pipeline.fit(df)

In [None]:
# Apply the fitted stages, adding the *_index, *_encoded and 'feature' columns.
transformed_df = data_model.transform(df)

In [None]:
# Preview one transformed row.
transformed_df.show(1)

In [None]:
# Keep only the model input vector and the label column.
transformed_df = transformed_df.select('feature', 'Loan_Status')

In [None]:
# Random ~80/20 train/test split. The fixed seed lets the split be reproduced
# (given the same input data and partitioning); the proportions are approximate.
train_data, test_data = transformed_df.randomSplit([0.8, 0.2], seed=42)

In [None]:
# Preview the first training rows.
train_data.show(5)

## Model Training & Testing

In [None]:
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [None]:
# Logistic regression on the assembled 'feature' vector, predicting Loan_Status.
lr = LogisticRegression(labelCol='Loan_Status', featuresCol='feature')
lr_model = lr.fit(train_data)

In [None]:
# Predict on the test split: the fitted model appends its output columns
# (rawPrediction / probability / prediction by default) to each row.
predictions = lr_model.transform(test_data)
predictions.show(5)

In [None]:
# Area under the ROC curve (the evaluator's default metric) for logistic regression.
# predictions is recomputed so this cell stays correct even after a later cell
# reassigns it.
predictions = lr_model.transform(test_data)
auc = BinaryClassificationEvaluator(labelCol='Loan_Status')
print(f'AUC: {auc.evaluate(predictions)}')

In [None]:
# Random forest on the same features. The seed is fixed (matching the split's seed=42):
# PySpark's default seed is derived from Python's hash() of the class name, which is
# randomized per interpreter session, so unseeded results change between kernel restarts.
rf = RandomForestClassifier(featuresCol='feature', labelCol='Loan_Status', seed=42)
rf_model = rf.fit(train_data)

In [None]:
# Area under the ROC curve (the evaluator's default metric) for the random forest.
predictions = rf_model.transform(test_data)
auc = BinaryClassificationEvaluator(labelCol='Loan_Status')
print(f'AUC: {auc.evaluate(predictions)}')