# 🧪 Test Portfolio Pipeline - Strix System

Este notebook ejecuta y prueba el pipeline de procesamiento de portfolios del sistema Strix desde el proyecto `pipeline` en Visual Studio Code.

## 📋 Requisitos:
- Estar ejecutando desde el directorio `pipeline`
- Tener configurado el archivo `.env` con las credenciales
- Tener los módulos `core` y `security` instalados en modo editable o accesibles

In [2]:
# Celda 1: Configuración de rutas y verificación
import sys
import os

# Obtener la ruta raíz del proyecto
current_dir = os.getcwd()
print(f"🔹 Directorio actual: {current_dir}")

# Buscar la raíz del proyecto (donde está el venv)
def find_project_root(start_dir):
    """Busca la raíz del proyecto subiendo directorios hasta encontrar 'venv'"""
    current = start_dir
    while current != os.path.dirname(current):  # Hasta llegar a la raíz del sistema
        if os.path.exists(os.path.join(current, 'venv')):
            return current
        current = os.path.dirname(current)
    return None

project_root = find_project_root(current_dir)
print(f"🔹 Raíz del proyecto encontrada: {project_root}")

# Agregar la raíz del proyecto al sys.path si no está
if project_root and project_root not in sys.path:
    sys.path.insert(0, project_root)
    print(f"✅ Raíz del proyecto agregada a sys.path")

# Verificar que los módulos ahora son importables
def test_imports():
    print("\n🔍 Probando imports...")
    
    try:
        import core
        print(f"✅ core importado desde: {core.__file__}")
    except ImportError as e:
        print(f"❌ core: {e}")
    
    try:
        import security  
        print(f"✅ security importado desde: {security.__file__}")
    except ImportError as e:
        print(f"❌ security: {e}")
        
    try:
        import pipeline
        print(f"✅ pipeline importado desde: {pipeline.__file__}")
    except ImportError as e:
        print(f"❌ pipeline: {e}")

test_imports()

# Probar imports específicos que necesitamos
print("\n🔍 Probando imports específicos...")

try:
    from security.services.health_check import health_check
    print("✅ health_check importado correctamente")
    health_available = True
except ImportError as e:
    print(f"❌ health_check: {e}")
    health_available = False

try:
    from pipeline.services.portfolio_pipeline import PortfolioPipelineProcessor
    print("✅ PortfolioPipelineProcessor importado correctamente")
    pipeline_available = True
except ImportError as e:
    print(f"❌ PortfolioPipelineProcessor: {e}")
    pipeline_available = False

try:
    from core.infraestructure.uow.sqlalchemy_uow import SQLAlchemyUnitOfWork
    print("✅ SQLAlchemyUnitOfWork importado correctamente")
    uow_available = True
except ImportError as e:
    print(f"❌ SQLAlchemyUnitOfWork: {e}")
    uow_available = False

print(f"\n📊 Estado de módulos disponibles:")
print(f"   health_check: {'✅' if health_available else '❌'}")
print(f"   PortfolioPipelineProcessor: {'✅' if pipeline_available else '❌'}")
print(f"   SQLAlchemyUnitOfWork: {'✅' if uow_available else '❌'}")

# Verificar rutas actuales en sys.path
print(f"\n🛤️ Primeras 5 rutas en sys.path:")
for i, path in enumerate(sys.path[:5]):
    print(f"   {i+1}. {path}")

🔹 Directorio actual: c:\code\strix-system\pipeline\test\services
🔹 Raíz del proyecto encontrada: c:\code\strix-system
✅ Raíz del proyecto agregada a sys.path

🔍 Probando imports...
✅ core importado desde: c:\code\strix-system\core\__init__.py
✅ security importado desde: c:\code\strix-system\security\__init__.py
✅ pipeline importado desde: c:\code\strix-system\pipeline\__init__.py

🔍 Probando imports específicos...
✅ health_check importado correctamente
✅ PortfolioPipelineProcessor importado correctamente
✅ SQLAlchemyUnitOfWork importado correctamente

