# **Laboratorio 9: Airflow 🛫**

<center><strong>MDS7202: Laboratorio de Programación Científica para Ciencia de Datos - Otoño 2025</strong></center>

### Cuerpo Docente:

- Profesores: Stefano Schiappacasse, Sebastián Tinoco
- Auxiliares: Melanie Peña, Valentina Rojas
- Ayudantes: Angelo Muñoz, Valentina Zúñiga

### Equipo: SUPER IMPORTANTE - notebooks sin nombre no serán revisados

- Nombre de alumno 1: Diego Espinoza
- Nombre de alumno 2: Juan Miño

### **Link de repositorio de GitHub:** [Laboratorio 9: MDS7202](https://github.com/juansebm/MDS7202)

## Temas a tratar

- Construcción de pipelines productivos usando `Airflow`.

## Reglas:

- **Grupos de 2 personas**
- Fecha de entrega: 6 días de plazo con descuento de 1 punto por día. Entregas Martes a las 23:59.
- Instrucciones del lab el viernes a las 16:15 en formato online. Asistencia no es obligatoria, pero se recomienda fuertemente asistir.
- <u>Prohibidas las copias</u>. Cualquier intento de copia será debidamente penalizado con el reglamento de la escuela.
- Tienen que subir el laboratorio a u-cursos y a su repositorio de github. Labs que no estén en u-cursos no serán revisados. Recuerden que el repositorio también tiene nota.
- Cualquier duda fuera del horario de clases al foro. Mensajes al equipo docente serán respondidos por este medio.
- Pueden usar cualquier material del curso que estimen conveniente.

### Objetivos principales del laboratorio

- Reconocer los componentes pricipales de `Airflow` y su funcionamiento.
- Poner en práctica la construcción de pipelines de `Airflow`.
- Automatizar procesos típicos de un proyecto de ciencia de datos mediante `Airflow` y `Docker`.

El laboratorio deberá ser desarrollado sin el uso indiscriminado de iteradores nativos de python (aka "for", "while"). La idea es que aprendan a exprimir al máximo las funciones optimizadas que nos entrega `pandas`, las cuales vale mencionar, son bastante más eficientes que los iteradores nativos sobre DataFrames.

# **Introducción**

<p align="center">
  <img src="https://media.tenor.com/OBQ6niqbxswAAAAM/legallyblonde.gif" width="300">
</p>

Vale, una estudiante del Magíster en Ciencia de Datos, se encuentra en la etapa final de sus estudios. Por un lado, está muy contenta por haber llegado tan lejos, pero por otro, no puede evitar sentirse inquieta. Desde que ingresó a la universidad, una pregunta la ha perseguido: ¿qué tan probable es que pueda ser seleccionada en los lugares donde envíe postulaciones para puestos de trabajo?

Esta duda la mantiene en constante reflexión, especialmente porque sabe que el mercado laboral en Ciencia de Datos es competitivo y exige habilidades no solo técnicas, sino también estratégicas para destacar. Sin embargo, Vale actualmente está completamente enfocada en terminar su tesis de magíster y ha tenido que postergar cualquier preparación específica para enfrentar el desafío de las postulaciones laborales.

Al ver el avance y las habilidades que usted ha demostrado en el curso, Vale decidió proponerle un desafío que le permitirá disminuir la incertidumbre sobre su futuro laboral. Inspirado en sus conocimientos, recolectó un conjunto de datos que contiene información sobre diversos factores que influyen en las decisiones de contratación de empresas al seleccionar entre sus postulantes. Este set de datos incluye los siguientes atributos:

- Age: Edad del candidato
- Gender: Genero del candidato. Male (0), Female (1).
- EducationLevel: Mayor nivel educacional alcanzado por el candidato. Licenciatura Tipo 1 (1), Licenciatura Tipo 2 (2), Maestría (3), PhD. (4).
- ExperienceYears: Años de experiencia profesional.
- PreviousCompanies: Numero de compañías donde el candidato ha trabajado anteriormente.
- DistanceFromCompany: Distancia en kilometros entre la residencia del candidato y la compañía donde postula.
- InterviewScore: Puntaje obtenido en la entrevista por el candidato entre 0 a 100.
- SkillScore: Puntaje obtenido en evaluación de habilidades técnicas por el candidato, entre 0 a 100.
- PersonalityScore: Puntaje obtenido en pruebas de personalidad del candidato, entre 0 a 100.
- RecruitmentStrategy: Estrategia del equipo de reclutamiento. Agresiva (1), Moderada (2), Conservadora (3).

Variable a predecir:
- HiringDecision: Resultado de la postulación. No contratado (0), Contratado (1).

