# 0. Introducci√≥n a Cron y Logger en Python

## 0.1. ¬øQu√© es **Cron**?

https://en.wikipedia.org/wiki/Cron

La utilidad de l√≠nea de comandos **cron** es un **programador de tareas** (job scheduler) en sistemas operativos basados en `Unix`. Su funci√≥n es permitir a los usuarios, quienes mantienen y configuran entornos de software, programar trabajos (jobs), como comandos o scripts de shell, conocidos tambi√©n como **cron jobs**, para ejecutarse peri√≥dicamente en tiempos, fechas o intervalos fijos.

Cron se utiliza t√≠picamente para automatizar tareas de mantenimiento o administraci√≥n del sistema. Sin embargo, su naturaleza general lo hace √∫til para otras actividades, como descargar archivos desde Internet o revisar correos electr√≥nicos a intervalos regulares. 

**Cron** es m√°s adecuado para tareas repetitivas.

El nombre de **cron** proviene de "Chronos", la palabra griega para tiempo.

```bash
# * * * * * <command to execute>
# | | | | |
# | | | | day of the week (0‚Äì6) (Sunday to Saturday; 7 is also Sunday on some systems)
# | | | month (1‚Äì12)             
# | | day of the month (1‚Äì31)
# | hour (0‚Äì23)
# minute (0‚Äì59)
```

### Vamos a jugar
https://crontab.guru/


## 0.2. ¬øQu√© es **Logger** en Python?

A medida que nuestros scripts se vuelven m√°s complejos, necesitamos una forma de **monitorear** lo que est√° sucediendo en ellos, especialmente cuando algo sale mal. Aqu√≠ es donde entra **logging**. 

`Python` tiene un m√≥dulo integrado llamado `logging` que permite registrar eventos o mensajes en diferentes niveles: 

- **DEBUG**: Informaci√≥n detallada, usualmente para desarrolladores.
- **INFO**: Confirmaci√≥n de que las cosas est√°n funcionando como se esperaba.
- **WARNING**: Algo inesperado, pero no cr√≠tico.
- **ERROR**: Fallos debido a un problema en el programa.
- **CRITICAL**: Error grave, usualmente detiene el programa.

Para usar **logger** en un script de Python:

In [1]:
import logging

# Configuraci√≥n b√°sica de logging
logging.basicConfig(
    level=logging.DEBUG, 
    format='%(asctime)s - %(levelname)s - %(message)s',
)

# Ejemplo de c√≥mo registrar eventos
logging.debug("Este es un mensaje de depuraci√≥n")
logging.info("Este es un mensaje informativo")
logging.warning("Este es una advertencia")
logging.error("Este es un mensaje de error")
logging.critical("Este es un error cr√≠tico")


2024-09-24 20:27:25,689 - DEBUG - Este es un mensaje de depuraci√≥n
2024-09-24 20:27:25,691 - INFO - Este es un mensaje informativo
2024-09-24 20:27:25,692 - ERROR - Este es un mensaje de error
2024-09-24 20:27:25,693 - CRITICAL - Este es un error cr√≠tico


### Configuraci√≥n de **Logger** en Python

El m√≥dulo `logging` de Python permite una gran flexibilidad para definir c√≥mo y d√≥nde se registran los mensajes. Algunas de las configuraciones m√°s comunes son:

#### 1. **Nivel de Log** (`level`)
El nivel del log define la gravedad de los mensajes que se quieren capturar. Algunos niveles de log comunes incluyen:
- `DEBUG`: Informaci√≥n detallada, √∫til para depuraci√≥n.
- `INFO`: Confirmaciones de que el programa est√° funcionando como se espera.
- `WARNING`: Indica que algo inesperado sucedi√≥, pero el programa sigue funcionando.
- `ERROR`: Se√±ala errores m√°s graves, pero que no detienen la ejecuci√≥n.
- `CRITICAL`: Errores graves que probablemente detendr√°n el programa.

