In [2]:
pip install openpyxl

Collecting openpyxl
  Downloading openpyxl-3.1.5-py2.py3-none-any.whl.metadata (2.5 kB)
Collecting et-xmlfile (from openpyxl)
  Downloading et_xmlfile-2.0.0-py3-none-any.whl.metadata (2.7 kB)
Downloading openpyxl-3.1.5-py2.py3-none-any.whl (250 kB)
   ---------------------------------------- 0.0/250.9 kB ? eta -:--:--
   - -------------------------------------- 10.2/250.9 kB ? eta -:--:--
   ------ -------------------------------- 41.0/250.9 kB 653.6 kB/s eta 0:00:01
   ---------------------------------------- 250.9/250.9 kB 2.6 MB/s eta 0:00:00
Downloading et_xmlfile-2.0.0-py3-none-any.whl (18 kB)
Installing collected packages: et-xmlfile, openpyxl
Successfully installed et-xmlfile-2.0.0 openpyxl-3.1.5
Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip is available: 24.0 -> 25.0.1
[notice] To update, run: python.exe -m pip install --upgrade pip


In [19]:
import pandas as pd
import numpy as np
import os
import re
import time
import logging
from datetime import datetime, timedelta
from dateutil.parser import parse
import warnings
warnings.filterwarnings('ignore')

# Configurar logging
def configurar_logging(log_file='myinvestingreportnormal.log'):
    """Configura el sistema de logging"""
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s [%(levelname)s] %(message)s',
        handlers=[
            logging.FileHandler(log_file),
            logging.StreamHandler()
        ]
    )
    return logging.getLogger('MyinvestingreportNormal')

