In [68]:
import pandas as pd
import numpy as np
import holidays
from statsmodels.tsa.seasonal import seasonal_decompose
from scipy import stats

In [69]:
def prepare_hourly_data(df: pd.DataFrame) -> pd.DataFrame:
    # Configurar índice temporal para la descomposición
    df = df.copy()
    df = df.set_index('Fecha').asfreq('h')
    
    # Interpolar valores faltantes
    df['Volumen'] = df['Volumen'].interpolate()
    
    return df

In [70]:
def extract_datetime_features(df: pd.DataFrame, fecha_col='Fecha') -> pd.DataFrame:
    # Copia para no modificar el original
    df_result = df.copy()
    
    if fecha_col not in df_result.columns:
        df_result[fecha_col] = df_result.index
    
    # Características básicas de tiempo
    df_result['hora'] = df_result[fecha_col].dt.hour
    df_result['dia'] = df_result[fecha_col].dt.day
    df_result['dia_semana'] = df_result[fecha_col].dt.dayofweek 
    df_result['dia_year'] = df_result[fecha_col].dt.dayofyear
    df_result['semana_year'] = df_result[fecha_col].dt.isocalendar().week
    df_result['mes'] = df_result[fecha_col].dt.month
    df_result['trimestre'] = df_result[fecha_col].dt.quarter
    df_result['year'] = df_result[fecha_col].dt.year
    
    # Características derivadas
    df_result['fin_de_semana'] = df_result['dia_semana'].isin([5, 6]).astype(int)
    df_result['dia_laboral'] = (~df_result['dia_semana'].isin([5, 6])).astype(int)
    
    # Parte del día (madrugada, mañana, tarde, noche)
    df_result['parte_dia'] = pd.cut(
        df_result['hora'], 
        bins=[-1, 5, 11, 17, 23], 
        labels=['madrugada', 'mañana', 'tarde', 'noche']
    )
    
    # Festivos en Colombia
    festivos_colombia = holidays.country_holidays('CO', years=df_result[fecha_col].dt.year.unique())
    df_result['es_festivo'] = df_result[fecha_col].dt.date.astype('datetime64[ns]').isin(festivos_colombia).astype(int)
        
    # Características cíclicas (seno y coseno)
    df_result['hora_seno'] = np.sin(2 * np.pi * df_result['hora'] / 24)
    df_result['hora_coseno'] = np.cos(2 * np.pi * df_result['hora'] / 24)
    df_result['dia_semana_seno'] = np.sin(2 * np.pi * df_result['dia_semana'] / 7)
    df_result['dia_semana_coseno'] = np.cos(2 * np.pi * df_result['dia_semana'] / 7)
    df_result['mes_seno'] = np.sin(2 * np.pi * df_result['mes'] / 12)
    df_result['mes_coseno'] = np.cos(2 * np.pi * df_result['mes'] / 12)
    
    return df_result  

In [71]:
def extract_seasonal_features(df: pd.DataFrame, period=24, return_components=False):
    # Crear copia para no modificar el original
    df_result = df.copy()
    
    if 'Fecha' not in df_result.columns:
        df_result['Fecha'] = df_result.index

    # Verificar frecuencia horaria
    if not isinstance(df_result.index, pd.DatetimeIndex):
        df_result = df_result.set_index('Fecha').asfreq('h')
    
    # Realizar descomposición estacional
    result = seasonal_decompose(df_result['Volumen'], model='additive', period=period)
    
    # Añadir componentes al DataFrame original
    df_result['trend'] = result.trend.values
    df_result['seasonal'] = result.seasonal.values
    df_result['residual'] = result.resid.values
    
    # Calcular características adicionales
    df_result['detrended'] = df_result['Volumen'] - df_result['trend']
    df_result['seas_strength'] = abs(df_result['seasonal'] / df_result['Volumen'])
    df_result['seas_norm'] = df_result['seasonal'] / df_result['seasonal'].std()
    
    if return_components:
        return df_result, result
    
    return df_result

In [72]:
def calculate_rolling_features(df: pd.DataFrame, target_col='Volumen', windows=[1, 24, 48, 168]):
    # Crear copia para no modificar el original
    df_result = df.copy()
    
    # Calcular estadísticas para cada ventana
    for window in windows:
        # Estadísticas móviles
        df_result[f'rolling_mean_{window}h'] = df_result[target_col].rolling(window=window, min_periods=1).mean()
        df_result[f'rolling_std_{window}h'] = df_result[target_col].rolling(window=window, min_periods=1).std()
        df_result[f'rolling_min_{window}h'] = df_result[target_col].rolling(window=window, min_periods=1).min()
        df_result[f'rolling_max_{window}h'] = df_result[target_col].rolling(window=window, min_periods=1).max()
        
        # Diferencias con respecto al promedio móvil
        df_result[f'diff_from_mean_{window}h'] = df_result[target_col] - df_result[f'rolling_mean_{window}h']
        df_result[f'pct_diff_from_mean_{window}h'] = (df_result[target_col] / df_result[f'rolling_mean_{window}h'] - 1) * 100
        
    # Patrones semanales (168 horas)
    if 168 in windows:
        df_result['diff_from_last_week'] = df_result[target_col].diff(168)
        df_result['pct_diff_from_last_week'] = df_result[target_col].pct_change(168) * 100
    
    return df_result

