### Maestría: Computación de Alto Desempeño

##### Autor: **Jean Paul Rodríguez**
##### Fecha: **18 de noviembre 2025**
##### Tema: **Técnias de Machine Learning Supervisado con el ecosistema Apache Spark**

# Procesamiento de Datos a Gran Escala

<p><strong>Objetivo: </strong> El objetivo de este cuaderno es crear un flujo sencillo de machine learning</p>

En este notebook se recorre paso a paso el flujo típico de un proyecto de Machine Learning supervisado usando Spark:

1. Crear una sesión de Spark.
2. Cargar el dataset.
3. Hacer transformaciones, incluyendo vectorización de características y creación de interacciones.
4. Entrenar un modelo de Regresión Logística Multinomial.
5. Evaluarlo.
6. Construir una Pipeline y usar Train/Validation Split con búsqueda de hiperparámetros.

## Cargar de datos en un Dataframe

Para este ejercicio se estará utilizando el conjunto de datos Iris, la cual es una fuente en línea en formato CSV (valores separados por coma).

<p> Este set de datos posee diferentes medidas sobre la planta Iris y es famosamente utilizado como ejemplo en analítica de datos:
  </p>
Se utiliza este conjunto para ejemplificar la creación de clusters:

<ul>
  <li>descripción: <a href="https://archive.ics.uci.edu/ml/datasets/Iris" target="_blank">https://archive.ics.uci.edu/ml/datasets/Iris</a></li>
  <li>fuente de datos: <a href="https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data" target="_blank">https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data</a></li>
    <li>tipo de datos: csv</li>

En este ejemplo trabajaremos con el famoso dataset **Iris**, ampliamente utilizado en Machine
Learning. Este dataset incluye:

- 4 características numéricas:
  - `sepal_length`
  - `sepal_width`
  - `petal_length`
  - `petal_width`
- Una columna `target` que representa la especie de la flor codificada como:
  - 0 = setosa  
  - 1 = versicolor  
  - 2 = virginica  

Cargaremos este dataset desde un archivo Parquet directamente en un DataFrame Spark,
lo cual permite manejarlo de manera distribuida y aplicar transformaciones posteriores.

In [1]:
import findspark

findspark.init()
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, SQLContext

## Creando la sesión de Spark

Aquí configuramos Spark para que corra en modo FAIR scheduling y definimos cuántos cores
y cuánta memoria queremos usar 

Finalmente construimos la SparkSession que será la base de todo nuestro trabajo en MLlib

In [2]:
# Crear sesión 

config = (
    SparkConf()
        .set("spark.scheduler.mode", "FAIR")
        .set("spark.executor.cores", "1")
        .set("spark.executor.memory", "4g")
        .set("spark.cores.max", "4")
        #.setMaster("spark://10.43.100.119:8080")
        .setMaster("spark://10.43.100.119:7077")
    )
config.setAppName("hpcspark_jean")
spark = SparkSession.builder.config(conf=config).getOrCreate()

SQLContext(sparkContext=spark.sparkContext, sparkSession=spark)
contextoSpark = spark.sparkContext.getOrCreate()

spark

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/17 19:06:58 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Cargando el dataset Iris

Leemos el archivo `iris.parquet` que contiene las 4 características numéricas del dataset Iris
junto con la columna `target`, que representa la especie en formato numérico (0, 1 o 2).

Luego mostramos las primeras filas para verificar que todo se haya cargado correctamente

In [3]:
import pandas as pd
from sklearn.datasets import load_iris

# cargar iris con pandas
iris = load_iris()
pdf = pd.DataFrame(iris.data, columns=iris.feature_names)
pdf["target"] = iris.target

# convertir a Spark
df = spark.createDataFrame(pdf)

# guardar como parquet usando Spark (NO necesita pyarrow)
df.write.mode("overwrite").parquet("iris.parquet")

                                                                                

In [4]:
df = spark.read.parquet("iris.parquet")
df.show(5)

                                                                                

