# Machine learning with Spark MLlib

  
Resources:
  - [Spark MLlib guide](https://spark.apache.org/docs/latest/ml-guide.html)
  - [Databricks MLlib guide](https://docs.databricks.com/spark/latest/mllib/index.html#)
  - [pyspark MLlib RDD API docs](https://spark.apache.org/docs/latest/api/python/pyspark.mllib.html)
  - [pyspark MLlib DataFrame API docs](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html)
  - [pyspark complete docs](https://spark.apache.org/docs/latest/api/python/)

## Part 1 : Machine learning with Spark MLlib using DataFrames

### Loading data and exploring the dataset

In [0]:
# First we import the necessary tools for our program
import pyspark
from pyspark.ml.feature import VectorAssembler, StringIndexer, VectorIndexer, MinMaxScaler
from pyspark.ml import Pipeline
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator


In [0]:
# Read the semicolon-separated credit card CSV (header row, inferred column types)
csv_reader = spark.read.format("csv").options(sep=";", inferSchema=True, header=True)
credit_df = csv_reader.load("/FileStore/tables/Credit_Card.csv")

# Render the DataFrame in the notebook, then report its Python type
display(credit_df)
print(type(credit_df))

ID,LIMIT_BAL,SEX,EDUCATION,MARRIAGE,AGE,PAY_0,PAY_2,PAY_3,PAY_4,PAY_5,PAY_6,BILL_AMT1,BILL_AMT2,BILL_AMT3,BILL_AMT4,BILL_AMT5,BILL_AMT6,PAY_AMT1,PAY_AMT2,PAY_AMT3,PAY_AMT4,PAY_AMT5,PAY_AMT6,default payment next month
1,20000,2,2,1,24,2,2,-1,-1,-2,-2,3913,3102,689,0,0,0,0,689,0,0,0,0,1
2,120000,2,2,2,26,-1,2,0,0,0,2,2682,1725,2682,3272,3455,3261,0,1000,1000,1000,0,2000,1
3,90000,2,2,2,34,0,0,0,0,0,0,29239,14027,13559,14331,14948,15549,1518,1500,1000,1000,1000,5000,0
4,50000,2,2,1,37,0,0,0,0,0,0,46990,48233,49291,28314,28959,29547,2000,2019,1200,1100,1069,1000,0
5,50000,1,2,1,57,-1,0,-1,0,0,0,8617,5670,35835,20940,19146,19131,2000,36681,10000,9000,689,679,0
6,50000,1,1,2,37,0,0,0,0,0,0,64400,57069,57608,19394,19619,20024,2500,1815,657,1000,1000,800,0
7,500000,1,1,2,29,0,0,0,0,0,0,367965,412023,445007,542653,483003,473944,55000,40000,38000,20239,13750,13770,0
8,100000,2,2,2,23,0,-1,-1,0,0,-1,11876,380,601,221,-159,567,380,601,0,581,1687,1542,0
9,140000,2,3,1,28,0,0,2,0,0,0,11285,14096,12108,12211,11793,3719,3329,0,432,1000,1000,1000,0
10,20000,1,3,2,35,-2,-2,-2,-2,-1,-1,0,0,0,0,13007,13912,0,0,0,13007,1122,0,0


In [0]:
# Print the column names and the types inferred while reading the CSV
credit_df.printSchema()

In [0]:
# Dataset dimensions: number of rows and number of columns
n_rows = credit_df.count()
n_cols = len(credit_df.columns)
print(" number of dataset lines : ", n_rows, "number of dataset columns : ", n_cols)
# The ID column is not counted as a variable
print(" number of variables : ", n_cols - 1)

In [0]:
# Give the target column a shorter, identifier-friendly name
old_target_name = "default payment next month"
credit_df = credit_df.withColumnRenamed(old_target_name, "DEFAULT_Y")
print("Column names:", credit_df.columns)

In [0]:
# Count the distinct values of every column.
# The columns are shown in two batches (0-13, then 14-24) so each table stays readable.
for first, last in ((0, 14), (14, 25)):
    distinct_counts = [
        countDistinct(col(name)).alias(name)
        for name in credit_df.columns[first:last]
    ]
    credit_df.agg(*distinct_counts).show()


In [0]:
# Exploring missing values: count, per column, the rows that are NULL or NaN.
# isnan() alone does not match NULL, so both conditions are combined.
from pyspark.sql.functions import isnan, when, count, col
credit_df.select(
    [
        count(when(col(c).isNull() | isnan(col(c)), c)).alias(c)
        for c in credit_df.columns
    ]
).show()

In [0]:
# Distinct values of the categorical variables
credit_df.select("EDUCATION").distinct().sort("EDUCATION").show()
for column_name in ("SEX", "MARRIAGE"):
    credit_df.select(column_name).distinct().show()

In [0]:
# Sorted distinct values of a few repayment-status columns
for pay_column in ("PAY_0", "PAY_2", "PAY_5"):
    credit_df.select(pay_column).distinct().sort(pay_column).show()

In [0]:
# Display (preview of selected columns, currently disabled)
#print(credit_df.select("ID", "LIMIT_BAL", "SEX", "EDUCATION", "MARRIAGE", "AGE", "PAY_0", "PAY_0", 
 #                 "BILL_AMT1", "BILL_AMT2", "DEFAULT_Y").show(5))

In [0]:
# Exploring the class balance of the target variable "DEFAULT_Y".
# Each count is computed once and reused, and the percentage is taken over
# ALL clients (not over the non-defaulting clients only).
n_default = credit_df.filter(credit_df.DEFAULT_Y == 1).count()
n_no_default = credit_df.filter(credit_df.DEFAULT_Y == 0).count()
n_clients = credit_df.count()

print("number of peope with default (Default_Y == 1)):", n_default)
print("number of peope without default(Default_Y == 0)):", n_no_default)
# Guard against an empty dataset so the cell does not fail with ZeroDivisionError
default_pct = (n_default / n_clients) * 100 if n_clients else float("nan")
print(default_pct, " percentage of people with default in the total of clients")

In [0]:
# Number of clients for each (default status, sex) combination
sex_and_target = credit_df.select("sex", "Default_Y")
counts_by_group = sex_and_target.groupBy("Default_Y", "sex").count()
counts_by_group.show()

#credit_df.select("sex", "Default_Y")\
#.filter(((credit_df.filter(credit_df.DEFAULT_Y == 1).count() & credit_df.filter(credit_df.SEX == 1).count())/credit_df.filter(credit_df.DEFAULT_Y == 1).count())*100).show()

### Training models and testing

In [0]:
# The ID column is not needed for modelling, so remove it
credit_df = credit_df.drop("ID")

# 70% / 30% train-test split with a fixed seed
data_train, data_test = credit_df.randomSplit([0.7, 0.3], seed=20)


In [0]:
# Preview the training split and report the size of both splits.
# show() prints the rows itself and returns None, so it is not wrapped in print().
data_train.show(3)
print("Number of observations in data_train : " , data_train.count())
print("Number of observations in data_test : " , data_test.count())

In [0]:
# Get and convert categorical features (SEX, EDUCATION, MARRIAGE).
# The positional slices below assume the ID column has already been dropped.
categorical_features = credit_df.schema.names[1:4]
catVect = VectorAssembler(inputCols = categorical_features, outputCol = "catFeatures")
catIdx = VectorIndexer(inputCol = catVect.getOutputCol(), outputCol = "idxCatFeatures")

# Get and normalize numerical features.
# The target DEFAULT_Y is the last column, so it is filtered out explicitly:
# leaving it among the features would leak the label into the model.
numerical_features = [
    name
    for name in credit_df.schema.names[0:1] + credit_df.schema.names[4:]
    if name != "DEFAULT_Y"
]
numVect = VectorAssembler(inputCols = numerical_features, outputCol = "numFeatures")
minMax = MinMaxScaler(inputCol = numVect.getOutputCol(), outputCol = "normFeatures")

In [0]:
# Build the feature pipeline: indexed categorical vector + scaled numerical
# vector, concatenated into a single "features" column.
featVect = VectorAssembler(inputCols=["idxCatFeatures", "normFeatures"], outputCol="features")
pipeline_stages = [catVect, catIdx, numVect, minMax, featVect]
pipeline = Pipeline(stages=pipeline_stages)

# The pipeline is fitted on the training data only
pipeline_object = pipeline.fit(data_train)

# Apply the fitted pipeline to both splits, keeping the feature vector and
# the label (exposed under the name "target").
data_train, data_test = (
    pipeline_object.transform(part).select("features", col("DEFAULT_Y").alias("target"))
    for part in (data_train, data_test)
)

In [0]:
# Preview the transformed training data and report the size of both splits.
# show() prints the rows itself and returns None, so it is not wrapped in print().
data_train.show(5)
print("Number of observations in data_train : " , data_train.count())
print("Number of observations in data_test : " , data_test.count())

## Train and evaluate classification models
### Define evaluation metrics functions

In [0]:
# One evaluator per metric; all compare the "target" label with the "prediction" column
accuracy, precision, recall = (
    MulticlassClassificationEvaluator(
        labelCol="target", predictionCol="prediction", metricName=metric_name
    )
    for metric_name in ("accuracy", "weightedPrecision", "weightedRecall")
)

### Logistic regression

In [0]:
# Logistic regression (regParam=0.2, 20 iterations): fit on train, predict on test
logit = LogisticRegression(featuresCol="features", labelCol="target", regParam=0.2, maxIter=20)
logit_model = logit.fit(data_train)
logit_predictions_df = logit_model.transform(data_test)

# Score the test predictions with the three evaluators
logit_accuracy = accuracy.evaluate(logit_predictions_df)
logit_precision = precision.evaluate(logit_predictions_df)
logit_recall = recall.evaluate(logit_predictions_df)
print("Accuracy: {:.4}".format(logit_accuracy))
print("Weighted precision: {:.4}".format(logit_precision))
print(" Weighted recall: {:.4}".format(logit_recall))

### Random Forest

In [0]:
# Random forest: 10 trees of depth 3, Gini impurity, fixed seed
rf_settings = dict(
    labelCol="target",
    featuresCol="features",
    maxDepth=3,
    maxBins=32,
    minInstancesPerNode=1,
    minInfoGain=0.0,
    impurity="gini",
    numTrees=10,
    seed=42,
)
Rf = RandomForestClassifier(**rf_settings)
Rf_model = Rf.fit(data_train)
Rf_predictions_df = Rf_model.transform(data_test)

# Report each metric on the test predictions
for template, evaluator in (
    ("Accuracy: {:.4}", accuracy),
    ("Weighted precision: {:.4}", precision),
    ("Weighted recall: {:.4}", recall),
):
    print(template.format(evaluator.evaluate(Rf_predictions_df)))

### Decision tree

In [0]:
# Decision tree of depth 2, Gini impurity, fixed seed
tree = DecisionTreeClassifier(
    labelCol="target",
    featuresCol="features",
    maxDepth=2,
    maxBins=32,
    minInstancesPerNode=1,
    minInfoGain=0.0,
    impurity="gini",
    seed=42,
)
tree_model = tree.fit(data_train)
tree_predictions_df = tree_model.transform(data_test)

# Evaluate on the test predictions, then print the three scores
tree_accuracy = accuracy.evaluate(tree_predictions_df)
tree_precision = precision.evaluate(tree_predictions_df)
tree_recall = recall.evaluate(tree_predictions_df)
print("Accuracy: {:.4}".format(tree_accuracy))
print("Weighted precision: {:.4}".format(tree_precision))
print("Weighted recall: {:.4}".format(tree_recall))

In [0]:
# Confusion-matrix counts for the random forest predictions on the test set.
# count() already returns a Python int, so no conversion is needed.
tp = Rf_predictions_df.filter("prediction == 1.0 AND target == 1").count()
fp = Rf_predictions_df.filter("prediction == 1.0 AND target == 0").count()
tn = Rf_predictions_df.filter("prediction == 0.0 AND target == 0").count()
fn = Rf_predictions_df.filter("prediction == 0.0 AND target == 1").count()

print("true positives:", tp)
print("false positives:", fp)
print("true negatives:", tn)
print("false negatives:", fn)

# A model that never predicts the positive class gives tp + fp == 0; report
# NaN in that case instead of raising ZeroDivisionError.
total_cases = tp + fp + tn + fn
predicted_positives = tp + fp
actual_positives = tp + fn
undefined = float("nan")

print("Accuracy: {:.4}".format((tp + tn) / total_cases if total_cases else undefined))
print("Precision: {:.4}".format(tp / predicted_positives if predicted_positives else undefined))
print("Recall: {:.4}".format(tp / actual_positives if actual_positives else undefined))

## Part 2 :  Machine learning with Spark MLlib using RDDs

In [0]:
# Reload the raw CSV into a fresh DataFrame for the RDD-based part
credit_df2 = spark.read.csv(
    "/FileStore/tables/Credit_Card.csv",
    sep=";",
    inferSchema=True,
    header=True,
)
#display(credit_df)
# Peek at the first two rows
credit_df2.take(2)

In [0]:
# read as RDD
# NOTE: the snippet below is wrapped in a bare triple-quoted string, so none of
# it is executed; it is kept only as a disabled alternative way of getting an RDD.
"""
credit_rdd = credit_df.rdd #session.read.csv("/FileStore/tables/Credit_Card.csv", header=True).rdd
print(type(credit_rdd))
print(credit_rdd)
credit_df.take(5)"""

### create an RDD of vectors

In [0]:
# Turn every Row of the DataFrame's RDD into a dense MLlib vector
from pyspark.mllib.linalg import Vectors


def _row_to_dense(row):
    """Build a dense vector holding all the fields of one row."""
    return Vectors.dense(list(row))


credit_vectors = credit_df2.rdd.map(_row_to_dense)
print(type(credit_vectors))
credit_vectors.take(5)

#### Draw some statistics on the RDD[vectors]

In [0]:
import numpy as np
from pyspark.mllib.stat import Statistics

In [0]:
# Column-wise summary statistics of the vector RDD: means first, then variances
summary = Statistics.colStats(credit_vectors)
for column_stat in (summary.mean(), summary.variance()):
    print(column_stat)

In [0]:
# More column summary statistics from the same summary object.
print(summary.numNonzeros())  # number of nonzeros in each column
print(summary.count())  # number of rows (vectors) that were summarized

In [0]:
# Pearson correlation matrix between all columns of the vector RDD
pearson_matrix = Statistics.corr(credit_vectors, method="pearson")
print(pearson_matrix)

In [0]:
# Split data into Train and Test (70% / 30%).
# A fixed seed (the same value as the DataFrame split in Part 1) makes the
# split repeatable from one run to the next.
(trainingData, testData) = credit_vectors.randomSplit([0.7, 0.3], seed=20)
print(type(trainingData))

In [0]:
# Column statistics restricted to the training split
summary = Statistics.colStats(trainingData)
print(summary.count())  # number of rows in the training split
print(testData.count())  # number of rows in the test split
print(trainingData.count()+testData.count())  # sanity check: train + test row total

#### Remark:
MLlib's RDD-based models expect their input as `LabeledPoint` objects; otherwise the models won't run.
So we convert our data from an RDD of vectors to an RDD of `LabeledPoint`.

In [0]:
# LabeledPoint data transformation
from pyspark.mllib.regression import LabeledPoint
data_rdd = credit_df2.rdd.map(tuple)

# Label = last column (the default flag); features = every column between the
# ID (index 0) and the label. The features are passed as ONE flat sequence:
# wrapping the slice in an extra list would hand LabeledPoint a nested,
# single-row structure instead of a plain feature vector.
Label_data1 = data_rdd.map(lambda line: LabeledPoint(line[-1], list(line[1:-1])))
Label_data1.take(5)

In [0]:
# Split data into Train and Test (70% / 30%).
# A fixed seed (the same value as the DataFrame split in Part 1) keeps the
# split, and therefore the scores reported below, repeatable between runs.
(trainingL, testL) = Label_data1.randomSplit([0.7, 0.3], seed=20)


In [0]:
# Quick sanity check: size of each split and their total
n_training = trainingL.count()
n_testing = testL.count()
print(n_training, n_testing, n_training + n_testing)

### DecisionTree

In [0]:
# Fit a two-class decision tree on the labeled training RDD
# (the empty categoricalFeaturesInfo declares no categorical features)
from pyspark.mllib.tree import DecisionTree
tree_model = DecisionTree.trainClassifier(trainingL, numClasses=2, categoricalFeaturesInfo={})

# Predict on the feature vectors of the test split
tree_test_features = testL.map(lambda point: point.features)
tree_predictions = tree_model.predict(tree_test_features)

In [0]:
# Evaluate our model: pair each true label with its prediction, then average
# the squared differences over the test set.
labelsAndPredictions_tree = testL.map(lambda lp: lp.label).zip(tree_predictions)
testMSE = labelsAndPredictions_tree.map(lambda lp: (lp[0] - lp[1]) * (lp[0] - lp[1])).sum() /float(testL.count())
print('Test Mean Squared Error = ' + str(testMSE))
# The tree was trained with trainClassifier, so it is a classification tree
print('Learned classification Decision Tree :')
print(tree_model.toDebugString())

### Random Forest

In [0]:
# Fit a 5-tree random forest regressor on the labeled training RDD
from pyspark.mllib.tree import RandomForest, RandomForestModel
Rf_model = RandomForest.trainRegressor(trainingL, categoricalFeaturesInfo={}, numTrees=5)

# Predict on the feature vectors of the test split
rf_test_features = testL.map(lambda point: point.features)
Rf_predictions = Rf_model.predict(rf_test_features)

In [0]:
# Evaluating our model: pair each true label with its prediction, then average
# the squared differences over the test set.

Rf_labelsAndPredictions = testL.map(lambda lp: lp.label).zip(Rf_predictions)
testMSE = Rf_labelsAndPredictions.map(lambda lp: (lp[0] - lp[1]) * (lp[0] - lp[1])).sum() /float(testL.count())
print('Test Mean Squared Error  RandomForestModel = ' + str(testMSE))
# The model printed here is the random forest, not gradient boosting
print('Learned regression random forest:')
print(Rf_model.toDebugString())

### Gradient Boosting

In [0]:
# Fit a gradient-boosted trees classifier on the labeled training RDD
from pyspark.mllib.tree import GradientBoostedTrees, GradientBoostedTreesModel
gbtModel = GradientBoostedTrees.trainClassifier(trainingL, categoricalFeaturesInfo={})

# Predict on the test features and pair each true label with its prediction
gbt_test_features = testL.map(lambda point: point.features)
gbt_predictions = gbtModel.predict(gbt_test_features)
gbt_test_labels = testL.map(lambda point: point.label)
labelsAndPredictions_gbt = gbt_test_labels.zip(gbt_predictions)


In [0]:
# Mean squared error of the gradient-boosted model on the test set


def _squared_error(pair):
    """Squared difference between the label and the prediction of one pair."""
    return (pair[0] - pair[1]) * (pair[0] - pair[1])


gbt_squared_errors = labelsAndPredictions_gbt.map(_squared_error)
testMSE = gbt_squared_errors.sum() / float(testL.count())
print('Test Mean Squared Error = ' + str(testMSE))
print('Learned regression gradient boosting:')
print(gbtModel.toDebugString())

Among the models presented above, the one giving the best test MSE (mean squared error) score is RandomForest, with an MSE = 0.1348481479422529.