📊 Estado de módulos disponibles:
   health_check: ✅
   PortfolioPipelineProcessor: ✅
   SQLAlchemyUnitOfWork: ✅

🛤️ Primeras 5 rutas en sys.path:
   1. c:\code\strix-system
   2. c:\code\strix-system\pipeline\test
   3. C:\Users\gsolomita\AppData\Local\Programs\Python\Python311\python311.zip
   4. C:\Users\gsolomita\AppData\Local\Programs\Python\Python311\DLLs
   5. C:\Users\gsolomita\AppData\Local\Programs\Python\Python311\Lib


In [3]:
# EJECUTAR ESTA CELDA PRIMERO - Arreglo de rutas
import sys
import os

# Ir 3 niveles arriba desde pipeline/test/services hasta la raíz
current_dir = os.getcwd()
project_root = os.path.abspath(os.path.join(current_dir, '..', '..', '..'))

print(f"📂 Directorio actual: {current_dir}")
print(f"🏠 Raíz del proyecto: {project_root}")

# Verificar que existe el venv en la raíz
venv_path = os.path.join(project_root, 'venv')
if os.path.exists(venv_path):
    print(f"✅ Estructura correcta - venv encontrado en: {venv_path}")
else:
    print(f"❌ No se encontró venv en: {venv_path}")

# Agregar la raíz al sys.path
if project_root not in sys.path:
    sys.path.insert(0, project_root)
    print(f"✅ Raíz agregada a sys.path")

# Cambiar el directorio de trabajo a la raíz
os.chdir(project_root)
print(f"✅ Directorio de trabajo cambiado a: {os.getcwd()}")

# Verificar imports
print("\n🧪 Verificando imports...")
try:
    from security.services.health_check import health_check
    print("✅ health_check - OK")
except Exception as e:
    print(f"❌ health_check - {e}")

try:
    from pipeline.services.portfolio_pipeline import PortfolioPipelineProcessor  
    print("✅ PortfolioPipelineProcessor - OK")
except Exception as e:
    print(f"❌ PortfolioPipelineProcessor - {e}")

try:
    from core.infraestructure.uow.sqlalchemy_uow import SQLAlchemyUnitOfWork
    print("✅ SQLAlchemyUnitOfWork - OK")
except Exception as e:
    print(f"❌ SQLAlchemyUnitOfWork - {e}")

print("\n🎯 ¡Configuración completada! Ahora puedes ejecutar el resto del notebook.")

📂 Directorio actual: c:\code\strix-system\pipeline\test\services
🏠 Raíz del proyecto: c:\code\strix-system
✅ Estructura correcta - venv encontrado en: c:\code\strix-system\venv
✅ Directorio de trabajo cambiado a: c:\code\strix-system

🧪 Verificando imports...
✅ health_check - OK
✅ PortfolioPipelineProcessor - OK
✅ SQLAlchemyUnitOfWork - OK

🎯 ¡Configuración completada! Ahora puedes ejecutar el resto del notebook.


In [4]:
# Verificar el health check del sistema antes de ejecutar el pipeline
try:
    from security.services.health_check import health_check
    
    print("🔍 Verificando conexiones del sistema...")
    status = health_check()
    print(f"📊 Estado de conexiones: {status}")
    
    if status.get("postgres") != "OK":
        print("❌ PostgreSQL no está disponible. Verifica la configuración.")
        
    if status.get("s3") != "OK":
        print("❌ S3 no está disponible. Verifica la configuración.")
        
    if status.get("postgres") == "OK" and status.get("s3") == "OK":
        print("✅ Todas las conexiones están operativas. Procediendo con el pipeline.")
        
except ImportError as e:
    print(f"⚠️ No se pudo importar health_check: {e}")
    print("🔄 Continuando sin verificación de conexiones...")
    status = {"postgres": "UNKNOWN", "s3": "UNKNOWN"}
except Exception as e:
    print(f"❌ Error al verificar conexiones: {e}")
    status = {"postgres": "ERROR", "s3": "ERROR"}