Cuando configuramos un **nivel de log**, s√≥lo se capturan los mensajes de ese nivel o superiores. Por ejemplo, si establecemos el nivel en `WARNING`, se registrar√°n los mensajes `WARNING`, `ERROR` y `CRITICAL`, pero no los `DEBUG` o `INFO`.

#### 2. **Formato del mensaje** (`format`)
El formato del mensaje permite personalizar c√≥mo se muestran los logs. Algunos componentes √∫tiles en el formato son:
- `%(asctime)s`: La fecha y hora en que se registr√≥ el mensaje.
- `%(levelname)s`: El nivel del log (DEBUG, INFO, WARNING, etc.).
- `%(message)s`: El mensaje que se ha registrado.
- `%(name)s`: El nombre del logger.
- `%(threadName)s`: El nombre del hilo desde donde se emiti√≥ el log.
- `%(processName)s`: El nombre del proceso que emiti√≥ el log.

#### 3. **Formato de fecha y hora** (`datefmt`)
El par√°metro `datefmt` permite personalizar el formato de la fecha y hora que se incluye en los mensajes de log. Puedes usar cualquier formato de fecha y hora compatible con Python, como el que se usa en `strftime`. Esto es √∫til para ajustar el formato a las necesidades espec√≠ficas de tu aplicaci√≥n.

##### C√≥digos de Formato Comunes

Aqu√≠ tienes algunos c√≥digos de formato com√∫nmente utilizados que puedes usar con `strftime`:

| C√≥digo | Significado                           | Ejemplo de Salida       |
|--------|---------------------------------------|--------------------------|
| `%Y`   | A√±o con siglo                         | `2024`                   |
| `%y`   | A√±o sin siglo (00-99)                | `24`                     |
| `%m`   | Mes como un decimal con ceros (01-12)| `09`                     |
| `%B`   | Nombre completo del mes               | `Septiembre`             |
| `%b`   | Nombre abreviado del mes              | `Sep`                    |
| `%d`   | D√≠a del mes (01-31)                  | `24`                     |
| `%H`   | Hora (00-23)                         | `14`                     |
| `%I`   | Hora (01-12)                         | `02`                     |
| `%M`   | Minuto (00-59)                       | `05`                     |
| `%S`   | Segundo (00-59)                      | `30`                     |
| `%p`   | AM o PM                              | `PM`                     |
| `%A`   | Nombre completo del d√≠a de la semana | `Martes`                 |
| `%a`   | Nombre abreviado del d√≠a de la semana | `Mar`                    |
| `%j`   | D√≠a del a√±o (001-366)                | `267`                    |
| `%U`   | N√∫mero de semana del a√±o (00-53)     | `39`                     |
| `%W`   | N√∫mero de semana del a√±o (00-53), basado en lunes | `39`          |



In [2]:
import logging
import os

logger = logging.getLogger(__name__)
                           
logging.basicConfig(
    # filename='backup.log', 
    level=logging.INFO, 
    format='%(asctime)s - %(levelname)s - %(name)s - %(message)s',
     datefmt='%Y-%m-%d %H:%M:%S',
)

def hacer_respaldo():
    try:
        # Supongamos que aqu√≠ ocurre el proceso de respaldo
        logger.info("Respaldo iniciado")
        # Simulaci√≥n de respaldo
        if os.path.exists('base_de_datos.db'):
            logger.info("Respaldo exitoso")
        else:
            logger.error("Error: La base de datos no existe")
    except Exception as e:
        logger.critical(f"Error cr√≠tico durante el respaldo: {e}")

# Ejecutamos la funci√≥n
hacer_respaldo()

2024-09-24 20:30:56,458 - INFO - Respaldo iniciado
2024-09-24 20:30:56,460 - ERROR - Error: La base de datos no existe


Ahora s√≠, empecemos la clase de hoy

# 1. Machine Learning Pipeline a.k.a Workflow Orchestration

