
In [1]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

/kaggle/input/loan-status-prediction/loan_data.csv


In [None]:

## The objective of this notebook is to predict loan approval status of members.
#### **Prediction Algorithm**: Logistic Regression
#### **Library**: Spark MLlib

The notebook adheres to the following steps.
1. Installing and importing packages
2. Load the data
3. Data Exploration (EDA)
4. Data Splitting
5. Data Preprocessing
6. Model Building
7. Model Summary - coefficients and p-values
8. Model Prediction
9. Model Evaluation
10. Saving model artifact
11. Applying the model on test data

## 1. Install and import necessary packages

In [5]:
pip install pyspark

Note: you may need to restart the kernel to use updated packages.


In [6]:
#Creating spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('classification with MLlib').getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/02/19 00:42:41 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [7]:
# Importing packages
from pyspark.sql.types import *
import pyspark.sql.functions as F
import seaborn as sns
import matplotlib.pyplot as plt

## 2. Data loading

In [8]:
df=spark.read.csv('/kaggle/input/loan-status-prediction/loan_data.csv',inferSchema=True,header=True)

                                                                                

In [9]:
df.describe()

DataFrame[summary: string, Loan_ID: string, Gender: string, Married: string, Dependents: string, Education: string, Self_Employed: string, ApplicantIncome: string, CoapplicantIncome: string, LoanAmount: string, Loan_Amount_Term: string, Credit_History: string, Property_Area: string, Loan_Status: string]

## 3. Data Exploration (Exploratory Data Analysis)

1. Dataframe size
2. Get the field names and their data types, and the number of numeric and categorical features
3. A glimpse of data
4. Missing values count
5. Target variable distribution
6. Continuous variables distribution - normality check and boxplot for outliers
7. Categorical variables distribution
8. Predictors relation with target

#### 1. Dataframe size

In [76]:
print('Records:',df.count(),'\nColumns:', len(df.columns))

Records: 381 
Columns: 13


In [77]:
df.printSchema()

root
 |-- Loan_ID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Married: string (nullable = true)
 |-- Dependents: string (nullable = true)
 |-- Education: string (nullable = true)
 |-- Self_Employed: string (nullable = true)
 |-- ApplicantIncome: integer (nullable = true)
 |-- CoapplicantIncome: double (nullable = true)
 |-- LoanAmount: double (nullable = true)
 |-- Loan_Amount_Term: double (nullable = true)
 |-- Credit_History: double (nullable = true)
 |-- Property_Area: string (nullable = true)
 |-- Loan_Status: string (nullable = true)



#### 2. Get the field names and their data types, and the number of numeric and categorical features

In [78]:
numeric_data_types=['IntegerType','DoubleType','FloatType','LongType']
string_data_types=['StringType']

cat_cols=0
num_cols=0
other_cols=0

num_cols_list=[]
cat_cols_list=[]
oth_cols_list=[]

for field in df.schema.fields:
    if str(field.dataType)[:-2] in string_data_types:
        cat_cols_list.append(field.name)
        cat_cols+=1
    elif str(field.dataType)[:-2] in numeric_data_types:
        num_cols_list.append(field.name)
        num_cols+=1
    else:
        oth_cols_list.append(field.name)
        other_cols+=1
print('Numeric cols:', num_cols,num_cols_list,'\nCategorical Cols:', cat_cols, cat_cols_list,'\nOther Cols:',other_cols,oth_cols_list)  

Numeric cols: 5 ['ApplicantIncome', 'CoapplicantIncome', 'LoanAmount', 'Loan_Amount_Term', 'Credit_History'] 
Categorical Cols: 8 ['Loan_ID', 'Gender', 'Married', 'Dependents', 'Education', 'Self_Employed', 'Property_Area', 'Loan_Status'] 
Other Cols: 0 []


In [None]:
#df.show(5)

#### 3. Data Glimpse

In [None]:
df.toPandas().head()
# ApplicantIncome is inferred as integer and the other numeric fields as double; the preprocessing step casts ApplicantIncome, CoapplicantIncome, LoanAmount and Loan_Amount_Term to double

#### 4. Missing values Summary

In [None]:
missing_values_dict={}
for c in df.columns:
    missing_values_dict[c]=df.agg(F.count(F.when(F.isnan(c) | F.col(c).isNull(),c))).collect()[0][0]
    
missing_values_dict

#### 5. Target class distribution

In [None]:
# Creating target variable
df = df.withColumn('target', F.when(F.col('Loan_Status') == 'Y',1).otherwise(0))
print(df.select('target').dtypes)
df.agg(F.mean('target')).collect()[0][0]

In [None]:
total_count=df.count()
# Get the target class distribution
df_prop = (
    df.groupBy('Loan_Status')
    .agg(F.count('Loan_ID').alias('appl_ct'))
    .withColumn('prop', F.round(F.col('appl_ct') / total_count * 100, 2))
)

df_prop.show()

In [None]:
df_target_stats=df_prop.toPandas()
df_target_stats.head()

In [None]:
sns.catplot(data=df_target_stats,x='Loan_Status',y='appl_ct',kind='bar')

plt.xlabel('Loan Status')
plt.ylabel('Count')
plt.title('Distribution of Loan Status')

## Summary Statistics

In [None]:
df.describe().toPandas().head()

In [None]:
df.toPandas().describe()

In [None]:
df.toPandas().describe(include='object')

In [None]:
string_fields = [col for col,dtype in df.dtypes if dtype =='string']
string_fields.remove('Loan_ID')
for string_field in string_fields:
    df.select(string_field).distinct().show(5)

In [None]:
df_p=df.toPandas()

In [None]:
# Get the levels of categorical variables
for i in string_fields:
    print(i,(df_p[i].nunique(),df_p[i].unique()))
#df_p.Gender.value_counts()

#### 6. Continuous variables distribution - normality check and boxplot for outliers
Data distribution plots of continuous variables

In [None]:
num_cols_list

In [None]:
import warnings

# Suppress all warnings
warnings.filterwarnings("ignore")

fig,axs = plt.subplots(1,5,figsize=(15,5))
for i,col in enumerate(num_cols_list):
    sns.kdeplot(data=df_p[col],ax=axs[i],fill=True)
plt.tight_layout()
plt.show()

In [None]:
# Numeric variables box plot to check for outliers
df_p.boxplot(column=num_cols_list)

In [None]:
df_p.boxplot(column=['LoanAmount','Loan_Amount_Term'])
# The plot shows that these numeric variables span a wide range, and Loan_Amount_Term appears to have outliers

In [None]:
df_p.boxplot(column=['ApplicantIncome','CoapplicantIncome'])
# There seem to be instances where coapplicant income is higher than applicant income

In [None]:
cat_cols_list

#### 7. Categorical variables distribution plots

In [None]:
# Categorical variables count plots
fig,axes=plt.subplots(nrows=1,ncols=len(cat_cols_list)-1,figsize=(15,5))
for i,col in enumerate(cat_cols_list[1:]):
    cat_counts =df_p[col].value_counts()
    cat_counts.plot(kind='bar',ax=axes[i])

plt.tight_layout()
plt.show()

#### 8. Predictors relationship with target

In [None]:
#Distributions 
fig,axes = plt.subplots(nrows=1,ncols=len(num_cols_list),figsize=(15,5))
for i,col in enumerate(num_cols_list):
    sns.histplot(data=df_p,ax=axes[i],x=col,kde=True,hue='Loan_Status')
plt.tight_layout()
plt.show()

In [None]:
fig,axes =plt.subplots(nrows=1,ncols=len(num_cols_list),figsize=(15,5))
for i,col in enumerate(num_cols_list):
    sns.stripplot(df_p,x='Loan_Status',y=col,ax=axes[i])
plt.tight_layout()
plt.show()

In [None]:
#df_p.Loan_Status=df_p.Loan_Status.map(dict(Y=1,N=0))

In [None]:
plot_cols = [c for c in cat_cols_list if c not in ['Loan_Status','Loan_ID']]
fig,axes = plt.subplots(nrows=1,ncols=len(plot_cols),figsize=(15,5))
for i,c in enumerate(plot_cols):
    sns.countplot(data=df_p,ax=axes[i],x='Loan_Status',hue=c)
plt.tight_layout()
plt.show()

## 4. Data splitting

In [79]:
# Calculate the fraction of each class
class_fractions = df.groupBy("Loan_Status").count().withColumn(
    "fraction", F.col("count") / df.count()
).select("Loan_Status", "fraction").rdd.collectAsMap()

In [80]:
class_fractions

{'Y': 0.7112860892388452, 'N': 0.2887139107611549}

In [90]:
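# Scale each class share by 0.8. sampleBy treats each value as the fraction of that
# class to keep, so this keeps about 57% of Y and 23% of N rather than 80% of each
train_fractions = {k: v * 0.8 for k, v in class_fractions.items()}

# Create the training set by sampling each class at its fraction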
train_df = df.sampleBy("Loan_Status", fractions=train_fractions, seed=42)

# Create the test set (remaining data)
test_df = df.subtract(train_df)
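
`sampleBy` interprets each value in `fractions` as the share of that class to keep, so a stratified 80/20 split would normally use 0.8 for every class. The sketch below is plain Python arithmetic, not Spark code: `expected_train_rows` is a hypothetical helper, and the class counts (271 Y, 110 N) are the ones reported in this notebook's class distribution. It compares the expected training-set size under the scaled fractions with the size under a uniform 0.8.

```python
def expected_train_rows(class_counts, fractions):
    """Expected number of rows kept per class when sampling each class at its fraction."""
    return {label: class_counts[label] * fractions[label] for label in class_counts}

class_counts = {"Y": 271, "N": 110}
total = sum(class_counts.values())

# Fractions as built in the cell above: class share * 0.8
scaled = {label: (n / total) * 0.8 for label, n in class_counts.items()}
# Uniform alternative (assumption: the intent is to keep 80% of every class)
uniform = {label: 0.8 for label in class_counts}

scaled_rows = expected_train_rows(class_counts, scaled)
uniform_rows = expected_train_rows(class_counts, uniform)

for name, rows in [("scaled", scaled_rows), ("uniform", uniform_rows)]:
    kept = sum(rows.values())
    print(f"{name}: {rows} -> {kept / total:.1%} of all rows")
```

With the uniform dictionary, the Spark call would be `df.sampleBy("Loan_Status", fractions=uniform, seed=42)`. Because `sampleBy` samples each row independently, the realized split is only approximately 80/20.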

In [83]:
# Verify the class balance
print("Original class distribution:")
df.groupBy("Loan_Status").count().show()

Original class distribution:
+-----------+-----+
|Loan_Status|count|
+-----------+-----+
|          Y|  271|
|          N|  110|
+-----------+-----+



In [91]:
# Verify the class balance (all percentages below are relative to the full dataset, df.count())
print("Original class distribution:")
df.groupBy("Loan_Status").agg(
        (F.count("Loan_Status") / F.lit(df.count()) * 100).alias("percentage")
    ).orderBy("Loan_Status").show()

print("Train set class distribution:")
train_df.groupBy("Loan_Status").agg(
        (F.count("Loan_Status") / F.lit(df.count()) * 100).alias("percentage")
    ).orderBy("Loan_Status").show()

print("Test set class distribution:")
test_df.groupBy("Loan_Status").agg(
        (F.count("Loan_Status") / F.lit(df.count()) * 100).alias("percentage")
    ).orderBy("Loan_Status").show()

Original class distribution:
+-----------+------------------+
|Loan_Status|        percentage|
+-----------+------------------+
|          N|28.871391076115486|
|          Y| 71.12860892388451|
+-----------+------------------+

Train set class distribution:
+-----------+------------------+
|Loan_Status|        percentage|
+-----------+------------------+
|          N| 6.561679790026247|
|          Y|43.569553805774284|
+-----------+------------------+

Test set class distribution:
+-----------+------------------+
|Loan_Status|        percentage|
+-----------+------------------+
|          N| 22.30971128608924|
|          Y|27.559055118110237|
+-----------+------------------+
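
The percentages above use the full dataset as the denominator, so they show each split's share of all 381 records rather than the class balance inside each split. A minimal sketch of the conversion in plain Python: the row counts are back-computed from the printed percentages (for example, 6.5617% of 381 is 25 rows), and `within_split_share` is a hypothetical helper name.

```python
TOTAL_ROWS = 381  # record count reported earlier in the notebook

# Percent of ALL rows per (split, class), rounded from the output above
pct_of_total = {
    "train": {"N": 6.5617, "Y": 43.5696},
    "test": {"N": 22.3097, "Y": 27.5591},
}

def within_split_share(pcts, total_rows):
    """Convert percent-of-total values into row counts and within-split percentages."""
    counts = {label: round(p / 100 * total_rows) for label, p in pcts.items()}
    split_rows = sum(counts.values())
    shares = {label: 100 * n / split_rows for label, n in counts.items()}
    return counts, shares

for split, pcts in pct_of_total.items():
    counts, shares = within_split_share(pcts, TOTAL_ROWS)
    print(split, counts, {k: round(v, 1) for k, v in shares.items()})
```

In Spark, the same within-split figures come from dividing by `train_df.count()` or `test_df.count()` instead of `df.count()`.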



## 5. Data Preprocessing

The objective is to have a pipeline with the preprocessing and model building steps that can be used on any similar dataset

### Steps

1. Convert all numeric to double and categorical to string
2. Impute missing values - numerical with median and categorical with 'unknown'
3. String indexer - Assign numeric labels to categorical variables
4. One hot encoding - Create individual fields for each label of categorical column with more than two levels
5. Vector assembler - Combine the predictors into a single feature vector
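
To make steps 3 and 4 concrete, here is a plain-Python sketch of what indexing and one-hot encoding do to a single column. It only imitates the behaviour and does not call Spark. The assumptions are Spark's documented defaults: `StringIndexer` orders labels by descending frequency, and `OneHotEncoder` drops the last category, so a column with k levels becomes a vector of length k-1. `index_labels` and `one_hot` are hypothetical helper names, and the sample values are made up.

```python
from collections import Counter

def index_labels(values):
    """Map each label to an index, most frequent label first (ties broken alphabetically)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

def one_hot(index, num_labels, drop_last=True):
    """Return a dense one-hot list; with drop_last the final category is all zeros."""
    size = num_labels - 1 if drop_last else num_labels
    vec = [0.0] * size
    if int(index) < size:
        vec[int(index)] = 1.0
    return vec

# Made-up sample column with three levels
gender = ["Male", "Male", "Female", "Male", "unknown", "Female"]
mapping = index_labels(gender)
print(mapping)
for label in mapping:
    print(label, one_hot(mapping[label], len(mapping)))
```

This is consistent with the shape of `Gender_encoded` in the preprocessed output of this notebook, which has two elements for the three levels Male, Female and unknown.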

## Set up the preprocessing pipeline

In [8]:
df.printSchema()

root
 |-- Loan_ID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Married: string (nullable = true)
 |-- Dependents: string (nullable = true)
 |-- Education: string (nullable = true)
 |-- Self_Employed: string (nullable = true)
 |-- ApplicantIncome: integer (nullable = true)
 |-- CoapplicantIncome: double (nullable = true)
 |-- LoanAmount: double (nullable = true)
 |-- Loan_Amount_Term: double (nullable = true)
 |-- Credit_History: double (nullable = true)
 |-- Property_Area: string (nullable = true)
 |-- Loan_Status: string (nullable = true)



In [10]:
from pyspark.ml import Estimator, Transformer, Pipeline
from pyspark.ml.param import Param, Params, TypeConverters
from pyspark.ml.param.shared import HasInputCols, HasOutputCols
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
from pyspark.sql.functions import col, median, when,lit
from pyspark.sql.types import DoubleType, StringType
from pyspark.ml.feature import Imputer, OneHotEncoder, VectorAssembler, StringIndexer

In [11]:
import json

#### Custom Estimator and Transformer to change datatypes of specified columns into numeric and string datatypes


* Estimator: This is the main class that DataTypeConverter inherits from, providing the basic structure and functionality of an estimator in the PySpark ML pipeline.
* DefaultParamsReadable: This mixin class provides functionality for reading parameters from a saved model. It allows the DataTypeConverter to be loaded from disk after being saved as part of a pipeline.
* DefaultParamsWritable: This mixin class provides functionality for writing parameters when saving a model. It allows the DataTypeConverter to be saved to disk as part of a pipeline.
* Param: Defines a parameter for an ML component, specifying its name, description, and type converter; this provides type checking and documentation.
* _setDefault() and getters/setters: Set default parameter values and provide controlled access, so the component integrates with PySpark's ML Pipelines.
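
The split between an estimator and the model it returns can be illustrated without Spark. In this toy sketch (hypothetical class names, plain Python lists of dicts instead of DataFrames), `fit` learns per-column medians and returns a separate object whose `transform` applies them. It is only an analogy for the pattern, not PySpark's API.

```python
from statistics import median

class ToyMedianImputer:
    """Estimator-like object: holds configuration and learns state in fit()."""
    def __init__(self, input_cols):
        self.input_cols = list(input_cols)

    def fit(self, rows):
        fills = {}
        for c in self.input_cols:
            observed = [r[c] for r in rows if r[c] is not None]
            fills[c] = median(observed)
        return ToyMedianImputerModel(fills)

class ToyMedianImputerModel:
    """Transformer-like object: applies the learned fill values, learns nothing new."""
    def __init__(self, fills):
        self.fills = dict(fills)

    def transform(self, rows):
        # Build new rows; missing values in known columns get the learned fill
        return [
            {k: (self.fills[k] if v is None and k in self.fills else v) for k, v in r.items()}
            for r in rows
        ]

# Made-up rows for illustration
train = [{"LoanAmount": 100.0}, {"LoanAmount": None}, {"LoanAmount": 140.0}, {"LoanAmount": 120.0}]
test = [{"LoanAmount": None}]

model = ToyMedianImputer(["LoanAmount"]).fit(train)
print(model.fills)
print(model.transform(test))
```

Because the medians are stored on the model, the test rows are filled with values learned from the training rows only.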


#### 1. DataTypeConverter
Convert all numeric to double and categorical to string

In [12]:
class DataTypeConverter(Estimator, DefaultParamsReadable, DefaultParamsWritable):
    
    numericCols = Param(Params._dummy(), "numericCols", "Columns to be converted to DoubleType",
                        typeConverter=TypeConverters.toListString)
    categoricalCols = Param(Params._dummy(), "categoricalCols", "Columns to be converted to StringType",
                            typeConverter=TypeConverters.toListString)

    def __init__(self, numericCols=None, categoricalCols=None):
        super(DataTypeConverter, self).__init__()
        self._setDefault(numericCols=[], categoricalCols=[])
        self.setNumericCols(numericCols)
        self.setCategoricalCols(categoricalCols)

    def setNumericCols(self, value):
        return self._set(numericCols=value)

    def getNumericCols(self):
        return self.getOrDefault(self.numericCols)

    def setCategoricalCols(self, value):
        return self._set(categoricalCols=value)

    def getCategoricalCols(self):
        return self.getOrDefault(self.categoricalCols)
        
    def _fit(self, dataset):
        return DataTypeConverterModel(self.getNumericCols(), self.getCategoricalCols())

class DataTypeConverterModel(Transformer, DefaultParamsReadable, DefaultParamsWritable):
    numericCols = Param(Params._dummy(), "numericCols", "Columns to be converted to DoubleType",
                        typeConverter=TypeConverters.toListString)
    categoricalCols = Param(Params._dummy(), "categoricalCols", "Columns to be converted to StringType",
                            typeConverter=TypeConverters.toListString)
    
    def __init__(self, numericCols=None, categoricalCols=None):
        super(DataTypeConverterModel, self).__init__()
        self._setDefault(numericCols=[], categoricalCols=[])
        self.setNumericCols(numericCols)
        self.setCategoricalCols(categoricalCols)

    def setNumericCols(self, value):
        return self._set(numericCols=value)

    def getNumericCols(self):
        return self.getOrDefault(self.numericCols)

    def setCategoricalCols(self, value):
        return self._set(categoricalCols=value)

    def getCategoricalCols(self):
        return self.getOrDefault(self.categoricalCols)
        
    def _transform(self,dataset):
        for col_name in self.getNumericCols():
            dataset = dataset.withColumn(col_name,col(col_name).cast(DoubleType()))
        for col_name in self.getCategoricalCols():
            dataset = dataset.withColumn(col_name,col(col_name).cast(StringType()))
        return dataset

#### 2. CustomImputer
Impute missing values - numerical with median and categorical with 'unknown'

In [13]:
class CustomImputer(Estimator, HasInputCols, HasOutputCols,
                    DefaultParamsReadable, DefaultParamsWritable):
    
    inputCols = Param(Params._dummy(), "inputCols", "Columns to be checked and imputed",
                        typeConverter=TypeConverters.toListString)

        
    def __init__(self, inputCols=None):
        super(CustomImputer, self).__init__()
        self._setDefault(inputCols=[])
        self.setInputCols(inputCols)

    def setInputCols(self, value):
        return self._set(inputCols=value)

    def getInputCols(self):
        return self.getOrDefault(self.inputCols)
        
    def _fit(self, dataset):
        """Learn fill values: the median for each DoubleType column, 'unknown' for StringType columns."""
        numericCols = [field.name for field in dataset.schema.fields if isinstance(field.dataType, DoubleType)]
        categoricalCols = [field.name for field in dataset.schema.fields if isinstance(field.dataType, StringType)]
        # Initialize an empty dictionary to store the medians
        median_values = {}

        # Iterate through the numeric columns and calculate the median of each
        for column in numericCols:
            # Compute the median on the dataset passed to fit
            median_value = dataset.select(median(col(column))).collect()[0][0]
            # Store the median in the dictionary
            median_values[column] = median_value
        return CustomImputerModel(inputCols=self.getInputCols(),
                                  numericCols=numericCols,
                                  categoricalCols=categoricalCols,
                                  numericFill=median_values,
                                  categoricalFill='unknown')

class CustomImputerModel(Transformer, HasInputCols, HasOutputCols,
                         DefaultParamsReadable, DefaultParamsWritable):
    inputCols = Param(Params._dummy(),"inputCols","InputColumns to impute",
                     typeConverter= TypeConverters.toListString)
    numericCols = Param(Params._dummy(), "numericCols", "Numeric columns to impute",
                        typeConverter=TypeConverters.toListString)
    categoricalCols = Param(Params._dummy(), "categoricalCols", "Categorical columns to impute",
                            typeConverter=TypeConverters.toListString)
    numericFill = Param(Params._dummy(), "numericFill", "JSON string of numeric fill values")
    categoricalFill = Param(Params._dummy(), "categoricalFill", "Fill value for categorical columns",
                            typeConverter=TypeConverters.toString)
    
    def __init__(self, inputCols=None, numericCols=None, categoricalCols=None, numericFill=None, categoricalFill=None):
        super(CustomImputerModel, self).__init__()
        self._setDefault(inputCols=[],numericCols=[], categoricalCols=[], numericFill={}, categoricalFill="unknown")
        self.setInputCols(inputCols)
        self.setNumericCols(numericCols)
        self.setCategoricalCols(categoricalCols)
        self.setNumericFill(numericFill)
        self.setCategoricalFill(categoricalFill)

    def setInputCols(self,value):
        return self._set(inputCols=value)

    def setNumericCols(self, value):
        return self._set(numericCols=value)

    def getNumericCols(self):
        return self.getOrDefault(self.numericCols)

    def setCategoricalCols(self, value):
        return self._set(categoricalCols=value)

    def getCategoricalCols(self):
        return self.getOrDefault(self.categoricalCols)

    def setNumericFill(self, value):
        return self._set(numericFill=json.dumps(value))

    def getNumericFill(self):
        return json.loads(self.getOrDefault(self.numericFill))

    def setCategoricalFill(self, value):
        return self._set(categoricalFill=value)

    def getCategoricalFill(self):
        return self.getOrDefault(self.categoricalFill)

   
    def _transform(self, dataset):
        
        numericFill = self.getNumericFill()
        for column in self.getNumericCols():
            
            if column in self.getInputCols():
                dataset = dataset.withColumn(column, when(col(column).isNull(), lit(numericFill[column])).otherwise(col(column)))
        
        # Impute categorical columns
        for column in self.getCategoricalCols():
            if column in self.getInputCols():
                dataset = dataset.withColumn(column, when(col(column).isNull(), lit(self.getCategoricalFill())).otherwise(col(column)))
        
        return dataset

In [None]:
df.toPandas().head(2)

In [35]:
data_type_converter = DataTypeConverter(numericCols=['ApplicantIncome','CoapplicantIncome',
                                                    'LoanAmount','Loan_Amount_Term'], 
                                        categoricalCols=['Gender', 'Married','Dependents','Education','Self_Employed',
                                                       'Property_Area','Loan_Status' ])


In [15]:
features = ['ApplicantIncome','CoapplicantIncome',
                                        'LoanAmount','Loan_Amount_Term', 
                                        'Gender', 'Married','Dependents','Education',
                                          'Self_Employed','Credit_History',
                                            'Property_Area' ]

In [16]:
num_cols = ['ApplicantIncome','CoapplicantIncome',
                                        'LoanAmount','Loan_Amount_Term']

In [33]:
cat_cols=['Gender', 'Married','Dependents','Education',
                                          'Self_Employed','Credit_History',
                                            'Property_Area','Loan_Status']

In [36]:
custom_imputer = CustomImputer(inputCols=['ApplicantIncome','CoapplicantIncome',
                                        'LoanAmount','Loan_Amount_Term', 
                                        'Gender', 'Married','Dependents','Education',
                                          'Self_Employed','Credit_History',
                                            'Property_Area' ])

#### 3. String indexer 
Assigns numeric labels to categorical variables

In [37]:
string_indexer = StringIndexer(inputCols=cat_cols, outputCols=[f"{col}_indexed" for col in cat_cols])

#### 4. OneHotEncoder
Create individual fields for each label of categorical column with more than two levels

In [38]:
one_hot_encoder = OneHotEncoder(inputCols=[f"{col}_indexed" for col in cat_cols], 
                       outputCols=[f"{col}_encoded" for col in cat_cols])

#### 5. Vector assembler 
To get an array of predictors

In [39]:
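# cat_cols includes 'Loan_Status', so the encoded target (Loan_Status_encoded) is also assembled into the features
vector_assembler = VectorAssembler(inputCols=[f"{col}_encoded" for col in cat_cols] + num_cols, outputCol="features")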
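
Since `cat_cols` also contains `Loan_Status`, the list comprehension above puts `Loan_Status_encoded` among the assembler inputs, and the model summary further down reports a coefficient for it. A hedged sketch of one way to build the input list without the target, assuming the intent is to predict `Loan_Status` from the other fields (`label_col` and `assembler_inputs` are hypothetical names):

```python
label_col = "Loan_Status"  # target column, should not appear among the predictors

num_cols = ["ApplicantIncome", "CoapplicantIncome", "LoanAmount", "Loan_Amount_Term"]
cat_cols = ["Gender", "Married", "Dependents", "Education",
            "Self_Employed", "Credit_History", "Property_Area", "Loan_Status"]

# Encoded categorical predictors, skipping the label, followed by the numeric columns
assembler_inputs = [f"{c}_encoded" for c in cat_cols if c != label_col] + num_cols

print(assembler_inputs)
```

The resulting list could then be passed as `VectorAssembler(inputCols=assembler_inputs, outputCol="features")`, while `Loan_Status_indexed` remains available as the label column.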

In [40]:
pipeline=Pipeline(stages=[data_type_converter,custom_imputer,string_indexer,one_hot_encoder,vector_assembler])

In [41]:
preprocessed_pipeline=pipeline.fit(df)

In [45]:
df.toPandas().head(2)

Unnamed: 0,Loan_ID,Gender,Married,Dependents,Education,Self_Employed,ApplicantIncome,CoapplicantIncome,LoanAmount,Loan_Amount_Term,Credit_History,Property_Area,Loan_Status
0,LP001003,Male,Yes,1,Graduate,No,4583,1508.0,128.0,360.0,1.0,Rural,N
1,LP001005,Male,Yes,0,Graduate,Yes,3000,0.0,66.0,360.0,1.0,Urban,Y


In [42]:
preprocessed_df=preprocessed_pipeline.transform(df)

In [43]:
preprocessed_df.toPandas().head(2)

25/02/19 00:48:31 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


Unnamed: 0,Loan_ID,Gender,Married,Dependents,Education,Self_Employed,ApplicantIncome,CoapplicantIncome,LoanAmount,Loan_Amount_Term,...,Loan_Status_indexed,Gender_encoded,Married_encoded,Dependents_encoded,Education_encoded,Self_Employed_encoded,Credit_History_encoded,Property_Area_encoded,Loan_Status_encoded,features
0,LP001003,Male,Yes,1,Graduate,No,4583.0,1508.0,128.0,360.0,...,1.0,"(1.0, 0.0)",(1.0),"(0.0, 0.0, 1.0, 0.0)",(1.0),"(1.0, 0.0)",(1.0),"(0.0, 0.0)",(0.0),"(1.0, 0.0, 1.0, 0.0, 0.0, 1.0, 0.0, 1.0, 1.0, ..."
1,LP001005,Male,Yes,0,Graduate,Yes,3000.0,0.0,66.0,360.0,...,0.0,"(1.0, 0.0)",(1.0),"(1.0, 0.0, 0.0, 0.0)",(1.0),"(0.0, 1.0)",(1.0),"(0.0, 1.0)",(1.0),"[1.0, 0.0, 1.0, 1.0, 0.0, 0.0, 0.0, 1.0, 0.0, ..."


In [44]:
preprocessed_df.printSchema()

root
 |-- Loan_ID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Married: string (nullable = true)
 |-- Dependents: string (nullable = true)
 |-- Education: string (nullable = true)
 |-- Self_Employed: string (nullable = true)
 |-- ApplicantIncome: double (nullable = true)
 |-- CoapplicantIncome: double (nullable = true)
 |-- LoanAmount: double (nullable = true)
 |-- Loan_Amount_Term: double (nullable = true)
 |-- Credit_History: double (nullable = true)
 |-- Property_Area: string (nullable = true)
 |-- Loan_Status: string (nullable = true)
 |-- Gender_indexed: double (nullable = false)
 |-- Married_indexed: double (nullable = false)
 |-- Dependents_indexed: double (nullable = false)
 |-- Education_indexed: double (nullable = false)
 |-- Self_Employed_indexed: double (nullable = false)
 |-- Credit_History_indexed: double (nullable = false)
 |-- Property_Area_indexed: double (nullable = false)
 |-- Loan_Status_indexed: double (nullable = false)
 |-- Gender_encoded:

In [17]:
df.dtypes

[('Loan_ID', 'string'),
 ('Gender', 'string'),
 ('Married', 'string'),
 ('Dependents', 'string'),
 ('Education', 'string'),
 ('Self_Employed', 'string'),
 ('ApplicantIncome', 'int'),
 ('CoapplicantIncome', 'double'),
 ('LoanAmount', 'double'),
 ('Loan_Amount_Term', 'double'),
 ('Credit_History', 'double'),
 ('Property_Area', 'string'),
 ('Loan_Status', 'string')]

In [46]:
preprocessed_df.dtypes

[('Loan_ID', 'string'),
 ('Gender', 'string'),
 ('Married', 'string'),
 ('Dependents', 'string'),
 ('Education', 'string'),
 ('Self_Employed', 'string'),
 ('ApplicantIncome', 'double'),
 ('CoapplicantIncome', 'double'),
 ('LoanAmount', 'double'),
 ('Loan_Amount_Term', 'double'),
 ('Credit_History', 'double'),
 ('Property_Area', 'string'),
 ('Loan_Status', 'string'),
 ('Gender_indexed', 'double'),
 ('Married_indexed', 'double'),
 ('Dependents_indexed', 'double'),
 ('Education_indexed', 'double'),
 ('Self_Employed_indexed', 'double'),
 ('Credit_History_indexed', 'double'),
 ('Property_Area_indexed', 'double'),
 ('Loan_Status_indexed', 'double'),
 ('Gender_encoded', 'vector'),
 ('Married_encoded', 'vector'),
 ('Dependents_encoded', 'vector'),
 ('Education_encoded', 'vector'),
 ('Self_Employed_encoded', 'vector'),
 ('Credit_History_encoded', 'vector'),
 ('Property_Area_encoded', 'vector'),
 ('Loan_Status_encoded', 'vector'),
 ('features', 'vector')]

## 6. Build Model

### Fitting the model

In [45]:
from pyspark.ml.regression import GeneralizedLinearRegression

In [49]:
model = GeneralizedLinearRegression(
    featuresCol = 'features'
    , labelCol = 'Loan_Status_indexed'
    , predictionCol ='prediction'
    , family = 'binomial'
    , link = 'logit'
    , regParam = 0.1)

In [50]:
model_fitted = model.fit(preprocessed_df)

## 7. Model Summary

* Fitting a logistic regression model
* Predictor coefficients and p-values
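
A GLM with `family='binomial'` and `link='logit'` is logistic regression: the linear predictor is passed through the logistic function to give the probability of label 1. In this notebook's preprocessed output, the row with `Loan_Status` N has `Loan_Status_indexed` equal to 1.0, so label 1 corresponds to N. The sketch below uses made-up coefficients and a hypothetical `predict_probability` helper purely to show the arithmetic.

```python
import math

def predict_probability(intercept, coefficients, features):
    """Logistic regression: p = 1 / (1 + exp(-(intercept + sum(coef_i * x_i))))."""
    eta = intercept + sum(b * x for b, x in zip(coefficients, features))
    return 1.0 / (1.0 + math.exp(-eta))

# Illustrative values only, not taken from the fitted model
intercept = 0.5
coefficients = [-1.2, 0.3]
features = [1.0, 2.0]

p = predict_probability(intercept, coefficients, features)
print(round(p, 4))
```

A negative coefficient lowers the linear predictor and therefore the predicted probability of label 1.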

In [51]:
model_summary = model_fitted.summary

In [72]:
model_summary

Coefficients:
             Feature Estimate Std Error  T Value P Value
         (Intercept)   3.4673    1.9686   1.7613  0.0782
 Gender_encoded_Male  -0.0941    0.6005  -0.1567  0.8755
Gender_encoded_Fe...  -0.0431    0.6090  -0.0707  0.9436
 Married_encoded_Yes  -0.1697    0.4638  -0.3658  0.7145
Dependents_encoded_0  -0.0882    0.5255  -0.1678  0.8667
Dependents_encoded_2  -0.0904    0.5880  -0.1537  0.8779
Dependents_encoded_1   0.0922    0.5794   0.1592  0.8735
Dependents_encode...   0.0266    0.6362   0.0418  0.9667
Education_encoded...  -0.1554    0.4628  -0.3357  0.7371
Self_Employed_enc...   0.0130    0.5734   0.0226  0.9820
Self_Employed_enc...   0.0812    0.6278   0.1294  0.8971
Credit_History_en...  -1.2176    0.5417  -2.2478  0.0246
Property_Area_enc...  -0.3665    0.4748  -0.7719  0.4402
Property_Area_enc...  -0.0444    0.4752  -0.0934  0.9256
Loan_Status_encod...  -5.3594    0.4340 -12.3499  0.0000
     ApplicantIncome   0.0000    0.0002   0.1544  0.8773
   CoapplicantInc

In [75]:
from IPython.display import display, HTML
display(HTML(model_summary.to_html()))

AttributeError: 'GeneralizedLinearRegressionTrainingSummary' object has no attribute 'to_html'

In [74]:
model_fitted.coefficients

DenseVector([-0.0941, -0.0431, -0.1697, -0.0882, -0.0904, 0.0922, 0.0266, -0.1554, 0.013, 0.0812, -1.2176, -0.3665, -0.0444, -5.3594, 0.0, 0.0, -0.0036, 0.0012])

In [55]:
df_transformed = model_fitted.transform(preprocessed_df)

In [68]:
df_transformed.select("features").schema[0].metadata.get('ml_attr').get('attrs')

{'numeric': [{'idx': 14, 'name': 'ApplicantIncome'},
  {'idx': 15, 'name': 'CoapplicantIncome'},
  {'idx': 16, 'name': 'LoanAmount'},
  {'idx': 17, 'name': 'Loan_Amount_Term'}],
 'binary': [{'idx': 0, 'name': 'Gender_encoded_Male'},
  {'idx': 1, 'name': 'Gender_encoded_Female'},
  {'idx': 2, 'name': 'Married_encoded_Yes'},
  {'idx': 3, 'name': 'Dependents_encoded_0'},
  {'idx': 4, 'name': 'Dependents_encoded_2'},
  {'idx': 5, 'name': 'Dependents_encoded_1'},
  {'idx': 6, 'name': 'Dependents_encoded_3+'},
  {'idx': 7, 'name': 'Education_encoded_Graduate'},
  {'idx': 8, 'name': 'Self_Employed_encoded_No'},
  {'idx': 9, 'name': 'Self_Employed_encoded_Yes'},
  {'idx': 10, 'name': 'Credit_History_encoded_1.0'},
  {'idx': 11, 'name': 'Property_Area_encoded_Semiurban'},
  {'idx': 12, 'name': 'Property_Area_encoded_Urban'},
  {'idx': 13, 'name': 'Loan_Status_encoded_Y'}]}
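
The `idx` values in this metadata are positions in the feature vector, so they can be used to label the entries of `model_fitted.coefficients`. A small plain-Python sketch, using the metadata and the coefficient vector as printed in this notebook (`name_coefficients` is a hypothetical helper):

```python
attrs = {
    "numeric": [
        {"idx": 14, "name": "ApplicantIncome"},
        {"idx": 15, "name": "CoapplicantIncome"},
        {"idx": 16, "name": "LoanAmount"},
        {"idx": 17, "name": "Loan_Amount_Term"},
    ],
    "binary": [
        {"idx": 0, "name": "Gender_encoded_Male"},
        {"idx": 1, "name": "Gender_encoded_Female"},
        {"idx": 2, "name": "Married_encoded_Yes"},
        {"idx": 3, "name": "Dependents_encoded_0"},
        {"idx": 4, "name": "Dependents_encoded_2"},
        {"idx": 5, "name": "Dependents_encoded_1"},
        {"idx": 6, "name": "Dependents_encoded_3+"},
        {"idx": 7, "name": "Education_encoded_Graduate"},
        {"idx": 8, "name": "Self_Employed_encoded_No"},
        {"idx": 9, "name": "Self_Employed_encoded_Yes"},
        {"idx": 10, "name": "Credit_History_encoded_1.0"},
        {"idx": 11, "name": "Property_Area_encoded_Semiurban"},
        {"idx": 12, "name": "Property_Area_encoded_Urban"},
        {"idx": 13, "name": "Loan_Status_encoded_Y"},
    ],
}

# Values copied from the printed DenseVector
coefficients = [-0.0941, -0.0431, -0.1697, -0.0882, -0.0904, 0.0922, 0.0266, -0.1554,
                0.013, 0.0812, -1.2176, -0.3665, -0.0444, -5.3594, 0.0, 0.0, -0.0036, 0.0012]

def name_coefficients(attrs, coefficients):
    """Return (name, coefficient) pairs ordered by feature index."""
    by_idx = {a["idx"]: a["name"] for group in attrs.values() for a in group}
    return [(by_idx[i], coefficients[i]) for i in sorted(by_idx)]

for name, coef in name_coefficients(attrs, coefficients):
    print(f"{name:35s} {coef: .4f}")
```

In a live session the two inputs would come from the schema metadata and `model_fitted.coefficients.toArray()` rather than being typed in.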

In [64]:
numeric_metadata = df_transformed.select("features").schema[0].metadata.get('ml_attr').get('attrs').get('numeric')

In [65]:
numeric_metadata

[{'idx': 14, 'name': 'ApplicantIncome'},
 {'idx': 15, 'name': 'CoapplicantIncome'},
 {'idx': 16, 'name': 'LoanAmount'},
 {'idx': 17, 'name': 'Loan_Amount_Term'}]

In [71]:
model_summary.coefficients

AttributeError: 'GeneralizedLinearRegressionTrainingSummary' object has no attribute 'coefficients'

In [67]:
model_fitted.coefficients[14]

3.0414180257908366e-05

In [None]:
# The GLM training summary is not a DataFrame; the coefficients are stored on the fitted model
print(model_fitted.coefficients)

## 8. Set up the final pipeline

## 9. Model prediction on unseen (test) data

## 10. 

## Rough

In [20]:
df.toPandas().isnull().sum()

Loan_ID               0
Gender                5
Married               0
Dependents            8
Education             0
Self_Employed        21
ApplicantIncome       0
CoapplicantIncome     0
LoanAmount            0
Loan_Amount_Term     11
Credit_History       30
Property_Area         0
Loan_Status           0
dtype: int64

In [38]:
preprocessed_df.toPandas().isnull().sum()

Loan_ID                   0
Gender                    0
Married                   0
Dependents                0
Education                 0
Self_Employed             0
ApplicantIncome           0
CoapplicantIncome         0
Loanamount                0
Loan_Amount_Term          0
Credit_History            0
Property_Area             0
Loan_Status               0
Gender_indexed            0
Married_indexed           0
Dependents_indexed        0
Education_indexed         0
Self_Employed_indexed     0
Credit_History_indexed    0
Property_Area_indexed     0
dtype: int64

In [None]:
from pyspark.sql.types import DoubleType, StringType

# Split the column names by Spark data type
numericCols = [field.name for field in df.schema.fields if isinstance(field.dataType, DoubleType)]
categoricalCols = [field.name for field in df.schema.fields if isinstance(field.dataType, StringType)]

In [None]:
numericCols

In [None]:
categoricalCols

In [None]:
import json

from pyspark.ml import Estimator, Transformer
from pyspark.ml.param import Param, Params, TypeConverters
from pyspark.ml.param.shared import HasInputCols, HasOutputCols
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
from pyspark.sql.functions import col, lit, median, when  # median needs Spark 3.4+
from pyspark.sql.types import DoubleType, StringType

class CustomImputer(Estimator, HasInputCols, HasOutputCols,
                    DefaultParamsReadable, DefaultParamsWritable):
    
    inputCols = Param(Params._dummy(), "inputCols", "Columns to be checked and imputed",
                        typeConverter=TypeConverters.toListString)

    def __init__(self, inputCols=None):
        super(CustomImputer, self).__init__()
        self._setDefault(inputCols=[])
        # Only set the param when a value is given; None cannot be converted to a list
        if inputCols is not None:
            self.setInputCols(inputCols)

    def setInputCols(self, value):
        return self._set(inputCols=value)

    def getInputCols(self):
        return self.getOrDefault(self.inputCols)
        
    def _fit(self, dataset):
        """Learn the fill values: the median of each numeric (double) column
        and the constant 'unknown' for the categorical (string) columns."""
        numericCols = [field.name for field in dataset.schema.fields if isinstance(field.dataType, DoubleType)]
        categoricalCols = [field.name for field in dataset.schema.fields if isinstance(field.dataType, StringType)]

        # Compute the medians on the dataset being fitted, not on the global df
        median_values = {}
        for column in numericCols:
            median_values[column] = dataset.select(median(col(column))).collect()[0][0]
        return CustomImputerModel(inputCols=self.getInputCols(), 
                                  numericCols=numericCols,
                                  categoricalCols=categoricalCols,
                                  numericFill=median_values,
                                 categoricalFill='unknown')

class CustomImputerModel(Transformer, HasInputCols, HasOutputCols):
    inputCols = Param(Params._dummy(),"inputCols","InputColumns to impute",
                     typeConverter= TypeConverters.toListString)
    numericCols = Param(Params._dummy(), "numericCols", "Numeric columns to impute",
                        typeConverter=TypeConverters.toListString)
    categoricalCols = Param(Params._dummy(), "categoricalCols", "Categorical columns to impute",
                            typeConverter=TypeConverters.toListString)
    numericFill = Param(Params._dummy(), "numericFill", "JSON string of numeric fill values")
    categoricalFill = Param(Params._dummy(), "categoricalFill", "Fill value for categorical columns",
                            typeConverter=TypeConverters.toString)
    
    def __init__(self, inputCols=None, numericCols=None, categoricalCols=None, numericFill=None, categoricalFill=None):
        super(CustomImputerModel, self).__init__()
        # numericFill is stored as a JSON string, so its default must be one too
        self._setDefault(inputCols=[],numericCols=[], categoricalCols=[], numericFill="{}", categoricalFill="unknown")
        self.setInputCols(inputCols)
        self.setNumericCols(numericCols)
        self.setCategoricalCols(categoricalCols)
        self.setNumericFill(numericFill)
        self.setCategoricalFill(categoricalFill)

    def setInputCols(self,value):
        return self._set(inputCols=value)

    def setNumericCols(self, value):
        return self._set(numericCols=value)

    def getNumericCols(self):
        return self.getOrDefault(self.numericCols)

    def setCategoricalCols(self, value):
        return self._set(categoricalCols=value)

    def getCategoricalCols(self):
        return self.getOrDefault(self.categoricalCols)

    def setNumericFill(self, value):
        return self._set(numericFill=json.dumps(value))

    def getNumericFill(self):
        return json.loads(self.getOrDefault(self.numericFill))

    def setCategoricalFill(self, value):
        return self._set(categoricalFill=value)

    def getCategoricalFill(self):
        return self.getOrDefault(self.categoricalFill)

   
    def _transform(self, dataset):
        input_cols = self.getInputCols()
        numericFill = self.getNumericFill()

        # Impute numeric columns with the medians learned during fit
        for column in self.getNumericCols():
            if column in input_cols:
                dataset = dataset.withColumn(column, when(col(column).isNull(), lit(numericFill[column])).otherwise(col(column)))
        
        # Impute categorical columns with the constant fill value
        for column in self.getCategoricalCols():
            if column in input_cols:
                dataset = dataset.withColumn(column, when(col(column).isNull(), lit(self.getCategoricalFill())).otherwise(col(column)))
        
        return dataset

In [None]:
df.toPandas().isnull().sum()

In [None]:
custom_imputer = CustomImputer(inputCols=['ApplicantIncome','CoapplicantIncome',
                                        'LoanAmount','Loan_Amount_Term', 
                                        'Gender', 'Married','Dependents','Education',
                                          'Self_Employed','Credit_History',
                                            'Property_Area','Loan_Status' ])

In [None]:
pipeline=Pipeline(stages=[custom_imputer])

In [None]:
preprocessed_pipeline=pipeline.fit(df)

In [None]:
df.toPandas().head(2)

In [None]:
df.dtypes

In [None]:
transformed_df=preprocessed_pipeline.transform(df)

## Archive

In [None]:
transformed_df.toPandas().isnull().sum()

In [None]:
type(df)

In [None]:
df.columns

In [None]:
df.toPandas().head()

In [None]:
df.printSchema()

In [None]:
#imposing double to all numeric and string to all string fields
print(num_cols_list,'\n',cat_cols_list,'\n',oth_cols_list)

Let's impute missing values

In [None]:
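from pyspark.sql.types import DoubleType

# Loop variable is named `column` so it does not shadow pyspark.sql.functions.col
for column in num_cols_list:
    df = df.withColumn(column, df[column].cast(DoubleType()))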

In [None]:
df.printSchema()

In [None]:
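from pyspark.sql.types import StringType

# Loop variable is named `column` so it does not shadow pyspark.sql.functions.col
for column in cat_cols_list:
    df = df.withColumn(column, df[column].cast(StringType()))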

In [None]:
df.printSchema()

In [None]:
# Plan:
# 1. Load the DataFrame.
# 2. Check the fields, their data types, and missing values.
# 3. Cast all numeric fields to double.
# 4. Cast all categorical fields to string, even those holding numeric codes (e.g. 1, 2, 3), because they are categories.
# 5. Impute missing values.
# 6. One-hot encode the categorical variables.

In [None]:
df.toPandas().head(2)

In [None]:
from pyspark.ml.feature import Imputer

In [None]:
Imputer(inputCols=["value"], outputCols=["value_imputed"]).setStrategy("median")

In [None]:
imputer = Imputer(inputCols=num_cols_list, outputCols=num_cols_list).setStrategy("median")

In [None]:
df_imputed = imputer.fit(df).transform(df)

In [None]:
df.toPandas().isnull().sum()

In [None]:
df_imputed.toPandas().isnull().sum()

In [None]:
# Imputing categorical features: Spark's Imputer does not support them, hence a custom transformer is created.
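
Before writing a full custom transformer, it may be worth noting that `DataFrame.na.fill` accepts a dictionary mapping column names to replacement values, which covers string columns as well. The sketch below only builds that dictionary from `(name, type)` pairs in the shape returned by `df.dtypes`. `build_fill_values` is a hypothetical helper, and the schema and median values in the example are illustrative placeholders, not results from this dataset.

```python
def build_fill_values(dtypes, medians, categorical_fill="Unknown"):
    """Build a {column: replacement} dict suitable for DataFrame.na.fill.

    dtypes: list of (column_name, type_string) pairs, as returned by df.dtypes.
    medians: {column_name: median} for the numeric columns, computed beforehand.
    """
    fill_values = {}
    for name, dtype in dtypes:
        if dtype == "string":
            # Categorical columns get a constant label
            fill_values[name] = categorical_fill
        elif dtype == "double" and medians.get(name) is not None:
            # Numeric columns get their precomputed median
            fill_values[name] = medians[name]
    return fill_values


# Placeholder schema and medians, for illustration only
dtypes = [("Gender", "string"), ("LoanAmount", "double"), ("Loan_Amount_Term", "double")]
medians = {"LoanAmount": 128.0, "Loan_Amount_Term": 360.0}

fill_values = build_fill_values(dtypes, medians)
print(fill_values)
```

With a real DataFrame, the result could then be applied in one call, `df.na.fill(fill_values)`. Unlike a fitted pipeline stage, this approach does not persist the fill values for reuse on test data, which is the reason a custom estimator is still explored in this notebook.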

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline, Transformer, Estimator
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param, Params
from pyspark.sql.functions import col, lit
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable

In [None]:
from pyspark.ml import Estimator, Model, Transformer
from pyspark.ml.param.shared import HasInputCols, HasOutputCols, Param, Params
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
from pyspark.sql.functions import col

class CustomImputerEstimator(Estimator, HasInputCols, HasOutputCols, DefaultParamsReadable, DefaultParamsWritable):
    fillValue = Param(Params._dummy(), "fillValue", "Value to replace missing values with")

    def __init__(self, inputCols=None, outputCols=None, fillValue="missing"):
        super(CustomImputerEstimator, self).__init__()
        self._setDefault(fillValue="missing")
        self.setParams(inputCols=inputCols, outputCols=outputCols, fillValue=fillValue)
    
    def setParams(self, inputCols=None, outputCols=None, fillValue="missing"):
        if inputCols is not None:
            self.setInputCols(inputCols)
        if outputCols is not None:
            self.setOutputCols(outputCols)
        if fillValue is not None:
            self.setFillValue(fillValue)
        return self
    
    def getFillValue(self):
        return self.getOrDefault(self.fillValue)

    def setFillValue(self, value):
        return self._set(fillValue=value)
    
    def setInputCols(self, value):
        return self._set(inputCols=value)
    
    def setOutputCols(self, value):
        return self._set(outputCols=value)
    
    def _fit(self, dataset):
        return CustomImputerModel(inputCols=self.getInputCols(), 
                                  outputCols=self.getOutputCols(), 
                                  fillValue=self.getFillValue())

class CustomImputerModel(Model, HasInputCols, HasOutputCols, DefaultParamsReadable, DefaultParamsWritable):
    fillValue = Param(Params._dummy(), "fillValue", "Value to replace missing values with")

    def __init__(self, inputCols=None, outputCols=None, fillValue="missing"):
        super(CustomImputerModel, self).__init__()
        self._setDefault(fillValue="missing")
        self.setParams(inputCols=inputCols, outputCols=outputCols, fillValue=fillValue)
    
    def setParams(self, inputCols=None, outputCols=None, fillValue="missing"):
        if inputCols is not None:
            self.setInputCols(inputCols)
        if outputCols is not None:
            self.setOutputCols(outputCols)
        if fillValue is not None:
            self.setFillValue(fillValue)
        return self
    
    def getFillValue(self):
        return self.getOrDefault(self.fillValue)

    def setFillValue(self, value):
        return self._set(fillValue=value)
    
    def setInputCols(self, value):
        return self._set(inputCols=value)
    
    def setOutputCols(self, value):
        return self._set(outputCols=value)
    
    def _transform(self, dataset):
        inputCols = self.getInputCols()
        outputCols = self.getOutputCols()
        fillValue = self.getFillValue()
        
        for inputCol, outputCol in zip(inputCols, outputCols):
            dataset = dataset.withColumn(outputCol, col(inputCol).cast("string")).na.fill({outputCol: fillValue})
        return dataset

# Usage in a pipeline
from pyspark.ml import Pipeline


# Create the custom imputer estimator
imputer = CustomImputerEstimator(inputCols=cat_cols_list, outputCols=cat_cols_list, fillValue="Unknown")

# Create a pipeline with the imputer
pipeline = Pipeline(stages=[imputer])

# Fit the pipeline on the training data
model = pipeline.fit(df_train)

# Save the pipeline model
model.save("path/to/save/pipeline_model")

# Later, load the model and transform the test data
from pyspark.ml import PipelineModel
loaded_model = PipelineModel.load("path/to/save/pipeline_model")
imputed_test_df = loaded_model.transform(df_test)

# Show the result
imputed_test_df.show()

In [None]:
# Later, load the model and transform the test data
from pyspark.ml import PipelineModel
loaded_model = PipelineModel.load("path/to/save/pipeline_model")

# Create a sample test DataFrame
data_test = [(5, None), (6, "b"), (7, None)]
df_test = spark.createDataFrame(data_test, ["id", "category"])

# Transform the test data
imputed_test_df = loaded_model.transform(df_test)

# Show the result
imputed_test_df.show()

In [None]:
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCols, HasOutputCols, Param, Params
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
from pyspark.sql.functions import col

class CustomImputer(Transformer, HasInputCols, HasOutputCols, DefaultParamsReadable, DefaultParamsWritable):
    fillValue = Param(Params._dummy(), "fillValue", "Value to replace missing values with")

    def __init__(self, inputCols=None, outputCols=None, fillValue="missing"):
        super(CustomImputer, self).__init__()
        self._setDefault(fillValue="missing")
        self.setParams(inputCols=inputCols, outputCols=outputCols, fillValue=fillValue)
    
    def setParams(self, inputCols=None, outputCols=None, fillValue="missing"):
        if inputCols is not None:
            self.setInputCols(inputCols)
        if outputCols is not None:
            self.setOutputCols(outputCols)
        if fillValue is not None:
            self.setFillValue(fillValue)
        return self
    
    def getFillValue(self):
        return self.getOrDefault(self.fillValue)

    def setFillValue(self, value):
        return self._set(fillValue=value)
    
    def setInputCols(self, value):
        return self._set(inputCols=value)
    
    def setOutputCols(self, value):
        return self._set(outputCols=value)
    
    def _transform(self, dataset):
        inputCols = self.getInputCols()
        outputCols = self.getOutputCols()
        fillValue = self.getFillValue()
        
        for inputCol, outputCol in zip(inputCols, outputCols):
            dataset = dataset.withColumn(outputCol, col(inputCol).cast("string")).na.fill({outputCol: fillValue})
        return dataset


# Instantiate the CustomImputer
imputer_cat = CustomImputer(inputCols=cat_cols_list, outputCols=cat_cols_list, fillValue="Unknown")

# Apply the transformer
imputed_cat_df = imputer_cat.transform(df)

# Show the result
#imputed_cat_df.show()

In [None]:
imputed_cat_df.toPandas().isnull().sum()

In [None]:
df.toPandas().isnull().sum()

In [None]:
# Instantiate the CustomImputer
imputer_cat = CustomImputer(inputCols=cat_cols_list, outputCols=cat_cols_list, fillValue="Unknown")

# Apply the transformer
imputed_cat_df = imputer_cat.transform(df)

# Show the result
imputed_cat_df.show()

#### Archive

In [None]:
# Let's split the data into train, validation and test datasets
train,valid,test=df.randomSplit(weights=[0.7,0.2,0.1],seed=12345)

In [None]:
print('Train dataset',train.count(),train.agg(F.sum('target')).collect()[0][0])
print('Valid dataset',valid.count(),valid.agg(F.sum('target')).collect()[0][0])
print('Test dataset',test.count(),test.agg(F.sum('target')).collect()[0][0])

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler

In [None]:
from pyspark.ml.feature import StandardScaler,OneHotEncoder,StringIndexer, VectorAssembler, Imputer,ChiSqSelector