🔍 Verificando conexiones del sistema...
✅ Nueva Conexión a PostgreSQL establecida.
🚨 ENTER: Se abrió una nueva conexión SQLAlchemy


2025-06-23 15:36:25,000 - INFO - ✅ PostgreSQL está operativo.
2025-06-23 15:36:25,355 - INFO - ✅ S3 está operativo.


✅ Conexión a S3 establecida con éxito.
📊 Estado de conexiones: {'postgres': 'OK', 's3': 'OK'}
✅ Todas las conexiones están operativas. Procediendo con el pipeline.


In [5]:
# Importar los módulos necesarios para el pipeline paralelo
try:
    from pipeline.services.portfolio_pipeline_parallel import ParallelPortfolioPipelineProcessor
    print("✅ ParallelPortfolioPipelineProcessor importado correctamente")
    parallel_available = True
except ImportError as e:
    print(f"⚠️ ParallelPortfolioPipelineProcessor no disponible: {e}")
    parallel_available = False
    
    # Fallback al processor original
    try:
        from pipeline.services.portfolio_pipeline import PortfolioPipelineProcessor
        print("✅ PortfolioPipelineProcessor original importado como fallback")
    except ImportError as e:
        print(f"❌ Error importando PortfolioPipelineProcessor: {e}")

try:
    from core.infraestructure.uow.sqlalchemy_uow import SQLAlchemyUnitOfWork
    print("✅ SQLAlchemyUnitOfWork importado correctamente")
except ImportError as e:
    print(f"❌ Error importando SQLAlchemyUnitOfWork: {e}")
    print("⚠️ Puede que necesites instalar el módulo core en modo editable")
    
from datetime import datetime, timedelta
print("✅ datetime importado correctamente")

# Importar módulos adicionales para paralelización
try:
    import psutil
    import concurrent.futures
    import multiprocessing
    print("✅ Módulos de paralelización disponibles")
    parallelization_modules = True
except ImportError as e:
    print(f"⚠️ Módulos de paralelización no disponibles: {e}")
    print("💡 Instala con: pip install psutil")
    parallelization_modules = False

print("\n📦 Estado de importaciones completado")
print(f"   🚀 Procesador paralelo: {'✅' if parallel_available else '❌'}")
print(f"   🔧 Módulos paralelización: {'✅' if parallelization_modules else '❌'}")

✅ ParallelPortfolioPipelineProcessor importado correctamente
✅ SQLAlchemyUnitOfWork importado correctamente
✅ datetime importado correctamente
✅ Módulos de paralelización disponibles

📦 Estado de importaciones completado
   🚀 Procesador paralelo: ✅
   🔧 Módulos paralelización: ✅


In [6]:
# Verificar portfolios activos antes de ejecutar el pipeline
print("📋 Verificando portfolios activos en el sistema...")

try:
    if 'SQLAlchemyUnitOfWork' in globals():
        with SQLAlchemyUnitOfWork() as uow:
            active_portfolios = uow.portfolio_domains.get_active_portfolios()
            
        print(f"📊 Portfolios activos encontrados: {len(active_portfolios)}")
        if active_portfolios:
            print(f"🔢 IDs de portfolios activos: {active_portfolios}")
        else:
            print("⚠️ No hay portfolios activos. El pipeline no procesará ningún dato.")
    else:
        print("⚠️ SQLAlchemyUnitOfWork no está disponible, saltando verificación")
        active_portfolios = []
        
except Exception as e:
    print(f"❌ Error al verificar portfolios activos: {e}")
    active_portfolios = []

📋 Verificando portfolios activos en el sistema...
🚨 ENTER: Se abrió una nueva conexión SQLAlchemy
📊 Portfolios activos encontrados: 3
🔢 IDs de portfolios activos: [34, 35, 1]


In [7]:
# Verificar algunos dominios del sistema
print("🌐 Verificando dominios en el sistema...")

