# üì® Redis Pub/Sub - Pr√°ctica

*Autor: @mC√°rdenas 2025*

Exploraremos el patr√≥n Publicador/Suscriptor de Redis.

In [None]:
import redis
import json
import threading
import time
from datetime import datetime

r = redis.Redis(host='localhost', port=6379, decode_responses=True)
print(f"‚úÖ Conectado: {r.ping()}")

## 1. Publicar y Suscribir B√°sico

En Pub/Sub, necesitamos dos conexiones: una para publicar y otra para suscribir.

In [None]:
# Crear suscriptor en un hilo separado
mensajes_recibidos = []

def escuchar_mensajes(canal, duracion=5):
    """Escucha mensajes durante N segundos."""
    r_sub = redis.Redis(host='localhost', port=6379, decode_responses=True)
    pubsub = r_sub.pubsub()
    pubsub.subscribe(canal)
    
    print(f"üì• Suscrito a '{canal}'")
    
    inicio = time.time()
    for mensaje in pubsub.listen():
        if mensaje['type'] == 'message':
            mensajes_recibidos.append(mensaje['data'])
            print(f"üì® Recibido: {mensaje['data']}")
        
        if time.time() - inicio > duracion:
            break
    
    pubsub.unsubscribe()
    print("üì• Desuscrito")

# Iniciar suscriptor en hilo
hilo = threading.Thread(target=escuchar_mensajes, args=("demo:canal", 5))
hilo.start()

time.sleep(0.5)  # Esperar a que se suscriba

# Publicar mensajes
for i in range(3):
    num_subs = r.publish("demo:canal", f"Mensaje {i+1}")
    print(f"üì§ Publicado mensaje {i+1} ‚Üí {num_subs} suscriptor(es)")
    time.sleep(0.3)

hilo.join()
print(f"‚úÖ Total recibidos: {len(mensajes_recibidos)}")

## 2. Suscripci√≥n por Patrones

Con `PSUBSCRIBE` podemos suscribirnos a patrones con wildcards.

In [None]:
mensajes_patron = []

def escuchar_patron(patron, duracion=5):
    """Escucha un patr√≥n de canales."""
    r_sub = redis.Redis(host='localhost', port=6379, decode_responses=True)
    pubsub = r_sub.pubsub()
    pubsub.psubscribe(patron)
    
    print(f"üì• Suscrito al patr√≥n '{patron}'")
    
    inicio = time.time()
    for mensaje in pubsub.listen():
        if mensaje['type'] == 'pmessage':
            canal = mensaje['channel']
            data = mensaje['data']
            mensajes_patron.append((canal, data))
            print(f"üì® [{canal}]: {data}")
        
        if time.time() - inicio > duracion:
            break
    
    pubsub.punsubscribe()

# Escuchar todos los eventos
hilo = threading.Thread(target=escuchar_patron, args=("eventos:*", 5))
hilo.start()

time.sleep(0.5)

# Publicar a diferentes canales
canales = [
    ("eventos:login", "Usuario user123 conectado"),
    ("eventos:compra", "Compra de 99.99‚Ç¨"),
    ("eventos:error", "Error en servidor"),
    ("otros:no_capturado", "Este no se captura"),  # No coincide
]

for canal, mensaje in canales:
    num = r.publish(canal, mensaje)
    print(f"üì§ Publicado en '{canal}' ‚Üí {num} suscriptor(es)")
    time.sleep(0.2)

hilo.join()
print(f"‚úÖ Recibidos con patr√≥n: {len(mensajes_patron)}")

## 3. Sistema de Notificaciones

In [None]:
class SistemaNotificaciones:
    def __init__(self):
        self.r = redis.Redis(host='localhost', port=6379, decode_responses=True)
    
    def notificar_usuario(self, user_id: str, tipo: str, mensaje: str):
        """Env√≠a notificaci√≥n a un usuario."""
        canal = f"notif:{user_id}"
        data = json.dumps({
            "tipo": tipo,
            "mensaje": mensaje,
            "timestamp": datetime.now().isoformat()
        })
        return self.r.publish(canal, data)
    
    def notificar_todos(self, tipo: str, mensaje: str):
        """Env√≠a notificaci√≥n a todos."""
        return self.notificar_usuario("broadcast", tipo, mensaje)

