In [1]:
"""
CELDA 1: Configuración Inicial y Documentación
=============================================
Objetivo:
- Configurar el entorno para el módulo de prognosis industrial
- Integrar conexión con PostgreSQL para la gestión de datos

Descripción:
Esta celda configura:
1. Conexión a PostgreSQL
2. Herramientas de logging y visualización
3. Parámetros iniciales del proyecto

"""

# Importaciones necesarias
import numpy as np
import pandas as pd
from sqlalchemy import create_engine
import logging
import warnings
import os

# Configuración inicial
warnings.filterwarnings("ignore")
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s"
)

# Configuración de conexión a PostgreSQL
DB_CONFIG = {
    "user": "postgres",
    "password": "elico",  #contraseña correcta
    "host": "localhost",
    "port": "5432",
    "database": "prognosis_db"
}
DATABASE_URL = f"postgresql://{DB_CONFIG['user']}:{DB_CONFIG['password']}@{DB_CONFIG['host']}:{DB_CONFIG['port']}/{DB_CONFIG['database']}"

def create_db_engine():
    """Crea y retorna un motor de conexión a PostgreSQL"""
    try:
        engine = create_engine(DATABASE_URL)
        logging.info("Conexión a la base de datos configurada correctamente.")
        return engine
    except Exception as e:
        logging.error(f"Error al conectar con la base de datos: {str(e)}")
        return None

# Prueba de conexión
engine = create_db_engine()
if engine:
    try:
        with engine.connect() as conn:
            logging.info("Prueba de conexión exitosa.")
    except Exception as e:
        logging.error(f"Error al probar la conexión: {str(e)}")

print("Configuración inicial completada exitosamente.")


2025-01-05 21:02:23,042 - INFO - Conexión a la base de datos configurada correctamente.
2025-01-05 21:02:23,350 - INFO - Prueba de conexión exitosa.


Configuración inicial completada exitosamente.


In [2]:
"""
CELDA 2: Limpieza, Normalización y Preprocesamiento de Datos
===========================================================
Objetivo:
- Realizar la limpieza inicial, normalización y preprocesamiento de datos.
- Garantizar que los datos sean utilizables para la creación de la línea base y la predicción de fallas.
- Asegurar el manejo correcto de caracteres especiales en variables.

Descripción:
Esta celda implementa:
1. Carga de datos desde archivo o ETL.
2. Limpieza estándar (manejo de valores nulos, tipos de datos y columnas irrelevantes).
3. Normalización de variables numéricas.
4. Validación de la estructura de los datos.
5. Preparación para su inserción en la base de datos PostgreSQL.
6. Manejo robusto de caracteres especiales en nombres de variables.

Características principales:
- Soporte para datos provenientes de archivos o ETL.
- Modularidad para limpieza estándar reutilizable.
- Normalización de variables numéricas.
- Validación de integridad y estructura de los datos.
- Reporte detallado de los cambios realizados.
- Mapeo y trazabilidad de nombres de variables.
"""

# Importaciones necesarias
import pandas as pd
import numpy as np
import logging
from sqlalchemy import text, create_engine
from sklearn.preprocessing import StandardScaler
from typing import Tuple, Dict, Any
import unicodedata
from psycopg2 import connect

# Configuración de conexión a PostgreSQL
DB_CONFIG = {
    "user": "postgres",
    "password": "elico",
    "host": "localhost",
    "port": "5432",
    "database": "prognosis_db"
}

class DatabaseSetup:
    @staticmethod
    def create_database_if_not_exists(config: Dict[str, str]):
        """Crea la base de datos si no existe."""
        try:
            connection = connect(
                dbname="postgres",
                user=config["user"],
                password=config["password"],
                host=config["host"],
                port=config["port"]
            )
            connection.autocommit = True
            cursor = connection.cursor()
            cursor.execute(f"SELECT 1 FROM pg_database WHERE datname = '{config['database']}';")
            if not cursor.fetchone():
                cursor.execute(f"CREATE DATABASE {config['database']};")
                logging.info(f"Base de datos '{config['database']}' creada exitosamente.")
            cursor.close()
            connection.close()
        except Exception as e:
            logging.error(f"Error al crear la base de datos: {str(e)}")
            raise

class DataPreprocessor:
    def __init__(self, db_engine, file_path: str = None):
        """Inicializa la clase de preprocesamiento de datos."""
        self.db_engine = db_engine
        self.file_path = file_path
        self.cleaned_data = None
        self.normalized_data = None
        self.quality_report = {
            'missing_values': {},
            'irrelevant_columns': [],
            'data_types': {},
            'warnings': [],
            'variable_mapping': {}
        }
        self.scaler = StandardScaler()
        self._setup_logging()

    def _setup_logging(self):
        """Configura el sistema de logging."""
        self.logger = logging.getLogger(__name__)
        if not self.logger.handlers:
            handler = logging.StreamHandler()
            formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
            handler.setFormatter(formatter)
            self.logger.addHandler(handler)
            self.logger.setLevel(logging.INFO)

    def _normalize_column_name(self, column: str) -> str:
        """Normaliza el nombre de una columna manteniendo trazabilidad."""
        try:
            # Convertir a minúsculas y eliminar acentos
            normalized = column.lower()
            normalized = unicodedata.normalize('NFKD', normalized)
            normalized = u"".join([c for c in normalized if not unicodedata.combining(c)])
            
            # Reemplazar caracteres especiales con guión bajo
            normalized = ''.join(c if c.isalnum() else '_' for c in normalized)
            normalized = '_'.join(filter(None, normalized.split('_')))
            
            # Guardar mapeo original
            self.quality_report['variable_mapping'][normalized] = column
            
            return normalized
            
        except Exception as e:
            self.logger.error(f"Error normalizando nombre de columna: {str(e)}")
            return column

    def load_data(self) -> pd.DataFrame:
        """Carga datos desde un archivo Excel o una ETL."""
        try:
            if not self.file_path:
                raise ValueError("No se proporcionó la ruta del archivo.")

            self.logger.info("Cargando datos desde el archivo...")
            data = pd.read_excel(self.file_path)
            self.logger.info(f"Datos cargados: {data.shape[0]} filas y {data.shape[1]} columnas.")
            return data

        except Exception as e:
            self.logger.error(f"Error al cargar datos: {str(e)}")
            return pd.DataFrame()

    def clean_data(self, data: pd.DataFrame) -> Tuple[pd.DataFrame, Dict[str, Any]]:
        """Limpia los datos eliminando valores nulos, ajustando tipos de datos y reportando cambios."""
        try:
            self.logger.info("Iniciando limpieza de datos...")
            
            # Normalizar nombres de columnas
            data = data.copy()
            data.columns = [self._normalize_column_name(col) for col in data.columns]
                # NUEVO: Análisis detallado de variables constantes
            constant_vars = []
            for col in data.columns:
                unique_values = data[col].nunique()
                if unique_values <= 1:
                    constant_vars.append({
                        "variable": col,
                        "value": data[col].iloc[0],
                        "unique_count": unique_values
                    })
                    
            # Actualizar quality_report con información detallada
            self.quality_report['constant_variables'] = constant_vars
            
            # Identificar y eliminar columnas irrelevantes con más detalle
            irrelevant_cols = [col['variable'] for col in constant_vars]
            if irrelevant_cols:
                self.logger.warning(f"Variables constantes detectadas: {irrelevant_cols}")
                self.quality_report['irrelevant_columns'] = irrelevant_cols
                data.drop(columns=irrelevant_cols, inplace=True)
                # Guardar tipos de datos originales
            self.quality_report['data_types'] = {col: str(data[col].dtype) for col in data.columns}
                # Manejo de valores nulos
            for col in data.columns:
                missing = data[col].isnull().sum()
                self.quality_report['missing_values'][col] = missing
                if missing > 0:
                    if pd.api.types.is_numeric_dtype(data[col]):
                        data[col].fillna(data[col].median(), inplace=True)
                    else:
                        data[col].fillna("Desconocido", inplace=True)
                self.cleaned_data = data
            self.logger.info("Limpieza de datos completada.")
            return data, self.quality_report
        except Exception as e:
            self.logger.error(f"Error durante la limpieza de datos: {str(e)}")
            return pd.DataFrame(), self.quality_report

    def normalize_data(self):
        """Normaliza las variables numéricas."""
        try:
            if self.cleaned_data is None or self.cleaned_data.empty:
                raise ValueError("Los datos no están limpios o están vacíos.")

            numeric_cols = self.cleaned_data.select_dtypes(include=[np.number]).columns
            self.normalized_data = self.cleaned_data.copy()
            self.normalized_data[numeric_cols] = self.scaler.fit_transform(self.cleaned_data[numeric_cols])

            self.logger.info("Normalización de datos completada.")
            return self.normalized_data

        except Exception as e:
            self.logger.error(f"Error durante la normalización de datos: {str(e)}")
            return pd.DataFrame()

    def validate_data(self) -> bool:
        """Valida que los datos estén listos para la inserción en la base de datos."""
        try:
            if self.normalized_data is None or self.normalized_data.empty:
                raise ValueError("Los datos no están listos o están vacíos.")

            if 'timestamp' not in self.normalized_data.columns:
                raise ValueError("Los datos no contienen una columna de 'timestamp'.")

            self.logger.info("Validación de datos exitosa.")
            return True

        except Exception as e:
            self.logger.error(f"Error durante la validación de datos: {str(e)}")
            return False

    def insert_into_db(self, table_name: str):
        """Inserta los datos normalizados en la base de datos PostgreSQL."""
        try:
            if not self.validate_data():
                raise ValueError("Los datos no pasaron la validación.")

            with self.db_engine.connect() as conn:
                with conn.begin():
                    # Crear tabla de mapeo
                    conn.execute(text("""
                        CREATE TABLE IF NOT EXISTS variable_mapping (
                            normalized_name VARCHAR PRIMARY KEY,
                            original_name VARCHAR NOT NULL,
                            data_type VARCHAR,
                            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                        );
                    """))
                    
                    # Insertar mapeo de variables
                    for norm_name, orig_name in self.quality_report['variable_mapping'].items():
                        conn.execute(text("""
                            INSERT INTO variable_mapping 
                            (normalized_name, original_name, data_type)
                            VALUES (:norm, :orig, :dtype)
                            ON CONFLICT (normalized_name) DO UPDATE 
                            SET original_name = EXCLUDED.original_name,
                                data_type = EXCLUDED.data_type
                        """), {
                            'norm': norm_name,
                            'orig': orig_name,
                            'dtype': self.quality_report['data_types'].get(norm_name)
                        })

                    # Crear e insertar datos normalizados
                    self.normalized_data.to_sql(
                        table_name, 
                        conn, 
                        if_exists='replace', 
                        index=False,
                        method='multi'
                    )

            self.logger.info(f"Datos insertados exitosamente en la tabla '{table_name}'.")

        except Exception as e:
            self.logger.error(f"Error al insertar datos en la base de datos: {str(e)}")
            raise

