### AutoGen Memory (RAG)

In [None]:
import re

from typing import List
import aiofiles
import aiohttp

from autogen_core.memory import Memory, MemoryContent, MemoryMimeType

In [None]:
class SimpleDocumentIndexer:
    """Basic document indexer for AutoGen Memory."""

    def __init__(self, memory: Memory, chunk_size: int = 1500) -> None:
        self.memory = memory
        self.chunk_size = chunk_size

    async def _fetch_content(self, source: str) -> str:
        """Fetch content from URL or file."""
        if source.startswith(("http://", "https://")):
            async with aiohttp.ClientSession() as session:
                async with session.get(source) as response:
                    return await response.text()
        else:
            async with aiofiles.open(source, "r", encoding="utf-8") as f:
                return await f.read()

    def _strip_html(self, text: str) -> str:
        """Remove HTML tags and normalize whitespace."""
        text = re.sub(r"<[^>]*>", " ", text)
        text = re.sub(r"\s+", " ", text)
        return text.strip()

    def _split_text(self, text: str) -> List[str]:
        """Split text into fixed-size chunks."""
        chunks: list[str] = []
        # Just split text into fixed-size chunks
        for i in range(0, len(text), self.chunk_size):
            chunk = text[i: i + self.chunk_size]
            chunks.append(chunk.strip())
        return chunks

    async def index_documents(self, sources: List[str]) -> int:
        """Index documents into memory."""
        total_chunks = 0

        for source in sources:
            try:
                content = await self._fetch_content(source)

                # Strip HTML if content appears to be HTML
                if "<" in content and ">" in content:
                    content = self._strip_html(content)

                chunks = self._split_text(content)

                for i, chunk in enumerate(chunks):
                    await self.memory.add(
                        MemoryContent(
                            content=chunk, mime_type=MemoryMimeType.TEXT, metadata={
                                "source": source, "chunk_index": i}
                        )
                    )

                total_chunks += len(chunks)

            except Exception as e:
                print(f"Error indexing {source}: {str(e)}")

        return total_chunks



In [None]:
from autogen_agentchat.agents import AssistantAgent
from autogen_ext.models.openai import AzureOpenAIChatCompletionClient
from autogen_agentchat.ui import Console
from autogen_core.memory import ListMemory, MemoryContent, MemoryMimeType
from autogen_ext.memory.chromadb import ChromaDBVectorMemory, PersistentChromaDBVectorMemoryConfig

from pathlib import Path
from dotenv import load_dotenv

import os

In [None]:
rag_memory = ChromaDBVectorMemory(
    config=PersistentChromaDBVectorMemoryConfig(
        collection_name="user_memory",
        persistent_path=os.path.join(str(Path.home()), ".chroma_autogen"),
        k=2,
        scrore_threshold=0.5,
    )
)

await rag_memory.clear()

In [None]:
async def index_autogen_documentation() -> None:
    indexer = SimpleDocumentIndexer(memory = rag_memory, chunk_size=1500)
    sources = [
        "https://microsoft.github.io/autogen/stable/user-guide/agentchat-user-guide/tutorial/agents.html",
        "https://microsoft.github.io/autogen/stable/user-guide/agentchat-user-guide/tutorial/teams.html",
        "https://microsoft.github.io/autogen/stable/user-guide/agentchat-user-guide/tutorial/termination.html",
        "https://raw.githubusercontent.com/microsoft/autogen/refs/heads/main/README.md"
    ]
    
    chunks: int = await indexer.index_documents(sources)
    print(f"Indexed {chunks} chunks from {len(sources)} sources.")
    
await index_autogen_documentation()

In [None]:
load_dotenv(override=True)

azure_openai_api_key = os.getenv("AZURE_OPENAI_API_KEY")
azure_openai_endpoint = os.getenv("AZURE_OPENAI_ENDPOINT")
azure_openai_api_version = os.getenv("AZURE_OPENAI_API_VERSION", "2023-05-15")
azure_openai_deployment_name = os.getenv(
    "AZURE_OPENAI_DEPLOYMENT_NAME", "gpt-35-turbo")

if not azure_openai_api_key or \
        not azure_openai_endpoint or \
        not azure_openai_api_version or \
        not azure_openai_deployment_name:
    raise ValueError(
        "Please set Azure Open AI Endpoint Details in your environment variables.")


In [None]:
model_client = AzureOpenAIChatCompletionClient(
    azure_deployment=azure_openai_deployment_name,
    model=azure_openai_deployment_name,
    api_key=azure_openai_api_key,
    azure_endpoint=azure_openai_endpoint,
    api_version=azure_openai_api_version,
)

In [None]:
rag_assistant = AssistantAgent(
    name="RAGAssistant",
    description="An assistant that can answer questions using the indexed AutoGen documentation.",
    model_client=model_client,
    memory=[rag_memory],
)

In [None]:
stream = rag_assistant.run_stream(task = "What is AutoGen AgentChat?")

await Console(stream)

In [None]:
await rag_memory.close()