# Entrenamiento sencillo con MLlib

Primero que nada, vamos a aprender cómo podemos realizar un entrenamiento sencillo con MLlib, basado en la división del conjunto de datos en entrenamiento y test. Para ello, construiremos una red neuronal de clasificación e intentaremos predecir el tipo de flor iris (setosa, versicolor y virginica) en base a características como las longitudes y anchuras del sépalo y el pétalo.

Primero, carga en un DataFrame de Spark el conjunto de datos que encontrarás en *iris.csv* (PoliformaT). Asegúrate de que Spark infiera el esquema de los datos. Tampoco te olvides de que el conjunto de datos tiene cabecera. Nombra a la variable como *iris_data*. Tras ello, muestra el contenido del Dataframe para hacer una idea de la información con la que contamos.

In [2]:
#Tu código aquí

app_df = spark.read.csv("file:///home/alumno/Documentos/bigdatapracticas/09 - MLLib/prac/iris.csv", header=True, inferSchema=True,  
                        nullValue="", sep=",")
app_df


DataFrame[sepal_length: double, sepal_width: double, petal_length: double, petal_width: double, variety: string]

Una de las primeras cosas que tendremos que hacer en este entrenamiento sencillo es separar los registros en conjunto de entrenamiento y conjunto de test. El conjunto de entrenamiento tendrá el 70% de los registros, mientras que el conjunto de test tendrá el 30% de los registros. Para crear estos dos Dataframes puedes mirar la documentación del método *randomSplit* que se encuentra definido para cualquier Dataframe de Spark. Para que todos hagáis la misma partición, configura la semilla de número aleatorios del método a *2021*. Llama al conjunto de entrenamiento como *train_df* y al conjunto de test como *test_df*.

In [3]:
#Tu código aquí
train_df,test_df = app_df.randomSplit([0.7, 0.3])
#a,b = df.randomSplit([0.5, 0.5])
train_df
test_df

DataFrame[sepal_length: double, sepal_width: double, petal_length: double, petal_width: double, variety: string]

En el siguiente fragmento de código vamos a ver cómo podemos crear nuestra primera componente de tipo **transformador**. Recuerda que los transformadores simplemente toman como entrada un Dataframe y lo transforman creando un nuevo Dataframe. En este caso vamos a utilizar *VectorAssembler* para coger todas las columnas que queremos que sean características del modelo y juntarlas en un único vector. Fijaros como en el constructor necesitamos dos parámetros: una lista de columnas que conformarán las componentes del vector, y el nombre de la columna donde se dejarán los vectores. En la siguiente imagen tenemos un ejemplo del funcionamiento de este transformador.



![VectorAssembler.png](attachment:VectorAssembler.png)

Nótese como primero creamos el transformador (*VectorAssembler*) en este caso y tras ello llamamos al método *transform* pasando como entrada el Dataframe sobre el que queremos realizar la transformación. El resultado es un nuevo Dataframe que debemos almacenar en una variable si queremos usarlo más adelante.

In [4]:
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols=["sepal_length","sepal_width", "petal_length","petal_width"], outputCol="features")
vectorized_df = assembler.transform(train_df)
vectorized_df.show()

+------------+-----------+------------+-----------+----------+-----------------+
|sepal_length|sepal_width|petal_length|petal_width|   variety|         features|
+------------+-----------+------------+-----------+----------+-----------------+
|         4.3|        3.0|         1.1|        0.1|    Setosa|[4.3,3.0,1.1,0.1]|
|         4.4|        2.9|         1.4|        0.2|    Setosa|[4.4,2.9,1.4,0.2]|
|         4.4|        3.2|         1.3|        0.2|    Setosa|[4.4,3.2,1.3,0.2]|
|         4.6|        3.4|         1.4|        0.3|    Setosa|[4.6,3.4,1.4,0.3]|
|         4.6|        3.6|         1.0|        0.2|    Setosa|[4.6,3.6,1.0,0.2]|
|         4.7|        3.2|         1.3|        0.2|    Setosa|[4.7,3.2,1.3,0.2]|
|         4.7|        3.2|         1.6|        0.2|    Setosa|[4.7,3.2,1.6,0.2]|
|         4.8|        3.0|         1.4|        0.3|    Setosa|[4.8,3.0,1.4,0.3]|
|         4.8|        3.1|         1.6|        0.2|    Setosa|[4.8,3.1,1.6,0.2]|
|         4.8|        3.4|  

