# Fase 2: Robust Data Preprocessing

Este notebook implementa el pipeline de limpieza, validación, imputación y agregación mensual de los datos crudos para el proyecto **Forecaster Mis Buñuelos**.

**Objetivo:** Generar `data/02_cleansed/master_monthly.parquet`.

In [None]:
# Celda 1: Setup
import pandas as pd
import numpy as np
import yaml
from pathlib import Path
import os
from datetime import datetime
import platform

# Show every column when a DataFrame is displayed.
pd.set_option("display.max_columns", None)

# Project root: the working directory, or its parent when the notebook is run
# from inside the "notebooks" folder.
BASE_DIR = Path(os.getcwd())
BASE_DIR = BASE_DIR.parent if BASE_DIR.name == "notebooks" else BASE_DIR

# Input / output locations, all relative to the project root.
CONFIG_PATH = BASE_DIR.joinpath("config.yaml")
RAW_DATA_PATH = BASE_DIR.joinpath("data", "01_raw")
CLEANSED_DATA_PATH = BASE_DIR.joinpath("data", "02_cleansed")
ARTIFACTS_PATH = BASE_DIR.joinpath("experiments", "phase_02_preprocessing", "artifacts")

# Make sure the output directories exist before anything is written to them.
CLEANSED_DATA_PATH.mkdir(parents=True, exist_ok=True)
ARTIFACTS_PATH.mkdir(parents=True, exist_ok=True)

# Load the YAML configuration (safe loader: plain data only).
config = yaml.safe_load(CONFIG_PATH.read_text(encoding="utf-8"))

print("Configuración cargada y rutas establecidas.")

In [None]:
# Cell 2: Raw data loading
# Internal source key -> raw parquet file expected under RAW_DATA_PATH.
files = {
    "ventas": RAW_DATA_PATH.joinpath("ventas_diarias.parquet"),
    "marketing": RAW_DATA_PATH.joinpath("redes_sociales.parquet"),
    "promo": RAW_DATA_PATH.joinpath("promocion_diaria.parquet"),
    "macro": RAW_DATA_PATH.joinpath("macro_economia.parquet"),
}

dataframes = {}
print("Cargando archivos:")
for key, path in files.items():
    # Fail fast: a missing source aborts the load immediately.
    if not path.exists():
        raise FileNotFoundError(f"Archivo no encontrado: {path}")
    df = pd.read_parquet(path)
    dataframes[key] = df
    print(f"  - {key}: {df.shape}")

# Direct per-source references to keep the following code short.
df_ventas, df_marketing, df_promo, df_macro = (
    dataframes[name] for name in ("ventas", "marketing", "promo", "macro")
)

In [None]:
# Cell 3: Data contract validation (critical check)
print("Validando Contratos de Datos...")
data_contract = config.get("data_contract", {})

# Internal source keys -> section names used inside the config's data contract.
config_map = {
    "ventas": "ventas_diarias",
    "marketing": "redes_sociales",
    "promo": "promocion_diaria",
    "macro": "macro_economia",
}

# Per-source outcome of the validation ("OK" or a failure description).
data_contract_status = {}

for key, df in dataframes.items():
    # Columns the contract declares for this source (none if the section is absent).
    contract_section = data_contract.get(config_map.get(key), {})
    missing_cols = [col for col in contract_section.keys() if col not in df.columns]

    if not missing_cols:
        print(f"  - {key}: Validación de Contrato OK")
        data_contract_status[key] = "OK"
        continue

    # Any missing column is fatal: record it, report it and stop the run.
    error_msg = f"ERROR CRÍTICO en {key}: Faltan columnas {missing_cols}"
    print(error_msg)
    data_contract_status[key] = f"FAILED: Missing {missing_cols}"
    raise RuntimeError(error_msg)

