# Ejemplo ETL con Polars: Dataset de Taxis de Nueva York

En esta sección, implementaremos un ejemplo completo de ETL (Extracción, Transformación y Carga) utilizando Polars para procesar el dataset de taxis de Nueva York. Este ejemplo demostrará las ventajas de Polars sobre Pandas en términos de rendimiento y funcionalidades.

Nuestro ETL incluirá:
1. Extracción de datos desde archivos Parquet
2. Transformación y limpieza de datos con Polars
3. Validación de datos con Pydantic
4. Carga de datos en una base de datos SQLite utilizando SQLAlchemy
5. Implementación de DAGs (Directed Acyclic Graphs) con Prefect
6. Configuración de logging para seguimiento del proceso

Comencemos explorando la estructura del proyecto y los componentes principales.

## Estructura del Proyecto

Nuestro proyecto ETL está organizado de la siguiente manera:

```
notebook_polars_pyspark/
├── data/
│   └── yellow_tripdata_2022-01.parquet  # Dataset de taxis de Nueva York
├── etl_example/
│   ├── __init__.py
│   ├── etl_config.py      # Configuración del ETL
│   ├── models.py          # Modelos Pydantic para validación
│   ├── database.py        # Configuración de SQLAlchemy
│   ├── logger.py          # Configuración de logging
│   ├── etl_dag.py         # Implementación de DAGs con Prefect
│   ├── output/            # Directorio para la base de datos
│   └── logs/              # Directorio para logs
└── notebooks/
    ├── 01_introduccion_y_definiciones.ipynb
    ├── 02_analisis_comparativo.ipynb
    └── 03_ejemplo_etl.ipynb  # Este notebook
```

Vamos a examinar cada componente del ETL en detalle.

## 1. Configuración del ETL (etl_config.py)

El archivo `etl_config.py` contiene la configuración básica para nuestro ETL, incluyendo rutas de archivos, configuración de la base de datos y parámetros de logging.

In [None]:
# Mostrar el contenido del archivo etl_config.py
!cat ../etl_example/etl_config.py

## 2. Modelos de Datos con Pydantic (models.py)

Utilizamos Pydantic para definir modelos de datos con validación estricta de tipos. Esto nos permite asegurar que los datos cumplen con nuestras expectativas antes de cargarlos en la base de datos.

In [None]:
# Mostrar el contenido del archivo models.py
!cat ../etl_example/models.py

### Ventajas de Pydantic para Validación de Datos

Pydantic ofrece varias ventajas para la validación de datos en flujos ETL:

1. **Validación de tipos en tiempo de ejecución**: Pydantic valida automáticamente los tipos de datos y convierte valores cuando es posible.
2. **Validadores personalizados**: Podemos definir funciones de validación personalizadas para reglas de negocio específicas.
3. **Documentación integrada**: Los modelos Pydantic son autodocumentados con descripciones de campos.
4. **Integración con FastAPI y otras bibliotecas**: Pydantic se integra bien con el ecosistema de Python.
5. **Manejo de errores detallado**: Proporciona mensajes de error claros cuando la validación falla.

## 3. Configuración de la Base de Datos con SQLAlchemy (database.py)

Utilizamos SQLAlchemy para definir el esquema de la base de datos y gestionar las conexiones. SQLAlchemy nos permite trabajar con bases de datos de manera orientada a objetos.

In [None]:
# Mostrar el contenido del archivo database.py
!cat ../etl_example/database.py

### Ventajas de SQLAlchemy para ETL

SQLAlchemy ofrece varias ventajas para los procesos ETL:

1. **Abstracción de la base de datos**: Podemos cambiar el motor de base de datos sin modificar el código.
2. **Mapeo objeto-relacional (ORM)**: Trabajamos con objetos Python en lugar de SQL directo.
3. **Gestión de sesiones**: Manejo eficiente de transacciones y conexiones.
4. **Migraciones de esquema**: Facilita la evolución del esquema de la base de datos.
5. **Validación a nivel de base de datos**: Complementa la validación de Pydantic con restricciones a nivel de base de datos.

## 4. Configuración de Logging (logger.py)

El sistema de logging nos permite seguir el progreso del ETL y diagnosticar problemas.

In [None]:
# Mostrar el contenido del archivo logger.py
!cat ../etl_example/logger.py

## 5. Implementación de DAGs con Prefect (etl_dag.py)

