<a href="https://colab.research.google.com/github/sanntana21/distributed-hpo-simulation/blob/main/distributed_hpo_simulation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

---
# **Big Data Analytics**

 - Álvaro Santana Sánchez

---

## Descripción del Proyecto

Este proyecto tiene como objetivo la simulación del ajuste de hiperparámetros de un modelo de Machine Learning en un entorno Big Data distribuido. Aunque el entorno será simulado en un único notebook, se emulará el comportamiento de un sistema distribuido con múltiples workers.

## Enfoque

La optimización de los hiperparámetros se realizará utilizando un algoritmo evolutivo, una técnica inspirada en la evolución biológica. Este enfoque resulta adecuado para la búsqueda eficiente en espacios de soluciones complejos y no convexos, como es habitual en problemas de Machine Learning.

## Principios del Algoritmo Evolutivo Aplicado

- Individuo: Representa una combinación específica de hiperparámetros del modelo.

- Población: Conjunto de individuos que serán evaluados en cada generación.

- Función de fitness: Se define mediante la métrica F1-score, que permite evaluar el desempeño del modelo para cada configuración de hiperparámetros.

- Mutación: Modificación aleatoria de uno o varios hiperparámetros de un individuo.

- Cruzamiento: Combinación de dos individuos para generar nuevos candidatos.

- Selección: Proceso por el cual se eligen los mejores individuos de una generación para formar parte de la siguiente.

---

## Arquitectura del Sistema Simulado
Para simular el entorno Big Data, se considerará una arquitectura compuesta por un nodo Master y múltiples nodos Worker, representando un entorno distribuido:


- Inicialización (Master): Se genera una población inicial de N individuos con combinaciones aleatorias de hiperparámetros.

- Distribución (Master → Workers):
El Master envía la población completa a cada uno de los Workers. Cada Worker tiene acceso a un subconjunto diferente del dataset (simulando datos distribuidos).

- Evaluación Local (Worker):
Cada individuo es evaluado localmente utilizando la vista del dataset asignado al Worker. La evaluación se realiza mediante el cálculo del F1-score.

- Selección Local (Worker):
Cada Worker selecciona los mejores individuos locales en función de su F1-score y envía sus resultados al Master.

- Agregación Global (Master):
El Master agrega los resultados recibidos de todos los Workers. Para cada individuo, se calcula la media de sus F1-score en todos los Workers. Luego, se ordenan los individuos según esta media para formar un ranking global.

- Reproducción (Master):
A partir del ranking global, el Master realiza operaciones de crossover y mutación sobre los mejores individuos para generar la nueva población.

- Iteración:
La nueva población se redistribuye a los Workers, y se repite el proceso desde el paso 3 durante un número determinado de generaciones.



# Setup

Inicialización del entorno pyspark

In [None]:
!pip install pyspark



In [None]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .master("local[*]") \
    .config("spark.driver.memory", "1g") \
    .config("spark.executor.memory", "1g") \
    .appName("Spark SQL Lab") \
    .getOrCreate()

sc = spark.sparkContext

In [None]:
CYAN = "\033[96m"
GREEN = "\033[92m"
YELLOW = "\033[93m"
RESET = "\033[0m"
RED = "\033[91m"

# Carga de datos

In [None]:
from sklearn.datasets import load_breast_cancer
import pandas as pd
from pyspark.sql import SparkSession

# Cargar dataset de ejemplo
data = load_breast_cancer()
df_original = pd.DataFrame(data.data, columns=data.feature_names)
df_original["label"] = data.target

# Convertir a Spark DataFrame
df_spark = spark.createDataFrame(data=df_original)

# Ver el esquema y una muestra
df_spark.printSchema()
df_spark.show(5)

root
 |-- mean radius: double (nullable = true)
 |-- mean texture: double (nullable = true)
 |-- mean perimeter: double (nullable = true)
 |-- mean area: double (nullable = true)
 |-- mean smoothness: double (nullable = true)
 |-- mean compactness: double (nullable = true)
 |-- mean concavity: double (nullable = true)
 |-- mean concave points: double (nullable = true)
 |-- mean symmetry: double (nullable = true)
 |-- mean fractal dimension: double (nullable = true)
 |-- radius error: double (nullable = true)
 |-- texture error: double (nullable = true)
 |-- perimeter error: double (nullable = true)
 |-- area error: double (nullable = true)
 |-- smoothness error: double (nullable = true)
 |-- compactness error: double (nullable = true)
 |-- concavity error: double (nullable = true)
 |-- concave points error: double (nullable = true)
 |-- symmetry error: double (nullable = true)
 |-- fractal dimension error: double (nullable = true)
 |-- worst radius: double (nullable = true)
 |-- worst 