In [None]:
# Cell 4: Column name standardization (cleaning)
# A missing or empty "rename_map" entry in the config means "rename nothing".
rename_map = config["preprocessing"].get("rename_map") or {}
print(f"Aplicando rename_map: {rename_map}")

# Both steps mutate each frame in place, so every existing reference to the
# frame sees the new column names.
for frame in dataframes.values():
    # Apply the explicit renames from the config first...
    frame.rename(columns=rename_map, inplace=True)
    # ...then normalize every name to lower case with underscores for spaces.
    frame.columns = [name.lower().replace(" ", "_") for name in frame.columns]

print("Nombres estandarizados.")

In [None]:
# Cell 5: Column selection (schema enforcement)
print("Aplicando Selección de Columnas (Schema Enforcement)...")
# Source key -> columns dropped because the contract does not list them.
columns_removed_log = {}

for key, df in dataframes.items():
    contract_section = data_contract.get(config_map.get(key), {})

    # Translate the contract's column names with the same rules already applied
    # to the frames: rename_map first, then lower case + underscores.
    final_expected_cols = [
        rename_map.get(col, col).lower().replace(" ", "_")
        for col in contract_section.keys()
    ]

    # Split the frame's columns into those in the contract and the rest.
    cols_to_keep = []
    removed = []
    for col in df.columns:
        bucket = cols_to_keep if col in final_expected_cols else removed
        bucket.append(col)

    if removed:
        columns_removed_log[key] = removed

    # Keep only contract columns; copy so the stored frame is independent.
    dataframes[key] = df[cols_to_keep].copy()

print("Columnas eliminadas por no estar en contrato:", columns_removed_log)

In [None]:
# Cell 6: Row cleaning (duplicates and noise)
# For every source: drop exact duplicate rows, keep a single row per date
# (the last one in source order) and discard rows older than `min_date`.
print("Limpieza de Filas...")
stats_cleaning = {"duplicates": {}, "filtered": {}}
# `or {}` tolerates a "filters:" key that is present but empty in the config.
filters = config["preprocessing"].get("filters") or {}
min_date = pd.to_datetime(filters.get("min_date", "2018-01-01"))

for key, df in dataframes.items():
    initial_rows = len(df)
    has_fecha = "fecha" in df.columns

    # 1. Exact deduplication
    df = df.drop_duplicates()

    # 2. Temporal deduplication (keep last)
    if has_fecha:
        # assign() returns a new frame, so no value is written into the
        # filtered result produced by drop_duplicates().
        df = df.assign(fecha=pd.to_datetime(df["fecha"]))
        # A stable sort keeps rows that share a date in their original
        # relative order, so keep="last" really keeps the record that came
        # last in the source. The default quicksort gives no such guarantee.
        df = df.sort_values("fecha", kind="mergesort")
        df = df[~df.duplicated(subset=["fecha"], keep="last")]

    rows_after_dedup = len(df)
    stats_cleaning["duplicates"][key] = initial_rows - rows_after_dedup

    # 3. Date filter (rows with a missing date compare False and are dropped)
    if has_fecha:
        df = df[df["fecha"] >= min_date]
        stats_cleaning["filtered"][key] = rows_after_dedup - len(df)

    # Store an independent copy so later in-place edits of this frame do not
    # operate on a slice of an intermediate result.
    dataframes[key] = df.copy()

print("Estadísticas de Limpieza:", stats_cleaning)

In [None]:
# Cell 7: Sentinel value treatment
# Replace configured placeholder values (numeric and text) with NaN so the
# later imputation steps treat them as missing data.
print("Tratamiento de Centinelas...")
# `or` fallbacks tolerate config keys that are present but empty.
sentinel_values = config["quality"].get("sentinel_values") or {}
numeric_sentinels = sentinel_values.get("numeric") or []  # e.g. [-1, 999]
text_sentinels = sentinel_values.get("text") or []

# Source key -> number of cells replaced by NaN.
sentinel_stats = {}

