# 4. Machine learning with Spark MLlib
RDD:

  1. Reading the file into an RDD
  2. Converting into MLlib matrix or RDD of vectors
  3. Applying some statistics with MLlib's RDD API
  4. Learning a classification or regression model with the RDD API
  5. Applying the model to the test data and computing the errors

DataFrame:

  1. Reading the file into a DataFrame
  2. Running some aggregations and explorations using DataFrame functions (not MLlib!)
  3. Creating a pipeline with **at least** one feature extraction/manipulation and one model estimator
  4. Fitting the pipeline to the training data
  5. Applying the model to the test data and computing the errors
  

Resources:
  - [Spark MLlib guide](https://spark.apache.org/docs/latest/ml-guide.html)
  - [Databricks MLlib guide](https://docs.databricks.com/spark/latest/mllib/index.html#)
  - [pyspark MLlib RDD API docs](https://spark.apache.org/docs/latest/api/python/pyspark.mllib.html)
  - [pyspark MLlib DataFrame API docs](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html)
  - [pyspark complete docs](https://spark.apache.org/docs/latest/api/python/)
  - [Course Homepage](https://danielpes.github.io/SparkCourse/)
  


# DataFrame Part

### Upload of data:

The data is available here: http://archive.ics.uci.edu/ml/datasets/Bank+Marketing

The data is related to the direct marketing campaigns of a Portuguese banking institution. 
The marketing campaigns were based on phone calls. 
Often, more than one contact with the same client was required in order to assess whether the product (a bank term deposit) would be subscribed ('yes') or not ('no').

The **aim** is to build a model that predicts whether the product is subscribed or not.

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('Project').getOrCreate()

# The file is semicolon-separated with a header row; Spark infers the column types.
data_bank = spark.read.csv(
    "dbfs:/FileStore/shared_uploads/el-jamiy05@hotmail.fr/bank_additional.csv",
    sep=";", encoding="utf-8", header=True, inferSchema=True)
print(type(data_bank))

### Data display:

In [None]:
display(data_bank)  # display() is Databricks-specific; data_bank.show() works elsewhere

age,job,marital,education,default,housing,loan,contact,month,day_of_week,duration,campaign,pdays,previous,poutcome,emp.var.rate,cons.price.idx,cons.conf.idx,euribor3m,nr.employed,y
30,blue-collar,married,basic.9y,no,yes,no,cellular,may,fri,487,2,999,0,nonexistent,-1.8,92.893,-46.2,1.313,5099.1,no
39,services,single,high.school,no,no,no,telephone,may,fri,346,4,999,0,nonexistent,1.1,93.994,-36.4,4.855,5191.0,no
25,services,married,high.school,no,yes,no,telephone,jun,wed,227,1,999,0,nonexistent,1.4,94.465,-41.8,4.962,5228.1,no
38,services,married,basic.9y,no,unknown,unknown,telephone,jun,fri,17,3,999,0,nonexistent,1.4,94.465,-41.8,4.959,5228.1,no
47,admin.,married,university.degree,no,yes,no,cellular,nov,mon,58,1,999,0,nonexistent,-0.1,93.2,-42.0,4.191,5195.8,no
32,services,single,university.degree,no,no,no,cellular,sep,thu,128,3,999,2,failure,-1.1,94.199,-37.5,0.884,4963.6,no
32,admin.,single,university.degree,no,yes,no,cellular,sep,mon,290,4,999,0,nonexistent,-1.1,94.199,-37.5,0.879,4963.6,no
41,entrepreneur,married,university.degree,unknown,yes,no,cellular,nov,mon,44,2,999,0,nonexistent,-0.1,93.2,-42.0,4.191,5195.8,no
31,services,divorced,professional.course,no,no,no,cellular,nov,tue,68,1,999,1,failure,-0.1,93.2,-42.0,4.153,5195.8,no
35,blue-collar,married,basic.9y,unknown,no,no,telephone,may,thu,170,1,999,0,nonexistent,1.1,93.994,-36.4,4.855,5191.0,no


In [None]:
data_bank.printSchema()   # check the data set -- Printing schema in tree format

In [None]:
#  number of rows
print(data_bank.count())                # 4119
#  number of columns
len(data_bank.columns)          #21

###  Some aggregations and explorations using DataFrame functions:

In [None]:
from pyspark.sql import functions as fn
## Min, max and average age by marital status
Data_aggre_age = data_bank.groupBy("marital").agg(fn.min("age") , fn.max("age"), fn.avg("age"))
Data_aggre_age.show()

In [None]:
# Min, max and average age by the target "y"
Data_aggre_y_age = data_bank.groupBy("y").agg(fn.min("age") , fn.max("age"), fn.avg("age"))
Data_aggre_y_age.show()
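
As a cross-check of what `groupBy(...).agg(...)` computes, the following plain-Python sketch reproduces the same three aggregates on the ten rows displayed above. It only illustrates the group-then-aggregate logic and is not how Spark executes it; the names `rows`, `ages_by_marital` and `summary` are introduced here for illustration.

```python
from collections import defaultdict

# (marital, age) pairs copied from the ten rows displayed above.
rows = [
    ("married", 30), ("single", 39), ("married", 25), ("married", 38),
    ("married", 47), ("single", 32), ("single", 32), ("married", 41),
    ("divorced", 31), ("married", 35),
]

# Group the ages by key, as groupBy("marital") does.
ages_by_marital = defaultdict(list)
for marital, age in rows:
    ages_by_marital[marital].append(age)

# Apply the three aggregates to each group, as agg(min, max, avg) does.
summary = {
    marital: (min(ages), max(ages), sum(ages) / len(ages))
    for marital, ages in ages_by_marital.items()
}

for marital, (lowest, highest, mean) in sorted(summary.items()):
    print(f"{marital:10s} min={lowest} max={highest} avg={mean:.2f}")
```

On the full DataFrame the figures will differ, since only ten rows are used here.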

### Encoding of variables:

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import OneHotEncoder


Columns = ['job','marital','education','default','housing','loan','contact','month','day_of_week','poutcome','y']

# Index the string values of each column
indexers = [
    StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c))
    for c in Columns
]

# One-hot encode the indexed values. The target 'y' is only indexed, not encoded,
# so that the label does not leak into the feature vector.
encoders = [OneHotEncoder(dropLast=False,inputCol=indexer.getOutputCol(),
            outputCol="{0}_encoded".format(indexer.getOutputCol())) 
    for indexer in indexers if indexer.getInputCol() != 'y'
]

# Assemble the encoded columns into a single feature vector
assembler = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders],outputCol="rawFeatures")


## Pipeline chaining the feature extraction/manipulation stages
## (the model estimator is fitted separately in the Modeling section)


pipeline = Pipeline(stages=indexers + encoders+ [assembler])
pipeline_model = pipeline.fit(data_bank)
transformed_data_bank = pipeline_model.transform(data_bank)
transformed_data_bank.show(5)
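
To make the two encoding stages concrete, here is a minimal pure-Python sketch of what `StringIndexer` followed by `OneHotEncoder(dropLast=False)` computes for a single column. It is an illustration, not Spark's implementation: the helpers `fit_string_index` and `one_hot` and the toy values are hypothetical, the alphabetical tie-break is an assumption of this sketch, and Spark stores the encoded result as a sparse vector rather than a list.

```python
from collections import Counter

def fit_string_index(values):
    """Map each label to an index, most frequent label first (ties broken alphabetically)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: index for index, label in enumerate(ordered)}

def one_hot(index, size):
    """Return a 0/1 list with a single 1 at position `index`."""
    return [1.0 if position == index else 0.0 for position in range(size)]

# Toy column (hypothetical values, not taken from the dataset).
marital = ["married", "single", "married", "divorced", "married", "single"]

index_of = fit_string_index(marital)
# One slot per category, which corresponds to dropLast=False.
encoded = [one_hot(index_of[value], len(index_of)) for value in marital]

print(index_of)
print(encoded[0])
```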


In [None]:
## Select the raw features column
transformed_data_bank.select('rawFeatures').printSchema()


In [None]:
##check the data set 
transformed_data_bank.printSchema()


### Splitting data into train and test sets:

In [None]:
##Split Data Into Test and Train 
(trainingData, testData) = transformed_data_bank.randomSplit([0.7, 0.3],seed = 11)

In [None]:
## Test Data
testData.show(5)
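
Note that `randomSplit` assigns rows to the splits at random, so the 70/30 proportions are only approximate, while the seed makes the split repeatable. The sketch below imitates that behaviour in plain Python under the assumption of one independent uniform draw per row; the function `random_split` is hypothetical and will not reproduce the exact rows Spark selects.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row independently to a split, with probability proportional to `weights`."""
    rng = random.Random(seed)
    total = float(sum(weights))
    # Cumulative upper bounds, e.g. [0.7, 1.0] for weights [0.7, 0.3].
    bounds = []
    running = 0.0
    for weight in weights:
        running += weight / total
        bounds.append(running)
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        for split, bound in zip(splits, bounds):
            if draw < bound:
                split.append(row)
                break
        else:
            # Guard against floating-point round-off in the last bound.
            splits[-1].append(row)
    return splits

rows = list(range(4119))  # one placeholder per row of the dataset
train, test = random_split(rows, [0.7, 0.3], seed=11)
print(len(train), len(test))
```

Running it again with the same seed returns the same two lists, which is the property the `seed = 11` argument provides in the cell above.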

### Modeling:

We chose to train a logistic regression model because the target variable is binary.
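
As a reminder of how a fitted logistic regression turns a feature vector into a binary prediction, here is a minimal sketch in plain Python. The weights, intercept and inputs are hypothetical values chosen for illustration, and the function `predict` is not part of Spark; the 0.5 threshold is assumed as the usual default.

```python
import math

def sigmoid(z):
    """Logistic function, mapping any real score to a probability in (0, 1)."""
    return 1.0 / (1.0 + math.exp(-z))

def predict(weights, intercept, features, threshold=0.5):
    """Return (probability of class 1, hard 0/1 prediction) for one feature vector."""
    score = intercept + sum(w * x for w, x in zip(weights, features))
    probability = sigmoid(score)
    return probability, 1.0 if probability > threshold else 0.0

# Hypothetical coefficients and inputs, chosen only for illustration.
weights = [0.8, -1.2]
intercept = -0.5

print(predict(weights, intercept, [2.0, 0.5]))  # linear score 0.5, so class 1
print(predict(weights, intercept, [0.0, 1.0]))  # linear score -1.7, so class 0
```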

In [None]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
## A logistic regression model to predict whether a client subscribes to the term deposit
lr = LogisticRegression(labelCol="y_indexed", featuresCol="rawFeatures")

#Training algo
lrModel = lr.fit(trainingData)

##Get Prediction 
lr_prediction = lrModel.transform(testData)

## Compare  Prediction with True_label 
lr_prediction.select("prediction", "y_indexed", "rawFeatures").show()

evaluator = MulticlassClassificationEvaluator(labelCol="y_indexed", predictionCol="prediction", metricName="accuracy")

In [None]:
lr_accuracy = evaluator.evaluate(lr_prediction)
print("Accuracy of LogisticRegression is = %g"% (lr_accuracy))
print("Test Error of LogisticRegression = %g " % (1.0 - lr_accuracy))

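We obtained a very high accuracy on the test set. Accuracy alone can be flattering when one class dominates, so it should be compared with the share of the majority class ('no') before concluding that the model performs well.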
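
Accuracy is easier to interpret next to a majority-class baseline and per-class measures. The sketch below uses made-up labels and predictions (not results from this notebook) and a hypothetical helper `evaluate` to show how a baseline that always answers 'no' can already score well on accuracy while finding no subscribers.

```python
def evaluate(pairs):
    """Compute accuracy, precision and recall for class 1 from (prediction, label) pairs."""
    tp = sum(1 for p, y in pairs if p == 1 and y == 1)
    fp = sum(1 for p, y in pairs if p == 1 and y == 0)
    fn = sum(1 for p, y in pairs if p == 0 and y == 1)
    tn = sum(1 for p, y in pairs if p == 0 and y == 0)
    accuracy = (tp + tn) / len(pairs)
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    return accuracy, precision, recall

# Made-up labels: 90 negatives followed by 10 positives (illustrative only).
labels = [0] * 90 + [1] * 10

# Baseline that always predicts the majority class.
baseline_pairs = [(0, y) for y in labels]

# Hypothetical model: 2 false alarms among the negatives, 4 of 10 positives found.
model_predictions = [0] * 88 + [1] * 2 + [1] * 4 + [0] * 6
model_pairs = list(zip(model_predictions, labels))

print("baseline:", evaluate(baseline_pairs))
print("model:   ", evaluate(model_pairs))
```

In this toy case the baseline reaches 0.90 accuracy with zero recall, while the hypothetical model reaches 0.92 accuracy with a recall of 0.4, so the two-point gain in accuracy hides most of the difference between them.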

# RDD Part 

We now repeat the same steps with **RDDs**. We had difficulty reading the data and extracting all of its features in the form expected by the **MLlib RDD API**, so we do not use every column: we work only with the label `y_indexed` and the numeric features `age`, `duration`, `campaign`, `pdays`, `previous` and `euribor3m`.

### Reading the data into an RDD:

In [None]:
data_bank_RDD = transformed_data_bank.select('y_indexed','age','duration','campaign','pdays','previous','euribor3m').rdd
print(data_bank_RDD.first())
print(type(data_bank_RDD))

In [None]:
from pyspark.mllib.regression import LabeledPoint
# First field is the label; the remaining fields form a flat feature list
Data_Labeled = data_bank_RDD.map(lambda line: LabeledPoint(line[0], list(line[1:])))
Data_Labeled.take(5)

In [None]:
features = data_bank_RDD.map(lambda row: row[1:])
features.take(5)
print(type(features)) 

### Applying some statistics with the MLlib's RDD API

In [None]:
from pyspark.mllib.stat import Statistics

# Column-wise mean, max and min from MLlib's Statistics
Data_stats = Statistics.colStats(features)
print("means: \n",Data_stats.mean())
print("max's:\n",Data_stats.max())
print("min's:\n",Data_stats.min())
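
For reference, the column-wise quantities printed above can be sketched in plain Python as follows. The helper `col_stats` is hypothetical, and the three rows are the `age`, `duration` and `campaign` values of the first three records displayed earlier, so the figures will not match those of the full dataset.

```python
def col_stats(rows):
    """Return per-column mean, max and min for equal-length numeric rows."""
    columns = list(zip(*rows))  # transpose: one tuple per column
    means = [sum(column) / len(column) for column in columns]
    maxima = [max(column) for column in columns]
    minima = [min(column) for column in columns]
    return means, maxima, minima

# (age, duration, campaign) for the first three rows displayed earlier.
rows = [(30, 487, 2), (39, 346, 4), (25, 227, 1)]

means, maxima, minima = col_stats(rows)
print("means:", means)
print("max's:", maxima)
print("min's:", minima)
```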


### Data preparation

In [None]:
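from pyspark.mllib.feature import StandardScaler
# Defaults: withMean=False, withStd=True (scale to unit standard deviation, no centering)
standardizer = StandardScaler()
scaler_model = standardizer.fit(features)
features_transform = scaler_model.transform(features)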

In [None]:
features_transform.take(5)
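
With its default settings, MLlib's `StandardScaler` divides each column by its standard deviation without subtracting the mean. A minimal plain-Python sketch of that scaling, assuming the corrected (n - 1) sample standard deviation and using hypothetical helpers `fit_scaler` and `transform`, is:

```python
import statistics

def fit_scaler(rows):
    """Return the sample standard deviation (n - 1 denominator) of each column."""
    return [statistics.stdev(column) for column in zip(*rows)]

def transform(rows, stds):
    """Divide each value by its column's standard deviation; zero-variance columns map to 0."""
    return [
        [value / std if std != 0 else 0.0 for value, std in zip(row, stds)]
        for row in rows
    ]

# (age, duration, campaign) for the first three rows displayed earlier.
rows = [(30, 487, 2), (39, 346, 4), (25, 227, 1)]

stds = fit_scaler(rows)
scaled = transform(rows, stds)

for row in scaled:
    print([round(value, 3) for value in row])
```

After scaling, every column has unit standard deviation but keeps a non-zero mean, because no centering is applied.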

In [None]:
label = data_bank_RDD.map(lambda row: row[0])
label.take(5)

In [None]:
transformedData = label.zip(features_transform)
transformedData.take(5)

In [None]:
# row[1] is already a feature vector, so it is passed as-is rather than wrapped in a list
transformedData = transformedData.map(lambda row: LabeledPoint(row[0], row[1]))
transformedData.take(5)

### Splitting data into train and test sets:

In [None]:
##Split the data into Train and Test
trainingData, testingData = transformedData.randomSplit([0.7, 0.3],seed = 11)
trainingData.cache()
testingData.cache()
type(trainingData)

### Modeling

In [None]:
from pyspark.mllib.classification import LogisticRegressionWithLBFGS
# Logistic regression model from the MLlib RDD API
model = LogisticRegressionWithLBFGS.train(trainingData)

In [None]:
# Get Prediction
results = testingData.map(lambda lp: (lp.label, float(model.predict(lp.features))))
print (results.take(100))

In [None]:
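from pyspark.mllib.evaluation import BinaryClassificationMetrics

# Area under ROC. BinaryClassificationMetrics expects (score, label) pairs,
# while `results` holds (label, prediction), so the pairs are swapped first.
# The predictions are hard 0/1 values, so the ROC curve has a single operating point.
metrics = BinaryClassificationMetrics(results.map(lambda lp: (lp[1], lp[0])))
metrics.areaUnderROC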
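
The area under the ROC curve can be read as the probability that a randomly chosen positive example receives a higher score than a randomly chosen negative one. The sketch below implements that pairwise definition in plain Python on made-up (score, label) pairs. It is an illustration rather than Spark's algorithm, and the function `area_under_roc` is hypothetical. It also shows that thresholding the scores into hard 0/1 predictions before computing the metric changes the value.

```python
def area_under_roc(score_and_labels):
    """AUC as the share of (positive, negative) pairs ranked correctly; ties count one half."""
    positives = [s for s, y in score_and_labels if y == 1]
    negatives = [s for s, y in score_and_labels if y == 0]
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(positives) * len(negatives))

# Made-up (score, label) pairs, for illustration only.
scored = [(0.9, 1), (0.8, 1), (0.7, 0), (0.6, 1), (0.3, 0), (0.2, 0)]

# The same examples after thresholding the scores at 0.5.
hard = [(1.0 if s > 0.5 else 0.0, y) for s, y in scored]

print("AUC from scores:          ", area_under_roc(scored))
print("AUC from hard predictions:", area_under_roc(hard))
```

With hard predictions the value reduces to the average of the true positive rate and the true negative rate, here (1 + 2/3) / 2.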

In [None]:
# Error rate = share of test rows whose prediction differs from the label
lr_error = results.filter(lambda lp: lp[0] != lp[1]).count() / float(results.count())
print("Accuracy of LogisticRegression is = %g"% (1.0-lr_error))
print("Test Error of LogisticRegression = %g " % (lr_error))
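
One point to keep in mind when computing the error by hand: an error rate counts mismatches, whereas averaging the signed difference `label - prediction` lets false positives and false negatives cancel each other out. The small plain-Python example below, on made-up (label, prediction) pairs, shows the difference between the two quantities.

```python
# Made-up (label, prediction) pairs: one false positive, one false negative, two correct.
results = [(0.0, 1.0), (1.0, 0.0), (1.0, 1.0), (0.0, 0.0)]

# Averaging signed differences: the +1 and -1 cancel.
signed = sum(label - prediction for label, prediction in results) / len(results)

# Counting mismatches: the actual misclassification rate.
mismatch = sum(1 for label, prediction in results if label != prediction) / len(results)

print("mean signed difference:", signed)
print("misclassification rate:", mismatch)
```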