> Un pipeline de machine learning es una serie de pasos secuenciales y automatizados que se siguen para entrenar, evaluar y desplegar un modelo de machine learning. 
> El objetivo principal de un pipeline es automatizar el proceso repetitivo de transformar datos crudos en un modelo que pueda usarse en producci√≥n.

    
### Yo tratando de explicar el orden de ejecuci√≥n de las celdas de mi `jupyter-notebook`:

 <img style="display: block; margin: auto;" src="./images/orchestration-meme.png" width="580" height="50">

Revisemos lo que hemos hecho hasta ahora con nuestro c√≥digo...
1. Downloading data ----> Ingestion
2. Transforming the data ----> Filtering, removing outliers
3. Preparing data for ML ----> Feature Engineering
4. Hyper-parameter tunning ----> Best params
5. Train the final model ----> Best params
6. Registry the final model

**Problemas:**
- Un cuaderno gigante
- Sin muchas instrucciones
- Poco legible para cualquier persona
- No escalable ni mantenible
- Podr√≠amos decir que es un `workflow`, ya que se debe ejecutar en un orden espec√≠fico

In [None]:
def download_data(year, month):
    ...
    return df

def prepare_data(df):
    ...
    return df

def feature_engineering(df):
    ...
    return X, y

def find_best_model(X, y):
    ...
    return params

def train_model(X, y, params):
    ...
    return model

def main():
    df = download_data(2024,1)
    df = prepare_data(df)
    X, y = feature_engineering(df)
    model_params = find_best_model(X, y)
    model = train_model(X, y, model_params)

Mucho mejor, no?

Pero sigue teniendo problemas:
- Lo podemos agendar?
- Qu√© pasa si tengo m√∫ltiples archivos?
- O si no lo quiero ejecutar en mi m√°quina local?
- Qu√© pasa si una de las funciones falla? Si es solamente temporal el fallo?
- Y si queremos notificar que ese error ocurri√≥ a alg√∫n administrador?

Hay m√∫ltiples herramientas que vienen a solventar esos problemas:

