# Guia ML

In [1]:
##
#https://jdvelasq.github.io/courses/notebooks/pyspark/2-04-pyspark-SparkSQL.html
#https://guru99.es/pyspark-tutorial/
## findspark permite usar pyspark (interfaz de Python a Spark),
## desde cualquier programa escrito en Python.
##
import findspark
findspark.init()
##
## A continuación se inicializan las variables obligatorias
## requeridas para trabajar con Spark desde Python:
##
##  SparkContext representa la conexión al cluster de Spark.
##  SparkConf representa la configuración particular de una aplicación
##     escrita en Spark.
##  SparkSession representa la conexión para trabajar con SQL.
##
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

sparkConf = SparkConf().setAppName("ml_spark")
sc = SparkContext(conf=sparkConf)
spark = SparkSession(sc)

In [2]:
##
## Row representa una fila en un RDD
##
from pyspark.sql import Row
##
## Crea un DataFrame a partir del archivo con
## formato CSV
##
df_ml = spark.read.load("D:\\CLASES\ELECTIVA 3 BigData\\Clase5_Ejercicos_Spark\\DATOS\\data_icfes_ml_tumaco.csv",
                     format="csv",
                     sep="|",
                     inferSchema= True,
                     encoding="utf-8",
                     header="true")
df_ml.printSchema()

root
 |-- estu_edad: double (nullable = true)
 |-- estu_nacimiento_dia: double (nullable = true)
 |-- estu_nacimiento_mes: double (nullable = true)
 |-- estu_nacimiento_anno: double (nullable = true)
 |-- estu_zona_reside: double (nullable = true)
 |-- estu_area_reside: double (nullable = true)
 |-- cole_valor_pension: double (nullable = true)
 |-- estu_trabaja: double (nullable = true)
 |-- fami_estrato_vivienda: double (nullable = true)
 |-- estu_veces_estado: double (nullable = true)
 |-- fami_educa_padre: double (nullable = true)
 |-- fami_educa_madre: double (nullable = true)
 |-- fami_ocup_padre: double (nullable = true)
 |-- fami_ocup_madre: double (nullable = true)
 |-- fami_nivel_sisben: double (nullable = true)
 |-- fami_pisos_hogar: double (nullable = true)
 |-- fami_personas_hogar: double (nullable = true)
 |-- fami_telefono_fijo: double (nullable = true)
 |-- fami_celular: double (nullable = true)
 |-- fami_internet: double (nullable = true)
 |-- fami_servicio_television: 

In [3]:
df_ml.count()

1923

In [4]:
len(df_ml.columns)

92

# Agrupar datos por clases

In [5]:
df_ml.createOrReplaceTempView("icfes_ml")
spark.sql("select clase,count(1) from icfes_ml group by clase").show()

+-----+--------+
|clase|count(1)|
+-----+--------+
|    1|     331|
|    0|    1592|
+-----+--------+



In [6]:
from pyspark.sql.functions import *



df_ml = df_ml.withColumn(
            "clase_final",  
                        (col('clase'))
)
df_ml=df_ml.drop('clase')
df_ml.printSchema()

root
 |-- estu_edad: double (nullable = true)
 |-- estu_nacimiento_dia: double (nullable = true)
 |-- estu_nacimiento_mes: double (nullable = true)
 |-- estu_nacimiento_anno: double (nullable = true)
 |-- estu_zona_reside: double (nullable = true)
 |-- estu_area_reside: double (nullable = true)
 |-- cole_valor_pension: double (nullable = true)
 |-- estu_trabaja: double (nullable = true)
 |-- fami_estrato_vivienda: double (nullable = true)
 |-- estu_veces_estado: double (nullable = true)
 |-- fami_educa_padre: double (nullable = true)
 |-- fami_educa_madre: double (nullable = true)
 |-- fami_ocup_padre: double (nullable = true)
 |-- fami_ocup_madre: double (nullable = true)
 |-- fami_nivel_sisben: double (nullable = true)
 |-- fami_pisos_hogar: double (nullable = true)
 |-- fami_personas_hogar: double (nullable = true)
 |-- fami_telefono_fijo: double (nullable = true)
 |-- fami_celular: double (nullable = true)
 |-- fami_internet: double (nullable = true)
 |-- fami_servicio_television: 

# Importar Librerias ML

In [7]:
from pyspark.ml.linalg import Vectors
from pyspark.ml import Pipeline
from pyspark.ml.feature import IndexToString,StringIndexer, VectorIndexer
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

def transData(data):
    return data.rdd.map(lambda r: [Vectors.dense(r[:-1]),r[-1]]).toDF(['variables','clase'])

df_transformed = transData(df_ml)
df_transformed.show(5)