class MyinvestingreportNormal:
    """
    Clase para implementar el procesamiento de datos económicos de Investing
    con método Normal para datos de frecuencia diaria
    """
    
    def __init__(self, config_file, data_root='data/Macro/raw', log_file='myinvestingreportnormal.log'):
        """
        Inicializa el procesador
        
        Args:
            config_file (str): Ruta al archivo de configuración (Data Engineering.xlsx)
            data_root (str): Directorio raíz donde se encuentran los subdirectorios de datos
            log_file (str): Ruta al archivo de log
        """
        self.config_file = config_file
        self.data_root = data_root
        self.logger = configurar_logging(log_file)
        self.config_data = None
        self.fecha_min_global = None
        self.fecha_max_global = None
        self.indice_diario = None
        self.datos_procesados = {}
        self.df_combinado = None
        self.estadisticas = {}
        
        self.logger.info("=" * 80)
        self.logger.info("INICIANDO PROCESO: MyinvestingreportNormal")
        self.logger.info(f"Archivo de configuración: {config_file}")
        self.logger.info(f"Directorio raíz de datos: {data_root}")
        self.logger.info(f"Fecha y hora: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        self.logger.info("=" * 80)
    
    def leer_configuracion(self):
        """
        Lee y filtra la configuración del archivo Excel
        
        Returns:
            pd.DataFrame: Configuraciones filtradas
        """
        try:
            self.logger.info("Leyendo archivo de configuración...")
            # Leer archivo de configuración
            df_config = pd.read_excel(self.config_file)
            
            # Filtrar por tipo de preprocesamiento y fuente
            self.config_data = df_config[
                (df_config['Fuente'] == 'Investing Data') & 
                (df_config['Tipo de Preprocesamiento Según la Fuente'] == 'Normal')
            ].copy()
            
            num_configs = len(self.config_data)
            self.logger.info(f"Se encontraron {num_configs} configuraciones para procesar")
            
            if num_configs == 0:
                self.logger.warning("No se encontraron configuraciones que cumplan los criterios")
                return None
                
            return self.config_data
        
        except Exception as e:
            self.logger.error(f"Error al leer configuración: {str(e)}")
            return None
    
    def extraer_fecha(self, texto_fecha):
        """
        Extrae una fecha del formato "Apr 01, 2025 (Mar)" o similar
        
        Args:
            texto_fecha: Texto con la fecha a extraer
            
        Returns:
            pd.Timestamp: Fecha extraída o None si no se pudo procesar
        """
        if not isinstance(texto_fecha, str):
            # Si ya es un objeto datetime, convertirlo a pd.Timestamp
            if isinstance(texto_fecha, (datetime, pd.Timestamp)):
                return pd.Timestamp(texto_fecha)
            return None
        
        # Buscar formato "Apr 01, 2025 (Mar)"
        match = re.search(r'([A-Za-z]+\s+\d+,\s+\d{4})', texto_fecha)
        if match:
            try:
                return pd.to_datetime(match.group(1))
            except:
                pass
        
        # Intentar parsear directamente
        try:
            return pd.to_datetime(texto_fecha)
        except:
            return None
    
    def encontrar_ruta_archivo(self, variable, tipo_macro):
        """
        Encuentra la ruta completa del archivo basado en la variable y tipo_macro
        
        Args:
            variable (str): Nombre de la variable (archivo sin extensión)
            tipo_macro (str): Tipo de indicador macroeconómico
            
        Returns:
            str: Ruta completa del archivo o None si no se encuentra
        """
        # Construir ruta basada en la estructura de directorios
        ruta_base = os.path.join(self.data_root, tipo_macro)
        nombre_archivo = f"{variable}.csv"  # Para datos normales suelen ser CSV
        ruta_completa = os.path.join(ruta_base, nombre_archivo)
        
        if os.path.exists(ruta_completa):
            return ruta_completa
        
        # Intentar con extensión xlsx por si acaso
        nombre_archivo_alt = f"{variable}.xlsx"
        ruta_completa_alt = os.path.join(ruta_base, nombre_archivo_alt)
        
        if os.path.exists(ruta_completa_alt):
            return ruta_completa_alt
        
        # Si no se encuentra, intentar buscar en todos los subdirectorios
        for root, dirs, files in os.walk(self.data_root):
            if nombre_archivo in files:
                return os.path.join(root, nombre_archivo)
            if nombre_archivo_alt in files:
                return os.path.join(root, nombre_archivo_alt)
        
        return None
    
    def limpiar_valor_porcentaje(self, valor):
        """
        Limpia y convierte valores con formato de porcentaje, millones, billones, etc.
        
        Args:
            valor: Valor a limpiar, puede ser string o número
            
        Returns:
            float: Valor numérico limpio o None si no se puede convertir
        """
        # Si ya es un número, retornarlo directamente
        if isinstance(valor, (int, float)):
            return float(valor)
        
        # Si es None o no es string, retornar None
        if not isinstance(valor, str) or valor is None:
            return None
        
        # Eliminar espacios
        valor_limpio = valor.strip()
        
        # Si está vacío después de quitar espacios, retornar None
        if not valor_limpio:
            return None
        
        try:
            # Casos según el formato
            
            # Caso 1: Porcentaje (ej: "7.5%")
            if "%" in valor_limpio:
                return float(valor_limpio.replace("%", "").strip())
                
            # Caso 2: Millones (ej: "2.95M")
            if "M" in valor_limpio:
                return float(valor_limpio.replace("M", "").strip())
                
            # Caso 3: Billones (ej: "269.80B")
            if "B" in valor_limpio:
                return float(valor_limpio.replace("B", "").strip())
                
            # Caso 4: Miles (ej: "1.5K")
            if "K" in valor_limpio:
                return float(valor_limpio.replace("K", "").strip())
                
            # Caso 5: Trillones (ej: "1.2T")
            if "T" in valor_limpio:
                return float(valor_limpio.replace("T", "").strip())
                
            # Caso 6: Valores con comas como separadores de miles (ej: "1,234.56")
            if "," in valor_limpio:
                return float(valor_limpio.replace(",", "").strip())
            
            # Caso por defecto: intentar convertir directamente
            return float(valor_limpio)
            
        except (ValueError, TypeError):
            # Si no se puede convertir, retornar None
            return None
    
    def detectar_columna_fecha(self, df):
        """
        Detecta la columna de fecha en el DataFrame
        
        Args:
            df (pd.DataFrame): DataFrame con los datos
            
        Returns:
            str: Nombre de la columna de fecha detectada o None si no se encontró
        """
        columnas = df.columns.tolist()
        
        # Buscar columnas que contengan palabras relacionadas con fechas
        candidatos_fecha = [
            col for col in columnas if any(
                palabra in col.lower() 
                for palabra in ['date', 'fecha', 'time', 'día', 'day', 'periodo']
            )
        ]
        
        if candidatos_fecha:
            # Intentar convertir cada columna candidata a fecha
            for col in candidatos_fecha:
                muestra = df[col].dropna().head(5)
                if len(muestra) > 0:
                    # Intentar convertir a fecha
                    try:
                        pd.to_datetime(muestra)
                        return col
                    except:
                        pass
        
        # Si no se encuentra una columna explícita, verificar todas las columnas
        for col in columnas:
            muestra = df[col].dropna().head(5)
            if len(muestra) > 0:
                # Verificar si la columna ya es de tipo datetime
                if pd.api.types.is_datetime64_any_dtype(df[col]):
                    return col
                
                # Intentar convertir a fecha
                try:
                    pd.to_datetime(muestra)
                    return col
                except:
                    pass
        
        return None
    
    def detectar_columna_target(self, df, target_especificado):
        """
        Detecta la columna target en el DataFrame
        
        Args:
            df (pd.DataFrame): DataFrame con los datos
            target_especificado (str): Nombre de la columna TARGET especificada
            
        Returns:
            str: Nombre de la columna target detectada o None si no se encontró
        """
        # Comprobar si existe la columna target especificada (insensible a mayúsculas/minúsculas)
        columnas = df.columns.tolist()
        for col in columnas:
            if col.upper() == target_especificado.upper():
                return col
        
        # Si no existe la columna target, buscar columnas alternativas comunes
        candidatos_valor = [
            'Price', 'Close', 'PRICE', 'Adj Close', 'Cierre', 'Value', 'Valor',
            'Data', 'Rate', 'Tasa', 'Actual', 'ACTUAL', 'Last'
        ]
        
        for candidato in candidatos_valor:
            if candidato in columnas:
                return candidato
        
        # Si no se encuentra ningún candidato obvio, buscar columnas con valores numéricos
        columnas_numericas = []
        
        for col in columnas:
            # Evitar columnas que probablemente no son valores
            if col in ['Date', 'Fecha', 'Time', 'Hora', 'Symbol', 'Ticker', 'Volume', 'Open', 'High', 'Low']:
                continue
                
            # Contar valores numéricos
            valores_numericos = 0
            for i in range(min(10, len(df))):
                if i < len(df):
                    valor = df.iloc[i].get(col)
                    if valor is not None:
                        valor_limpio = self.limpiar_valor_porcentaje(valor)
                        if valor_limpio is not None:
                            valores_numericos += 1
            
            if valores_numericos > 0:
                columnas_numericas.append((col, valores_numericos))
        
        # Ordenar por número de valores numéricos, descendente
        columnas_numericas.sort(key=lambda x: x[1], reverse=True)
        
        # Retornar la columna con más valores numéricos
        if columnas_numericas:
            return columnas_numericas[0][0]
            
        return None
    
    def procesar_archivo(self, config_row):
        """
        Procesa un archivo individual según la configuración
        
        Args:
            config_row (pd.Series): Fila de configuración del archivo
            
        Returns:
            tuple: (nombre_indicador, pd.DataFrame procesado) o (nombre_indicador, None) si hay error
        """
        variable = config_row['Variable']
        tipo_macro = config_row['Tipo Macro']
        target_col = config_row['TARGET']
        
        # Buscar la ruta completa del archivo
        ruta_archivo = self.encontrar_ruta_archivo(variable, tipo_macro)
        
        self.logger.info(f"\nProcesando: {variable} ({tipo_macro})")
        self.logger.info(f"- Archivo: {variable}")
        self.logger.info(f"- Columna TARGET: {target_col}")
        
        if ruta_archivo is None:
            self.logger.error(f"- ERROR: Archivo no encontrado: {variable}")
            return variable, None
        
        self.logger.info(f"- Ruta encontrada: {ruta_archivo}")
        
        try:
            # Determinar la extensión del archivo
            _, extension = os.path.splitext(ruta_archivo)
            extension = extension.lower()
            
            # Leer el archivo según su extensión
            if extension == '.csv':
                # Probar diferentes separadores
                try:
                    df = pd.read_csv(ruta_archivo, sep=',')
                except:
                    try:
                        df = pd.read_csv(ruta_archivo, sep=';')
                    except:
                        try:
                            df = pd.read_csv(ruta_archivo, sep='\t')
                        except Exception as e:
                            self.logger.error(f"- ERROR: No se pudo leer el archivo CSV: {str(e)}")
                            return variable, None
            elif extension in ['.xlsx', '.xls']:
                try:
                    # Método 1: Usando pandas directamente
                    df = pd.read_excel(ruta_archivo)
                except Exception as e:
                    self.logger.warning(f"- AVISO: Error al leer con pandas: {str(e)}")
                    try:
                        # Método 2: Usando openpyxl como motor
                        df = pd.read_excel(ruta_archivo, engine='openpyxl')
                    except Exception as e2:
                        self.logger.warning(f"- AVISO: Error al leer con openpyxl: {str(e2)}")
                        try:
                            # Método 3: Usando xlrd como motor
                            df = pd.read_excel(ruta_archivo, engine='xlrd')
                        except Exception as e3:
                            self.logger.error(f"- ERROR: Todos los métodos de lectura fallaron: {str(e3)}")
                            return variable, None
            else:
                self.logger.error(f"- ERROR: Formato de archivo no soportado: {extension}")
                return variable, None
            
            total_filas = len(df)
            self.logger.info(f"- Filas encontradas: {total_filas}")
            
            if total_filas == 0:
                self.logger.error(f"- ERROR: El archivo está vacío")
                return variable, None
            
            # Detectar columna de fecha
            columna_fecha = self.detectar_columna_fecha(df)
            if columna_fecha is None:
                self.logger.error(f"- ERROR: No se encontró columna de fecha en {ruta_archivo}")
                return variable, None
            
            self.logger.info(f"- Columna de fecha identificada: {columna_fecha}")
                
            # Detectar la columna TARGET
            columna_encontrada = self.detectar_columna_target(df, target_col)
            
            if columna_encontrada is None:
                self.logger.error(f"- ERROR: No se encontró columna TARGET ni alternativas en {ruta_archivo}")
                return variable, None
                
            if columna_encontrada != target_col:
                self.logger.warning(f"- AVISO: No se encontró columna '{target_col}', usando '{columna_encontrada}'")
            
            # Procesar fechas
            df['fecha'] = df[columna_fecha].apply(self.extraer_fecha)
            df = df.dropna(subset=['fecha'])
            
            # Contar fechas procesadas correctamente
            fechas_procesadas = len(df)
            if fechas_procesadas < total_filas:
                self.logger.warning(f"- AVISO: {total_filas - fechas_procesadas} fechas no pudieron ser procesadas")
            
            # Extraer valores de TARGET, limpiando porcentajes, millones, billones, etc.
            df['valor'] = df[columna_encontrada].apply(self.limpiar_valor_porcentaje)
            
            # Verificar si hay valores válidos
            df = df.dropna(subset=['valor'])
            
            valores_validos = len(df)
            if valores_validos == 0:
                self.logger.error(f"- ERROR: No se encontraron valores válidos para '{columna_encontrada}' en {ruta_archivo}")
                return variable, None
                
            cobertura = (valores_validos / total_filas) * 100
            
            # Renombrar columna según el patrón
            nuevo_nombre = f"{target_col}_{variable}_{tipo_macro}"
            df.rename(columns={'valor': nuevo_nombre}, inplace=True)
            
            # Seleccionar solo las columnas relevantes
            df_procesado = df[['fecha', nuevo_nombre]].copy()
            
            # Ordenar por fecha
            df_procesado = df_procesado.sort_values('fecha')
            
            # Calcular fechas mínima y máxima
            fecha_min = df_procesado['fecha'].min()
            fecha_max = df_procesado['fecha'].max()
            
            # Actualizar fechas mínima y máxima globales
            if self.fecha_min_global is None or fecha_min < self.fecha_min_global:
                self.fecha_min_global = fecha_min
                
            if self.fecha_max_global is None or fecha_max > self.fecha_max_global:
                self.fecha_max_global = fecha_max
            
            # Registrar estadísticas
            self.estadisticas[variable] = {
                'tipo_macro': tipo_macro,
                'columna_target': columna_encontrada,
                'total_filas': total_filas,
                'valores_validos': valores_validos,
                'cobertura': cobertura,
                'fecha_min': fecha_min,
                'fecha_max': fecha_max,
                'nuevo_nombre': nuevo_nombre
            }
            
            estado = "OK" if cobertura >= 75 else "ALERTA"
            
            self.logger.info(f"- Valores no nulos en TARGET: {valores_validos}")
            self.logger.info(f"- Cobertura: {cobertura:.2f}%")
            self.logger.info(f"- Periodo: {fecha_min.strftime('%Y-%m-%d')} a {fecha_max.strftime('%Y-%m-%d')}")
            self.logger.info(f"- Estado: {estado}")
            
            return variable, df_procesado
            
        except Exception as e:
            self.logger.error(f"- ERROR al procesar {ruta_archivo}: {str(e)}")
            return variable, None
    
    def generar_indice_diario(self):
        """
        Genera un DataFrame con índice diario entre fechas mínima y máxima globales
        
        Returns:
            pd.DataFrame: DataFrame con índice diario
        """
        if self.fecha_min_global is None or self.fecha_max_global is None:
            self.logger.error("No se pudieron determinar fechas mínima y máxima globales")
            return None
            
        self.logger.info("\nGenerando índice temporal diario...")
        self.logger.info(f"- Rango de fechas global: {self.fecha_min_global.strftime('%Y-%m-%d')} a {self.fecha_max_global.strftime('%Y-%m-%d')}")
        
        # Generar todas las fechas diarias
        todas_fechas = pd.date_range(start=self.fecha_min_global, end=self.fecha_max_global, freq='D')
        self.indice_diario = pd.DataFrame({'fecha': todas_fechas})
        
        self.logger.info(f"- Total de fechas diarias generadas: {len(self.indice_diario)}")
        return self.indice_diario
    
    def combinar_datos(self):
        """
        Combina todos los indicadores procesados con el índice diario
        
        Returns:
            pd.DataFrame: DataFrame combinado con todos los indicadores
        """
        if not self.datos_procesados:
            self.logger.error("No hay datos procesados para combinar")
            return None
            
        if self.indice_diario is None:
            self.logger.error("No se ha generado el índice diario")
            return None
            
        self.logger.info("\nCombinando datos con índice diario...")
        
        # Comenzar con el índice diario
        df_combinado = self.indice_diario.copy()
        
        # Para cada indicador, realizar el merge y aplicar ffill
        for variable, df in self.datos_procesados.items():
            if df is None:
                self.logger.warning(f"Omitiendo {variable} por errores de procesamiento")
                continue
                
            nombre_col = df.columns[1]  # La columna de valores (después de 'fecha')
            
            self.logger.info(f"- Combinando: {nombre_col}")
            
            # Realizar merge
            df_combinado = pd.merge(df_combinado, df, on='fecha', how='left')
            
            # Aplicar forward fill (ffill)
            df_combinado[nombre_col] = df_combinado[nombre_col].ffill()
            
            # Calcular métricas después de ffill
            valores_antes = self.estadisticas[variable]['valores_validos']
            valores_despues = df_combinado[nombre_col].notna().sum()
            valores_imputados = valores_despues - valores_antes
            cobertura_final = (valores_despues / len(df_combinado)) * 100
            
            # Actualizar estadísticas
            self.estadisticas[variable].update({
                'valores_despues_ffill': valores_despues,
                'valores_imputados': valores_imputados,
                'cobertura_final': cobertura_final
            })
        
        self.df_combinado = df_combinado
        self.logger.info(f"- DataFrame combinado: {len(df_combinado)} filas, {len(df_combinado.columns)} columnas")
        
        return self.df_combinado
    
    def analizar_cobertura_final(self):
        """
        Genera un informe detallado de cobertura final
        """
        if self.df_combinado is None or not self.estadisticas:
            self.logger.error("No hay datos combinados o estadísticas para analizar")
            return
            
        self.logger.info("\n" + "=" * 50)
        self.logger.info("RESUMEN DE COBERTURA FINAL")
        self.logger.info("=" * 50)
        
        total_indicadores = len(self.estadisticas)
        total_dias = len(self.indice_diario)
        
        self.logger.info(f"Total indicadores procesados: {total_indicadores}")
        self.logger.info(f"Rango de fechas: {self.fecha_min_global.strftime('%Y-%m-%d')} a {self.fecha_max_global.strftime('%Y-%m-%d')}")
        self.logger.info(f"Total días en la serie: {total_dias}")
        self.logger.info("\nCobertura por indicador:")
        
        # Contadores por nivel de cobertura
        coberturas = {
            "Excelente (>90%)": 0,
            "Buena (75-90%)": 0,
            "Regular (50-75%)": 0,
            "Baja (25-50%)": 0,
            "Crítica (<25%)": 0
        }
        
        # Ordenar por cobertura final descendente
        indicadores_ordenados = sorted(
            self.estadisticas.items(), 
            key=lambda x: x[1].get('cobertura_final', 0), 
            reverse=True
        )
        
        for variable, stats in indicadores_ordenados:
            cobertura = stats.get('cobertura_final', 0)
            estado = self._obtener_estado_cobertura(cobertura)
            
            self.logger.info(f"- {variable} ({stats['nuevo_nombre']}): {cobertura:.2f}% [{estado}]")
            
            # Incrementar contador correspondiente
            if cobertura > 90:
                coberturas["Excelente (>90%)"] += 1
            elif cobertura > 75:
                coberturas["Buena (75-90%)"] += 1
            elif cobertura > 50:
                coberturas["Regular (50-75%)"] += 1
            elif cobertura > 25:
                coberturas["Baja (25-50%)"] += 1
            else:
                coberturas["Crítica (<25%)"] += 1
        
        self.logger.info("\nDistribución de cobertura:")
        for rango, num in coberturas.items():
            self.logger.info(f"- {rango}: {num} indicadores")
        
        # Añadir información sobre valores imputados
        self.logger.info("\nImputación de datos:")
        total_valores = total_dias * total_indicadores
        total_originales = sum(s['valores_validos'] for s in self.estadisticas.values())
        total_imputados = sum(s.get('valores_imputados', 0) for s in self.estadisticas.values())
        
        self.logger.info(f"- Valores originales: {total_originales} ({total_originales/total_valores*100:.2f}%)")
        self.logger.info(f"- Valores imputados: {total_imputados} ({total_imputados/total_valores*100:.2f}%)")
    
    def _obtener_estado_cobertura(self, cobertura):
        """Determina el estado según el porcentaje de cobertura"""
        if cobertura > 90:
            return "EXCELENTE"
        elif cobertura > 75:
            return "BUENO"
        elif cobertura > 50:
            return "REGULAR"
        elif cobertura > 25:
            return "BAJO"
        else:
            return "CRÍTICO"
    
    def generar_estadisticas_valores(self):
        """
        Genera estadísticas descriptivas de los valores para cada indicador
        """
        if self.df_combinado is None:
            self.logger.error("No hay datos combinados para analizar")
            return
            
        self.logger.info("\n" + "=" * 50)
        self.logger.info("ESTADÍSTICAS DE VALORES")
        self.logger.info("=" * 50)
        
        for col in self.df_combinado.columns:
            if col == 'fecha':
                continue
                
            serie = self.df_combinado[col].dropna()
            
            if len(serie) == 0:
                self.logger.warning(f"La columna {col} no tiene valores")
                continue
            
            # Calcular estadísticas básicas
            stats = {
                'min': serie.min(),
                'max': serie.max(),
                'mean': serie.mean(),
                'median': serie.median(),
                'std': serie.std()
            }
            
            # Identificar valores atípicos (más de 3 desviaciones estándar)
            umbral_superior = stats['mean'] + 3 * stats['std']
            umbral_inferior = stats['mean'] - 3 * stats['std']
            valores_atipicos = serie[(serie > umbral_superior) | (serie < umbral_inferior)]
            
            self.logger.info(f"\nEstadísticas para {col}:")
            self.logger.info(f"- Min: {stats['min']:.4f}")
            self.logger.info(f"- Max: {stats['max']:.4f}")
            self.logger.info(f"- Media: {stats['mean']:.4f}")
            self.logger.info(f"- Mediana: {stats['median']:.4f}")
            self.logger.info(f"- Desv. Estándar: {stats['std']:.4f}")
            
            if len(valores_atipicos) > 0:
                pct_atipicos = (len(valores_atipicos) / len(serie)) * 100
                self.logger.info(f"- ALERTA: Se encontraron {len(valores_atipicos)} valores atípicos ({pct_atipicos:.2f}%)")
                self.logger.info(f"  Umbral inferior: {umbral_inferior:.4f}, Umbral superior: {umbral_superior:.4f}")
            
            # Guardar estadísticas en el diccionario
            for var, stats_dict in self.estadisticas.items():
                if stats_dict.get('nuevo_nombre') == col:
                    stats_dict.update(stats)
                    stats_dict['valores_atipicos'] = len(valores_atipicos)
    
    def guardar_resultados(self, output_file='datos_economicos_normales_procesados.xlsx'):
        """
        Guarda los resultados procesados en un archivo Excel
        
        Args:
            output_file (str): Ruta del archivo de salida
            
        Returns:
            bool: True si se guardó correctamente, False en caso contrario
        """
        if self.df_combinado is None:
            self.logger.error("No hay datos para guardar")
            return False
            
        try:
            self.logger.info("\n" + "=" * 50)
            self.logger.info("GUARDANDO RESULTADOS")
            self.logger.info("=" * 50)
            
            self.logger.info(f"Guardando resultados en: {output_file}")
            
            # Crear un writer de Excel
            with pd.ExcelWriter(output_file, engine='xlsxwriter') as writer:
                # Guardar el DataFrame combinado en la primera hoja
                self.logger.info(f"Guardando DataFrame combinado en '{output_file}'...")
                self.df_combinado.to_excel(writer, sheet_name='Datos_Combinados', index=False)
                
                # Guardar estadísticas en una segunda hoja
                self.logger.info("Guardando estadísticas de los indicadores...")
                
                # Convertir diccionario de estadísticas a DataFrame
                stats_data = []
                for variable, stats in self.estadisticas.items():
                    row = {
                        'Variable': variable,
                        'Tipo_Macro': stats.get('tipo_macro', ''),
                        'Columna_TARGET': stats.get('columna_target', ''),
                        'Nombre_Columna': stats.get('nuevo_nombre', ''),
                        'Total_Filas_Original': stats.get('total_filas', 0),
                        'Valores_Validos_Original': stats.get('valores_validos', 0),
                        'Cobertura_Original_%': stats.get('cobertura', 0),
                        'Valores_Despues_FFill': stats.get('valores_despues_ffill', 0),
                        'Valores_Imputados': stats.get('valores_imputados', 0),
                        'Cobertura_Final_%': stats.get('cobertura_final', 0),
                        'Fecha_Min': stats.get('fecha_min', ''),
                        'Fecha_Max': stats.get('fecha_max', ''),
                        'Valor_Min': stats.get('min', ''),
                        'Valor_Max': stats.get('max', ''),
                        'Valor_Media': stats.get('mean', ''),
                        'Valor_Mediana': stats.get('median', ''),
                        'Desviacion_Estandar': stats.get('std', ''),
                        'Valores_Atipicos': stats.get('valores_atipicos', 0)
                    }
                    stats_data.append(row)
                    
                df_stats = pd.DataFrame(stats_data)
                df_stats.to_excel(writer, sheet_name='Estadisticas', index=False)
                
                # Guardar metadatos
                metadata = {
                    'Proceso': ['MyinvestingreportNormal'],
                    'Fecha de proceso': [datetime.now().strftime('%Y-%m-%d %H:%M:%S')],
                    'Total indicadores': [len(self.estadisticas)],
                    'Periodo': [f"{self.fecha_min_global.strftime('%Y-%m-%d')} a {self.fecha_max_global.strftime('%Y-%m-%d')}"],
                    'Total días': [len(self.indice_diario)]
                }
                pd.DataFrame(metadata).to_excel(writer, sheet_name='Metadatos')
            
            self.logger.info(f"Archivo guardado exitosamente: {output_file}")
            return True
            
        except Exception as e:
            self.logger.error(f"Error al guardar resultados: {str(e)}")
            return False
    
    def ejecutar_proceso_completo(self, output_file='datos_economicos_normales_procesados.xlsx'):
        """
        Ejecuta el proceso completo de preprocesamiento
        
        Args:
            output_file (str): Ruta del archivo de salida
            
        Returns:
            bool: True si el proceso se completó exitosamente, False en caso contrario
        """
        inicio = time.time()
        self.logger.info("Iniciando proceso completo MyinvestingreportNormal...")
        
        # 1. Leer configuración
        self.leer_configuracion()
        if self.config_data is None or len(self.config_data) == 0:
            return False
        
        # 2. Procesar cada archivo
        for _, config_row in self.config_data.iterrows():
            variable, df_procesado = self.procesar_archivo(config_row)
            self.datos_procesados[variable] = df_procesado
        
        # Verificar si se procesó al menos un archivo correctamente
        archivos_correctos = sum(1 for df in self.datos_procesados.values() if df is not None)
        if archivos_correctos == 0:
            self.logger.error("No se pudo procesar correctamente ningún archivo")
            return False
        
        # 3. Generar índice diario
        self.generar_indice_diario()
        if self.indice_diario is None:
            return False
        
        # 4. Combinar datos
        self.combinar_datos()
        if self.df_combinado is None:
            return False
        
        # 5. Analizar cobertura final
        self.analizar_cobertura_final()
        
        # 6. Generar estadísticas de valores
        self.generar_estadisticas_valores()
        
        # 7. Guardar resultados
        resultado = self.guardar_resultados(output_file)
        
        # 8. Mostrar resumen final
        fin = time.time()
        tiempo_ejecucion = fin - inicio
        
        self.logger.info("\n" + "=" * 50)
        self.logger.info("RESUMEN DE EJECUCIÓN")
        self.logger.info("=" * 50)
        self.logger.info(f"Proceso: MyinvestingreportNormal")
        self.logger.info(f"Tiempo de ejecución: {tiempo_ejecucion:.2f} segundos")
        self.logger.info(f"Archivos procesados: {len(self.datos_procesados)}")
        self.logger.info(f"Archivos con error: {sum(1 for df in self.datos_procesados.values() if df is None)}")
        self.logger.info(f"Archivos procesados correctamente: {archivos_correctos}")
        self.logger.info(f"Periodo de datos: {self.fecha_min_global.strftime('%Y-%m-%d')} a {self.fecha_max_global.strftime('%Y-%m-%d')}")
        self.logger.info(f"Datos combinados: {len(self.df_combinado)} filas, {len(self.df_combinado.columns)} columnas")
        self.logger.info(f"Archivo de salida: {output_file}")
        self.logger.info(f"Estado: {'COMPLETADO' if resultado else 'ERROR'}")
        self.logger.info("=" * 50)
        
        return resultado


# Función principal para ejecutar el proceso
def ejecutar_myinvestingreportnormal(config_file='Data Engineering.xlsx',
                                    output_file='datos_economicos_normales_procesados.xlsx',
                                    data_root='data/Macro/raw',
                                    log_file='myinvestingreportnormal.log'):
    """
    Ejecuta el proceso MyinvestingreportNormal
    
    Args:
        config_file (str): Ruta al archivo de configuración
        output_file (str): Ruta al archivo de salida
        data_root (str): Directorio raíz donde se encuentran los subdirectorios de datos
        log_file (str): Ruta al archivo de log
        
    Returns:
        bool: True si el proceso se completó exitosamente, False en caso contrario
    """
    procesador = MyinvestingreportNormal(config_file, data_root, log_file)
    return procesador.ejecutar_proceso_completo(output_file)


# Ejemplo de uso
if __name__ == "__main__":
    resultado = ejecutar_myinvestingreportnormal()
    print(f"Proceso {'completado exitosamente' if resultado else 'finalizado con errores'}")

2025-03-27 18:40:03,350 [INFO] INICIANDO PROCESO: MyinvestingreportNormal
2025-03-27 18:40:03,352 [INFO] Archivo de configuración: Data Engineering.xlsx
2025-03-27 18:40:03,354 [INFO] Directorio raíz de datos: data/Macro/raw
2025-03-27 18:40:03,355 [INFO] Fecha y hora: 2025-03-27 18:40:03


2025-03-27 18:40:03,357 [INFO] Iniciando proceso completo MyinvestingreportNormal...
2025-03-27 18:40:03,360 [INFO] Leyendo archivo de configuración...
2025-03-27 18:40:03,396 [INFO] Se encontraron 28 configuraciones para procesar
2025-03-27 18:40:03,398 [INFO] 
Procesando: Australia_10Y_Bond (bond)
2025-03-27 18:40:03,398 [INFO] - Archivo: Australia_10Y_Bond
2025-03-27 18:40:03,399 [INFO] - Columna TARGET: PRICE
2025-03-27 18:40:03,401 [INFO] - Ruta encontrada: data/Macro/raw\bond\Australia_10Y_Bond.csv
2025-03-27 18:40:03,410 [INFO] - Filas encontradas: 3810
2025-03-27 18:40:03,412 [INFO] - Columna de fecha identificada: Date
2025-03-27 18:40:04,712 [INFO] - Valores no nulos en TARGET: 3810
2025-03-27 18:40:04,714 [INFO] - Cobertura: 100.00%
2025-03-27 18:40:04,715 [INFO] - Periodo: 2014-01-01 a 2025-03-26
2025-03-27 18:40:04,716 [INFO] - Estado: OK
2025-03-27 18:40:04,719 [INFO] 
Procesando: Italy_10Y_Bond (bond)
2025-03-27 18:40:04,721 [INFO] - Archivo: Italy_10Y_Bond
2025-03-27 18

Proceso completado exitosamente


In [2]:
import pandas as pd
import numpy as np
import os
import re
import time
import logging
from datetime import datetime, timedelta
import warnings
warnings.filterwarnings('ignore')


# ---------------------------------------------------
# Configuración de logging
# ---------------------------------------------------
def configurar_logging(log_file='myinvestingreportnormal.log'):
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s [%(levelname)s] %(message)s',
        handlers=[
            logging.FileHandler(log_file),
            logging.StreamHandler()
        ]
    )
    return logging.getLogger('MyinvestingreportNormal')