Como resultado de la anterior operación tenemos un nuevo Dataframe con una columna llamada *features*, que contiene el vector de características que emplearemos para la tarea de predicción. A continuación, aplicaremos una nueva transformación sobre dicho conjunto de datos. Sin embargo, esta vez necesitaremos hacerlo a través de un **estimador**. Concretamente, usaremos el estimador *StandardScaler*, que toma un vector de características y normaliza las características numéricas restando la media de la componente del vector y dividiendo por la correspondiente desviación típica (transformación a N(0,1)). Cabe recordar que los estimadores necesitan primero leer el Dataframe de entrada para crear un transformador. Ese transformador puede emplearse luego para transformar el Dataframe que se leyó, u otro Dataframe. La siguiente imagen muestra como éste estimador funciona.

![StandardScaler.png](attachment:StandardScaler.png)

Primero tendremos que crear un objeto del tipo *StandardScaler*. Como parámetros opcionales, a este estimador se le puede indicar si queremos que centre en la media (*withMean*), si queremos que divida por la desviación típica (*withStd*), la columna donde se encuentra el vector de características (*inputCol*), y la columna donde debe dejar las características transformadas (*std_features*). Una vez creado el estimador, podemos crear el transformador asociado si llamamos al método *fit* pasando el Dataframe de entrada. Con el transformador (*fitted_scaler* en este caso) podemos transformar cualquier Dataframe. Por ejemplo, en este caso transformamos el Dataframe en el que previamente habíamos creado el vector de características.

In [5]:
from pyspark.ml.feature import StandardScaler
scaler = StandardScaler(withMean=True, withStd=True, inputCol="features", outputCol="std_features")
fitted_scaler = scaler.fit(vectorized_df)
scaled_df = fitted_scaler.transform(vectorized_df)
scaled_df.show()

