In [7]:
# 0. Environment Setup — dependencies, dotenv loading, base imports

# Install required packages (safe to run multiple times)
!pip install -q markdown pypdf chromadb openai python-dotenv

import os
from pathlib import Path
import logging
from dotenv import load_dotenv

# Load environment variables from a .env file (if present)
load_dotenv()

# Configure logging early so every later cell can use it
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
)

logger = logging.getLogger("rag-prototype")

logger.info("Environment setup completed successfully.")


2025-12-02 16:56:32,251 [INFO] Environment setup completed successfully.


# Project 03 — Retrieval-Augmented Generation (RAG) Prototype

This notebook implements an end-to-end Retrieval-Augmented Generation (RAG)
prototype to improve access to internal technical documentation.

**Repository**
- `llm-agents`

**Environment**
- Python (Jupyter notebook / GitHub Codespaces)
- Vector store: ChromaDB
- LLM provider: OpenAI API

**Goal**
Build a clear, well-documented prototype that:
- loads real technical documents (PDF / Markdown),
- preprocesses and chunks the content,
- generates embeddings,
- indexes them in a vector store (ChromaDB),
- retrieves relevant chunks for a user query,
- calls an LLM to generate grounded answers,
- logs basic information for later quality review.

> This is an internal prototype for exploration and discussion.
> It is **not** a production-ready system.


In [8]:
# 1. Core Imports — document parsing, vector store, LLM client, helpers

from dataclasses import dataclass
from typing import List, Dict, Any

import textwrap

# Document parsing (PDF, Markdown)
import markdown
from pypdf import PdfReader

# Vector store (ChromaDB)
import chromadb
from chromadb.config import Settings

# OpenAI client + error classes for safe handling
from openai import OpenAI, RateLimitError, AuthenticationError

logger.info("Core imports loaded successfully.")


2025-12-02 16:56:32,259 [INFO] Core imports loaded successfully.


In [9]:
# 2. Configuration — environment variables, OpenAI client, directories, vector store

# Load API key safely (coming from environment or .env)
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")

if not OPENAI_API_KEY:
    raise ValueError(
        "OPENAI_API_KEY is not set. "
        "Please define it in a .env file or export it as an environment variable."
    )

# Initialize OpenAI client
openai_client = OpenAI(api_key=OPENAI_API_KEY)

# Define the data directory for documents (PDF/Markdown)
DATA_DIR = Path("data")
DATA_DIR.mkdir(exist_ok=True)

# Initialize ChromaDB (local, in-memory by default)
chroma_client = chromadb.Client(
    Settings(
        anonymized_telemetry=False,   # keep telemetry disabled
    )
)

# Create or load a collection for technical documents
COLLECTION_NAME = "technical_docs_rag"
collection = chroma_client.get_or_create_collection(name=COLLECTION_NAME)

# Logging for transparency
logger.info("Configuration loaded successfully.")
logger.info("DATA_DIR set to: %s", DATA_DIR.resolve())
logger.info("ChromaDB collection in use: %s", COLLECTION_NAME)


ValueError: OPENAI_API_KEY is not set. Please define it in a .env file or export it as an environment variable.

## 3. Document loading

In this prototype, we load internal technical documentation from a local
`data/` directory.

Supported formats:
- `.pdf`
- `.md` (Markdown)

Each document is:
1. Loaded from disk,
2. Converted to plain text,
3. Stored in a simple `Document` structure,
4. Prepared for the chunking step in the next section.

> In future iterations, this layer could be replaced by:
> - cloud storage (e.g., S3, GCS),
> - internal document management APIs,
> - or automated exports from existing tools.


In [None]:
# 3.1 Helpers to load PDFs and Markdown files

from dataclasses import dataclass

@dataclass
class Document:
    doc_id: str
    source_path: Path
    text: str


def load_pdf(path: Path) -> str:
    """Extract text from a PDF file."""
    logger.info("Loading PDF file: %s", path)
    reader = PdfReader(str(path))
    pages_text = []

    for page in reader.pages:
        page_text = page.extract_text() or ""
        pages_text.append(page_text)

    return "\n\n".join(pages_text)


