# KAFKA CONSUMER PARA SUPLEMENTOS DEPORTIVOS "**GAINSIGHT**"
### Este notebook consume mensajes del t√≥pico 'supplement_products' y los almacena en MongoDB para su posterior an√°lisis en el dashboard de Streamlit.
### Arquitectura: Kafka ‚Üí Consumer Python ‚Üí MongoDB ‚Üí Streamlit Dashboard
### Pipeline: Producer ‚Üí Consumer ‚Üí Database ‚Üí Analytics

# Importaci√≥n de Librer√≠as

### Importamos las librer√≠as necesarias para:
- **PyMongo**: Conexi√≥n y operaciones con MongoDB
- **Kafka**: Consumer de mensajes de Kafka
- **JSON**: Deserializaci√≥n de mensajes
- **Time/Datetime**: Control de tiempo y timestamps
- **Pprint**: Visualizaci√≥n mejorada de datos

In [None]:
from pymongo import MongoClient
from kafka import KafkaConsumer
from json import loads
import time
from datetime import datetime
from pprint import pprint
import logging
from collections import Counter

# Configurar logging para mejor debugging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Configuraci√≥n de MongoDB
Establecemos la conexi√≥n con MongoDB donde almacenaremos los productos.

### Estructura de la base de datos:
- **Database**: `fitness_supplements` - Espec√≠fica para suplementos deportivos
- **Collection**: `products` - Almacena todos los productos procesados

### Validaciones incluidas:
- Verificaci√≥n de conexi√≥n a MongoDB
- Validaci√≥n de base de datos y colecci√≥n
- Manejo de errores de conectividad


In [None]:
print("üîó Configurando conexi√≥n con MongoDB...")

try:
    # Establecer conexi√≥n con MongoDB
    client = MongoClient(
        "mongodb://localhost:27017/",
        serverSelectionTimeoutMS=5000,  # Timeout de 5 segundos
        connectTimeoutMS=5000
    )

    # Verificar conexi√≥n
    client.admin.command('ping')
    print("‚úÖ Conexi√≥n con MongoDB establecida")

    # Configurar base de datos y colecci√≥n
    database = client["fitness_supplements"]
    collection = database["products"]

    print(f"Base de datos: {database.name}")
    print(f"Colecci√≥n: {collection.name}")

    # Mostrar estad√≠sticas actuales
    current_count = collection.count_documents({})
    print(f"Documentos actuales en la colecci√≥n: {current_count:,}")

except Exception as e:
    print(f"Error conectando a MongoDB: {e}")
    print("Soluci√≥n: Aseg√∫rate que MongoDB est√° ejecut√°ndose en localhost:27017")
    print("   Comando: mongod --dbpath /path/to/data")
    raise e

#Configuraci√≥n del Consumer Kafka
Configuramos el consumer para recibir mensajes del t√≥pico de suplementos.
###Par√°metros importantes:
- **auto_offset_reset='earliest'**: Lee desde el inicio del t√≥pico
- **consumer_timeout_ms**: Timeout para evitar bloqueos indefinidos
- **group_id**: Identificador del grupo de consumers para manejo de offsets
- **value_deserializer**: Convierte JSON bytes a objetos Python

### Beneficios de esta configuraci√≥n:
- Procesamiento confiable con manejo de offsets
- Recuperaci√≥n autom√°tica desde interrupciones
- Timeout configurado para evitar bloqueos

In [None]:
# Configuraci√≥n del t√≥pico y consumer
topic = 'supplement_products'
consumer_group = 'supplement_consumer_group'

print(f"Configurando Kafka Consumer...")
print(f"T√≥pico objetivo: {topic}")
print(f"Grupo de consumer: {consumer_group}")

try:
    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=['localhost:9092'],
        value_deserializer=lambda x: loads(x.decode('utf-8')),
        auto_offset_reset='earliest',     # Leer desde el inicio
        consumer_timeout_ms=30000,        # Timeout de 30 segundos
        group_id=consumer_group,          # Grupo para manejo de offsets
        enable_auto_commit=True,          # Auto-commit de offsets
        auto_commit_interval_ms=1000,     # Intervalo de commit
        max_poll_records=100              # M√°ximo records por poll
    )
    print("‚úÖ Consumer de Kafka configurado exitosamente")

except Exception as e:
    print(f"‚ùå Error configurando Kafka Consumer: {e}")
    print("Soluci√≥n: Verifica que Kafka est√© ejecut√°ndose en localhost:9092")
    raise e

# Verificaci√≥n de T√≥picos y Validaciones

### Antes de iniciar el consumo, verificamos que:
1. El t√≥pico existe en Kafka
2. Hay mensajes disponibles para procesar
3. La configuraci√≥n es correcta

### Informaci√≥n mostrada:
- Lista de t√≥picos disponibles
- Estado del t√≥pico objetivo
- Particiones y offsets disponibles


