# Tutorial Prefect + MLFlow
Jorge Garrido 4ºINSO Data


## Que es prefect?

Prefect es una herramienta de orquestación de flujos de trabajo y automatización de tareas. Permite a los usuarios definir, ejecutar y monitorear flujos de trabajo complejos de manera eficiente.

### Caracteristicas clave:
* **Orquestación:** Automatiza la ejecución de tareas en un orden específico.
* **Monitorización:** Permite rastrear el estado de las tareas, detectar errores y reiniciar procesos si es necesario.
* **Escalabilidad:** Ejecuta tareas localmente o en la nube, facilitando la distribución de cargas de trabajo.

### Los decoradores de Prefect:
* **@task:** Define una tarea individual que puede ser ejecutada de manera independiente.
    * Maneja el registro de logs automáticamente.
    * Facilita la reintentos automáticos en caso de errores.
    * Se puede ejecutar de forma paralela o distribuida.

* **@flow:** Define un flujo de trabajo que puede contener múltiples tareas y especificar su orden de ejecución.
    * Administra el orden de ejecución de las tareas.
    * Proporciona una vista general del flujo y su estado.
    * Permite pasar parámetros entre tareas.
* **@parameter:** Define un parámetro que puede ser pasado a una tarea o flujo.


## El tutorial:
Este tutorial va ha mostrar el uso de los de coradores de task y flow para definir un sencillo flujo de trabajo que entrenara un modelo de IA

### Que hace?
Al decorar una función con @task, Prefect la convierte en una unidad de trabajo independiente que puede ser monitorizada, reintentada o ejecutada de forma distribuida.

Al decorar una función con @flow, Prefect la transforma en un flujo de trabajo que puede contener múltiples tareas y especificar su orden de ejecución.

In [1]:
## Decoradores para convertir funciones en tareas y flujos
from prefect import flow, task, get_run_logger

## manipulacion y generacion de datos
import pandas as pd
import numpy as np

## entrenar modelos
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score

## MLFlow
import mlflow
import mlflow.sklearn

## Paso 1: Cargar y procesar los datos
Definimos dos tareas:

* cargar_datos: Carga el dataset desde un archivo CSV.
* procesar_datos: Separa las características (X) de la variable objetivo (y) y divide los datos en conjuntos de entrenamiento y prueba.


get_run_logger() nos permite registrar mensajes en el flujo.

In [2]:
@task
def cargar_datos(ruta: str) -> pd.DataFrame:
    logger = get_run_logger()
    logger.info(f"Cargando datos desde {ruta}")
    datos = pd.read_csv(ruta)
    return datos

@task
def procesar_datos(datos: pd.DataFrame) -> tuple:
    logger = get_run_logger()
    logger.info("Procesando datos")
    X = datos.drop('target', axis=1)
    y = datos['target']
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
    return X_train, X_test, y_train, y_test

## Paso 2: Entrenar el modelo
Creamos una tarea que entrena un modelo de Random Forest con los datos de entrenamiento.

In [3]:
@task
def entrenar_modelo(X_train: pd.DataFrame, y_train: pd.Series) -> RandomForestClassifier:
    logger = get_run_logger()
    logger.info("Entrenando modelo RandomForest")
    modelo = RandomForestClassifier(n_estimators=100, random_state=42)
    modelo.fit(X_train, y_train)
    return modelo

## Paso 3: Evaluar el modelo
Evaluamos el rendimiento del modelo utilizando el conjunto de prueba y calculamos la precisión.

* mlflow.log_param("n_estimators", 100): Registra el parámetro n_estimators (número de árboles) del modelo.
* mlflow.log_metric("accuracy", precision): Guarda la precisión obtenida en la evaluación del modelo.
* mlflow.sklearn.log_model(modelo, "random_forest_model"): Guarda el modelo entrenado en el sistema de gestión de modelos de MLflow.


In [4]:
@task
def evaluar_modelo(modelo: RandomForestClassifier, X_test: pd.DataFrame, y_test: pd.Series) -> float:
    logger = get_run_logger()
    logger.info("Evaluando modelo")
    predicciones = modelo.predict(X_test)
    precision = accuracy_score(y_test, predicciones)
    logger.info(f"Precisión del modelo: {precision}")
    
    # Registro en MLflow
    mlflow.log_param("n_estimators", 100)
    mlflow.log_metric("accuracy", precision)
    mlflow.sklearn.log_model(modelo, "random_forest_model")
    
    return precision

## Paso 4: Crear el flujo principal
Definimos el flujo principal que conecta todas las tareas definidas anteriormente.

* @flow indica que esta función es un flujo de Prefect, que orquesta las tareas.
* El flujo ejecuta las tareas en el orden correcto: carga, procesamiento, entrenamiento y evaluación.


En el flujo principal, hemos configurado MLflow para que registre cada ejecución del pipeline como un experimento:
* mlflow.set_experiment("Flujo_ML_Prefect"): Crea un experimento con el nombre Flujo_ML_Prefect para agrupar las ejecuciones.
* mlflow.start_run(): Inicia una nueva ejecución (run) dentro del experimento.


### La celda inferior deberia comportarse de la siguiente manera:
1. Iniciar del flujo:
    * Al llamar flujo_ml("datos.csv"), Prefect inicia el flujo y registra el evento de inicio.

2. Ejecuctar la tarea 1 (cargar_datos):
    * Prefect ejecuta cargar_datos(ruta_datos).
    * Si la tarea se completa con éxito, el flujo pasa automáticamente a la siguiente tarea.
    * Si falla, Prefect registra el error y decide reintentar o detenerse, según la configuración.

3. Ejecución de la Tarea 2 (procesar_datos):
    * Los datos cargados se envían a procesar_datos.
    * Prefect monitorea y registra el estado de esta tarea.

4. Ejecución de la Tarea 3 (entrenar_modelo):
    * El conjunto de entrenamiento es utilizado para entrenar el modelo.
    * Prefect registra cuándo comienza y termina el entrenamiento.

5. Ejecución de la Tarea 4 (evaluar_modelo):
    * El modelo entrenado es evaluado con el conjunto de prueba.
    * La precisión calculada se registra en los logs de prefect.

6. Finalización del Flujo:
    * Una vez que todas las tareas se completan, prefect marca el flujo como Success y registra la precisión final en los logs.

In [5]:
@flow
def flujo_ml(ruta_datos: str):
    mlflow.set_experiment("Flujo_ML_Prefect")
    with mlflow.start_run():
        datos = cargar_datos(ruta_datos)
        X_train, X_test, y_train, y_test = procesar_datos(datos)
        modelo = entrenar_modelo(X_train, y_train)
        precision = evaluar_modelo(modelo, X_test, y_test)
        print(f"Precisión final del modelo: {precision}")

if __name__ == "__main__":
    flujo_ml("datos.csv")

2025/02/11 19:53:19 INFO mlflow.tracking.fluent: Experiment with name 'Flujo_ML_Prefect' does not exist. Creating a new experiment.




Precisión final del modelo: 0.5


## Visualizar MLFLOW:
Para visualizar los experimentos y resultados, abre la interfaz web de MLflow ejecutando el siguiente comando en la terminal:  
Esto abrira una interfaz con la ui del mlflow en la direccion http://localhost:5000

In [6]:
!mlflow ui

^C