def load_markdown(path: Path) -> str:
    """Convert Markdown file to a plain-text representation."""
    logger.info("Loading Markdown file: %s", path)

    raw = path.read_text(encoding="utf-8")

    # Convert Markdown -> HTML
    html = markdown.markdown(raw)

    # Very lightweight HTML -> text
    text = (
        html.replace("<p>", "\n\n")
            .replace("</p>", "")
            .replace("<code>", "`")
            .replace("</code>", "`")
            .replace("<strong>", "")
            .replace("</strong>", "")
    )

    return text


def load_documents(data_dir: Path) -> List[Document]:
    """Load all supported documents from the given directory."""
    docs = []

    for path in sorted(data_dir.glob("**/*")):
        if path.is_dir():
            continue

        suffix = path.suffix.lower()

        if suffix == ".pdf":
            text = load_pdf(path)
        elif suffix == ".md":
            text = load_markdown(path)
        else:
            continue

        doc_id = path.stem
        docs.append(Document(doc_id=doc_id, source_path=path, text=text))

    logger.info("Loaded %d documents from %s", len(docs), data_dir.resolve())
    return docs


In [None]:
# 3.2 Load and inspect documents

documents = load_documents(DATA_DIR)

if not documents:
    logger.warning(
        "No documents found in %s. Please add PDFs or Markdown files into the data/ folder and re-run this cell.",
        DATA_DIR.resolve()
    )
else:
    logger.info("Loaded %d documents.", len(documents))
    # Preview the first document (for debugging)
    print("=== First document preview ===\n")
    print("ID:", documents[0].doc_id)
    print("Source:", documents[0].source_path)
    print("\n--- Text (first 500 chars) ---\n")
    print(documents[0].text[:500])


2025-12-02 16:18:13,750 [INFO] Loaded 0 documents from /workspaces/llm-agents/Project 03 — Retrieval-Augmented Generation (RAG) Prototype/data


## 4. Preprocessing and chunking

RAG systems usually operate on **document chunks**, not on entire files.

In this prototype we keep preprocessing intentionally simple and focus on:
- preserving most of the original text, and
- splitting content into manageable chunks for embeddings and retrieval.

### Design choices (for this first version)

- Light preprocessing (no aggressive cleaning).
- Character-based chunking with a maximum size (e.g. ~1000 characters).
- Prefer splitting on paragraph boundaries when possible.
- Keep track of:
  - `doc_id`
  - `chunk_id`
  - source path

> In future iterations we can:
> - switch to token-based splitting,
> - add language-specific normalization,
> - or plug into higher-level frameworks (LangChain, LlamaIndex, etc.).


In [None]:
# 4.1 Chunking utilities

@dataclass
class DocumentChunk:
    doc_id: str
    chunk_id: int
    text: str
    source_path: str


def chunk_text(
    text: str,
    doc_id: str,
    source_path: Path,
    max_chars: int = 1000,
) -> List[DocumentChunk]:
    """
    Split a long text into smaller chunks based on paragraphs,
    keeping each chunk below `max_chars` characters.
    """
    paragraphs = [p.strip() for p in text.split("\n\n") if p.strip()]

    chunks: List[DocumentChunk] = []
    current_parts: List[str] = []
    current_len = 0
    chunk_id = 0

    for paragraph in paragraphs:
        paragraph_len = len(paragraph)

        # If adding this paragraph keeps us under the limit, append
        if current_len + paragraph_len + 2 <= max_chars:
            current_parts.append(paragraph)
            current_len += paragraph_len + 2
        else:
            # Close current chunk
            if current_parts:
                chunk_text_block = "\n\n".join(current_parts)
                chunks.append(
                    DocumentChunk(
                        doc_id=doc_id,
                        chunk_id=chunk_id,
                        text=chunk_text_block,
                        source_path=str(source_path),
                    )
                )
                chunk_id += 1

            # Start a new chunk with the current paragraph
            current_parts = [paragraph]
            current_len = paragraph_len

    # Flush remaining text
    if current_parts:
        chunk_text_block = "\n\n".join(current_parts)
        chunks.append(
            DocumentChunk(
                doc_id=doc_id,
                chunk_id=chunk_id,
                text=chunk_text_block,
                source_path=str(source_path),
            )
        )

    return chunks