Utilizamos Prefect para implementar DAGs (Directed Acyclic Graphs) que definen el flujo de trabajo del ETL. Prefect nos permite definir tareas y sus dependencias, gestionar errores y monitorear el progreso.

In [None]:
# Mostrar el contenido del archivo etl_dag.py
!cat ../etl_example/etl_dag.py

## Ejecutando el ETL

Ahora vamos a ejecutar nuestro ETL y analizar su rendimiento. Primero, importamos los módulos necesarios y configuramos el entorno.

In [None]:
import sys
import os
import time
from pathlib import Path

# Añadir el directorio raíz al path para poder importar los módulos
sys.path.append(str(Path.cwd().parent))

# Importar los módulos del ETL
from etl_example.etl_dag import nyc_taxi_etl_flow
from etl_example.etl_config import DB_PATH, OUTPUT_DIR

# Asegurar que el directorio de salida existe
OUTPUT_DIR.mkdir(exist_ok=True, parents=True)

# Eliminar la base de datos si existe para empezar desde cero
if DB_PATH.exists():
    DB_PATH.unlink()

print(f"Configuración completada. La base de datos se creará en: {DB_PATH}")

Ahora ejecutamos el flujo ETL y medimos el tiempo que tarda en completarse.

In [None]:
# Ejecutar el flujo ETL y medir el tiempo
start_time = time.time()

# Ejecutar el flujo
nyc_taxi_etl_flow()

end_time = time.time()
execution_time = end_time - start_time

print(f"\nETL completado en {execution_time:.2f} segundos")

## Verificando los Resultados

Vamos a verificar que los datos se hayan cargado correctamente en la base de datos SQLite.

In [None]:
import sqlite3
import pandas as pd

# Conectar a la base de datos
conn = sqlite3.connect(str(DB_PATH))

# Consultar el número de registros en la tabla de viajes
query_count = "SELECT COUNT(*) FROM taxi_trips"
trip_count = pd.read_sql_query(query_count, conn).iloc[0, 0]

# Consultar el número de ubicaciones
location_count = pd.read_sql_query("SELECT COUNT(*) FROM locations", conn).iloc[0, 0]

print(f"Número de viajes en la base de datos: {trip_count}")
print(f"Número de ubicaciones en la base de datos: {location_count}")

# Consultar algunos viajes para verificar
query_sample = "SELECT * FROM taxi_trips LIMIT 5"
sample_trips = pd.read_sql_query(query_sample, conn)

print("\nMuestra de viajes:")
sample_trips

## Comparación de Rendimiento: Polars vs Pandas

Para demostrar las ventajas de rendimiento de Polars sobre Pandas, vamos a implementar una versión simplificada del mismo proceso ETL utilizando Pandas y comparar los tiempos de ejecución.

In [None]:
import pandas as pd
import time
from etl_example.etl_config import TAXI_DATA_FILE

def etl_with_pandas():
    # Extracción
    start_time = time.time()
    print("Extrayendo datos con Pandas...")
    df_pandas = pd.read_parquet(TAXI_DATA_FILE)
    extraction_time = time.time() - start_time
    print(f"Extracción completada en {extraction_time:.2f} segundos")
    
    # Transformación
    start_time = time.time()
    print("Transformando datos con Pandas...")
    
    # Renombrar columnas para consistencia
    column_mapping = {
        "VendorID": "vendor_id",
        "tpep_pickup_datetime": "pickup_datetime",
        "tpep_dropoff_datetime": "dropoff_datetime",
        "PULocationID": "pickup_location_id",
        "DOLocationID": "dropoff_location_id"
    }
    df_pandas = df_pandas.rename(columns=column_mapping)
    
    # Filtrar viajes con distancia válida
    df_pandas = df_pandas[df_pandas['trip_distance'] > 0]
    
    # Filtrar viajes con tarifa válida
    df_pandas = df_pandas[df_pandas['fare_amount'] >= 0]
    
    # Calcular la duración del viaje en minutos
    df_pandas['trip_duration_minutes'] = (df_pandas['dropoff_datetime'] - df_pandas['pickup_datetime']).dt.total_seconds() / 60
    
    # Filtrar viajes con duración válida
    df_pandas = df_pandas[df_pandas['trip_duration_minutes'] > 0]
    
    # Calcular la velocidad promedio
    df_pandas['avg_speed_mph'] = df_pandas['trip_distance'] / (df_pandas['trip_duration_minutes'] / 60)
    
    # Filtrar velocidades razonables
    df_pandas = df_pandas[df_pandas['avg_speed_mph'] < 100]
    
    # Manejar valores nulos
    df_pandas['passenger_count'] = df_pandas['passenger_count'].fillna(1)
    df_pandas['congestion_surcharge'] = df_pandas['congestion_surcharge'].fillna(0)
    df_pandas['Airport_fee'] = df_pandas['Airport_fee'].fillna(0)
    
    transformation_time = time.time() - start_time
    print(f"Transformación completada en {transformation_time:.2f} segundos")
    
    return {
        "extraction_time": extraction_time,
        "transformation_time": transformation_time,
        "total_time": extraction_time + transformation_time,
        "row_count": len(df_pandas)
    }

