## Importing the PySpark Library

To import the PySpark library:

In [1]:
import pyspark

## Starting a Spark Session and Reading CSV

importing SparkSession,

In [2]:
from pyspark.sql import SparkSession

Creating a SparkSession instance,

In [3]:
spark_instance = SparkSession.builder.appName('Practice').getOrCreate()

Reading a CSV file,

In [4]:
df_BLS = spark_instance.read.csv("../datasets/Loan_Status.csv", header=True, inferSchema=True)

## EDA

Verifying read

In [8]:
df_BLS.show(15)

+--------+------+-------+----------+------------+-------------+---------------+-----------------+----------+----------------+--------------+-------------+-----------+
| Loan_ID|Gender|Married|Dependents|   Education|Self_Employed|ApplicantIncome|CoapplicantIncome|LoanAmount|Loan_Amount_Term|Credit_History|Property_Area|Loan_Status|
+--------+------+-------+----------+------------+-------------+---------------+-----------------+----------+----------------+--------------+-------------+-----------+
|LP001002|  Male|     No|         0|    Graduate|           No|           5849|              0.0|      NULL|             360|             1|        Urban|          Y|
|LP001003|  Male|    Yes|         1|    Graduate|           No|           4583|           1508.0|       128|             360|             1|        Rural|          N|
|LP001005|  Male|    Yes|         0|    Graduate|          Yes|           3000|              0.0|        66|             360|             1|        Urban|          Y

Checking schema

In [9]:
df_BLS.printSchema()

root
 |-- Loan_ID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Married: string (nullable = true)
 |-- Dependents: string (nullable = true)
 |-- Education: string (nullable = true)
 |-- Self_Employed: string (nullable = true)
 |-- ApplicantIncome: integer (nullable = true)
 |-- CoapplicantIncome: double (nullable = true)
 |-- LoanAmount: integer (nullable = true)
 |-- Loan_Amount_Term: integer (nullable = true)
 |-- Credit_History: integer (nullable = true)
 |-- Property_Area: string (nullable = true)
 |-- Loan_Status: string (nullable = true)



Checking distinct values of columns,

In [10]:
for col in df_BLS.columns[1:]:
    df_BLS.select(col).distinct().show(n=5, vertical=False)

+------+
|Gender|
+------+
|Female|
|  Male|
|  NULL|
+------+

+-------+
|Married|
+-------+
|     No|
|    Yes|
|   NULL|
+-------+

+----------+
|Dependents|
+----------+
|         0|
|         1|
|        3+|
|         2|
|      NULL|
+----------+

+------------+
|   Education|
+------------+
|Not Graduate|
|    Graduate|
+------------+

+-------------+
|Self_Employed|
+-------------+
|           No|
|          Yes|
|         NULL|
+-------------+

+---------------+
|ApplicantIncome|
+---------------+
|           2366|
|          11500|
|           1025|
|           3000|
|           3704|
+---------------+
only showing top 5 rows

+-----------------+
|CoapplicantIncome|
+-----------------+
|           1587.0|
|           3440.0|
|      16.12000084|
|      985.7999878|
|           2167.0|
+-----------------+
only showing top 5 rows

+----------+
|LoanAmount|
+----------+
|       148|
|       243|
|       137|
|        85|
|        65|
+----------+
only showing top 5 rows

+--------

In order to apply an ML model, we need to transform the categorical columns to numerical.

We have the following cases:
* Gender: Binary (M/F)
* Married: Binary (N/M)
* Depedents: Ordinal (0, 1, 2, 3+)
* Self_Employed: Binary (N/Y)
* Property_Area: Nominal (Urban/Rural/Semi-Urban)
* Loan_Status : Binary (N/Y)

## Data Preparation

To one hot encode binary columns, we use `StringIndexer`.

In [45]:
from pyspark.ml.feature import StringIndexer, StringIndexerModel

Before fitting the models, we will drop all nulls for simplicity.

In [51]:
ind_df = df_BLS.na.drop()

We set the input and output columns for the indexer.

We also specify the `stringOrderType` to specify how the strings should be encoded.

More info on `stringOrderType` in [this link](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.StringIndexer.html).

