## 채팅 기록을 외부 저장소에 저장하기(ex, Redis)




> **참고**  
>
>로컬 컴퓨터에 Redis를 설치하려면 다음 링크를 방문하세요. Windows용 Redis 설치 프로그램을 다운로드할 수 있습니다. 
>https://github.com/microsoftarchive/redis/releases 
>
> Redis Desktop Manager와 같은 GUI 도구를 사용하여 Redis 데이터를 시각화하고 관리할 수도 있습니다.
> https://github.com/qishibo/AnotherRedisDesktopManager?tab=readme-ov-file


이 과정에서는 사용자 지정 ChatMessageStore를 구현하고 이를 ChatAgent와 함께 사용하여, 에이전트 채팅 기록을 외부 저장소에 저장하는 방법을 보여줍니다 

기본적으로, ChatAgent를 사용할 경우, 채팅 기록은 AgentThread 객체의 메모리에 저장되거나, 서비스가 지원할 경우 내부의 추론 서비스에 저장됩니다. 하지만, 서비스 내부에 채팅 기록을 저장하면 안되거나 저장할 수 없는 경우에는 기본적인 메모리 저장 방식 대신 채팅 기록을 영구 저장하는 사용자 지정 저장소를 제공할 수 있습니다.

#### 사용자 지정 채팅 메시지 저장소 고려사항

사용자 정의를 ChatMessageStore를 생성하려면, ChatMessageStore 프로토콜을 구현하고 필요한 메서드에 대한 구현을 제공해야 합니다. 구현해야 할 가장 중요한 메서드들은 다음과 같습니다.

**메시지를 저장하고 가져오는 메서드**

add_messages - 저장소에 새 메시지를 추가하기 위해 호출됩니다.  
list_messages - 저장소에서 메시지를 가져오기 위해 호출됩니다.

list_messages 메서드는 메시지를 시간 순서대로 오름차순으로 반환해야 합니다. 반환된 모든 메시지는 내부 채팅 클라이언트를 호출할 때 ChatAgent에 의해서 사용됩니다. 따라서 이 메서드는 내부 모델의 한계를 고려하여, 모델이 처리할 수 있는 만큼의 메시지만 반환하는 것이 중요합니다.

채팅 기록 요약이나 트리밍(trimming)과 같이 기록을 축소하는 로직은 list_messages에서 메시지를 반환하기 전에 수행되어야 합니다.

**직렬화**

스레드가 생성될 때 및 스레드가 직렬화된 상태로부터 재개될 때, ChatMessageStore 인스턴스가 생성되고 AgentThread에 연결됩니다.

채팅 기록을 구성하는 실제 메시지는 외부 저장소에 저장되지만, ChatMessageStore 인스턴스는 외부 저장소에 있는 채팅 기록을 식별하기 위해 Key 또는 기타 상태 정보를 저장해야 할 수 있습니다.

스레드의 지속하려면, ChatMessageStore 프로토콜의 `serialize_state` 및 `deserialize_state` 메서드를 구현해야 합니다. 이러한 메서드를 사용하면 스레드를 재개할 때 저장소 상태를 저장하고 복원할 수 있습니다.

#### ChatMessageStore 구현 예시

다음 샘플 구현은 Redis의 Lists 데이터 구조를 사용하여 채팅 메시지를 Redis에 저장합니다.

add_messages는 RPUSH를 사용하여 메시지를 Redis에 저장하고, 시간 순서대로 목록의 끝에 추가합니다.  
list_messages는 LRANGE를 사용하여 Redis에서 현재 스레드의 메시지를 가져오고, 시간 순서대로 오름차순으로 반환합니다.

첫 번째 메시지를 가져오면, 저장소는 해당 스레드에 대한 고유 키를 생성하고, 이 키는 이어지는 호출에서 Redis의 채팅 기록을 식별하는 데 사용됩니다.

고유한 키 및 기타 설정 정보도 저장되며, `serialize_state` 및 `deserialize_state` 메서드를 사용하여 직렬화 및 역직렬화될 수 있습니다. 따라서, 이 상태는 AgentThread의 일부로 저장되며, 나중에 스레드가 다시 재개되면 동일한 채팅 기록을 사용하여 대화를 이어갈 수 있습니다.

In [None]:
from collections.abc import Sequence
from typing import Any
from uuid import uuid4
from pydantic import BaseModel
import json
import redis.asyncio as redis
from agent_framework import ChatMessage


class RedisStoreState(BaseModel):
    """State model for serializing and deserializing Redis chat message store data."""

    thread_id: str
    redis_url: str | None = None
    key_prefix: str = "chat_messages"
    max_messages: int | None = None