for key, df in dataframes.items():
    # Work on a private copy: the stored frame may be a filtered slice, and
    # writing into a slice is unreliable.
    df = df.copy()
    count_replaced = 0
    for col in df.columns:
        # Exception: in macro.confianza_consumidor the value -1 is kept as data.
        is_confianza = (key == "macro" and col == "confianza_consumidor")

        if pd.api.types.is_numeric_dtype(df[col]):
            candidates = [
                val for val in numeric_sentinels
                if not (is_confianza and val == -1)
            ]
        elif pd.api.types.is_string_dtype(df[col]):
            candidates = text_sentinels
        else:
            continue

        for val in candidates:
            mask = (df[col] == val)
            if mask.any():
                count_replaced += int(mask.sum())
                # Series.mask() upcasts as needed (e.g. int -> float) instead
                # of writing NaN into an integer column, which newer pandas
                # versions deprecate and later reject.
                df[col] = df[col].mask(mask)

    sentinel_stats[key] = int(count_replaced)
    dataframes[key] = df

print("Centinelas reemplazados por NaN:", sentinel_stats)

In [None]:
# Cell 8: Guarantee temporal completeness (reindexing)
print("Garantizar Completitud Temporal...")

# Global date range: from `min_date` up to the latest date found in any
# source (falls back to "now" when no source has a "fecha" column).
all_max_dates = []
for frame in dataframes.values():
    if "fecha" in frame.columns:
        all_max_dates.append(frame["fecha"].max())

if all_max_dates:
    global_max_date = max(all_max_dates)
else:
    global_max_date = datetime.now()

# Per-source frequency as configured; sources are looked up by config name.
freq_map = config["preprocessing"].get("data_frequency", {})
# Internal mapping: notebook key -> config key
key_map = {
    "ventas": "ventas_diarias",
    "marketing": "redes_sociales",
    "promo": "promocion_diaria",
    "macro": "macro_economia",
}

# Source key -> row-count change caused by the reindex.
reindex_stats = {}

for key, df in dataframes.items():
    if "fecha" not in df.columns:
        continue

    # Daily ("D") is the default when the source has no configured frequency.
    freq = freq_map.get(key_map.get(key), "D")

    print(f"  - Reindexando {key} con frecuencia: {freq}")

    # Complete calendar for this source at its own frequency.
    full_idx = pd.date_range(start=min_date, end=global_max_date, freq=freq, name="fecha")

    # Index by date and, as a safety net, drop any repeated date (keep last).
    indexed = df.set_index("fecha")
    indexed = indexed[~indexed.index.duplicated(keep="last")]
    original_len = len(indexed)

    # Align to the full calendar; dates without data become all-NaN rows.
    reindexed = indexed.reindex(full_idx)
    reindexed.index.name = "fecha"  # restore the index name

    # Back to a regular "fecha" column.
    reindexed = reindexed.reset_index()

    reindex_stats[key] = len(reindexed) - original_len
    dataframes[key] = reindexed

print("Filas añadidas (huecos temporales recuperados) por reindexado:", reindex_stats)

In [None]:
# Cell 9: Null imputation (business logic)
print("Ejecutando Imputación de Negocio...")
# One (initially empty) stats bucket per source; each imputation step below
# records its counts in the matching bucket.
imputation_stats = {
    source: {} for source in ("macro", "promo", "marketing", "ventas")
}

# Re-bind the per-source names to the frames currently stored in `dataframes`.
df_ventas, df_marketing, df_promo, df_macro = (
    dataframes[name] for name in ("ventas", "marketing", "promo", "macro")
)