def chunk_documents(docs: List[Document], max_chars: int = 1000) -> List[DocumentChunk]:
    """
    Apply `chunk_text` to all loaded documents.
    """
    all_chunks: List[DocumentChunk] = []

    for doc in docs:
        doc_chunks = chunk_text(
            text=doc.text,
            doc_id=doc.doc_id,
            source_path=doc.source_path,
            max_chars=max_chars,
        )
        all_chunks.extend(doc_chunks)

    logger.info(
        "Generated %d chunks from %d documents (max_chars=%d).",
        len(all_chunks),
        len(docs),
        max_chars,
    )
    return all_chunks


# Run chunking if we have documents
if documents:
    chunks: List[DocumentChunk] = chunk_documents(documents, max_chars=1000)
    logger.info("Example chunk: %s", chunks[0] if chunks else "No chunks created.")
else:
    chunks = []
    logger.warning("No documents available to chunk. 'chunks' list is empty.")




## 5. Embedding generation and vector store (ChromaDB)

With the documents chunked, the next step is to convert each chunk into a
numerical vector representation (embedding) and store it in a vector database.

### Why embeddings?

LLMs and RAG systems cannot search raw text efficiently.  
Instead, each chunk is transformed into a vector that captures its semantic meaning.  
These vectors allow similarity search (k-NN), enabling the system to find the
chunks most relevant to a user query.

### Why ChromaDB?

For this prototype we use **ChromaDB** because it is:
- lightweight,
- fast,
- easy to set up locally,
- Python-first,
- perfect for experimentation and internal demos.

### Pipeline in this section:

1. Generate embeddings using an OpenAI embedding model  
2. Clear or initialize the Chroma collection (for reproducibility)  
3. Upsert each chunk along with its metadata:
   - `doc_id`
   - `chunk_id`
   - `source_path`
4. Validate the number of items indexed  
5. Prepare for retrieval in the next section

> In future iterations we can add:
> - hybrid search (sparse + dense),
> - metadata filtering,
> - alternative vector stores (Weaviate, Pinecone, Milvus),
> - or custom embedding models.


In [None]:
# 5.1 Generate embeddings and index chunks into ChromaDB

EMBEDDING_MODEL = "text-embedding-3-small"   # lightweight, cost-efficient model


def embed_texts(texts: List[str]) -> List[List[float]]:
    """
    Generate embeddings for a list of texts using the configured OpenAI model.
    """
    logger.info("Generating embeddings for %d texts...", len(texts))

    response = openai_client.embeddings.create(
        model=EMBEDDING_MODEL,
        input=texts,
    )

    return [item.embedding for item in response.data]


def index_chunks(chunks: List[DocumentChunk]) -> None:
    """
    Index all chunks into the ChromaDB collection with:
      - id
      - embedding vector
      - original text
      - metadata (doc_id, chunk_id, source_path)
    """

    if not chunks:
        logger.warning("No chunks to index. Skipping embedding step.")
        return

    # Optional: clear existing data to ensure reproducible runs
    collection.delete(where={})
    logger.info("Existing ChromaDB collection cleared.")

    batch_size = 64
    total = len(chunks)

    for i in range(0, total, batch_size):
        batch = chunks[i : i + batch_size]
        ids = [f"{c.doc_id}-{c.chunk_id}" for c in batch]
        texts = [c.text for c in batch]
        metadata = [
            {
                "doc_id": c.doc_id,
                "chunk_id": c.chunk_id,
                "source_path": c.source_path,
            }
            for c in batch
        ]

        embeddings = embed_texts(texts)

        collection.add(
            ids=ids,
            embeddings=embeddings,
            documents=texts,
            metadatas=metadata,
        )

        logger.info(
            "Indexed batch %d–%d of %d chunks.",
            i,
            min(i + batch_size - 1, total - 1),
            total,
        )

    logger.info("Indexing completed. Total items in Chroma: %s", collection.count())