For further reading on `StringIndexer`, you can read [link 1](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.StringIndexer.html) and [link 2](https://www.machinelearningplus.com/pyspark/pyspark-stringindexer/). 

We create a StringIndexer instance,

The `handleInvalid='keep'` here is of no use, we can remove it.

In [79]:
BinCols = ['Gender', 'Married', 'Education', 'Self_Employed', 'Loan_Status']
outputCols = ['{}_OHE'.format(c) for c in BinCols]
indexer = StringIndexer(inputCols=BinCols, outputCols=outputCols, stringOrderType='alphabetAsc', handleInvalid='keep')

We fit the instance,

In [80]:
sim = indexer.fit(ind_df)
# Check labelsArray
sim.labelsArray

[('Female', 'Male'),
 ('No', 'Yes'),
 ('Graduate', 'Not Graduate'),
 ('No', 'Yes'),
 ('N', 'Y')]

We can see that, since `stringOrderType='alphabetAsc'`, for most cases No/N = 0, while Yes/Y = 1, as N comes before Y in the alphabet.

However, for the case of 'Education', Graduate = 0 and Not Graduate = 1, as G comes before N.

We can fix since by separating the case of Education from the other cases and taking `stringOrderType='alphabetDesc'` or by changing the labels map (discussed later in this notebook).

We transform the dataframe,

In [81]:
ind_df = sim.transform(ind_df)
ind_df.select(BinCols+outputCols).show(10)

+------+-------+------------+-------------+-----------+----------+-----------+-------------+-----------------+---------------+
|Gender|Married|   Education|Self_Employed|Loan_Status|Gender_OHE|Married_OHE|Education_OHE|Self_Employed_OHE|Loan_Status_OHE|
+------+-------+------------+-------------+-----------+----------+-----------+-------------+-----------------+---------------+
|  Male|    Yes|    Graduate|           No|          N|       1.0|        1.0|          0.0|              0.0|            0.0|
|  Male|    Yes|    Graduate|          Yes|          Y|       1.0|        1.0|          0.0|              1.0|            1.0|
|  Male|    Yes|Not Graduate|           No|          Y|       1.0|        1.0|          1.0|              0.0|            1.0|
|  Male|     No|    Graduate|           No|          Y|       1.0|        0.0|          0.0|              0.0|            1.0|
|  Male|    Yes|    Graduate|          Yes|          Y|       1.0|        1.0|          0.0|              1.0| 

In [82]:
ind_df.printSchema()

root
 |-- Loan_ID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Married: string (nullable = true)
 |-- Dependents: string (nullable = true)
 |-- Education: string (nullable = true)
 |-- Self_Employed: string (nullable = true)
 |-- ApplicantIncome: integer (nullable = true)
 |-- CoapplicantIncome: double (nullable = true)
 |-- LoanAmount: integer (nullable = true)
 |-- Loan_Amount_Term: integer (nullable = true)
 |-- Credit_History: integer (nullable = true)
 |-- Property_Area: string (nullable = true)
 |-- Loan_Status: string (nullable = true)
 |-- Gender_OHE: double (nullable = false)
 |-- Married_OHE: double (nullable = false)
 |-- Education_OHE: double (nullable = false)
 |-- Self_Employed_OHE: double (nullable = false)
 |-- Loan_Status_OHE: double (nullable = false)



To fix the labels issue, we can use `from_labelsArray`.

We reset the dataframe,

In [117]:
ind_df = df_BLS.na.drop()

We then create the wanted `labelArray`.

The `labelsArray` should be a list of lists.

The following step can be automated using regex; however, for now, I use the easy solution.

In [118]:
new_labelsArray = sim.labelsArray
new_labelsArray[2] = (new_labelsArray[2][1], new_labelsArray[2][0])
new_labelsArray = [list(x) for x in new_labelsArray]
new_labelsArray

[['Female', 'Male'],
 ['No', 'Yes'],
 ['Not Graduate', 'Graduate'],
 ['No', 'Yes'],
 ['N', 'Y']]

We create a new model from the arrays of labels.

`inputCols` and `outputCols` should be redefined.

In [119]:
new_sim = sim.from_arrays_of_labels(new_labelsArray, inputCols=BinCols, outputCols=outputCols)
print(new_sim.labelsArray)
print(new_sim.getInputCols())
print(new_sim.getOutputCols())

[('Female', 'Male'), ('No', 'Yes'), ('Not Graduate', 'Graduate'), ('No', 'Yes'), ('N', 'Y')]
['Gender', 'Married', 'Education', 'Self_Employed', 'Loan_Status']
['Gender_OHE', 'Married_OHE', 'Education_OHE', 'Self_Employed_OHE', 'Loan_Status_OHE']


In [120]:
ind_df = new_sim.transform(ind_df)
ind_df.select(BinCols+outputCols).show(10)

+------+-------+------------+-------------+-----------+----------+-----------+-------------+-----------------+---------------+
|Gender|Married|   Education|Self_Employed|Loan_Status|Gender_OHE|Married_OHE|Education_OHE|Self_Employed_OHE|Loan_Status_OHE|
+------+-------+------------+-------------+-----------+----------+-----------+-------------+-----------------+---------------+
|  Male|    Yes|    Graduate|           No|          N|       1.0|        1.0|          1.0|              0.0|            0.0|
|  Male|    Yes|    Graduate|          Yes|          Y|       1.0|        1.0|          1.0|              1.0|            1.0|
|  Male|    Yes|Not Graduate|           No|          Y|       1.0|        1.0|          0.0|              0.0|            1.0|
|  Male|     No|    Graduate|           No|          Y|       1.0|        0.0|          1.0|              0.0|            1.0|
|  Male|    Yes|    Graduate|          Yes|          Y|       1.0|        1.0|          1.0|              1.0| 

For the 'Dependents' columns, we have an Ordinal variable.

We will thus remove the '+' sign from '3+' and then cast it as a double.

To remove the '+' sign, we will use `regex_replace` from sql functions.

For more readings on `regex_replace`: [link1](https://www.skytowner.com/explore/removing_substring_in_column_values_of_pyspark_dataframe), [link2](https://community.snowflake.com/s/article/How-REGEXP-REPLACE-and-REPLACE-handle-empty-string), and [link 3](https://medium.com/@uzzaman.ahmed/pyspark-string-functions-a-comprehensive-guide-842c3d68351b).

In [121]:
from pyspark.sql.functions import regexp_replace

'Dependents_ENC' is cast to double by using `cast('double')`.

In [122]:
ind_df = ind_df.withColumn('Dependents_ENC', regexp_replace('Dependents', r"3\+", "3").cast('double'))
ind_df.select(['Dependents', 'Dependents_ENC']).show(10)

+----------+--------------+
|Dependents|Dependents_ENC|
+----------+--------------+
|         1|           1.0|
|         0|           0.0|
|         0|           0.0|
|         0|           0.0|
|         2|           2.0|
|         0|           0.0|
|        3+|           3.0|
|         2|           2.0|
|         1|           1.0|
|         2|           2.0|
+----------+--------------+
only showing top 10 rows



In [123]:
ind_df = ind_df.withColumn('Dependents_ENC', ind_df.Dependents_ENC.cast('double'))
ind_df.printSchema()

root
 |-- Loan_ID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Married: string (nullable = true)
 |-- Dependents: string (nullable = true)
 |-- Education: string (nullable = true)
 |-- Self_Employed: string (nullable = true)
 |-- ApplicantIncome: integer (nullable = true)
 |-- CoapplicantIncome: double (nullable = true)
 |-- LoanAmount: integer (nullable = true)
 |-- Loan_Amount_Term: integer (nullable = true)
 |-- Credit_History: integer (nullable = true)
 |-- Property_Area: string (nullable = true)
 |-- Loan_Status: string (nullable = true)
 |-- Gender_OHE: double (nullable = false)
 |-- Married_OHE: double (nullable = false)
 |-- Education_OHE: double (nullable = false)
 |-- Self_Employed_OHE: double (nullable = false)
 |-- Loan_Status_OHE: double (nullable = false)
 |-- Dependents_ENC: double (nullable = true)



To do one hot encoding, we should first index the strings, then apply the `OneHotEncoder`.

For further reading: [link 1](https://www.machinelearningplus.com/pyspark/pyspark-onehot-encoding/) and [link 2](https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.ml.feature.OneHotEncoder.html).

In [124]:
from pyspark.ml.feature import OneHotEncoder

We start by indexing the strings,

In [125]:
sip = StringIndexer(inputCol='Property_Area', outputCol='Property_Area_IND')
sim = sip.fit(ind_df)
ind_df = sim.transform(ind_df)
ind_df.select(['Property_Area', 'Property_Area_IND']).show(10)

+-------------+-----------------+
|Property_Area|Property_Area_IND|
+-------------+-----------------+
|        Rural|              2.0|
|        Urban|              1.0|
|        Urban|              1.0|
|        Urban|              1.0|
|        Urban|              1.0|
|        Urban|              1.0|
|    Semiurban|              0.0|
|        Urban|              1.0|
|    Semiurban|              0.0|
|        Urban|              1.0|
+-------------+-----------------+
only showing top 10 rows



Now, we create an OHE instance,

In [126]:
ohe = OneHotEncoder(inputCol='Property_Area_IND', outputCol='Property_Area_OHE')
model = ohe.fit(ind_df)
ind_df = model.transform(ind_df)
ind_df.select(['Property_Area', 'Property_Area_IND', 'Property_Area_OHE']).show(10)

+-------------+-----------------+-----------------+
|Property_Area|Property_Area_IND|Property_Area_OHE|
+-------------+-----------------+-----------------+
|        Rural|              2.0|        (2,[],[])|
|        Urban|              1.0|    (2,[1],[1.0])|
|        Urban|              1.0|    (2,[1],[1.0])|
|        Urban|              1.0|    (2,[1],[1.0])|
|        Urban|              1.0|    (2,[1],[1.0])|
|        Urban|              1.0|    (2,[1],[1.0])|
|    Semiurban|              0.0|    (2,[0],[1.0])|
|        Urban|              1.0|    (2,[1],[1.0])|
|    Semiurban|              0.0|    (2,[0],[1.0])|
|        Urban|              1.0|    (2,[1],[1.0])|
+-------------+-----------------+-----------------+
only showing top 10 rows



In [129]:
ind_df.printSchema()

root
 |-- Loan_ID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Married: string (nullable = true)
 |-- Dependents: string (nullable = true)
 |-- Education: string (nullable = true)
 |-- Self_Employed: string (nullable = true)
 |-- ApplicantIncome: integer (nullable = true)
 |-- CoapplicantIncome: double (nullable = true)
 |-- LoanAmount: integer (nullable = true)
 |-- Loan_Amount_Term: integer (nullable = true)
 |-- Credit_History: integer (nullable = true)
 |-- Property_Area: string (nullable = true)
 |-- Loan_Status: string (nullable = true)
 |-- Gender_OHE: double (nullable = false)
 |-- Married_OHE: double (nullable = false)
 |-- Education_OHE: double (nullable = false)
 |-- Self_Employed_OHE: double (nullable = false)
 |-- Loan_Status_OHE: double (nullable = false)
 |-- Dependents_ENC: double (nullable = true)
 |-- Property_Area_IND: double (nullable = false)
 |-- Property_Area_OHE: vector (nullable = true)



We see from the schema that the OHE creates a vector. This vector is of type sparse.

It reads as (dim,[pos],[val]).

For example, `(2,[ ],[ ])` is of dimension 2, has no position and value, thus it is the vector [0.0, 0.0].

The vector `(2,[1],[1.0])` is of dimension 2, has position 1 and value of 1.0, thus it is the vector [0.0, 1.0].

We now reorganize ind_df,

In [132]:
colsToKeep = ['Gender_OHE', 'Married_OHE', 'Dependents_ENC', 'Education_OHE',
             'Self_Employed_OHE', 'ApplicantIncome', 'CoapplicantIncome', 'LoanAmount',
             'Loan_Amount_Term', 'Credit_History', 'Property_Area_OHE', 'Loan_Status_OHE']
ind_df = ind_df.select(colsToKeep)
ind_df.show(5)

+----------+-----------+--------------+-------------+-----------------+---------------+-----------------+----------+----------------+--------------+-----------------+---------------+
|Gender_OHE|Married_OHE|Dependents_ENC|Education_OHE|Self_Employed_OHE|ApplicantIncome|CoapplicantIncome|LoanAmount|Loan_Amount_Term|Credit_History|Property_Area_OHE|Loan_Status_OHE|
+----------+-----------+--------------+-------------+-----------------+---------------+-----------------+----------+----------------+--------------+-----------------+---------------+
|       1.0|        1.0|           1.0|          1.0|              0.0|           4583|           1508.0|       128|             360|             1|        (2,[],[])|            0.0|
|       1.0|        1.0|           0.0|          1.0|              1.0|           3000|              0.0|        66|             360|             1|    (2,[1],[1.0])|            1.0|
|       1.0|        1.0|           0.0|          0.0|              0.0|           258

In order to apply ML models to my dataframe, I need to put it in a vector format. For that, I import VectorAssembler.

In [133]:
from pyspark.ml.feature import VectorAssembler

We create an instance,

In [134]:
feature_assembler = VectorAssembler()

And, we set the params

In [135]:
inputCols = colsToKeep[:-1]
outputCol = 'Ind_Vect_Feature'
feature_assembler.setParams(inputCols=inputCols, outputCol=outputCol)

VectorAssembler_809e519cd3ce

Then, we transform the dataframe,

**N.B.** To transform the dataframe, there should be no null values.

In [136]:
df_ML = feature_assembler.transform(ind_df)
df_ML.show()

+----------+-----------+--------------+-------------+-----------------+---------------+-----------------+----------+----------------+--------------+-----------------+---------------+--------------------+
|Gender_OHE|Married_OHE|Dependents_ENC|Education_OHE|Self_Employed_OHE|ApplicantIncome|CoapplicantIncome|LoanAmount|Loan_Amount_Term|Credit_History|Property_Area_OHE|Loan_Status_OHE|    Ind_Vect_Feature|
+----------+-----------+--------------+-------------+-----------------+---------------+-----------------+----------+----------------+--------------+-----------------+---------------+--------------------+
|       1.0|        1.0|           1.0|          1.0|              0.0|           4583|           1508.0|       128|             360|             1|        (2,[],[])|            0.0|[1.0,1.0,1.0,1.0,...|
|       1.0|        1.0|           0.0|          1.0|              1.0|           3000|              0.0|        66|             360|             1|    (2,[1],[1.0])|            1.0|[1

In [138]:
df_ML = df_ML.select(['Ind_Vect_Feature','Loan_Status_OHE'])
df_ML.show()

+--------------------+---------------+
|    Ind_Vect_Feature|Loan_Status_OHE|
+--------------------+---------------+
|[1.0,1.0,1.0,1.0,...|            0.0|
|[1.0,1.0,0.0,1.0,...|            1.0|
|[1.0,1.0,0.0,0.0,...|            1.0|
|[1.0,0.0,0.0,1.0,...|            1.0|
|[1.0,1.0,2.0,1.0,...|            1.0|
|[1.0,1.0,0.0,0.0,...|            1.0|
|[1.0,1.0,3.0,1.0,...|            0.0|
|[1.0,1.0,2.0,1.0,...|            1.0|
|[1.0,1.0,1.0,1.0,...|            0.0|
|[1.0,1.0,2.0,1.0,...|            1.0|
|[1.0,1.0,2.0,1.0,...|            1.0|
|[1.0,0.0,0.0,1.0,...|            0.0|
|[1.0,1.0,2.0,1.0,...|            1.0|
|[1.0,0.0,0.0,1.0,...|            1.0|
|(12,[3,5,7,8,11],...|            0.0|
|(12,[0,1,5,7,8,9]...|            0.0|
|(12,[0,1,5,7,8,11...|            0.0|
|[1.0,1.0,1.0,1.0,...|            1.0|
|[1.0,1.0,0.0,0.0,...|            0.0|
|[1.0,1.0,0.0,1.0,...|            1.0|
+--------------------+---------------+
only showing top 20 rows



## ML Model

We import `LogisticRegression` from `classification` library,

Documentation in this [link](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.classification.LogisticRegression.html).

In [140]:
from pyspark.ml.classification import LogisticRegression

We split the data into train and test samples,

In [143]:
train_data, test_data = df_ML.randomSplit([0.75, 0.25], seed=0)

We create an instance,

In [144]:
blor = LogisticRegression()

We set the parameters of the instance,

In [201]:
# we set the independent features column
blor.setFeaturesCol('Ind_Vect_Feature')
# we set the labels column
blor.setLabelCol('Loan_Status_OHE')
# defaul family value is 'auto', other options are 'binomial' and 'multinomial'
blor.setFamily('binomial')
# MaxInter is 100 by default
blor.setMaxIter(1000)
# Regularization parameter is 0 by default, 0.1 showed better results
blor.setRegParam(0.1)
# Standardization is True by default
print(blor.getStandardization())
# ElasticNetParam is 0 by default. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.
blor.setElasticNetParam(1)

True


LogisticRegression_3268f8dd183a

Now, we fit the model,

In [202]:
model = blor.fit(train_data)

Checking the fitted model coefficients

In [203]:
print(model.coefficients, model.intercept, sep='\n')

(12,[9],[1.959746866078264])
-0.8195179319219494


Checking accuracy on train_data

In [204]:
model.summary.accuracy

0.8172757475083057

Checking accuracy on test_data

In [205]:
model.evaluate(test_data).accuracy

0.7676767676767676

## Additional steps to be done

* Apply Under/Over Sampling in case of imbalance.
* Applying imputer to null values (might be better than simply dropping).

In [212]:
df_BLS.na.drop().select('Loan_Status').groupBy('Loan_Status').count().show()

+-----------+-----+
|Loan_Status|count|
+-----------+-----+
|          Y|  276|
|          N|  124|
+-----------+-----+



## Links and Further Readings

* [7 Techniques to Handle Imbalanced Data](https://www.kdnuggets.com/2017/06/7-techniques-handle-imbalanced-data.html)
* [Oversampling and Undersampling with PySpark](https://medium.com/@junwan01/oversampling-and-undersampling-with-pyspark-5dbc25cdf253)
* [How do I replace a string value with a NULL in PySpark?](https://stackoverflow.com/questions/36897658/how-do-i-replace-a-string-value-with-a-null-in-pyspark)