In [None]:
print("üîç Verificando configuraci√≥n de Kafka...")

try:
    # Obtener lista de t√≥picos disponibles
    available_topics = consumer.topics()
    print(f"T√≥picos disponibles: {sorted(list(available_topics))}")

    # Verificar si nuestro t√≥pico existe
    if topic in available_topics:
        print(f"‚úÖ T√≥pico '{topic}' encontrado y listo para consumo")

        # Obtener informaci√≥n de particiones
        partitions = consumer.partitions_for_topic(topic)
        if partitions:
            print(f"Particiones disponibles: {sorted(list(partitions))}")
    else:
        print(f"‚ùå T√≥pico '{topic}' NO encontrado")
        print("Soluciones posibles:")
        print("   1. Ejecutar primero el producer para crear el t√≥pico")
        print("   2. Crear el t√≥pico manualmente:")
        print(f"      bin/kafka-topics.sh --create --topic {topic} --bootstrap-server localhost:9092")

        # Mostrar t√≥picos similares si existen
        similar_topics = [t for t in available_topics if 'supplement' in t.lower()]
        if similar_topics:
            print(f"üîç T√≥picos similares encontrados: {similar_topics}")

        exit(1)

except Exception as e:
    print(f"‚ùå Error verificando t√≥picos: {e}")
    raise e

#Funciones de Utilidad para Procesamiento

### Definimos funciones auxiliares para:
- Validar estructura de mensajes
- Procesar datos de productos
- Generar estad√≠sticas en tiempo real
- Manejar errores de datos

In [None]:
def validate_message_structure(message_data):
    """
    Valida que el mensaje tenga la estructura esperada

    Args:
        message_data (dict): Datos del mensaje de Kafka

    Returns:
        tuple: (is_valid, error_message)
    """
    required_fields = ['search_category', 'timestamp']

    for field in required_fields:
        if field not in message_data:
            return False, f"Campo requerido '{field}' no encontrado"

    # Validar que tenga datos de producto
    if not message_data.get('title'):
        return False, "T√≠tulo del producto no encontrado"

    return True, None

def process_product_data(kafka_message, message_data):
    """
    Procesa y estructura los datos del producto para MongoDB

    Args:
        kafka_message: Mensaje original de Kafka
        message_data (dict): Datos deserializados del mensaje

    Returns:
        dict: Documento estructurado para MongoDB
    """
    # Preparar metadata de Kafka
    metadata = {
        "kafka_topic": kafka_message.topic,
        "kafka_partition": kafka_message.partition,
        "kafka_offset": kafka_message.offset,
        "processed_at": datetime.now().isoformat(),
        "consumer_group": consumer_group
    }

    # Estructurar documento para MongoDB
    mongo_doc = {
        "metadata": metadata,
        "search_category": message_data.get("search_category"),
        "position": message_data.get("position"),
        "title": message_data.get("title"),
        "price": message_data.get("price"),
        "extracted_price": message_data.get("extracted_price"),
        "source": message_data.get("source"),
        "rating": message_data.get("rating"),
        "reviews": message_data.get("reviews"),
        "link": message_data.get("link"),
        "image": message_data.get("image"),
        "brand": message_data.get("brand", "N/A"),
        "delivery": message_data.get("delivery", "N/A"),
        "original_timestamp": message_data.get("timestamp")
    }

    return mongo_doc

def display_processing_stats(message_count, start_time, category_counter):
    """
    Muestra estad√≠sticas de procesamiento en tiempo real

    Args:
        message_count (int): N√∫mero total de mensajes procesados
        start_time (float): Timestamp de inicio del procesamiento
        category_counter (Counter): Contador de productos por categor√≠a
    """
    elapsed_time = time.time() - start_time
    rate = message_count / elapsed_time if elapsed_time > 0 else 0

    print(f"\nEstad√≠sticas en tiempo real:")
    print(f"   Tiempo transcurrido: {elapsed_time:.1f}s")
    print(f"   Mensajes procesados: {message_count}")
    print(f"   Velocidad promedio: {rate:.2f} msg/s")
    print(f"   Categor√≠as √∫nicas: {len(category_counter)}")

# Procesamiento Principal de Mensajes

### Loop principal que:
1. Consume mensajes del t√≥pico de Kafka
2. Valida la estructura de cada mensaje
3. Procesa y transforma los datos
4. Almacena en MongoDB con metadata completa
5. Muestra progreso en tiempo real

### Funcionalidades incluidas:
- **Validaci√≥n robusta**: Verificaci√≥n de estructura de mensajes
- **Manejo de errores**: Continuidad ante errores individuales
- **Estad√≠sticas en vivo**: Progreso y m√©tricas en tiempo real
- **Metadata enriquecida**: Informaci√≥n de Kafka y procesamiento
- **Interrupci√≥n controlada**: Manejo elegante de Ctrl+C