def etl_with_polars():
    import polars as pl
    
    # Extracción
    start_time = time.time()
    print("Extrayendo datos con Polars...")
    df_polars = pl.read_parquet(TAXI_DATA_FILE)
    extraction_time = time.time() - start_time
    print(f"Extracción completada en {extraction_time:.2f} segundos")
    
    # Transformación
    start_time = time.time()
    print("Transformando datos con Polars...")
    
    # Renombrar columnas para consistencia
    column_mapping = {
        "VendorID": "vendor_id",
        "tpep_pickup_datetime": "pickup_datetime",
        "tpep_dropoff_datetime": "dropoff_datetime",
        "PULocationID": "pickup_location_id",
        "DOLocationID": "dropoff_location_id"
    }
    for old_name, new_name in column_mapping.items():
        if old_name in df_polars.columns:
            df_polars = df_polars.rename({old_name: new_name})
    
    # Filtrar viajes con distancia válida
    df_polars = df_polars.filter(pl.col("trip_distance") > 0)
    
    # Filtrar viajes con tarifa válida
    df_polars = df_polars.filter(pl.col("fare_amount") >= 0)
    
    # Calcular la duración del viaje en minutos
    df_polars = df_polars.with_columns([
        ((pl.col("dropoff_datetime").dt.epoch() - pl.col("pickup_datetime").dt.epoch()) / 60).alias("trip_duration_minutes")
    ])
    
    # Filtrar viajes con duración válida
    df_polars = df_polars.filter(pl.col("trip_duration_minutes") > 0)
    
    # Calcular la velocidad promedio
    df_polars = df_polars.with_columns([
        (pl.col("trip_distance") / (pl.col("trip_duration_minutes") / 60)).alias("avg_speed_mph")
    ])
    
    # Filtrar velocidades razonables
    df_polars = df_polars.filter(pl.col("avg_speed_mph") < 100)
    
    # Manejar valores nulos
    df_polars = df_polars.with_columns([
        pl.col("passenger_count").fill_null(1),
        pl.col("congestion_surcharge").fill_null(0),
        pl.col("Airport_fee").fill_null(0)
    ])
    
    transformation_time = time.time() - start_time
    print(f"Transformación completada en {transformation_time:.2f} segundos")
    
    return {
        "extraction_time": extraction_time,
        "transformation_time": transformation_time,
        "total_time": extraction_time + transformation_time,
        "row_count": df_polars.shape[0]
    }

# Ejecutar ambas versiones y comparar
print("=== Benchmark: Pandas vs Polars ===")
print("\n1. Ejecutando ETL con Pandas...")
pandas_results = etl_with_pandas()

print("\n2. Ejecutando ETL con Polars...")
polars_results = etl_with_polars()

# Calcular la mejora de rendimiento
speedup_extraction = pandas_results["extraction_time"] / polars_results["extraction_time"]
speedup_transformation = pandas_results["transformation_time"] / polars_results["transformation_time"]
speedup_total = pandas_results["total_time"] / polars_results["total_time"]

print("\n=== Resultados del Benchmark ===")
print(f"Filas procesadas: {pandas_results['row_count']}")
print("\nTiempos de Pandas:")
print(f"  - Extracción: {pandas_results['extraction_time']:.2f} segundos")
print(f"  - Transformación: {pandas_results['transformation_time']:.2f} segundos")
print(f"  - Total: {pandas_results['total_time']:.2f} segundos")