# Run indexing if chunks exist
if chunks:
    index_chunks(chunks)
else:
    logger.warning("No chunks found — skipping embedding + indexing.")




## 6. RAG Pipeline — Retrieval + Generation

With all chunks indexed in ChromaDB, we can now build the RAG pipeline.

### High-level flow

1. **User query**  
   A natural language question is received.

2. **Retrieval**  
   ChromaDB performs a similarity search (k-NN) using the embedding of the query.
   It returns the most relevant chunks based on semantic proximity.

3. **Prompt construction**  
   - Retrieved chunks are injected as context.  
   - System instructions are applied to ensure grounded, concise answers.  
   - Hallucination is discouraged by design.

4. **LLM generation**  
   The LLM receives the constructed prompt and generates a final answer
   strictly based on the provided context.

5. **Logging**  
   For each query we store:
   - the original question,
   - the retrieved document IDs and metadata,
   - the final model response.

### Why this structure?

This separation of responsibilities makes the prototype:
- easy to debug,
- easy to extend,
- production-friendly,
- and transparent for reviewers (leadership, engineers, auditors).

> In future iterations we can add:
> - hybrid search,
> - ranking & scoring layers,
> - evaluation datasets,
> - retrieval metrics,
> - or UI layers (e.g., Streamlit, Gradio).


In [None]:
# 6.1 RAG core functions — retrieval, prompt building, LLM call, logging

CHAT_MODEL = "gpt-4.1-mini"  # adjust if your org uses a different default model

# In-memory log of RAG interactions (for later inspection / evaluation)
rag_logs: List[Dict[str, Any]] = []


def retrieve_context(query: str, k: int = 4) -> Dict[str, Any]:
    """
    Retrieve top-k most relevant chunks from ChromaDB for a given query.
    """
    logger.info("Retrieving context for query: %s", query)

    results = collection.query(
        query_texts=[query],
        n_results=k,
    )

    # ChromaDB returns lists-of-lists (one per query)
    ids = results.get("ids", [[]])[0]
    documents = results.get("documents", [[]])[0]
    metadatas = results.get("metadatas", [[]])[0]

    logger.info("Retrieved %d chunks from ChromaDB.", len(ids))

    return {
        "ids": ids,
        "documents": documents,
        "metadatas": metadatas,
    }


def build_prompt(query: str, retrieved: Dict[str, Any]) -> str:
    """
    Build a grounded prompt using the retrieved context + user question.
    """
    context_blocks = []

    for meta, doc in zip(retrieved["metadatas"], retrieved["documents"]):
        header = f"[doc_id={meta.get('doc_id')} chunk_id={meta.get('chunk_id')}]"
        block = f"{header}\n{doc}"
        context_blocks.append(block)

    context_text = "\n\n---\n\n".join(context_blocks) if context_blocks else "No relevant context found."

    prompt = f"""
You are a technical assistant answering questions based ONLY on the documentation provided below.

If the documentation does not contain enough information to answer confidently,
say that the information is not available in the current documents.

# Documentation

{context_text}

# User question

{query}

# Instructions
- Answer in a concise and clear way.
- Reference relevant document ids and/or chunk ids when helpful.
- Do not invent features or details that are not supported by the documentation.
    """.strip()

    return prompt