try:
    if 'SQLAlchemyUnitOfWork' in globals():
        with SQLAlchemyUnitOfWork() as uow:
            # Obtener algunos dominios de ejemplo
            all_domains = uow.domains.get_all_domains()
            
        print(f"🔢 Total de dominios en el sistema: {len(all_domains)}")
        
        if all_domains:
            print("📝 Primeros 5 dominios:")
            for i, domain in enumerate(all_domains[:5]):
                id_thing_display = domain['id_thing'][:20] + "..." if domain['id_thing'] and len(domain['id_thing']) > 20 else domain['id_thing']
                print(f"   {i+1}. ID: {domain['id']}, Domain: {domain['domain']}, Thing: {id_thing_display}")
        else:
            print("⚠️ No hay dominios en el sistema.")
    else:
        print("⚠️ SQLAlchemyUnitOfWork no está disponible, saltando verificación")
        all_domains = []
        
except Exception as e:
    print(f"❌ Error al verificar dominios: {e}")
    all_domains = []

🌐 Verificando dominios en el sistema...
🚨 ENTER: Se abrió una nueva conexión SQLAlchemy
🔢 Total de dominios en el sistema: 2320
📝 Primeros 5 dominios:
   1. ID: 324, Domain: AG393AY, Thing: mrn:thing:vehicle:46...
   2. ID: 751, Domain: GNW901, Thing: mrn:thing:vehicle:c6...
   3. ID: 686, Domain: AC881NO, Thing: mrn:thing:vehicle:a0...
   4. ID: 187, Domain: AF434HM, Thing: mrn:thing:vehicle:eb...
   5. ID: 622, Domain: AF389CB, Thing: mrn:thing:vehicle:6f...


In [9]:
# Inicializar el procesador del pipeline (paralelo o estándar)
print("🚀 Inicializando el procesador del pipeline...")

try:
    if parallel_available and parallelization_modules:
        # Usar procesador paralelo
        print("🔧 Inicializando procesador PARALELO...")
        
        # Configurar número de workers (opcional)
        max_workers = None  # Auto-detectar, o especificar número: max_workers = 4
        
        processor = ParallelPortfolioPipelineProcessor(max_workers=max_workers)
        processor_type = "PARALELO"
        
        # Mostrar configuración del sistema
        if parallelization_modules:
            cpu_count = psutil.cpu_count(logical=True)
            memory_gb = psutil.virtual_memory().total / (1024**3)
            print(f"   💻 CPUs detectadas: {cpu_count}")
            print(f"   🧠 RAM disponible: {memory_gb:.1f} GB")
            print(f"   🔧 Workers configurados: {processor.max_workers}")
        
    elif 'PortfolioPipelineProcessor' in globals():
        # Usar procesador estándar como fallback
        print("🔄 Inicializando procesador ESTÁNDAR (fallback)...")
        processor = PortfolioPipelineProcessor()
        processor_type = "ESTÁNDAR"
        
    else:
        print("❌ Ningún procesador disponible")
        processor = None
        processor_type = "NO DISPONIBLE"
    
    if processor is not None:
        print(f"✅ Procesador {processor_type} inicializado correctamente")
        
        # Configurar rango de fechas personalizado
        start_date = datetime(2025, 5, 30).strftime("%Y-%m-%d")
        end_date = datetime(2025, 6, 2).strftime("%Y-%m-%d")
        print(f"📅 Rango de fechas configurado: {start_date} hasta {end_date}")
    else:
        start_date = None
        end_date = None
        
except Exception as e:
    print(f"❌ Error al inicializar el procesador: {e}")
    processor = None
    processor_type = "ERROR"
    start_date = None
    end_date = None

🚀 Inicializando el procesador del pipeline...
🔧 Inicializando procesador PARALELO...
✅ Conexión a S3 establecida con éxito.
   💻 CPUs detectadas: 12
   🧠 RAM disponible: 15.7 GB
   🔧 Workers configurados: 8
✅ Procesador PARALELO inicializado correctamente
📅 Rango de fechas configurado: 2025-05-30 hasta 2025-06-02


In [None]:
# Ejecutar el pipeline con procesamiento paralelo
print("🔄 Ejecutando el pipeline de portfolio...")
print("=" * 60)