print("\nTiempos de Polars:")
print(f"  - Extracción: {polars_results['extraction_time']:.2f} segundos")
print(f"  - Transformación: {polars_results['transformation_time']:.2f} segundos")
print(f"  - Total: {polars_results['total_time']:.2f} segundos")

print("\nMejora de rendimiento (Polars vs Pandas):")
print(f"  - Extracción: {speedup_extraction:.2f}x más rápido")
print(f"  - Transformación: {speedup_transformation:.2f}x más rápido")
print(f"  - Total: {speedup_total:.2f}x más rápido")

## Ventajas de Polars para ETL

Basándonos en la implementación y los resultados del benchmark, podemos destacar las siguientes ventajas de Polars para procesos ETL:

1. **Rendimiento superior**: Como hemos visto en el benchmark, Polars es significativamente más rápido que Pandas en operaciones de extracción y transformación.

2. **Ejecución perezosa (lazy)**: Polars permite definir un plan de ejecución completo antes de ejecutarlo, lo que permite optimizaciones globales.

3. **Paralelismo automático**: Polars aprovecha automáticamente todos los núcleos disponibles sin configuración adicional.

4. **Eficiencia de memoria**: Polars consume menos memoria que Pandas para las mismas operaciones.

5. **API expresiva**: La API de Polars permite expresar transformaciones complejas de manera concisa y legible.

6. **Integración con ecosistema de datos**: Polars se integra bien con formatos como Parquet, CSV, JSON, etc.

7. **Consistencia de API**: La API de Polars es más consistente y predecible que la de Pandas.

Estas ventajas hacen de Polars una excelente opción para procesos ETL que manejan conjuntos de datos medianos a grandes en una sola máquina.

## Ventajas de la Arquitectura ETL Implementada

Nuestra arquitectura ETL combina varias tecnologías modernas para crear un flujo de trabajo robusto y eficiente:

1. **Polars para procesamiento de datos**: Aprovechamos el rendimiento y la expresividad de Polars para las operaciones de extracción y transformación.

2. **Pydantic para validación de datos**: Utilizamos Pydantic para asegurar que los datos cumplen con nuestras expectativas antes de cargarlos en la base de datos.

3. **SQLAlchemy para acceso a base de datos**: Utilizamos SQLAlchemy para definir el esquema de la base de datos y gestionar las conexiones de manera orientada a objetos.

4. **Prefect para orquestación de flujos**: Implementamos DAGs con Prefect para definir el flujo de trabajo, gestionar errores y monitorear el progreso.

5. **Logging para seguimiento**: Configuramos un sistema de logging para seguir el progreso del ETL y diagnosticar problemas.

Esta arquitectura proporciona:

- **Modularidad**: Cada componente tiene una responsabilidad clara y puede ser modificado o reemplazado independientemente.
- **Escalabilidad**: El diseño permite escalar a conjuntos de datos más grandes y flujos de trabajo más complejos.
- **Mantenibilidad**: El código está organizado de manera lógica y sigue buenas prácticas de ingeniería de software.
- **Robustez**: La validación de datos y el manejo de errores aseguran que el ETL sea resistente a problemas.
- **Observabilidad**: El logging y la monitorización permiten seguir el progreso y diagnosticar problemas.

## Conclusiones

En este ejemplo, hemos implementado un ETL completo utilizando Polars para procesar el dataset de taxis de Nueva York. Hemos demostrado las ventajas de Polars sobre Pandas en términos de rendimiento y funcionalidades, y hemos construido una arquitectura ETL robusta y eficiente.

Las principales conclusiones son:

1. **Polars ofrece un rendimiento significativamente mejor que Pandas** para operaciones ETL, especialmente en conjuntos de datos medianos a grandes.

2. **La combinación de Polars, Pydantic, SQLAlchemy y Prefect** proporciona una arquitectura ETL robusta, eficiente y mantenible.

3. **La validación estricta de tipos con Pydantic** asegura la integridad de los datos antes de cargarlos en la base de datos.

4. **La implementación de DAGs con Prefect** permite definir flujos de trabajo complejos de manera clara y gestionar errores de manera efectiva.

5. **El logging y la monitorización** son esenciales para seguir el progreso del ETL y diagnosticar problemas.

En la siguiente sección, presentaremos un ejercicio práctico para que los estudiantes implementen su propio ETL utilizando estas tecnologías.