# --- Macroeconomics ---
# Fill missing numeric values with a trailing mean of earlier rows, then
# back-fill whatever is still missing at the start of the series.
cols_num_macro = df_macro.select_dtypes(include=np.number).columns
for col in cols_num_macro:
    nulls_before = df_macro[col].isna().sum()
    if nulls_before > 0:
        # Mean over a window of up to 60 rows, shifted by one row so a fill
        # value only uses strictly earlier observations (causality).
        # NOTE(review): the original comment described this as "2 previous
        # months"; the window counts rows, so that holds only if this frame is
        # daily - confirm against the configured macro frequency.
        trailing_mean = df_macro[col].rolling(window=60, min_periods=1).mean().shift(1)
        df_macro[col] = df_macro[col].fillna(trailing_mean)
        # Fallback for the start of the series. `.bfill()` replaces the
        # deprecated `fillna(method='bfill')` spelling with the same result.
        df_macro[col] = df_macro[col].bfill()
        imputation_stats["macro"][col] = int(nulls_before)

# --- Promotions ---
# Infer a missing promo flag from the calendar month.
if "es_promo" in df_promo.columns:
    mask_null_promo = df_promo["es_promo"].isna()
    count_promo_nulls = mask_null_promo.sum()
    if count_promo_nulls > 0:
        # Months treated as promotional when the flag is missing.
        in_promo_month = df_promo["fecha"].dt.month.isin([4, 5, 9, 10])
        # Missing flag in a promo month -> 1, in any other month -> 0.
        df_promo.loc[mask_null_promo & in_promo_month, "es_promo"] = 1
        df_promo.loc[mask_null_promo & ~in_promo_month, "es_promo"] = 0
        imputation_stats["promo"]["es_promo_inferred"] = int(count_promo_nulls)

# --- Marketing (social networks) ---
# Three steps: (1) infer missing campaign labels, (2) fill missing Facebook /
# Instagram investment, (3) recompute the total investment column.

# 1. Campaign label
# The label column is "ciclo" when present, otherwise "campana".
# NOTE(review): if neither column exists the next line raises KeyError.
target_col_campana = "ciclo" if "ciclo" in df_marketing.columns else "campana"
mask_camp_null = df_marketing[target_col_campana].isna()
count_campana_nulls = mask_camp_null.sum()

if count_campana_nulls > 0:
    # A row "has investment" when either network shows a positive amount
    # (missing amounts count as 0 for this test only).
    fb_val = df_marketing["inversion_facebook"].fillna(0)
    ig_val = df_marketing["inversion_instagram"].fillna(0)
    has_inv = (fb_val > 0) | (ig_val > 0)

    # Month windows used to pick the label.
    # NOTE(review): the windows include March and August although the labels
    # read "Abr-May" / "Sep-Oct" - presumably to match the mid-month start of
    # the active ranges defined in step 2; confirm.
    months = df_marketing["fecha"].dt.month
    mask_abr_may = months.isin([3, 4, 5])
    mask_sep_oct = months.isin([8, 9, 10])

    # Label rows that were missing, have investment and fall in a window...
    df_marketing.loc[mask_camp_null & has_inv & mask_abr_may, target_col_campana] = "Ciclo Abr-May"
    df_marketing.loc[mask_camp_null & has_inv & mask_sep_oct, target_col_campana] = "Ciclo Sep-Oct"
    # ...and mark every originally-missing row that is still unlabeled.
    df_marketing.loc[mask_camp_null & df_marketing[target_col_campana].isna(), target_col_campana] = "Sin Campaña"
    imputation_stats["marketing"]["campaigns_inferred"] = int(count_campana_nulls)

# 2. Investments
# Active ranges by calendar date: Mar 15 - May 25 and Aug 15 - Oct 25.
fechas = df_marketing["fecha"]
rango1 = (((fechas.dt.month == 3) & (fechas.dt.day >= 15)) | (fechas.dt.month == 4) | ((fechas.dt.month == 5) & (fechas.dt.day <= 25)))
rango2 = (((fechas.dt.month == 8) & (fechas.dt.day >= 15)) | (fechas.dt.month == 9) | ((fechas.dt.month == 10) & (fechas.dt.day <= 25)))
rango_activo = rango1 | rango2