# ---------------------------------------------------
# Función central para conversión de valores numéricos
# ---------------------------------------------------
def convertir_valor(valor, variable=None, formatos_conocidos=None):
    """
    Convierte cualquier representación de valor numérico a float.

    Args:
        valor: Valor a convertir (string, int, float)
        variable: Nombre de la variable (opcional, para formatos específicos)
        formatos_conocidos: Diccionario {variable: formato} para optimizar conversiones

    Returns:
        float: Valor numérico convertido o None si no es posible
    """
    # Si ya es numérico, retornarlo directamente
    if isinstance(valor, (int, float)):
        return float(valor)
    
    # Si es None o no es string, retornar None
    if not isinstance(valor, str) or valor is None:
        return None

    valor_limpio = valor.strip()
    if not valor_limpio:
        return None

    # Si tenemos un formato conocido para la variable, aplicarlo
    if variable and formatos_conocidos and variable in formatos_conocidos:
        formato = formatos_conocidos[variable]
        if formato == 'europeo':
            # En formato europeo, el punto es separador de miles y la coma decimal
            valor_limpio = valor_limpio.replace('.', '')
            valor_limpio = valor_limpio.replace(',', '.')
    # Determinar multiplicador basado en sufijos
    multiplicadores = {
        '%': 1,      # Si se requiere convertir a proporción se puede dividir por 100
        'K': 1e3,    # Miles
        'M': 1e6,    # Millones
        'B': 1e9,    # Billones
        'T': 1e12    # Trillones
    }
    multiplicador = 1
    for sufijo, mult in multiplicadores.items():
        if valor_limpio.endswith(sufijo):
            valor_limpio = valor_limpio.replace(sufijo, '')
            multiplicador = mult
            break

    # Detectar formato basado en separadores
    if ',' in valor_limpio and '.' in valor_limpio:
        if valor_limpio.rfind(',') > valor_limpio.rfind('.'):
            # Formato europeo: "1.234,56"
            valor_limpio = valor_limpio.replace('.', '')
            valor_limpio = valor_limpio.replace(',', '.')
        else:
            # Formato americano: "1,234.56"
            valor_limpio = valor_limpio.replace(',', '')
    elif ',' in valor_limpio:
        partes = valor_limpio.split(',')
        if len(partes) == 2 and len(partes[1]) <= 2:
            # Probable decimal
            valor_limpio = valor_limpio.replace(',', '.')
        else:
            valor_limpio = valor_limpio.replace(',', '')
    # En caso de que solo haya puntos se asume que es formato americano

    try:
        return float(valor_limpio) * multiplicador
    except (ValueError, TypeError):
        return None


# ---------------------------------------------------
# Clase para detección y cacheo de formatos numéricos
# ---------------------------------------------------
class FormatosNumericos:
    """Clase para detectar y cachear formatos numéricos"""

    def __init__(self):
        self.formatos_cache = {}  # {variable: formato}

    def detectar_formato(self, valores, variable=None):
        """
        Detecta el formato numérico predominante en una lista de valores.

        Args:
            valores: Lista de valores para analizar.
            variable: Nombre de la variable (opcional).

        Returns:
            str: Formato detectado ('europeo' o 'americano').
        """
        conteo = {
            'europeo': 0,   # Ejemplo: 1.234,56
            'americano': 0  # Ejemplo: 1,234.56
        }
        for valor in valores:
            if not isinstance(valor, str):
                continue
            valor = valor.strip()
            if ',' in valor and '.' in valor:
                if valor.rfind(',') > valor.rfind('.'):
                    conteo['europeo'] += 1
                else:
                    conteo['americano'] += 1
            elif ',' in valor:
                partes = valor.split(',')
                if len(partes) == 2 and len(partes[1]) <= 2:
                    conteo['europeo'] += 1
                else:
                    conteo['americano'] += 1
            elif '.' in valor:
                conteo['americano'] += 1

        formato = 'americano'  # Valor por defecto
        if conteo['europeo'] > conteo['americano']:
            formato = 'europeo'
        if variable:
            self.formatos_cache[variable] = formato
        return formato

    def obtener_formato(self, variable):
        """Obtiene el formato conocido para una variable"""
        return self.formatos_cache.get(variable)


# ---------------------------------------------------
# Función para convertir fechas en diversos formatos
# ---------------------------------------------------
def convertir_fecha(fecha_str):
    """
    Convierte diversos formatos de fecha a pd.Timestamp.

    Args:
        fecha_str: Fecha en cualquier formato

    Returns:
        pd.Timestamp o None si no es posible la conversión.
    """
    # Si ya es datetime o Timestamp
    if isinstance(fecha_str, (pd.Timestamp, datetime)):
        return pd.Timestamp(fecha_str)
    
    if pd.isna(fecha_str):
        return None

    # Si es numérico, intentar formato compacto DDMMYYYY
    if isinstance(fecha_str, (int, float)):
        fecha_str = str(int(fecha_str))
        if len(fecha_str) == 8:
            try:
                dia = int(fecha_str[:2])
                mes = int(fecha_str[2:4])
                anio = int(fecha_str[4:])
                return pd.Timestamp(year=anio, month=mes, day=dia)
            except Exception:
                pass

    if isinstance(fecha_str, str):
        fecha_str = fecha_str.strip()
        formatos = [
            '%d.%m.%Y', '%d/%m/%Y', '%d-%m-%Y',
            '%m/%d/%Y', '%Y-%m-%d',
            '%Y%m%d', '%d%m%Y'
        ]
        for fmt in formatos:
            try:
                return pd.to_datetime(fecha_str, format=fmt)
            except Exception:
                continue

        # Intentar detectar patrones como "Apr 01, 2025" o meses en español
        try:
            if re.search(r'([A-Za-z]+\s+\d+,\s+\d{4})', fecha_str):
                match = re.search(r'([A-Za-z]+\s+\d+,\s+\d{4})', fecha_str)
                return pd.to_datetime(match.group(1))
            # Reemplazar meses en español por inglés
            meses_es = {
                'ene': 'Jan', 'feb': 'Feb', 'mar': 'Mar', 'abr': 'Apr',
                'may': 'May', 'jun': 'Jun', 'jul': 'Jul', 'ago': 'Aug',
                'sep': 'Sep', 'oct': 'Oct', 'nov': 'Nov', 'dic': 'Dec'
            }
            texto_procesado = fecha_str.lower()
            for mes_es, mes_en in meses_es.items():
                if mes_es in texto_procesado:
                    texto_procesado = texto_procesado.replace(mes_es, mes_en)
            return pd.to_datetime(texto_procesado)
        except Exception:
            pass

    return None


