In [1]:
import awswrangler as wr

import mlflow

# Para que funciones, todos nuestros scripts debemos exportar las siguientes variables de entorno
%env AWS_ACCESS_KEY_ID=minio   
%env AWS_SECRET_ACCESS_KEY=minio123 
%env MLFLOW_S3_ENDPOINT_URL=http://localhost:9008
%env AWS_ENDPOINT_URL_S3=http://localhost:9008
#%env MLFLOW_S3_ENDPOINT_URL=http://192.168.0.21:9000
#%env AWS_ENDPOINT_URL_S3=http://192.168.0.21:9000

env: AWS_ACCESS_KEY_ID=minio
env: AWS_SECRET_ACCESS_KEY=minio123
env: MLFLOW_S3_ENDPOINT_URL=http://localhost:9008
env: AWS_ENDPOINT_URL_S3=http://localhost:9008


# Búsqueda de mejor modelo e hiperparámetros

Dado nuestro dataset de Heart Disease, el cual ya pasó por el proceso de ETL y se encuentra en nuestro S3 bucket, vamos a realizar una búsqueda de cual seria el mejor modelo y que hiperparametros usar.

La búsqueda de hiperparametros la haremos usando Optuna y el tracking será realizado mediante MLFlow.

OBS: Para la confección de esta notebook, nos basamos en el tutorial de [MLFlow](https://mlflow.org/docs/latest/traditional-ml/hyperparameter-tuning-with-child-runs/notebooks/index.html).

In [2]:
mlflow_server = "http://localhost:5006"
#mlflow_server = "http://192.168.0.21:5000"

mlflow.set_tracking_uri(mlflow_server)

In [3]:
try:
    X_train =  wr.s3.read_csv("s3://data/final/train/spotify_X_train.csv")
    y_train =  wr.s3.read_csv("s3://data/final/train/spotify_y_train.csv")

    X_test =  wr.s3.read_csv("s3://data/final/test/spotify_X_test.csv")
    y_test =  wr.s3.read_csv("s3://data/final/test/spotify_y_test.csv")
except wr.exceptions.NoFilesFound as e:
    print(f"Error detectado: {e}")
    print("Ejecuta Airflow el DAG de ETL")

In [4]:
print(X_test.head())

   speechiness  energy  danceability  acousticness
0       0.2450   0.750         0.710       0.64800
1       0.2710   0.769         0.818       0.01440
2       0.1200   0.559         0.884       0.02360
3       0.0679   0.635         0.599       0.06400
4       0.0299   0.854         0.547       0.00347


## Investigamos la correlación de features con la variable objetivo

Antes de profundizar en el proceso de construcción de modelo, es esencial comprender las relaciones entre nuestras features  y la variable objetivo. Por lo que vamos a realizar un gráfico que indica el coeficiente de correlación de cada feature en relación con la variable objetivo. Esto nos sirve para:

- Evitar data leakage: Debemos asegurarnos de que ninguna característica se correlacione perfectamente con el objetivo (un coeficiente de correlación de aproximadamente 1.0). Si existe tal correlación, es una señal de que nuestro conjunto de datos podría estar "filtrando" información sobre el objetivo. 

- Garantizar relaciones significativas: Idealmente, nuestras características deberían tener algún grado de correlación con el objetivo. Inclusive si estamos trabajando con un problema de clasificación, aunque los resultados no son tan importantes como en un caso de regresión.

- Auditoría y trazabilidad: Loggear esta visualización de correlación con nuestra ejecución principal de MLflow garantiza la trazabilidad. Proporciona una instantánea de las características de los datos en el momento del entrenamiento del modelo, lo cual es invaluable para propósitos de auditoría y replicabilidad.

In [5]:
from plots import plot_correlation_with_target, plot_information_gain_with_target

In [6]:
# Dado que estamos usando como tracking a MLFlow, mostrar los gráficos aquí no tiene sentido.
correlation_plot = plot_correlation_with_target(X_train, y_train)
information_gain_plot = plot_information_gain_with_target(X_train, y_train)

## Arrancamos a experimentar

In [7]:
import datetime
import optuna

from mlflow.models import infer_signature
from mlflow_aux import get_or_create_experiment

from optuna_aux import champion_callback, objective

from sklearn.svm import SVC 
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import f1_score

# Optuna es un poco verboso, dejamos que solo nos muestre logs de errores
optuna.logging.set_verbosity(optuna.logging.ERROR)

Antes de poder realizar experimentos, vamos a crear el experimento en MLFLow, pero para evitar desorden, vamos a usar una función que se fije primero si el experimento existe, si esto es así, devuelve su ID.

Además creamos el nombre del run padre con el que vamos a ir registrando las ejecuciones.

In [8]:
# Creemos el experimento
experiment_id = get_or_create_experiment("Spotify")
print(experiment_id)

run_name_parent = "best_hyperparam_"  + datetime.datetime.today().strftime('%Y/%m/%d-%H:%M:%S"')

2


Ya con todo seteado, vamos a ejecutar la optimización usando Optuna, el cual realiza una búsqueda Bayesiana, la cual es más eficiente que una búsqueda de grilla tradicional. La desventaja es que es más difícil de paralelizar.

In [9]:
import optuna
from sklearn.linear_model import LogisticRegression
from xgboost import XGBClassifier
from sklearn.svm import SVC
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import f1_score
import mlflow
from mlflow.models import infer_signature


with mlflow.start_run(experiment_id=experiment_id, run_name=run_name_parent, nested=True):
    # Inicializamos el estudio de Optuna
    study = optuna.create_study(direction="maximize")

    # Ejecutamos los trials de optimización de hiperparámetros
    study.optimize(
        lambda trial: objective(trial, X_train, y_train, experiment_id),
        n_trials=250,
        callbacks=[champion_callback]
    )

    # Logueamos los mejores parámetros y el mejor valor de f1 en el run padre
    mlflow.log_params(study.best_params)
    mlflow.log_metric("best_train_f1", study.best_value)

    mlflow.set_tags(
        tags={
            "project": "Spotify",
            "optimizer_engine": "optuna",
            "model_family": "sklearn_xgboost",
            "feature_set_version": 1,
        }
    )

    # Selección del modelo basado en los mejores parámetros
    model = None  # Inicializamos `model` como None por defecto

    if study.best_params["classifier"] == "LogisticRegression":
        model = LogisticRegression(
            C=study.best_params["logreg_c"],
            solver='liblinear'
        )
    elif study.best_params["classifier"] == "XGBoost":
        model = XGBClassifier(
            max_depth=study.best_params["xgb_max_depth"],
            n_estimators=study.best_params["xgb_n_estimators"],
            learning_rate=study.best_params["xgb_learning_rate"]
        )
    elif study.best_params["classifier"] == "SVC_linear":
        model = SVC(
            C=study.best_params["svc_c"],
            kernel='linear',
            gamma='scale'
        )
    elif study.best_params["classifier"] == "SVC_poly":
        model = SVC(
            C=study.best_params["svc_c"],
            kernel='poly',
            degree=study.best_params["svc_poly_degree"],
            gamma='scale'
        )
    elif study.best_params["classifier"] == "SVC_rbf":
        model = SVC(
            C=study.best_params["svc_c"],
            kernel='rbf',
            gamma='scale'
        )
    elif study.best_params["classifier"] == "DecisionTreeClassifier":
        model = DecisionTreeClassifier(
            max_depth=study.best_params["tree_max_depth"]
        )
    elif study.best_params["classifier"] == "RandomForest":
        model = RandomForestClassifier(
            max_depth=study.best_params["rf_max_depth"],
            n_estimators=study.best_params["rf_n_estimators"]
        )

    # Verificamos que el modelo fue definido
    if model is not None:
        # Entrenamiento del modelo
        model = model.fit(X_train, y_train.to_numpy().ravel())

        # Predicción y evaluación del modelo
        y_pred = model.predict(X_test)
        test_f1 = f1_score(y_test.to_numpy().ravel(), y_pred)
        mlflow.log_metric("test_f1", test_f1)

        # Logueo del modelo como artefacto
        artifact_path = "model"
        signature = infer_signature(X_train, model.predict(X_train))

        mlflow.sklearn.log_model(
            sk_model=model,
            artifact_path=artifact_path,
            signature=signature,
            serialization_format='cloudpickle',
            registered_model_name="spotify_model_dev",
            metadata={"model_data_version": 1}
        )

        # Obtenemos la ubicación del modelo guardado en MLFlow
        model_uri = mlflow.get_artifact_uri(artifact_path)
        print(f"Modelo registrado en MLFlow: {model_uri}")
    else:
        print("No se definió un modelo válido.")


🏃 View run Trial: 0 at: http://localhost:5006/#/experiments/2/runs/48af400835944ef6a3463433fb725811
🧪 View experiment at: http://localhost:5006/#/experiments/2
Initial trial 0 achieved value: 0.8653736622169713
🏃 View run Trial: 1 at: http://localhost:5006/#/experiments/2/runs/2c06fde88750490ba30c5fe4829022ab
🧪 View experiment at: http://localhost:5006/#/experiments/2
🏃 View run Trial: 2 at: http://localhost:5006/#/experiments/2/runs/f7c787a17f8947a7b99da2dcfca39ef3
🧪 View experiment at: http://localhost:5006/#/experiments/2
🏃 View run Trial: 3 at: http://localhost:5006/#/experiments/2/runs/1b301cc9bad64d089aa08d8be637d612
🧪 View experiment at: http://localhost:5006/#/experiments/2
🏃 View run Trial: 4 at: http://localhost:5006/#/experiments/2/runs/c1733a9533374c7090f29c43b389a49d
🧪 View experiment at: http://localhost:5006/#/experiments/2
🏃 View run Trial: 5 at: http://localhost:5006/#/experiments/2/runs/c497f3a0e04d46a3b519d802ee223f55
🧪 View experiment at: http://localhost:5006/#/exp

Successfully registered model 'spotify_model_dev'.
2024/12/11 13:52:55 INFO mlflow.store.model_registry.abstract_store: Waiting up to 300 seconds for model version to finish creation. Model name: spotify_model_dev, version 1


Modelo registrado en MLFlow: s3://mlflow/2/5f60505c605545df9615297df96c03a3/artifacts/model
🏃 View run best_hyperparam_2024/12/11-13:52:24" at: http://localhost:5006/#/experiments/2/runs/5f60505c605545df9615297df96c03a3
🧪 View experiment at: http://localhost:5006/#/experiments/2


Created version '1' of model 'spotify_model_dev'.


## Testeando el modelo

Una vez que el modelo fue entrenado, podemos levantarlo y testearlo de una forma agnóstica a donde está guardado.

In [10]:
loaded = mlflow.sklearn.load_model(model_uri)

Downloading artifacts:   0%|          | 0/5 [00:00<?, ?it/s]

In [None]:
import numpy as np

test_data = [0.6, 0.8, 0.7, 0.9]  # speechiness, energy, danceability, y otra característica

prediction = loaded.predict(np.array(test_data).reshape([1, -1]))

print("Predicción:", prediction)

In [None]:
import numpy as np

test_data = [0.088, 0.73, 0.919, 0.189]  # speechiness, energy, danceability, y otra característica

prediction = loaded.predict(np.array(test_data).reshape([1, -1]))

print("Predicción:", prediction)

In [None]:
import numpy as np
#dato valido
test_data = [0.0444, 0.521, 0.514, 0.143]  # speechiness, energy, danceability, valence

test_data_np = np.array(test_data).reshape([1, -1])

# Predicción
prediction = loaded.predict(test_data_np)

print("Predicción:", prediction)  # Debería salir [1] si el modelo clasifica correctamente.


## Registramos el modelo 

Realizamos el registro del modelo en MLflow. En este registro se pone el modelo productivo que luego se usará para servir en formato on-line.

In [14]:
from mlflow import MlflowClient

client = MlflowClient()
name = "spotify_model_prod"
desc = "Spotify Song Preference Prediction"

# Verificar si el modelo ya existe
try:
    client.get_registered_model(name=name)
    print(f"El modelo '{name}' ya está registrado.")
except Exception as e:
    # Si el modelo no existe, lo creamos
    print(f"El modelo '{name}' no existe, creando uno nuevo...")
    client.create_registered_model(name=name, description=desc)

# Guardamos los parámetros del modelo
tags = model.get_params()
tags["model"] = type(model).__name__
tags["f1-score"] = f1_score

# Guardamos la versión del modelo
result = client.create_model_version(
    name=name,
    source=model_uri,
    run_id=model_uri.split("/")[-3],
    tags=tags
)

# Asignamos un alias (como "champion") para la nueva versión
client.set_registered_model_alias(name, "champion", version=result.version)
print(f"Modelo '{name}' registrado y alias 'champion' asignado a la versión {result.version}.")


2024/12/11 13:53:46 INFO mlflow.store.model_registry.abstract_store: Waiting up to 300 seconds for model version to finish creation. Model name: spotify_model_prod, version 1


El modelo 'spotify_model_prod' no existe, creando uno nuevo...
Modelo 'spotify_model_prod' registrado y alias 'champion' asignado a la versión 1.


In [None]:
local_path = "./descargado_modelo"
mlflow.artifacts.download_artifacts(artifact_uri=model_uri, dst_path=local_path)

In [None]:
import boto3
import json

# Inicializa el cliente S3
s3 = boto3.client('s3')

# Especifica el bucket y la clave del archivo JSON
bucket_name = 'data'
json_key = 'data_info/data.json'

# Ruta donde se guardará el archivo descargado localmente
local_file_path = 'data_info_local.json'

try:
    # Verifica si el archivo existe en el bucket
    s3.head_object(Bucket=bucket_name, Key=json_key)
    print(f"El archivo '{json_key}' existe en el bucket '{bucket_name}'. Descargando...")

    # Descarga el archivo JSON desde S3
    result_s3 = s3.get_object(Bucket=bucket_name, Key=json_key)
    text_s3 = result_s3["Body"].read().decode()  # Decodifica el contenido a texto
    
    # Guarda el contenido en un archivo local
    with open(local_file_path, 'w') as local_file:
        local_file.write(text_s3)
    print(f"Archivo descargado y guardado localmente como: {local_file_path}")

except s3.exceptions.NoSuchKey:
    print(f"El archivo '{json_key}' no existe en el bucket '{bucket_name}'.")
except Exception as e:
    print(f"Error al descargar o guardar el archivo JSON: {str(e)}")