In [None]:
# Simular particiones para cada uno de los workers
NUM_WORKERS = 3
splits = df_spark.randomSplit([1.0/NUM_WORKERS]*NUM_WORKERS, seed=130222)

# Mostrar cuántos elementos tiene cada partición
print(f"Dataset original {(df_spark.count(), len(df_spark.columns))}")
for i, split in enumerate(splits):
    print(f"Worker {i+1} tiene {split.count()} registros.")

Dataset original (569, 31)
Worker 1 tiene 188 registros.
Worker 2 tiene 178 registros.
Worker 3 tiene 203 registros.


# Ajuste de hiperparámetros

In [None]:
import random

#Esta parametro permite limitar el espacio de busqueda una maya de parametros concreta
HIPERPARAMETROS_DISPONIBLES = {
    "n_estimators": [50, 100, 150],
    "max_depth": [3, 5, 10],
    "min_samples_split": [2, 5, 10]
}

class Individuo:
  """
  Esta clase se utiliza para definir un individuo, el cúal estará formado por un
  conjunto de hiperpámetro.

  Si no se definen unos hiperparámetros de entrada, se genera aleatoriamente el
  individuo.
  """
  def __init__(self, genes : dict =None):
      self.genes = genes or {k: random.choice(v) for k, v in HIPERPARAMETROS_DISPONIBLES.items()}
      self.fitness = None

  def mutate(self, verbose : bool = False):
    """
    Mutar uno de los hiperparámetros aleatoriamente
    """
    param_to_mutate = random.choice(list(HIPERPARAMETROS_DISPONIBLES.keys()))
    antes = self.genes[param_to_mutate]
    if verbose:
      print(f"{YELLOW} Individuo mutando atributo {param_to_mutate} {RESET}")
    self.genes[param_to_mutate] = random.choice([h for h in HIPERPARAMETROS_DISPONIBLES[param_to_mutate] if h != antes])
    if verbose:
      print(f"Antes {antes}, despues {self.genes[param_to_mutate]}")

  def _set_fitness(self, fitness : float):
    self.fitness = fitness

  def __str__(self):
    # Texto de genes
    genes_str = f"{CYAN}{self.genes}{RESET}"

    # Texto de fitness
    if self.fitness is None:
      fitness_str = f"{YELLOW}Pendiente{RESET}"
    else:
      fitness_str = f"{GREEN}{self.fitness:.4f}{RESET}"

    return f"\n\tGenes {genes_str}, \n\tFitness: {fitness_str}"


In [None]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import f1_score
from sklearn.model_selection import train_test_split
import numpy as np

def calcular_fitness_individuo(data : list, individuo : Individuo = None, model = RandomForestClassifier):
    """
    Esta función calcula el fitness de un determinado individuo, para ello
    ajusta
    """
    if not individuo:
      raise ValueError("Debe seleccionar un individuo")

    if not model:
      raise ValueError("Debe seleccionar un modelo")

    #Separamos datos de etiquetas
    X = np.array(data)[:,:-1]
    y = np.array(data)[:, -1]

    # Realizamos separación de datos en train y test
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=130222)

    # Crear y entrenar modelo con los hiperparámetros del individuo
    clf = model(**individuo.genes)
    clf.fit(X_train, y_train)
    y_pred = clf.predict(X_test)

    fitness = f1_score(y_test, y_pred)

    return fitness

# Simular evaluar un individuo con un RDD por worker
def evaluar_individuo_en_worker(individuo, df_worker):
    return df_worker.rdd.mapPartitions(
        lambda iter_rows: [calcular_fitness_individuo(list(iter_rows), individuo)]
    ).collect()[0]

### Población inicial

Se genera de manera aleatoria una población inicial

In [None]:
#Definicion de la poblacion inicial
TOTAL_INDIVIDUOS = 9
INDIVIDUOS_POR_WORKER = round(TOTAL_INDIVIDUOS / NUM_WORKERS)
POBLACION_ACTUAL = [Individuo() for _ in range(TOTAL_INDIVIDUOS)]

### Primera iteración

Se realiza una primera iteración para conjunto de workers e individuos y
se calcula un ranking de los mejores individuos de la iteración