In [73]:
def gas_law_features(df: pd.DataFrame, presion_col='Presion', temperatura_col='Temperatura', volumen_col='Volumen'):
    # Crear copia para no modificar el original
    df_result = df.copy()
    
    # Convertir temperatura a Kelvin
    df_result['temperatura_K'] = df_result[temperatura_col] + 273.15
    
    # Leyes de los gases
    df_result['PV_producto'] = df_result[presion_col] * df_result[volumen_col]
    df_result['V_T_ratio'] = df_result[volumen_col] / df_result['temperatura_K']
    df_result['P_T_ratio'] = df_result[presion_col] / df_result['temperatura_K']
    df_result['PV_T_ratio'] = df_result['PV_producto'] / df_result['temperatura_K']
    
    # Calcular desviaciones
    for col in ['PV_producto', 'V_T_ratio', 'P_T_ratio', 'PV_T_ratio']:
        mean_val = df_result[col].mean()
        df_result[f'{col}_deviation'] = ((df_result[col] - mean_val) / mean_val) * 100
    
    return df_result

In [74]:
def detect_outliers(df: pd.DataFrame, columns, method='zscore', threshold=3.0):
    # Crear copia para no modificar el original
    df_result = df.copy()
    
    for col in columns:
        if col not in df_result.columns:
            print(f"Columna {col} no encontrada en el DataFrame")
            continue
            
        if method == 'zscore':
            # Método Z-score
            z_scores = stats.zscore(df_result[col], nan_policy='omit')
            df_result[f'{col}_outlier_zscore'] = (abs(z_scores) > threshold).astype(int)
            
        elif method == 'iqr':
            # Método IQR
            Q1 = df_result[col].quantile(0.25)
            Q3 = df_result[col].quantile(0.75)
            IQR = Q3 - Q1
            lower_bound = Q1 - threshold * IQR
            upper_bound = Q3 + threshold * IQR
            df_result[f'{col}_outlier_iqr'] = ((df_result[col] < lower_bound) | 
                                               (df_result[col] > upper_bound)).astype(int)
    
    # Columna agregada de outliers
    outlier_cols = [col for col in df_result.columns if '_outlier_' in col]
    if outlier_cols:
        df_result['is_any_outlier'] = df_result[outlier_cols].max(axis=1)
    
    return df_result

In [75]:
def create_features_for_anomaly_detection(df: pd.DataFrame, presion_col='Presion', 
                                          temperatura_col='Temperatura', volumen_col='Volumen'):
    """Función principal para crear todas las características para detección de anomalías"""
    # 0. Preparar los datos
    df_processed = prepare_hourly_data(df)
    
    # 1. Extraer características temporales
    df_processed = extract_datetime_features(df_processed)
    
    # 2. Añadir características de estacionalidad
    df_processed = extract_seasonal_features(df_processed)
    
    # 3. Calcular promedios móviles y estadísticas relacionadas
    df_processed = calculate_rolling_features(df_processed, target_col=volumen_col)
    
    # 4. Añadir características basadas en leyes de los gases
    df_processed = gas_law_features(df_processed, presion_col=presion_col, 
                                   temperatura_col=temperatura_col, volumen_col=volumen_col)
    
    # 5. Detectar outliers en columnas relevantes
    columns_for_outlier_detection = [
        volumen_col,
        'diff_from_mean_24h',
        'diff_from_mean_168h',
        'PV_producto_deviation',
        'V_T_ratio_deviation',
        'P_T_ratio_deviation',
        'residual'
    ]
    
    # Detectar outliers con ambos métodos
    df_processed = detect_outliers(df_processed, columns_for_outlier_detection, method='zscore')
    df_processed = detect_outliers(df_processed, columns_for_outlier_detection, method='iqr')
    
    return df_processed

In [76]:
# Código principal para procesar datos
def process_gas_data(data: pd.DataFrame):
    """Procesa datos de gas por cliente para detección de anomalías"""
    results = {}
    
    for cliente, df_cliente in data.groupby('Cliente'):
        print(f"Procesando cliente: {cliente}")
        
        # Aplicar todas las funciones de ingeniería de características
        df_processed = create_features_for_anomaly_detection(df_cliente)
        
        # Guardar resultados
        results[cliente] = df_processed
        
        # Mostrar resumen de outliers detectados
        n_outliers = df_processed['is_any_outlier'].sum()
        print(f"Se detectaron {n_outliers} posibles anomalías para el cliente {cliente}")
        
        # Opcional: guardar en CSV
        df_processed.to_csv(f"../data/processed/{cliente}_processed.csv", index=False)
    
    return results

In [77]:
# Cargar datos
data = pd.read_csv("../data/raw/data.csv", parse_dates=['Fecha'])

# Procesar datos
results = process_gas_data(data)

Procesando cliente: CLIENTE1
Se detectaron 2621 posibles anomalías para el cliente CLIENTE1


  df_result['es_festivo'] = df_result[fecha_col].dt.date.astype('datetime64[ns]').isin(festivos_colombia).astype(int)


Procesando cliente: CLIENTE10
Se detectaron 19062 posibles anomalías para el cliente CLIENTE10


  df_result['es_festivo'] = df_result[fecha_col].dt.date.astype('datetime64[ns]').isin(festivos_colombia).astype(int)


Procesando cliente: CLIENTE11


ValueError: cannot reindex on an axis with duplicate labels