Su objetivo será ayudar a Vale a desarrollar un modelo que le permita predecir, basado en estos factores, si un postulante será contratado o no. Esta herramienta no solo le dará a Vale mayor claridad sobre el impacto de ciertos atributos en la decisión final de contratación, sino que también le permitirá aplicar sus conocimientos de Ciencia de Datos para resolver una pregunta que a muchos estudiantes como a ella les inquieta.

Como estudiante del curso Laboratorio de Programación Científica para Ciencia de Datos, deberá demostrar sus capacidades para preprocesar, analizar y modelar datos, brindándole a Vale una solución robusta y bien fundamentada para su problemática.

`Nota:` El siguiente [enlace](https://www.kaggle.com/datasets/rabieelkharoua/predicting-hiring-decisions-in-recruitment-data/data) contiene el set de datos original.

# **1. Pipeline de Predicción Lineal** (30 Puntos)

<p align="center">
  <img src="https://media.licdn.com/dms/image/v2/D4E22AQHZplrdPyKnvA/feedshare-shrink_2048_1536/feedshare-shrink_2048_1536/0/1713736729086?e=2147483647&v=beta&t=Tad2ulaWkhhDrPRN0PCdXrfuza60PjoJqgLborDyLao" width="500">
</p>

En esta sección buscaremos desplegar un producto utilizando un modelo de clasificación `Random Forest` para determinar **si una persona será contratada o no en un proceso de selección**. Para ello, comenzaremos preparando un pipeline lineal mediante `Airflow`.

## **1.1 Preparando el Pipeline** (15 puntos)

**Primero, asegúrese de tener creada las carpetas `dags`, `plugins` y `logs`**.

Comenzamos preparando un archivo llamado `hiring_functions.py`, el cual guardará en la carpeta `dags` y debe contener lo siguiente:

1. (3 puntos) Una función llamada `create_folders()` que cree una carpeta, la cual utilice la fecha de ejecución como nombre. Adicionalmente, dentro de esta carpeta debe crear las siguientes subcarpetas:
  - raw
  - splits
  - models

  `Hint`: Puede hacer uso de kwargs para obtener la fecha de ejecución mediante el DAG. El siguiente [Enlace](https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html) le puede ser útil.

2. (3 puntos) Una función llamada `split_data()` que lea el archivo `data_1.csv` de la carepta `raw` y a partir de este, aplique un *hold out*, generando un dataset de entrenamiento y uno de prueba. Luego debe guardar estos nuevos conjuntos de datos en la carpeta `splits`. `Nota:` Utilice un 20% para el conjunto de prueba, mantenga la proporción original en la variable objetivo y fije una semilla.

3. (8 puntos) Cree una función llamada `preprocess_and_train()` que:
  - Lea los set de entrenamiento y prueba de la carpeta `splits`.
  - Cree y aplique un `Pipeline` con una etapa de preprocesamiento. Utilice `ColumnTransformers` para aplicar las transformaciones que estime convenientes. Puede apoyarse del archivo `data_1_report.html` para justificar cualquier paso del preprocesamiento.
  
  - Añada una etapa de entrenamiento utilizando el modelo `RandomForest`.
  
  Esta función **debe crear un archivo `joblib` (análogo a `pickle`) con el pipeline entrenado** en la carepta `models`, además debe **imprimir** el accuracy en el conjunto de prueba y el f1-score de la clase positiva (contratado).
3. (1 punto) Incorpore la función `gradio_interface` en su script, modificando la ruta de acceso a su modelo, de forma que pueda leerlo desde la carepta `models`. Puede realizar las modificaciones que estime necesarias.

`NOTA:` Se permite la creación de funciones auxiliares si lo estiman conveniente.

In [None]:
#Inserte su código aqui
# CODIGO EN CARPETA DAGS hiring_functions.py !!!

```Python
def predict(file,model_path):

    pipeline = joblib.load(model_path)
    input_data = pd.read_json(file)
    predictions = pipeline.predict(input_data)
    print(f'La prediccion es: {predictions}')
    labels = ["No contratado" if pred == 0 else "Contratado" for pred in predictions]

    return {'Predicción': labels[0]}


def gradio_interface():

    model_path = "data/2025-06-06/models/model.joblib"

    interface = gr.Interface(
        fn=lambda file: predict(file, model_path),
        inputs=gr.File(label="Sube un archivo JSON"),
        outputs="json",
        title="Hiring Decision Prediction",
        description="Sube un archivo JSON con las características de entrada para predecir si Vale será contratada o no."
    )
    interface.launch(share=True)
````

## **1.2 Creando Nuestro DAG** (15 puntos)

<p align="center">
  <img src="https://media.tenor.com/a_yibuZQgngAAAAM/elle-woods.gif" width="400">
</p>

Con las funciones del pipeline ya creadas, ahora vamos a proceder a crear un Directed Acyclic Graph (DAG). Para ello, se le pide lo siguiente:

- (10 puntos) Cree un segundo archivo llamado `dag_lineal.py` y guardelo en la carpeta dags. Este script debe seguir la siguiente estructura (Ver imagen de referencia):

    0. Inicialice un DAG con fecha de inicio el 1 de octubre de 2024, ejecución manual y **sin backfill**. Asigne un `dag_id` que pueda reconocer facilmente, como `hiring_lineal`, etc.
    1. Debe comenzar con un marcador de posición que indique el inicio del pipeline.
    2. Cree una carpeta correspondiente a la ejecución del pipeline y cree las subcarpetas `raw`, `splits` y `models` mediante la función `create_folders()`.
    3. Debe descargar el archivo `data_1.csv` del siguiente [enlace](https://gitlab.com/eduardomoyab/laboratorio-13/-/raw/main/files/data_1.csv). Debe guardar el archivo en la carpeta raw de la ejecución correspondiente.`Hint:` Le puede ser útil el comando `curl -o <path de guardado> <enlace con los datos>`.
    4. Debe aplicar un hold out mediante la función `split_data()` de su archivo creado en la subsección anterior.
    5. Debe aplicar el preprocesamiento y el entrenamiento del modelo mediante la función `preprocess_and_train()`.
    6. Finalmente, debe montar una interfaz en gradio donde pueda cargar un archivo ``json``.


- (3 puntos) Cree un `DockerFile` para montar un contenedor que contenga Airflow. Adicionalmente, cree una carpeta llamada dags donde guardará el script.py creado anteriormente.

    `Nota:` Para la imagen, se recomienda utilizar python 3.10-slim. Adicionalmente, puede instalar `curl` mediante la siguiente linea de código: `RUN apt-get update && apt-get install -y curl`.

- Construya el contenedor en Docker y acceda a la aplicación web de Airflow mediante el siguiente [enlace](http://localhost:8080/). Inicie sesión, acceda al DAG creado y ejecute de forma manual su pipeline.

- (2 puntos) Acceda a la URL pública de Gradio e ingrese el archivo `vale_data.json` a su modelo. ¿Que predicción entregó el modelo para Vale? Adjunte imágenes de su resultado. `Hint:` Puede acceder a los `logs` para obtener los prints y la URL pública.

`Hint:` Recuerde que puede entregar `kwargs` a sus funciones, como por ejemplo la fecha de ejecución `ds`.

**Para esta sección, debe adjuntar todos los scripts creados junto a su notebook en la entrega, ya que serán ejecutados para validar el funcionamiento. Para justificar sus respuestas, adicionaslmente puede utilizar imágenes de apoyo, como screenshots.**

DAG de referencia:
<p align="center">
  <img src="https://drive.google.com/uc?id=1iwDgECZfFeWq1dl433tMa6_3CNF9cn1L" width="1200">
</p> 




In [None]:
#Inserte código aqui
# CODIGO EN CARPETA DAGS dag_lineal.py!!!

Docker Desktop:

<p align="center">
  <img src="./images/image.png" alt="alt text" width="1200"/>
</p>

Se observa airflow ejecutándose en localhost:8080

<p align="center">
  <img src="./images/image-1.png" alt="alt text" width="1200"/>
</p>

Se observan DAGs de ejemplo, pero aparece hiring_lineal entre la lista. Podemos ejecutarlo.

<p align="center">
  <img src="./images/image-2.png" alt="alt text" width="1200"/>
</p>

El flujo se ejecuta sin problemas. También retorna la Gradio app:

<p align="center">
  <img src="./images/image-3.png" alt="alt text" width="1200"/>
</p>

La app funciona y lee el .csv:

<p align="center">
  <img src="./images/image-4.png" alt="alt text" width="1200"/>
</p>


# **2. Paralelizando el Pipeline** (30 puntos)

<p align="center">
  <img src="https://i.gifer.com/8LNL.gif" width="400">
</p>



Al ver los resultados obtenidos, Vale queda muy contenta con el clasificador. Sin embargo, le aparecen algunas dudas respecto al funcionamiento del pipeline. Primero le comenta que es posible que en un futuro tenga nuevos datos que podrían ser útiles para realizar nuevos entrenamientos, por lo que sería ideal si este pipeline se fuera ejecutando de forma periódica y **NO** de forma manual. Además, Vale le menciona que le gustaría explorar el desempeño de otros modelos además de `Random Forest`, de forma que el pipeline seleccione de forma automática el modelo con mejor desempeño para luego hacer la predicción de Vale.

## **2.1 Preparando un Nuevo Pipeline** (15 puntos)

<p align="center">
  <img src="https://media.tenor.com/gnA7-5TewXMAAAAM/elle-woods.gif" width="400">
</p>

De acuerdo a lo que le comentó Vale, usted decide crear un nuevo script con las funciones que utilizará su pipeline. Por ende, dentro de la carpeta `dags`, usted creará el archivo `hiring_dynamic_functions.py` el cual debe contener:

1. (2 puntos) Una función llamada `create_folders()` que cree una carpeta, la cual utilice la fecha de ejecución como nombre. Adicionalmente, dentro de esta carpeta debe crear las siguientes subcarpetas:
  - raw
  - preprocessed
  - splits
  - models
2. (2 puntos) Una función llamada `load_ands_merge()` que lea desde la carpeta `raw` los archivos `data_1.csv`y `data_2.csv` en caso de estar disponible. Luego concatene estos y genere un nuevo archivo resultante, guardándolo en la carpeta `preprocessed`.

3. (2 puntos) Una función llamada `split_data()` que lea la data guardada en la carpeta `preprocessed` y realice un hold out sobre esta data. Esta función debe crear un conjunto de entrenamiento y uno de prueba. Mantenga una semilla y 20% para el conjunto de prueba. Guarde los conjuntos resultantes en la carpeta `splits`.

4. (6 puntos) Una función llamada `train_model()` que reciba un modelo de clasificación.
    - La función debe comenzar leyendo el conjunto de entrenamiento desde la carpeta `spits`.
    - Esta debe crear y aplicar un `Pipeline` con una etapa de preprocesamiento. Utilice `ColumnTransformers` para aplicar las transformaciones que estime convenientes.
    - Añada una etapa de entrenamiento utilizando un modelo que ingrese a la función.
  
  Esta función **debe crear un archivo joblib con el pipeline entrenado**. Guarde el modelo con un nombre que le permita una facil identificación dentro de la carpeta `models`.

5. (3 puntos) Una función llamada `evaluate_models()` que reciba sus modelos entrenados desde la carpeta `models`, evalúe su desempeño mediante `accuracy` en el conjunto de prueba y seleccione el mejor modelo obtenido. Luego guarde el mejor modelo como archivo `.joblib`. Su función debe imprimir el nombre del modelo seleccionado y el accuracy obtenido.

#Inserte código aqui

```Python
import os
import pandas as pd
import numpy as np
import joblib
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.metrics import accuracy_score
from datetime import datetime

def create_folders(**kwargs):
    """Esta función tiene como finalidad crear las carpetas raw, preprocessed,
    splits y models.
    Nota: usamos os.makedirs(..., exist_ok=True) para evitar errores si las
    carpetas ya existen.
    
        Input:
                **kwargs: permite extraer ds (execution date) de Airflow
        Output:
                Folders."""
    execution_date = kwargs['ds']
    base_path = f"data/{execution_date}"
    os.makedirs(os.path.join(base_path, "raw"), exist_ok=True)
    os.makedirs(os.path.join(base_path, "preprocessed"), exist_ok=True)
    os.makedirs(os.path.join(base_path, "splits"), exist_ok=True)
    os.makedirs(os.path.join(base_path, "models"), exist_ok=True)
    print(f"Carpetas creadas en: {base_path}")


def load_and_merge(**kwargs):
    """Lee desde la carpeta raw los archivos data_1.csv y data_2.csv en caso
    de estar disponible. Luego concatena estos y genera un nuevo archivo resultante,
    guardándolo en la carpeta preprocessed.
    
        Input:
                **kwargs
        Output:
                merged_df"""
    execution_date = kwargs['ds']
    base_path = f"data/{execution_date}/raw"
    files = ["data_1.csv", "data_2.csv"]

    #el siguiente loop lo que hace es efectivamente concatenar los dataframes
    dfs = []
    for file in files:
        file_path = os.path.join(base_path, file)#esta linea construye la ruta completa
                                                 #del archivo combinando la ruta base (base_path)
                                                 #y el nombre del archivo
        
        if os.path.exists(file_path):# Verifica si el archivo existe en esa ruta
            df = pd.read_csv(file_path)
            dfs.append(df)
            print(f"Archivo leído: {file}")
        else:
            print(f"Archivo no encontrado (omitido): {file}")
    if not dfs:
        raise FileNotFoundError("No se encontraron archivos CSV para procesar.")

    #ignore_index=True reinicia el índice del dataframe resultante, lo que es útil
    #si queremos concatenar datos similares y no nos importa el índice original.
    merged_df = pd.concat(dfs, ignore_index=True)
    output_path = f"data/{execution_date}/preprocessed/merged.csv"

    #index=False evita que se escriba la columna de índice en el archivo CSV. Esto es
    #lo habitual cuando se exportan datos para análisis posteriores, ya que el índice no
    #suele ser útil fuera de Pandas.
    merged_df.to_csv(output_path, index=False)
    print(f"Datos preprocesados guardados en: {output_path}")


def split_data(**kwargs):
    """Función que lee la data guardada en la carpeta preprocessed y realiza un holdout
    sobre esta data. Esta crea un set de train y de test, manteniendo una semilla
    y 20% para el conjunto de test. Además, guarda los conjuntos resultantes en la carpeta
    splits.
    
        Input:
                **kwargs
        Output:
                train.csv
                test.csv"""
    execution_date = kwargs['ds']
    input_path = f"data/{execution_date}/preprocessed/merged.csv"
    output_path = f"data/{execution_date}/splits"

    data = pd.read_csv(input_path)
    X = data.drop(columns=["HiringDecision"])
    y = data["HiringDecision"]

    #hacemos el split tal como nos dice el enunciado: manteniendo proporciones con stratify
    #fijando la semilla con random_state y que el tamaño del test set fuese de 20% o 0.2.
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42, stratify=y
    )

    train = pd.concat([X_train, y_train], axis=1)
    test = pd.concat([X_test, y_test], axis=1)

    train.to_csv(os.path.join(output_path, "train.csv"), index=False)
    test.to_csv(os.path.join(output_path, "test.csv"), index=False)
    print(f"Datos divididos y guardados en: {output_path}")