for col in ["inversion_facebook", "inversion_instagram"]:
    if col in df_marketing.columns:
        # Null mask taken BEFORE any filling; both branches below rely on it.
        mask_null = df_marketing[col].isna()
        count_inv_nulls = mask_null.sum()
        if count_inv_nulls > 0:
            mask_null_in_range = mask_null & rango_activo
            if mask_null_in_range.any():
                # Linear interpolation runs over the WHOLE column, not only
                # the in-range rows; out-of-range rows are overwritten below.
                # NOTE(review): interpolation therefore sees out-of-range gaps
                # as missing rather than 0 - confirm this ordering is intended.
                df_marketing[col] = df_marketing[col].interpolate(method='linear')

            # Originally-missing values outside the active ranges become 0.
            mask_null_out_range = mask_null & ~rango_activo
            if mask_null_out_range.any():
                df_marketing.loc[mask_null_out_range, col] = 0

            imputation_stats["marketing"][f"{col}_imputed"] = int(count_inv_nulls)

# 3. Total consistency
# Recompute the total (whichever of the two column names exists) as
# Facebook + Instagram so it matches the imputed components.
target_col_marketing = "inversion_marketing_total" if "inversion_marketing_total" in df_marketing.columns else "inversion_total_diaria"
if target_col_marketing in df_marketing.columns:
    df_marketing[target_col_marketing] = df_marketing["inversion_facebook"] + df_marketing["inversion_instagram"]

# --- Daily sales ---
# Remember which rows had no total before filling; that mask identifies the
# imputed rows later on.
imputed_sales_mask = df_ventas["total_unidades_entregadas"].isna()
imputation_stats["ventas"]["dates_missing_imputed"] = int(imputed_sales_mask.sum())

# Prices / costs: carry the last known value forward, then back-fill the start.
for price_col in ("precio_unitario_full", "costo_unitario"):
    if price_col not in df_ventas.columns:
        continue
    n_missing = df_ventas[price_col].isna().sum()
    if n_missing > 0:
        df_ventas[price_col] = df_ventas[price_col].ffill().bfill()
        imputation_stats["ventas"][f"{price_col}_filled"] = int(n_missing)

# Total units: linear interpolation between known points; anything the
# interpolation leaves missing becomes 0.
if "total_unidades_entregadas" in df_ventas.columns:
    df_ventas["total_unidades_entregadas"] = (
        df_ventas["total_unidades_entregadas"].interpolate(method="linear").fillna(0)
    )

# Promo breakdown: missing means no promo units.
for units_col in ("unidades_promo_pagadas", "unidades_promo_bonificadas"):
    if units_col in df_ventas.columns:
        df_ventas[units_col] = df_ventas[units_col].fillna(0)

if "unidades_precio_normal" in df_ventas.columns:
    # Missing regular-price units are the residual: total minus promo units.
    promo_units = df_ventas["unidades_promo_pagadas"] + df_ventas["unidades_promo_bonificadas"]
    residual = df_ventas["total_unidades_entregadas"] - promo_units
    # Fill with the residual, then floor the whole column at 0.
    df_ventas["unidades_precio_normal"] = (
        df_ventas["unidades_precio_normal"].fillna(residual).clip(lower=0)
    )

print("Imputación de Negocio completada.")
print("Stats Imputación:", imputation_stats)

In [None]:
# Cell 10: Selective financial recalculation
# When enabled in the config, recompute cost, revenue and profit for the
# sales rows flagged in `imputed_sales_mask`.
print("Recálculo Financiero Selectivo...")
recalc_flag = config["preprocessing"].get("recalc_financials", False)

if not recalc_flag:
    print("Recálculo financiero desactivado en config.")
elif not imputed_sales_mask.any():
    print("No hay filas imputadas para recalcular.")