+-----------------+----------------+-----------------+----------------+------+
|sepal length (cm)|sepal width (cm)|petal length (cm)|petal width (cm)|target|
+-----------------+----------------+-----------------+----------------+------+
|              4.9|             3.6|              1.4|             0.1|     0|
|              4.4|             3.0|              1.3|             0.2|     0|
|              5.1|             3.4|              1.5|             0.2|     0|
|              5.0|             3.5|              1.3|             0.3|     0|
|              4.5|             2.3|              1.3|             0.3|     0|
+-----------------+----------------+-----------------+----------------+------+
only showing top 5 rows



## Renombrando las columnas

Las columnas originales tienen espacios y paréntesis, lo cual es molesto cuando usamos
RFormula o escribimos transformaciones.

Así que renombramos cada columna a un formato más amigable en Spark: `sepal_length`,
`sepal_width`, `petal_length`, `petal_width`.

In [5]:
df = df \
    .withColumnRenamed("sepal length (cm)", "sepal_length") \
    .withColumnRenamed("sepal width (cm)", "sepal_width") \
    .withColumnRenamed("petal length (cm)", "petal_length") \
    .withColumnRenamed("petal width (cm)", "petal_width")
df.show(5)

+------------+-----------+------------+-----------+------+
|sepal_length|sepal_width|petal_length|petal_width|target|
+------------+-----------+------------+-----------+------+
|         4.9|        3.6|         1.4|        0.1|     0|
|         4.4|        3.0|         1.3|        0.2|     0|
|         5.1|        3.4|         1.5|        0.2|     0|
|         5.0|        3.5|         1.3|        0.3|     0|
|         4.5|        2.3|         1.3|        0.3|     0|
+------------+-----------+------------+-----------+------+
only showing top 5 rows



## Transformaciones

El conjunto de datos actual no cumple con el requisito de estar en formato de Vector y, por lo tanto, debemos transformarlo al formato adecuado.

Para lograr esto en nuestro ejemplo, vamos a especificar una RFormula. Este es un lenguaje declarativo para especificar transformaciones de aprendizaje automático y es fácil de usar una vez que comprende la sintaxis.