def train_model(model, model_name, **kwargs):
    """Esta función hace cuatro cosas: 1) comienza leyendo el conjunto de training
    desde la carpeta splits, 2) crea y aplica un pipeline con una etapa de preproce-
    samiento utilizando columntransformer para las transformaciones, 3) añade una etapa
    de entrenamiento utilizando un modelo que se ingresa a la función y 4) crea un
    archivo joblib con el pipeline entrenado, guardando el modelo con un nombre que
    permita una fácil identificación dentro de la carpeta models.
    
        Input:
               **kwargs
               model: modelo ingresado
        Output:
               archivo .joblib: con el pipeline entrenado"""
    execution_date = kwargs['ds']
    split_path = f"data/{execution_date}/splits"
    model_path = f"data/{execution_date}/models"

    #etapa 1: lectura del training set
    train = pd.read_csv(os.path.join(split_path, "train.csv"))
    X_train = train.drop(columns=["HiringDecision"])
    y_train = train["HiringDecision"]

    #etapa 2: pipeline y ColumnTransformer
    numeric_features = [
        "Age", "ExperienceYears", "PreviousCompanies",
        "DistanceFromCompany", "InterviewScore",
        "SkillScore", "PersonalityScore"
    ]
    categorical_features = ["Gender", "EducationLevel", "RecruitmentStrategy"]

    preprocessor = ColumnTransformer([
        ("num", StandardScaler(), numeric_features),
        ("cat", OneHotEncoder(handle_unknown="ignore"), categorical_features),
    ])

    pipeline = Pipeline([
        ("preprocessing", preprocessor),
        ("classifier", model)
    ])

    #etapa 3: entrenamiento
    pipeline.fit(X_train, y_train)

    #etapa 4: se crea un archivo joblib, con el nombre del modelo y fecha.
    model_filename = f"{model_name}_{datetime.now().strftime('%H%M%S')}.joblib"
    full_model_path = os.path.join(model_path, model_filename)
    joblib.dump(pipeline, full_model_path)
    print(f"Modelo entrenado y guardado en: {full_model_path}")