def test_preprocessing():
    """Prueba el flujo completo de limpieza, normalización y preprocesamiento."""
    try:
        print("\n=== PRUEBA DE PREPROCESAMIENTO ===\n")

        # Configuración inicial
        file_path = "filtered_consolidated_data_cleaned.xlsx"
        table_name = "normalized_data_table"

        # Crear la base de datos si no existe
        DatabaseSetup.create_database_if_not_exists(DB_CONFIG)

        # Crear el motor de conexión
        DATABASE_URL = f"postgresql://{DB_CONFIG['user']}:{DB_CONFIG['password']}@{DB_CONFIG['host']}:{DB_CONFIG['port']}/{DB_CONFIG['database']}"
        engine = create_engine(DATABASE_URL)

        # Crear instancia del preprocesador
        preprocessor = DataPreprocessor(db_engine=engine, file_path=file_path)

        # Ejecutar el flujo completo
        raw_data = preprocessor.load_data()
        cleaned_data, report = preprocessor.clean_data(raw_data)
        normalized_data = preprocessor.normalize_data()
        preprocessor.insert_into_db(table_name)

        # Mostrar reporte de calidad
        print("\nReporte de calidad de datos:")
        for key, value in report.items():
            print(f"{key}: {value}")

        print("\nPreprocesamiento completado exitosamente.")

    except Exception as e:
        print(f"Error en la prueba de preprocesamiento: {str(e)}")

# Ejecutar prueba
if __name__ == "__main__":
    test_preprocessing()


=== PRUEBA DE PREPROCESAMIENTO ===



2025-01-05 21:02:29,559 - __main__ - INFO - Cargando datos desde el archivo...
2025-01-05 21:02:29,559 - INFO - Cargando datos desde el archivo...
2025-01-05 21:02:42,189 - __main__ - INFO - Datos cargados: 7141 filas y 57 columnas.
2025-01-05 21:02:42,189 - INFO - Datos cargados: 7141 filas y 57 columnas.
2025-01-05 21:02:42,195 - __main__ - INFO - Iniciando limpieza de datos...
2025-01-05 21:02:42,195 - INFO - Iniciando limpieza de datos...
2025-01-05 21:02:42,365 - __main__ - INFO - Limpieza de datos completada.
2025-01-05 21:02:42,365 - INFO - Limpieza de datos completada.
2025-01-05 21:02:42,482 - __main__ - INFO - Normalización de datos completada.
2025-01-05 21:02:42,482 - INFO - Normalización de datos completada.
2025-01-05 21:02:42,486 - __main__ - INFO - Validación de datos exitosa.
2025-01-05 21:02:42,486 - INFO - Validación de datos exitosa.
2025-01-05 21:03:11,223 - __main__ - INFO - Datos insertados exitosamente en la tabla 'normalized_data_table'.
2025-01-05 21:03:11,223