Los operadores básicos de RFormula son:
<p>
<p>"~" Destino y términos separados</p>
<p>"+" Términos de Concat; "+ 0" significa eliminar la intersección (esto significa que la intersección y de la línea que ajustaremos será 0)</p>
<p>"-" Eliminar un término; "- 1" significa eliminar la intersección (esto significa que la intersección y de la línea que vamos a ajustar será 0; sí, esto hace lo mismo que "+ 0"</p>
<p>":" Interacción (multiplicación de valores numéricos o valores categóricos binarizados)</p>
<p>"." Todas las columnas excepto la variable objetivo / dependiente</p>
</p>

Para especificar transformaciones con esta sintaxis, necesitamos importar la clase RFormula. 


Para entrenar modelos en Spark MLlib, las variables deben estar unificadas dentro de una sola
columna llamada `features`, que contiene un vector con las características numéricas.

En el caso del dataset **Iris**, todas las columnas predictoras son numéricas, así que no es necesario
convertir categorías ni aplicar codificaciones especiales.

Usaremos `RFormula`, que facilita:
- Seleccionar el target (`target`)
- Seleccionar todas las características (`.`)
- Crear nuevas características basadas en interacciones entre variables numéricas

En este notebook agregamos una interacción entre `sepal_length` y `petal_width`, creando
una característica sintética que puede capturar relaciones más complejas entre dimensiones de
la flor.

In [6]:
from pyspark.ml.feature import RFormula
supervised = RFormula(
    formula="target ~ . + sepal_length:petal_width"
)

El siguiente paso es ajustar el transformador RFormula a los datos para que descubra los posibles valores de cada columna.

No todos los transformadores tienen este requisito, pero debido a que RFormula manejará automáticamente las variables categóricas por nosotros, necesita determinar qué columnas son categóricas y cuáles no, así como cuáles son los valores distintos de las columnas categóricas.

Por esta razón, tenemos que llamar al método fit. Una vez que llamamos a fit, devuelve una versión "entrenada" de nuestro transformador que luego podemos usar para transformar nuestros datos.

Luego llamamos a transform en ese objeto para transformar nuestros datos de entrada en los datos de salida esperados.

In [7]:
fittedRF = supervised.fit(df) # Ajusta
preparedDF = fittedRF.transform(df) # Transforma
preparedDF.show()

[Stage 4:>                                                          (0 + 1) / 1]

+------------+-----------+------------+-----------+------+--------------------+-----+
|sepal_length|sepal_width|petal_length|petal_width|target|            features|label|
+------------+-----------+------------+-----------+------+--------------------+-----+
|         4.9|        3.6|         1.4|        0.1|     0|[4.9,3.6,1.4,0.1,...|  0.0|
|         4.4|        3.0|         1.3|        0.2|     0|[4.4,3.0,1.3,0.2,...|  0.0|
|         5.1|        3.4|         1.5|        0.2|     0|[5.1,3.4,1.5,0.2,...|  0.0|
|         5.0|        3.5|         1.3|        0.3|     0|[5.0,3.5,1.3,0.3,...|  0.0|
|         4.5|        2.3|         1.3|        0.3|     0|[4.5,2.3,1.3,0.3,...|  0.0|
|         4.4|        3.2|         1.3|        0.2|     0|[4.4,3.2,1.3,0.2,...|  0.0|
|         5.0|        3.5|         1.6|        0.6|     0|[5.0,3.5,1.6,0.6,...|  0.0|
|         5.1|        3.8|         1.9|        0.4|     0|[5.1,3.8,1.9,0.4,...|  0.0|
|         4.8|        3.0|         1.4|        0.3|   

                                                                                

En la salida podemos ver el resultado final: Spark creó las columnas `features` y `label` a partir de
la fórmula definida.

Dado que Iris solo contiene valores numéricos, `RFormula` simplemente:
- Agrupa las columnas numéricas en un único vector (`features`)
- Garantiza que `target` quede en formato numérico apto para modelado (`label`)
- Aplica la interacción especificada entre `sepal_length` y `petal_width`

Este proceso deja los datos listos para entrenar un modelo de clasificación multinomial.


Creemos ahora un conjunto de prueba simple basado en una división aleatoria de los datos:

## División Train/Test

Dividimos los datos usando una separación de 70/30 para poder entrenar el modelo y evaluar
su rendimiento en datos que no ha visto antes.


In [8]:
train, test = preparedDF.randomSplit([0.7, 0.3])

## Estimators

El modelo que utilizaremos es la **Regresión Logística Multinomial**, una extensión natural de la
regresión logística clásica para problemas donde existen más de dos clases.  
En el dataset Iris tenemos **tres especies distintas**, por lo que Spark automáticamente activa
el modo multinomial.

Creamos una instancia de `LogisticRegression`, indicando qué columna contiene las etiquetas
(`target`) y qué columna contiene el vector de características (`features`).  
Luego mostramos los parámetros disponibles para entender qué opciones se pueden ajustar.


In [9]:
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(labelCol="target",featuresCol="features")

Antes de comenzar a entrenar este modelo, inspeccionemos los parámetros.

Este método muestra una explicación de todos los parámetros para la implementación de Spark de la regresión logística.

El método "explainParams" existe en todos los algoritmos disponibles en MLlib.

In [10]:
print(lr.explainParams())

aggregationDepth: suggested depth for treeAggregate (>= 2). (default: 2)
elasticNetParam: the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty. (default: 0.0)
family: The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial (default: auto)
featuresCol: features column name. (default: features, current: features)
fitIntercept: whether to fit an intercept term. (default: True)
labelCol: label column name. (default: label, current: target)
lowerBoundsOnCoefficients: The lower bounds on coefficients if fitting under bound constrained optimization. The bound matrix must be compatible with the shape (1, number of features) for binomial regression, or (number of classes, number of features) for multinomial regression. (undefined)
lowerBoundsOnIntercepts: The lower bounds on intercepts if fitting under bound constrained optimization. Th

## Entrenando el modelo

Al crear una instancia de un algoritmo no entrenado, llega el momento de ajustarlo a los datos (entrenarlo). En este caso, esto devuelve un LogisticRegressionModel.

Este código iniciará un trabajo de Spark para entrenar el modelo. A diferencia de las transformaciones, el ajuste de un modelo de aprendizaje automático es ansioso y se realiza de inmediato.

Entrenamos el modelo con los datos de entrenamiento.  

In [11]:
fittedLR = lr.fit(train)

                                                                                

## Predicciones sobre el conjunto de entrenamiento

Utilizamos el modelo entrenado para generar predicciones sobre los datos de `train` y
visualizamos las columnas `target` y `prediction`.  
Esto nos permite ver si el modelo está clasificando correctamente cada punto.

Una vez completado, puede usar el modelo para hacer predicciones. Lógicamente, esto significa transformar características en etiquetas.

Hacemos predicciones con el método transform. Por ejemplo, podemos transformar nuestro conjunto de datos de entrenamiento para ver qué etiquetas asignó nuestro modelo a los datos de entrenamiento y cómo se comparan con los resultados reales.

Realicemos esa predicción con el siguiente fragmento de código:

In [12]:
fittedLR.transform(train).select("target", "prediction").show(50)

+------+----------+
|target|prediction|
+------+----------+
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     1|       1.0|
|     0|       0.0|
|     1|       1.0|
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     1|       1.0|
|     0|       0.0|
|     1|       1.0|
|     1|       1.0|
|     1|       1.0|
|     1|       1.0|
|     1|       1.0|
|     1|       1.0|
|     1|       1.0|
|     1|       1.0|
|     1|       1.0|
|     1|       1.0|
|     1|       1.0|
|     1|       1.0|
|     1|       1.0|
|     1|       1.0|
|     1|       1.0|
|     1|       1.0|
|     1|       1.0|
|     1|       1.0|
|     1|       1.0|
|     1|       1.0|
|     1|       1.0|
|     1|       1.0|
|     1|       1.0|
|     1|       1.0|
|     1|       1.0|
|     1|       1.0|
|     1|       1.0|
|     1|       1.0|
|     1|       1.0|
|     2|       2.0|
|     2|       2.0|
|     1|       1.0|
|     2|       2.0|
|     2|       2.0|


Al transformar el conjunto de entrenamiento podemos ver la predicción que el modelo asigna a
cada fila. En este caso, las clases posibles son 0, 1 y 2, correspondientes a las tres especies de
Iris.  
Esto nos permite identificar rápidamente si el modelo está clasificando correctamente los
ejemplos dentro de cada clase.

Nuestro siguiente paso sería evaluar manualmente este modelo y calcular métricas de rendimiento como la tasa de verdaderos positivos, la tasa de falsos negativos, etc.

## Pipelines

Cuando un flujo de Machine Learning incluye varios pasos encadenados (transformación de
datos, vectorización, modelado, etc.), puede volverse difícil mantener el seguimiento de cada
DataFrame intermedio.

Para evitar escribir cada paso de forma manual, usamos `Pipeline`.  

Spark ML proporciona `Pipeline` para agrupar todas las etapas en un solo objeto.  
En nuestro caso, la Pipeline tendrá dos stages:

1. `RFormula` → crea el vector de características y la interacción
2. `LogisticRegression` → entrena el modelo multinomial

Al ejecutar la Pipeline, Spark aplica todas las stages en secuencia y devuelve un modelo final
ya entrenado.


In [13]:
train, test = df.randomSplit([0.7, 0.3])

Ahora que tiene un conjunto de entrenamiento y prueba, creemos las stages base en nuestra Pipeline.

Una stage simplemente representa un transformador o un estimador. En nuestro caso, tendremos dos estimadores. La RFomula y el LogisticRegresión:

In [14]:
rForm = RFormula()
lr = LogisticRegression().setLabelCol("target").setFeaturesCol("features")

Ahora, en lugar de usar manualmente nuestras transformaciones y luego ajustar nuestro modelo, simplemente las hacemos stages en la Pipeline general, como en el siguiente fragmento de código:

In [15]:
from pyspark.ml import Pipeline
stages = [rForm, lr]
pipeline = Pipeline().setStages(stages)

## Entrenamiento y Evaluación

Ahora que organizó la Pipeline, el siguiente paso es el Entrenamiento.

En este caso, no entrenaremos solo un modelo. Entrenaremos varias variaciones del modelo especificando diferentes combinaciones de hiperparámetros que nos gustaría que Spark probara.

Luego, seleccionaremos el mejor modelo usando un evaluador que compara sus predicciones con nuestros datos de validación.

Podemos probar diferentes hiperparámetros en toda la Pipeline, incluso en la fórmula de RF que usamos para manipular los datos sin procesar.

En esta sección definimos una grilla de hiperparámetros para explorar diferentes configuraciones
del modelo y de la fórmula.  
Incluimos:

- Dos variantes de la fórmula (una con una interacción extra)
- Tres valores para `elasticNetParam`, que controla la mezcla entre L1 y L2
- Dos valores para `regParam`, que controla la regularización

Esto genera 12 combinaciones diferentes que serán probadas durante la validación.

In [16]:
from pyspark.ml.tuning import ParamGridBuilder
params = ParamGridBuilder()\
.addGrid(rForm.formula, [
"target ~ . + sepal_length:petal_width",
"target ~ . + sepal_length:petal_width + sepal_length:petal_length"])\
.addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])\
.addGrid(lr.regParam, [0.1, 2.0])\
.build()

## Evaluación del modelo

Ahora que la cuadrícula está construida, es hora de especificar nuestro proceso de evaluación. El evaluador nos permite comparar de forma automática y objetiva varios modelos con la misma métrica de evaluación.

Para comparar el rendimiento de cada combinación de hiperparámetros, utilizamos
`MulticlassClassificationEvaluator` con la métrica `f1` dado que tenemos un problema de clasificación multiclase

In [17]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(
    labelCol="target",
    predictionCol="prediction",
    metricName="f1"  
)

Ahora que tenemos una canalización que especifica cómo se deben transformar nuestros datos, realizaremos la selección del modelo para probar diferentes hiperparámetros en nuestro modelo de regresión logística y medir el éxito comparando su desempeño usando la métrica areaUnderROC.

## Entrenamiento y Evaluación

Usamos `TrainValidationSplit` para entrenar varias variantes del modelo y seleccionar aquella que
obtiene el mejor desempeño según la métrica especificada.  

- Entrena cada modelo con 75% del train
- Valida con 25% del train
- Elige el mejor

Finalmente evaluamos el modelo ganador en el conjunto de prueba (`test`), para ver cómo se
comporta con datos completamente nuevos.

In [18]:
from pyspark.ml.tuning import TrainValidationSplit
tvs = TrainValidationSplit()\
.setTrainRatio(0.75)\
.setEstimatorParamMaps(params)\
.setEstimator(pipeline)\
.setEvaluator(evaluator)

Ejecutemos toda la Pipeline que construimos. Para revisar, la ejecución de esta canalización probará todas las versiones del modelo con el conjunto de validación.

In [19]:
tvsFitted = tvs.fit(train)

Tambien se evalua cómo funciona el algoritmo con el conjunto de prueba:

In [20]:
evaluator.evaluate(tvsFitted.transform(test))

0.9466208133971292

# Resumen y Conclusiones

En este notebook se recorre el proceso completo de Machine Learning Supervisado en Spark MLlib usando el
dataset Iris:

- Cargamos datos y preprocesamos las columnas.
- Creamos transformaciones usando RFormula, incluyendo una interacción entre variables.
- Entrenamos un modelo de Regresión Logística Multinomial.
- Evaluamos sus predicciones.
- Construimos una Pipeline completa.
- Ejecutamos una búsqueda de hiperparámetros con TrainValidationSplit.
- Obtuvimos el mejor modelo y lo evaluamos con métricas estándar.

Este flujo es la base de cómo Spark maneja ML a escala y permite extenderlo fácilmente a
datasets mucho más grandes