# 1. Data preparation GeSAI: Pipeline de integración (Pandas + Dask)

Este notebook gestiona la preparación final de los datos. Debido al gran volumen del dataset principal de consumo (aprox. 76 millones de filas), utilizamos una estrategia híbrida:

* Pandas: Para procesar datasets auxiliares "pequeños" (Geográficos, Meteorológicos y Festivos).

* Dask: Para procesar y fusionar el dataset masivo de consumo sin saturar la memoria RAM.

## 1.1. Configuración inicial

Cargamos las librerías necesarias y definimos las rutas de archivos y constantes globales. Esto facilita la modificación de rutas en el futuro sin tocar la lógica del código.

In [1]:
# Importamos las librerías necesarias
import pandas as pd
import dask.dataframe as dd
import gc
import holidays

In [2]:
# Definimos las rutas y constantes
MAIN_PARQUET = '../data/official-data/data_ab3_complete.parquet'
OUTPUT_PARQUET_DIR = '../data/processed-data/dataset_FINAL_COMPLETO/'

NUMERO_DE_TROZOS = 60

LLAVE_DISTRITO = 'KEY_DISTRITO'
LLAVE_SECCION = 'KEY_SECCION'


RENTA_CSV = '../data/open-data/renda_procesada.csv'
ANTIGUITAD_CSV = '../data/open-data/antiguitat_pivotada.csv'
POBLACION_CSV = '../data/open-data/poblacion_pivotada.csv'
OBRES_CSV = '../data/open-data/obres_procesadas.csv'

## 1.2. Preparación de datos auxiliares (In-Memory con Pandas)

En esta fase preparamos todas las tablas que son lo suficientemente pequeñas para caber en memoria. Estas actuarán como tablas de dimensiones ("lookup tables") para enriquecer el dataset principal.

### 1.2.1. Procesamiento de datos geográficos de renta, antigüedad, población y obras públicas

Cargamos los datasets de renta, antigüedad de edificios, población y obras. Los unificamos en una sola "Tabla Maestra Geográfica" indexada por Distrito y Sección Censal.

Objetivo: Tener un perfil socioeconómico y urbano único para cada sección censal.

In [3]:
# Cargamos y preparamos los datos geográficos 
dtype_llaves = {LLAVE_DISTRITO: 'string', LLAVE_SECCION: 'string'}
df_renta = pd.read_csv(RENTA_CSV, dtype=dtype_llaves)
df_antiguitat = pd.read_csv(ANTIGUITAD_CSV, dtype=dtype_llaves)
df_poblacion = pd.read_csv(POBLACION_CSV, dtype=dtype_llaves)
df_obres = pd.read_csv(OBRES_CSV, dtype=dtype_llaves)

# Fusionamos los datos geográficos en un solo DataFrame
df_geo_total = df_renta.merge(df_antiguitat, on=[LLAVE_DISTRITO, LLAVE_SECCION], how='outer')
df_geo_total = df_geo_total.merge(df_poblacion, on=[LLAVE_DISTRITO, LLAVE_SECCION], how='outer')
df_geo_total = df_geo_total.merge(df_obres, on=[LLAVE_DISTRITO, LLAVE_SECCION], how='outer')
nuevas_columnas_geo = df_geo_total.columns.drop([LLAVE_DISTRITO, LLAVE_SECCION]).tolist()
df_geo_total = df_geo_total.fillna(0)

### 1.2.2. Procesamiento de datos meteorológicos (AEMET)

Cargamos los JSONs de la AEMET, concatenamos los históricos y limpiamos los nombres de columnas.

Objetivo: Asociar condiciones climáticas (temperatura, lluvia) a cada fecha, ya que influyen en el consumo y en las roturas por cambios térmicos.

In [4]:
# Procesamiento de datos meteorológicos (AEMET)
df_aemet1 = pd.read_json('../data/open-data/data_aemet_1.json')
df_aemet2 = pd.read_json('../data/open-data/data_aemet_2.json')

# Concatenamos ambos dataframes
df_aemet = pd.concat([df_aemet1, df_aemet2], ignore_index=True)

# Seleccionamos columnas relevantes
df_aemet = df_aemet[['fecha', 'tmed', 'tmin', 'tmax', 'prec', 'hrMedia']]