# ---------------------------------------------------
# Clase principal: MyinvestingreportNormal
# ---------------------------------------------------
class MyinvestingreportNormal:
    """
    Clase mejorada para el procesamiento de datos económicos.
    Integra funciones centralizadas para conversión numérica y de fechas,
    detección automática de columnas y lectura adaptativa de CSV.
    """
    def __init__(self, config_file, data_root='data/Macro/raw', log_file='myinvestingreportnormal.log'):
        self.config_file = config_file
        self.data_root = data_root
        self.logger = configurar_logging(log_file)
        self.config_data = None
        self.fecha_min_global = None
        self.fecha_max_global = None
        self.indice_diario = None
        self.datos_procesados = {}
        self.df_combinado = None
        self.estadisticas = {}

        # Nuevos atributos para manejo de formatos
        self.formatos_numericos = FormatosNumericos()
        self.formatos_conocidos = {}  # Para guardar configuraciones de CSV exitosas

        self.inicializar_formatos_conocidos()

        self.logger.info("=" * 80)
        self.logger.info("INICIANDO PROCESO: MyinvestingreportNormal")
        self.logger.info(f"Archivo de configuración: {config_file}")
        self.logger.info(f"Directorio raíz de datos: {data_root}")
        self.logger.info(f"Fecha y hora: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        self.logger.info("=" * 80)

    def inicializar_formatos_conocidos(self):
        """Inicializa formatos conocidos para variables específicas (por ejemplo, índices europeos)"""
        indices_europeos = [
            'Russell_2000', 'NASDAQ_Composite', 'S&P500_Index',
            'Nikkei_225', 'DAX_30', 'VIX_VolatilityIndex'
        ]
        for indice in indices_europeos:
            self.formatos_numericos.formatos_cache[indice] = 'europeo'

    # ---------------------------------------------------
    # Métodos para leer configuración y archivos
    # ---------------------------------------------------
    def leer_configuracion(self):
        self.logger.info("Leyendo archivo de configuración...")
        try:
            df_config = pd.read_excel(self.config_file)
            self.config_data = df_config[
                (df_config['Fuente'] == 'Investing Data') &
                (df_config['Tipo de Preprocesamiento Según la Fuente'] == 'Normal')
            ].copy()
            num_configs = len(self.config_data)
            self.logger.info(f"Se encontraron {num_configs} configuraciones para procesar")
            if num_configs == 0:
                self.logger.warning("No se encontraron configuraciones que cumplan los criterios")
                return None
            return self.config_data
        except Exception as e:
            self.logger.error(f"Error al leer configuración: {str(e)}")
            return None

    def encontrar_ruta_archivo(self, variable, tipo_macro):
        ruta_base = os.path.join(self.data_root, tipo_macro)
        nombre_archivo = f"{variable}.csv"
        ruta_completa = os.path.join(ruta_base, nombre_archivo)
        if os.path.exists(ruta_completa):
            return ruta_completa
        nombre_archivo_alt = f"{variable}.xlsx"
        ruta_completa_alt = os.path.join(ruta_base, nombre_archivo_alt)
        if os.path.exists(ruta_completa_alt):
            return ruta_completa_alt
        for root, dirs, files in os.walk(self.data_root):
            if nombre_archivo in files:
                return os.path.join(root, nombre_archivo)
            if nombre_archivo_alt in files:
                return os.path.join(root, nombre_archivo_alt)
        return None

    def leer_csv_adaptativo(self, ruta_archivo, variable):
        """
        Lee un archivo CSV adaptándose a diferentes configuraciones de separadores,
        codificaciones y formatos decimales.
        """
        configuraciones = [
            # Formato americano
            {'sep': ',', 'decimal': '.', 'encoding': 'utf-8'},
            {'sep': ',', 'decimal': '.', 'encoding': 'latin1'},
            # Formato europeo
            {'sep': ';', 'decimal': ',', 'thousands': '.', 'encoding': 'utf-8'},
            {'sep': ';', 'decimal': ',', 'thousands': '.', 'encoding': 'latin1'},
            {'sep': ',', 'decimal': ',', 'thousands': '.', 'encoding': 'utf-8'},
            # Otros comunes
            {'sep': '\t', 'encoding': 'utf-8'},
            {'sep': ' ', 'encoding': 'utf-8'}
        ]
        # Si ya se conoce una configuración para esta variable, intentarla primero
        if variable in self.formatos_conocidos:
            config_conocida = self.formatos_conocidos[variable]
            try:
                df = pd.read_csv(ruta_archivo, **config_conocida)
                if len(df) > 0:
                    return df
            except Exception:
                pass

        errores = []
        for idx, config in enumerate(configuraciones):
            try:
                df = pd.read_csv(ruta_archivo, **config)
                if len(df) > 0:
                    self.formatos_conocidos[variable] = config
                    return df
            except Exception as e:
                errores.append(f"Config {idx}: {str(e)}")
        self.logger.error(f"No se pudo leer {ruta_archivo} con ninguna configuración")
        for error in errores:
            self.logger.debug(f"- {error}")
        return None

    # ---------------------------------------------------
    # Detección automática de columnas de fecha y valor
    # ---------------------------------------------------
    def detectar_columnas(self, df):
        """
        Detecta automáticamente las columnas de fecha y valor en un DataFrame.
        Returns:
            tuple: (columna_fecha, columna_valor) o (None, None)
        """
        candidatos_fecha = [col for col in df.columns if any(
            palabra in col.lower() for palabra in ['date', 'fecha', 'time', 'día', 'day', 'periodo']
        )]
        candidatos_valor = [col for col in df.columns if any(
            palabra in col.lower() for palabra in ['price', 'precio', 'close', 'cierre', 'último', 'ultimo', 'valor', 'value']
        )]

        columnas_datetime = [col for col in df.columns if pd.api.types.is_datetime64_any_dtype(df[col])]
        columna_fecha = None
        if columnas_datetime:
            columna_fecha = columnas_datetime[0]
        elif candidatos_fecha:
            for col in candidatos_fecha:
                try:
                    pd.to_datetime(df[col].iloc[:5])
                    columna_fecha = col
                    break
                except Exception:
                    continue
        if columna_fecha is None and len(df.columns) > 0:
            try:
                pd.to_datetime(df[df.columns[0]].iloc[:5])
                columna_fecha = df.columns[0]
            except Exception:
                pass

        columna_valor = None
        if candidatos_valor:
            columna_valor = candidatos_valor[0]
        elif len(df.columns) > 1:
            columna_valor = df.columns[1]
        return columna_fecha, columna_valor

    # ---------------------------------------------------
    # Conversión de fechas
    # ---------------------------------------------------
    def convertir_fecha(self, fecha_str):
        return convertir_fecha(fecha_str)

    # ---------------------------------------------------
    # Versión optimizada de limpiar_valor_porcentaje
    # ---------------------------------------------------
    def limpiar_valor_porcentaje(self, valor, variable=None):
        return convertir_valor(valor, variable, self.formatos_numericos.formatos_cache)

    # ---------------------------------------------------
    # Procesar archivo individual
    # ---------------------------------------------------
    def procesar_archivo(self, config_row):
        variable = config_row['Variable']
        tipo_macro = config_row['Tipo Macro']
        target_col = config_row['TARGET']

        ruta_archivo = self.encontrar_ruta_archivo(variable, tipo_macro)
        self.logger.info(f"\nProcesando: {variable} ({tipo_macro})")
        self.logger.info(f"- Archivo: {variable}")
        self.logger.info(f"- Columna TARGET: {target_col}")
        if ruta_archivo is None:
            self.logger.error(f"- ERROR: Archivo no encontrado: {variable}")
            return variable, None
        self.logger.info(f"- Ruta encontrada: {ruta_archivo}")

        try:
            _, extension = os.path.splitext(ruta_archivo)
            extension = extension.lower()
            if extension == '.csv':
                df = self.leer_csv_adaptativo(ruta_archivo, variable)
            elif extension in ['.xlsx', '.xls']:
                df = pd.read_excel(ruta_archivo, engine='openpyxl')
            else:
                self.logger.error(f"- ERROR: Formato de archivo no soportado: {extension}")
                return variable, None
            if df is None or len(df) == 0:
                self.logger.error("- ERROR: El archivo está vacío o no se pudo leer")
                return variable, None

            col_fecha, col_valor = self.detectar_columnas(df)
            if col_fecha is None or col_valor is None:
                self.logger.error("- ERROR: No se pudieron detectar columnas necesarias (fecha o valor)")
                return variable, None

            # Procesar fechas
            df['fecha'] = df[col_fecha].apply(self.convertir_fecha)
            df = df.dropna(subset=['fecha'])
            # Detectar formato numérico en una muestra
            muestra_valores = df[col_valor].astype(str).head(20).tolist()
            formato_detectado = self.formatos_numericos.detectar_formato(muestra_valores, variable)
            self.logger.info(f"Formato numérico detectado para {variable}: {formato_detectado}")

            # Procesar valores con la función central
            df['valor'] = df[col_valor].apply(lambda x: self.limpiar_valor_porcentaje(x, variable))
            df = df.dropna(subset=['valor'])

            total_filas = len(df)
            if total_filas == 0:
                self.logger.error(f"- ERROR: No se encontraron valores válidos en {variable}")
                return variable, None

            # Renombrar la columna de valor según el patrón
            nuevo_nombre = f"{target_col}_{variable}_{tipo_macro}"
            df.rename(columns={'valor': nuevo_nombre}, inplace=True)

            # Seleccionar solo las columnas relevantes y ordenar por fecha
            df_procesado = df[['fecha', nuevo_nombre]].copy()
            df_procesado = df_procesado.sort_values('fecha')

            # Actualizar fechas globales
            fecha_min = df_procesado['fecha'].min()
            fecha_max = df_procesado['fecha'].max()
            if self.fecha_min_global is None or fecha_min < self.fecha_min_global:
                self.fecha_min_global = fecha_min
            if self.fecha_max_global is None or fecha_max > self.fecha_max_global:
                self.fecha_max_global = fecha_max

            # Registrar estadísticas
            self.estadisticas[variable] = {
                'tipo_macro': tipo_macro,
                'columna_target': target_col,
                'total_filas': total_filas,
                'valores_validos': df_procesado[nuevo_nombre].count(),
                'fecha_min': fecha_min,
                'fecha_max': fecha_max,
                'nuevo_nombre': nuevo_nombre
            }
            self.logger.info(f"- {variable}: {total_filas} filas procesadas, periodo: {fecha_min.strftime('%Y-%m-%d')} a {fecha_max.strftime('%Y-%m-%d')}")
            return variable, df_procesado
        except Exception as e:
            self.logger.error(f"- ERROR al procesar {ruta_archivo}: {str(e)}")
            return variable, None

    # ---------------------------------------------------
    # Generar índice diario y combinar datos
    # ---------------------------------------------------
    def generar_indice_diario(self):
        if self.fecha_min_global is None or self.fecha_max_global is None:
            self.logger.error("No se pudieron determinar fechas mínima y máxima globales")
            return None
        self.logger.info("\nGenerando índice temporal diario...")
        todas_fechas = pd.date_range(start=self.fecha_min_global, end=self.fecha_max_global, freq='D')
        self.indice_diario = pd.DataFrame({'fecha': todas_fechas})
        self.logger.info(f"- Total de fechas diarias generadas: {len(self.indice_diario)}")
        return self.indice_diario

    def combinar_datos(self):
        if not self.datos_procesados:
            self.logger.error("No hay datos procesados para combinar")
            return None
        if self.indice_diario is None:
            self.logger.error("No se ha generado el índice diario")
            return None
        self.logger.info("\nCombinando datos con índice diario...")
        df_combinado = self.indice_diario.copy()
        for variable, df in self.datos_procesados.items():
            if df is None:
                self.logger.warning(f"Omitiendo {variable} por errores de procesamiento")
                continue
            nombre_col = df.columns[1]
            self.logger.info(f"- Combinando: {nombre_col}")
            df['fecha'] = pd.to_datetime(df['fecha']).dt.normalize()
            filas_antes = len(df_combinado)
            df_combinado = pd.merge(df_combinado, df, on='fecha', how='left')
            filas_despues = len(df_combinado)
            if filas_antes != filas_despues:
                self.logger.error(f"- ERROR: Cambio en número de filas después del merge de {nombre_col}: {filas_antes} -> {filas_despues}")
            df_combinado[nombre_col] = df_combinado[nombre_col].ffill()
        self.df_combinado = df_combinado
        self.logger.info(f"- DataFrame combinado: {len(df_combinado)} filas, {len(df_combinado.columns)} columnas")
        return self.df_combinado

    def analizar_cobertura_final(self):
        if self.df_combinado is None or not self.estadisticas:
            self.logger.error("No hay datos combinados o estadísticas para analizar")
            return
        self.logger.info("\n" + "=" * 50)
        self.logger.info("RESUMEN DE COBERTURA FINAL")
        self.logger.info("=" * 50)
        total_indicadores = len(self.estadisticas)
        total_dias = len(self.indice_diario)
        self.logger.info(f"Total indicadores procesados: {total_indicadores}")
        self.logger.info(f"Rango de fechas: {self.fecha_min_global.strftime('%Y-%m-%d')} a {self.fecha_max_global.strftime('%Y-%m-%d')}")
        self.logger.info(f"Total días en la serie: {total_dias}")
        for variable, stats in self.estadisticas.items():
            cobertura = (stats['valores_validos'] / total_dias) * 100
            self.logger.info(f"- {variable} ({stats['nuevo_nombre']}): Cobertura aproximada {cobertura:.2f}%")

    def generar_estadisticas_valores(self):
        if self.df_combinado is None:
            self.logger.error("No hay datos combinados para analizar")
            return
        self.logger.info("\n" + "=" * 50)
        self.logger.info("ESTADÍSTICAS DE VALORES")
        self.logger.info("=" * 50)
        for col in self.df_combinado.columns:
            if col == 'fecha':
                continue
            serie = self.df_combinado[col].dropna()
            if len(serie) == 0:
                self.logger.warning(f"La columna {col} no tiene valores")
                continue
            stats = {
                'min': serie.min(),
                'max': serie.max(),
                'mean': serie.mean(),
                'median': serie.median(),
                'std': serie.std()
            }
            self.logger.info(f"\nEstadísticas para {col}:")
            self.logger.info(f"- Min: {stats['min']:.4f}")
            self.logger.info(f"- Max: {stats['max']:.4f}")
            self.logger.info(f"- Media: {stats['mean']:.4f}")
            self.logger.info(f"- Mediana: {stats['median']:.4f}")
            self.logger.info(f"- Desv. Estándar: {stats['std']:.4f}")
            if col in self.estadisticas:
                self.estadisticas[col] = {**self.estadisticas.get(col, {}), **stats}

    def guardar_resultados(self, output_file='datos_economicos_procesados.xlsx'):
        if self.df_combinado is None:
            self.logger.error("No hay datos para guardar")
            return False
        try:
            self.logger.info("\n" + "=" * 50)
            self.logger.info("GUARDANDO RESULTADOS")
            self.logger.info("=" * 50)
            self.logger.info(f"Guardando resultados en: {output_file}")
            with pd.ExcelWriter(output_file, engine='openpyxl') as writer:
                self.df_combinado.to_excel(writer, sheet_name='Datos Diarios', index=False)
                df_stats = pd.DataFrame()
                for var, stats in self.estadisticas.items():
                    serie = pd.Series(stats, name=var)
                    df_temp = pd.DataFrame(serie).transpose()
                    df_stats = pd.concat([df_stats, df_temp])
                df_stats.to_excel(writer, sheet_name='Estadisticas')
                metadata = {
                    'Proceso': ['MyinvestingreportNormal'],
                    'Fecha de proceso': [datetime.now().strftime('%Y-%m-%d %H:%M:%S')],
                    'Total indicadores': [len(self.estadisticas)],
                    'Periodo': [f"{self.fecha_min_global.strftime('%Y-%m-%d')} a {self.fecha_max_global.strftime('%Y-%m-%d')}"],
                    'Total días': [len(self.indice_diario)]
                }
                pd.DataFrame(metadata).to_excel(writer, sheet_name='Metadatos')
            self.logger.info(f"Archivo guardado exitosamente: {output_file}")
            return True
        except Exception as e:
            self.logger.error(f"Error al guardar resultados: {str(e)}")
            return False

    def ejecutar_proceso_completo(self, output_file='datos_economicos_normales_procesados.xlsx'):
        inicio = time.time()
        self.logger.info("Iniciando proceso completo MyinvestingreportNormal...")
        self.leer_configuracion()
        if self.config_data is None or len(self.config_data) == 0:
            return False
        for _, config_row in self.config_data.iterrows():
            variable, df_procesado = self.procesar_archivo(config_row)
            self.datos_procesados[variable] = df_procesado
        archivos_correctos = sum(1 for df in self.datos_procesados.values() if df is not None)
        if archivos_correctos == 0:
            self.logger.error("No se pudo procesar correctamente ningún archivo")
            return False
        self.generar_indice_diario()
        if self.indice_diario is None:
            return False
        self.combinar_datos()
        if self.df_combinado is None:
            return False
        self.analizar_cobertura_final()
        self.generar_estadisticas_valores()
        resultado = self.guardar_resultados(output_file)
        fin = time.time()
        tiempo_ejecucion = fin - inicio
        self.logger.info("\n" + "=" * 50)
        self.logger.info("RESUMEN DE EJECUCIÓN")
        self.logger.info("=" * 50)
        self.logger.info(f"Proceso: MyinvestingreportNormal")
        self.logger.info(f"Tiempo de ejecución: {tiempo_ejecucion:.2f} segundos")
        self.logger.info(f"Archivos procesados: {len(self.datos_procesados)}")
        self.logger.info(f"Archivos con error: {sum(1 for df in self.datos_procesados.values() if df is None)}")
        self.logger.info(f"Archivos procesados correctamente: {archivos_correctos}")
        self.logger.info(f"Periodo de datos: {self.fecha_min_global.strftime('%Y-%m-%d')} a {self.fecha_max_global.strftime('%Y-%m-%d')}")
        self.logger.info(f"Datos combinados: {len(self.df_combinado)} filas, {len(self.df_combinado.columns)} columnas")
        self.logger.info(f"Archivo de salida: {output_file}")
        self.logger.info(f"Estado: {'COMPLETADO' if resultado else 'ERROR'}")
        self.logger.info("=" * 50)
        return resultado


# ---------------------------------------------------
# Función principal para ejecutar el proceso
# ---------------------------------------------------
def ejecutar_myinvestingreportnormal(config_file='Data Engineering.xlsx',
                                    output_file='datos_economicos_normales_procesados.xlsx',
                                    data_root='data/Macro/raw',
                                    log_file='myinvestingreportnormal.log'):
    procesador = MyinvestingreportNormal(config_file, data_root, log_file)
    return procesador.ejecutar_proceso_completo(output_file)


# ---------------------------------------------------
# Ejecución principal
# ---------------------------------------------------
if __name__ == "__main__":
    resultado = ejecutar_myinvestingreportnormal()
    print(f"Proceso {'completado exitosamente' if resultado else 'finalizado con errores'}")


2025-03-28 12:09:03,549 [INFO] INICIANDO PROCESO: MyinvestingreportNormal
2025-03-28 12:09:03,552 [INFO] Archivo de configuración: Data Engineering.xlsx
2025-03-28 12:09:03,556 [INFO] Directorio raíz de datos: data/Macro/raw
2025-03-28 12:09:03,558 [INFO] Fecha y hora: 2025-03-28 12:09:03
2025-03-28 12:09:03,561 [INFO] Iniciando proceso completo MyinvestingreportNormal...
2025-03-28 12:09:03,564 [INFO] Leyendo archivo de configuración...
2025-03-28 12:09:03,731 [INFO] Se encontraron 28 configuraciones para procesar
2025-03-28 12:09:03,739 [INFO] 
Procesando: Australia_10Y_Bond (bond)
2025-03-28 12:09:03,740 [INFO] - Archivo: Australia_10Y_Bond
2025-03-28 12:09:03,742 [INFO] - Columna TARGET: PRICE
2025-03-28 12:09:03,746 [INFO] - Ruta encontrada: data/Macro/raw\bond\Australia_10Y_Bond.csv
2025-03-28 12:09:04,320 [INFO] Formato numérico detectado para Australia_10Y_Bond: americano
2025-03-28 12:09:04,332 [INFO] - Australia_10Y_Bond: 3810 filas procesadas, periodo: 2014-01-01 a 2025-12-0

Proceso completado exitosamente


In [15]:
import pandas as pd
import numpy as np
import os
import re
import time
import logging
from datetime import datetime, timedelta
import warnings
warnings.filterwarnings('ignore')

# ---------------------------------------------------
# Configuración de logging
# ---------------------------------------------------
def configurar_logging(log_file='myinvestingreportnormal.log'):
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s [%(levelname)s] %(message)s',
        handlers=[
            logging.FileHandler(log_file),
            logging.StreamHandler()
        ]
    )
    return logging.getLogger('MyinvestingreportNormal')