def answer_question(query: str, k: int = 4) -> str:
    """
    Full RAG pipeline:
    1) retrieve context from ChromaDB
    2) build a grounded prompt
    3) call the LLM
    4) log query, retrieved metadata and answer

    This function also handles common API issues gracefully:
    - invalid / missing API key (AuthenticationError)
    - quota / rate-limit problems (RateLimitError)
    """
    retrieved = retrieve_context(query, k=k)
    prompt = build_prompt(query, retrieved)

    try:
        response = openai_client.chat.completions.create(
            model=CHAT_MODEL,
            messages=[
                {"role": "system", "content": "You are a helpful technical assistant."},
                {"role": "user", "content": prompt},
            ],
            temperature=0.1,
        )
    except AuthenticationError as e:
        logger.error("Authentication error when calling the LLM API: %s", e)
        return (
            "Authentication error when calling the LLM API. "
            "Please check if the API key is valid and correctly configured."
        )
    except RateLimitError as e:
        logger.error("Rate limit / quota error when calling the LLM API: %s", e)
        return (
            "Rate limit / quota error when calling the LLM API. "
            "Please verify the API quota / billing configuration for this key."
        )

    answer = response.choices[0].message.content

    # Store a compact log entry for later review
    log_entry = {
        "query": query,
        "retrieved_ids": retrieved["ids"],
        "retrieved_metadatas": retrieved["metadatas"],
        "answer": answer,
    }
    rag_logs.append(log_entry)

    logger.info("Query answered successfully. Retrieved chunks: %d", len(retrieved["ids"]))
    return answer


In [None]:
# 6.2 Manual smoke test for the RAG pipeline

test_questions = [
    "What is the main purpose of the system described in the documentation?",
    # You can add more questions here if desired, e.g.:
    # "How is authentication handled according to the docs?",
    # "Are there any limitations or known issues mentioned?"
]

for q in test_questions:
    print("\n" + "=" * 80)
    print("QUESTION:")
    print(q)
    print("\nANSWER:\n")

    answer = answer_question(q)
    print(answer)
    print("\n" + "=" * 80)


2025-12-02 16:30:22,619 [INFO] Retrieving context for query: What is the main purpose of the system described in the documentation?



QUESTION:
What is the main purpose of the system described in the documentation?

ANSWER:



2025-12-02 16:30:22,960 [INFO] HTTP Request: GET https://chroma-onnx-models.s3.amazonaws.com/all-MiniLM-L6-v2/onnx.tar.gz "HTTP/1.1 200 OK"
/home/codespace/.cache/chroma/onnx_models/all-MiniLM-L6-v2/onnx.tar.gz: 100%|██████████| 79.3M/79.3M [00:00<00:00, 104MiB/s] 
2025-12-02 16:30:24,879 [INFO] Retrieved 0 chunks from ChromaDB.
2025-12-02 16:30:25,339 [INFO] HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 401 Unauthorized"
2025-12-02 16:30:25,340 [ERROR] Authentication error when calling the LLM API: Error code: 401 - {'error': {'message': 'Incorrect API key provided: sk-xxxxx***********************xxxx. You can find your API key at https://platform.openai.com/account/api-keys.', 'type': 'invalid_request_error', 'code': 'invalid_api_key', 'param': None}}


Authentication error when calling the LLM API. Please check if the API key is valid and correctly configured.



## 7. Next steps and known limitations

This notebook implements a full end-to-end RAG prototype with:

- document loading (PDF / Markdown),
- chunking,
- embedding generation,
- vector store indexing (ChromaDB),
- retrieval,
- grounded prompt construction,
- LLM-based answer generation,
- and structured logging for review.

### Known limitations (expected for a prototype)

- The pipeline uses a simple character-based chunker.
- No metadata filtering or ranking layer is applied.
- Evaluation metrics (recall, precision, mAP) are not included.
- The LLM call is synchronous and single-turn.
- ChromaDB runs locally and is not persistent across sessions.
- No UI layer (CLI / Streamlit / Gradio) has been added yet.
- Error handling covers only the most common API issues.

### Possible upcoming improvements

- Add token-based chunking (tiktoken or other tokenizer).
- Integrate a scalable vector store (Weaviate, Pinecone, Milvus).
- Implement hybrid search (BM25 + embeddings).
- Add RAG evaluation datasets (question/answer pairs).
- Introduce reranking (cross-encoders, ColBERT, LLM-based re-ranking).
- Build a simple UI to demonstrate the end-user experience.
- Containerize the prototype (Dockerfile + requirements + entrypoint).
- Add monitoring (latency, similarity heatmaps, retrieval diagnostics).

### Final note

This notebook is intentionally clean, minimal and education-oriented.
It is designed to serve as a baseline for further refinement and as a
transparent demonstration of how a simple RAG pipeline works end-to-end.