# Demo
notif = SistemaNotificaciones()

# Sin suscriptores, publish devuelve 0
print("Sin suscriptores:")
n = notif.notificar_usuario("user123", "info", "Test sin suscriptores")
print(f"  Suscriptores: {n}")

# Con suscriptor
print("Con suscriptor:")
notifs_recibidas = []

def escuchar_notifs(user_id, duracion=3):
    r_sub = redis.Redis(host='localhost', port=6379, decode_responses=True)
    pubsub = r_sub.pubsub()
    pubsub.subscribe(f"notif:{user_id}", "notif:broadcast")
    
    inicio = time.time()
    for msg in pubsub.listen():
        if msg['type'] == 'message':
            data = json.loads(msg['data'])
            notifs_recibidas.append(data)
            print(f"  üîî {data['tipo']}: {data['mensaje']}")
        if time.time() - inicio > duracion:
            break

hilo = threading.Thread(target=escuchar_notifs, args=("user123", 3))
hilo.start()
time.sleep(0.3)

notif.notificar_usuario("user123", "mensaje", "Tienes un nuevo mensaje")
notif.notificar_todos("sistema", "Mantenimiento en 10 minutos")

hilo.join()

## 4. Informaci√≥n de Canales

In [None]:
# Crear algunos suscriptores
r_sub1 = redis.Redis(host='localhost', port=6379, decode_responses=True)
r_sub2 = redis.Redis(host='localhost', port=6379, decode_responses=True)

ps1 = r_sub1.pubsub()
ps2 = r_sub2.pubsub()

ps1.subscribe("info:canal1", "info:canal2")
ps2.subscribe("info:canal1")

time.sleep(0.3)

# Ver canales activos
canales = r.pubsub_channels("info:*")
print(f"üì∫ Canales activos: {canales}")

# Ver n√∫mero de suscriptores por canal
numsub = r.pubsub_numsub("info:canal1", "info:canal2")
print(f"üë• Suscriptores por canal: {dict(numsub)}")

# Limpiar
ps1.unsubscribe()
ps2.unsubscribe()

## 5. Limitaci√≥n: Sin Persistencia

Los mensajes NO se guardan. Si no hay suscriptores, se pierden.

In [None]:
# Publicar SIN suscriptores
canal = "demo:sin_suscriptor"

n1 = r.publish(canal, "Mensaje 1 - PERDIDO")
n2 = r.publish(canal, "Mensaje 2 - PERDIDO")
print(f"Publicados sin suscriptores: {n1}, {n2}")

# Ahora suscribir
mensajes_perdidos = []

def escuchar_tarde(canal, duracion=2):
    r_sub = redis.Redis(host='localhost', port=6379, decode_responses=True)
    pubsub = r_sub.pubsub()
    pubsub.subscribe(canal)
    
    inicio = time.time()
    for msg in pubsub.listen():
        if msg['type'] == 'message':
            mensajes_perdidos.append(msg['data'])
        if time.time() - inicio > duracion:
            break

hilo = threading.Thread(target=escuchar_tarde, args=(canal, 2))
hilo.start()
time.sleep(0.3)

# Publicar CON suscriptor
n3 = r.publish(canal, "Mensaje 3 - RECIBIDO")
print(f"Publicado con suscriptor: {n3}")

hilo.join()

print(f"‚ö†Ô∏è Mensajes recibidos: {mensajes_perdidos}")
print("   Los mensajes 1 y 2 se perdieron porque no hab√≠a suscriptores.")

## üìö Resumen

| Caracter√≠stica | Pub/Sub | Streams |
|----------------|---------|--------|
| Persistencia | ‚ùå No | ‚úÖ S√≠ |
| Historial | ‚ùå No | ‚úÖ S√≠ |
| ACK | ‚ùå No | ‚úÖ S√≠ |
| Velocidad | ‚ö° M√°xima | ‚ö° Alta |
| Caso de uso | Tiempo real ef√≠mero | Event sourcing |