Reporte de calidad de datos:
missing_values: {'tension_l1_v': np.int64(0), 'tension_l2_v': np.int64(0), 'tension_l3_v': np.int64(0), 'tension_l1_l2_v': np.int64(0), 'tension_l2_l3_v': np.int64(0), 'tension_l3_l1_v': np.int64(0), 'timestamp': np.int64(0), 'corriente_l1_a': np.int64(0), 'corriente_l2_a': np.int64(0), 'corriente_l3_a': np.int64(0), 'factor_de_potencia_l1': np.int64(0), 'factor_de_potencia_l2': np.int64(0), 'factor_de_potencia_l3': np.int64(0), 'flicker_pst_l1_pst': np.int64(0), 'flicker_pst_l2_pst': np.int64(0), 'flicker_pst_l3_pst': np.int64(0), 'distorsion_armonica_vl1_v_thd': np.int64(0), 'distorsion_armonica_vl2_v_thd': np.int64(0), 'distorsion_armonica_vl3_v_thd': np.int64(0), 'distorsion_armonica_il1_i_thd': np.int64(0), 'distorsion_armonica_il2_i_thd': np.int64(0), 'distorsion_armonica_il3_i_thd': np.int64(0), 'armonicos_il1_armonico_2_il1': np.int64(0), 'armonicos_il1_armonico_3_il1': np.int64(0), 'armonicos_il1_armonico_5_il1': np.int64(0), 'armonicos_il1_armoni

In [3]:
"""
Fase 2: Identificación de Variables Clave
========================================

Objetivos:
1. Identificar y registrar variables clave para prognosis en PostgreSQL.
2. Implementar un sistema robusto de selección y seguimiento de variables.
3. Mantener histórico de cambios en las variables seleccionadas.
4. Generar reportes detallados de análisis.

Características:
- Análisis multifactorial de variables.
- Sistema de puntuación ponderado.
- Registro histórico con timestamps.
- Validación y logging exhaustivo.
- Creación robusta de la base de datos y tablas.
"""

# Importaciones necesarias
import numpy as np
import pandas as pd
from sqlalchemy import create_engine, text
from sqlalchemy.exc import OperationalError, ProgrammingError
from psycopg2 import connect
from typing import Dict, Any, List
import logging
from datetime import datetime

class DatabaseSetup:
    @staticmethod
    def create_database_if_not_exists(config: Dict[str, str]):
        """Crea la base de datos si no existe."""
        try:
            connection = connect(
                dbname="postgres",
                user=config["user"],
                password=config["password"],
                host=config["host"],
                port=config["port"]
            )
            connection.autocommit = True
            cursor = connection.cursor()
            cursor.execute(f"SELECT 1 FROM pg_database WHERE datname = '{config['database']}';")
            if not cursor.fetchone():
                cursor.execute(f"CREATE DATABASE {config['database']};")
                logging.info(f"Base de datos '{config['database']}' creada exitosamente.")
            cursor.close()
            connection.close()
        except Exception as e:
            logging.error(f"Error al crear la base de datos: {str(e)}")
            raise

class KeyVariableSelector:
    def __init__(self, db_engine, table_name: str):
        """Inicializa el selector de variables clave."""
        self.db_engine = db_engine
        self.table_name = table_name
        self.mapping_table = "variable_mapping"
        self.history_table = "selected_variables_history"
        self.analysis_table = "variable_analysis_history"
        self.logger = self._setup_logging()
        self.variable_mapping = {}
        self.results = {
            "selected_variables": {},
            "variable_scores": {},
            "removed_variables": [],
            "new_variables": [],
            "analysis_timestamp": datetime.now()
        }
        self._validate_db_connection()
        self._create_tables()

    def _setup_logging(self):
        """Configura el sistema de logging."""
        logger = logging.getLogger(__name__)
        if not logger.handlers:
            handler = logging.StreamHandler()
            formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
            handler.setFormatter(formatter)
            logger.addHandler(handler)
            logger.setLevel(logging.INFO)
        return logger

    def _validate_db_connection(self):
        """Valida la conexión con la base de datos."""
        try:
            with self.db_engine.connect() as conn:
                self.logger.info("Conexión con la base de datos validada correctamente.")
        except OperationalError as e:
            self.logger.error(f"Error al conectar con la base de datos: {str(e)}")
            raise

    def _create_tables(self):
        """Crea las tablas necesarias en la base de datos."""
        try:
            with self.db_engine.connect() as conn:
                with conn.begin():
                    # Tabla para histórico de variables seleccionadas
                    conn.execute(text(f"""
                        CREATE TABLE IF NOT EXISTS {self.history_table} (
                            id SERIAL PRIMARY KEY,
                            variable_name TEXT NOT NULL,
                            original_name TEXT NOT NULL,
                            importance_score FLOAT NOT NULL,
                            category TEXT NOT NULL,
                            timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                            UNIQUE(variable_name, timestamp)
                        );
                    """))
                    
                    # Tabla para histórico de análisis
                    conn.execute(text(f"""
                        CREATE TABLE IF NOT EXISTS {self.analysis_table} (
                            id SERIAL PRIMARY KEY,
                            variable_name TEXT NOT NULL,
                            original_name TEXT NOT NULL,
                            variance_score FLOAT,
                            stability_score FLOAT,
                            trend_score FLOAT,
                            correlation_score FLOAT,
                            final_score FLOAT,
                            timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                        );
                    """))
            self.logger.info("Tablas creadas/verificadas exitosamente.")
        except Exception as e:
            self.logger.error(f"Error creando tablas: {str(e)}")
            raise

    def load_data(self) -> pd.DataFrame:
        """Carga datos y mantiene trazabilidad de nombres."""
        try:
            with self.db_engine.connect() as conn:
                # Cargar datos normalizados
                query = f"SELECT * FROM {self.table_name};"
                data = pd.read_sql_query(query, conn)
                
                # Cargar mapeo de variables
                mapping_query = f"SELECT normalized_name, original_name FROM {self.mapping_table};"
                variable_mapping = pd.read_sql_query(mapping_query, conn)
                
            # Validaciones
            if data.empty:
                raise ValueError("No se encontraron datos en la tabla.")
            if 'timestamp' not in data.columns:
                raise ValueError("La tabla debe contener una columna 'timestamp'.")
                
            # Configurar índice temporal
            data.set_index('timestamp', inplace=True)
            
            # Guardar mapeo para uso posterior
            self.variable_mapping = dict(zip(variable_mapping['normalized_name'], 
                                          variable_mapping['original_name']))
            
            self.logger.info(f"Datos cargados: {data.shape[0]} filas y {data.shape[1]} columnas.")
            return data
            
        except Exception as e:
            self.logger.error(f"Error al cargar datos: {str(e)}")
            raise

    def calculate_variable_score(self, series: pd.Series) -> float:
        """Calcula el score inicial de una variable y valida si es utilizable."""
        try:
            # 1. Verificación de valores únicos sin NaN
            unique_values = series.dropna().unique()
            n_unique = len(unique_values)
            total_values = len(series.dropna())
            
            self.logger.debug(f"Evaluando {series.name}:")
            self.logger.debug(f"- Valores únicos: {n_unique}")
            
            # Filtrar variables con muy pocos valores únicos
            if n_unique <= 3:  # Aumentamos el umbral mínimo de valores únicos
                self.logger.warning(f"Variable {series.name} tiene muy pocos valores únicos ({n_unique})")
                return 0.0
                
            # Calcular ratio de valores únicos
            unique_ratio = n_unique / total_values
            if unique_ratio < 0.001:  # Si menos del 0.1% de valores son únicos
                self.logger.warning(f"Variable {series.name} tiene ratio de valores únicos muy bajo ({unique_ratio:.6f})")
                return 0.0
                
            # 2. Verificación de varianza
            variance = series.var()
            self.logger.debug(f"- Varianza: {variance}")
            
            if abs(variance) < 1e-6:  # Aumentamos el umbral de varianza mínima
                self.logger.warning(f"Variable {series.name} tiene varianza prácticamente nula ({variance})")
                return 0.0
                
            # 3. Cálculo del score considerando tanto varianza como diversidad de valores
            score = float(np.log1p(abs(variance))) * unique_ratio
            return score
            
        except Exception as e:
            self.logger.error(f"Error calculando score para {series.name}: {str(e)}")
            return 0.0
    def _analyze_single_variable(self, series: pd.Series, data: pd.DataFrame) -> Dict[str, float]:
        """Analiza una variable individual con múltiples métricas."""
        try:
            # 1. Score inicial de variabilidad
            variance_score = self.calculate_variable_score(series)
            
            # Si no pasa la validación inicial, retornamos scores en cero
            if variance_score == 0:
                self.logger.warning(f"Variable {series.name} no pasó validación inicial")
                return self._get_zero_scores()

            # 2. Cálculo de métricas adicionales
            try:
                # Estabilidad temporal
                rolling_std = series.rolling(window=24, min_periods=1).std().mean()
                total_std = series.std()
                stability_score = float(1 - ( rolling_std / total_std) if total_std > 0 else 0)
                    # Tendencia
                trend_score = float(abs(np.polyfit(np.arange(len(series)), series, 1)[0]))
                    # Correlación con otras variables
                correlations = data.corrwith(series).abs()
                correlation_score = float(correlations.mean())
                    # 3. Score final ponderado
                scores = {
                    "variance_score": variance_score,
                    "stability_score": stability_score,
                    "trend_score": trend_score,
                    "correlation_score": correlation_score
                }
                
                weights = {
                    "variance_score": 0.3,
                    "stability_score": 0.3,
                    "trend_score": 0.2,
                    "correlation_score": 0.2
                }
                
                final_score = sum(score * weights[name] for name, score in scores.items())
                return {**scores, "final_score": float(final_score)}
            except Exception as calc_error:
                self.logger.error(f"Error en cálculo de métricas para {series.name}: {calc_error}")
                return self._get_zero_scores()
        except Exception as e:
            self.logger.error(f"Error en análisis de {series.name}: {str(e)}")
            return self._get_zero_scores()
    def analyze_variables(self, data: pd.DataFrame) -> Dict[str, Any]:
        """Realiza análisis completo de variables."""
        try:
            self.logger.info("Iniciando análisis de variables...")
            analysis_results = []
            constant_variables = []
            scores_dict = {}
                # Análisis individual de variables
            for col in data.columns:
                series = data[col].copy()
                
                # Estadísticas básicas para logging
                unique_vals = series.dropna().unique()
                variance = series.var()
                self.logger.info(f"\nAnalizando {col}:")
                self.logger.info(f"- Valores únicos: {len(unique_vals)}")
                self.logger.info(f"- Muestra valores: {unique_vals[:5]}")
                self.logger.info(f"- Varianza: {variance}")
                
                # Análisis completo
                scores = self._analyze_single_variable(series, data)
                
                # Logging de scores
                for score_name, score_value in scores.items():
                    self.logger.info(f"- {score_name}: {score_value}")
                
                # Clasificación de la variable
                if scores["final_score"] == 0:
                    reason = "Variable constante" if len(unique_vals) <= 1 else "Varianza nula"
                    value = str(series.iloc[0]) if not series.empty else "NA"
                    
                    self.logger.warning(f"Variable descartada: {col}")
                    self.logger.warning(f"Razón: {reason}")
                    
                    constant_variables.append({
                        "variable": col,
                        "reason": reason,
                        "value": value,
                        "variance": float(variance)
                    })
                    continue
                
                # Registro de variables válidas
                scores_dict[col] = scores["final_score"]
                analysis_results.append({
                    "variable_name": col,
                    "original_name": self.variable_mapping.get(col, col),
                    **scores
                })
                # Actualización de resultados
            self.results.update({
                "variable_scores": scores_dict,
                "removed_variables": constant_variables
            })
                # Proceso final
            self._categorize_variables(data)
            self._save_analysis_results(analysis_results)
            self.validate_selected_variables()
            
            return self.results
        except Exception as e:
            self.logger.error(f"Error en análisis de variables: {str(e)}")
            raise
    def _get_zero_scores(self) -> Dict[str, float]:
        """Retorna un diccionario con todos los scores en cero."""
        return {
            "variance_score": 0.0,
            "stability_score": 0.0,
            "trend_score": 0.0,
            "correlation_score": 0.0,
            "final_score": 0.0
        }

    def _categorize_variables(self, data: pd.DataFrame):
        """Categoriza variables en críticas y monitoreo."""
        try:
            # 1. Obtener scores y variables removidas
            scores = pd.Series(self.results["variable_scores"])
            removed_vars = set(v["variable"] for v in self.results.get("removed_variables", []))
            
            # 2. Logging inicial de variables removidas
            if removed_vars:
                self.logger.info(f"Variables excluidas de categorización: {sorted(list(removed_vars))}")
            
            # 3. Filtrado estricto de variables válidas
            valid_scores = scores[
                (scores > 0) & (~scores.index.isin(removed_vars))
            ]
        
            if valid_scores.empty:
                self.logger.warning("No hay variables válidas para categorizar")
                return
            
            # 4. Verificación de seguridad
            if set(valid_scores.index).intersection(removed_vars):
                self.logger.error("¡ALERTA! Variables removidas encontradas en scores válidos")
                valid_scores = valid_scores[~valid_scores.index.isin(removed_vars)]
            
            if valid_scores.empty:
                self.logger.warning("No hay variables válidas para categorizar")
                return
            
            # 5. Cálculo de umbrales y categorización
            thresholds = {
                "critical": valid_scores.quantile(0.8),
                "monitoring": valid_scores.quantile(0.5)
            }
                # 6. Reset y categorización de variables
            self.results["selected_variables"] = {}
            for var, score in valid_scores.items():
                if score >= thresholds["critical"]:
                    category = "critical"
                elif score >= thresholds["monitoring"]:
                    category = "monitoring"
                else:
                    continue
                    
                self.results["selected_variables"][var] = {
                    "category": category,
                    "score": float(score),
                    "threshold_used": thresholds[category],
                    "timestamp": datetime.now().isoformat()
                }
                # 7. Registro de métricas finales
            self.results["categorization_metrics"] = {
                "total_variables": len(data.columns),  # Total original
                "analyzed_variables": len(scores),     # Variables analizadas
                "valid_variables": len(valid_scores),  # Variables válidas
                "removed_variables": len(removed_vars),# Variables removidas
                "critical_variables": len([v for v in self.results["selected_variables"].values() 
                                        if v["category"] == "critical"]),
                "monitoring_variables": len([v for v in self.results["selected_variables"].values() 
                                            if v["category"] == "monitoring"]),
                "thresholds": thresholds,
                "removed_list": sorted(list(removed_vars))
            }
                # 8. Logging detallado final
            self.logger.info(f"""
            === RESUMEN FINAL DE CATEGORIZACIÓN ===
            Total variables originales: {self.results['categorization_metrics']['total_variables']}
            Variables analizadas: {self.results['categorization_metrics']['analyzed_variables']}
            Variables removidas: {self.results['categorization_metrics']['removed_variables']}
            Variables válidas: {self.results['categorization_metrics']['valid_variables']}
            Variables críticas: {self.results['categorization_metrics']['critical_variables']}
            Variables en monitoreo: {self.results['categorization_metrics']['monitoring_variables']}
            Variables descartadas: {self.results['categorization_metrics']['removed_list']}
            =====================================
            """)
        except Exception as e:
            self.logger.error(f"Error en categorización de variables: {str(e)}")
            raise

    def validate_selected_variables(self) -> bool:
        """Validación adicional de variables seleccionadas."""
        try:
            for var, details in self.results["selected_variables"].items():
                if details["score"] == 0:
                    self.logger.error(f"Variable {var} con score 0 detectada en selección")
                    del self.results["selected_variables"][var]  # Se agregó 'del'
            return True
        except Exception as e:
            self.logger.error(f"Error en validación: {str(e)}")
            return False

    def _save_analysis_results(self, analysis_results: List[Dict[str, Any]]):
        """Guarda resultados detallados del análisis."""
        try:
            with self.db_engine.connect() as conn:
                with conn.begin():
                    for result in analysis_results:
                        result = {k: float(v) if isinstance(v, (np.float64, np.float32)) else v 
                                for k, v in result.items()}
                        conn.execute(text(f"""
                            INSERT INTO {self.analysis_table} 
                            (variable_name, original_name, variance_score, stability_score, 
                             trend_score, correlation_score, final_score)
                            VALUES (:variable_name, :original_name, :variance_score, 
                                    :stability_score, :trend_score, :correlation_score, :final_score)
                        """), result)
        except Exception as e:
            self.logger.error(f"Error guardando análisis: {str(e)}")
            raise

    def save_results(self):
        """Guarda las variables seleccionadas manteniendo trazabilidad."""
        try:
            with self.db_engine.connect() as conn:
                with conn.begin():
                    for var, details in self.results["selected_variables"].items():
                        # Usar nombre original en el registro
                        original_name = self.variable_mapping.get(var, var)
                        
                        query = text(f"""
                            INSERT INTO {self.history_table} 
                            (variable_name, original_name, importance_score, category)
                            VALUES (:variable_name, :original_name, :importance_score, :category)
                            ON CONFLICT (variable_name, timestamp)
                            DO UPDATE SET
                                importance_score = EXCLUDED.importance_score,
                                category = EXCLUDED.category;
                        """)
                        
                        conn.execute(query, {
                            "variable_name": var,
                            "original_name": original_name,
                            "importance_score": float(details["score"]),
                            "category": details["category"]
                        })
                        
        except Exception as e:
            self.logger.error(f"Error al guardar resultados: {str(e)}")
            raise

# Ejecución
if __name__ == "__main__":
    try:
        DB_CONFIG = {
            "user": "postgres",
            "password": "elico",
            "host": "localhost",
            "port": "5432",
            "database": "prognosis_db"
        }

        # Crear la base de datos si no existe
        DatabaseSetup.create_database_if_not_exists(DB_CONFIG)

        # Crear el motor de conexión
        DATABASE_URL = f"postgresql://{DB_CONFIG['user']}:{DB_CONFIG['password']}@{DB_CONFIG['host']}:{DB_CONFIG['port']}/{DB_CONFIG['database']}"
        engine = create_engine(DATABASE_URL)
        
        # Ejecutar el flujo
        selector = KeyVariableSelector(engine, "normalized_data_table")
        data = selector.load_data()
        selector.analyze_variables(data)
        selector.save_results()
    except Exception as e:
        logging.error(f"Error ejecutando flujo: {str(e)}")

2025-01-05 21:03:11,361 - __main__ - INFO - Conexión con la base de datos validada correctamente.
2025-01-05 21:03:11,361 - INFO - Conexión con la base de datos validada correctamente.
2025-01-05 21:03:11,366 - __main__ - INFO - Tablas creadas/verificadas exitosamente.
2025-01-05 21:03:11,366 - INFO - Tablas creadas/verificadas exitosamente.
2025-01-05 21:03:11,759 - __main__ - INFO - Datos cargados: 7141 filas y 56 columnas.
2025-01-05 21:03:11,759 - INFO - Datos cargados: 7141 filas y 56 columnas.
2025-01-05 21:03:11,763 - __main__ - INFO - Iniciando análisis de variables...
2025-01-05 21:03:11,763 - INFO - Iniciando análisis de variables...
2025-01-05 21:03:11,767 - __main__ - INFO - 
Analizando tension_l1_v:
2025-01-05 21:03:11,767 - INFO - 
Analizando tension_l1_v:
2025-01-05 21:03:11,770 - __main__ - INFO - - Valores únicos: 1081
2025-01-05 21:03:11,770 - INFO - - Valores únicos: 1081
2025-01-05 21:03:11,773 - __main__ - INFO - - Muestra valores: [-0.12807509 -0.00177156  0.01859

In [12]:
"""
FASE 3: Sistema Robusto de Línea Base Adaptativa Incremental
=========================================================

Objetivo: Establecer líneas base robustas y adaptativas para variables clave
Versión: 5.0
Descripción: Sistema avanzado de modelado de comportamiento normal con:
- Procesamiento incremental por lotes
- Ensemble de modelos estadísticos y ML
- Detección automática de patrones y regímenes
- Validación temporal rigurosa 
- Control de versiones de modelos
- Preparación para predicción de fallas
"""

import numpy as np
import pandas as pd
from sqlalchemy import create_engine, text, MetaData
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler, RobustScaler
from statsmodels.tsa.statespace.sarimax import SARIMAX
from statsmodels.tsa.seasonal import seasonal_decompose
from statsmodels.tsa.stattools import adfuller, kpss
from scipy import stats
from prophet import Prophet
from typing import Dict, List, Any, Tuple, Optional
import logging
from datetime import datetime, timedelta
import json
import warnings
from concurrent.futures import ThreadPoolExecutor
warnings.filterwarnings('ignore')

# Configuración del sistema
class SystemConfig:
    def __init__(self):
        # Configuración de BD
        self.DB_CONFIG = {
            "user": "postgres",
            "password": "elico",
            "host": "localhost", 
            "port": "5432",
            "database": "prognosis_db"
        }
        
        # Parámetros del modelo
        self.MODEL_PARAMS = {
            "training_window": "90D",
            "update_frequency": "1D",
            "min_data_points": 1000,
            "confidence_level": 0.95,
            "seasonality_test_size": 168,
            "batch_size": 1000,  # Tamaño del lote para procesamiento incremental
            "max_versions": 5     # Número máximo de versiones a mantener
        }
        
        # Umbrales estadísticos
        self.STATISTICAL_THRESHOLDS = {
            "stationarity_pvalue": 0.05,
            "seasonality_strength": 0.3,
            "outlier_std_dev": 3,
            "minimum_variance_ratio": 0.01,
            "learning_threshold": 0.1  # Umbral para considerar mejora significativa
        }
        
        # Tablas de BD
        self.TABLES = {
            "input": {
                "normalized_data": "normalized_data_table",
                "selected_variables": "selected_variables_history"
            },
            "output": {
                "baseline": "baseline_models",
                "patterns": "pattern_registry", 
                "metrics": "baseline_metrics",
                "validation": "baseline_validation",
                "incremental": "baseline_incremental_control",
                "versions": "baseline_model_versions"
            }
        }
class DatabaseManager:
    def __init__(self, config: SystemConfig):
        self.config = config
        self.engine = self._create_engine()
        self.metadata = MetaData()
        self._create_tables()
        
    def _create_engine(self):
        url = f"postgresql://{self.config.DB_CONFIG['user']}:{self.config.DB_CONFIG['password']}@{self.config.DB_CONFIG['host']}:{self.config.DB_CONFIG['port']}/{self.config.DB_CONFIG['database']}"
        return create_engine(url)
    
    def _create_tables(self):
        with self.engine.begin() as conn:
            conn.execute(text("""
                -- Tabla para modelos base
                CREATE TABLE IF NOT EXISTS baseline_models (
                    id SERIAL PRIMARY KEY,
                    variable_name TEXT NOT NULL,
                    model_type TEXT NOT NULL,
                    model_parameters JSONB,
                    baseline_stats JSONB,
                    confidence_intervals JSONB,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    is_active BOOLEAN DEFAULT true,
                    UNIQUE(variable_name, model_type)
                );

                -- Tabla para registro de patrones
                CREATE TABLE IF NOT EXISTS pattern_registry (
                    id SERIAL PRIMARY KEY,
                    variable_name TEXT NOT NULL,
                    pattern_type TEXT NOT NULL,
                    pattern_params JSONB,
                    detection_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    confidence_score FLOAT,
                    UNIQUE(variable_name, pattern_type)
                );

                -- Tabla para métricas
                CREATE TABLE IF NOT EXISTS baseline_metrics (
                    id SERIAL PRIMARY KEY,
                    variable_name TEXT NOT NULL,
                    metric_type TEXT NOT NULL,
                    metric_value FLOAT,
                    calculation_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                );

                -- Tabla para validación
                CREATE TABLE IF NOT EXISTS baseline_validation (
                    id SERIAL PRIMARY KEY,
                    variable_name TEXT NOT NULL,
                    validation_type TEXT NOT NULL,
                    validation_result JSONB,
                    validation_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                );

                -- Nueva tabla para control incremental
                CREATE TABLE IF NOT EXISTS baseline_incremental_control (
                    id SERIAL PRIMARY KEY,
                    variable_name TEXT NOT NULL,
                    last_processed_timestamp TIMESTAMP,
                    batch_number INTEGER DEFAULT 0,
                    batch_size INTEGER,
                    learning_metrics JSONB,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    UNIQUE(variable_name)
                );

                -- Nueva tabla para versiones de modelos
                CREATE TABLE IF NOT EXISTS baseline_model_versions (
                    id SERIAL PRIMARY KEY,
                    variable_name TEXT NOT NULL,
                    version INTEGER,
                    model_parameters JSONB,
                    performance_metrics JSONB,
                    training_metadata JSONB,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    is_active BOOLEAN DEFAULT true,
                    UNIQUE(variable_name, version)
                );
            """))

    def get_selected_variables(self) -> List[str]:
        """Obtene solo las variables críticas del último análisis"""
        query = f"""
            WITH LastAnalysisTime AS (
                SELECT MAX(timestamp) as max_time 
                FROM {self.config.TABLES['input']['selected_variables']}
            )
            SELECT variable_name 
            FROM {self.config.TABLES['input']['selected_variables']}
            WHERE timestamp = (SELECT max_time FROM LastAnalysisTime)
            AND category = 'critical';
        """
        try:
            with self.engine.connect() as conn:  # Cambio: self.db.engine -> self.engine
                result = conn.execute(text(query))
                variables = [row[0] for row in result]  # Corrección de la línea corrupta
                
                # Validación adicional
                if len(variables) != 8:  # Número esperado según VariableAnalyzer
                    logging.warning(  # Cambio: self.logger -> logging
                        f"Discrepancia en variables críticas: "
                        f"Esperadas=8, Obtenidas={len(variables)}"
                    )
                    
                return variables
        except Exception as e:
            logging.error(f"Error obteniendo variables críticas: {str(e)}")  # Cambio: self.logger -> logging
            return []

    def get_variable_data_batch(self, variable_name: str, last_timestamp: Optional[datetime] = None, 
                              batch_size: int = 1000) -> pd.DataFrame:
        """Obtiene datos de la variable en lotes"""
        if last_timestamp is None:
            query = f"""
                SELECT timestamp, {variable_name}
                FROM {self.config.TABLES['input']['normalized_data']}
                ORDER BY timestamp
                LIMIT {batch_size};
            """
        else:
            query = f"""
                SELECT timestamp, {variable_name}
                FROM {self.config.TABLES['input']['normalized_data']}
                WHERE timestamp > :last_timestamp
                ORDER BY timestamp
                LIMIT {batch_size};
            """
        return pd.read_sql_query(text(query), self.engine, 
                               params={"last_timestamp": last_timestamp},
                               parse_dates=['timestamp'])

    def get_last_processed_timestamp(self, variable_name: str) -> Optional[datetime]:
        """Obtiene el último timestamp procesado para una variable"""
        query = """
            SELECT last_processed_timestamp
            FROM baseline_incremental_control
            WHERE variable_name = :variable_name;
        """
        try:
            with self.engine.connect() as conn:
                result = conn.execute(text(query), {"variable_name": variable_name})
                row = result.fetchone()
                return row[0] if row else None
        except Exception as e:
            logging.error(f"Error obteniendo último timestamp: {str(e)}")
            return None
        
class BaselineModeler:
    def __init__(self, config: SystemConfig, db_manager: DatabaseManager):
        self.config = config
        self.db = db_manager
        self.models = {}
        self.scalers = {}
        self.patterns = {}
    
    def save_results(self, variable_name: str, results: Dict[str, Any]):
        """Guarda los resultados del modelo y crea una nueva versión"""
        try:
            # Limpiar datos antes de serializar
            clean_results = {
                "patterns": self._clean_json_data(results["patterns"]),
                "metrics": self._clean_json_data(results["metrics"]),
                "validation": self._clean_json_data(results["validation"])
            }

            with self.db.engine.begin() as conn:
                # Guardar modelo base
                conn.execute(text("""
                    INSERT INTO baseline_models 
                    (variable_name, model_type, model_parameters, baseline_stats, 
                    confidence_intervals)
                    VALUES (:variable_name, 'ensemble', :model_params, :stats, :intervals)
                    ON CONFLICT (variable_name, model_type) DO UPDATE 
                    SET model_parameters = EXCLUDED.model_parameters,
                        baseline_stats = EXCLUDED.baseline_stats,
                        confidence_intervals = EXCLUDED.confidence_intervals,
                        updated_at = CURRENT_TIMESTAMP;
                """), {
                    "variable_name": variable_name,
                    "model_params": json.dumps(clean_results["patterns"]),
                    "stats": json.dumps(clean_results["metrics"]),
                    "intervals": json.dumps(clean_results["validation"])
                })

            # Guardar versión del modelo
            self.save_model_version(variable_name, clean_results)
                    
        except Exception as e:
            logging.error(f"Error guardando resultados para {variable_name}: {str(e)}")
            raise 

    def _clean_json_data(self, data: Any) -> Any:
        """Limpia valores NaN y los convierte a NULL para compatibilidad con PostgreSQL"""
        if isinstance(data, dict):
            return {k: self._clean_json_data(v) for k, v in data.items()}
        elif isinstance(data, list):
            return [self._clean_json_data(x) for x in data]
        elif isinstance(data, (float, np.float64, np.float32)):
            return None if np.isnan(data) else float(data)
        return data
    
    def analyze_variable(self, variable_name: str, data: pd.DataFrame) -> Dict[str, Any]:
        """Analiza una variable y genera su línea base"""
        # Preparación de datos
        ts_data = self._prepare_timeseries(data)
        
        # Detección de patrones
        patterns = self._detect_patterns(ts_data)
        self.patterns[variable_name] = patterns
        
        # Modelado ensemble
        models = self._create_ensemble_model(ts_data, patterns)
        self.models[variable_name] = models
        
        # Validación
        validation = self._validate_models(variable_name, ts_data, models)
        
        # Métricas
        metrics = self._calculate_metrics(variable_name, ts_data, models)
        
        return {
            "patterns": patterns,
            "models": models,
            "validation": validation,
            "metrics": metrics
        }

    def _prepare_timeseries(self, data: pd.DataFrame) -> pd.Series:
        """Prepara los datos de series temporales"""
        data = data.set_index('timestamp').sort_index()
        return data.iloc[:, 0]  # Primera columna contiene los valores

    def _detect_patterns(self, data: pd.Series) -> Dict[str, Any]:
        """Detecta patrones en los datos"""
        # Análisis de estacionariedad
        adf_test = adfuller(data)
        kpss_test = kpss(data)
        
        # Descomposición de series
        decomposition = seasonal_decompose(data, period=24)
        
        # Detección de estacionalidad
        acf_24 = np.correlate(data, data, mode='full')[len(data)-1:len(data)+24]
        seasonal_strength = np.max(np.abs(acf_24[1:]))
        
        return {
            "stationarity": {
                "adf_pvalue": adf_test[1],
                "kpss_pvalue": kpss_test[1]
            },
            "seasonality": {
                "strength": seasonal_strength,
                "period": 24 if seasonal_strength > self.config.STATISTICAL_THRESHOLDS["seasonality_strength"] else 0
            },
            "decomposition": {
                "trend": decomposition.trend.dropna().tolist(),
                "seasonal": decomposition.seasonal.dropna().tolist(),
                "resid": decomposition.resid.dropna().tolist()
            }
        }

    def _create_ensemble_model(self, data: pd.Series, patterns: Dict) -> Dict[str, Any]:
        """Crea un conjunto de modelos para la serie temporal"""
        models = {}
        
        # Verificar si los datos son constantes
        if len(data.unique()) <= 1:
            logging.warning(f"Datos constantes detectados con valor {data.iloc[0]}")
            return {
                "constant": {
                    "value": float(data.iloc[0]),
                    "is_constant": True
                }
            }
        
        try:
            # Modelo SARIMA para componentes temporales
            if patterns["seasonality"]["period"] > 0:
                try:
                    sarima = SARIMAX(data, order=(1,1,1), 
                                seasonal_order=(1,1,1,patterns["seasonality"]["period"]))
                    models["sarima"] = sarima.fit(disp=False)
                except Exception as e:
                    logging.warning(f"Error en ajuste SARIMA: {str(e)}")
            
            # Modelo Prophet optimizado
            try:
                prophet_data = pd.DataFrame({
                    'ds': data.index,
                    'y': data.values
                })
                prophet = Prophet(
                    yearly_seasonality=True, 
                    weekly_seasonality=True,
                    daily_seasonality=True,
                    n_changepoints=25,
                    interval_width=0.95,
                    mcmc_samples=0
                )
                prophet.fit(prophet_data)
                models["prophet"] = prophet
            except Exception as e:
                logging.warning(f"Error en ajuste Prophet: {str(e)}")
            
            # Isolation Forest para detección de anomalías
            try:
                iso_forest = IsolationForest(
                    contamination=0.1, 
                    random_state=42,
                    n_jobs=-1
                )
                iso_forest.fit(data.values.reshape(-1, 1))
                models["isolation_forest"] = iso_forest
            except Exception as e:
                logging.warning(f"Error en ajuste Isolation Forest: {str(e)}")
            
            return models
            
        except Exception as e:
            logging.error(f"Error en creación de ensemble: {str(e)}")
            raise

    def _validate_models(self, variable_name: str, data: pd.Series, models: Dict[str, Any]) -> Dict[str, Any]:
        """Valida los modelos generados"""
        validation_results = {}
        
        # Validación cruzada temporal
        train_size = int(len(data) * 0.8)
        train_data = data[:train_size]
        test_data = data[train_size:]
        
        # Validación SARIMA
        if "sarima" in models:
            sarima_pred = models["sarima"].predict(start=test_data.index[0],
                                                end=test_data.index[-1])
            validation_results["sarima"] = {
                "rmse": np.sqrt(((test_data - sarima_pred) ** 2).mean()),
                "mae": np.abs(test_data - sarima_pred).mean()
            }
        
        # Validación Prophet
        try:
            prophet_test = pd.DataFrame({'ds': test_data.index})
            prophet_pred = models["prophet"].predict(prophet_test)
            
            # Alinear índices
            test_values = test_data.reindex(prophet_pred['ds'].values)
            pred_values = prophet_pred['yhat']
            
            if len(test_values) == len(pred_values):
                validation_results["prophet"] = {
                    "rmse": np.sqrt(((test_values - pred_values) ** 2).mean()),
                    "mae": np.abs(test_values - pred_values).mean()
                }
            else:
                validation_results["prophet"] = {
                    "rmse": None,
                    "mae": None
                }
        except Exception as e:
            logging.warning(f"Error en validación Prophet: {str(e)}")
            validation_results["prophet"] = {
                "rmse": None,
                "mae": None
            }
        
        return validation_results

    def _calculate_metrics(self, variable_name: str, data: pd.Series, models: Dict[str, Any]) -> Dict[str, float]:
        """Calcula métricas del modelo"""
        metrics = {}
        
        # Estadísticas básicas
        metrics.update({
            "mean": float(data.mean()),
            "std": float(data.std()),
            "skewness": float(stats.skew(data)),
            "kurtosis": float(stats.kurtosis(data))
        })
        
        # Métricas de calidad del modelo
        if "sarima" in models:
            metrics["sarima_aic"] = float(models["sarima"].aic)
            metrics["sarima_bic"] = float(models["sarima"].bic)
        
        # Score de anomalías
        iso_scores = models["isolation_forest"].score_samples(data.values.reshape(-1, 1))
        metrics["anomaly_ratio"] = float((iso_scores < 0).mean())
        
        return metrics

    def process_incremental(self, variable_name: str) -> bool:
        """Procesa una variable de forma incremental con validación"""
        try:
            # Verificar si la variable realmente es crítica
            query = f"""
                WITH LastAnalysis AS (
                    SELECT MAX(timestamp) as max_time 
                    FROM {self.config.TABLES['input']['selected_variables']}
                )
                SELECT COUNT(1) 
                FROM {self.config.TABLES['input']['selected_variables']}
                WHERE timestamp = (SELECT max_time FROM LastAnalysis)
                AND category = 'critical'
                AND variable_name = :variable_name;
            """
            
            with self.db.engine.connect() as conn:
                result = conn.execute(text(query), {"variable_name": variable_name})
                if not result.scalar():
                    logging.warning(f"Variable {variable_name} no es crítica en el último análisis")
                    return False
            # Obtener último timestamp procesado
            last_timestamp = self.db.get_last_processed_timestamp(variable_name)
            batch_size = self.config.MODEL_PARAMS["batch_size"]
            
            # Obtener siguiente lote de datos
            data_batch = self.db.get_variable_data_batch(
                variable_name, last_timestamp, batch_size
            )
            
            if data_batch.empty:
                logging.info(f"No hay nuevos datos para {variable_name}")
                return True
                
            # Procesar lote
            results = self.analyze_variable(variable_name, data_batch)
            
            # Actualizar modelo existente si existe
            current_model = self.get_current_model(variable_name)
            if current_model:
                results = self.update_model(current_model, results)
            
            # Guardar resultados y actualizar control
            self.save_results(variable_name, results)
            self.update_incremental_control(
                variable_name, 
                data_batch['timestamp'].max(),
                len(data_batch)
            )
            
            return True
            
        except Exception as e:
            logging.error(f"Error en procesamiento incremental de {variable_name}: {str(e)}")
            return False

    def get_current_model(self, variable_name: str) -> Optional[Dict]:
        """Obtiene el modelo actual de la variable"""
        query = """
            SELECT model_parameters::text, 
                    baseline_stats::text, 
                    confidence_intervals::text
            FROM baseline_models
            WHERE variable_name = :variable_name
            AND is_active = true;
        """
        try:
            with self.db.engine.connect() as conn:
                result = conn.execute(text(query), {"variable_name": variable_name})
                row = result.fetchone()
                if row:
                    try:
                        return {
                            "parameters": json.loads(row[0]) if row[0] else {},
                            "stats": json.loads(row[1]) if row[1] else {},
                            "intervals": json.loads(row[2]) if row[2] else {}
                        }
                    except json.JSONDecodeError as je:
                        logging.error(f"Error decodificando JSON para {variable_name}: {str(je)}")
                        return None
                return None
        except Exception as e:
            logging.error(f"Error obteniendo modelo actual: {str(e)}")
            return None

    def update_model(self, current_model: Dict, new_results: Dict) -> Dict:
        """Actualiza el modelo con nuevos resultados"""
        # Combinar patrones
        combined_patterns = self._merge_patterns(
            current_model["parameters"], 
            new_results["patterns"]
        )
        
        # Actualizar métricas
        updated_metrics = self._update_metrics(
            current_model["stats"], 
            new_results["metrics"]
        )
        
        # Actualizar intervalos de confianza
        updated_intervals = self._update_intervals(
            current_model["intervals"], 
            new_results["validation"]
        )
        
        return {
            "patterns": combined_patterns,
            "metrics": updated_metrics,
            "validation": updated_intervals,
            "models": new_results["models"]  # Mantener modelos actualizados
        }

    def _merge_patterns(self, old_patterns: Dict, new_patterns: Dict) -> Dict:
        """Combina patrones antiguos y nuevos"""
        merged = {}
        
        # Combinar estacionariedad
        merged["stationarity"] = {
            "adf_pvalue": min(old_patterns["stationarity"]["adf_pvalue"],
                            new_patterns["stationarity"]["adf_pvalue"]),
            "kpss_pvalue": min(old_patterns["stationarity"]["kpss_pvalue"],
                             new_patterns["stationarity"]["kpss_pvalue"])
        }
        
        # Actualizar estacionalidad
        merged["seasonality"] = new_patterns["seasonality"]
        
        # Mantener descomposición más reciente
        merged["decomposition"] = new_patterns["decomposition"]
        
        return merged

    def _update_metrics(self, old_metrics: Dict, new_metrics: Dict) -> Dict:
        """Actualiza métricas con nuevos datos"""
        updated = {}
        
        # Actualizar estadísticas básicas con ponderación
        weight_old = 0.7  # Dar más peso a la historia
        weight_new = 0.3
        
        for key in ["mean", "std", "skewness", "kurtosis"]:
            updated[key] = (old_metrics[key] * weight_old + 
                          new_metrics[key] * weight_new)
        
        # Actualizar métricas de modelo
        if "sarima_aic" in new_metrics:
            updated["sarima_aic"] = new_metrics["sarima_aic"]
            updated["sarima_bic"] = new_metrics["sarima_bic"]
        
        # Actualizar ratio de anomalías
        updated["anomaly_ratio"] = new_metrics["anomaly_ratio"]
        
        return updated
    
    def _update_intervals(self, old_intervals: Dict, new_intervals: Dict) -> Dict:
        """Actualiza intervalos de confianza"""
        updated = {}
        
        # Actualizar métricas SARIMA
        if "sarima" in new_intervals:
            updated["sarima"] = new_intervals["sarima"]
        
        # Actualizar métricas Prophet
        if "prophet" in new_intervals:
            updated["prophet"] = new_intervals["prophet"]
        
        return updated

    def update_incremental_control(self, variable_name: str, 
                                 last_timestamp: datetime, 
                                 batch_size: int) -> None:
        """Actualiza el control de procesamiento incremental"""
        query = """
            INSERT INTO baseline_incremental_control 
            (variable_name, last_processed_timestamp, batch_size, batch_number)
            VALUES (:variable_name, :last_timestamp, :batch_size, 1)
            ON CONFLICT (variable_name) DO UPDATE
            SET last_processed_timestamp = :last_timestamp,
                batch_size = :batch_size,
                batch_number = baseline_incremental_control.batch_number + 1,
                updated_at = CURRENT_TIMESTAMP;
        """
        try:
            with self.db.engine.begin() as conn:
                conn.execute(text(query), {
                    "variable_name": variable_name,
                    "last_timestamp": last_timestamp,
                    "batch_size": batch_size
                })
        except Exception as e:
            logging.error(f"Error actualizando control incremental: {str(e)}")
            raise

    def save_model_version(self, variable_name: str, results: Dict[str, Any]):
        """Guarda una nueva versión del modelo"""
        try:
            # Obtener última versión
            query = """
                SELECT COALESCE(MAX(version), 0)
                FROM baseline_model_versions
                WHERE variable_name = :variable_name;
            """
            with self.db.engine.connect() as conn:
                result = conn.execute(text(query), {"variable_name": variable_name})
                last_version = result.scalar() or 0
                
                # Crear nueva versión
                new_version = last_version + 1
                
                # Desactivar versiones antiguas si excede el máximo
                if new_version > self.config.MODEL_PARAMS["max_versions"]:
                    conn.execute(text("""
                        UPDATE baseline_model_versions
                        SET is_active = false
                        WHERE variable_name = :variable_name
                        AND version <= :old_version;
                    """), {
                        "variable_name": variable_name,
                        "old_version": new_version - self.config.MODEL_PARAMS["max_versions"]
                    })
                
                # Insertar nueva versión
                conn.execute(text("""
                    INSERT INTO baseline_model_versions
                    (variable_name, version, model_parameters, 
                     performance_metrics, training_metadata)
                    VALUES (:variable_name, :version, :parameters,
                           :metrics, :metadata);
                """), {
                    "variable_name": variable_name,
                    "version": new_version,
                    "parameters": json.dumps(self._clean_json_data(results["patterns"])),
                    "metrics": json.dumps(self._clean_json_data(results["metrics"])),
                    "metadata": json.dumps({
                        "timestamp": datetime.now().isoformat(),
                        "validation": results["validation"]
                    })
                })
                
        except Exception as e:
            logging.error(f"Error guardando versión del modelo: {str(e)}")
            raise

             


def execute_baseline_system():
    """Ejecuta el sistema de línea base con procesamiento incremental"""
    try:
        config = SystemConfig()
        db_manager = DatabaseManager(config)
        modeler = BaselineModeler(config, db_manager)
        
        # Obtener variables críticas
        variables = db_manager.get_selected_variables()
        if not variables:
            logging.warning("No se encontraron variables críticas para procesar")
            return False
            
        logging.info(f"Variables críticas identificadas: {len(variables)}")
        
        def process_variable(variable: str) -> bool:
            try:
                logging.info(f"Iniciando procesamiento incremental de: {variable}")
                success = modeler.process_incremental(variable)
                if success:
                    logging.info(f"Variable {variable} procesada exitosamente")
                return success
            except Exception as e:
                logging.error(f"Error procesando {variable}: {str(e)}")
                return False
        
        # Procesar variables en paralelo
        successful = []
        failed = []
        
        with ThreadPoolExecutor(max_workers=3) as executor:
            future_to_var = {executor.submit(process_variable, var): var 
                           for var in variables}
            
            for future in future_to_var:
                variable = future_to_var[future]
                try:
                    if future.result():
                        successful.append(variable)
                    else:
                        failed.append(variable)
                except Exception as e:
                    logging.error(f"Error en {variable}: {str(e)}")
                    failed.append(variable)
        
        # Reporte final
        logging.info(f"""
        Proceso completado:
        - Variables exitosas: {len(successful)}
        - Variables fallidas: {len(failed)}
        - Total procesadas: {len(variables)}
        """)
        
        if failed:
            logging.warning(f"Variables con errores: {failed}")
            
        return len(successful) > 0
        
    except Exception as e:
        logging.error(f"Error en proceso de línea base: {str(e)}")
        return False

if __name__ == "__main__":
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s'
    )
    execute_baseline_system()

2025-01-05 22:49:08,554 - INFO - Variables críticas identificadas: 8
2025-01-05 22:49:08,555 - INFO - Iniciando procesamiento incremental de: tension_l2_v
2025-01-05 22:49:08,559 - INFO - Iniciando procesamiento incremental de: tension_l3_v
2025-01-05 22:49:08,569 - INFO - Iniciando procesamiento incremental de: tension_l1_l2_v
2025-01-05 22:51:10,835 - DEBUG - input tempfile: /tmp/tmpng3_x0fd/1dbsrkwn.json
2025-01-05 22:51:13,447 - DEBUG - input tempfile: /tmp/tmpng3_x0fd/_8cfel8_.json
2025-01-05 22:51:13,761 - DEBUG - idx 0
2025-01-05 22:51:14,035 - DEBUG - running CmdStan, num_threads: None
2025-01-05 22:51:14,039 - DEBUG - CmdStan args: ['/home/elicoubuntu/Desktop/Industrial_Insigths/.venv/lib/python3.10/site-packages/prophet/stan_model/prophet_model.bin', 'random', 'seed=89452', 'data', 'file=/tmp/tmpng3_x0fd/1dbsrkwn.json', 'init=/tmp/tmpng3_x0fd/_8cfel8_.json', 'output', 'file=/tmp/tmpng3_x0fd/prophet_modelrpispkjw/prophet_model-20250105225113.csv', 'method=optimize', 'algorithm