## 1. Importar Librerías

In [None]:
from collections.abc import Sequence
from typing import Any
from uuid import uuid4
from pydantic import BaseModel
import json
import redis.asyncio as redis
from agent_framework import ChatMessage
import asyncio
import os
from azure.identity import AzureCliCredential
from agent_framework import ChatAgent
from agent_framework.azure import AzureOpenAIChatClient
from dotenv import load_dotenv

load_dotenv()

## 2. Modelo de Estado

In [None]:
class RedisStoreState(BaseModel):
    """State model for serializing Redis chat message store data.

    Holds the four settings that ``RedisChatMessageStore.serialize_state``
    dumps and ``RedisChatMessageStore.deserialize_state`` validates and
    copies back onto the store.
    """
    # Conversation thread identifier; joined with key_prefix to form the Redis key.
    thread_id: str
    # Redis connection URL; optional in the serialized form.
    redis_url: str | None = None
    # Namespace placed before the thread id in the Redis key.
    key_prefix: str = "chat_messages"
    # Cap on stored messages; None means the store does not trim.
    max_messages: int | None = None

# Notebook cell-completion marker (check-mark emoji restored from mis-encoded bytes).
print("✅ RedisStoreState definido")

## 3. RedisChatMessageStore

Implementación de almacenamiento en Redis.

In [None]:
class RedisChatMessageStore:
    """Redis-backed implementation of ChatMessageStore using Redis Lists.

    All messages of one thread are kept in a single Redis list stored under
    ``"{key_prefix}:{thread_id}"``. Each list element is one JSON-encoded
    message, appended in the order it was added.
    """

    def __init__(
        self,
        redis_url: str | None = None,
        thread_id: str | None = None,
        key_prefix: str = "chat_messages",
        max_messages: int | None = None,
    ) -> None:
        """Create a store bound to one conversation thread.

        Args:
            redis_url: Connection URL passed to ``redis.from_url``. Required,
                even though the signature defaults it to ``None``.
            thread_id: Thread identifier. When omitted (or empty) a fresh
                ``"thread_<uuid4>"`` identifier is generated.
            key_prefix: Namespace placed before the thread id in the key.
            max_messages: When set, only the newest ``max_messages`` entries
                are kept after each append. ``None`` disables trimming.

        Raises:
            ValueError: If ``redis_url`` is ``None``.
        """
        if redis_url is None:
            raise ValueError("redis_url is required for Redis connection")

        self.redis_url = redis_url
        self.thread_id = thread_id or f"thread_{uuid4()}"
        self.key_prefix = key_prefix
        self.max_messages = max_messages
        # decode_responses=True makes the client return str instead of bytes,
        # which is what json.loads receives in _deserialize_message.
        self._redis_client = redis.from_url(redis_url, decode_responses=True)

    @property
    def redis_key(self) -> str:
        """Redis key of this thread's message list."""
        return f"{self.key_prefix}:{self.thread_id}"

    async def add_messages(self, messages: Sequence[ChatMessage]) -> None:
        """Append ``messages`` to the thread's list and enforce ``max_messages``.

        An empty ``messages`` sequence is a no-op and does not touch Redis.

        Args:
            messages: Messages to append, in order.
        """
        if not messages:
            return

        payloads = [self._serialize_message(message) for message in messages]

        # Queue the append and the optional trim in one MULTI/EXEC transaction:
        # a single round trip, and no other writer can slip in between the
        # push and the trim.
        async with self._redis_client.pipeline(transaction=True) as pipe:
            pipe.rpush(self.redis_key, *payloads)
            if self.max_messages is not None:
                # LTRIM with a negative start keeps the newest entries and
                # leaves a list that is already short enough untouched, so no
                # LLEN check is needed first.
                pipe.ltrim(self.redis_key, -self.max_messages, -1)
            await pipe.execute()

    async def list_messages(self) -> list[ChatMessage]:
        """Return every stored message of this thread, oldest first."""
        raw_messages = await self._redis_client.lrange(self.redis_key, 0, -1)
        return [self._deserialize_message(raw) for raw in raw_messages]

    async def serialize_state(self, **kwargs: Any) -> Any:
        """Dump the store's settings (not the messages) via ``RedisStoreState``.

        Args:
            **kwargs: Forwarded to ``RedisStoreState.model_dump``.

        Returns:
            The result of ``model_dump`` on the populated state model.
        """
        # NOTE(review): redis_url is included verbatim; if the URL embeds
        # credentials they become part of the serialized state.
        state = RedisStoreState(
            thread_id=self.thread_id,
            redis_url=self.redis_url,
            key_prefix=self.key_prefix,
            max_messages=self.max_messages,
        )
        return state.model_dump(**kwargs)

    async def deserialize_state(self, serialized_store_state: Any, **kwargs: Any) -> None:
        """Restore settings previously produced by ``serialize_state``.

        A falsy ``serialized_store_state`` leaves the store unchanged. When the
        restored state names a different Redis URL, a new client is created
        for it and the previous client is closed.

        Args:
            serialized_store_state: Data accepted by
                ``RedisStoreState.model_validate``.
            **kwargs: Forwarded to ``RedisStoreState.model_validate``.
        """
        if not serialized_store_state:
            return

        state = RedisStoreState.model_validate(serialized_store_state, **kwargs)
        self.thread_id = state.thread_id
        self.key_prefix = state.key_prefix
        self.max_messages = state.max_messages

        if state.redis_url and state.redis_url != self.redis_url:
            previous_client = self._redis_client
            self.redis_url = state.redis_url
            self._redis_client = redis.from_url(self.redis_url, decode_responses=True)
            # Release the connections of the client being replaced; without
            # this the old connection pool would stay open.
            await previous_client.aclose()

    def _serialize_message(self, message: ChatMessage) -> str:
        """Encode one message as compact JSON (no spaces after separators)."""
        message_dict = message.model_dump()
        return json.dumps(message_dict, separators=(",", ":"))

    def _deserialize_message(self, serialized_message: str) -> ChatMessage:
        """Rebuild a ``ChatMessage`` from the JSON text stored in Redis."""
        message_dict = json.loads(serialized_message)
        return ChatMessage.model_validate(message_dict)

    async def clear(self) -> None:
        """Delete this thread's list from Redis."""
        await self._redis_client.delete(self.redis_key)

    async def aclose(self) -> None:
        """Close the underlying Redis client."""
        await self._redis_client.aclose()