In [None]:
for i, split in enumerate(splits):
    print(f"\n {RED} Worker {i+1} {RESET}")

    rows = split.toPandas().values.tolist()

    start = i * INDIVIDUOS_POR_WORKER
    end = min(start + INDIVIDUOS_POR_WORKER, TOTAL_INDIVIDUOS) if i < NUM_WORKERS - 1 else TOTAL_INDIVIDUOS
    individuos_asignados = POBLACION_ACTUAL[start:end]

    for idx, individuo in enumerate(POBLACION_ACTUAL[start:end]):
        f1 = calcular_fitness_individuo(rows, individuo)
        individuo._set_fitness(f1)
        print(f"Worker {i+1}: Individuo : {individuo}")


 [91m Worker 1 [0m
Worker 1: Individuo : 
	Genes [96m{'n_estimators': 50, 'max_depth': 10, 'min_samples_split': 5}[0m, 
	Fitness: [92m0.9867[0m
Worker 1: Individuo : 
	Genes [96m{'n_estimators': 150, 'max_depth': 5, 'min_samples_split': 5}[0m, 
	Fitness: [92m0.9867[0m
Worker 1: Individuo : 
	Genes [96m{'n_estimators': 100, 'max_depth': 5, 'min_samples_split': 2}[0m, 
	Fitness: [92m0.9867[0m

 [91m Worker 2 [0m
Worker 2: Individuo : 
	Genes [96m{'n_estimators': 100, 'max_depth': 10, 'min_samples_split': 10}[0m, 
	Fitness: [92m0.9610[0m
Worker 2: Individuo : 
	Genes [96m{'n_estimators': 50, 'max_depth': 10, 'min_samples_split': 10}[0m, 
	Fitness: [92m0.9610[0m
Worker 2: Individuo : 
	Genes [96m{'n_estimators': 100, 'max_depth': 3, 'min_samples_split': 10}[0m, 
	Fitness: [92m0.9610[0m

 [91m Worker 3 [0m
Worker 3: Individuo : 
	Genes [96m{'n_estimators': 100, 'max_depth': 10, 'min_samples_split': 10}[0m, 
	Fitness: [92m0.9610[0m
Worker 3: Individuo : 
	G

### Selección

Mediante la selección nos quedamos con el top de mejores individuos.

In [None]:
def obtener_mejores_individuos(individuos, top_k):
  """
  Esta función selecciona los mejores individuos de una población
  """
  return sorted(individuos, key=lambda ind: ind.fitness, reverse=True)[:top_k]


print(f"{RED}RANKING ACTUAL{RESET}")
TOP_K = 5
if TOP_K > TOTAL_INDIVIDUOS:
  raise ValueError("Disminuye el top o aumenta el numero de individuos")
POBLACION_ACTUAL = obtener_mejores_individuos(POBLACION_ACTUAL, TOP_K)
for a in POBLACION_ACTUAL:
  print(a)

[91mRANKING ACTUAL[0m

	Genes [96m{'n_estimators': 50, 'max_depth': 10, 'min_samples_split': 5}[0m, 
	Fitness: [92m0.9867[0m

	Genes [96m{'n_estimators': 150, 'max_depth': 5, 'min_samples_split': 5}[0m, 
	Fitness: [92m0.9867[0m

	Genes [96m{'n_estimators': 100, 'max_depth': 5, 'min_samples_split': 2}[0m, 
	Fitness: [92m0.9867[0m


### Cruze

Combinamos los mejores individuos entre sí.

In [None]:
def cruzar_genes(padre1 : Individuo, padre2 : Individuo):
    """Cruce uniforme entre dos individuos de la población"""
    nuevo_genes = {}
    for k in HIPERPARAMETROS_DISPONIBLES.keys():
        nuevo_genes[k] = random.choice([padre1.genes[k], padre2.genes[k]])
    return Individuo(genes=nuevo_genes)

def generar_nueva_poblacion(padres : list[Individuo],
                            prob_mutacion : float = 0.3,
                            tam_nueva_poblacion : int = TOTAL_INDIVIDUOS,
                            verbose : bool = False
                            ):
  """
  Dada una población se realiza la creación de una nueva mediante
  """
  nueva_poblacion = []
  while len(nueva_poblacion) < tam_nueva_poblacion:
    padre1, padre2 = random.sample(padres, 2)
    if verbose:
      print("Padre 1", padre1)
      print("Padre 2", padre2)
    hijo = cruzar_genes(padre1, padre2)
    if verbose:
      print("Generado hijo",hijo)

    # Mutación aleatoria con cierta probabilidad
    if random.random() < prob_mutacion:
      hijo.mutate(verbose=verbose)

    nueva_poblacion.append(hijo)

    if verbose:
      print("-"*50)

  return nueva_poblacion

NUEVA_POBLACION = generar_nueva_poblacion(POBLACION_ACTUAL,verbose=1)

Padre 1 
	Genes [96m{'n_estimators': 150, 'max_depth': 5, 'min_samples_split': 5}[0m, 
	Fitness: [92m0.9867[0m
Padre 2 
	Genes [96m{'n_estimators': 100, 'max_depth': 5, 'min_samples_split': 2}[0m, 
	Fitness: [92m0.9867[0m
Generado hijo 
	Genes [96m{'n_estimators': 150, 'max_depth': 5, 'min_samples_split': 5}[0m, 
	Fitness: [93mPendiente[0m
--------------------------------------------------
Padre 1 
	Genes [96m{'n_estimators': 100, 'max_depth': 5, 'min_samples_split': 2}[0m, 
	Fitness: [92m0.9867[0m
Padre 2 
	Genes [96m{'n_estimators': 50, 'max_depth': 10, 'min_samples_split': 5}[0m, 
	Fitness: [92m0.9867[0m
Generado hijo 
	Genes [96m{'n_estimators': 100, 'max_depth': 10, 'min_samples_split': 5}[0m, 
	Fitness: [93mPendiente[0m
--------------------------------------------------
Padre 1 
	Genes [96m{'n_estimators': 100, 'max_depth': 5, 'min_samples_split': 2}[0m, 
	Fitness: [92m0.9867[0m
Padre 2 
	Genes [96m{'n_estimators': 150, 'max_depth': 5, 'min_samples_sp

### Segunda iteración

Se realiza una nueva iteración utilizando los valores creados por el cruce.

In [None]:
for i, split in enumerate(splits):
    print(f"\n {RED} Worker {i+1} {RESET}")

    start = i * INDIVIDUOS_POR_WORKER
    end = min(start + INDIVIDUOS_POR_WORKER, TOTAL_INDIVIDUOS) if i < NUM_WORKERS - 1 else TOTAL_INDIVIDUOS
    individuos_asignados = NUEVA_POBLACION[start:end]

    for idx, individuo in enumerate(NUEVA_POBLACION[start:end]):
        f1 = calcular_fitness_individuo(rows, individuo)
        individuo._set_fitness(f1)
        print(f"Worker {i+1}: Individuo : {individuo}")


 [91m Worker 1 [0m
Worker 1: Individuo : 
	Genes [96m{'n_estimators': 150, 'max_depth': 5, 'min_samples_split': 5}[0m, 
	Fitness: [92m1.0000[0m
Worker 1: Individuo : 
	Genes [96m{'n_estimators': 100, 'max_depth': 10, 'min_samples_split': 5}[0m, 
	Fitness: [92m0.9867[0m
Worker 1: Individuo : 
	Genes [96m{'n_estimators': 150, 'max_depth': 5, 'min_samples_split': 2}[0m, 
	Fitness: [92m0.9610[0m

 [91m Worker 2 [0m
Worker 2: Individuo : 
	Genes [96m{'n_estimators': 50, 'max_depth': 10, 'min_samples_split': 5}[0m, 
	Fitness: [92m0.9737[0m
Worker 2: Individuo : 
	Genes [96m{'n_estimators': 50, 'max_depth': 10, 'min_samples_split': 2}[0m, 
	Fitness: [92m0.9867[0m
Worker 2: Individuo : 
	Genes [96m{'n_estimators': 50, 'max_depth': 10, 'min_samples_split': 5}[0m, 
	Fitness: [92m0.9610[0m

 [91m Worker 3 [0m
Worker 3: Individuo : 
	Genes [96m{'n_estimators': 50, 'max_depth': 5, 'min_samples_split': 5}[0m, 
	Fitness: [92m0.9867[0m
Worker 3: Individuo : 
	Genes 

In [None]:

# Mejores individuos segunda poblacion
print(f"{RED}RANKING ACTUAL{RESET}")
NUEVA_POBLACION_RANKING = obtener_mejores_individuos(NUEVA_POBLACION, TOP_K)
for a in NUEVA_POBLACION_RANKING:
  print(a)

[91mRANKING ACTUAL[0m

	Genes [96m{'n_estimators': 150, 'max_depth': 5, 'min_samples_split': 5}[0m, 
	Fitness: [92m1.0000[0m

	Genes [96m{'n_estimators': 100, 'max_depth': 10, 'min_samples_split': 5}[0m, 
	Fitness: [92m0.9867[0m

	Genes [96m{'n_estimators': 50, 'max_depth': 10, 'min_samples_split': 2}[0m, 
	Fitness: [92m0.9867[0m

	Genes [96m{'n_estimators': 50, 'max_depth': 5, 'min_samples_split': 5}[0m, 
	Fitness: [92m0.9867[0m

	Genes [96m{'n_estimators': 50, 'max_depth': 10, 'min_samples_split': 5}[0m, 
	Fitness: [92m0.9737[0m


# Stop

Se detiene el contexto de pyspark y se finaliza

In [None]:
sc.stop()