+------------+-----------+------------+-----------+----------+-----------------+--------------------+
|sepal_length|sepal_width|petal_length|petal_width|   variety|         features|        std_features|
+------------+-----------+------------+-----------+----------+-----------------+--------------------+
|         4.3|        3.0|         1.1|        0.1|    Setosa|[4.3,3.0,1.1,0.1]|[-1.8939678601097...|
|         4.4|        2.9|         1.4|        0.2|    Setosa|[4.4,2.9,1.4,0.2]|[-1.7741367469000...|
|         4.4|        3.2|         1.3|        0.2|    Setosa|[4.4,3.2,1.3,0.2]|[-1.7741367469000...|
|         4.6|        3.4|         1.4|        0.3|    Setosa|[4.6,3.4,1.4,0.3]|[-1.5344745204808...|
|         4.6|        3.6|         1.0|        0.2|    Setosa|[4.6,3.6,1.0,0.2]|[-1.5344745204808...|
|         4.7|        3.2|         1.3|        0.2|    Setosa|[4.7,3.2,1.3,0.2]|[-1.4146434072711...|
|         4.7|        3.2|         1.6|        0.2|    Setosa|[4.7,3.2,1.6,0.2]|[-

Ahora aplicaremos otro **estimador**: *StringIndexer*. Este estimador toma una columna con categorías de un Dataframe de entrada y crea una nueva columna donde las cadenas de texto han sido discretizadas a enteros (uno por cada cadena diferente). Aplica este estimador y su correspondiente estimador al Dataframe anterior (*scaled_df*), tomando como columna a transformar *variety* y como columna resultado *class*. Guarda el Dataframe resultante en *indexed_df*, y asegúrate que el el estimador sea guardado en una variable llamada *indexer*. 

In [6]:
from pyspark.ml.feature import StringIndexer
#Tu código aquí.
indexer = StringIndexer(inputCol ='variety', outputCol= 'class')
#Entrenar este DataFrame
model = indexer.fit(scaled_df)
#Utilizando el modelo generado para transformar el DataFrame
indexed_df = model.transform(scaled_df)
indexed_df.show()

+------------+-----------+------------+-----------+----------+-----------------+--------------------+-----+
|sepal_length|sepal_width|petal_length|petal_width|   variety|         features|        std_features|class|
+------------+-----------+------------+-----------+----------+-----------------+--------------------+-----+
|         4.3|        3.0|         1.1|        0.1|    Setosa|[4.3,3.0,1.1,0.1]|[-1.8939678601097...|  2.0|
|         4.4|        2.9|         1.4|        0.2|    Setosa|[4.4,2.9,1.4,0.2]|[-1.7741367469000...|  2.0|
|         4.4|        3.2|         1.3|        0.2|    Setosa|[4.4,3.2,1.3,0.2]|[-1.7741367469000...|  2.0|
|         4.6|        3.4|         1.4|        0.3|    Setosa|[4.6,3.4,1.4,0.3]|[-1.5344745204808...|  2.0|
|         4.6|        3.6|         1.0|        0.2|    Setosa|[4.6,3.6,1.0,0.2]|[-1.5344745204808...|  2.0|
|         4.7|        3.2|         1.3|        0.2|    Setosa|[4.7,3.2,1.3,0.2]|[-1.4146434072711...|  2.0|
|         4.7|        3.2|  

Ahora que ya hemos discretizado las clases, entrenaremos una red neuronal sobre el conjunto de entrenamiento. Recuerda que en MLLib los algoritmos de aprendizaje son transformadores, por lo que funcionan igual que cualquier otro estimador. Es decir, al llamar al método *fit* crea un transformador que realmente es el modelo entrenado. Este modelo se puede usar para hacer predicciones llamando al método *transform*. Crea ahora un estimador de tipo red neuronal (Perceptrón multicapa) al que le especificaremos las características para entrenar (parámetro *featuresCol* del constructor), la columna donde se encuentra la etiqueta de clase (parámetro *labelCol*) y la columna donde debe dejar las predicciones (parámetro *predictionCol*) y a la que llamaremos *prediction*. Adicionalmente, le podemos indicar en una lista algunos hiperparámetros de la red como ahora el número de capas. En este caso pasaremos una lista con enteros [4, 4, 3], que indica que tendremos 4 características de entrada, una capa oculta con 4 neuronas, y una capa de salida de tamaño 3 (una salida por clase). Tras crear el estimador, entrénalo y genera un dataframe con las predicciones sobre el conjunto de entrenamiento.

In [7]:
from pyspark.ml.classification import MultilayerPerceptronClassifier
#Tu código aquí
# and output of size 3 (classes)
layers = [4, 4, 3]

#    featuresCol='features', labelCol='label', predictionCol='prediction', maxIter=100, tol=1e-06, seed=None, layers=None, blockSize=128, stepSize=0.03, solver='l-bfgs', initialWeights=None, probabilityCol='probability', rawPredictionCol='rawPrediction'
#mlp = MultilayerPerceptronClassifier(layers = layers, seed = 1, featuresCol = "class", predictionCol = "prediction",labelCol='label')
mlp = MultilayerPerceptronClassifier(layers = layers,featuresCol = "std_features", labelCol ="class",  seed=9)
model = mlp.fit(indexed_df)
neuro_df = model.transform(indexed_df)
#neuro_df.show()
"""
result = model.transform(test_df)
predictionAndLabels = result.select("prediction", "label")
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
print("Test set accuracy = " + str(evaluator.evaluate(predictionAndLabels)))
"""



'\nresult = model.transform(test_df)\npredictionAndLabels = result.select("prediction", "label")\nevaluator = MulticlassClassificationEvaluator(metricName="accuracy")\nprint("Test set accuracy = " + str(evaluator.evaluate(predictionAndLabels)))\n'

# Tuberías en acción

Si nos fijamos, el código que hemos construido en el ejemplo anterior es bastante secuencial. Es decir, simplemente consistía en una serie de transformaciones en secuencia hasta obtener el modelo entrenado y realizar predicciones. En general, esto es siempre así cuando entrenamos modelos de aprendizaje automático. Por ello, en muchos paquetes que permiten el entrenamiento de este tipo de modelos podemos definir lo que se conocen como tuberías. Una tubería no es más que una secuencia de transformaciones y estimadores que se aplican sobre un conjunto de datos. Podemos construir una tubería de transformadores y estimadores para conseguir el mismo entrenamiento que conseguimos en el ejemplo anterior. Fijémonos en el siguiente ejemplo.

Creamos todos los estimadores y transformadores que sean necesarios, especificando exactamente las transformaciones que deben llevar a cabo y configurando sus correspondientes parámetros. Una vez creados, y sin necesidad de llamar a los métodos de *fit* y *transform*, podemos crear una tubería (clase *Pipeline*) e indicar en el parámetro del constructor *stages* el orden en el que se deben aplicar los estimadores y transformadores. Una vez construida la tubería, podemos llamar al método *fit* de esta tubería para usar un conjunto de datos de entrada para entrenar los diferentes estimadores y el propio modelo de aprendizaje automático seleccionado. Como resultado de esta tubería tendremos un transformador que llevará a cabo todas las transformaciones indicadas en la tubería sobre cualquier conjunto de datos. Por ejemplo, en este caso podemos usar el método *transform* sobre el conjunto de test para aplicar las mismas transformaciones que al conjunto de entrenamiento y emplear el modelo de aprendizaje automático entrenado para producir las predicciones para el nuevo conjunto de datos.

In [8]:
from pyspark.ml import Pipeline

assembler = VectorAssembler(inputCols=["sepal_length","sepal_width", "petal_length","petal_width"], outputCol="features")
scaler = StandardScaler(withMean=True, withStd=True, inputCol="features", outputCol="std_features")
indexer = StringIndexer(inputCol="variety",outputCol="class")
network = MultilayerPerceptronClassifier(featuresCol="std_features",labelCol="class",seed=89, predictionCol="prediction", layers=[4, 4, 3])


pipeline = Pipeline(stages=[assembler, scaler, indexer, network])
fitted_model = pipeline.fit(train_df)
test_predictions_df = fitted_model.transform(test_df)
test_predictions_df.show()

+------------+-----------+------------+-----------+----------+-----------------+--------------------+-----+--------------------+--------------------+----------+
|sepal_length|sepal_width|petal_length|petal_width|   variety|         features|        std_features|class|       rawPrediction|         probability|prediction|
+------------+-----------+------------+-----------+----------+-----------------+--------------------+-----+--------------------+--------------------+----------+
|         4.4|        3.0|         1.3|        0.2|    Setosa|[4.4,3.0,1.3,0.2]|[-1.7741367469000...|  2.0|[-158.22465135134...|[1.55523596593581...|       2.0|
|         4.5|        2.3|         1.3|        0.3|    Setosa|[4.5,2.3,1.3,0.3]|[-1.6543056336904...|  2.0|[-108.02366925537...|[1.04013025680053...|       1.0|
|         4.6|        3.1|         1.5|        0.2|    Setosa|[4.6,3.1,1.5,0.2]|[-1.5344745204808...|  2.0|[-158.22465135135...|[1.55523596588834...|       2.0|
|         4.6|        3.2|        

Típicamente, una vez entrenado el modelo y realizadas las predicciones sobre el conjunto de test nos interesará evaluar la calidad de dichas predicciones. En Spark contamos con varios tipos de evaluadores para evaluar un modelo de aprendizaje automático entrenado. Más concretamente contamos con:
* Regression Evaluator (para problemas de regresión)
* BinaryClassificationEvaluator (para problemas de clasificación con dos clases)
* MulticlassClassificationEvaluator (para problemas de clasificación con más de una clase).
* RankingEvaluator (para evaluar rankings)
* MultilabelClassificationEvaluator (para problemas donde la salida pueden ser varias clases)

En este caso estamos ante un problema de clasificación con tres clases, por lo que crearemos un evaluador del tipo *MulticlassClassificationEvaluator*. Como parámetros de entrada indicamos la columna del Dataframe donde se encuentra la predicción realizada por el modelo, la columna donde se encuentra el valor verdadero a predecir, y el tipo métrica a emplear. Éste último depende del tipo de evaluador que seleccionemos. Para los problemas de clasificación tenemos varias opciones como ahora el *accuracy* (% de muestras bien clasificadas, empleado habitualmente), o el *f1* score. Para los problemas de regresión contamos con otros como el error cuadrático medio (*rmse*, uno de los más habituales), el error absoluto medio (*mae*) o la variabilidad explicada (*var*). Podemos obtener el valor de dicha métrica de calidad si llamamos al método de *evaluate* sobre el conjunto de datos que contiene las predicciones que hemos realizado y los verdaderos valores a predecir.

In [9]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="class", metricName="accuracy")
evaluator.evaluate(test_predictions_df)

0.918918918918919

# Entrenamiento con validación cruzada

Una forma más apropiada de llevar a cabo la evaluación de uno o varios modelos de aprendizaje automático es con la validación cruzada. Este tipo de evaluación se suele realizar, sobretodo, cuando queremos comparar múltiples modelos y seleccionar aquel que es más apropiado para la tarea de predicción que queremos resolver. Desde la API de MLlib podemos diseñar evaluaciones con validación cruzada de forma fácil y sencilla. De hecho, se basa también en el diseño de tuberías. En este caso, las tuberías serán aplicadas sobre todos los algoritmos de aprendizaje, solo que cambiando los hiperparámetros del entrenamiento.

Por tanto, primero que nada crearemos todos los estimadores y transformadores que queramos aplicar (incluido el algoritmo de aprendizaje). Una vez creados, generamos una tubería que determine en qué orden deben aplicarse dichos transformadores y estimadores, siendo el algoritmo de aprendizaje normalmente el último de ellos.

En este caso, entrenaremos de nuevo redes neuronales, aunque cambiaremos algunos de los hiperparámetros de la red. Para este ejemplo, probaremos varias arquitecturas de red así como varios límites de iteraciones para el entrenamiento de las redes. Para probar todas las combinaciones de parámetros y poder evaluar cuál de ellas obtiene mejores resultados en la tarea predictiva, necesitamos hacer una búsqueda de rejilla o *grid search*. Esta técnica de optimización de parámetros consiste simplemente en probar todas las posibles combinaciones de hiperparámetros y evaluarlos con validación cruzada. Para poder hacer una búsqueda en rejilla necesitamos crear un objeto del tipo *ParamGridBuilder*. Con este objeto creado, podemos llamar tantas veces como sea necesario a *addGrid* para configurar un hiperparámetro a probar con el identificador del hiperparámetro (los encontraréis en los propios estimadores) y una lista con los valores a probar para dicho hiperparámetro. En el ejemplo, configuramos dos hiperparámetros: *layers* y *maxIter*. Una vez hemos configurado todos los hiperparámetros debemos llamar al método *build* para crear la rejilla de búsqueda.

Crearemos también el objeto evaluador con la métrica que deseemos emplear para evaluar la calidad de los modelos. Finalmente, crearemos una instancia de la clase *CrossValidator*. A este objeto tenemos que indicarle una tubería (*setEstimator*), el evaluador a emplear (*setEvaluator*), la rejilla de búsqueda (*setEstimatorParamMaps*), el número de bloques a emplear en la validación cruzada (*setNumFolds*), y la semilla (*setSeed*) para poder repetir la misma partición en validación cruzada.

In [10]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder


assembler = VectorAssembler(inputCols=["sepal_length","sepal_width", "petal_length","petal_width"], outputCol="features")
scaler = StandardScaler(withMean=True, withStd=True, inputCol="features", outputCol="std_features")
indexer = StringIndexer(inputCol="variety",outputCol="class")
network = MultilayerPerceptronClassifier(featuresCol="std_features",labelCol="class", predictionCol="prediction")

pipeline = Pipeline(stages=[assembler, scaler, indexer, network])

parameters = ParamGridBuilder().addGrid(network.layers, [ [4,4,3], [4,4,4,3], [4,3]  ] ).\
    addGrid( network.maxIter, [100, 200, 300] ).build()

evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="class", metricName="accuracy")
cross_validator = CrossValidator().setEstimator(pipeline).setEvaluator(evaluator).setEstimatorParamMaps(parameters).\
    setNumFolds(5).setSeed(2021)
model = cross_validator.fit(train_df)

El siguiente fragmento de código te permitirá encontrar qué modelos funcionaron mejor en la tarea predictiva de todas las combinaciones de hiperparámetros probadas.

In [11]:
for (result, config) in zip(model.avgMetrics, parameters):
    print(result, config[network.layers], config[network.maxIter])

0.9694444444444446 [4, 4, 3] 100
0.9694444444444446 [4, 4, 3] 200
0.9694444444444446 [4, 4, 3] 300
0.9280768345985737 [4, 4, 4, 3] 100
0.936410167931907 [4, 4, 4, 3] 200
0.936410167931907 [4, 4, 4, 3] 300
0.9694444444444446 [4, 3] 100
0.9694444444444446 [4, 3] 200
0.9694444444444446 [4, 3] 300


El mejor modelo de los encontrados en el experimento se encuentra en el atributo *bestModel*.

In [12]:
best_network = model.bestModel

Ahora realiza un entrenamiento por validación cruzada para encontrar el mejor *RandomForestClassifier* y deja el validador entrenado en *model2*. Te recomendamos que juegues con al menos dos hiperparámetros del modelo, especialmente el número de árboles en el modelo. Usa exactamente el mismo particionado en bloques empleado anteriormente (usando la misma semilla).

In [13]:
from pyspark.ml.classification import RandomForestClassifier
# Tu código aquí
print(best_network)




PipelineModel_b034a349b195


Fíjate que hasta ahora NO hemos tocado el conjunto de test. Esto ha sido intencional, ya que de este modo ninguno de los mejores modelos han visto el conjunto de test y por tanto podemos comprobar la capacidad para generalizar de cada uno de los dos mejores modelos obtenidos en un conjunto nunca visto. Evalúa ahora ambos modelos sobre el conjunto de test para comprobar cuál de los dos tiene una mejor precisión sobre un conjunto independiente de datos.

In [30]:
# Tu código aquí


0.9512195121951219 0.9512195121951219