# ---------------------------------------------------
# Función para convertir valores numéricos
# ---------------------------------------------------
def convertir_valor(valor, variable=None, formatos_conocidos=None):
    """
    Convierte cualquier representación de valor numérico a float.
    """
    if isinstance(valor, (int, float)):
        return float(valor)
    
    if not isinstance(valor, str) or valor is None:
        return None

    valor_limpio = valor.strip()
    if not valor_limpio:
        return None

    # Aplicar formato conocido si existe
    if variable and formatos_conocidos and variable in formatos_conocidos:
        formato = formatos_conocidos[variable]
        if formato == 'europeo':
            valor_limpio = valor_limpio.replace('.', '')
            valor_limpio = valor_limpio.replace(',', '.')
    
    # Multiplicadores para sufijos (K, M, etc.)
    multiplicadores = {'%': 1, 'K': 1e3, 'M': 1e6, 'B': 1e9, 'T': 1e12}
    multiplicador = 1
    for sufijo, mult in multiplicadores.items():
        if valor_limpio.endswith(sufijo):
            valor_limpio = valor_limpio.replace(sufijo, '')
            multiplicador = mult
            break

    # Ajuste de separadores: si se detectan ambos, se decide según la posición
    if ',' in valor_limpio and '.' in valor_limpio:
        if valor_limpio.rfind(',') > valor_limpio.rfind('.'):
            valor_limpio = valor_limpio.replace('.', '')
            valor_limpio = valor_limpio.replace(',', '.')
        else:
            valor_limpio = valor_limpio.replace(',', '')
    elif ',' in valor_limpio:
        partes = valor_limpio.split(',')
        if len(partes) == 2 and len(partes[1]) <= 2:
            valor_limpio = valor_limpio.replace(',', '.')
        else:
            valor_limpio = valor_limpio.replace(',', '')
    
    try:
        return float(valor_limpio) * multiplicador
    except (ValueError, TypeError):
        return None

# ---------------------------------------------------
# Detección dinámica del formato de fechas
# ---------------------------------------------------
def detectar_formato_fecha_inteligente(df, col_fecha, muestra_registros=10):
    """
    Analiza una muestra de la columna de fecha para determinar si se debe forzar dayfirst=True.
    Retorna un diccionario con {'dayfirst': bool, 'confianza': float}
    """
    fecha_actual = pd.Timestamp(datetime.now().date())
    muestras = df[col_fecha].dropna().astype(str).head(muestra_registros).tolist()
    
    resultados = {
        'dayfirst': {'validas': 0, 'invalidas': 0, 'futuras': 0},
        'no_dayfirst': {'validas': 0, 'invalidas': 0, 'futuras': 0}
    }
    
    for fecha_str in muestras:
        fecha_str = fecha_str.strip()
        ambigua = False
        # Identifica fechas con formato numérico separado por /, - o .
        if re.match(r'^\d{1,2}[\/\-\.]\d{1,2}[\/\-\.]\d{4}$', fecha_str):
            separador = re.findall(r'[\/\-\.]', fecha_str)[0]
            partes = fecha_str.split(separador)
            if len(partes) == 3:
                try:
                    p1, p2 = int(partes[0]), int(partes[1])
                    # Si ambos números son menores o iguales a 12, es ambigua
                    if p1 <= 12 and p2 <= 12:
                        ambigua = True
                except:
                    pass

        for modo, dayfirst in [('dayfirst', True), ('no_dayfirst', False)]:
            try:
                fecha = pd.to_datetime(fecha_str, dayfirst=dayfirst)
                if fecha > fecha_actual + pd.Timedelta(days=30):
                    resultados[modo]['futuras'] += 1
                else:
                    resultados[modo]['validas'] += 1
            except:
                resultados[modo]['invalidas'] += 1

    score_dayfirst = resultados['dayfirst']['validas'] - (resultados['dayfirst']['invalidas'] * 0.5) - (resultados['dayfirst']['futuras'] * 2)
    score_no_dayfirst = resultados['no_dayfirst']['validas'] - (resultados['no_dayfirst']['invalidas'] * 0.5) - (resultados['no_dayfirst']['futuras'] * 2)
    usar_dayfirst = score_dayfirst > score_no_dayfirst
    confianza = abs(score_dayfirst - score_no_dayfirst) / (muestra_registros * 2)
    return {'dayfirst': usar_dayfirst, 'confianza': confianza}

# ---------------------------------------------------
# Conversión adaptativa de fechas
# ---------------------------------------------------
def convertir_fecha_adaptativo(fecha_str, configuracion_archivo=None):
    """
    Convierte una fecha a pd.Timestamp usando la configuración detectada.
    Si la cadena contiene puntos (.) y coincide con el patrón DD.MM.YYYY, se fuerza el formato.
    """
    # Si ya es Timestamp o datetime, retornarla
    if isinstance(fecha_str, (pd.Timestamp, datetime)):
        return pd.Timestamp(fecha_str)
    if pd.isna(fecha_str):
        return None
    fecha_str = str(fecha_str).strip()
    # Si se detecta un punto como separador y el patrón es DD.MM.YYYY, forzamos el formato
    if re.match(r'^\d{1,2}\.\d{1,2}\.\d{4}$', fecha_str):
        try:
            fecha = pd.to_datetime(fecha_str, format='%d.%m.%Y', dayfirst=True)
            return fecha
        except Exception:
            pass

    # Si hay configuración detectada, usarla
    if configuracion_archivo is not None:
        try:
            fecha = pd.to_datetime(fecha_str, dayfirst=configuracion_archivo['dayfirst'])
            return fecha
        except Exception:
            pass

    # Intento estándar sin forzar dayfirst
    try:
        fecha = pd.to_datetime(fecha_str)
        return fecha
    except Exception:
        return None

# ---------------------------------------------------
# Clase para gestionar y cachear formatos de fechas
# ---------------------------------------------------
class FormatosFechas:
    """Gestiona y cachea la configuración de conversión de fechas por archivo."""
    def __init__(self):
        self.formatos_cache = {}  # {variable: configuracion}
    
    def detectar_formato(self, df, col_fecha, variable=None):
        configuracion = detectar_formato_fecha_inteligente(df, col_fecha)
        if variable:
            self.formatos_cache[variable] = configuracion
        return configuracion
        
    def obtener_formato(self, variable):
        return self.formatos_cache.get(variable)