- [Apache Airflow](https://airflow.apache.org/)
- [Prefect](https://www.prefect.io/)
- [Mage](https://www.mage.ai/)
- [Dagster](https://dagster.io/)
- [Kubeflow Pipelines](https://www.kubeflow.org/docs/components/pipelines/)
- [Scikitlearn Pipelines](https://scikit-learn.org/stable/modules/compose.html#pipeline)



# 2. Prefect

## 2.1. Definiciones en **Prefect**

### 2.1.1. **Task (Tarea)**
https://docs.prefect.io/3.0/develop/write-tasks

En **Prefect**, una `task` es la unidad m√°s b√°sica de trabajo en un flujo de Prefect. Una `task` representa una operaci√≥n individual que se ejecuta dentro de un flujo de trabajo (`flow`). Puedes convertir cualquier funci√≥n de Python en una `task` agregando el decorador `@task`. 

Las tasks pueden:
- **Tomar entradas, realizar un trabajo y devolver salidas**: Realizan operaciones con los datos que reciben y devuelven resultados.
- **Cachear su ejecuci√≥n a trav√©s de m√∫ltiples invocaciones**: Evitar repetir c√°lculos si una task ya se ejecut√≥ anteriormente con los mismos inputs.
- **Encapsular la l√≥gica del flujo en unidades reutilizables**: Pueden ser utilizadas en diferentes flows.
- **Usar logging autom√°tico** para capturar detalles de ejecuci√≥n, etiquetas (tags) y estado final.
- **Ejecutarse de forma concurrente**: Permiten paralelismo en la ejecuci√≥n de tareas.
- **Definirse en el mismo archivo que el `flow` o importarse de m√≥dulos**.
- **Llamarse desde `flows` u otras `tasks`**.

**Ejemplo de una tarea:**

```python
from prefect import task

@task
def sumar(a, b):
    return a + b
```

### 2.1.2. **Flow (Flujo)**
https://docs.prefect.io/3.0/develop/write-flows

Un `flow` en **Prefect** es una colecci√≥n de `tasks` que se ejecutan de manera organizada y coordinada. 

Un `flow` define c√≥mo las `tasks` interact√∫an entre s√≠ y permite orquestar la ejecuci√≥n de m√∫ltiples `tasks` con reglas espec√≠ficas, como dependencias, condicionales y paralelismo. 

Adem√°s, un `flow` puede manejar `tasks` de manera secuencial o en paralelo, as√≠ como gestionarlas en funci√≥n de eventos externos.

Los flows se definen como funciones en Python, y pueden tomar entradas, realizar tareas y devolver resultados. Cualquier funci√≥n Python puede convertirse en un flow de Prefect a√±adiendo el decorador `@flow`:
```python
from prefect import flow

@flow
def mi_flujo_principal():
    resultado = sumar(3, 4)
    print(f"El resultado es {resultado}")
```
#### Capacidades de los Flows en Prefect:

Cuando una funci√≥n se convierte en un `flow`, adquiere las siguientes capacidades:

- **Seguimiento autom√°tico de metadatos** sobre las ejecuciones del flujo, como el tiempo de ejecuci√≥n y el estado final.
- **Registro de cada estado** que el flujo alcanza, lo que permite observar y actuar sobre cada transici√≥n en la ejecuci√≥n del flow.
- **Validaci√≥n de tipos de los argumentos** de entrada como par√°metros del flujo de trabajo.
- **Reintentos autom√°ticos** en caso de fallo, con l√≠mites y retrasos configurables.
- **Timeouts** para evitar que los flujos de trabajo se ejecuten durante demasiado tiempo sin control.
- **Capacidad de despliegue**, lo que expone una API para interactuar con el flow de manera remota.

Los `flows` se identifican de forma √∫nica por su nombre. Puedes especificar un nombre para el `flow` utilizando el par√°metro `name`:

```python
@flow(name="Mi Flujo")
def mi_flujo() -> str:
    return "¬°Hola, mundo!"
```

Si no proporcionas un nombre, Prefect usar√° el nombre de la funci√≥n del flow.

#### Ejecuci√≥n de `Flows`:

Una ejecuci√≥n de flow (**flow run**) es una ejecuci√≥n individual de un `flow`.

Puedes ejecutar un `flow` llam√°ndolo por su nombre de funci√≥n, de la misma manera que lo har√≠as con una funci√≥n normal de `Python`. Tambi√©n puedes ejecutar un `flow` mediante:

- **Programadores externos**, como `cron`, para invocar la funci√≥n del `flow`.
- **Desplegar el flow en Prefect Cloud** o en un servidor auto-hospedado de Prefect.
- **Iniciar una ejecuci√≥n de flow a trav√©s de un cronograma**, la interfaz de usuario de Prefect, o la API de Prefect.

Sin importar c√≥mo ejecutes el `flow`, `Prefect` monitorea su ejecuci√≥n, capturando el estado para observabilidad. Adem√°s, puedes registrar una variedad de metadatos sobre las ejecuciones del flow para monitoreo, resoluci√≥n de problemas y auditor√≠a.



### 2.1.3. Diferencias entre `Task` y `Flow`:

- **Task**: Una tarea individual, peque√±a y espec√≠fica que ejecuta una operaci√≥n.
- **Flow**: Un contenedor que organiza y coordina la ejecuci√≥n de varias tasks, permitiendo su orquestaci√≥n.

### 2.1.4. Ejemplo Completo de Task y Flow en Prefect:

```python
from prefect import task, flow

@task
def sumar(a, b):
    return a + b

@flow
def mi_flujo_principal():
    resultado = sumar(3, 4)
    print(f"El resultado es {resultado}")

# Ejecutar el flow
mi_flujo_principal()
```

## 2.2 Uso de `Prefect`

```bash
pip install prefect
prefect version
```

Ahora vamos a inicializar un servidor de `prefect`

```bash
prefect server start
```

Vamos a ver un `flow` sencillo

In [5]:
import httpx
from prefect import flow, task


@task(retries=4, retry_delay_seconds=1, log_prints=True)
def fetch_cat_fact():
    cat_fact = httpx.get("https://f3-vyx5c2hfpq-ue.a.run.app/")
    #An endpoint that is designed to fail sporadically
    if cat_fact.status_code >= 400:
        raise Exception()
    print(cat_fact.text)


@flow
def fetch():
    fetch_cat_fact()

In [6]:
fetch()

Ahora veamos el concepto de `subflows`

In [7]:
from prefect import flow


@flow(name="Cat fact")
def fetch_cat_fact():
    """A flow that gets a cat fact"""
    return httpx.get("https://catfact.ninja/fact?max_length=140").json()["fact"]


@flow(name="Dog fact")
def fetch_dog_fact():
    """A flow that gets a dog fact"""
    return httpx.get(
        "https://dogapi.dog/api/v2/facts",
        headers={"accept": "application/json"},
    ).json()["data"][0]["attributes"]["body"]


@flow(name="Animals fact", log_prints=True)
def animal_facts():
    cat_fact = fetch_cat_fact()
    dog_fact = fetch_dog_fact()
    print(f"üê±: {cat_fact} \nüê∂: {dog_fact}")

In [8]:
animal_facts()

Ahora vamos a hacerlo para nuestro pipeline de entrenamiento en nuestro proyecto `nyc-taxi-time-prediction`

- Vamos a crear una nueva rama `feat/training_orchestration`. 
- Vamos a crear un nuevo directorio `training pipeline`
- crear un archivo llamado `train.py`

In [None]:
import pickle
import mlflow
import pathlib
import dagshub
import pandas as pd
import xgboost as xgb
from hyperopt.pyll import scope
from sklearn.metrics import root_mean_squared_error
from sklearn.feature_extraction import DictVectorizer
from hyperopt import fmin, tpe, hp, STATUS_OK, Trials


def read_data(file_path: str) -> pd.DataFrame:
    """Read data into DataFrame"""
    df = pd.read_parquet(file_path)

    df.lpep_dropoff_datetime = pd.to_datetime(df.lpep_dropoff_datetime)
    df.lpep_pickup_datetime = pd.to_datetime(df.lpep_pickup_datetime)

    df["duration"] = df.lpep_dropoff_datetime - df.lpep_pickup_datetime
    df.duration = df.duration.apply(lambda td: td.total_seconds() / 60)

    df = df[(df.duration >= 1) & (df.duration <= 60)]

    categorical = ["PULocationID", "DOLocationID"]
    df[categorical] = df[categorical].astype(str)

    return df

def add_features(df_train: pd.DataFrame, df_val: pd.DataFrame):
    """Add features to the model"""
    df_train["PU_DO"] = df_train["PULocationID"] + "_" + df_train["DOLocationID"]
    df_val["PU_DO"] = df_val["PULocationID"] + "_" + df_val["DOLocationID"]

    categorical = ["PU_DO"]  #'PULocationID', 'DOLocationID']
    numerical = ["trip_distance"]

    dv = DictVectorizer()

    train_dicts = df_train[categorical + numerical].to_dict(orient="records")
    X_train = dv.fit_transform(train_dicts)

    val_dicts = df_val[categorical + numerical].to_dict(orient="records")
    X_val = dv.transform(val_dicts)

    y_train = df_train["duration"].values
    y_val = df_val["duration"].values
    return X_train, X_val, y_train, y_val, dv

def hyper_parameter_tunning(X_train, X_val, y_train, y_val, dv):
    
    mlflow.xgboost.autolog()
    
    training_dataset = mlflow.data.from_numpy(X_train.data, targets=y_train, name="green_tripdata_2024-01")
    
    validation_dataset = mlflow.data.from_numpy(X_val.data, targets=y_val, name="green_tripdata_2024-02")
    
    train = xgb.DMatrix(X_train, label=y_train)
    
    valid = xgb.DMatrix(X_val, label=y_val)
    
    def objective(params):
        with mlflow.start_run(nested=True):
             
            # Tag model
            mlflow.set_tag("model_family", "xgboost")
            
            # Train model
            booster = xgb.train(
                params=params,
                dtrain=train,
                num_boost_round=100,
                evals=[(valid, 'validation')],
                early_stopping_rounds=10
            )
            
            # Predict in the val dataset
            y_pred = booster.predict(valid)
            
            # Calculate metric
            rmse = root_mean_squared_error(y_val, y_pred)
            
            # Log performance metric
            mlflow.log_metric("rmse", rmse)
    
        return {'loss': rmse, 'status': STATUS_OK}

    with mlflow.start_run(run_name="Xgboost Hyper-parameter Optimization", nested=True):
        search_space = {
            'max_depth': scope.int(hp.quniform('max_depth', 4, 100, 1)),
            'learning_rate': hp.loguniform('learning_rate', -3, 0),
            'reg_alpha': hp.loguniform('reg_alpha', -5, -1),
            'reg_lambda': hp.loguniform('reg_lambda', -6, -1),
            'min_child_weight': hp.loguniform('min_child_weight', -1, 3),
            'objective': 'reg:squarederror',
            'seed': 42
        }
        
        best_params = fmin(
            fn=objective,
            space=search_space,
            algo=tpe.suggest,
            max_evals=10,
            trials=Trials()
        )
        best_params["max_depth"] = int(best_params["max_depth"])
        best_params["seed"] = 42
        best_params["objective"] = "reg:squarederror"
        
        mlflow.log_params(best_params)

    return best_params


def train_best_model(X_train, X_val, y_train, y_val, dv, best_params) -> None:
    """train a model with best hyperparams and write everything out"""

    with mlflow.start_run("Best model ever"):
        train = xgb.DMatrix(X_train, label=y_train)
        valid = xgb.DMatrix(X_val, label=y_val)

        mlflow.log_params(best_params)

        # Log a fit model instance
        booster = xgb.train(
            params=best_params,
            dtrain=train,
            num_boost_round=100,
            evals=[(valid, 'validation')],
            early_stopping_rounds=10
        )

        y_pred = booster.predict(valid)
        rmse = root_mean_squared_error(y_val, y_pred)
        mlflow.log_metric("rmse", rmse)

        pathlib.Path("models").mkdir(exist_ok=True)
        with open("models/preprocessor.b", "wb") as f_out:
            pickle.dump(dv, f_out)
        
        mlflow.log_artifact("models/preprocessor.b", artifact_path="preprocessor")
    
    return None

def main_flow(year: int, month_train: int, month_val: int) -> None:
    """The main training pipeline"""
    
    train_path = f"../data/green_tripdata_{year}-{month_train}.parquet"
    val_path = f"../data/green_tripdata_{year}-{month_val}.parquet"
    
    # MLflow settings
    dagshub.init(url="https://dagshub.com/zapatacc/nyc-taxi-time-prediction", mlflow=True)
    
    MLFLOW_TRACKING_URI = mlflow.get_tracking_uri()
    
    mlflow.set_tracking_uri(MLFLOW_TRACKING_URI)
    mlflow.set_experiment(experiment_name="nyc-taxi-experiment-prefect")

    # Load
    df_train = read_data(train_path)
    df_val = read_data(val_path)

    # Transform
    X_train, X_val, y_train, y_val, dv = add_features(df_train, df_val)
    
    # Hyper-parameter Tunning
    best_params = hyper_parameter_tunning(X_train, X_val, y_train, y_val, dv)
    
    # Train
    train_best_model(X_train, X_val, y_train, y_val, dv, best_params)