def evaluate_models(**kwargs):
    """Función que recibe los modelos entrenados desde la carpeta models, evalúa
    su desempeño mediante accuracy en el test set y selecciona el mejor modelo obtenido.
    Además, guarda el mejor modelo como un archivo .joblib. Por último, imprime el
    nombre del modelo seleccionado y el accuracy obtenido.
    
        Input:
              **kwargs
        Output:
              best_model.joblib: el modelo con el mejor accuracy en el test set."""
    execution_date = kwargs['ds']
    base_path = f"data/{execution_date}"
    models_dir = os.path.join(base_path, "models")
    test_path = os.path.join(base_path, "splits", "test.csv")

    #recordemos que ahora queremos evaluar la función en el test set.
    test = pd.read_csv(test_path)
    X_test = test.drop(columns=["HiringDecision"])
    y_test = test["HiringDecision"]

    best_accuracy = -1
    best_model_name = None
    best_model = None

    #para cada uno de los modelos vamos viendo cual es el mejor
    #según la métrica accuracy (porque son modelos de clasificación)
    for model_file in os.listdir(models_dir):
        if model_file.endswith(".joblib"):
            model_path = os.path.join(models_dir, model_file)
            model = joblib.load(model_path)
            y_pred = model.predict(X_test)
            acc = accuracy_score(y_test, y_pred)

            print(f"Modelo {model_file} con una Accuracy: {acc:.4f}")

            #vamos actualizando el modelo, el nombre y la métrica accuracy
            if acc > best_accuracy:
                best_accuracy = acc
                best_model = model
                best_model_name = model_file

    #vamos guardando el mejor modelo (o best_model)
    if best_model is not None:
        final_model_path = os.path.join(models_dir, "best_model.joblib")
        joblib.dump(best_model, final_model_path)
        print(f"Mejor modelo: {best_model_name} con Accuracy: {best_accuracy:.4f}")
        print(f"Guardado como: {final_model_path}")
    else:
                print("No se encontraron modelos para evaluar.")