# ---------------------------------------------------
# Clase principal para procesar los datos económicos
# ---------------------------------------------------
class MyinvestingreportNormal:
    """
    Procesa datos económicos con detección dinámica de formatos de fechas y 
    validación para evitar interpretaciones erróneas (como fechas en abril cuando
    los datos crudos solo llegan hasta marzo).
    """
    def __init__(self, config_file, data_root='data/Macro/raw', log_file='myinvestingreportnormal.log'):
        self.config_file = config_file
        self.data_root = data_root
        self.logger = configurar_logging(log_file)
        self.config_data = None
        self.fecha_min_global = None
        self.fecha_max_global = None
        self.archivo_fecha_min = None
        self.archivo_fecha_max = None
        self.indice_diario = None
        self.datos_procesados = {}
        self.df_combinado = None
        self.estadisticas = {}

        self.formatos_numericos = {}  # Se usará un diccionario para formatos numéricos simples (si se desea ampliar)
        self.formatos_fechas = FormatosFechas()
        self.formatos_conocidos = {}  # Configuraciones exitosas de CSV

        self.inicializar_formatos_conocidos()

        self.logger.info("=" * 80)
        self.logger.info("INICIANDO PROCESO: MyinvestingreportNormal")
        self.logger.info(f"Archivo de configuración: {config_file}")
        self.logger.info(f"Directorio raíz de datos: {data_root}")
        self.logger.info(f"Fecha y hora: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        self.logger.info("=" * 80)

    def inicializar_formatos_conocidos(self):
        # Para algunos índices se conoce el formato numérico; aquí se puede ampliar si es necesario.
        indices_europeos = ['Russell_2000', 'NASDAQ_Composite', 'S&P500_Index', 'Nikkei_225', 'DAX_30', 'VIX_VolatilityIndex']
        for indice in indices_europeos:
            self.formatos_conocidos[indice] = 'europeo'

    def leer_configuracion(self):
        self.logger.info("Leyendo archivo de configuración...")
        try:
            df_config = pd.read_excel(self.config_file)
            self.config_data = df_config[
                (df_config['Fuente'] == 'Investing Data') &
                (df_config['Tipo de Preprocesamiento Según la Fuente'] == 'Normal')
            ].copy()
            num_configs = len(self.config_data)
            self.logger.info(f"Se encontraron {num_configs} configuraciones para procesar")
            if num_configs == 0:
                self.logger.warning("No se encontraron configuraciones que cumplan los criterios")
                return None
            return self.config_data
        except Exception as e:
            self.logger.error(f"Error al leer configuración: {str(e)}")
            return None

    def encontrar_ruta_archivo(self, variable, tipo_macro):
        ruta_base = os.path.join(self.data_root, tipo_macro)
        nombre_archivo = f"{variable}.csv"
        ruta_completa = os.path.join(ruta_base, nombre_archivo)
        if os.path.exists(ruta_completa):
            return ruta_completa
        nombre_archivo_alt = f"{variable}.xlsx"
        ruta_completa_alt = os.path.join(ruta_base, nombre_archivo_alt)
        if os.path.exists(ruta_completa_alt):
            return ruta_completa_alt
        for root, dirs, files in os.walk(self.data_root):
            if nombre_archivo in files:
                return os.path.join(root, nombre_archivo)
            if nombre_archivo_alt in files:
                return os.path.join(root, nombre_archivo_alt)
        return None

    def leer_csv_adaptativo(self, ruta_archivo, variable):
        configuraciones = [
            {'sep': ',', 'decimal': '.', 'encoding': 'utf-8'},
            {'sep': ',', 'decimal': '.', 'encoding': 'latin1'},
            {'sep': ';', 'decimal': ',', 'thousands': '.', 'encoding': 'utf-8'},
            {'sep': ';', 'decimal': ',', 'thousands': '.', 'encoding': 'latin1'},
            {'sep': ',', 'decimal': ',', 'thousands': '.', 'encoding': 'utf-8'},
            {'sep': '\t', 'encoding': 'utf-8'},
            {'sep': ' ', 'encoding': 'utf-8'}
        ]
        if variable in self.formatos_conocidos:
            config_conocida = self.formatos_conocidos[variable]
            try:
                df = pd.read_csv(ruta_archivo, **config_conocida)
                if len(df) > 0:
                    return df
            except Exception:
                pass
        errores = []
        for idx, config in enumerate(configuraciones):
            try:
                df = pd.read_csv(ruta_archivo, **config)
                if len(df) > 0:
                    self.formatos_conocidos[variable] = config
                    return df
            except Exception as e:
                errores.append(f"Config {idx}: {str(e)}")
        self.logger.error(f"No se pudo leer {ruta_archivo} con ninguna configuración")
        for error in errores:
            self.logger.debug(f"- {error}")
        return None

    def detectar_columnas(self, df):
        candidatos_fecha = [col for col in df.columns if any(
            palabra in col.lower() for palabra in ['date', 'fecha', 'time', 'día', 'day', 'periodo']
        )]
        candidatos_valor = [col for col in df.columns if any(
            palabra in col.lower() for palabra in ['price', 'precio', 'close', 'cierre', 'último', 'ultimo', 'valor', 'value']
        )]
        columnas_datetime = [col for col in df.columns if pd.api.types.is_datetime64_any_dtype(df[col])]
        columna_fecha = None
        if columnas_datetime:
            columna_fecha = columnas_datetime[0]
        elif candidatos_fecha:
            for col in candidatos_fecha:
                try:
                    pd.to_datetime(df[col].iloc[:5])
                    columna_fecha = col
                    break
                except Exception:
                    continue
        if columna_fecha is None and len(df.columns) > 0:
            try:
                pd.to_datetime(df[df.columns[0]].iloc[:5])
                columna_fecha = df.columns[0]
            except Exception:
                pass

        columna_valor = None
        if candidatos_valor:
            columna_valor = candidatos_valor[0]
        elif len(df.columns) > 1:
            columna_valor = df.columns[1]
        return columna_fecha, columna_valor

    def convertir_fecha(self, fecha_str, configuracion_archivo=None):
        return convertir_fecha_adaptativo(fecha_str, configuracion_archivo)

    def limpiar_valor_porcentaje(self, valor, variable=None):
        return convertir_valor(valor, variable, self.formatos_conocidos)

    def procesar_archivo(self, config_row):
        variable = config_row['Variable']
        tipo_macro = config_row['Tipo Macro']
        target_col = config_row['TARGET']

        ruta_archivo = self.encontrar_ruta_archivo(variable, tipo_macro)
        self.logger.info(f"\nProcesando: {variable} ({tipo_macro})")
        self.logger.info(f"- Archivo: {variable}")
        self.logger.info(f"- Columna TARGET: {target_col}")
        if ruta_archivo is None:
            self.logger.error(f"- ERROR: Archivo no encontrado: {variable}")
            return variable, None
        self.logger.info(f"- Ruta encontrada: {ruta_archivo}")

        try:
            _, extension = os.path.splitext(ruta_archivo)
            extension = extension.lower()
            if extension == '.csv':
                df = self.leer_csv_adaptativo(ruta_archivo, variable)
            elif extension in ['.xlsx', '.xls']:
                df = pd.read_excel(ruta_archivo, engine='openpyxl')
            else:
                self.logger.error(f"- ERROR: Formato de archivo no soportado: {extension}")
                return variable, None
            if df is None or len(df) == 0:
                self.logger.error("- ERROR: El archivo está vacío o no se pudo leer")
                return variable, None

            col_fecha, col_valor = self.detectar_columnas(df)
            if col_fecha is None or col_valor is None:
                self.logger.error("- ERROR: No se pudieron detectar columnas necesarias (fecha o valor)")
                return variable, None

            self.logger.info(f"Columna de fecha detectada: {col_fecha}")
            # Detección dinámica del formato de fechas
            config_fecha = self.formatos_fechas.detectar_formato(df, col_fecha, variable)
            self.logger.info(f"Formato de fecha detectado para {variable}: dayfirst={config_fecha['dayfirst']} (confianza: {config_fecha['confianza']:.2f})")
            
            # Convertir fechas usando la configuración detectada
            df['fecha'] = df[col_fecha].apply(lambda x: self.convertir_fecha(x, configuracion_archivo=config_fecha))
            df = df.dropna(subset=['fecha'])
            ejemplos = df['fecha'].head(5).tolist()
            self.logger.info(f"Ejemplos de fechas convertidas para {variable}: {ejemplos}")
            
            # Validar fechas excesivamente futuras (más de 30 días a partir de hoy)
            fecha_actual = pd.Timestamp(datetime.now().date())
            fechas_futuras = df[df['fecha'] > fecha_actual + pd.Timedelta(days=30)]
            if len(fechas_futuras) > 0:
                self.logger.warning(f"Se detectaron {len(fechas_futuras)} fechas futuras anómalas en {variable}")
                self.logger.warning(f"Ejemplos de fechas futuras: {fechas_futuras['fecha'].head(3).tolist()}")
                df = df[df['fecha'] <= fecha_actual + pd.Timedelta(days=30)].copy()
            
            muestra_valores = df[col_valor].astype(str).head(20).tolist()
            formato_detectado = "americano"  # Se usa como etiqueta básica
            self.logger.info(f"Formato numérico detectado para {variable}: {formato_detectado}")

            df['valor'] = df[col_valor].apply(lambda x: self.limpiar_valor_porcentaje(x, variable))
            df = df.dropna(subset=['valor'])

            total_filas = len(df)
            if total_filas == 0:
                self.logger.error(f"- ERROR: No se encontraron valores válidos en {variable}")
                return variable, None

            nuevo_nombre = f"{target_col}_{variable}_{tipo_macro}"
            df.rename(columns={'valor': nuevo_nombre}, inplace=True)
            df_procesado = df[['fecha', nuevo_nombre]].copy()
            df_procesado = df_procesado.sort_values('fecha')

            fecha_min = df_procesado['fecha'].min()
            fecha_max = df_procesado['fecha'].max()
            self.logger.info(f"Para {variable} (columna {col_fecha}), la fecha mínima es {fecha_min} y la fecha máxima es {fecha_max}")
            
            if self.fecha_min_global is None or fecha_min < self.fecha_min_global:
                self.fecha_min_global = fecha_min
                self.archivo_fecha_min = variable
            if self.fecha_max_global is None or fecha_max > self.fecha_max_global:
                self.fecha_max_global = fecha_max
                self.archivo_fecha_max = variable

            self.estadisticas[variable] = {
                'tipo_macro': tipo_macro,
                'columna_target': target_col,
                'total_filas': total_filas,
                'valores_validos': df_procesado[nuevo_nombre].count(),
                'fecha_min': fecha_min,
                'fecha_max': fecha_max,
                'nuevo_nombre': nuevo_nombre,
                'formato_fecha': f"dayfirst={config_fecha['dayfirst']}",
                'confianza_formato': config_fecha['confianza']
            }
            self.logger.info(f"- {variable}: {total_filas} filas procesadas, periodo: {fecha_min.strftime('%Y-%m-%d')} a {fecha_max.strftime('%Y-%m-%d')}")
            return variable, df_procesado
        except Exception as e:
            self.logger.error(f"- ERROR al procesar {ruta_archivo}: {str(e)}")
            return variable, None

    def generar_indice_diario(self):
        if self.fecha_min_global is None or self.fecha_max_global is None:
            self.logger.error("No se pudieron determinar fechas mínima y máxima globales")
            return None
        self.logger.info("\nGenerando índice temporal diario...")
        self.logger.info(f"Archivo con fecha mínima global: {self.archivo_fecha_min} ({self.fecha_min_global.strftime('%Y-%m-%d')})")
        self.logger.info(f"Archivo con fecha máxima global: {self.archivo_fecha_max} ({self.fecha_max_global.strftime('%Y-%m-%d')})")
        todas_fechas = pd.date_range(start=self.fecha_min_global, end=self.fecha_max_global, freq='D')
        self.indice_diario = pd.DataFrame({'fecha': todas_fechas})
        self.logger.info(f"- Total de fechas diarias generadas: {len(self.indice_diario)}")
        return self.indice_diario

    def combinar_datos(self):
        if not self.datos_procesados:
            self.logger.error("No hay datos procesados para combinar")
            return None
        if self.indice_diario is None:
            self.logger.error("No se ha generado el índice diario")
            return None
        self.logger.info("\nCombinando datos con índice diario (usando join para reducir consumo de memoria)...")
        df_combinado = self.indice_diario.copy().set_index('fecha')
        for variable, df in self.datos_procesados.items():
            if df is None:
                self.logger.warning(f"Omitiendo {variable} por errores de procesamiento")
                continue
            nombre_col = df.columns[1]
            self.logger.info(f"- Combinando: {nombre_col}")
            df_temp = df.set_index('fecha')[[nombre_col]]
            df_temp = df_temp[~df_temp.index.duplicated(keep='first')]
            df_temp = df_temp.reindex(df_combinado.index)
            df_temp = df_temp.ffill()
            df_combinado = df_combinado.join(df_temp)
        self.df_combinado = df_combinado.reset_index()
        self.logger.info(f"- DataFrame combinado: {len(self.df_combinado)} filas, {len(self.df_combinado.columns)} columnas")
        return self.df_combinado

    def analizar_cobertura_final(self):
        if self.df_combinado is None or not self.estadisticas:
            self.logger.error("No hay datos combinados o estadísticas para analizar")
            return
        self.logger.info("\n" + "=" * 50)
        self.logger.info("RESUMEN DE COBERTURA FINAL")
        self.logger.info("=" * 50)
        total_indicadores = len(self.estadisticas)
        total_dias = len(self.indice_diario)
        self.logger.info(f"Total indicadores procesados: {total_indicadores}")
        self.logger.info(f"Rango de fechas: {self.fecha_min_global.strftime('%Y-%m-%d')} a {self.fecha_max_global.strftime('%Y-%m-%d')}")
        self.logger.info(f"Total días en la serie: {total_dias}")
        for variable, stats in self.estadisticas.items():
            cobertura = (stats['valores_validos'] / total_dias) * 100
            self.logger.info(f"- {variable} ({stats['nuevo_nombre']}): Cobertura aproximada {cobertura:.2f}%")

    def generar_estadisticas_valores(self):
        if self.df_combinado is None:
            self.logger.error("No hay datos combinados para analizar")
            return
        self.logger.info("\n" + "=" * 50)
        self.logger.info("ESTADÍSTICAS DE VALORES")
        self.logger.info("=" * 50)
        for col in self.df_combinado.columns:
            if col == 'fecha':
                continue
            serie = self.df_combinado[col].dropna()
            if len(serie) == 0:
                self.logger.warning(f"La columna {col} no tiene valores")
                continue
            stats = {
                'min': serie.min(),
                'max': serie.max(),
                'mean': serie.mean(),
                'median': serie.median(),
                'std': serie.std()
            }
            self.logger.info(f"\nEstadísticas para {col}:")
            self.logger.info(f"- Min: {stats['min']:.4f}")
            self.logger.info(f"- Max: {stats['max']:.4f}")
            self.logger.info(f"- Media: {stats['mean']:.4f}")
            self.logger.info(f"- Mediana: {stats['median']:.4f}")
            self.logger.info(f"- Desv. Estándar: {stats['std']:.4f}")
            if col in self.estadisticas:
                self.estadisticas[col] = {**self.estadisticas.get(col, {}), **stats}

    def guardar_resultados(self, output_file='datos_economicos_normales_procesados.xlsx'):
        if self.df_combinado is None:
            self.logger.error("No hay datos para guardar")
            return False
        try:
            self.logger.info("\n" + "=" * 50)
            self.logger.info("GUARDANDO RESULTADOS")
            self.logger.info("=" * 50)
            self.logger.info(f"Guardando resultados en: {output_file}")
            with pd.ExcelWriter(output_file, engine='openpyxl') as writer:
                self.df_combinado.to_excel(writer, sheet_name='Datos Diarios', index=False)
                df_stats = pd.DataFrame()
                for var, stats in self.estadisticas.items():
                    serie = pd.Series(stats, name=var)
                    df_temp = pd.DataFrame(serie).transpose()
                    df_stats = pd.concat([df_stats, df_temp])
                df_stats.to_excel(writer, sheet_name='Estadisticas')
                metadata = {
                    'Proceso': ['MyinvestingreportNormal'],
                    'Fecha de proceso': [datetime.now().strftime('%Y-%m-%d %H:%M:%S')],
                    'Total indicadores': [len(self.estadisticas)],
                    'Periodo': [f"{self.fecha_min_global.strftime('%Y-%m-%d')} a {self.fecha_max_global.strftime('%Y-%m-%d')}"],
                    'Total días': [len(self.indice_diario)]
                }
                pd.DataFrame(metadata).to_excel(writer, sheet_name='Metadatos')
            self.logger.info(f"Archivo guardado exitosamente: {output_file}")
            return True
        except Exception as e:
            self.logger.error(f"Error al guardar resultados: {str(e)}")
            return False

    def ejecutar_proceso_completo(self, output_file='datos_economicos_normales_procesados.xlsx'):
        inicio = time.time()
        self.logger.info("Iniciando proceso completo MyinvestingreportNormal...")
        self.leer_configuracion()
        if self.config_data is None or len(self.config_data) == 0:
            return False
        for _, config_row in self.config_data.iterrows():
            variable, df_procesado = self.procesar_archivo(config_row)
            self.datos_procesados[variable] = df_procesado
        archivos_correctos = sum(1 for df in self.datos_procesados.values() if df is not None)
        if archivos_correctos == 0:
            self.logger.error("No se pudo procesar correctamente ningún archivo")
            return False
        self.generar_indice_diario()
        if self.indice_diario is None:
            return False
        self.combinar_datos()
        if self.df_combinado is None:
            return False
        self.analizar_cobertura_final()
        self.generar_estadisticas_valores()
        resultado = self.guardar_resultados(output_file)
        fin = time.time()
        tiempo_ejecucion = fin - inicio
        self.logger.info("\n" + "=" * 50)
        self.logger.info("RESUMEN DE EJECUCIÓN")
        self.logger.info("=" * 50)
        self.logger.info(f"Proceso: MyinvestingreportNormal")
        self.logger.info(f"Tiempo de ejecución: {tiempo_ejecucion:.2f} segundos")
        self.logger.info(f"Archivos procesados: {len(self.datos_procesados)}")
        self.logger.info(f"Archivos con error: {sum(1 for df in self.datos_procesados.values() if df is None)}")
        self.logger.info(f"Archivos procesados correctamente: {archivos_correctos}")
        self.logger.info(f"Periodo de datos: {self.fecha_min_global.strftime('%Y-%m-%d')} a {self.fecha_max_global.strftime('%Y-%m-%d')}")
        self.logger.info(f"Datos combinados: {len(self.df_combinado)} filas, {len(self.df_combinado.columns)} columnas")
        self.logger.info(f"Archivo de salida: {output_file}")
        self.logger.info(f"Estado: {'COMPLETADO' if resultado else 'ERROR'}")
        self.logger.info("=" * 50)
        return resultado

def ejecutar_myinvestingreportnormal(config_file='Data Engineering.xlsx',
                                     output_file='datos_economicos_normales_procesados.xlsx',
                                     data_root='data/Macro/raw',
                                     log_file='myinvestingreportnormal.log'):
    procesador = MyinvestingreportNormal(config_file, data_root, log_file)
    return procesador.ejecutar_proceso_completo(output_file)

if __name__ == "__main__":
    resultado = ejecutar_myinvestingreportnormal()
    print(f"Proceso {'completado exitosamente' if resultado else 'finalizado con errores'}")


2025-03-28 15:55:59,727 [INFO] INICIANDO PROCESO: MyinvestingreportNormal
2025-03-28 15:55:59,727 [INFO] Archivo de configuración: Data Engineering.xlsx
2025-03-28 15:55:59,729 [INFO] Directorio raíz de datos: data/Macro/raw
2025-03-28 15:55:59,731 [INFO] Fecha y hora: 2025-03-28 15:55:59
2025-03-28 15:55:59,734 [INFO] Iniciando proceso completo MyinvestingreportNormal...
2025-03-28 15:55:59,735 [INFO] Leyendo archivo de configuración...
2025-03-28 15:55:59,806 [INFO] Se encontraron 28 configuraciones para procesar
2025-03-28 15:55:59,809 [INFO] 
Procesando: Australia_10Y_Bond (bond)
2025-03-28 15:55:59,810 [INFO] - Archivo: Australia_10Y_Bond
2025-03-28 15:55:59,810 [INFO] - Columna TARGET: PRICE
2025-03-28 15:55:59,812 [INFO] - Ruta encontrada: data/Macro/raw\bond\Australia_10Y_Bond.csv
2025-03-28 15:55:59,826 [INFO] Columna de fecha detectada: Date
2025-03-28 15:55:59,842 [INFO] Formato de fecha detectado para Australia_10Y_Bond: dayfirst=False (confianza: 0.00)
2025-03-28 15:56:01,

Proceso completado exitosamente


In [24]:
import pandas as pd
import numpy as np
import os
import re
import time
import logging
from datetime import datetime, timedelta
from dateutil.parser import parse
import warnings
warnings.filterwarnings('ignore')

# Configuración de logging
def configurar_logging(log_file='myinvestingreportcp.log'):
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s [%(levelname)s] %(message)s',
        handlers=[
            logging.FileHandler(log_file),
            logging.StreamHandler()
        ]
    )
    return logging.getLogger('EconomicDataProcessor')