# Notebook cell-completion marker (check-mark emoji restored from mis-encoded bytes).
print("✅ RedisChatMessageStore definido")

## 4. Crear Agente con Redis Store

In [None]:
# Create the chat agent with a custom message store factory.
# Settings are read from the environment (load_dotenv() runs earlier in the file):
#   AZURE_OPENAI_ENDPOINT - endpoint passed to the Azure OpenAI chat client
#   MODEL                 - deployment name passed to the chat client
#   REDIS_URL             - optional; falls back to the local Redis instance
agent = ChatAgent(
    chat_client=AzureOpenAIChatClient(
        credential=AzureCliCredential(),
        endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"),
        deployment_name=os.getenv("MODEL"),
    ),
    name="Joker",
    instructions="You are good at telling jokes.",
    # The factory returns a new RedisChatMessageStore on every call; the URL
    # is looked up at call time so it can be overridden without code changes.
    chat_message_store_factory=lambda: RedisChatMessageStore(
        redis_url=os.getenv("REDIS_URL", "redis://localhost:6379")
    ),
)

# Notebook cell-completion marker (check-mark emoji restored from mis-encoded bytes).
print("✅ Agente creado con Redis message store")

## 5. Usar el Agente

**Nota**: Requiere Redis corriendo en localhost:6379

In [None]:
async def main() -> None:
    """Send one prompt to the agent on a new thread and print the reply.

    Uses the module-level ``agent``, which is configured with a Redis
    message store factory, so a reachable Redis server is required.
    """
    thread = agent.get_new_thread()
    response = await agent.run("Tell me a joke about pirates", thread=thread)
    # Robot emoji restored from mis-encoded bytes in the original string.
    print(f"🤖 {response.text}")

# Notebook cell-completion marker plus a reminder that Redis must be running;
# emoji and accented characters restored from mis-encoded bytes.
print("✅ Función main() definida")
print("\n⚠️ Asegúrate de tener Redis corriendo antes de ejecutar")
print("   Comando: docker run -d -p 6379:6379 redis")

In [None]:
# Descomentar para ejecutar
# await main()

## Conclusión

### 1. **Custom Message Stores**
- Implementar interfaz de ChatMessageStore
- Métodos: add_messages, list_messages, serialize_state, deserialize_state
- Integración con ChatAgent via factory

### 2. **Redis Integration**
- Redis Lists para almacenamiento ordenado
- LTRIM para limitar tamaño
- Serialización JSON de mensajes

### 3. **Benefits**
- Distribución entre instancias
- TTL automático (soportado por Redis; esta implementación no configura expiración)
- Cache compartido
- Alta disponibilidad

### Aplicaciones:
- Multi-server deployments
- Load balancing
- Session sharing
- Analytics pipelines