if processor is not None and start_date is not None and end_date is not None:
    try:
        print(f"📅 Procesando eventos desde {start_date} hasta {end_date}")
        print(f"🔧 Tipo de procesador: {processor_type}")
        
        # Determinar método de ejecución
        if hasattr(processor, 'run_pipeline_parallel'):
            # Procesador paralelo - ofrecer opciones
            print("🚀 Ejecutando con procesador PARALELO")
            print("\n🔧 Modos de ejecución disponibles:")
            print("   • thread: ThreadPoolExecutor (recomendado para I/O)")
            print("   • process: ProcessPoolExecutor (para CPU intensivo)")
            print("   • auto: Detecta automáticamente el mejor modo")
            
            # Configurar modo de ejecución
            execution_mode = "auto"  # Cambiar por "thread" o "process" si quieres forzar
            print(f"\n🎯 Modo seleccionado: {execution_mode}")
            print("-" * 40)
            
            # Ejecutar pipeline paralelo
            result = processor.run_pipeline_parallel(
                start_date=start_date,
                end_date=end_date,
                execution_mode=execution_mode
            )
            
        elif hasattr(processor, 'run_pipeline'):
            # Procesador estándar
            print("🔄 Ejecutando con procesador ESTÁNDAR")
            
            # Verificar si acepta parámetros
            import inspect
            sig = inspect.signature(processor.run_pipeline)
            
            if len(sig.parameters) >= 2:
                result = processor.run_pipeline(start_date, end_date)
                print("✅ Pipeline ejecutado con parámetros de fecha")
            else:
                print("⚠️ El método run_pipeline() no acepta parámetros de fecha")
                result = processor.run_pipeline()
        else:
            raise AttributeError("El procesador no tiene método run_pipeline válido")
            
        # Mostrar resultados detallados
        print("=" * 60)
        print("📊 RESULTADO DEL PIPELINE:")
        print(f"✅ Éxito: {result.get('success', False)}")
        print(f"📝 Mensaje: {result.get('message', 'Sin mensaje')}")
        
        # Métricas adicionales para procesador paralelo
        if result.get('total_duration'):
            print(f"⏱️ Tiempo total: {result['total_duration']:.2f} segundos")
        
        if result.get('workers_used'):
            print(f"🔧 Workers utilizados: {result['workers_used']}")
        
        if result.get('portfolios_processed'):
            print(f"📂 Portfolios procesados: {result['portfolios_processed']}")
            print(f"✅ Exitosos: {result.get('successful', 0)}")
            print(f"❌ Fallidos: {result.get('failed', 0)}")
        
        if result.get('estimated_speedup'):
            speedup = result['estimated_speedup']
            print(f"🚀 Speedup estimado: {speedup:.2f}x")
            if speedup > 2:
                print("   🎉 ¡Excelente aceleración!")
            elif speedup > 1.5:
                print("   👍 Buena aceleración")
            else:
                print("   ⚠️ Aceleración limitada (normal para pocos portfolios)")
        
        if result.get('avg_duration_per_portfolio'):
            avg_time = result['avg_duration_per_portfolio']
            print(f"📈 Tiempo promedio por portfolio: {avg_time:.2f}s")
        
        # Estado final
        if result.get('success'):
            print("\n🎉 Pipeline ejecutado exitosamente!")
        else:
            print("\n❌ El pipeline falló")
            
    except Exception as e:
        print("=" * 60)
        print(f"❌ Error durante la ejecución del pipeline: {e}")
        import traceback
        print("🔍 Traceback completo:")
        traceback.print_exc()
        result = {"success": False, "message": f"Error: {e}"}
        
else:
    print("❌ No se puede ejecutar el pipeline - procesador o fechas no inicializados")
    print(f"   Procesador: {processor is not None}")
    print(f"   Start date: {start_date is not None}")
    print(f"   End date: {end_date is not None}")
    result = {"success": False, "message": "Procesador o fechas no disponibles"}

print("=" * 60)

🔄 Ejecutando el pipeline de portfolio...
📅 Procesando eventos desde 2025-05-30 hasta 2025-06-02
🔧 Tipo de procesador: PARALELO
🚀 Ejecutando con procesador PARALELO