class RedisChatMessageStore:
    """Redis-backed implementation of ChatMessageStore using Redis Lists."""

    def __init__(
        self,
        redis_url: str | None = None,
        thread_id: str | None = None,
        key_prefix: str = "chat_messages",
        max_messages: int | None = None,
    ) -> None:
        """Initialize the Redis chat message store.

        Args:
            redis_url: Redis connection URL (for example, "redis://localhost:6379").
            thread_id: Unique identifier for this conversation thread.
                      If not provided, a UUID will be auto-generated.
            key_prefix: Prefix for Redis keys to namespace different applications.
            max_messages: Maximum number of messages to retain in Redis.
                         When exceeded, oldest messages are automatically trimmed.
        """
        if redis_url is None:
            raise ValueError("redis_url is required for Redis connection")

        self.redis_url = redis_url
        self.thread_id = thread_id or f"thread_{uuid4()}"
        self.key_prefix = key_prefix
        self.max_messages = max_messages

        # Initialize Redis client
        self._redis_client = redis.from_url(redis_url, decode_responses=True)

    @property
    def redis_key(self) -> str:
        """Get the Redis key for this thread's messages."""
        return f"{self.key_prefix}:{self.thread_id}"

    async def add_messages(self, messages: Sequence[ChatMessage]) -> None:
        """Add messages to the Redis store.

        Args:
            messages: Sequence of ChatMessage objects to add to the store.
        """
        if not messages:
            return

        # Serialize messages and add to Redis list
        serialized_messages = [self._serialize_message(msg) for msg in messages]
        await self._redis_client.rpush(self.redis_key, *serialized_messages)

        # Apply message limit if configured
        if self.max_messages is not None:
            current_count = await self._redis_client.llen(self.redis_key)
            if current_count > self.max_messages:
                # Keep only the most recent max_messages using LTRIM
                await self._redis_client.ltrim(self.redis_key, -self.max_messages, -1)

    async def list_messages(self) -> list[ChatMessage]:
        """Get all messages from the store in chronological order.

        Returns:
            List of ChatMessage objects in chronological order (oldest first).
        """
        # Retrieve all messages from Redis list (oldest to newest)
        redis_messages = await self._redis_client.lrange(self.redis_key, 0, -1)

        messages = []
        for serialized_message in redis_messages:
            message = self._deserialize_message(serialized_message)
            messages.append(message)

        return messages

    async def serialize_state(self, **kwargs: Any) -> Any:
        """Serialize the current store state for persistence.

        Returns:
            Dictionary containing serialized store configuration.
        """
        state = RedisStoreState(
            thread_id=self.thread_id,
            redis_url=self.redis_url,
            key_prefix=self.key_prefix,
            max_messages=self.max_messages,
        )
        return state.model_dump(**kwargs)

    async def deserialize_state(self, serialized_store_state: Any, **kwargs: Any) -> None:
        """Deserialize state data into this store instance.

        Args:
            serialized_store_state: Previously serialized state data.
            **kwargs: Additional arguments for deserialization.
        """
        if serialized_store_state:
            state = RedisStoreState.model_validate(serialized_store_state, **kwargs)
            self.thread_id = state.thread_id
            self.key_prefix = state.key_prefix
            self.max_messages = state.max_messages

            # Recreate Redis client if the URL changed
            if state.redis_url and state.redis_url != self.redis_url:
                self.redis_url = state.redis_url
                self._redis_client = redis.from_url(self.redis_url, decode_responses=True)

    def _serialize_message(self, message: ChatMessage) -> str:
        """Serialize a ChatMessage to JSON string."""
        # Convert ChatMessage to dict using vars() or __dict__
        if hasattr(message, 'model_dump'):
            message_dict = message.model_dump()
        elif hasattr(message, 'to_dict'):
            message_dict = message.to_dict()
        else:
            # Fallback: use __dict__ or vars()
            message_dict = vars(message) if hasattr(message, '__dict__') else dict(message)
        return json.dumps(message_dict, separators=(",", ":"), default=str)

    def _deserialize_message(self, serialized_message: str) -> ChatMessage:
        """Deserialize a JSON string to ChatMessage."""
        message_dict = json.loads(serialized_message)
        # Try different deserialization methods
        if hasattr(ChatMessage, 'model_validate'):
            return ChatMessage.model_validate(message_dict)
        elif hasattr(ChatMessage, 'from_dict'):
            return ChatMessage.from_dict(message_dict)
        else:
            # Fallback: use constructor with **kwargs
            return ChatMessage(**message_dict)

    async def clear(self) -> None:
        """Remove all messages from the store."""
        await self._redis_client.delete(self.redis_key)

    async def aclose(self) -> None:
        """Close the Redis connection."""
        await self._redis_client.aclose()

#### ChatAgent에서 사용자 지정 ChatMessageStore 사용하기

사용자 지정 ChatMessageStore를 사용하려면, 에이전트를 생성할 때 chat_message_store_factory를 제공해야 합니다. 이 팩토리를 통해 에이전트는 각 스레드에 대해 원하는 ChatMessageStore의 새 인스턴스를 생성할 수 있습니다.

In [None]:
from azure.identity import AzureCliCredential
from agent_framework import ChatAgent
from agent_framework.azure import AzureOpenAIChatClient

# Create the chat agent with custom message store factory
agent = ChatAgent(
    chat_client=AzureOpenAIChatClient(credential=AzureCliCredential()),
    name="Joker",
    instructions="You are good at telling jokes who responds in korean.",
    chat_message_store_factory=lambda: RedisChatMessageStore(
        redis_url="redis://localhost:6379"
    )
)

# Use the agent with persistent chat history
thread = agent.get_new_thread()
response = await agent.run("Tell me a joke about pirates", thread=thread)
print(response.text)

몇가지 대화를 더 이어가 봅니다.

In [None]:
response = await agent.run("Tell me a joke about Robot", thread=thread)
print(response.text)

In [None]:
response = await agent.run("Tell me a joke about Human", thread=thread)
print(response.text)

이제 Redis Desktop Manager를 사용하여 Redis에 저장된 채팅 기록을 확인할 수 있습니다. 각 스레드에 대해 고유한 키가 생성되며, 해당 키 아래에 메시지들이 시간 순서대로 저장됩니다.

![Redis UI](images/maf-redis.png)