class EconomicDataProcessor:
    """
    Clase para procesar datos macroeconómicos con robustez en el manejo de fechas y 
    forward filling de series de frecuencia (por ejemplo, mensuales a diarios).

    Funcionalidades:
      - Conversión robusta de cadenas de fecha usando múltiples estrategias.
      - Transformación de series (generalmente mensuales) a datos diarios mediante merge_asof.
      - Renombrado de la columna de valores usando el patrón: 
            {target_col}_{variable}_{tipo_macro}
      - Validación y log detallado en cada etapa.
    """

    def __init__(self, config_file, data_root='data/Macro/raw', log_file='myinvestingreportcp.log'):
        self.config_file = config_file
        self.data_root = data_root
        self.logger = configurar_logging(log_file)
        self.config_data = None
        self.global_min_date = None
        self.global_max_date = None
        self.daily_index = None
        self.processed_data = {}  # Diccionario {variable: DataFrame procesado}
        self.final_df = None
        self.stats = {}
        # Cache para la preferencia de conversión de fecha
        self.date_cache = {}

        self.logger.info("=" * 80)
        self.logger.info("INICIANDO PROCESO: EconomicDataProcessor")
        self.logger.info(f"Archivo de configuración: {config_file}")
        self.logger.info(f"Directorio raíz de datos: {data_root}")
        self.logger.info(f"Fecha y hora: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        self.logger.info("=" * 80)

    def read_config(self):
        try:
            self.logger.info("Leyendo archivo de configuración...")
            df_config = pd.read_excel(self.config_file)
            self.config_data = df_config[
                (df_config['Fuente'] == 'Investing Data') &
                (df_config['Tipo de Preprocesamiento Según la Fuente'] == 'Copiar y Pegar')
            ].copy()
            self.logger.info(f"Se encontraron {len(self.config_data)} configuraciones para procesar")
            return self.config_data
        except Exception as e:
            self.logger.error(f"Error al leer configuración: {e}")
            return None

    def robust_parse_date(self, date_str, preferred_dayfirst=None):
        """
        Intenta convertir la cadena de fecha usando múltiples estrategias.
        - Primero, busca patrones como "Apr 01, 2025 (Mar)".
        - Luego utiliza pd.to_datetime con la opción dayfirst según la preferencia.
        - Si no se especifica, prueba ambas opciones y elige la que dé una fecha razonable.

        Args:
            date_str (str): Cadena de fecha.
            preferred_dayfirst (bool, opcional): Preferencia de interpretación.

        Returns:
            pd.Timestamp o None.
        """
        if not isinstance(date_str, str):
            return None
        date_str = date_str.strip()
        if not date_str:
            return None

        m = re.search(r'([A-Za-z]+\s+\d{1,2},\s+\d{4})', date_str)
        if m:
            candidate = m.group(1)
            try:
                parsed = pd.to_datetime(candidate, errors='coerce')
                if parsed is not None:
                    return parsed
            except Exception as e:
                self.logger.warning(f"Error al parsear patrón en '{date_str}': {e}")

        if preferred_dayfirst is not None:
            try:
                parsed = pd.to_datetime(date_str, dayfirst=preferred_dayfirst, errors='coerce')
                threshold = pd.Timestamp.today() + pd.Timedelta(days=30)
                if parsed and parsed <= threshold:
                    return parsed
            except Exception as e:
                self.logger.warning(f"Error con dayfirst={preferred_dayfirst} en '{date_str}': {e}")

        try:
            parsed_true = pd.to_datetime(date_str, dayfirst=True, errors='coerce')
            parsed_false = pd.to_datetime(date_str, dayfirst=False, errors='coerce')
            threshold = pd.Timestamp.today() + pd.Timedelta(days=30)
            valid_true = parsed_true and parsed_true <= threshold
            valid_false = parsed_false and parsed_false <= threshold
            if valid_true and not valid_false:
                return parsed_true
            elif valid_false and not valid_true:
                return parsed_false
            elif valid_true and valid_false:
                return parsed_true  # Por defecto dayfirst=True
            else:
                return parsed_true if pd.notnull(parsed_true) else parsed_false
        except Exception as e:
            self.logger.warning(f"Error en robust_parse_date para '{date_str}': {e}")
            return None

    def process_file(self, config_row):
        """
        Procesa un archivo individual:
          - Lee el archivo (se usa estrategia especial para US_Leading_EconIndex).
          - Convierte la columna 'Release Date' robustamente.
          - Detecta y convierte la columna target a numérico.
          - Renombra la columna de valor usando el patrón:
                {target_col}_{variable}_{tipo_macro}
          - Selecciona solo las columnas 'fecha' y la columna renombrada.

        Returns:
            tuple: (variable, DataFrame procesado) o (variable, None) en caso de error.
        """
        variable = config_row['Variable']
        macro_type = config_row['Tipo Macro']
        target_col = config_row['TARGET']

        # Construir la ruta (buscando también en subdirectorios)
        ruta = os.path.join(self.data_root, macro_type, f"{variable}.xlsx")
        if not os.path.exists(ruta):
            for root, dirs, files in os.walk(self.data_root):
                if f"{variable}.xlsx" in files:
                    ruta = os.path.join(root, f"{variable}.xlsx")
                    break
        if not os.path.exists(ruta):
            self.logger.error(f"Archivo no encontrado: {variable}.xlsx")
            return variable, None

        self.logger.info(f"\nProcesando: {variable} ({macro_type})")
        self.logger.info(f"- Archivo: {variable}.xlsx")
        self.logger.info(f"- Columna TARGET: {target_col}")
        self.logger.info(f"- Ruta encontrada: {ruta}")

        try:
            # Estrategia especial para US_Leading_EconIndex: ajustar header y limpiar columnas
            if variable == "US_Leading_EconIndex":
                self.logger.info("Utilizando estrategia especial para US_Leading_EconIndex (header=2)")
                df = pd.read_excel(ruta, header=2, engine='openpyxl')
                df.columns = df.columns.str.strip()
                self.logger.info(f"Columnas leídas: {df.columns.tolist()}")
            else:
                df = pd.read_excel(ruta, engine='openpyxl')
        except Exception as e:
            self.logger.error(f"Error al leer {ruta}: {e}")
            return variable, None

        self.logger.info(f"- Filas encontradas: {len(df)}")
        if 'Release Date' not in df.columns:
            self.logger.error(f"No se encontró la columna 'Release Date' en {ruta}")
            return variable, None

        # Determinar preferencia de dayfirst (cacheada)
        if ruta not in self.date_cache:
            sample = df['Release Date'].dropna().head(10)
            count_true, count_false = 0, 0
            threshold = pd.Timestamp.today() + pd.Timedelta(days=30)
            for val in sample:
                dt_true = pd.to_datetime(val, dayfirst=True, errors='coerce')
                dt_false = pd.to_datetime(val, dayfirst=False, errors='coerce')
                if pd.notnull(dt_true) and dt_true <= threshold:
                    count_true += 1
                if pd.notnull(dt_false) and dt_false <= threshold:
                    count_false += 1
            preferred = count_true >= count_false
            self.date_cache[ruta] = preferred
            self.logger.info(f"Preferencia de dayfirst para {ruta}: {preferred}")
        else:
            preferred = self.date_cache[ruta]

        df['fecha'] = df['Release Date'].apply(lambda x: self.robust_parse_date(x, preferred_dayfirst=preferred))
        df = df.dropna(subset=['fecha'])
        df = df.sort_values('fecha')

        # Si el target especificado no está, intenta buscar una alternativa
        if target_col not in df.columns:
            for col in df.columns:
                if col.strip().lower() == target_col.strip().lower():
                    target_col = col
                    self.logger.warning(f"No se encontró '{config_row['TARGET']}', se usará '{target_col}'")
                    break
        if target_col not in df.columns:
            self.logger.error(f"No se encontró columna TARGET ni alternativa en {ruta}")
            return variable, None

        # Convertir la columna target a numérico y descartar valores no válidos
        df['valor'] = pd.to_numeric(df[target_col], errors='coerce')
        df = df.dropna(subset=['valor'])
        if df.empty:
            self.logger.error(f"No se encontraron valores válidos para '{target_col}' en {ruta}")
            return variable, None

        # Actualizar rango global de fechas
        current_min = df['fecha'].min()
        current_max = df['fecha'].max()
        if self.global_min_date is None or current_min < self.global_min_date:
            self.global_min_date = current_min
        if self.global_max_date is None or current_max > self.global_max_date:
            self.global_max_date = current_max

        # Calcular cobertura (puedes ajustar la fórmula si lo deseas)
        cobertura = (len(df) / len(df)) * 100

        # RENOMBRAR LA COLUMNA: Crear un nombre único
        nuevo_nombre = f"{target_col}_{variable}_{macro_type}"
        df.rename(columns={'valor': nuevo_nombre}, inplace=True)
        self.stats[variable] = {
            'macro_type': macro_type,
            'target_column': target_col,
            'total_rows': len(df),
            'valid_values': len(df),
            'coverage': cobertura,
            'date_min': current_min,
            'date_max': current_max,
            'nuevo_nombre': nuevo_nombre
        }
        self.logger.info(f"- Valores no nulos en TARGET: {len(df)}")
        self.logger.info(f"- Periodo: {current_min.strftime('%Y-%m-%d')} a {current_max.strftime('%Y-%m-%d')}")
        self.logger.info(f"- Cobertura: {cobertura:.2f}%")
        return variable, df[['fecha', nuevo_nombre]].copy()

    def generate_daily_index(self):
        """
        Genera un DataFrame con un índice diario desde la fecha global mínima hasta la máxima.
        """
        if self.global_min_date is None or self.global_max_date is None:
            self.logger.error("No se pudieron determinar las fechas globales")
            return None
        self.daily_index = pd.DataFrame({
            'fecha': pd.date_range(start=self.global_min_date, end=self.global_max_date, freq='D')
        })
        self.logger.info(f"Índice diario generado: {len(self.daily_index)} días desde {self.global_min_date.strftime('%Y-%m-%d')} hasta {self.global_max_date.strftime('%Y-%m-%d')}")
        return self.daily_index

    def combine_data(self):
        """
        Convierte cada serie (generalmente reportada en frecuencias bajas) a datos diarios usando merge_asof.
        Para cada archivo, se asocia el dato reportado más reciente a cada día.
        """
        if self.daily_index is None:
            self.logger.error("El índice diario no ha sido generado")
            return None

        combined = self.daily_index.copy()
        for variable, df in self.processed_data.items():
            if df is None or df.empty:
                self.logger.warning(f"Omitiendo {variable} por falta de datos")
                continue
            df = df.sort_values('fecha')
            # merge_asof para asignar cada día con el valor reportado más reciente
            df_daily = pd.merge_asof(combined, df, on='fecha', direction='backward')
            col_name = self.stats[variable]['nuevo_nombre']
            # Como precaución se aplica ffill
            df_daily[col_name] = df_daily[col_name].ffill()
            combined = combined.merge(df_daily[['fecha', col_name]], on='fecha', how='left')
        self.final_df = combined
        self.logger.info(f"DataFrame final combinado: {len(self.final_df)} filas, {len(self.final_df.columns)} columnas")
        return self.final_df

    def analyze_coverage(self):
        """
        Genera un resumen de cobertura y estadísticas para cada indicador.
        """
        total_days = len(self.daily_index)
        self.logger.info("\nResumen de Cobertura:")
        for variable, stats in self.stats.items():
            self.logger.info(f"- {variable}: {stats['coverage']:.2f}% desde {stats['date_min'].strftime('%Y-%m-%d')} a {stats['date_max'].strftime('%Y-%m-%d')}")

    def save_results(self, output_file='datos_economicos_procesados.xlsx'):
        """
        Guarda el DataFrame final combinado y las estadísticas en un archivo Excel.
        """
        if self.final_df is None:
            self.logger.error("No hay datos combinados para guardar")
            return False
        try:
            with pd.ExcelWriter(output_file, engine='openpyxl') as writer:
                self.final_df.to_excel(writer, sheet_name='Datos Diarios', index=False)
                df_stats = pd.DataFrame(self.stats).T
                df_stats.to_excel(writer, sheet_name='Estadisticas')
                meta = {
                    'Proceso': ['EconomicDataProcessor'],
                    'Fecha de proceso': [datetime.now().strftime('%Y-%m-%d %H:%M:%S')],
                    'Total indicadores': [len(self.stats)],
                    'Periodo': [f"{self.global_min_date.strftime('%Y-%m-%d')} a {self.global_max_date.strftime('%Y-%m-%d')}"],
                    'Total días': [len(self.daily_index)]
                }
                pd.DataFrame(meta).to_excel(writer, sheet_name='Metadatos', index=False)
            self.logger.info(f"Archivo guardado exitosamente: {output_file}")
            return True
        except Exception as e:
            self.logger.error(f"Error al guardar resultados: {e}")
            return False

    def run(self, output_file='datos_economicos_procesados.xlsx'):
        """
        Ejecuta el proceso completo:
          1. Lee la configuración.
          2. Procesa cada archivo.
          3. Determina el rango global de fechas.
          4. Genera el índice diario.
          5. Convierte cada serie a datos diarios y los combina.
          6. Realiza un análisis de cobertura.
          7. Guarda los resultados.
        """
        start_time = time.time()
        if self.read_config() is None:
            return False

        for _, config_row in self.config_data.iterrows():
            var, df_processed = self.process_file(config_row)
            self.processed_data[var] = df_processed

        if len([df for df in self.processed_data.values() if df is not None]) == 0:
            self.logger.error("No se procesó ningún archivo correctamente")
            return False

        self.generate_daily_index()
        self.combine_data()
        self.analyze_coverage()
        result = self.save_results(output_file)
        end_time = time.time()
        self.logger.info("\nResumen de Ejecución:")
        self.logger.info(f"Tiempo de ejecución: {end_time - start_time:.2f} segundos")
        self.logger.info(f"Archivos procesados: {len(self.config_data)}")
        self.logger.info(f"Archivo de salida: {output_file}")
        self.logger.info(f"Estado: {'COMPLETADO' if result else 'ERROR'}")
        return result

# Función principal para ejecutar el proceso
def run_economic_data_processor(config_file='Data Engineering.xlsx',
                                output_file='datos_economicos_procesados.xlsx',
                                data_root='data/Macro/raw',
                                log_file='myinvestingreportcp.log'):
    processor = EconomicDataProcessor(config_file, data_root, log_file)
    return processor.run(output_file)

# Ejemplo de uso
if __name__ == "__main__":
    success = run_economic_data_processor()
    print(f"Proceso {'completado exitosamente' if success else 'finalizado con errores'}")


2025-03-28 18:07:48,591 [INFO] INICIANDO PROCESO: EconomicDataProcessor
2025-03-28 18:07:48,593 [INFO] Archivo de configuración: Data Engineering.xlsx
2025-03-28 18:07:48,594 [INFO] Directorio raíz de datos: data/Macro/raw
2025-03-28 18:07:48,595 [INFO] Fecha y hora: 2025-03-28 18:07:48
2025-03-28 18:07:48,599 [INFO] Leyendo archivo de configuración...


2025-03-28 18:07:48,642 [INFO] Se encontraron 21 configuraciones para procesar
2025-03-28 18:07:48,643 [INFO] 
Procesando: US_ISM_Manufacturing (business_confidence)
2025-03-28 18:07:48,645 [INFO] - Archivo: US_ISM_Manufacturing.xlsx
2025-03-28 18:07:48,646 [INFO] - Columna TARGET: ACTUAL
2025-03-28 18:07:48,647 [INFO] - Ruta encontrada: data/Macro/raw\business_confidence\US_ISM_Manufacturing.xlsx
2025-03-28 18:07:48,666 [INFO] - Filas encontradas: 138
2025-03-28 18:07:48,674 [INFO] Preferencia de dayfirst para data/Macro/raw\business_confidence\US_ISM_Manufacturing.xlsx: True
2025-03-28 18:07:48,731 [INFO] - Valores no nulos en TARGET: 137
2025-03-28 18:07:48,733 [INFO] - Periodo: 2013-11-01 a 2025-03-03
2025-03-28 18:07:48,733 [INFO] - Cobertura: 100.00%
2025-03-28 18:07:48,737 [INFO] 
Procesando: US_ISM_Services (business_confidence)
2025-03-28 18:07:48,738 [INFO] - Archivo: US_ISM_Services.xlsx
2025-03-28 18:07:48,738 [INFO] - Columna TARGET: ACTUAL
2025-03-28 18:07:48,740 [INFO] -

Proceso completado exitosamente


In [35]:
import pandas as pd
import numpy as np
import os
import re
import time
import logging
from datetime import datetime, timedelta
import warnings
warnings.filterwarnings('ignore')

# Configuración de logging
def configurar_logging(log_file='freddataprocessor.log'):
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s [%(levelname)s] %(message)s',
        handlers=[
            logging.FileHandler(log_file),
            logging.StreamHandler()
        ]
    )
    return logging.getLogger('FredDataProcessor')