else:
    count = imputed_sales_mask.sum()
    print(f"Recalculando {count} filas imputadas...")

    # Index labels of the imputed rows.
    idx = df_ventas.loc[imputed_sales_mask].index

    # Total cost = delivered units * unit cost
    units_delivered = df_ventas.loc[idx, "total_unidades_entregadas"]
    unit_cost = df_ventas.loc[idx, "costo_unitario"]
    df_ventas.loc[idx, "costo_total"] = units_delivered * unit_cost

    # Total revenue = (regular-price units + paid promo units) * full unit price
    paid_units = (
        df_ventas.loc[idx, "unidades_precio_normal"]
        + df_ventas.loc[idx, "unidades_promo_pagadas"]
    )
    full_price = df_ventas.loc[idx, "precio_unitario_full"]
    df_ventas.loc[idx, "ingresos_totales"] = paid_units * full_price

    # Profit = revenue - cost (both read back after the updates above)
    revenue = df_ventas.loc[idx, "ingresos_totales"]
    total_cost = df_ventas.loc[idx, "costo_total"]
    df_ventas.loc[idx, "utilidad"] = revenue - total_cost

In [None]:
# Cell 11: Monthly aggregation
# Resample every source to month-start ("MS") frequency using the per-column
# rules from the config, plus two source-specific overrides.
print("Agregación Mensual (MS)...")
agg_rules = config["preprocessing"].get("aggregation_rules", {})

monthly_dfs = {}

for key, df in dataframes.items():
    # Resampling needs the dates on the index.
    frame = df.set_index("fecha") if "fecha" in df.columns else df

    if key == "macro":
        # Macro: take the first value of the month for every column.
        rules = dict.fromkeys(frame.columns, "first")
    else:
        # Only columns with a configured rule take part in the aggregation.
        rules = {col: agg_rules[col] for col in frame.columns if col in agg_rules}
        if key == "promo" and "es_promo" in frame.columns:
            rules["es_promo"] = "sum"  # counts promo days

    # Without any rule, fall back to summing the numeric columns.
    resampler = frame.resample("MS")
    df_monthly = resampler.agg(rules) if rules else resampler.sum(numeric_only=True)

    # Post-aggregation rename: the summed flag is a count of promo days.
    if key == "promo" and "es_promo" in df_monthly.columns:
        df_monthly = df_monthly.rename(columns={"es_promo": "dias_en_promo"})

    monthly_dfs[key] = df_monthly

print("Agregación completada.")

In [None]:
# Cell 12: Source unification (merging)
# Monthly sales are the base table; the other sources are left-joined onto it
# by their month index, so only months present in sales are kept.
print("Unificando Datasets...")
df_master = monthly_dfs["ventas"].copy()

for source in ("marketing", "promo", "macro"):
    df_master = pd.merge(
        df_master,
        monthly_dfs[source],
        how="left",
        left_index=True,
        right_index=True,
    )

print(f"Dataset Maestro Mensual: {df_master.shape}")

In [None]:
# Cell 13: Post-merge imputation
# Close the structural gaps left by the left joins: numeric columns are
# interpolated linearly, then every column is forward- and back-filled.
print("Imputación Final (Huecos Estructurales)...")

# Interpolate numeric columns only: linear interpolation is undefined for
# text/object columns, and pandas deprecates (and later rejects) calling
# interpolate() on a frame that contains them.
numeric_cols = df_master.select_dtypes(include=np.number).columns
if len(numeric_cols) > 0:
    df_master = df_master.copy()
    df_master[numeric_cols] = df_master[numeric_cols].interpolate(method="linear")

# Edge gaps (and all non-numeric columns) take the nearest known value.
df_master = df_master.ffill().bfill()

nulos = df_master.isna().sum().sum()
if nulos > 0:
    print(f"ADVERTENCIA: Quedan {nulos} valores nulos.")
else:
    print("Dataset limpio.")

