<a href="https://colab.research.google.com/github/marcelolandivar/Python_Projects/blob/master/PySpark-ML-Credit_Approval_Prediction.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Machine Learning and PySpark
**Credit Approval Prediction using PySpark**

### By: Marcelo Landivar
---

>**Email:** <MarceloLandivar24@gmail.com>\
> **RESOURCES:**  PySpark


Australian Credit Approval Dataset: https://archive.ics.uci.edu/ml/datasets/Statlog+%28Australian+Credit+Approval%29

The dataset contains sensitive data; therefore, no detailed information is available about what the columns mean.

## Loading the data

In [1]:
!wget 'https://archive.ics.uci.edu/ml/machine-learning-databases/statlog/australian/australian.dat'

--2020-08-27 08:17:16--  https://archive.ics.uci.edu/ml/machine-learning-databases/statlog/australian/australian.dat
Resolving archive.ics.uci.edu (archive.ics.uci.edu)... 128.195.10.252
Connecting to archive.ics.uci.edu (archive.ics.uci.edu)|128.195.10.252|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 28735 (28K) [application/x-httpd-php]
Saving to: ‘australian.dat’


2020-08-27 08:17:17 (430 KB/s) - ‘australian.dat’ saved [28735/28735]



**PySpark installation**

*If running in Colab, skip this step and jump to the pip install cell.*

In [None]:
# Java installation
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [None]:
# Apache Spark download
import os
os.system("wget -q https://www-us.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz")
# wget saves the archive to the current working directory, so extract it from there
os.system("tar xf spark-2.4.5-bin-hadoop2.7.tgz")

In [2]:
# Direct Pyspark installation for Colab
!pip install -q pyspark

## Creating a Local Spark Session and Reading the Data

In [3]:
import pyspark 
from pyspark import SparkConf
from pyspark import SparkContext

config = SparkConf().setMaster('local').setAppName('my-spark')
sc = SparkContext(conf=config)

In [None]:
from pyspark.sql import SQLContext

# Read the space-delimited file (no header row) and let Spark infer the column types
sqlsc = SQLContext(sc)
df_australian = sqlsc.read.format('com.databricks.spark.csv').options(header='false', inferschema='true', delimiter=' ').load('australian.dat')

In [None]:
df_australian.describe().toPandas().transpose()
# _c0, _c3, _c7, _c8, _c10 and _c11 are almost certainly categorical; _c4 and _c5 probably are too.
# _c9, _c12 and _c13 show very high variability (very noticeable outliers),
# or they contain missing data encoded as 0 or 1.

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
_c0,690,0.6782608695652174,0.4674823918720558,0,1
_c1,690,31.56820289855064,11.853272772971627,13.75,80.25
_c2,690,4.758724637681158,4.978163248528542,0.0,28.0
_c3,690,1.7666666666666666,0.4300628322361985,1,3
_c4,690,7.372463768115942,3.6832647874312814,1,14
_c5,690,4.6927536231884055,1.9923160695338962,1,9
_c6,690,2.2234057971014476,3.3465133592781333,0.0,28.5
_c7,690,0.5231884057971015,0.4998243312700278,0,1
_c8,690,0.427536231884058,0.4950800196191814,0,1


In [None]:
df = df_australian.toPandas()
df.iloc[:,12:14].head(15)

Unnamed: 0,_c12,_c13
0,100,1213
1,160,1
2,280,1
3,0,1
4,60,159
5,100,1
6,60,101
7,43,561
8,176,538
9,100,51


In [None]:
df_australian.toPandas().shape

(690, 15)

In [None]:
# Checking the data type
df_australian.toPandas().info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 690 entries, 0 to 689
Data columns (total 15 columns):
 #   Column  Non-Null Count  Dtype  
---  ------  --------------  -----  
 0   _c0     690 non-null    int32  
 1   _c1     690 non-null    float64
 2   _c2     690 non-null    float64
 3   _c3     690 non-null    int32  
 4   _c4     690 non-null    int32  
 5   _c5     690 non-null    int32  
 6   _c6     690 non-null    float64
 7   _c7     690 non-null    int32  
 8   _c8     690 non-null    int32  
 9   _c9     690 non-null    int32  
 10  _c10    690 non-null    int32  
 11  _c11    690 non-null    int32  
 12  _c12    690 non-null    int32  
 13  _c13    690 non-null    int32  
 14  _c14    690 non-null    int32  
dtypes: float64(3), int32(12)
memory usage: 48.6 KB


In [None]:
# Checking null values
df_australian.toPandas().isna().sum()

_c0     0
_c1     0
_c2     0
_c3     0
_c4     0
_c5     0
_c6     0
_c7     0
_c8     0
_c9     0
_c10    0
_c11    0
_c12    0
_c13    0
_c14    0
dtype: int64

In [None]:
# Checking how balanced the target classes are
pdf = df_australian.toPandas()  # convert once instead of once per expression
print('Target 0: '+str((len(pdf[pdf._c14==0])/len(pdf))*100)+'%')
print('Target 1: '+str((len(pdf[pdf._c14==1])/len(pdf))*100)+'%')

Target 0: 55.507246376811594%
Target 1: 44.492753623188406%


## Enriching the data

This section measures how each feature correlates with the target and creates new min-max scaled features from the variables that are available.

In [None]:
# Correlation of every feature column with the target column _c14
for i in df_australian.columns:
    if i != '_c14':
        print('Correlation of each feature with the target', i,df_australian.stat.corr(str(i), '_c14'))

Correlation of each feature with the target _c0 -0.013897069428285015
Correlation of each feature with the target _c1 0.16162586606830812
Correlation of each feature with the target _c2 0.2062937386450388
Correlation of each feature with the target _c3 0.19430606561906658
Correlation of each feature with the target _c4 0.37371161291900007
Correlation of each feature with the target _c5 0.24656748770400883
Correlation of each feature with the target _c6 0.3224753582553842
Correlation of each feature with the target _c7 0.7204068158989543
Correlation of each feature with the target _c8 0.45830133160794323
Correlation of each feature with the target _c9 0.4064100087639557
Correlation of each feature with the target _c10 0.03162481448371768
Correlation of each feature with the target _c11 0.11526093232909745
Correlation of each feature with the target _c12 -0.09997240789126369
Correlation of each feature with the target _c13 0.1756572009935053
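
The values above are Pearson correlation coefficients, which is the method `DataFrame.stat.corr` uses by default. As a rough illustration of what that number measures, here is a minimal pure-Python sketch. The function name `pearson` and the toy data are assumptions made for this example and are not taken from the dataset.

```python
import math

def pearson(xs, ys):
    """Pearson correlation coefficient of two equal-length sequences."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    # Sum of products of deviations (proportional to the covariance)
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    # Sums of squared deviations (proportional to the variances)
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / math.sqrt(var_x * var_y)

# Toy data (hypothetical): a binary feature and a binary target
feature = [0, 0, 1, 1, 1, 0]
target = [0, 0, 1, 1, 0, 0]
r = pearson(feature, target)
print(round(r, 4))
```

A value close to 1 or -1 indicates a strong linear relationship, while a value near 0 (such as `_c0` or `_c10` above) indicates almost none.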


In [None]:
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import udf
# A UDF applies a Python function to each value of a column, row by row.
# unlist extracts the single element of a one-element vector and rounds it to 4 decimals.
unlist = udf(lambda x: round(float(list(x)[0]),4), DoubleType())

# Add a min-max scaled copy (<column>_Scaled) of every feature column
for i in df_australian.columns:
    if i != '_c14':
        assembler = VectorAssembler(inputCols = [i], outputCol=i+"_Vect")
        scaler = MinMaxScaler(inputCol=i+"_Vect", outputCol=i+"_Scaled")
        pipeline = Pipeline(stages=[assembler,scaler])
        df_australian = pipeline.fit(df_australian).transform(df_australian).withColumn(i+"_Scaled", unlist(i+'_Scaled')).drop(i+'_Vect')




In [None]:
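# Drop the scaled copies of the categorical columns; these columns are one-hot encoded later instead
df_australian = df_australian.drop('_c0_Scaled', '_c3_Scaled','_c7_Scaled','_c8_Scaled', '_c10_Scaled', '_c11_Scaled')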

In [None]:
df_australian.toPandas()

Unnamed: 0,_c0,_c1,_c2,_c3,_c4,_c5,_c6,_c7,_c8,_c9,...,_c13,_c14,_c1_Scaled,_c2_Scaled,_c4_Scaled,_c5_Scaled,_c6_Scaled,_c9_Scaled,_c12_Scaled,_c13_Scaled
0,1,22.08,11.460,2,4,4,1.585,0,0,0,...,1213,0,0.1253,0.4093,0.2308,0.375,0.0556,0.0000,0.05,0.0121
1,0,22.67,7.000,2,8,4,0.165,0,0,0,...,1,0,0.1341,0.2500,0.5385,0.375,0.0058,0.0000,0.08,0.0000
2,0,29.58,1.750,1,4,4,1.250,0,0,0,...,1,0,0.2380,0.0625,0.2308,0.375,0.0439,0.0000,0.14,0.0000
3,0,21.67,11.500,1,5,3,0.000,1,1,11,...,1,1,0.1191,0.4107,0.3077,0.250,0.0000,0.1642,0.00,0.0000
4,1,20.17,8.170,2,6,4,1.960,1,1,14,...,159,1,0.0965,0.2918,0.3846,0.375,0.0688,0.2090,0.03,0.0016
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
685,1,31.57,10.500,2,14,4,6.500,1,0,0,...,1,1,0.2680,0.3750,1.0000,0.375,0.2281,0.0000,0.00,0.0000
686,1,20.67,0.415,2,8,4,0.125,0,0,0,...,45,0,0.1041,0.0148,0.5385,0.375,0.0044,0.0000,0.00,0.0004
687,0,18.83,9.540,2,6,4,0.085,1,0,0,...,1,1,0.0764,0.3407,0.3846,0.375,0.0030,0.0000,0.05,0.0000
688,0,27.42,14.500,2,14,8,3.085,1,1,1,...,12,1,0.2056,0.5179,1.0000,0.875,0.1082,0.0149,0.06,0.0001
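
The `_Scaled` columns in the table above can be checked by hand. The sketch below assumes the standard min-max formula with the default output range of 0 to 1; the helper name `min_max_scale` is made up for this example. The minimum and maximum of `_c1` (13.75 and 80.25) come from the `describe()` output earlier in the notebook.

```python
def min_max_scale(x, col_min, col_max, out_min=0.0, out_max=1.0):
    """Rescale x from [col_min, col_max] to [out_min, out_max]."""
    if col_max == col_min:
        # Constant column: fall back to the middle of the output range
        return 0.5 * (out_min + out_max)
    return (x - col_min) / (col_max - col_min) * (out_max - out_min) + out_min

# Row 0 of the table: _c1 = 22.08, with column min 13.75 and max 80.25
c1_scaled = round(min_max_scale(22.08, 13.75, 80.25), 4)
print(c1_scaled)
```

The result can be compared with the `_c1_Scaled` value shown for row 0.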


In [None]:
type(df_australian)

pyspark.sql.dataframe.DataFrame

There are no NULL or NaN values in the DataFrame, so I cannot tell from the data alone whether anything is missing. Some zeros may stand for missing values, but I cannot know for sure because I do not know the meaning of each feature. Depending on the content of the data, the options are to drop rows with too much missing data, impute the mean where that does not distort the calculation (this depends on the feature), set the value to 0 (again depending on the feature and how it affects the mean), or generate synthetic data.
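
As an illustration of one of these options, here is a minimal pure-Python sketch of mean imputation. It is not part of the notebook's pipeline: the function `impute_with_mean`, the `missing_marker` parameter and the toy column are all hypothetical.

```python
def impute_with_mean(values, missing_marker=None):
    """Replace missing entries with the mean of the observed entries.

    An entry counts as missing if it is None or equals missing_marker
    (for example 0, if zeros are known to encode missing data).
    """
    def is_missing(v):
        return v is None or v == missing_marker

    observed = [v for v in values if not is_missing(v)]
    if not observed:
        # Nothing to compute a mean from; return the data unchanged
        return list(values)
    mean = sum(observed) / len(observed)
    return [mean if is_missing(v) else v for v in values]

# Toy column (hypothetical values) with two missing entries
column = [100, 160, None, 60, None]
print(impute_with_mean(column))
```

Whether treating a marker such as 0 as missing is appropriate depends on the meaning of the feature, which is unknown for this dataset.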

## Preprocessing the data

Create a pipeline that converts the data available into the needed format (dummy variables, no strings, and vectorized format).

In [None]:
from pyspark.ml.feature import OneHotEncoderEstimator
from pyspark.ml.feature import PCA

#Australian_pd = df_australian.toPandas()

#Encoder is used to transform the categorical variables into one-hot vectors
#Note: OneHotEncoderEstimator is the Spark 2.3/2.4 name; Spark 3.0+ renamed it to OneHotEncoder
Encoder = OneHotEncoderEstimator(inputCols=['_c0', '_c3', '_c7', '_c8', '_c10', '_c11'], 
                                   outputCols=['_c0_encoded', '_c3_encoded', '_c7_encoded', '_c8_encoded', '_c10_encoded', '_c11_encoded'])

#Transform the variables into one Vector to pass to the ML algorithm
Assembler = VectorAssembler(inputCols=['_c0_encoded', '_c1_Scaled', '_c2_Scaled', '_c3_encoded', '_c4_Scaled', '_c5_Scaled', '_c6_Scaled','_c7_encoded', '_c8_encoded', '_c9_Scaled', '_c10_encoded', '_c11_encoded', '_c12_Scaled', '_c13_Scaled'], 
                                   outputCol='features')

#Perform a PCA and fit the data into two dimensions to simplify the model and improve the performance of the algorithm
pca = PCA(k=2, inputCol='features', outputCol='pca_features')

Pipe = Pipeline(stages= [Encoder, Assembler, pca])

model = Pipe.fit(df_australian)

processed_data = model.transform(df_australian)



DataFrame[pca_features: vector, _c14: int]
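
To make the encoding step more concrete, the sketch below shows one-hot encoding of a category index in plain Python. It assumes the encoder's default behaviour of dropping the last category (`dropLast=True` in Spark), so the last category is represented by an all-zero vector. The function `one_hot` is a hypothetical helper, not a Spark API, and it returns a dense list, whereas Spark stores the result as a sparse vector.

```python
def one_hot(index, num_categories, drop_last=True):
    """One-hot encode a category index as a dense list of floats."""
    if not 0 <= index < num_categories:
        raise ValueError("index must be in [0, num_categories)")
    # With drop_last, the vector has one slot fewer than there are categories
    size = num_categories - 1 if drop_last else num_categories
    vec = [0.0] * size
    if index < size:
        vec[index] = 1.0
    return vec

# Four category indices (0..3): index 2 gets its own slot,
# index 3 is the dropped last category and becomes all zeros
print(one_hot(2, 4))
print(one_hot(3, 4))
```

Dropping one category avoids a redundant column, because the dropped category is implied when all other slots are zero.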

In [None]:
data_for_model= processed_data.select('pca_features', '_c14')

## Modelling

The data is divided into train, test and validation sets. The validation set is held out so that the model can be checked at the end on data it has not seen, to make sure the model is meaningful.

In [None]:
train = data_for_model.limit(500)
validation = data_for_model.subtract(train)

In [None]:
validation.count(), train.count()

(190, 500)

ML Algorithms for classification: (https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#module-pyspark.ml.classification)

In [None]:
split = train.randomSplit([0.8,0.2])
train_df = split[0]
test_df = split[1]

In [None]:
test_df

DataFrame[pca_features: vector, _c14: int]
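
The `randomSplit` call above is not seeded, so the split and every metric computed from it can change between runs; `randomSplit` accepts an optional `seed` argument for reproducibility. The idea of a seeded split can be sketched in plain Python as follows. The function `seeded_split` and the seed value 42 are assumptions for this example. Unlike Spark, which samples each row independently and therefore produces only approximately 80/20 sizes, this sketch cuts at an exact position.

```python
import random

def seeded_split(rows, train_fraction=0.8, seed=42):
    """Shuffle rows with a fixed seed and cut them into two parts."""
    shuffled = list(rows)
    # A dedicated Random instance keeps the global random state untouched
    random.Random(seed).shuffle(shuffled)
    cut = int(len(shuffled) * train_fraction)
    return shuffled[:cut], shuffled[cut:]

# 500 stand-in row ids, matching the size of the train set above
train_rows, test_rows = seeded_split(range(500))
print(len(train_rows), len(test_rows))
```

Calling the function again with the same seed returns the same two parts, which makes the reported metrics repeatable.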

In [None]:
from pyspark.ml.classification import LogisticRegression

logit = LogisticRegression(maxIter=10, featuresCol='pca_features', labelCol='_c14', 
                        elasticNetParam=0.8, family='multinomial')
logit_model = logit.fit(train_df)
train_summary = logit_model.summary
train_summary.accuracy
#metrics = BinaryClassificationMetrics(prediction)


0.8275

In [None]:
# Evaluating the Logistic Regression Model
from pyspark.ml.evaluation import BinaryClassificationEvaluator

logit_predictions = logit_model.transform(test_df)
logit_predictions.select('prediction', '_c14', 'pca_features').show(5)
logit_evaluator = BinaryClassificationEvaluator(labelCol='_c14')
print('Test Area under ROC', logit_evaluator.evaluate(logit_predictions))

+----------+----+--------------------+
|prediction|_c14|        pca_features|
+----------+----+--------------------+
|       1.0|   1|[-0.8222473451480...|
|       1.0|   1|[-0.8091206541156...|
|       1.0|   1|[-0.8009544167703...|
|       1.0|   1|[-0.7973594339431...|
|       1.0|   0|[-0.7966798643240...|
+----------+----+--------------------+
only showing top 5 rows

Test Area under ROC 0.8460606060606057
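
The area under the ROC curve reported above (the default metric of `BinaryClassificationEvaluator`) can be read as the probability that a randomly chosen positive example receives a higher score than a randomly chosen negative one. The sketch below computes it that way in plain Python. The function `roc_auc` and the toy scores are hypothetical, and Spark derives the value from the ROC curve itself, so its result may differ slightly from this pairwise count.

```python
def roc_auc(scores, labels):
    """AUC as the fraction of (positive, negative) pairs ranked correctly."""
    pos = [s for s, y in zip(scores, labels) if y == 1]
    neg = [s for s, y in zip(scores, labels) if y == 0]
    if not pos or not neg:
        raise ValueError("need at least one positive and one negative label")
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0   # positive ranked above negative
            elif p == n:
                wins += 0.5   # ties count as half
    return wins / (len(pos) * len(neg))

# Toy scores (hypothetical): higher means "more likely class 1"
scores = [0.9, 0.8, 0.4, 0.3, 0.2]
labels = [1, 0, 1, 0, 0]
auc = roc_auc(scores, labels)
print(round(auc, 4))
```

An AUC of 0.5 corresponds to random ranking and 1.0 to a perfect ranking.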


In [None]:
# Testing a Decision Tree Classifier to compare it with the Logistic Regression Model
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
dt = DecisionTreeClassifier(featuresCol = 'pca_features', labelCol = '_c14',
                            impurity='entropy', maxDepth=30, maxBins=20)
dt_model = dt.fit(train_df)
predictions = dt_model.transform(test_df)
evaluator = MulticlassClassificationEvaluator(
    labelCol="_c14", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print(str(accuracy*100)+'%')

79.0%
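
The tree above uses `impurity='entropy'`, meaning splits are chosen to reduce the entropy of the class labels in each node. A minimal sketch of that measure, assuming base-2 logarithms, is shown below. The function `entropy` is a hypothetical helper, and the counts 383 and 307 are derived from the class balance reported earlier (55.5% and 44.5% of 690 rows).

```python
import math

def entropy(class_counts):
    """Shannon entropy (in bits) of a class distribution given as counts."""
    total = sum(class_counts)
    result = 0.0
    for count in class_counts:
        if count > 0:  # empty classes contribute nothing
            p = count / total
            result -= p * math.log2(p)
    return result

# Class counts of the full dataset: 383 rows with target 0, 307 with target 1
root_entropy = entropy([383, 307])
print(round(root_entropy, 4))
```

A perfectly mixed two-class node has entropy 1 and a pure node has entropy 0, so the nearly balanced dataset starts close to 1.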


In [None]:
logit_2 = LogisticRegression(maxIter=50, featuresCol='pca_features', labelCol='_c14',
                             regParam=0.7, family='multinomial')
logit_model_2 = logit_2.fit(train_df)
train_summary_2 = logit_model_2.summary
print(str(train_summary_2.accuracy*100)+'%')

87.0%


## Testing the Model

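A confusion matrix is displayed together with the overall accuracy (and other metrics) of the model. The code below scores the first logistic regression model (`logit_model`) on `test_df`; the held-out `validation` set can be scored in the same way by passing it to `logit_model.transform`.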

In [None]:
from pyspark.mllib.evaluation import MulticlassMetrics

# Note: this scores the first model (logit_model) on test_df, not logit_model_2
logit_predictions_2 = logit_model.transform(test_df)
preds_and_labels = logit_predictions_2.select('prediction', '_c14')
preds_and_labels = preds_and_labels.withColumn("_c14", preds_and_labels["_c14"].cast('float'))
metrics = MulticlassMetrics(preds_and_labels.rdd.map(tuple))

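# Called without a label (Spark 2.x only, deprecated), these return overall values
# that equal the accuracy; pass a label, e.g. metrics.precision(1.0), for per-class values
precision = metrics.precision()
recall = metrics.recall()
f1Score = metrics.fMeasure()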
print("Summary Stats")
print("Precision = %s" % precision)
print("Recall = %s" % recall)
print("F1 Score = %s" % f1Score)
print("Accuracy = %s" % metrics.accuracy)



Summary Stats
Precision = 0.77
Recall = 0.77
F1 Score = 0.77
Accuracy = 0.77


In [None]:
print(metrics.confusionMatrix().toArray())
# Rows are actual labels and columns are predicted labels, ordered by class (0, 1):
# TN FP
# FN TP

[[45. 10.]
 [13. 32.]]
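
The summary statistics printed earlier are all 0.77 because they are overall values. Per-class metrics can be derived from the confusion matrix by hand. The sketch below assumes that rows are actual labels and columns are predicted labels, both ordered 0 then 1, and that class 1 (approved) is treated as the positive class.

```python
# Confusion matrix from the output above: rows = actual, columns = predicted
confusion = [[45, 10],
             [13, 32]]

tn, fp = confusion[0]  # actual 0: predicted 0, predicted 1
fn, tp = confusion[1]  # actual 1: predicted 0, predicted 1

accuracy = (tp + tn) / (tp + tn + fp + fn)
precision = tp / (tp + fp)   # of the predicted positives, how many are right
recall = tp / (tp + fn)      # of the actual positives, how many are found
f1 = 2 * precision * recall / (precision + recall)

print("Accuracy  = %.4f" % accuracy)
print("Precision = %.4f" % precision)
print("Recall    = %.4f" % recall)
print("F1 Score  = %.4f" % f1)
```

Under these assumptions the accuracy matches the reported 0.77, while precision and recall for class 1 are somewhat lower, which the overall figures hide.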
