## **Clustering usando SparkML**


<p style='color: red'>El propósito de este cuaderno es mostrarte cómo usar SparkML para agrupar datos.


#### **Conjuntos de datos**

En este cuaderno utilizaremos los siguientes conjuntos de datos:

 - Versión modificada del conjunto de datos de clientes mayoristas. Conjunto de datos original disponible en https://archive.ics.uci.edu/ml/datasets/Wholesale+customers 
 - Conjunto de datos de semillas. Disponible en https://archive.ics.uci.edu/ml/datasets/seeds


#### **Configuración**


Para este cuaderno, utilizaremos las siguientes librerías:

*   [`PySpark`](https://spark.apache.org/docs/latest/api/python/index.html?utm_medium=Exinfluencer&utm_source=Exinfluencer&utm_content=000026UJ&utm_term=10006555&utm_id=NA-SkillsNetwork-Channel-SkillsNetworkCoursesIBMSkillsNetworkBD0231ENCoursera2789-2023-01-01) para conectarse al clúster de Spark.


#### **Instalación de librerías requeridas**

Necesitaremos las librerías como pyspark y findspark para trabajar este cuaderno.


Las siguientes librerías requeridas __no__ están preinstaladas. Necesitare,ps ejecutar la siguiente celda__ para instalarlas:


In [None]:
!pip install pyspark==3.1.2 -q
!pip install findspark -q

#### **Importando librerías requeridas**

_Recomendamos importar todas las bibliotecas requeridas en un solo lugar (aquí):_


In [None]:
# También puedes usar esta sección para suprimir advertencias generadas por tu código:
def warn(*args, **kwargs):
    pass
import warnings
warnings.warn = warn
warnings.filterwarnings('ignore')

# FindSpark simplifica el proceso de uso de Apache Spark con Python

import findspark
findspark.init()

# Importar funciones y clases para SparkML

from pyspark.ml.clustering import KMeans, BisectingKMeans, GaussianMixture
from pyspark.ml.feature import VectorAssembler
from pyspark.mllib.clustering import StreamingKMeans
from pyspark.ml.linalg import Vectors

# Crea una sesión de Spark
from pyspark.sql import SparkSession, Row
import pyspark


#### **Ejemplos**


#### Tarea 1 - Crear una sesión de Spark


In [None]:
# Crea una sesión de Spark
# Ignora cualquier advertencia generada por el comando SparkSession

spark = SparkSession.builder.appName("Clustering usando SparkML").getOrCreate()

#### Tarea 2 - Cargar los datos de un archivo csv en un dataframe


Descarga el archivo de datos


In [None]:
!wget https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0231EN-SkillsNetwork/datasets/customers.csv


Carga el conjunto de datos en el dataframe de Spark


In [None]:
# Usando la función spark.read.csv, cargamos los datos en un DataFrame.
# El parámetro header=True indica que la primera fila del archivo CSV contiene los nombres de las columnas.
# El parámetro inferSchema=True le indica a Spark que detecte automáticamente los tipos de datos de las columnas.

# Cargar el conjunto de datos de clientes
customer_data = spark.read.csv("customers.csv", header=True, inferSchema=True)


Imprime el esquema del conjunto de datos


In [None]:
# Cada fila en este conjunto de datos representa un cliente. Las columnas indican los pedidos realizados por un cliente de:
# Alimentos frescos (Fresh_food), Leche (Milk), Abarrotes (Grocery) y Comida congelada (Frozen_Food).


In [None]:
customer_data.printSchema()

Muestra las primeras 5 filas del conjunto de datos


In [None]:
customer_data.show(n=5, truncate=False)

#### Tarea 3 - Crear un vector de características


In [None]:
# Ensambla las características en una única columna de vector
feature_cols = ['Fresh_Food', 'Milk', 'Grocery', 'Frozen_Food']
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

# Transforma los datos del cliente para incluir la nueva columna de características
customer_transformed_data = assembler.transform(customer_data)

Debes indicarle al algoritmo KMeans cuántos clusters crear a partir de tus datos.


In [None]:
number_of_clusters = 3

#### Tarea 4 - Crear un modelo de clustering


Crear un modelo de clustering KMeans


In [None]:
kmeans = KMeans(k = number_of_clusters)


Entrena/ajusta el modelo en el conjunto de datos<br>


In [None]:
modelo = kmeans.fit(customer_transformed_data)


#### Tarea 5 - Imprimir detalles de los clusters


Tu modelo ya está entrenado. Es hora de evaluar el modelo.


In [None]:
# Realiza predicciones en el conjunto de datos
predictions = modelo.transform(customer_transformed_data)


In [None]:
# Muestra el resultado
predictions.show(5)


Muestra cuántos clientes hay en cada cluster.


In [None]:
predictions.groupBy('prediction').count().show()


### **Paradigma Map-Reduce básico para clustering**

In [None]:
sc = spark.sparkContext

import random
from pyspark.ml.linalg import Vectors
from pyspark.sql import Row

# Parámetros generales
k = number_of_clusters         # número de clusters deseados
max_iters = 20                 # máximo de iteraciones
tol = 1e-6                     # tolerancia para convergencia

# 1) RDD de puntos: cada elemento es un vector
points_rdd = customer_transformed_data.rdd.map(lambda row: row.features)

# Funciones auxiliares
def inicializar_centroides(rdd, k, seed=42):
    """
    Inicializa k centroides escogiendo k muestras aleatorias del RDD de vectores.
    """
    sample = rdd.takeSample(False, k, seed)
    return [Vectors.dense(vec.toArray()) for vec in sample]

def convergieron(old_centroids, new_centroids, tol=1e-6):
    """
    Comprueba si los centroides han dejado de moverse más allá de la tolerancia.
    """
    distances = [float(old_centroids[i].squared_distance(new_centroids[i]))
                 for i in range(len(old_centroids))]
    return all(d <= tol for d in distances)

def map_assign(point, centroids):
    """
    Map: asigna un punto al centroide más cercano.
    Devuelve (cluster_id, (vector, 1))
    """
    idx = min(range(len(centroids)),
              key=lambda i: float(point.squared_distance(centroids[i])))
    return (idx, (point, 1))

def reduce_avg(a, b):
    """
    Reduce: suma vectores y cuentas.
    a, b = (suma_vectores, cuenta)
    """
    sum_a, cnt_a = a
    sum_b, cnt_b = b
    return (sum_a + sum_b, cnt_a + cnt_b)

# 2) Inicializa centroides
centroids = inicializar_centroides(points_rdd, k, seed=42)

# 3) Bucle Map–Reduce
for i in range(max_iters):
    bc_centroids = sc.broadcast(centroids)

    # Map
    assigned = points_rdd.map(lambda p: map_assign(p, bc_centroids.value))

    # Reduce
    merged = assigned.reduceByKey(reduce_avg)

    # Calcula nuevos centroides
    new_centroids = (
        merged
        .mapValues(lambda x: x[0] / x[1])
        .sortByKey()
        .map(lambda x: x[1])
        .collect()
    )

    # Comprueba convergencia
    if convergieron(centroids, new_centroids, tol):
        print(f"Convergió en la iteración {i}")
        centroids = new_centroids
        break

    centroids = new_centroids
else:
    print("No convergió tras el número máximo de iteraciones.")


#### **Map–Reduce K-Means en PySpark**

In [None]:
from pyspark.ml.linalg import Vectors
from pyspark.sql import Row

# Asumimos centroids, map_assign y merge_counts ya definidos 

# 1) RDD de vectores
points_rdd = customer_transformed_data.rdd.map(lambda row: row.features)

# 2) Inicialización (ya en bloque 1)
centroids = inicializar_centroides(points_rdd, k, seed=42)

# 3) Iteraciones de K-Means
for iteration in range(max_iters):
    bc_centroids = sc.broadcast(centroids)

    # a) Asigna cada punto a su cluster
    clusters = points_rdd.map(lambda p: map_assign(p, bc_centroids.value))

    # b) Reduce para sumar vectores y contar
    stats = clusters.reduceByKey(reduce_avg)

    # c) Calcula nuevos centroides
    new_centroids = (
        stats
        .mapValues(lambda x: x[0] / x[1])
        .sortByKey()
        .map(lambda x: x[1])
        .collect()
    )

    # d) Verifica convergencia
    if convergieron(centroids, new_centroids, tol):
        print(f"K-Means convergió en iteración {iteration}")
        centroids = new_centroids
        break

    centroids = new_centroids
else:
    print("K-Means alcanzó el máximo de iteraciones sin converger.")

# 4) Crea DataFrame de predicciones
predictions_rdd = customer_transformed_data.rdd.map(
    lambda row: Row(**row.asDict(),
                    prediction=min(
                        range(len(centroids)),
                        key=lambda idx: float(row.features.squared_distance(centroids[idx]))
                    ))
)
predictions_df = spark.createDataFrame(predictions_rdd)
predictions_df.show(5)


### **Canopy Clustering (preprocesamiento para K-Means)**

La idea de **Canopy Clustering** es agrupar rápidamente los puntos en "canopies" usando dos umbrales $T_1 > T_2$. Estos canopies luego se usan para inicializar centroides de K-Means de forma más efectiva.

In [None]:
from pyspark.ml.linalg import Vectors
from pyspark.sql import Row
import math

# Umbrales (ajustar según los datos)
T1 = 5.0
T2 = 3.0

def canopy_step(points_rdd, T1, T2):
    """
    Ejecuta una pasada de canopy clustering sobre el RDD de vectores.
    Devuelve lista de canopies: [(centroid, [puntos_asignados]), ...]
    """
    unassigned = points_rdd.collect()  # para ilustración; en datos muy grandes harías sampling
    canopies = []

    while unassigned:
        # Elige un punto aleatorio como centro de canopy
        c = unassigned.pop()
        canopy = [c]
        to_remove = []

        for p in unassigned:
            dist = float(p.squared_distance(c))**0.5
            if dist < T1:
                canopy.append(p)
            if dist < T2:
                to_remove.append(p)

        # Elimina de unassigned los puntos con dist < T2
        unassigned = [p for p in unassigned if p not in to_remove]
        canopies.append((c, canopy))

    return canopies

# 1) RDD de puntos
points_rdd = customer_transformed_data.rdd.map(lambda r: r.features)

# 2) Canopy clustering
canopies = canopy_step(points_rdd, T1, T2)

# 3) Inicializar centroides de K-Means tomando uno por canopy
initial_centroids = [canopy[0] for canopy in canopies[:k]]


#### **MapReduce K-Means con canopy**

Aprovechando los centroides iniciales de los canopies, lanzamos el bucle MapReduce de K-Means distribuido:

In [None]:
from pyspark import SparkContext
from pyspark.ml.linalg import Vectors

sc = spark.sparkContext

# Funciones map_assign, reduce_avg y convergieron definidas como antes...

# 1) Broadcast de centroides iniciales
centroids = initial_centroids

for iteration in range(max_iters):
    bc = sc.broadcast(centroids)

    # Map: asigna cada punto al centroide más cercano
    assignments = points_rdd.map(lambda p: map_assign(p, bc.value))

    # Reduce: suma vectores y cuentas
    stats = assignments.reduceByKey(reduce_avg)

    # Computa nuevos centroides
    new_centroids = (
        stats
        .mapValues(lambda x: x[0] / x[1])
        .sortByKey()
        .map(lambda x: x[1])
        .collect()
    )

    if convergieron(centroids, new_centroids, tol):
        print(f"Convergió en iteración {iteration}")
        centroids = new_centroids
        break

    centroids = new_centroids
else:
    print("No convergió tras el máximo de iteraciones.")


#### **K-Means con DataFrame y `mapPartitions` (paralelismo más fino)**

En lugar de enviar cada punto individualmente al executor, podemos procesarlos por partición para reducir overhead:

In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

# 1) UDF para asignar un punto a su cluster usando broadcast
@udf(IntegerType())
def predict_cluster(features):
    return int(min(
        range(len(bc_cent.value)),
        key=lambda i: float(features.squared_distance(bc_cent.value[i]))
    ))

# 2) Bucle MapPartitions con assign_partition redefinido para captar bc_cent
for iteration in range(max_iters):
    bc_cent = sc.broadcast(centroids)

    # Redefinimos aquí la función para que capture bc_cent en su closure
    def assign_partition(iterator):
        cents = bc_cent.value
        for v in iterator:
            idx = min(range(len(cents)),
                      key=lambda i: float(v.squared_distance(cents[i])))
            yield (idx, (v, 1))

    # mapPartitions recibe un iterador de vectores; usamos la versión local de assign_partition
    mapped = points_rdd.mapPartitions(assign_partition)
    stats = mapped.reduceByKey(reduce_avg)

    new_centroids = (
        stats
        .mapValues(lambda x: x[0] / x[1])
        .sortByKey()
        .map(lambda x: x[1])
        .collect()
    )

    if iteration > 0 and convergieron(centroids, new_centroids, tol):
        print(f"K-Means (mapPartitions) convergió en iteración {iteration}")
        centroids = new_centroids
        break

    centroids = new_centroids


In [None]:
#Se detiene la sesión de spark
spark.stop()


#### **Ejercicios**


**Ejercicio 1 - Crear una sesión de Spark**


Crea una sesión de Spark con el nombre de la aplicación "Seed Clustering"


In [None]:
spark = #TODO

<details>
    <summary>Haz clic aquí para una pista</summary>
    
Utiliza SparkSession.builder

</details>


<details>
    <summary>Haz clic aquí para ver la solución</summary>
    
```python
spark = SparkSession.builder.appName("Seed Clustering").getOrCreate()
```
    
</details>


**Ejercicio 2 - Cargar los datos de un archivo csv en un dataframe**


In [None]:
#download seed dataset
!wget https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0231EN-SkillsNetwork/datasets/seeds.csv


Carga el conjunto de datos de semillas


In [None]:

seed_data =  #TODO


<details>
    <summary>Haz clic aquí para una pista</summary>
    
Utiliza spark.read.csv

</details>


<details>
    <summary>Haz clic aquí para ver la solución</summary>
    
```python
seed_data = spark.read.csv("seeds.csv", header=True, inferSchema=True)
```
    
</details>


Imprime el esquema del conjunto de datos


In [None]:
seed_data.printSchema()

Muestra las primeras 5 filas del conjunto de datos


In [None]:
seed_data.show(n=5, truncate=False, vertical=True)

#### **Ejercicio 3 - Crear un vector de características**


Ensambla todas las columnas en un solo vector


In [None]:
feature_cols =  #TODO
assembler =  #TODO
seed_transformed_data =  #TODO


<details>
    <summary>Haz clic aquí para una pista</summary>
    
Consulta la tarea 3
</details>


<details>
    <summary>Haz clic aquí para ver la solución</summary>
    
```python
feature_cols = ['area',
 'perimeter',
 'compactness',
 'length of kernel',
 'width of kernel',
 'asymmetry coefficient',
 'length of kernel groove']

assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
seed_transformed_data = assembler.transform(seed_data)

```
    
</details>


#### **Ejercicio 4 - Crear un modelo de clustering**


Crea 7 clusters


In [None]:
number_of_clusters =  #TODO
kmeans =  #TODO
model =  #TODO

<details>
    <summary>Haz clic aquí para una pista</summary>
    
utiliza el método kmeans.fit()
</details>


<details>
    <summary>Haz clic aquí para ver la solución</summary>
    
```python
number_of_clusters = 3
kmeans = KMeans(k = number_of_clusters)
model = kmeans.fit(seed_transformed_data)

```
    
</details>


#### **Ejercicio 5 - Imprimir detalles de los clusters**


In [None]:
predictions =  #TODO

<details>
    <summary>Haz clic aquí para una pista</summary>
    
utiliza el método transform()
</details>


<details>
    <summary>Haz clic aquí para ver la solución</summary>
    
```python
predictions = model.transform(seed_transformed_data)
```
    
</details>


In [None]:
predictions.show(n=5, truncate=False, vertical=True)

In [None]:
predictions.groupBy('prediction').count().show()

In [None]:
#stop spark session
spark.stop()

Te animamos a crear diferentes números de clusters utilizando el mismo conjunto de datos.


### **Ejercicios adicionales**

1. **Diferencias de paradigma**

   * Explica las principales diferencias entre un algoritmo de clustering paralelo (que aprovecha múltiples núcleos/procesos) y uno distribuido (que reparte datos y computación en varios nodos). ¿Qué ventajas y desafíos presenta cada enfoque?

2. **Arquitectura de Spark**

   * Describe cómo Spark gestiona RDDs y DataFrames en un clúster. ¿Qué papel juegan el *driver* y los *executors*, y cómo influye esto en la ejecución de un algoritmo de clustering?

3. **Inicialización de centroides**

   * En el enfoque Map-Reduce manual, ¿qué implicaciones tiene la estrategia de muestreo para inicializar centroides en datos muy grandes? Propón al menos dos alternativas y discute sus pros y contras.

4. **Balanceo de carga**

   * ¿Cómo podrías evaluar y mejorar el balanceo de carga entre particiones al ejecutar el bucle Map-Reduce de K-Means? Describe métricas y técnicas para detectar y corregir particiones desiguales.

5. **Criterio de convergencia**

   * El script usa tolerancia en la distancia cuadrática. Plantea un criterio de convergencia alternativo (por ejemplo, cambios en inercia global) y discute su viabilidad en un entorno distribuido.


6. **Selección de umbrales**

   * ¿Cómo influye la elección de los umbrales $T_1$ y $T_2$ en la formación de canopies? Diseña un protocolo experimental para ajustarlos automáticamente en un dataset desconocido.

7. **Eficiencia y escalabilidad**

   * El método `canopy_step` recoge todos los puntos en el driver. Propón un esquema que permita ejecutar canopy clustering de forma distribuida (sin `collect()`) y describe cómo sincronizar los centros de canopy.

8. **Impacto en la calidad final**

   * Diseña un experimento para comparar K-Means con inicialización aleatoria vs. K-Means con inicialización por canopies. ¿Qué métricas usarías y cómo interpretarías los resultados?


9. **MapPartitions para reducir overhead**

   * Explica cómo el uso de `mapPartitions` mejora la eficiencia con respecto a una asignación punto a punto. ¿En qué escenarios podría no ser beneficioso?

10. **Streaming K-Means**

    * El script importa `StreamingKMeans` pero no lo implementa. Describe cómo organizarías un pipeline de streaming (p. ej., datos de sensores) para actualizar los centroides en tiempo real. ¿Qué parámetros del modelo son críticos?

11. **Clustering jerárquico y Mezcla Gaussiana**

    * Además de K-Means, el script menciona `BisectingKMeans` y `GaussianMixture`. Para cada uno, plantea un caso de uso donde sea preferible sobre K-Means, y qué retos de paralelización o distribución conlleva.

12. **Evaluación en Spark**

    * Describe cómo calcularías métricas de calidad de clusters (Silhouette, Davies–Bouldin, etc.) en Spark de forma eficiente en un clúster, minimizando movimientos de datos.

13. **Tolerancia a fallos**

    * Imagina que un executor falla durante la fase de Map-Reduce. ¿Cómo maneja Spark esta situación y qué consideraciones de diseño debes tener para asegurar la integridad de tu clustering iterativo?

14. **Benchmarking de rendimiento**

    * Diseña un plan de benchmarking para comparar el rendimiento de las distintas implementaciones (SparkML vs. Map-Reduce manual vs. mapPartitions). Incluye tamaño de datos, número de particiones, recursos del clúster y métricas de tiempo/uso de CPU/memoria.



In [None]:
## Tus respuestas