In [None]:
# Cell 14: Golden rule (anti data leakage)
# If the last month of the dataset is the current calendar month, drop it:
# it is still in progress and therefore incomplete.
print("Aplicando Regla de Oro: Eliminación del mes en curso (incompleto)...")
current_date = datetime.now()
if not df_master.empty:
    last_date = df_master.index.max()
    # Compare (year, month) of the latest row against today's.
    is_current_month = (last_date.year, last_date.month) == (current_date.year, current_date.month)
    if not is_current_month:
        print(f"  - El último mes ({last_date.strftime('%Y-%m')}) es anterior al actual. No se requiere corte.")
    else:
        print(f"  - Detectado mes incompleto o en curso: {last_date.strftime('%Y-%m')}. Eliminando para evitar Data Leakage.")
        # Drop the final row of the frame.
        df_master = df_master.iloc[:-1]

print(f"Dataset Maestro Final (Meses Cerrados): {df_master.shape}")

In [None]:
# Cell 15: Export
# Write the monthly master table as parquet into the cleansed-data folder.
output_file = CLEANSED_DATA_PATH.joinpath("master_monthly.parquet")
df_master.to_parquet(output_file)
print(f"Guardado exitoso en: {output_file}")

In [None]:
# Celda 16: Reporte JSON Detallado (Enhanced + Integrity Checks)
import json
import platform

# Collect the final metrics of the output artifact
final_shape = df_master.shape

# 1. Temporal validations
# Defaults describe "nothing to check" (empty frame or non-datetime index).
is_series_complete = False
missing_expected_dates = []
duplicate_dates_count = 0
date_min = "N/A"
date_max = "N/A"
total_months = 0

if isinstance(df_master.index, pd.DatetimeIndex) and not df_master.empty:
    first_month = df_master.index.min()
    last_month = df_master.index.max()
    date_min = first_month.isoformat()
    date_max = last_month.isoformat()
    total_months = len(df_master)

    # Completeness check against the month-start ("MS") calendar that spans
    # the data. Comparing actual dates (not just counts) means an off-calendar
    # date can no longer mask a missing month.
    expected_range = pd.date_range(start=first_month, end=last_month, freq='MS')
    missing_months = expected_range.difference(df_master.index)
    unexpected_dates = df_master.index.difference(expected_range)
    is_series_complete = missing_months.empty and unexpected_dates.empty
    # Index.difference returns sorted values, so the report is deterministic.
    missing_expected_dates = [d.isoformat() for d in missing_months]

    # Repeated dates on the index
    duplicate_dates_count = int(df_master.index.duplicated().sum())

# 2. Data integrity
duplicate_rows = int(df_master.duplicated().sum())
total_nulls = int(df_master.isna().sum().sum())
rows_with_nulls = int(df_master.isna().any(axis=1).sum())
column_types = df_master.dtypes.astype(str).to_dict()
columns_list = df_master.columns.tolist()

# Size of the exported file (0 when it has not been written).
file_size_bytes = output_file.stat().st_size if output_file.exists() else 0

missing_values = total_nulls
# 3. Muestras de Datos (Serialize dates/timestamps for JSON)
def serialize_df(df_part):
    """Return the rows of *df_part* as a list of JSON-friendly dicts.

    When the frame has a DatetimeIndex, the index becomes a regular column
    whose values are converted to strings. The column keeps the index name,
    or is called "fecha" when the index is unnamed. The input frame is not
    modified.

    Args:
        df_part: DataFrame (typically a small slice) to serialize.

    Returns:
        One dict per row, as produced by ``DataFrame.to_dict("records")``.
    """
    temp = df_part.copy()
    if isinstance(temp.index, pd.DatetimeIndex):
        # Do not assume the index is literally named "fecha": an unnamed index
        # would come out of reset_index() as "index" and break the lookup.
        index_col = temp.index.name if temp.index.name is not None else "fecha"
        temp = temp.rename_axis(index_col).reset_index()
        temp[index_col] = temp[index_col].astype(str)
    return temp.to_dict(orient='records')

