## **Massive Data Processing**
**Ana Pérez Hoyos, Ariadna Viera Santana, Marta Cartillo Ortiz y María Pardo Díaz.**


This is the second notebook, in which we implement the same models as in the first notebook, this time with RDD-based techniques.

The workflow is similar, but here we focus on implementing the models and on how the data are prepared beforehand, since that preparation differs from the one used in the first notebook.

In [1]:
# Installing pySpark and importing some useful packages
!pip install pyspark[sql]

from __future__ import print_function
from functools import wraps
import pyspark as spark
from pyspark import SparkConf
import time
from operator import add
import os 
from subprocess import STDOUT, check_call, check_output



In [2]:
def set_conf():
    conf = SparkConf().setAppName("App")  # any application name works
    conf = (conf.setMaster('local[*]')  # local master using all available cores (*)
      .set('spark.executor.memory', '4G')  # 4 GB of memory for the executor
      .set('spark.driver.memory', '16G')  # 16 GB for the driver, which gathers the results
      .set('spark.driver.maxResultSize', '8G'))  # cap on the total size of the results collected from all partitions
    return conf

In [3]:
from google.colab import drive
drive.mount('/content/drive', force_remount=True)

Mounted at /content/drive


In [4]:
from pyspark.sql import SparkSession
from pyspark import SparkContext

In [5]:
sc = spark.SparkContext.getOrCreate(conf=set_conf())  # create the Spark context with the configuration defined above

In [6]:
spark = SparkSession(sc)

In [7]:
data = sc.textFile("/content/drive/My Drive/PMD/ObesityDataSet_raw_and_data_sinthetic.csv")

In [8]:
# Split each CSV line into its fields (this assumes no field contains a quoted comma)
mappedRdd = data.map(lambda x: x.split(","))
#mappedRdd.collect()
#mappedRdd.take(5)

In [9]:
headerRdd = mappedRdd.take(1)[0]  # first row of the file: the column names
print('Column names:', headerRdd)

Column names: ['Gender', 'Age', 'Height', 'Weight', 'family_history_with_overweight', 'FAVC', 'FCVC', 'NCP', 'CAEC', 'SMOKE', 'CH2O', 'SCC', 'FAF', 'TUE', 'CALC', 'MTRANS', 'NObeyesdad']


In [10]:
# Keep only the data rows: the header row is the one whose first field is 'Gender'
mappedRddWithoutHeader = mappedRdd.filter(lambda x: x[0] != 'Gender')

In [11]:
# For convenience we convert to a DataFrame so that we can apply the same preprocessing
raw_data=mappedRddWithoutHeader.toDF(headerRdd)
#raw_data.take(2)

In [12]:
raw_data.show()

+------+---+------+------+------------------------------+----+----+---+----------+-----+----+---+---+---+----------+--------------------+-------------------+
|Gender|Age|Height|Weight|family_history_with_overweight|FAVC|FCVC|NCP|      CAEC|SMOKE|CH2O|SCC|FAF|TUE|      CALC|              MTRANS|         NObeyesdad|
+------+---+------+------+------------------------------+----+----+---+----------+-----+----+---+---+---+----------+--------------------+-------------------+
|Female| 21|  1.62|    64|                           yes|  no|   2|  3| Sometimes|   no|   2| no|  0|  1|        no|Public_Transporta...|      Normal_Weight|
|Female| 21|  1.52|    56|                           yes|  no|   3|  3| Sometimes|  yes|   3|yes|  3|  0| Sometimes|Public_Transporta...|      Normal_Weight|
|  Male| 23|   1.8|    77|                           yes|  no|   2|  3| Sometimes|   no|   2| no|  2|  1|Frequently|Public_Transporta...|      Normal_Weight|
|  Male| 27|   1.8|    87|                          

In [13]:
data=raw_data.toPandas()

In [14]:
data.head()

Unnamed: 0,Gender,Age,Height,Weight,family_history_with_overweight,FAVC,FCVC,NCP,CAEC,SMOKE,CH2O,SCC,FAF,TUE,CALC,MTRANS,NObeyesdad
0,Female,21,1.62,64.0,yes,no,2,3,Sometimes,no,2,no,0,1,no,Public_Transportation,Normal_Weight
1,Female,21,1.52,56.0,yes,no,3,3,Sometimes,yes,3,yes,3,0,Sometimes,Public_Transportation,Normal_Weight
2,Male,23,1.8,77.0,yes,no,2,3,Sometimes,no,2,no,2,1,Frequently,Public_Transportation,Normal_Weight
3,Male,27,1.8,87.0,no,no,3,3,Sometimes,no,2,no,2,0,Frequently,Walking,Overweight_Level_I
4,Male,22,1.78,89.8,no,no,2,1,Sometimes,no,2,no,0,0,Sometimes,Public_Transportation,Overweight_Level_II


In [15]:
import numpy as np

# Cast these columns from string to float and round them to the nearest whole value
for col in ['CH2O', 'FAF', 'TUE', 'FCVC', 'NCP']:
    data[col] = data[col].astype(float).round()



In [16]:
# Replace the value 4 in NCP with 3 in one vectorized assignment
# (no need to loop over the rows or to go through NaN and fillna)
data.loc[data['NCP'] == 4, 'NCP'] = 3

## One-hot encoding

In [17]:
import pandas as pd
print(data.nunique())

Gender                               2
Age                               1402
Height                            1574
Weight                            1525
family_history_with_overweight       2
FAVC                                 2
FCVC                                 3
NCP                                  3
CAEC                                 4
SMOKE                                2
CH2O                                 3
SCC                                  2
FAF                                  4
TUE                                  3
CALC                                 4
MTRANS                               5
NObeyesdad                           7
dtype: int64


In [18]:
# One-hot encode only the binary columns here, keeping a single indicator per column;
# 'CAEC', 'CALC' and 'MTRANS' are encoded in the next cell
data = pd.get_dummies(data, columns=['Gender', 'family_history_with_overweight', 'FAVC', 'SMOKE', 'SCC'], drop_first=True)

In [19]:
one_hot = pd.get_dummies(data, columns=['CAEC', 'CALC', 'MTRANS'])
pd.set_option('display.max_columns', None)
one_hot.head()

Unnamed: 0,Age,Height,Weight,FCVC,NCP,CH2O,FAF,TUE,NObeyesdad,Gender_Male,family_history_with_overweight_yes,FAVC_yes,SMOKE_yes,SCC_yes,CAEC_Always,CAEC_Frequently,CAEC_Sometimes,CAEC_no,CALC_Always,CALC_Frequently,CALC_Sometimes,CALC_no,MTRANS_Automobile,MTRANS_Bike,MTRANS_Motorbike,MTRANS_Public_Transportation,MTRANS_Walking
0,21,1.62,64.0,2.0,3.0,2.0,0.0,1.0,Normal_Weight,0,1,0,0,0,0,0,1,0,0,0,0,1,0,0,0,1,0
1,21,1.52,56.0,3.0,3.0,3.0,3.0,0.0,Normal_Weight,0,1,0,1,1,0,0,1,0,0,0,1,0,0,0,0,1,0
2,23,1.8,77.0,2.0,3.0,2.0,2.0,1.0,Normal_Weight,1,1,0,0,0,0,0,1,0,0,1,0,0,0,0,0,1,0
3,27,1.8,87.0,3.0,3.0,2.0,2.0,0.0,Overweight_Level_I,1,0,0,0,0,0,0,1,0,0,1,0,0,0,0,0,0,1
4,22,1.78,89.8,2.0,1.0,2.0,0.0,0.0,Overweight_Level_II,1,0,0,0,0,0,0,1,0,0,0,1,0,0,0,0,1,0
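
To make the difference between the two encoding cells above concrete, here is a minimal plain-Python sketch of one-hot encoding with and without dropping the first category. The helper `one_hot` and the sample values are hypothetical; it mimics the column layout seen above but is not the pandas implementation.

```python
def one_hot(values, drop_first=False):
    """Return (categories, rows) with one 0/1 indicator per category."""
    categories = sorted(set(values))      # sorted order, as in the columns shown above
    if drop_first:
        categories = categories[1:]       # the dropped category is implied by all zeros
    rows = [[1 if v == c else 0 for c in categories] for v in values]
    return categories, rows

# Made-up sample using the CAEC categories visible in the output above
caec = ["Sometimes", "Frequently", "no", "Always", "Sometimes"]
cols, rows = one_hot(caec)

# A binary column needs only one indicator, hence drop_first=True
binary_cols, binary_rows = one_hot(["yes", "no", "yes"], drop_first=True)

print(cols, rows[0])
print(binary_cols, binary_rows)
```

With `drop_first=True` a yes/no column collapses into a single `_yes` indicator, which is why the binary columns above produce one new column each while `CAEC`, `CALC` and `MTRANS` produce one per category.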


In [20]:
# Encode the class labels as integers
from sklearn.preprocessing import LabelEncoder
le = LabelEncoder()

one_hot['NObeyesdad'] = le.fit_transform(one_hot['NObeyesdad'])

In [21]:
one_hot.head()

Unnamed: 0,Age,Height,Weight,FCVC,NCP,CH2O,FAF,TUE,NObeyesdad,Gender_Male,family_history_with_overweight_yes,FAVC_yes,SMOKE_yes,SCC_yes,CAEC_Always,CAEC_Frequently,CAEC_Sometimes,CAEC_no,CALC_Always,CALC_Frequently,CALC_Sometimes,CALC_no,MTRANS_Automobile,MTRANS_Bike,MTRANS_Motorbike,MTRANS_Public_Transportation,MTRANS_Walking
0,21,1.62,64.0,2.0,3.0,2.0,0.0,1.0,1,0,1,0,0,0,0,0,1,0,0,0,0,1,0,0,0,1,0
1,21,1.52,56.0,3.0,3.0,3.0,3.0,0.0,1,0,1,0,1,1,0,0,1,0,0,0,1,0,0,0,0,1,0
2,23,1.8,77.0,2.0,3.0,2.0,2.0,1.0,1,1,1,0,0,0,0,0,1,0,0,1,0,0,0,0,0,1,0
3,27,1.8,87.0,3.0,3.0,2.0,2.0,0.0,5,1,0,0,0,0,0,0,1,0,0,1,0,0,0,0,0,0,1
4,22,1.78,89.8,2.0,1.0,2.0,0.0,0.0,6,1,0,0,0,0,0,0,1,0,0,0,1,0,0,0,0,1,0
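
`LabelEncoder` numbers the classes in sorted order. The following plain-Python sketch reproduces that idea with a hypothetical helper, `fit_label_codes`, and only the three class names visible in the output above. With three names the codes run from 0 to 2, whereas in the notebook the seven classes of `NObeyesdad` yield 1, 5 and 6 for the same names.

```python
def fit_label_codes(labels):
    """Map each distinct label to an integer, following sorted order."""
    classes = sorted(set(labels))
    return {name: code for code, name in enumerate(classes)}

# Made-up sample restricted to class names that appear in the output above
sample = ["Normal_Weight", "Overweight_Level_I", "Overweight_Level_II", "Normal_Weight"]
codes = fit_label_codes(sample)
encoded = [codes[name] for name in sample]

print(codes)
print(encoded)
```

The integer codes carry no ordering of obesity levels; they only follow the alphabetical order of the names.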


## Splitting the data into train and test sets to train the machine learning algorithms

In [22]:
import pyspark.mllib
import pyspark.mllib.regression
from pyspark.mllib.regression import LabeledPoint
from pyspark.sql.functions import *

In [23]:
# Label first, then the features (Height and Weight are left out)
one_hot_1 = one_hot[[
    'NObeyesdad', 'Age', 'FCVC', 'NCP', 'CH2O', 'FAF', 'TUE',
    'Gender_Male', 'family_history_with_overweight_yes', 'FAVC_yes', 'SMOKE_yes', 'SCC_yes',
    'CAEC_Always', 'CAEC_Frequently', 'CAEC_Sometimes', 'CAEC_no',
    'CALC_Always', 'CALC_Frequently', 'CALC_Sometimes', 'CALC_no',
    'MTRANS_Automobile', 'MTRANS_Bike', 'MTRANS_Motorbike', 'MTRANS_Public_Transportation', 'MTRANS_Walking',
]]

In [24]:
one_hot_1.head()

Unnamed: 0,NObeyesdad,Age,FCVC,NCP,CH2O,FAF,TUE,Gender_Male,family_history_with_overweight_yes,FAVC_yes,SMOKE_yes,SCC_yes,CAEC_Always,CAEC_Frequently,CAEC_Sometimes,CAEC_no,CALC_Always,CALC_Frequently,CALC_Sometimes,CALC_no,MTRANS_Automobile,MTRANS_Bike,MTRANS_Motorbike,MTRANS_Public_Transportation,MTRANS_Walking
0,1,21,2.0,3.0,2.0,0.0,1.0,0,1,0,0,0,0,0,1,0,0,0,0,1,0,0,0,1,0
1,1,21,3.0,3.0,3.0,3.0,0.0,0,1,0,1,1,0,0,1,0,0,0,1,0,0,0,0,1,0
2,1,23,2.0,3.0,2.0,2.0,1.0,1,1,0,0,0,0,0,1,0,0,1,0,0,0,0,0,1,0
3,5,27,3.0,3.0,2.0,2.0,0.0,1,0,0,0,0,0,0,1,0,0,1,0,0,0,0,0,0,1
4,6,22,2.0,1.0,2.0,0.0,0.0,1,0,0,0,0,0,0,1,0,0,0,1,0,0,0,0,1,0


In [25]:
spDF = spark.createDataFrame(one_hot_1)
spDF.show()

+----------+---+----+---+----+---+---+-----------+----------------------------------+--------+---------+-------+-----------+---------------+--------------+-------+-----------+---------------+--------------+-------+-----------------+-----------+----------------+----------------------------+--------------+
|NObeyesdad|Age|FCVC|NCP|CH2O|FAF|TUE|Gender_Male|family_history_with_overweight_yes|FAVC_yes|SMOKE_yes|SCC_yes|CAEC_Always|CAEC_Frequently|CAEC_Sometimes|CAEC_no|CALC_Always|CALC_Frequently|CALC_Sometimes|CALC_no|MTRANS_Automobile|MTRANS_Bike|MTRANS_Motorbike|MTRANS_Public_Transportation|MTRANS_Walking|
+----------+---+----+---+----+---+---+-----------+----------------------------------+--------+---------+-------+-----------+---------------+--------------+-------+-----------+---------------+--------------+-------+-----------------+-----------+----------------+----------------------------+--------------+
|         1| 21| 2.0|3.0| 2.0|0.0|1.0|          0|                                

The RDD-based MLlib models expect their input in `LabeledPoint` format.

In [26]:
# Build the format the RDD-based MLlib models need:
# the first column is the label, the remaining columns are the features (as a flat list of floats)
temp = spDF.rdd.map(lambda line: LabeledPoint(line[0], [float(v) for v in line[1:]]))
temp.take(5)

[LabeledPoint(1.0, [21.0,2.0,3.0,2.0,0.0,1.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,1.0,0.0]),
 LabeledPoint(1.0, [21.0,3.0,3.0,3.0,3.0,0.0,0.0,1.0,0.0,1.0,1.0,0.0,0.0,1.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,1.0,0.0]),
 LabeledPoint(1.0, [23.0,2.0,3.0,2.0,2.0,1.0,1.0,1.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0]),
 LabeledPoint(5.0, [27.0,3.0,3.0,2.0,2.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0]),
 LabeledPoint(6.0, [22.0,2.0,1.0,2.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,1.0,0.0])]
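
The conversion above amounts to pairing a numeric label with a flat list of numeric features. Here is a minimal sketch of that shape in plain Python, using a hypothetical `Point` namedtuple as a stand-in for `LabeledPoint` and a made-up row in which one value arrives as text:

```python
from collections import namedtuple

# Hypothetical stand-in for pyspark.mllib.regression.LabeledPoint
Point = namedtuple("Point", ["label", "features"])

def to_point(row):
    """First field is the label; the remaining fields become float features."""
    return Point(float(row[0]), [float(v) for v in row[1:]])

# Made-up row: label, then features, one of them still a string
row = (1, "21", 2.0, 3.0, 2.0, 0.0, 1.0)
point = to_point(row)

print(point)
```

Casting every field explicitly keeps the feature vector flat and numeric even when a column was never converted from text.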

In [27]:
# Split in the same 80/20 proportion as in the first notebook
trainingData, testingData = temp.randomSplit([.8,.2],seed=1234)
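
As far as we understand it, a weighted random split assigns each row independently to one of the splits, so the 80/20 proportion holds only approximately, while a fixed seed makes the result reproducible. A plain-Python sketch of that idea follows; the helper `random_split` is hypothetical and is not Spark's implementation.

```python
import random

def random_split(items, weights, seed):
    """Assign each item independently to a split, with probability proportional to its weight."""
    rng = random.Random(seed)
    total = sum(weights)
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)                 # cumulative probabilities, e.g. [0.8, 1.0]
    splits = [[] for _ in weights]
    for item in items:
        u = rng.random()
        for i, bound in enumerate(bounds):
            if u < bound or i == len(bounds) - 1:
                splits[i].append(item)
                break
    return splits

train, test = random_split(range(1000), [0.8, 0.2], seed=1234)
train_again, test_again = random_split(range(1000), [0.8, 0.2], seed=1234)

print(len(train), len(test))
```

Because the sizes are only approximately 80% and 20%, it is worth checking the actual counts of both splits before comparing results across notebooks.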

Next, we implement the same models.

## Logistic regression

In [28]:
from pyspark.mllib.classification import LogisticRegressionWithLBFGS
from pyspark.mllib.util import MLUtils
from pyspark.mllib.evaluation import MulticlassMetrics

model = LogisticRegressionWithLBFGS.train(trainingData,numClasses=7)



In [29]:
%%time
# Run training algorithm to build the model
model = LogisticRegressionWithLBFGS.train(trainingData,numClasses=7)
# Pair each predicted class with the true label (lazy: nothing is computed until an action runs)
predictionAndLabels = testingData.map(lambda lp: (float(model.predict(lp.features)), lp.label))
#predictionAndLabels.take(20)

CPU times: user 89.5 ms, sys: 13.1 ms, total: 103 ms
Wall time: 7.34 s
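
Note that `map` on an RDD is a lazy transformation, so the timed cell above measures training and the definition of `predictionAndLabels`, not the predictions themselves. Python's built-in `map` is lazy in the same way, which allows a small sketch of the effect without Spark. The function `slow_predict` is a hypothetical stand-in for a per-row prediction.

```python
import time

def slow_predict(x):
    """Hypothetical stand-in for a per-row prediction that takes a while."""
    time.sleep(0.01)
    return x * 2

start = time.perf_counter()
lazy = map(slow_predict, range(20))       # only builds the pipeline, runs nothing
define_seconds = time.perf_counter() - start

start = time.perf_counter()
results = list(lazy)                      # forces the work, like an action on an RDD
run_seconds = time.perf_counter() - start

print(f"define: {define_seconds:.4f} s, run: {run_seconds:.4f} s")
```

To include prediction time in a measurement, the timed cell would need to end with an action such as `count()` on the resulting RDD.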


## Random Forest

In [30]:
from pyspark.mllib.tree import RandomForest, RandomForestModel
from pyspark.mllib.util import MLUtils

In [31]:
model_rf = RandomForest.trainClassifier(trainingData, numClasses=7, categoricalFeaturesInfo={},
                                     numTrees=3, featureSubsetStrategy="auto",
                                     impurity='gini', maxDepth=4, maxBins=32)

In [32]:
%%time
# Run training algorithm to build the model
model_rf = RandomForest.trainClassifier(trainingData,numClasses=7,numTrees=3,categoricalFeaturesInfo={})
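# In PySpark, tree models cannot call predict inside an RDD transformation;
# predict on the whole feature RDD and zip the result with the true labels
predictions = model_rf.predict(testingData.map(lambda lp: lp.features))
predictionAndLabels = predictions.zip(testingData.map(lambda lp: lp.label))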


CPU times: user 27 ms, sys: 3.73 ms, total: 30.7 ms
Wall time: 1.9 s


## Decision tree

In [33]:
from pyspark.mllib.tree import DecisionTree, DecisionTreeModel
from pyspark.mllib.util import MLUtils

In [34]:
model_dt = DecisionTree.trainClassifier(trainingData, numClasses=7, categoricalFeaturesInfo={},
                                     impurity='gini', maxDepth=4, maxBins=32)

In [35]:
%%time
# Run training algorithm to build the model
model_dt= DecisionTree.trainClassifier(trainingData,numClasses=7,categoricalFeaturesInfo={})
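# In PySpark, tree models cannot call predict inside an RDD transformation;
# predict on the whole feature RDD and zip the result with the true labels
predictions = model_dt.predict(testingData.map(lambda lp: lp.features))
predictionAndLabels = predictions.zip(testingData.map(lambda lp: lp.label))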


CPU times: user 16.8 ms, sys: 6.31 ms, total: 23.1 ms
Wall time: 1.32 s


## Naive Bayes

In [36]:
from pyspark.mllib.classification import NaiveBayes, NaiveBayesModel
from pyspark.mllib.util import MLUtils

In [37]:
model_nb = NaiveBayes.train(trainingData, 1.0)

In [38]:
%%time
# Run training algorithm to build the model
model_nb= NaiveBayes.train(trainingData)
# Pair each predicted class with the true label (lazy: nothing is computed until an action runs)
predictionAndLabels = testingData.map(lambda lp: (float(model_nb.predict(lp.features)), lp.label))

CPU times: user 51.2 ms, sys: 12.9 ms, total: 64.1 ms
Wall time: 2.56 s
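
Each model above yields `(prediction, label)` pairs. As a minimal sketch of what could be computed from such pairs, the following plain-Python code derives the accuracy and the confusion counts from a small made-up list. In the notebook itself, the `MulticlassMetrics` class imported earlier could play this role on the `predictionAndLabels` RDD; the helpers `accuracy` and `confusion_counts` here are hypothetical.

```python
from collections import Counter

def accuracy(pairs):
    """Fraction of pairs whose prediction equals the true label."""
    pairs = list(pairs)
    correct = sum(1 for pred, label in pairs if pred == label)
    return correct / len(pairs)

def confusion_counts(pairs):
    """Count each (true label, predicted label) combination."""
    return Counter((label, pred) for pred, label in pairs)

# Made-up (prediction, label) pairs, not results from the models above
pairs = [(1.0, 1.0), (1.0, 1.0), (5.0, 1.0), (5.0, 5.0), (6.0, 5.0), (6.0, 6.0)]
acc = accuracy(pairs)
counts = confusion_counts(pairs)

print(f"accuracy = {acc:.3f}")
print(counts)
```

Computing the same figures for all four models on the same test split would make them directly comparable with the results of the first notebook.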