# Rellenamos nulos de precipitación con 0
df_aemet.fillna({'prec': 0}, inplace=True)

# Renombramos columnas y convertimos tipos
cols_to_rename = {
    'fecha': 'FECHA', 'tmed': 'TEMP_MEDIA', 'tmin': 'TEMP_MIN',
    'tmax': 'TEMP_MAX', 'prec': 'PRECIPITACION', 'hrMedia': 'HUMEDAD_RELATIVA_MEDIA'
}

# Renombramos columnas y convertimos tipos
df_aemet.rename(columns=cols_to_rename, inplace=True)
df_aemet['FECHA'] = pd.to_datetime(df_aemet['FECHA'], errors='coerce')

# Lista de nuevas columnas meteorológicas
nuevas_columnas_meteo = list(cols_to_rename.values())[1:]

### 1.2.3. Generación de calendario de festivos

Utilizamos la librería holidays para generar un calendario específico de Cataluña para el año 2024.

Objetivo: Diferenciar días laborables de festivos/fines de semana, ya que el patrón de consumo humano varía drásticamente.

In [5]:
# Generación de calendario de festivos
fechas = pd.date_range(start='2024-01-01', end='2024-12-31')
df_fechas = pd.DataFrame({'FECHA': fechas})

# Cálculo de festivos en Cataluña para 2024
es_holidays = holidays.CountryHoliday('ES', subdiv='CT', years=2024)
df_fechas['FESTIVO'] = df_fechas['FECHA'].apply(lambda date: date in es_holidays)

# Clasificación del tipo de día
def get_tipo_dia_simple(fecha, es_festivo):
    if es_festivo: return 'Festivo'
    elif fecha.weekday() >= 5: return 'Fin de Semana'
    else: return 'Laborable'

# Aplicamos la función para obtener el tipo de día    
df_fechas['TIPO_DIA'] = df_fechas.apply(lambda row: get_tipo_dia_simple(row['FECHA'], row['FESTIVO']), axis=1)
df_fechas['FECHA'] = pd.to_datetime(df_fechas['FECHA'], errors='coerce')

# Lista de nuevas columnas de festivos
nuevas_columnas_festivos = ['FESTIVO', 'TIPO_DIA']

### 1.2.4. Unificación de datos temporales

Fusionamos la meteorología y el calendario en una sola tabla auxiliar indexada por fecha.

In [6]:
# Merge Meteo + Festivos
df_meteo_festivos = pd.merge(df_aemet, df_fechas, on='FECHA', how='outer')

## 1.3. Procesamiento masivo del dataset principal

A partir de aquí utilizamos Dask. Cargamos el dataset principal (Parquet), lo dividimos en particiones para paralelizar el trabajo y aplicamos transformaciones de limpieza fila por fila.

### 1.3.1. Carga y limpieza inicial

Cargamos el archivo gigante y eliminamos registros inválidos (sin sección censal) e imputamos consumos nulos.

In [7]:
# Carga y limpieza inicial
df_main_dask = dd.read_parquet(MAIN_PARQUET)
df_main_dask = df_main_dask.repartition(npartitions=NUMERO_DE_TROZOS)

# Limpieza de datos faltantes
df_main_dask = df_main_dask.dropna(subset=['SECCIO_CENSAL'])

# Relleno de valores faltantes en CONSUMO_REAL con 0
df_main_dask['CONSUMO_REAL'] = df_main_dask['CONSUMO_REAL'].fillna(0)

### 1.3.2. Ingeniería de características (Target y fechas)

Definimos una función para transformar las columnas de "mensajes" en columnas binarias útiles para Machine Learning (Target: Fuga Sí/No) y separamos la fecha de la hora.

Nota: Usamos map_partitions para aplicar esta función de manera eficiente a cada trozo del dataframe distribuido.

In [8]:
# Creación de columnas FUGA_DETECTADA y FUGA_REITERADA
def crear_columnas_fuga(df_particion):
    df_particion['FUGA_DETECTADA'] = df_particion['CODIGO_MENSAJE'].apply(
        lambda x: 0 if pd.isna(x) else (1 if x in ['FUITA', 'REITERACIÓ DE FUITA'] else 0)
    )
    df_particion['FUGA_REITERADA'] = df_particion['CODIGO_MENSAJE'].apply(
        lambda x: 0 if pd.isna(x) else (1 if x == 'REITERACIÓ DE FUITA' else 0)
    )
    return df_particion