```

## **2.2 Componiendo un nuevo DAG** (15 puntos)

<p align="center">
  <img src="https://67.media.tumblr.com/bfa5208006dc3f404ec08e8c3195cf2c/tumblr_obg9tgnLfX1u9e9f2o2_r1_500.gif" width="500">
</p>

Con las nuevas funciones, se debe crear el nuevo nuevo DAG. Para ello, cree un nuevo script en la carpeta `dags`, llamandolo `dag_dynamic.py`. Este script debe contener la siguiente estructura:

1. (1 punto) Inicialice un DAG con fecha de inicio el 1 de octubre de 2024, el cual se debe ejecutar el día 5 de cada mes a las 15:00 UTC. Utilice un `dag_id` interpretable para identificar fácilmente. **Habilite el backfill** para que pueda ejecutar tareas programadas desde fechas pasadas.
2. (1 punto) Comience con un marcador de posición que indique el inicio del pipeline.
3. (2 puntos) Cree una carpeta correspondiente a la ejecución del pipeline y cree las subcarpetas `raw`, `preprocessed`, `splits` y `models` mediante la función `create_folders()`.
4. (2 puntos) Implemente un `Branching`que siga la siguiente lógica:
  - Fechas previas al 1 de noviembre de 2024: Se descarga solo `data_1.csv`
  - Desde el 1 de noviembre del 2024: descarga `data_1.csv` y `data_2.csv`.
  En el siguiente [enlace](https://gitlab.com/eduardomoyab/laboratorio-13/-/raw/main/files/data_2.csv) puede descargar `data_2.csv`.
5. (1 punto) Cree una tarea que concatene los datasets disponibles mediante la función `load_and_merge()`. Configure un `Trigger` para que la tarea se ejecute si encuentra disponible **como mínimo** uno de los archivos.
6. (1 punto) Aplique el hold out al dataset mediante la función `split_data()`, obteniendo un conjunto de entrenamiento y uno de prueba.
7. (2 puntos) Realice 3 entrenamientos en paralelo:
  - Un modelo Random Forest.
  - 2 modelos a elección.
  Asegúrese de guardar sus modelos entrenados con nombres distintivos. Utilice su función `train_model()` para ello.
8. (2 puntos) Mediante la función `evaluate_models()`, evalúe los modelos entrenados, registrando el accuracy de cada modelo en el set de prueba. Luego debe imprimir el mejor modelo seleccionado y su respectiva métrica. Configure un `Trigger` para que la tarea se ejecute solamente si los 3 modelos fueron entrenados y guardados.

`Hint:` Recuerde que puede entregar `kwargs` a sus funciones, como por ejemplo la fecha de ejecución `ds`.

Una vez creado el script, vuelva a construir el contenedor en Docker, acceda a la aplicación web de Airflow, ejecute su pipeline y muestre sus resultados. Adjunte imágenes que ayuden a mostrar el proceso y sus resultados.

Adicionalmente, responda (1 c/u):

- ¿Cual es el accuracy de cada modelo en la ejecución de octubre? ¿Se obtienen los mismos resultados a partir de Noviembre?
- Analice como afectó el añadir datos a sus modelos mediante el desempeño del modelo y en costo computacional.
- Muestre el esquema de su DAG ejecutado en octubre y en noviembre.


`Nota:` Para esta sección no debe implementar la tarea en gradio, solamente se espera determinar el mejor modelo y comparar el desempeño obtenido.

**IMPORTANTE: Para esta sección, debe adjuntar todos los scripts creados junto a su notebook en la entrega, ya que serán ejecutados para validar el funcionamiento. Para justificar sus respuestas, adicionaslmente puede utilizar imágenes de apoyo, como screenshots.**

# Respuestas a las preguntas: 

primero que todo podemos ver el dag dinámico que hemos creado ejecutado en Octubre de 2024:

<div align="center">
  <img src="images_part_2/imagen_00.png" width="800">
</div>



y el dag dinámico ejecutado en Noviembre de 2024:

<div align="center">
  <img src="images_part_2/imagen_0.png" width="800">
</div>

¿CUÁL ES EL ACCURACY DE CADA MODELO EN LA EJECUCIÓN DE OCTUBRE?

El accuracy para cada modelo para cada mes es (como se evidencia por las imagenes de mas abajo):

| Modelo                   | Octubre | Noviembre | Diciembre | Enero | Febrero | Marzo | Abril |
|--------------------------|-----------|-----------|-----------|-----------|-----------|-----------|-----------|
| XGBoost Classifier       | 0.92      | 0.94      | 0.94      | 0.94      | 0.94      | 0.94      | 0.94      |
| LightGBM Classifier      | 0.92      | 0.93      | 0.93      | 0.93      | 0.93      | 0.93      | 0.93      |
| Random Forest Classifier | 0.93      | 0.9367    | 0.9176    | 0.9233    | 0.91      | 0.9167    | 0.9133    |

de donde además podemos concluir que el único modelo que fluctúa en cuanto a su métrica accuracy es el modelo Random Forest Classifier.
A partir de noviembre cambian los resultados, el accuracy de hecho aumenta para todos los modelos excepto para el Random Forest.

si vamos a hiring_dynamic -> calendar podemos ver

<div align="center">
  <img src="images_part_2/imagen_01.png" width="800">
</div>

Podemos ver el accuracy para Octubre:

<div align="center">
  <img src="images_part_2/imagen_02.png" width="800">
</div>

¿SE OBTIENEN LOS MISMOS RESULTADOS A PARTIR DE NOVIEMBRE?

Podemos ver que en Noviembre se obtienen los resultados:

<div align="center">
  <img src="images_part_2/imagen_1.png" width="800">
</div>

En Diciembre:

<div align="center">
  <img src="images_part_2/imagen_2.png" width="800">
</div>




En Enero

<div align="center">
  <img src="images_part_2/imagen_3.png" width="800">
</div>

Febrero 



<div align="center">
  <img src="images_part_2/imagen_4.png" width="800">
</div>


Marzo
<div align="center">
  <img src="images_part_2/imagen_5.png" width="800">
</div>



Abril

<div align="center">
  <img src="images_part_2/imagen_6.png" width="800">
</div>

Para Octubre de 2024 podemos ver las imagenes que contienen cuanto se demoró cada modelo:
<div align="center">
  <img src="images_part_2/imagen_7.png" width="800">
</div>
<div align="center">
  <img src="images_part_2/imagen_8.png" width="800">
</div>
<div align="center">
  <img src="images_part_2/imagen_9.png" width="800">
</div>

Y para Noviembre:
<div align="center">
  <img src="images_part_2/imagen_11.png" width="800">
</div>
<div align="center">
  <img src="images_part_2/imagen_12.png" width="800">
</div>
<div align="center">
  <img src="images_part_2/imagen_13.png" width="800">
</div>

Por lo que, si hacemos una tabla de costos computacionales (es decir, en cantidad de tiempo, específicamente en segundos):
| Modelo                  | Octubre | Noviembre |
|-------------------------|---------|-----------|
| XGBoost Classifier      |    <1    |     <1     |
| LightGBM Classifier     |    aprox 1    |     <1     |
| Random Forest Classifier|    aprox 1    |     <1     |

En términos computacionales podemos decir que los tres modelos se entrenaron más rápido que solo con data_1.csv.


#A continuación el script de python que contiene el dag_dynamic.py

```Python
from airflow import DAG
from airflow.operators.python_operator import BranchPythonOperator
from sklearn.ensemble import RandomForestClassifier
from xgboost import XGBClassifier
from lightgbm import LGBMClassifier