🔧 Modos de ejecución disponibles:
   • thread: ThreadPoolExecutor (recomendado para I/O)
   • process: ProcessPoolExecutor (para CPU intensivo)
   • auto: Detecta automáticamente el mejor modo

🎯 Modo seleccionado: auto
----------------------------------------
🚀 Iniciando pipeline paralelo (auto)
📅 Rango: 2025-05-30 hasta 2025-06-02
🔧 Workers configurados: 8
🔍 Verificando conexiones...
🚨 ENTER: Se abrió una nueva conexión SQLAlchemy


2025-06-23 15:38:31,333 - INFO - ✅ PostgreSQL está operativo.
2025-06-23 15:38:31,524 - INFO - ✅ S3 está operativo.


✅ Conexión a S3 establecida con éxito.
📥 Obteniendo carteras activas...
🚨 ENTER: Se abrió una nueva conexión SQLAlchemy
✅ 3 carteras encontradas
📊 Portfolios distribuidos en 3 lotes
🔄 [Worker 1] Procesando Portfolio 34
🚨 ENTER: Se abrió una nueva conexión SQLAlchemy
🔄 [Worker 2] Procesando Portfolio 35
🚨 ENTER: Se abrió una nueva conexión SQLAlchemy
🔄 [Worker 3] Procesando Portfolio 1
🚨 ENTER: Se abrió una nueva conexión SQLAlchemy
🚨 ENTER: Se abrió una nueva conexión SQLAlchemy
🚨 ENTER: Se abrió una nueva conexión SQLAlchemy
🚨 ENTER: Se abrió una nueva conexión SQLAlchemy
🚨 Error registrado en BD: {'id': 53269, 'method': 'retrieve_and_store_events', 'error_type': 'NoSuchKey', 'message': 'Error al procesar ar/magenta/mrn:account:1784a4fd-f182-46ca-aa85-5fa0adebca63/things/type=mrn:things:vehicle/mrn:thing:vehicle:63c2d92a-d73f-4cb6-a306-05789d213985/signals/date=2025-05/2025-05-30_dataset.avro: The specified key does not exist.', 'bucket': 'strix-consumer-raw-prod', 's3_key': 'ar/magen

In [None]:
# Verificar algunos eventos procesados (opcional)
print("🔍 Verificando eventos procesados...")

try:
    with SQLAlchemyUnitOfWork() as uow:
        # Contar eventos en el rango de fechas procesado
        from datetime import datetime
        start_timestamp = int(datetime(2025, 4, 1).timestamp() * 1000)
        end_timestamp = int(datetime(2025, 4, 30).timestamp() * 1000)
        
        # Obtener algunos eventos de ejemplo de un dominio específico
        if active_portfolios:
            domain_ids = uow.portfolio_domains.get_domains_by_portfolio(active_portfolios[0])
            if domain_ids:
                sample_domain = domain_ids[0]
                events = uow.events.get_events_by_domain_and_date(
                    sample_domain, 
                    datetime(2025, 4, 1), 
                    datetime(2025, 4, 2)
                )
                print(f"📊 Eventos encontrados para dominio {sample_domain}: {len(events)}")
                
                if events:
                    print("📝 Primer evento:")
                    first_event = events[0]
                    print(f"   ID: {first_event['id']}")
                    print(f"   Coordenadas: ({first_event['latitude']}, {first_event['longitude']})")
                    print(f"   Velocidad: {first_event['speed']} km/h")
                    print(f"   Timestamp: {first_event['timestamp']}")
            else:
                print("⚠️ No se encontraron dominios para el portfolio activo")
        else:
            print("⚠️ No hay portfolios activos para verificar eventos")
            
except Exception as e:
    print(f"❌ Error al verificar eventos: {e}")

In [None]:
# Test manual adicional - verificar servicios individuales
print("🧪 Ejecutando tests adicionales de componentes...")

# Test del servicio de portfolio
try:
    from core.services.portfolio_service import PortfolioService
    portfolio_service = PortfolioService()
    
    portfolios = portfolio_service.get_all_portfolios()
    print(f"✅ Servicio de Portfolio: {len(portfolios)} portfolios encontrados")
except Exception as e:
    print(f"❌ Error en Servicio de Portfolio: {e}")

# Test del servicio de dominios
try:
    from core.services.domain_service import DomainService
    domain_service = DomainService()
    
    domains = domain_service.get_all_domains()
    print(f"✅ Servicio de Dominios: {len(domains)} dominios encontrados")
except Exception as e:
    print(f"❌ Error en Servicio de Dominios: {e}")

# Test del servicio S3
try:
    from core.services.s3_event_service import S3EventService
    s3_service = S3EventService()
    
    if s3_service.s3_client:
        print("✅ Servicio S3: Conexión establecida")
    else:
        print("❌ Servicio S3: No se pudo establecer conexión")
except Exception as e:
    print(f"❌ Error en Servicio S3: {e}")

# Información de configuración actual
print("\n🔧 Información del entorno:")
print(f"📁 Directorio de trabajo: {os.getcwd()}")
print(f"🐍 Python executable: {sys.executable}")
print(f"📦 Módulos en sys.path: {len(sys.path)} rutas configuradas")

In [None]:
# Resumen final
print("\n" + "=" * 60)
print("📋 RESUMEN FINAL DEL TEST")
print("=" * 60)
print(f"🔧 Sistema: Strix Portfolio Pipeline")
print(f"🚀 Tipo de procesador: {processor_type if 'processor_type' in locals() else 'No disponible'}")
print(f"📅 Fecha de ejecución: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
if 'start_date' in locals() and start_date:
    print(f"🌐 Rango procesado: {start_date} a {end_date}")
if 'status' in locals():
    print(f"📊 Estado de conexiones: PostgreSQL={status.get('postgres', 'UNKNOWN')}, S3={status.get('s3', 'UNKNOWN')}")
if 'active_portfolios' in locals():
    print(f"📁 Portfolios activos: {len(active_portfolios)}")
if 'all_domains' in locals():
    print(f"🌍 Total dominios: {len(all_domains)}")
    
# Información del pipeline ejecutado
if 'result' in locals():
    print(f"✅ Pipeline completado: {result.get('success', 'No ejecutado')}")
    if result.get('total_duration'):
        print(f"⏱️ Duración total: {result['total_duration']:.2f} segundos")
    if result.get('workers_used'):
        print(f"🔧 Workers paralelos: {result['workers_used']}")
    if result.get('estimated_speedup'):
        print(f"🚀 Aceleración lograda: {result['estimated_speedup']:.2f}x")
    if result.get('portfolios_processed'):
        success_rate = (result.get('successful', 0) / result['portfolios_processed']) * 100
        print(f"📈 Tasa de éxito: {success_rate:.1f}% ({result.get('successful', 0)}/{result['portfolios_processed']})")
else:
    print(f"❌ Pipeline: No ejecutado")

# Recomendaciones
print("\n💡 RECOMENDACIONES:")
if 'parallel_available' in locals() and parallel_available:
    print("   ✅ Procesador paralelo funcionando correctamente")
    if 'result' in locals() and result.get('estimated_speedup', 0) < 1.5:
        print("   💭 Para mayor speedup, procesa más portfolios o aumenta workers")
else:
    print("   ⚠️ Considera crear el archivo portfolio_pipeline_parallel.py para mejor rendimiento")
    
if 'parallelization_modules' in locals() and not parallelization_modules:
    print("   📦 Instala psutil para métricas del sistema: pip install psutil")

print("=" * 60)
print("🎯 Test del Portfolio Pipeline completado")

# Información adicional para debugging
if 'result' in locals() and not result.get('success', False):
    print("\n🔍 INFORMACIÓN DE DEBUG:")
    print(f"   📁 Directorio actual: {os.getcwd()}")
    print(f"   🐍 Python: {sys.executable}")
    print(f"   📦 Módulos importados: {', '.join([k for k in globals().keys() if not k.startswith('_')][:10])}...")