In [None]:
# ============================================================
# 🚢 Proyecto ETL Titanic Dataset con Arquitectura Medallion
# Author: SI
# ============================================================

# =======================
# 1️⃣ IMPORTAR LIBRERÍAS
# =======================
import os
import datetime
import logging
import polars as pl
import matplotlib.pyplot as plt
import seaborn as sns

In [None]:
# =======================
# 2️⃣ CONFIGURACIÓN GLOBAL
# =======================
# Logging: timestamped INFO-level messages for every pipeline step
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Medallion layout: one sub-folder per layer (bronze / silver / gold)
BASE_PATH = "/content/data"
for layer in ("bronze", "silver", "gold"):
    # exist_ok=True makes re-running this cell harmless
    os.makedirs(f"{BASE_PATH}/{layer}", exist_ok=True)
logger.info("Carpetas de datos creadas correctamente.")

In [None]:
# =======================
# 3️⃣ EXTRACCIÓN (EXTRACT)
# =======================
def extract_titanic_data() -> pl.DataFrame:
    """
    Download the Titanic CSV and load it as a Polars DataFrame.

    The file is fetched over HTTPS with the standard library (no external
    ``wget`` binary, no shell), stored under the Bronze folder as
    ``titanic_raw.csv`` and then parsed. An ``ingestion_timestamp`` column
    holding the local time of the load is appended to every row.

    Returns:
        The raw dataset plus the ``ingestion_timestamp`` column.

    Raises:
        Exception: any download, file-system or parsing error is logged and
            re-raised unchanged.
    """
    # Local imports keep this block self-contained; both are standard library.
    import shutil
    import urllib.request

    url = "https://raw.githubusercontent.com/datasciencedojo/datasets/master/titanic.csv"
    local_path = f"{BASE_PATH}/bronze/titanic_raw.csv"

    try:
        logger.info("Descargando dataset Titanic...")
        # Make sure the target folder exists even if the setup step was skipped.
        os.makedirs(os.path.dirname(local_path), exist_ok=True)
        # urlopen raises on HTTP/network failures, so a failed download is
        # reported here instead of surfacing later as a CSV parsing error.
        with urllib.request.urlopen(url, timeout=60) as response, \
                open(local_path, "wb") as target:
            shutil.copyfileobj(response, target)
        df = pl.read_csv(local_path)
        df = df.with_columns(pl.lit(datetime.datetime.now()).alias("ingestion_timestamp"))
        logger.info(f"Dataset Titanic cargado con {df.height} registros y {df.width} columnas.")
        return df
    except Exception as e:
        logger.error(f"Error al extraer los datos: {e}")
        raise

In [None]:
# =======================
# 4️⃣ CAPA BRONZE
# =======================
def load_bronze(df: pl.DataFrame):
    """
    Persist the raw DataFrame as a Parquet file in the Bronze layer.
    """
    bronze_file = f"{BASE_PATH}/bronze/titanic_bronze.parquet"
    df.write_parquet(bronze_file)
    logger.info(f"Datos Bronze guardados en {bronze_file}")

In [None]:
# =======================
# 5️⃣ CAPA SILVER
# =======================
def transform_silver(df: pl.DataFrame) -> pl.DataFrame:
    """
    Clean and standardise the raw data for the Silver layer.

    Steps, applied in this order:
    - Fill missing ``Age`` values with the median of the column.
    - Derive ``FareCategory`` from ``Fare``: ``Low`` (< 10), ``Medium`` (< 30),
      ``High`` (< 100), otherwise ``Very High``. A missing fare yields a null
      category instead of being mislabelled.
    - Drop duplicated ``PassengerId`` rows, keeping the first occurrence and
      the input row order so the result is reproducible.
    - Cast ``Name``, ``Sex`` and ``Embarked`` to strings.

    Args:
        df: DataFrame containing at least the columns ``Age``, ``Fare``,
            ``PassengerId``, ``Name``, ``Sex`` and ``Embarked``.

    Returns:
        A new DataFrame with the cleaned columns plus ``FareCategory``.
    """
    logger.info("Iniciando transformaciones Silver...")

    # Impute missing ages with the column median
    age = pl.col("Age")
    df = df.with_columns(age.fill_null(age.median()).alias("Age"))

    # Bucket the fare. A null comparison is never true, so without the
    # explicit null branch a missing fare would fall into "Very High".
    fare = pl.col("Fare")
    df = df.with_columns(
        pl.when(fare.is_null()).then(pl.lit(None, dtype=pl.Utf8))
        .when(fare < 10).then(pl.lit("Low"))
        .when(fare < 30).then(pl.lit("Medium"))
        .when(fare < 100).then(pl.lit("High"))
        .otherwise(pl.lit("Very High"))
        .alias("FareCategory")
    )

    # Deduplicate by PassengerId deterministically: the library defaults keep
    # an arbitrary duplicate and do not preserve row order.
    df = df.unique(subset=["PassengerId"], keep="first", maintain_order=True)

    # Make the text columns explicitly string-typed
    df = df.with_columns(pl.col("Name", "Sex", "Embarked").cast(pl.Utf8))

    logger.info(f"Transformaciones Silver aplicadas. Total registros: {df.height}")
    return df