from airflow.utils.trigger_rule import TriggerRule


from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
from datetime import datetime
from hiring_dynamic_functions import (
    create_folders,
    split_data,
    train_model,
    load_and_merge,
    evaluate_models
)

default_args = {"owner": "airflow"}

def decide_branch(ds, **kwargs):
    """Esta función decide qué datos descargar según la fecha de ejecución, si
    antes o después del primero de Noviembre de 2024"""
    cutoff_date = datetime(2024, 11, 1)
    current_date = datetime.strptime(ds, "%Y-%m-%d")
    if current_date < cutoff_date:
        return "download_data_1"
    else:
        return ["download_data_1", "download_data_2"]

    #Ahora dividimos el enunciado de este script por puntos. Cada punto es una parte del DAG;

    
    #punto 1: Inicializamos el DAG con fecha de inicio el 1 de Octubre de 2024, el cual se debe ejecutar
    #el día 5 de cada mes a las 15:00 UTC, utilizando un dag_id interpretable para identificar fácilmente
    #habilitamos el backfill para que pueda ejecutar tareas programadas desde fechas pasadas.
    
with DAG(
    dag_id="hiring_dynamic",#cambiamos de hiring_lineal a hiring_dynamic
    default_args=default_args,
    start_date=datetime(2024, 10, 1),
    schedule_interval="0 15 5 * *",#cambiamos None por las 15 UTC, osea 0 15 5 * * 
    catchup=True,#aqui activamos backfilling para ejecutar tareas programadas desde fechas pasadas
    description="DAG para predicción dinámica de contrataciones",
) as dag:

    
    #punto 2; aquí definimos el marcador de posición que indica el inicio del pipeline
    start = EmptyOperator(task_id="start")

    #punto 3; creamos carpetas para la ejecución actual
    crear_carpetas = PythonOperator(
        task_id="create_folders",
        python_callable=create_folders,#esta es una función importada de hiring_dynamic_functions.py
        op_kwargs={"ds": "{{ ds }}"},
    )

    #punto 4; hacemos branching (es decir, "rama"-izado) para decidir qué datos descargar,
    #usando la función decide_branch que definimos antes.
    branching = BranchPythonOperator(
        task_id="branching_download",
        python_callable=decide_branch,
        op_kwargs={"ds": "{{ ds }}"},
    )

    #descargamos o bien data_1.csv o bien data_2.csv! Este punto es nuevo.
    #para data_1.csv (siempre)
    download_data_1 = BashOperator(
        task_id="download_data_1",
        bash_command=(
            "mkdir -p $AIRFLOW_HOME/data/{{ ds }}/raw && "
            "curl -sSf -o $AIRFLOW_HOME/data/{{ ds }}/raw/data_1.csv "
            "https://gitlab.com/eduardomoyab/laboratorio-13/-/raw/main/files/data_1.csv"
        ),
        env={"AIRFLOW_HOME": "/opt/airflow"},
    )

    #para data_2.csv (solo desde noviembre)
    download_data_2 = BashOperator(
        task_id="download_data_2",
        bash_command=(
            "mkdir -p $AIRFLOW_HOME/data/{{ ds }}/raw && "
            "curl -sSf -o $AIRFLOW_HOME/data/{{ ds }}/raw/data_2.csv "
            "https://gitlab.com/eduardomoyab/laboratorio-13/-/raw/main/files/data_2.csv"
        ),
        env={"AIRFLOW_HOME": "/opt/airflow"},
    )


    #punto 5: Concatenamos los datasets disponibles usando la función
    #load_and_merge(), configurando un trigger para que la tarea se eje
    #cute si encuentra disponible como mínimo uno de los archivos.
    load_merge = PythonOperator(
        task_id="load_and_merge",
        python_callable=load_and_merge,#usamos la función de hiring_dynamic_functions.py
        op_kwargs={"ds": "{{ ds }}"},
        
        # Ejecutamos este operador si al menos uno de
        #los archivos existe (usamos TriggerRule como ONE_SUCCESS)
        trigger_rule=TriggerRule.ONE_SUCCESS,
    )

    #punto 6; Aplicamos el holdout y split de los datos
    #en entrenamiento y prueba usando la función split_data()
    split = PythonOperator(
        task_id="split_data",
        python_callable=split_data,#usamos la función de hiring_dynamic_functions.py
        op_kwargs={"ds": "{{ ds }}"},
    )

    
    #punto 7; Entrenamientos paralelos; realizamos 3 entrenamientos
    #paralelos de 3 modelos diferentes, donde nos aseguramos de guardar
    #los modelos entrenados con nombres distintivos, usando la función
    #train_model().

    train_rf = PythonOperator(
        task_id="train_rf",
        python_callable=train_model,
        op_kwargs={
            "ds": "{{ ds }}",
            "model": RandomForestClassifier(),
            "model_name": "rf"
        },
    )

    train_xgb = PythonOperator(
        task_id="train_xgb",
        python_callable=train_model,
        op_kwargs={
            "ds": "{{ ds }}",
            "model": XGBClassifier(use_label_encoder=False, eval_metric="logloss"),
            "model_name": "xgb"
        },
    )

    train_lgbm = PythonOperator(
        task_id="train_lgbm",
        python_callable=train_model,
        op_kwargs={
            "ds": "{{ ds }}",
            "model": LGBMClassifier(),
            "model_name": "lgbm"
        },
    )



    #punto 8;
    # Evaluación de modelos: registramos el accuracy de cada modelo en el set
    # de prueba, luego imprimimos el mejor modelo seleccionado para que la
    # tarea se ejecute solamente si los 3 modelos fueron entrenados y guardados
    evaluate = PythonOperator(
        task_id="evaluate_models",
        python_callable=evaluate_models,
        op_kwargs={"ds": "{{ ds }}"},
        # Ejecutar solo si los 3 modelos fueron entrenados (todos OK)
        trigger_rule=TriggerRule.ALL_SUCCESS,
    )
    
    start >> crear_carpetas >> branching
    branching >> [download_data_1, download_data_2] >> load_merge
    load_merge >> split
    split >> [train_rf, train_xgb, train_lgbm] >> evaluate


````

# Conclusión

Éxito!
<div align="center">
  <img src="https://miro.medium.com/v2/resize:fit:1000/1*PX8WVijZapo7EDrvGv9Inw.gif" width="500">
</div>