+--------------------+-----+
|           variables|clase|
+--------------------+-----+
|[18.0,14.0,3.0,19...|    0|
|[24.0,15.0,4.0,19...|    0|
|[17.0,10.0,9.0,19...|    1|
|[17.0,12.0,12.0,1...|    0|
|[18.0,2.0,2.0,199...|    0|
+--------------------+-----+
only showing top 5 rows



In [10]:
df_transformed.groupby('clase').count().show()

+-----+-----+
|clase|count|
+-----+-----+
|    0| 1592|
|    1|  331|
+-----+-----+



# Indexamos las variables y la clase

In [11]:
claseIndexer = StringIndexer(inputCol='clase',
                             outputCol='indexedClase').fit(df_transformed)

# Identifique automáticamente características categóricas e indexelas.
# Establezca maxCategories para que las entidades con> 4 valores distintos se traten como continuas.
variablesIndexer =VectorIndexer(inputCol="variables", \
                                  outputCol="indexedVariables", \
                                  maxCategories=4).fit(df_transformed)

claseIndexer.transform(df_transformed).show(5, True)
variablesIndexer.transform(df_transformed).show(5, True)


+--------------------+-----+------------+
|           variables|clase|indexedClase|
+--------------------+-----+------------+
|[18.0,14.0,3.0,19...|    0|         0.0|
|[24.0,15.0,4.0,19...|    0|         0.0|
|[17.0,10.0,9.0,19...|    1|         1.0|
|[17.0,12.0,12.0,1...|    0|         0.0|
|[18.0,2.0,2.0,199...|    0|         0.0|
+--------------------+-----+------------+
only showing top 5 rows

+--------------------+-----+--------------------+
|           variables|clase|    indexedVariables|
+--------------------+-----+--------------------+
|[18.0,14.0,3.0,19...|    0|[18.0,14.0,3.0,19...|
|[24.0,15.0,4.0,19...|    0|[24.0,15.0,4.0,19...|
|[17.0,10.0,9.0,19...|    1|[17.0,10.0,9.0,19...|
|[17.0,12.0,12.0,1...|    0|[17.0,12.0,12.0,1...|
|[18.0,2.0,2.0,199...|    0|[18.0,2.0,2.0,199...|
+--------------------+-----+--------------------+
only showing top 5 rows



# Separar conjunto en train y test 

In [12]:
(trainingData, testData) = df_transformed.randomSplit([0.8, 0.2])
trainingData.show(5)

+--------------------+-----+
|           variables|clase|
+--------------------+-----+
|[10.0,4.0,4.0,200...|    1|
|[10.0,6.0,1.0,200...|    0|
|[10.0,8.0,8.0,200...|    1|
|[10.0,12.0,7.0,19...|    1|
|[10.0,16.0,4.0,20...|    0|
+--------------------+-----+
only showing top 5 rows



In [13]:
trainingData.groupby('clase').count().show()
testData.groupby('clase').count().show()

+-----+-----+
|clase|count|
+-----+-----+
|    0| 1287|
|    1|  270|
+-----+-----+

+-----+-----+
|clase|count|
+-----+-----+
|    0|  305|
|    1|   61|
+-----+-----+



# Árbol

In [14]:
from pyspark.ml.classification import DecisionTreeClassifier

# Train a DecisionTree model
dTree = DecisionTreeClassifier(labelCol='indexedClase',
                               featuresCol='indexedVariables',
                               maxDepth = 5,
                               minInstancesPerNode=5,
                               impurity='entropy')

# Convierta etiquetas indexadas a etiquetas originales.
labelConverter = IndexToString(inputCol="prediction",
                               outputCol="predictedLabel",
                               labels=claseIndexer.labels)

# Canalizar el modelo mediante la tuberia 
pipeline = Pipeline(stages=[claseIndexer, variablesIndexer, dTree,
                            labelConverter])

# Esto ejecuta los indexadores y entrena el modelo. 
arbol = pipeline.fit(trainingData)

# RandomForest

In [73]:
from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(labelCol="indexedClase",
                            featuresCol="indexedVariables", 
                            numTrees=500,maxDepth = 10,
                            minInstancesPerNode=5,
                            impurity='entropy')

# Convierta etiquetas indexadas a etiquetas originales.
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",
                               labels=claseIndexer.labels)

# Canalizar el modelo mediante la tuberia 
pipeline = Pipeline(stages=[claseIndexer, variablesIndexer, rf,
                            labelConverter])
# Train model.  This also runs the indexers.
randomforest = pipeline.fit(trainingData)

# Gradient-Boosting

In [71]:
from pyspark.ml.classification import GBTClassifier

gradientbt = GBTClassifier(labelCol='indexedClase', 
                    featuresCol='indexedVariables',
                    maxIter=50, maxDepth=10)

# Convierta etiquetas indexadas a etiquetas originales.
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",
                               labels=claseIndexer.labels)

# Canalizar el modelo mediante la tuberia 
pipeline = Pipeline(stages=[claseIndexer, variablesIndexer, gradientbt,
                            labelConverter])
# Train model.  This also runs the indexers.
gbt = pipeline.fit(trainingData)

# Naive Bayes

In [65]:
from pyspark.ml.classification import NaiveBayes

nb = NaiveBayes(labelCol="indexedClase",featuresCol="indexedVariables")

# Convierta etiquetas indexadas a etiquetas originales.
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",
                               labels=claseIndexer.labels)

# Canalizar el modelo mediante la tuberia 
pipeline = Pipeline(stages=[claseIndexer, variablesIndexer, nb,
                            labelConverter])

# Train model.  This also runs the indexers.
naivebayes = pipeline.fit(trainingData)

# LogisticRegression

In [17]:
from pyspark.ml.classification import LogisticRegression
logr = LogisticRegression(labelCol="indexedClase",featuresCol="indexedVariables"
                          ,regParam=0.001)

# Convierta etiquetas indexadas a etiquetas originales.
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",
                               labels=claseIndexer.labels)

# Chain indexers and tree in a Pipeline
pipeline = Pipeline(stages=[claseIndexer, variablesIndexer, logr,labelConverter])

logregre = pipeline.fit(trainingData)

# Red neuronal 

In [22]:
from pyspark.ml.classification import MultilayerPerceptronClassifier

# especificar capas para la red neuronal:
# capa de entrada de tamaño 117 (características),
# 1 capa oculta de 10
# y salida de tamaño 2 (clases)
layers = [91, 50 , 2]

# create the trainer and set its parameters
FNN = MultilayerPerceptronClassifier(labelCol="indexedClase", \
                                     featuresCol="indexedVariables",\
                                     maxIter=300, layers=layers, \
                                     blockSize=128, seed=1234)
# Convert indexed labels back to original labels.
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",
                               labels=claseIndexer.labels)
# Chain indexers and forest in a Pipeline
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[claseIndexer, variablesIndexer, FNN, labelConverter])
# train the model
# Train model.  This also runs the indexers.
red = pipeline.fit(trainingData)

# Evaluar Modelos 

In [75]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator,BinaryClassificationEvaluator
from sklearn.metrics import accuracy_score,precision_score,classification_report,balanced_accuracy_score,f1_score
from pyspark.mllib.evaluation import BinaryClassificationMetrics


def eval_model(testData,trainingData,model,metric):
    print("____________________________________")
    predictions_test = model.transform(testData)
    predictions_train = model.transform(trainingData)
    # Select (prediction, true label) and compute test error
    evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedClase", predictionCol="prediction", metricName=metric)
    accuracy_test = evaluator.evaluate(predictions_test)
    accuracy_train = evaluator.evaluate(predictions_train)
    print(metric+" test = %g" % (accuracy_test))
    print(metric+" train = %g" % (accuracy_train))
    
    evaluator = BinaryClassificationEvaluator(labelCol='indexedClase')
    
    auroc_test = evaluator.evaluate(predictions_test,
                               {evaluator.metricName: "areaUnderROC"})
    auroc_train = evaluator.evaluate(predictions_train,
                               {evaluator.metricName: "areaUnderROC"})
    
    print("AUC test = %g" % (auroc_test))
    print("AUC train = %g" % (auroc_train))

    #predictions_test=predictions_test.select('prediction').toPandas()
    #y_test=claseIndexer.transform(testData).select("indexedClase").toPandas()
    
    #variablesIndexer.transform(df_transformed).show(5, True)
    #print("Balanced_accuracy test:",balanced_accuracy_score(y_test[y_test.columns[0]], predictions_test[predictions_test.columns[0]]))
    #print("Balanced_accuracy train:",balanced_accuracy_score(y_train, predictions_train))

    rfModel = model.stages[-2]
    print(rfModel)  # summary only

In [76]:
metricas=["weightedPrecision","accuracy","f1","weightedRecall"] 
eval_model(testData,trainingData,arbol,metricas[1])   
eval_model(testData,trainingData,randomforest,metricas[1]) 
eval_model(testData,trainingData,gbt,metricas[1]) 
eval_model(testData,trainingData,naivebayes,metricas[1]) 
eval_model(testData,trainingData,logregre,metricas[1]) 
eval_model(testData,trainingData,red,metricas[1]) 



____________________________________
accuracy test = 0.822404
accuracy train = 0.850996
AUC test = 0.496963
AUC train = 0.448331
DecisionTreeClassificationModel: uid=DecisionTreeClassifier_f85892cc29b0, depth=5, numNodes=31, numClasses=2, numFeatures=91
____________________________________
accuracy test = 0.838798
accuracy train = 0.849069
AUC test = 0.795216
AUC train = 0.921687
RandomForestClassificationModel: uid=RandomForestClassifier_815628d8c8e4, numTrees=500, numClasses=2, numFeatures=91
____________________________________
accuracy test = 0.79235
accuracy train = 1
AUC test = 0.698038
AUC train = 1
GBTClassificationModel: uid = GBTClassifier_252c1774469c, numTrees=50, numClasses=2, numFeatures=91
____________________________________
accuracy test = 0.653005
accuracy train = 0.684008
AUC test = 0.39613
AUC train = 0.46919
NaiveBayesModel: uid=NaiveBayes_180bcb1a7ba6, modelType=multinomial, numClasses=2, numFeatures=91
____________________________________
accuracy test = 0.822404

# Tunear árbol

In [50]:
from pyspark.ml.tuning import CrossValidator,ParamGridBuilder
evaluatorB = BinaryClassificationEvaluator(labelCol='indexedClase',
                                          metricName='areaUnderROC')

paramGrid = (ParamGridBuilder()
             .addGrid(dTree.maxDepth,[5,20,15])
             .addGrid(dTree.minInstancesPerNode,[5,2,10])
             .addGrid(dTree.impurity,['entropy','gini'])
             .addGrid(dTree.minInfoGain,[0.0,0.025])
             .build())

# Canalizar el modelo mediante la tuberia 
pipeline = Pipeline(stages=[claseIndexer, variablesIndexer, dTree,
                            labelConverter])
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluatorB,
                          numFolds=4)  # use 3+ folds in practice

cvModel = crossval.fit(trainingData)

In [52]:
arbolSummary = cvModel.bestModel
eval_model(testData,trainingData,arbolSummary,metricas[1])  
import numpy as np
cvModel.getEstimatorParamMaps()[ np.argmax(cvModel.avgMetrics) ]
print(arbolSummary.stages[2].getMaxDepth())
print(arbolSummary.stages[2].getMinInstancesPerNode())
print(arbolSummary.stages[2].getImpurity())
print(arbolSummary.stages[2].getMinInfoGain())

____________________________________
accuracy test = 0.770492
accuracy train = 0.909441
AUC test = 0.600967
AUC train = 0.894949
DecisionTreeClassificationModel: uid=DecisionTreeClassifier_f85892cc29b0, depth=18, numNodes=215, numClasses=2, numFeatures=91
20
5
entropy
0.0


In [67]:
zip(cvModel.avgMetrics, paramGrid)

<zip at 0x1af1fe2cbc8>

# Tunear RandomForest

In [61]:
from pyspark.ml.tuning import CrossValidator,ParamGridBuilder
evaluatorB = BinaryClassificationEvaluator(labelCol='indexedClase',
                                          metricName='areaUnderROC')

paramGrid = (ParamGridBuilder()
             .addGrid(rf.numTrees,[500])
             .addGrid(dTree.maxDepth,[5,7])
             .addGrid(dTree.minInstancesPerNode,[5,3])
             .addGrid(dTree.impurity,['entropy','gini'])
             .addGrid(dTree.minInfoGain,[0.0,0.0025])
             .addGrid(rf.minWeightFractionPerNode,[0.0,0.005])
             .build())

# Canalizar el modelo mediante la tuberia 
pipeline = Pipeline(stages=[claseIndexer, variablesIndexer, rf,
                            labelConverter])

crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluatorB,
                          numFolds=4)  # use 3+ folds in practice

cvModel = crossval.fit(trainingData)

In [63]:
rfSummary = cvModel.bestModel
eval_model(testData,trainingData,rfSummary,metricas[1])  
import numpy as np
cvModel.getEstimatorParamMaps()[ np.argmax(cvModel.avgMetrics) ]
print(rfSummary.stages[2].getMaxDepth())
print(rfSummary.stages[2].getMinInstancesPerNode())
print(rfSummary.stages[2].getImpurity())
print(rfSummary.stages[2].getMinWeightFractionPerNode())
print(rfSummary.stages[2].getMinInfoGain())

____________________________________
accuracy test = 0.833333
accuracy train = 0.833654
AUC test = 0.799946
AUC train = 0.810087
RandomForestClassificationModel: uid=RandomForestClassifier_1a07e12d30ee, numTrees=500, numClasses=2, numFeatures=91
5
5
entropy
0.005
0.0