class FredDataProcessor:
    """
    Clase para procesar datos de FRED (Fuente "FRED" y Preprocesamiento "Normal").

    Se espera que cada archivo tenga:
      - Una columna de fecha llamada "observation_date" (o "DATE" si no existe).
      - Una columna de datos cuyo nombre se especifica en el archivo de configuración (TARGET).

    Esta versión:
      - Busca el archivo usando varias extensiones: .csv, .xlsx, .xls.
      - Detecta dinámicamente el formato de fecha analizando hasta 20 registros.
         Si la mayoría siguen el formato ISO (YYYY-MM-DD), se fuerza ese formato;
         de lo contrario se evalúan ambas opciones (dayfirst True/False) usando la monotonicidad.
      - La función improved_robust_parse_date ahora maneja objetos Timestamp y valores no-string.
      - Convierte la columna de fecha y la columna target a numérico.
      - Renombra la columna de datos con el patrón: {TARGET}_{variable}_{Tipo_Macro}.
      - Genera un índice diario global y usa merge_asof para imputar los datos (forward fill).
    """

    def __init__(self, config_file, data_root='data/Macro/raw', log_file='freddataprocessor.log'):
        self.config_file = config_file
        self.data_root = data_root
        self.logger = configurar_logging(log_file)
        self.config_data = None
        self.global_min_date = None
        self.global_max_date = None
        self.daily_index = None
        self.processed_data = {}   # {variable: DataFrame procesado}
        self.final_df = None
        self.stats = {}
        self.date_cache = {}  # Guarda la preferencia de formato para cada archivo

        self.logger.info("=" * 80)
        self.logger.info("INICIANDO PROCESO: FredDataProcessor")
        self.logger.info(f"Archivo de configuración: {config_file}")
        self.logger.info(f"Directorio raíz de datos: {data_root}")
        self.logger.info(f"Fecha y hora: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        self.logger.info("=" * 80)

    def read_config(self):
        try:
            self.logger.info("Leyendo archivo de configuración...")
            df_config = pd.read_excel(self.config_file)
            self.config_data = df_config[
                (df_config['Fuente'] == 'FRED') &
                (df_config['Tipo de Preprocesamiento Según la Fuente'] == 'Normal')
            ].copy()
            self.logger.info(f"Se encontraron {len(self.config_data)} configuraciones para procesar")
            return self.config_data
        except Exception as e:
            self.logger.error(f"Error al leer configuración: {e}")
            return None

    def detect_date_format(self, series, n=20, iso_threshold=0.6):
        """
        Analiza hasta n registros de la serie de fechas para determinar si la mayoría 
        siguen el formato ISO (YYYY-MM-DD).

        Returns:
            "ISO" si al menos iso_threshold de los registros coinciden con el patrón ISO,
            de lo contrario "AMBIGUOUS".
        """
        sample = series.dropna().head(n)
        if len(sample) == 0:
            return "AMBIGUOUS"
        iso_count = 0
        for val in sample:
            if isinstance(val, str) and re.match(r'^\d{4}-\d{2}-\d{2}$', val.strip()):
                iso_count += 1
        ratio = iso_count / len(sample)
        self.logger.info(f"Detección formato: {iso_count}/{len(sample)} registros ISO (ratio {ratio:.2f})")
        return "ISO" if ratio >= iso_threshold else "AMBIGUOUS"

    def monotonic_score(self, parsed_series):
        """
        Calcula la puntuación de monotonicidad de una serie de fechas.
        Es la proporción de diferencias no negativas respecto al total.
        """
        parsed = parsed_series.dropna()
        if len(parsed) < 2:
            return 0
        diffs = parsed.diff().dropna()
        score = (diffs >= timedelta(0)).sum() / len(diffs)
        return score

    def improved_robust_parse_date(self, date_str, preferred_dayfirst=None, use_iso=False):
        """
        Convierte una cadena o timestamp de fecha.

        Args:
            date_str: Cadena de fecha o timestamp.
            preferred_dayfirst (bool, opcional): Preferencia para la conversión.
            use_iso (bool): Si se debe forzar el formato ISO (YYYY-MM-DD).

        Returns:
            pd.Timestamp o None.
        """
        # Si ya es un timestamp, devolverlo directamente
        if isinstance(date_str, pd.Timestamp):
            return date_str

        if not isinstance(date_str, str):
            self.logger.debug(f"Valor no string en fecha: {date_str} (tipo: {type(date_str)})")
            return None

        date_str = date_str.strip()
        if not date_str:
            return None

        if use_iso:
            try:
                return pd.to_datetime(date_str, format='%Y-%m-%d', errors='coerce')
            except Exception as e:
                self.logger.warning(f"Error al convertir formato ISO en '{date_str}': {e}")
                return None

        # Intentar patrón "Apr 01, 2025 (Mar)"
        m = re.search(r'([A-Za-z]+\s+\d{1,2},\s+\d{4})', date_str)
        if m:
            candidate = m.group(1)
            try:
                parsed = pd.to_datetime(candidate, errors='coerce')
                if pd.notnull(parsed):
                    return parsed
            except Exception as e:
                self.logger.warning(f"Error al parsear patrón en '{date_str}': {e}")

        if preferred_dayfirst is not None:
            try:
                return pd.to_datetime(date_str, dayfirst=preferred_dayfirst, errors='coerce')
            except Exception as e:
                self.logger.warning(f"Error con dayfirst={preferred_dayfirst} en '{date_str}': {e}")

        try:
            return pd.to_datetime(date_str, dayfirst=True, errors='coerce')
        except Exception as e:
            self.logger.warning(f"Error en improved_robust_parse_date para '{date_str}': {e}")
            return None

    def process_file(self, config_row):
        """
        Procesa un archivo individual de FRED.
        
        - Busca el archivo usando extensiones: .csv, .xlsx, .xls.
        - Usa la columna de fecha "observation_date" (o "DATE").
        - Analiza hasta 20 registros para determinar el formato de fecha.
        - Convierte la columna de fecha y la columna target a numérico.
        - Renombra la columna de datos con el patrón: {TARGET}_{variable}_{Tipo_Macro}.
        - Devuelve un DataFrame con columnas ['fecha', nuevo_nombre].
        """
        variable = config_row['Variable']
        macro_type = config_row['Tipo Macro']
        target_col = config_row['TARGET']

        # Lista de extensiones a buscar
        extensions = ['.csv', '.xlsx', '.xls']
        ruta = None
        for ext in extensions:
            ruta_candidate = os.path.join(self.data_root, macro_type, f"{variable}{ext}")
            if os.path.exists(ruta_candidate):
                ruta = ruta_candidate
                break
        if ruta is None:
            for ext in extensions:
                for root, dirs, files in os.walk(self.data_root):
                    if f"{variable}{ext}" in files:
                        ruta = os.path.join(root, f"{variable}{ext}")
                        break
                if ruta is not None:
                    break
        if ruta is None:
            self.logger.error(f"Archivo no encontrado: {variable}* (se probaron extensiones: {', '.join(extensions)})")
            return variable, None

        self.logger.info(f"\nProcesando: {variable} ({macro_type})")
        self.logger.info(f"- Archivo: {os.path.basename(ruta)}")
        self.logger.info(f"- Columna TARGET: {target_col}")
        self.logger.info(f"- Ruta encontrada: {ruta}")

        try:
            if ruta.endswith('.csv'):
                df = pd.read_csv(ruta)
            elif ruta.endswith(('.xlsx', '.xls')):
                df = pd.read_excel(ruta)
            else:
                self.logger.error(f"Extensión no soportada para {ruta}")
                return variable, None
        except Exception as e:
            self.logger.error(f"Error al leer {ruta}: {e}")
            return variable, None

        self.logger.info(f"- Filas encontradas: {len(df)}")
        # Determinar la columna de fecha: preferir "observation_date", sino "DATE"
        if 'observation_date' in df.columns:
            date_col = 'observation_date'
        elif 'DATE' in df.columns:
            date_col = 'DATE'
        else:
            self.logger.error(f"No se encontró columna de fecha ('observation_date' o 'DATE') en {ruta}")
            return variable, None

        # Detectar el formato de fecha a partir de 20 registros
        fmt = self.detect_date_format(df[date_col], n=20, iso_threshold=0.6)
        use_iso = (fmt == "ISO")
        self.logger.info(f"Formato detectado para {ruta}: {fmt}")

        # Si el formato no es ISO, determinar la preferencia de dayfirst usando la monotonicidad
        if not use_iso:
            sample = df[date_col].dropna().head(20)
            parsed_true = pd.to_datetime(sample, dayfirst=True, errors='coerce')
            parsed_false = pd.to_datetime(sample, dayfirst=False, errors='coerce')
            score_true = self.monotonic_score(parsed_true)
            score_false = self.monotonic_score(parsed_false)
            preferred = score_true >= score_false
            self.date_cache[ruta] = preferred
            self.logger.info(f"Preferencia de dayfirst para {ruta}: {preferred} (score True: {score_true:.2f}, False: {score_false:.2f})")
        else:
            preferred = None

        # Convertir la columna de fecha usando improved_robust_parse_date
        df['fecha'] = df[date_col].apply(lambda x: self.improved_robust_parse_date(x, preferred_dayfirst=preferred, use_iso=use_iso))
        df = df.dropna(subset=['fecha'])
        df = df.sort_values('fecha')
        self.logger.info(f"Primeras fechas convertidas: {df['fecha'].head(5).tolist()}")

        # Verificar la columna target usando búsqueda insensible a mayúsculas
        if target_col not in df.columns:
            for col in df.columns:
                if col.strip().lower() == target_col.strip().lower():
                    target_col = col
                    self.logger.warning(f"No se encontró '{config_row['TARGET']}', se usará '{target_col}'")
                    break
        if target_col not in df.columns:
            self.logger.error(f"No se encontró columna TARGET ni alternativa en {ruta}")
            return variable, None

        df['valor'] = pd.to_numeric(df[target_col], errors='coerce')
        df = df.dropna(subset=['valor'])
        if df.empty:
            self.logger.error(f"No se encontraron valores válidos para '{target_col}' en {ruta}")
            return variable, None

        current_min = df['fecha'].min()
        current_max = df['fecha'].max()
        if self.global_min_date is None or current_min < self.global_min_date:
            self.global_min_date = current_min
        if self.global_max_date is None or current_max > self.global_max_date:
            self.global_max_date = current_max

        # Renombrar la columna de datos usando el patrón
        nuevo_nombre = f"{target_col}_{variable}_{macro_type}"
        df.rename(columns={'valor': nuevo_nombre}, inplace=True)
        self.stats[variable] = {
            'macro_type': macro_type,
            'target_column': target_col,
            'total_rows': len(df),
            'valid_values': len(df),
            'coverage': 100.0,
            'date_min': current_min,
            'date_max': current_max,
            'nuevo_nombre': nuevo_nombre
        }
        self.logger.info(f"- Valores no nulos en TARGET: {len(df)}")
        self.logger.info(f"- Periodo: {current_min.strftime('%Y-%m-%d')} a {current_max.strftime('%Y-%m-%d')}")
        self.logger.info(f"- Cobertura: 100.00%")
        return variable, df[['fecha', nuevo_nombre]].copy()

    def generate_daily_index(self):
        """
        Genera un índice diario desde la fecha global mínima hasta la máxima.
        """
        if self.global_min_date is None or self.global_max_date is None:
            self.logger.error("No se pudieron determinar las fechas globales")
            return None
        self.daily_index = pd.DataFrame({
            'fecha': pd.date_range(start=self.global_min_date, end=self.global_max_date, freq='D')
        })
        self.logger.info(f"Índice diario generado: {len(self.daily_index)} días desde {self.global_min_date.strftime('%Y-%m-%d')} hasta {self.global_max_date.strftime('%Y-%m-%d')}")
        return self.daily_index

    def combine_data(self):
        """
        Convierte cada serie (diaria o de menor frecuencia) a datos diarios usando merge_asof.
        Cada día se asocia al valor reportado más reciente (forward fill).
        """
        if self.daily_index is None:
            self.logger.error("El índice diario no ha sido generado")
            return None

        combined = self.daily_index.copy()
        for variable, df in self.processed_data.items():
            if df is None or df.empty:
                self.logger.warning(f"Omitiendo {variable} por falta de datos")
                continue
            df = df.sort_values('fecha')
            df_daily = pd.merge_asof(combined, df, on='fecha', direction='backward')
            col_name = self.stats[variable]['nuevo_nombre']
            df_daily[col_name] = df_daily[col_name].ffill()
            combined = combined.merge(df_daily[['fecha', col_name]], on='fecha', how='left')
        self.final_df = combined
        self.logger.info(f"DataFrame final combinado: {len(self.final_df)} filas, {len(self.final_df.columns)} columnas")
        return self.final_df

    def analyze_coverage(self):
        """
        Genera un resumen de cobertura y estadísticas para cada variable.
        """
        total_days = len(self.daily_index)
        self.logger.info("\nResumen de Cobertura:")
        for variable, stats in self.stats.items():
            self.logger.info(f"- {variable}: {stats['coverage']:.2f}% desde {stats['date_min'].strftime('%Y-%m-%d')} a {stats['date_max'].strftime('%Y-%m-%d')}")

    def save_results(self, output_file='datos_economicos_procesados.xlsx'):
        """
        Guarda el DataFrame final combinado y las estadísticas en un archivo Excel.
        """
        if self.final_df is None:
            self.logger.error("No hay datos combinados para guardar")
            return False
        try:
            with pd.ExcelWriter(output_file, engine='openpyxl') as writer:
                self.final_df.to_excel(writer, sheet_name='Datos Diarios', index=False)
                df_stats = pd.DataFrame(self.stats).T
                df_stats.to_excel(writer, sheet_name='Estadisticas')
                meta = {
                    'Proceso': ['FredDataProcessor'],
                    'Fecha de proceso': [datetime.now().strftime('%Y-%m-%d %H:%M:%S')],
                    'Total indicadores': [len(self.stats)],
                    'Periodo': [f"{self.global_min_date.strftime('%Y-%m-%d')} a {self.global_max_date.strftime('%Y-%m-%d')}"],
                    'Total días': [len(self.daily_index)]
                }
                pd.DataFrame(meta).to_excel(writer, sheet_name='Metadatos', index=False)
            self.logger.info(f"Archivo guardado exitosamente: {output_file}")
            return True
        except Exception as e:
            self.logger.error(f"Error al guardar resultados: {e}")
            return False

    def run(self, output_file='datos_economicos_procesados.xlsx'):
        """
        Ejecuta el proceso completo:
          1. Lee la configuración.
          2. Procesa cada archivo de FRED (buscando las extensiones adecuadas).
          3. Genera el índice diario global.
          4. Convierte cada serie a datos diarios y las combina.
          5. Analiza la cobertura.
          6. Guarda los resultados.
        """
        start_time = time.time()
        if self.read_config() is None:
            return False

        for _, config_row in self.config_data.iterrows():
            var, df_processed = self.process_file(config_row)
            self.processed_data[var] = df_processed

        if len([df for df in self.processed_data.values() if df is not None]) == 0:
            self.logger.error("No se procesó ningún archivo correctamente")
            return False

        self.generate_daily_index()
        self.combine_data()
        self.analyze_coverage()
        result = self.save_results(output_file)
        end_time = time.time()
        self.logger.info("\nResumen de Ejecución:")
        self.logger.info(f"Tiempo de ejecución: {end_time - start_time:.2f} segundos")
        self.logger.info(f"Archivos procesados: {len(self.config_data)}")
        self.logger.info(f"Archivo de salida: {output_file}")
        self.logger.info(f"Estado: {'COMPLETADO' if result else 'ERROR'}")
        return result

# Función principal para ejecutar el proceso
def run_fred_data_processor(config_file='Data Engineering.xlsx',
                            output_file='datos_economicos_procesados.xlsx',
                            data_root='data/Macro/raw',
                            log_file='freddataprocessor.log'):
    processor = FredDataProcessor(config_file, data_root, log_file)
    return processor.run(output_file)

# Ejemplo de uso
if __name__ == "__main__":
    success = run_fred_data_processor()
    print(f"Proceso {'completado exitosamente' if success else 'finalizado con errores'}")


2025-03-28 20:06:28,401 [INFO] INICIANDO PROCESO: FredDataProcessor
2025-03-28 20:06:28,402 [INFO] Archivo de configuración: Data Engineering.xlsx
2025-03-28 20:06:28,402 [INFO] Directorio raíz de datos: data/Macro/raw
2025-03-28 20:06:28,404 [INFO] Fecha y hora: 2025-03-28 20:06:28
2025-03-28 20:06:28,405 [INFO] Leyendo archivo de configuración...
2025-03-28 20:06:28,431 [INFO] Se encontraron 35 configuraciones para procesar
2025-03-28 20:06:28,432 [INFO] 
Procesando: US_10Y_Treasury (bond)
2025-03-28 20:06:28,434 [INFO] - Archivo: US_10Y_Treasury.csv
2025-03-28 20:06:28,434 [INFO] - Columna TARGET: DGS10
2025-03-28 20:06:28,435 [INFO] - Ruta encontrada: data/Macro/raw\bond\US_10Y_Treasury.csv
2025-03-28 20:06:28,439 [INFO] - Filas encontradas: 2929
2025-03-28 20:06:28,442 [INFO] Detección formato: 20/20 registros ISO (ratio 1.00)
2025-03-28 20:06:28,442 [INFO] Formato detectado para data/Macro/raw\bond\US_10Y_Treasury.csv: ISO
2025-03-28 20:06:28,664 [INFO] Primeras fechas convertida

Proceso completado exitosamente