def load_silver(df: pl.DataFrame):
    """
    Persist the cleaned DataFrame as a Parquet file in the Silver layer.
    """
    silver_file = f"{BASE_PATH}/silver/titanic_silver.parquet"
    df.write_parquet(silver_file)
    logger.info(f"Datos Silver guardados en {silver_file}")

In [None]:
# =======================
# 6️⃣ CAPA GOLD
# =======================
def _survival_rate_by(df: pl.DataFrame, column: str) -> pl.DataFrame:
    """Return survival rate and passenger count per value of *column*, sorted by it."""
    return (
        df.group_by(column)
          .agg([
              pl.col("Survived").mean().alias("SurvivalRate"),
              pl.len().alias("TotalPassengers")
          ])
          # group_by does not guarantee output order; sorting keeps the
          # stored tables and the plotted bars stable between runs.
          .sort(column)
    )


def aggregate_gold(df: pl.DataFrame) -> tuple[pl.DataFrame, pl.DataFrame]:
    """
    Build the business metrics of the Gold layer and store them as Parquet.

    Two tables are produced, each with the grouping column, ``SurvivalRate``
    (mean of ``Survived``) and ``TotalPassengers`` (row count):
    - survival by passenger class (``Pclass``)
    - survival by gender (``Sex``)

    Both tables are written to the Gold folder as
    ``survival_by_class.parquet`` and ``survival_by_gender.parquet``.

    Args:
        df: DataFrame with the columns ``Pclass``, ``Sex`` and ``Survived``.

    Returns:
        A ``(survival_by_class, survival_by_gender)`` tuple.
    """
    logger.info("Iniciando agregaciones Gold...")

    survival_by_class = _survival_rate_by(df, "Pclass")
    survival_by_gender = _survival_rate_by(df, "Sex")

    # Persist both result tables
    survival_by_class.write_parquet(f"{BASE_PATH}/gold/survival_by_class.parquet")
    survival_by_gender.write_parquet(f"{BASE_PATH}/gold/survival_by_gender.parquet")

    logger.info("Tablas Gold generadas correctamente.")
    return survival_by_class, survival_by_gender

In [None]:
# =======================
# 7️⃣ VISUALIZACIÓN (Gold)
# =======================
def visualize_gold(df_class: pl.DataFrame, df_gender: pl.DataFrame):
    """
    Draw side-by-side bar charts of the survival rate by class and by gender.
    """
    sns.set(style="whitegrid")
    _, axes = plt.subplots(1, 2, figsize=(12, 5))

    # One entry per panel: (table, grouping column, palette, title, x label)
    panels = (
        (df_class, "Pclass", "Blues_d", "Tasa de Supervivencia por Clase", "Clase"),
        (df_gender, "Sex", "Purples_d", "Tasa de Supervivencia por Género", "Género"),
    )

    for ax, (table, key, palette, title, x_label) in zip(axes, panels):
        # The grouping column doubles as hue so every bar gets its own colour
        sns.barplot(
            x=table[key].to_pandas(),
            y=table["SurvivalRate"].to_pandas(),
            hue=table[key].to_pandas(),
            palette=palette,
            ax=ax,
        )
        ax.set_title(title)
        ax.set_xlabel(x_label)
        ax.set_ylabel("Tasa de Supervivencia")

    plt.tight_layout()
    plt.show()

In [None]:
# =======================
# 8️⃣ PIPELINE PRINCIPAL
# =======================
def main():
    """
    Run the pipeline end to end: extract, Bronze, Silver, Gold, then plot.
    """
    logger.info("=== INICIANDO PIPELINE ETL TITANIC ===")

    # Extract the raw data and persist it untouched (Bronze)
    raw = extract_titanic_data()
    load_bronze(raw)

    # Clean the raw data and persist the result (Silver)
    cleaned = transform_silver(raw)
    load_silver(cleaned)

    # Aggregate business metrics (Gold); the tables are saved by the callee
    by_class, by_gender = aggregate_gold(cleaned)

    # Plot the Gold tables
    visualize_gold(by_class, by_gender)

    logger.info("✅ Pipeline ETL completado correctamente.")

# Run the whole pipeline when this file is executed as a script
if __name__ == "__main__":
    main()