# Aplicamos la función a cada partición del DataFrame Dask
new_meta = df_main_dask._meta.copy()
new_meta['FUGA_DETECTADA'] = pd.Series(dtype='int64')
new_meta['FUGA_REITERADA'] = pd.Series(dtype='int64')
df_main_dask = df_main_dask.map_partitions(crear_columnas_fuga, meta=new_meta)
df_main_dask = df_main_dask.drop(columns=['CREATED_MENSAJE', 'CODIGO_MENSAJE', 'TIPO_MENSAJE'])

# Extracción de fecha y hora
df_main_dask['FECHA'] = df_main_dask['FECHA_HORA'].dt.date
df_main_dask['HORA'] = df_main_dask['FECHA_HORA'].dt.time.astype(str) # Corrección V12
df_main_dask['FECHA'] = dd.to_datetime(df_main_dask['FECHA'], errors='coerce')


## 1.4. Fusión de datos (Merges distribuídos)

Enriquecemos el dataset principal cruzándolo con las tablas auxiliares que preparamos en Pandas. Dask maneja eficientemente el cruce de una "Tabla Gigante" (Dask) con una "Tabla Pequeña" (Pandas).

In [9]:
# Preparamos Merge 1 (Meteo + Festivos)
df_main_dask = dd.merge(
    df_main_dask,
    df_meteo_festivos,
    on='FECHA',
    how='left'
)

# Preparamos Merge 2 (Datos Geográficos)
llave_str = df_main_dask['SECCIO_CENSAL'].astype('string')
df_main_dask[LLAVE_DISTRITO] = llave_str.str.slice(5, 7).astype('string')
df_main_dask[LLAVE_SECCION] = llave_str.str.slice(7, 10).astype('string')

# Merge Datos Geográficos
df_final_dask = dd.merge(
    df_main_dask,
    df_geo_total,
    on=[LLAVE_DISTRITO, LLAVE_SECCION],
    how='left'
)

+----------------------------------+------------+-------------+
| Merge columns                    | left dtype | right dtype |
+----------------------------------+------------+-------------+
| ('KEY_DISTRITO', 'KEY_DISTRITO') | string     | string      |
| ('KEY_SECCION', 'KEY_SECCION')   | string     | string      |
+----------------------------------+------------+-------------+
Cast dtypes explicitly to avoid unexpected results.


## 1.5. Limpieza final y guardado

Liberamos memoria RAM eliminando los DataFrames de Pandas que ya no necesitamos. 

Configuramos los valores por defecto para rellenar los nulos que hayan podido surgir tras los merges (ej. una fecha sin datos meteo o una sección censal sin datos de renta) y dropeamos duplicados para asegurar la consistencia del dataset. 

Finalmente, ejecutamos el cómputo y guardamos en disco.

In [None]:
# Liberamos memoria RAM eliminando los DataFrames de Pandas que ya no necesitamos
del df_renta, df_antiguitat, df_poblacion, df_obres, df_geo_total, df_aemet, df_fechas, df_meteo_festivos
gc.collect()

# Diccionarios de valores por defecto para rellenar nulos
fill_values_geo = {col: 0 for col in nuevas_columnas_geo}
fill_values_meteo = {col: 0 for col in nuevas_columnas_meteo}
fill_values_festivos = {'FESTIVO': False, 'TIPO_DIA': 'Laborable'} 

# Limpieza final de nulos
df_final_dask = df_final_dask.fillna(value=fill_values_geo)
df_final_dask = df_final_dask.fillna(value=fill_values_meteo)
df_final_dask = df_final_dask.fillna(value=fill_values_festivos)
df_final_dask['FESTIVO'] = df_final_dask['FESTIVO'].astype(bool)

# Eliminamos duplicados
subset_duplicados = ['POLISSA_SUBM', 'FECHA', 'HORA']
df_final_dask = df_final_dask.drop_duplicates(subset=subset_duplicados)

# Aseguramos orden por fechas y horas (lo aseguraremos más adelante en el pipeline)
#df_final_dask = df_final_dask.set_index('FECHA_HORA').reset_index()

# Ejecutamos el cómputo y guardamos en disco
df_final_dask.to_parquet(OUTPUT_PARQUET_DIR, write_index=False)