In [None]:
print("\n" + "="*60)
print("INICIANDO CONSUMO Y PROCESAMIENTO DE MENSAJES")
print("="*60)
print("Presiona Ctrl+C para detener el consumo de forma segura\n")

# Inicializar contadores y estad√≠sticas
message_count = 0
successful_inserts = 0
failed_inserts = 0
validation_errors = 0
start_time = time.time()
category_counter = Counter()
error_log = []

try:
    for kafka_message in consumer:
        message_data = kafka_message.value

        # Validar estructura del mensaje
        is_valid, validation_error = validate_message_structure(message_data)

        if not is_valid:
            validation_errors += 1
            print(f"‚ö†Ô∏è Mensaje #{message_count + 1} - Error de validaci√≥n: {validation_error}")
            error_log.append(f"Validaci√≥n - Offset {kafka_message.offset}: {validation_error}")
            continue

        # Mostrar informaci√≥n del mensaje recibido
        print(f"\nüì® Mensaje #{message_count + 1} recibido")
        print(f"üìä Partici√≥n: {kafka_message.partition} | Offset: {kafka_message.offset}")

        # Extraer informaci√≥n clave del producto
        category = message_data.get('search_category', 'N/A')
        title = message_data.get('title', 'Sin t√≠tulo')
        price = message_data.get('price', 'N/A')
        rating = message_data.get('rating', 'N/A')
        reviews = message_data.get('reviews', 0)
        source = message_data.get('source', 'N/A')

        # Mostrar informaci√≥n del producto
        print(f"Categor√≠a: {category}")
        print(f"Timestamp: {message_data.get('timestamp', 'N/A')}")
        print(f"Producto: {title[:60]}{'...' if len(title) > 60 else ''}")
        print(f"Precio: {price}")
        print(f"Rating: {rating} ({reviews} reviews)")
        print(f"Fuente: {source}")

        # Actualizar contadores
        category_counter[category] += 1

        # Procesar y estructurar datos para MongoDB
        try:
            mongo_doc = process_product_data(kafka_message, message_data)

            # Insertar en MongoDB
            result = collection.insert_one(mongo_doc)
            print(f"üíæ Almacenado en MongoDB con ID: {result.inserted_id}")

            successful_inserts += 1
            message_count += 1

            # Mostrar estad√≠sticas cada 10 mensajes
            if message_count % 10 == 0:
                display_processing_stats(message_count, start_time, category_counter)

        except Exception as e:
            failed_inserts += 1
            error_msg = f"Error MongoDB - Offset {kafka_message.offset}: {str(e)}"
            print(f"‚ùå Error al insertar en MongoDB: {e}")
            error_log.append(error_msg)
            logger.error(error_msg)

        # Pausa opcional para visualizaci√≥n
        time.sleep(0.1)

except KeyboardInterrupt:
    print("\nüõë Consumo detenido por el usuario (Ctrl+C)")
except Exception as e:
    print(f"\n‚ùå Error cr√≠tico durante el consumo: {e}")
    logger.error(f"Error cr√≠tico: {e}")
finally:
    print("\nüîÑ Finalizando consumer...")
    consumer.close()

# Estad√≠sticas Finales y Reportes

### Generamos un reporte final del procesamiento:
- M√©tricas de rendimiento b√°sicas
- Conteo de √©xitos y errores
- Estado final de la conexi√≥n

In [None]:
# Calcular m√©tricas finales b√°sicas
elapsed_time = time.time() - start_time
processing_rate = message_count / elapsed_time if elapsed_time > 0 else 0
success_rate = (successful_inserts / (successful_inserts + failed_inserts)) * 100 if (successful_inserts + failed_inserts) > 0 else 0

print("\n" + "="*60)
print("üìã RESUMEN DE PROCESAMIENTO COMPLETADO")
print("="*60)

# M√©tricas b√°sicas
print(f"\nTiempo total de procesamiento: {elapsed_time:.2f} segundos")
print(f"Mensajes procesados exitosamente: {successful_inserts}")
print(f"Velocidad promedio: {processing_rate:.2f} mensajes/segundo")

if validation_errors > 0:
    print(f"‚ö†Ô∏è Errores de validaci√≥n: {validation_errors}")
if failed_inserts > 0:
    print(f"‚ùå Errores de inserci√≥n: {failed_inserts}")

print(f"üéØ Tasa de √©xito: {success_rate:.1f}%")

# Estado final simple
print(f"\nEstado Final:")
print(f"   Datos guardados en MongoDB: fitness_supplements.products")
print(f"   Los datos est√°n listos para el dashboard de Streamlit")
print(f"   Finalizado: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

# Solo mostrar errores si los hay
if error_log:
    print(f"\n‚ö†Ô∏è Se registraron {len(error_log)} errores - revisar logs para detalles")

print("\n‚úÖ Consumer finalizado correctamente")