# Small data samples embedded in the report.
head_5 = serialize_df(df_master.head(5))
tail_5 = serialize_df(df_master.tail(5))
# sample() raises ValueError when asked for more rows than the frame has, so
# cap the sample size for short datasets.
random_5 = serialize_df(df_master.sample(min(5, len(df_master)), random_state=42))

# Rows are only recalculated when the config flag enabled that step; report 0
# otherwise instead of the number of imputed rows.
financial_records_recalculated = (
    int(imputed_sales_mask.sum())
    if 'imputed_sales_mask' in locals() and locals().get('recalc_flag', False)
    else 0
)

# Upstream statistics, guarded the same way as the other optional inputs so a
# skipped cell yields an empty section instead of a NameError.
cleaning_stats_source = stats_cleaning if 'stats_cleaning' in locals() else {}
sentinel_stats_source = sentinel_stats if 'sentinel_stats' in locals() else {}
reindex_stats_source = reindex_stats if 'reindex_stats' in locals() else {}

# Enriched report structure
report = {
    "phase": "Phase 2 - Preprocessing",
    "timestamp": datetime.now().isoformat(),
    "environment_info": {
        "platform": platform.system(),
        "python_version": platform.python_version(),
        "pandas_version": pd.__version__
    },
    "execution_context": {
        "description": "Limpieza exhaustiva, imputación de negocio, agregación mensual y corte de mes en curso (Anti-Data Leakage).",
        # SUCCESS only for a gap-free, duplicate-free series without nulls.
        "validation_status": "SUCCESS" if total_nulls == 0 and is_series_complete and duplicate_dates_count == 0 else "WARNING"
    },
    "data_quality_audit": {
        "contract_validation": data_contract_status if 'data_contract_status' in locals() else {},
        "schema_enforcement": {
            "columns_removed": columns_removed_log if 'columns_removed_log' in locals() else {}
        },
        "cleaning_stats": {
            "rows_filtered_logic": cleaning_stats_source.get("filtered", {}),
            "duplicates_removed": cleaning_stats_source.get("duplicates", {}),
            "sentinel_values_replaced": sentinel_stats_source,
            "temporal_gaps_reindexed": reindex_stats_source
        },
        "imputation_metrics": {
            "financial_records_recalculated": financial_records_recalculated,
            "remaining_nulls_final": total_nulls,
            "details": imputation_stats if 'imputation_stats' in locals() else {}
        }
    },
    "output_artifact_details": {
        "file_name": output_file.name,
        "full_path": str(output_file.absolute()),
        "file_size_bytes": file_size_bytes,
        "shape": {
            "rows": final_shape[0],
            "columns": final_shape[1]
        },
        "temporal_coverage": {
            "start_date": date_min,
            "end_date": date_max,
            "frequency": "MS (Month Start)",
            "total_months": total_months,
            "is_series_complete": is_series_complete,
            "missing_expected_dates": missing_expected_dates,
            "duplicate_dates_count": duplicate_dates_count
        },
        "data_integrity": {
            "duplicate_rows": duplicate_rows,
            "rows_with_nulls": rows_with_nulls,
            "total_nulls": total_nulls
        },
        "schema": {
            "columns": columns_list,
            "dtypes": column_types
        }
    },
    "sample_data": {
        "head_5": head_5,
        "tail_5": tail_5,
        "random_5": random_5
    }
}

# Save the report. The file is UTF-8, so accented text is written as-is
# (ensure_ascii=False) rather than as \uXXXX escapes; the parsed content is
# identical either way.
report_path = ARTIFACTS_PATH / "phase_02_preprocessing.json"
with open(report_path, "w", encoding='utf-8') as f:
    json.dump(report, f, indent=4, ensure_ascii=False)

print(f"Reporte DETALLADO generado en: {report_path}")
print("Resumen del Artefacto de Salida:")
print(json.dumps(report["output_artifact_details"], indent=2))