# RAG Workflow & Backlog

## Overview & Goals

**Challenge:** Build a production-ready Retrieval-Augmented Generation (RAG) system for Deutsche Telekom that enables question-answering over internal publications and documents.

**Goals:**
- Efficiently index and retrieve relevant document chunks
- Generate accurate, cited responses using retrieved context
- Maintain traceability with publication IDs
- Support enterprise requirements (security, monitoring, scalability)
- Enable easy experimentation and iteration

**Key Components:**
- Document loading with metadata extraction
- Metadata-aware chunking
- Vector-based retrieval with optional reranking
- LLM-based generation with citation
- Structured logging and observability


## Data Exploration

Load and examine sample documents from the data folder.


In [None]:
import sys
from pathlib import Path

# Add project root to path (notebooks/ is one level below project root)
project_root = Path().resolve().parent
if (project_root / "config").exists():
    sys.path.insert(0, str(project_root))
elif (Path().resolve() / "config").exists():
    # Already running from the project root
    sys.path.insert(0, str(Path().resolve()))

from config import settings
from loaders.loader import DocumentLoader

# Load a few sample documents
loader = DocumentLoader(data_folder=settings.DATA_FOLDER)
documents = loader.load_all_documents()

print(f"Total documents loaded: {len(documents)}")
print("\nSample documents:")
for i, doc in enumerate(documents[:3], 1):
    pub_id = doc.metadata.get("publication_id", "unknown")
    word_count = doc.metadata.get("word_count", len(doc.page_content.split()))
    print(f"\n{i}. Publication ID: {pub_id}")
    print(f"   Word count: {word_count}")
    print(f"   Topics: {doc.metadata.get('topics', [])}")
    print(f"   Preview: {doc.page_content[:200]}...")


In [None]:
from collections import Counter

# Word statistics across all documents
if documents:
    # Fall back to counting words when the loader did not set word_count (same rule as the cell above)
    total_words = sum(
        doc.metadata.get("word_count", len(doc.page_content.split())) for doc in documents
    )
    avg_words = total_words / len(documents)

    print("\nDocument Statistics:")
    print(f"  Total documents: {len(documents)}")
    print(f"  Total words: {total_words:,}")
    print(f"  Average words per document: {avg_words:.0f}")
    print(f"  Documents with topics: {sum(1 for d in documents if d.metadata.get('topics'))}")
    print(f"  Documents with dates: {sum(1 for d in documents if d.metadata.get('mentioned_dates'))}")

    # Count how many documents mention each topic
    topic_counts = Counter(
        topic for doc in documents for topic in doc.metadata.get("topics", [])
    )
    print("\nTopic distribution:")
    for topic, count in topic_counts.most_common():
        print(f"  {topic}: {count}")


## Indexing Workflow

Index documents using the loader, chunker, and an in-memory ChromaDB instance (no persistence in the notebook).


In [None]:
from langchain_community.vectorstores import Chroma
from langchain_community.vectorstores.utils import filter_complex_metadata

from core.chunking import MetadataAwareChunker
from core.embeddings import get_embeddings

# Get embeddings
embeddings = get_embeddings()
print("✓ Loaded embeddings model")

# Chunk documents with metadata
chunker = MetadataAwareChunker(
    chunk_size=settings.CHUNK_SIZE,
    chunk_overlap=settings.CHUNK_OVERLAP
)
print(f"✓ Initialized chunker (size={settings.CHUNK_SIZE}, overlap={settings.CHUNK_OVERLAP})")

# Chunk the first few documents (kept small for the demo)
demo_docs = documents[:5]
all_chunks = []
for doc in demo_docs:
    source = doc.metadata.get("source", "Deutsche Telekom")
    doc_id = doc.metadata.get("publication_id", doc.metadata.get("file_name", "unknown"))
    extra_metadata = {
        k: v for k, v in doc.metadata.items()
        if k not in ["source", "publication_id", "file_name"]
    }

    chunks = chunker.chunk_with_metadata(
        text=doc.page_content,
        source=source,
        doc_id=doc_id,
        **extra_metadata
    )
    all_chunks.extend(chunks)

print(f"✓ Created {len(all_chunks)} chunks from {len(demo_docs)} documents")
if demo_docs:  # avoid ZeroDivisionError when the data folder is empty
    print(f"  Average chunks per document: {len(all_chunks) / len(demo_docs):.1f}")

# ChromaDB only accepts scalar metadata (str, int, float, bool); drop lists, dicts, etc.
# This removes metadata values, not chunks, so the chunk count stays the same.
filtered_chunks = filter_complex_metadata(all_chunks)
print(f"✓ Removed complex metadata values from {len(filtered_chunks)} chunks")

# Show sample chunk metadata
if filtered_chunks:
    sample = filtered_chunks[0]
    print("\nSample chunk metadata:")
    print(f"  chunk_id: {sample.metadata.get('chunk_id')}")
    print(f"  doc_id: {sample.metadata.get('doc_id')}")
    print(f"  chunk_index: {sample.metadata.get('chunk_index')}")
    print(f"  total_chunks: {sample.metadata.get('total_chunks')}")


In [None]:
# Create in-memory ChromaDB (no persistence for notebook)
vectordb = Chroma.from_documents(
    documents=filtered_chunks,
    embedding=embeddings,
)

print(f"✓ Indexed {len(filtered_chunks)} chunks into ChromaDB")
print(f"  Vector dimensions: {len(embeddings.embed_query('test'))}")
print(f"  Collection count: {vectordb._collection.count() if hasattr(vectordb, '_collection') else 'N/A'}")


## Retrieval Demo

Run queries and examine retrieved sources.


In [None]:
from core.retrieval import AdvancedRetriever

# Create retriever (no reranker for CPU-only demo)
retriever = AdvancedRetriever(vectordb=vectordb, reranker_model=None)

# Test queries
queries = [
    "What is 5G?",
    "Tell me about security",
    "Partnership information"
]

for query in queries:
    print(f"\n{'='*60}")
    print(f"Query: {query}")
    print(f"{'='*60}")

    results = retriever.retrieve(query=query, top_k=3)

    print(f"Retrieved {len(results)} documents:\n")
    for i, doc in enumerate(results, 1):
        pub_id = doc.metadata.get("publication_id", doc.metadata.get("doc_id", "unknown"))
        chunk_idx = doc.metadata.get("chunk_index", "N/A")
        print(f"{i}. [Publication: {pub_id}, Chunk: {chunk_idx}]")
        print(f"   {doc.page_content[:150]}...")
        print()


## Generation Demo

Build a prompt from the retrieved context and generate a response. Generation is mocked so the notebook runs on CPU only.


In [None]:
from llm.prompt_manager import PromptManager

prompt_manager = PromptManager()

# Build RAG prompt with retrieved context
query = "What services does Deutsche Telekom offer?"
retrieved_docs = retriever.retrieve(query=query, top_k=2)

prompt = prompt_manager.build_rag_prompt(
    query=query,
    context_docs=retrieved_docs,
    chat_history=None
)

print("Generated Prompt:")
print("="*80)
print(prompt[:1000])  # Show first 1000 chars
print("...")
print("="*80)
print(f"\nPrompt length: {len(prompt)} characters")
print(f"Context documents used: {len(retrieved_docs)}")


In [None]:
# Mock generation for CPU-only demo (no actual LLM call)
# In production, this would call generate_response() with a real model

print("Mock Generation Response:")
print("="*80)

# Extract publication IDs from sources
publication_ids = []
for doc in retrieved_docs:
    pub_id = doc.metadata.get("publication_id", doc.metadata.get("doc_id", "unknown"))
    if pub_id not in publication_ids:
        publication_ids.append(pub_id)

# Simulate response (in real system, this comes from LLM)
mock_response = f"""Based on the retrieved documents, Deutsche Telekom offers various telecommunications services including 5G network infrastructure, secure cloud solutions, and enterprise partnerships. The company focuses on expanding coverage and providing reliable connectivity services.

Sources: {', '.join(publication_ids)}"""

print(mock_response)
print("="*80)
print("\n[Note: This is a mock response. In production, use generate_response() with a loaded model]")


## Observability

Example of structured logging output.


In [None]:
import json
import os

from monitoring.logging import StructuredLogger

# Create logger
logger = StructuredLogger("notebook_demo", log_dir="/tmp/notebook_logs")

# Log a sample query
sample_docs = retrieved_docs[:2] if retrieved_docs else []
logger.log_query(
    query="What services does Deutsche Telekom offer?",
    retrieved_docs=sample_docs,
    response_time=1.23,
    user_id="notebook_user"
)

# Show what was logged (read the last JSON line from the log file)
log_file = "/tmp/notebook_logs/notebook_demo.jsonl"
if os.path.exists(log_file):
    with open(log_file) as f:
        lines = f.readlines()  # read once: a second readlines() call would return []
    if lines:
        log_data = json.loads(lines[-1])
        print("Sample Structured Log Entry:")
        print("="*80)
        print(json.dumps(log_data, indent=2))
        print("="*80)
else:
    print("Log file not found; expected entry structure (illustrative values):")
    sample_log = {
        "timestamp": "2024-01-01T12:00:00Z",
        "level": "INFO",
        "logger": "notebook_demo",
        "message": "Query processed",
        "event_type": "query",
        "query": "What services does Deutsche Telekom offer?",
        "num_documents": 2,
        "response_time_seconds": 1.23,
        "user_id": "notebook_user",
        "document_ids": ["doc_1", "doc_2"]
    }
    print(json.dumps(sample_log, indent=2))


## Backlog of Ideas

Future enhancements and experiments for the RAG system:

### Retrieval & Ranking
- **Switch to better embeddings**: Evaluate `intfloat/e5-large-v2` or `BAAI/bge-large-en-v1.5` embeddings and compare recall@k against the current `intfloat/multilingual-e5-large` (which replaced `all-MiniLM-L6-v2`)
  - *Implementation*: Update `core/embeddings.py` to support multiple embedding models with configurable selection via environment variables. Use `sentence-transformers` library to load models and benchmark against existing embeddings on a test query set.
- **Max Marginal Relevance (MMR)**: Implement diversity-based retrieval to reduce redundant results
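  - *Implementation*: Use the MMR search that LangChain's Chroma wrapper already provides (`vectordb.as_retriever(search_type="mmr", search_kwargs={"k": ..., "fetch_k": ..., "lambda_mult": ...})`), or implement custom MMR scoring that balances relevance (cosine similarity to the query) against diversity (penalizing chunks similar to those already selected). Expose `lambda_mult` in config to control the tradeoff (0 = maximum diversity, 1 = pure relevance).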
- **Hybrid Retrieval**: Combine sparse+dense retrieval (BM25 + vector search) for improved recall
  - *Implementation*: Integrate `rank-bm25` or `pyserini` for BM25 scoring. Merge the BM25 and vector result lists with Reciprocal Rank Fusion (rank-based, so the two score scales need no normalization) or a weighted linear combination of normalized scores. Implement in `core/retrieval.py` as a new hybrid retriever class.
- **Reranker A/B Testing**: Compare `cross-encoder/ms-marco-MiniLM-L-6-v2` vs `BAAI/bge-reranker-large`; log reranking scores for analysis
  - *Implementation*: Add reranker model selection via config, implement scoring logging in `AdvancedRetriever.retrieve()` to capture reranker scores for each candidate. Use feature flags or environment variables to switch models and compare metrics.
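
Two of the retrieval items above rest on small, well-defined computations: recall@k for comparing embedding models, and Reciprocal Rank Fusion for merging BM25 and vector result lists. The sketch below implements both over plain lists of chunk IDs. The function names are hypothetical (not part of the existing `core/retrieval.py`), and `k=60` is the smoothing constant commonly used for RRF, assumed here rather than tuned.

```python
from __future__ import annotations

from collections import defaultdict


def recall_at_k(retrieved_ids: list[str], relevant_ids: set[str], k: int) -> float:
    """Fraction of the relevant chunk IDs that appear in the top-k retrieved IDs."""
    if not relevant_ids:
        return 0.0
    hits = relevant_ids.intersection(retrieved_ids[:k])
    return len(hits) / len(relevant_ids)


def reciprocal_rank_fusion(ranked_lists: list[list[str]], k: int = 60) -> list[tuple[str, float]]:
    """Fuse ranked ID lists: each ID scores sum(1 / (k + rank)) with ranks starting at 1."""
    scores: dict[str, float] = defaultdict(float)
    for ranking in ranked_lists:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] += 1.0 / (k + rank)
    # Highest fused score first; ties broken by ID for deterministic output
    return sorted(scores.items(), key=lambda item: (-item[1], item[0]))


bm25_hits = ["c3", "c1", "c7"]
vector_hits = ["c1", "c2", "c3"]
fused = reciprocal_rank_fusion([bm25_hits, vector_hits])
print([doc_id for doc_id, _ in fused])  # ['c1', 'c3', 'c2', 'c7']
```

Because RRF only uses ranks, BM25 scores and cosine similarities never have to be put on a common scale; a weighted linear combination would need that normalization first.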

### Provenance & Citations
- **Normalized chunk IDs + provenance**: Store byte spans and filenames for each chunk
  - *Implementation*: Extend `MetadataAwareChunker` to capture byte offsets during chunking. Store `start_byte`, `end_byte`, and `source_file` in chunk metadata. Update ingestion pipeline to preserve this information.
- **Inline snippet highlights**: Show highlighted source snippets in responses with exact character spans
  - *Implementation*: Parse citation references from LLM responses, match to chunk IDs, and extract the relevant text spans. Use frontend highlighting libraries (e.g., `react-highlight-words` or Streamlit markdown with HTML) to visually highlight matched text.
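
A minimal sketch of offset-preserving chunking, assuming fixed-size character windows with overlap (the existing `MetadataAwareChunker` may split on different boundaries). It records character offsets, which a UI needs to highlight a span, and UTF-8 byte offsets, which diverge from character offsets as soon as the text contains non-ASCII characters such as German umlauts. The `start_char`/`start_byte` field names are hypothetical.

```python
from __future__ import annotations


def chunk_with_offsets(text: str, source_file: str, chunk_size: int = 200, overlap: int = 50) -> list[dict]:
    """Split text into overlapping character windows, recording char and byte spans."""
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must be in [0, chunk_size)")
    chunks: list[dict] = []
    step = chunk_size - overlap
    start = 0
    while start < len(text):
        end = min(start + chunk_size, len(text))
        chunk_text = text[start:end]
        # Byte offsets: UTF-8 length of everything before the span, plus the span itself
        start_byte = len(text[:start].encode("utf-8"))
        end_byte = start_byte + len(chunk_text.encode("utf-8"))
        chunks.append({
            "text": chunk_text,
            "metadata": {
                "source_file": source_file,
                "chunk_index": len(chunks),
                "start_char": start,
                "end_char": end,
                "start_byte": start_byte,
                "end_byte": end_byte,
            },
        })
        if end == len(text):
            break
        start += step
    return chunks
```

Re-encoding the prefix for every chunk is quadratic in document length; a production version would accumulate byte counts incrementally, but the stored spans would be the same.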

### Response Quality & Guardrails
- **Response guardrails**: 
  - Require citations for factual statements
  - Refusal mechanism when confidence score is below threshold
  - Log confidence scores for monitoring
  - *Implementation*: Post-process LLM responses to check for citation markers (e.g., `[Source: ...]`). Implement confidence scoring using token log-probabilities or self-consistency checks. Add configurable thresholds and refusal templates. Extend structured logging to capture confidence scores.
- **Prompt compression (LLMLingua)**: Compress long context for better efficiency while preserving key information
  - *Implementation*: Integrate `LLMLingua` or similar prompt compression libraries. Add compression step in `PromptManager.build_rag_prompt()` before sending to LLM. Use configurable compression ratios and preserve citation markers.
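
The citation and refusal rules can be prototyped as a post-processing step. This sketch assumes citations appear as `[Source: <id>]` markers (the format suggested above) and that a confidence score in [0, 1] is computed elsewhere; the threshold and refusal text are hypothetical, and the check works at response level (at least one valid citation) rather than per factual statement.

```python
from __future__ import annotations

import re
from dataclasses import dataclass

CITATION_PATTERN = re.compile(r"\[Source:\s*([^\]]+)\]")
REFUSAL_TEMPLATE = "I could not find enough support in the indexed documents to answer this reliably."


@dataclass
class GuardrailResult:
    answer: str
    refused: bool
    cited_ids: list[str]
    unknown_ids: list[str]
    confidence: float  # kept so it can be written to the structured log


def apply_guardrails(response: str, confidence: float, retrieved_ids: set[str],
                     threshold: float = 0.5) -> GuardrailResult:
    # Split comma-separated IDs inside each marker, e.g. [Source: pub_1, pub_2]
    cited = [i.strip() for m in CITATION_PATTERN.findall(response)
             for i in m.split(",") if i.strip()]
    # Citations that point at IDs which were never retrieved are treated as hallucinated
    unknown = [i for i in cited if i not in retrieved_ids]
    refused = confidence < threshold or not cited or bool(unknown)
    answer = REFUSAL_TEMPLATE if refused else response
    return GuardrailResult(answer, refused, cited, unknown, confidence)
```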

### Evaluation & Quality
- **Eval harness**: Implement RAGAS (Retrieval-Augmented Generation Assessment) metrics:
  - Context Precision
  - Context Recall  
  - Faithfulness
  - Answer Relevancy
  - *Implementation*: Use `ragas` Python library to compute metrics. Create evaluation script that runs queries against golden dataset, compares retrieved contexts and generated answers, and outputs metrics JSON. Integrate with CI pipeline.
- **Hand-curated Q/A set**: Create golden dataset over the 250 documents for regression testing
  - *Implementation*: Create JSON/YAML format for Q/A pairs with expected answers and source documents. Store in `tests/golden_set/` directory. Include queries covering various topics and difficulty levels.
- **Nightly CI evaluation**: Run RAGAS metrics in CI pipeline on golden set to detect regressions
  - *Implementation*: Add GitHub Actions workflow or similar CI step that runs evaluation script, compares metrics against baseline thresholds, and fails build if metrics degrade. Store historical metrics for trend analysis.
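
The nightly CI gate reduces to comparing the latest metrics JSON against stored baselines. The sketch assumes metrics are flat `{name: score}` dicts where higher is better (true for the four RAGAS metrics listed) and uses a hypothetical per-metric tolerance; computing the scores with `ragas` is out of scope here.

```python
from __future__ import annotations

import json


def find_regressions(current: dict[str, float], baseline: dict[str, float],
                     tolerance: float = 0.02) -> list[str]:
    """Describe every baseline metric that is missing or dropped by more than `tolerance`."""
    problems = []
    for name, base_score in sorted(baseline.items()):
        score = current.get(name)
        if score is None:
            problems.append(f"{name}: missing from current run")
        elif score < base_score - tolerance:
            problems.append(f"{name}: {score:.3f} < baseline {base_score:.3f} - {tolerance}")
    return problems


def main(current_path: str, baseline_path: str) -> int:
    """Return a process exit code: 0 if no regressions, 1 otherwise."""
    with open(current_path) as f:
        current = json.load(f)
    with open(baseline_path) as f:
        baseline = json.load(f)
    problems = find_regressions(current, baseline)
    for problem in problems:
        print(f"REGRESSION {problem}")
    return 1 if problems else 0
```

In a CI step, `sys.exit(main("metrics.json", "baseline.json"))` (file names hypothetical) makes the job fail on a non-zero return.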

### Performance & Optimization
- **Caching**: Implement Redis/SQLite caching layer keyed by `(normalized_query, embedding_hash, index_version)` to avoid redundant retrievals and generations
  - *Implementation*: Create a cache wrapper in `api/routes.py` that checks the cache before retrieval and generation. Normalize queries (lowercase, remove punctuation, collapse whitespace) and hash the normalized query together with the embedding hash and index version to get a stable cache key. Store full responses with a TTL, using `redis-py` for a cache shared across workers or `cachetools.TTLCache` for a single process.
- **Vector DB swaps**: Support a FAISS index as an alternative to Chroma (which uses its own HNSW index rather than FAISS); create a plug-in layer for Milvus/PGVector backends
  - *Implementation*: Abstract vector store interface in `core/retrieval.py`. Implement adapter pattern for different vector stores. Use LangChain's vector store abstractions or create custom adapters. Make backend selection configurable via environment variables.
- **Async Processing**: Use async/await for concurrent operations
  - *Implementation*: Convert FastAPI endpoints to async functions. Use `asyncio.gather()` for parallel operations (e.g., concurrent document retrieval, parallel embedding generation). Update `AdvancedRetriever` to support async retrieval if needed.
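
A minimal sketch of the cache key and a TTL store, with an in-process dict standing in for Redis or SQLite. It assumes `embedding_hash` identifies the embedding model or configuration (so switching models invalidates entries) and that `index_version` is bumped on every re-index; both are hypothetical string inputs here.

```python
from __future__ import annotations

import hashlib
import string
import time
from typing import Any, Callable

_PUNCT = str.maketrans("", "", string.punctuation)


def normalize_query(query: str) -> str:
    """Lowercase, drop ASCII punctuation, collapse runs of whitespace."""
    return " ".join(query.lower().translate(_PUNCT).split())


def cache_key(query: str, embedding_hash: str, index_version: str) -> str:
    # Unit separator keeps the three parts from running into each other
    raw = "\x1f".join([normalize_query(query), embedding_hash, index_version])
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()


class TTLCache:
    """In-memory stand-in for Redis/SQLite; entries expire after `ttl_seconds`."""

    def __init__(self, ttl_seconds: float, clock: Callable[[], float] = time.monotonic):
        self.ttl = ttl_seconds
        self.clock = clock
        self._store: dict[str, tuple[float, Any]] = {}

    def get(self, key: str) -> Any | None:
        entry = self._store.get(key)
        if entry is None:
            return None
        expires_at, value = entry
        if self.clock() >= expires_at:
            del self._store[key]  # lazy eviction on read
            return None
        return value

    def set(self, key: str, value: Any) -> None:
        self._store[key] = (self.clock() + self.ttl, value)
```

Aggressive normalization trades precision for hit rate: stripping punctuation maps "C++" and "C" to the same key, so the rules should be checked against real queries before enabling the cache.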

### User Experience
- **Cancellation support**: Allow users to cancel long-running queries
  - *Implementation*: Run generation as an `asyncio.Task` and cancel it with `Task.cancel()` (FastAPI's `BackgroundTasks` run after the response is sent and offer no cancellation). For streaming responses, stop generating when the client disconnects, e.g. by checking `await request.is_disconnected()`. Add a cancellation endpoint and a frontend control to trigger it.
- **Streaming support**: Add streaming response support in FastAPI endpoints for real-time token delivery
  - *Implementation*: Use FastAPI's `StreamingResponse` with the Server-Sent Events (SSE) format (`media_type="text/event-stream"`). Add a `/query/stream` endpoint in `api/routes.py` that yields tokens from the `generate_stream()` method in `llm/generation.py` as they are generated, formatting each token as an SSE event (`data: {token}\n\n`; multi-line payloads need one `data:` line per line). Handle connection cleanup and errors. Update the React frontend to consume the stream and display tokens progressively: the browser `EventSource` API only issues GET requests, so a POST endpoint needs `fetch()` with a streamed response body instead; the Python `sseclient` library is useful for testing the endpoint.
- **Queue limit**: Implement request queue limits to prevent overload
  - *Implementation*: Use an `asyncio.Semaphore` to cap concurrent requests (`slowapi` limits request rate rather than concurrency). Add queue management with a configurable max queue size and return HTTP 503 when the queue is full.
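
Streaming, cancellation and the queue limit share one asyncio mechanism. The sketch is framework-free so it runs on its own: in the API, `stream_tokens` would be wrapped in `StreamingResponse(..., media_type="text/event-stream")` and `QueueFull` mapped to HTTP 503, while the hypothetical `fake_generate_stream` stands in for `generate_stream()` from `llm/generation.py`. Cancellation uses standard `asyncio.Task.cancel()`: the `CancelledError` raised inside the generator exits the `async with` block and frees the semaphore slot.

```python
from __future__ import annotations

import asyncio
from typing import AsyncIterator, Iterable


class QueueFull(Exception):
    """Raised when every slot is taken; an API layer would map this to HTTP 503."""


def format_sse(token: str) -> str:
    # Every payload line needs its own "data: " prefix; a blank line ends the event
    return "".join(f"data: {line}\n" for line in token.split("\n")) + "\n"


async def fake_generate_stream(tokens: Iterable[str], delay: float = 0.0) -> AsyncIterator[str]:
    """Hypothetical stand-in for an LLM token stream."""
    for token in tokens:
        await asyncio.sleep(delay)
        yield token


async def stream_tokens(token_source: AsyncIterator[str], slots: asyncio.Semaphore) -> AsyncIterator[str]:
    # Reject immediately instead of waiting when all slots are in use
    if slots.locked():
        raise QueueFull
    async with slots:
        async for token in token_source:
            yield format_sse(token)


async def demo_cancellation() -> tuple[list[str], bool]:
    """Cancel a stream mid-way and report whether the slot is still held."""
    slots = asyncio.Semaphore(1)
    received: list[str] = []

    async def consume() -> None:
        async for event in stream_tokens(fake_generate_stream(["a", "b", "c"], delay=0.05), slots):
            received.append(event)

    task = asyncio.create_task(consume())
    await asyncio.sleep(0.07)  # let roughly one token through
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return received, slots.locked()
```

Rejecting with 503 rather than queueing keeps latency bounded under load; a bounded wait (e.g. `asyncio.wait_for` on `slots.acquire()`) would be the middle ground if short queues are acceptable.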

### Security & Authentication
- **FastAPI authentication**: Implement API key or OAuth2 authentication
  - *Implementation*: Use `fastapi-users` or `python-jose` for JWT tokens. Validate API keys (from environment variables or a database) in a FastAPI dependency built on `fastapi.security.APIKeyHeader`, and attach it to protected endpoints with `Depends(...)`.
- **Rate limiting**: Add per-user/IP rate limiting to prevent abuse
  - *Implementation*: Use `slowapi` library with `Limiter` middleware. Configure rate limits per user/IP combination. Store rate limit state in Redis or in-memory cache. Add rate limit headers to responses.
- **SSL/TLS**: Set up Caddy to serve both Streamlit and FastAPI over HTTPS (replacing the self-signed certificate for FastAPI and plain HTTP for Streamlit)
  - *Implementation*: Configure Caddy reverse proxy with automatic HTTPS (Let's Encrypt). Update `docker-compose.yml` to include Caddy container. Route traffic: Caddy → Streamlit (port 8501) and Caddy → FastAPI (port 8080). Update CORS settings to allow Caddy domain.
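
A framework-independent sketch of the two checks a FastAPI dependency could run before a protected endpoint: constant-time API key comparison with `hmac.compare_digest`, and a per-(user, IP) token bucket. The class name, rate and burst values are hypothetical; with several workers the bucket state would have to live in Redis, as noted above, for limits to hold globally.

```python
from __future__ import annotations

import hmac
import time
from typing import Callable


def is_valid_api_key(presented: str, valid_keys: list[str]) -> bool:
    # compare_digest avoids leaking matching key prefixes through timing differences
    return any(hmac.compare_digest(presented.encode(), key.encode()) for key in valid_keys)


class TokenBucketLimiter:
    """Allow `burst` requests at once, refilling at `rate_per_sec` per (user, IP) pair."""

    def __init__(self, rate_per_sec: float, burst: int, clock: Callable[[], float] = time.monotonic):
        self.rate = rate_per_sec
        self.burst = burst
        self.clock = clock
        # (user_id, client_ip) -> (tokens available, time of last update)
        self._buckets: dict[tuple[str, str], tuple[float, float]] = {}

    def allow(self, user_id: str, client_ip: str) -> bool:
        key = (user_id, client_ip)
        now = self.clock()
        tokens, last = self._buckets.get(key, (float(self.burst), now))
        # Refill in proportion to elapsed time, capped at the burst size
        tokens = min(float(self.burst), tokens + (now - last) * self.rate)
        if tokens >= 1:
            self._buckets[key] = (tokens - 1, now)
            return True
        self._buckets[key] = (tokens, now)
        return False
```

In FastAPI, these checks would sit in dependencies that raise `HTTPException(status_code=401)` for a bad key and `HTTPException(status_code=429)` when `allow()` returns `False`.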

### Multilingual & Internationalization
- **Multilingual Support**: Support documents in multiple languages with language detection
  - *Implementation*: Use `langdetect` or `fasttext` for language detection during document ingestion. Store language metadata in chunk metadata. Use language-specific embedding models (e.g., multilingual sentence transformers) or language-specific rerankers.
- **Cross-lingual Retrieval**: Retrieve relevant content even when query language differs from document language
  - *Implementation*: Use multilingual embedding models (e.g., `paraphrase-multilingual-MiniLM-L12-v2`) that support cross-lingual similarity. Alternatively, use translation APIs to translate queries to document language before retrieval.
- **Multilingual React interface**: Add i18n to the React consumer and move hard-coded strings into language files
  - *Implementation*: Use `react-i18next` (built on `i18next`). Create translation files (JSON/YAML) for each language in `react/src/locales/`. Extract all user-facing strings and replace them with translation keys. Add a language selector UI component.
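
For the language-detection item, a minimal sketch of tagging chunks at ingestion and building a metadata filter at query time. The detector is injected so the sketch runs without extra packages; in practice it could be `langdetect.detect`, which returns ISO 639-1 codes such as `"de"` or `"en"`. The `language` metadata key and the `"unknown"` fallback are assumptions, and the filter uses Chroma's `$in` operator in the shape passed as `filter=` to LangChain's `Chroma.similarity_search`.

```python
from __future__ import annotations

from typing import Callable


def tag_language(chunks: list[dict], detect: Callable[[str], str]) -> list[dict]:
    """Return copies of the chunks with a `language` field added to their metadata."""
    tagged = []
    for chunk in chunks:
        try:
            lang = detect(chunk["text"])
        except Exception:  # e.g. langdetect raises on empty or symbol-only text
            lang = "unknown"
        tagged.append({**chunk, "metadata": {**chunk["metadata"], "language": lang}})
    return tagged


def language_filter(languages: list[str]) -> dict:
    """Chroma metadata filter restricting results to the given languages."""
    return {"language": {"$in": list(languages)}}
```

A filter like this only makes sense when answers must come from documents in specific languages; for cross-lingual retrieval with a multilingual embedding model, retrieving without it is the point.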

### Data Management & Deployment

- **Externalize data folder**: Inject the `data/` directory at runtime instead of committing it to Git:
  - Mount it as a Docker/K8s volume or download from secure storage (S3, Azure Blob, etc.) during startup.
  - Add checksum validation and update scripts to ensure dataset integrity across environments.
  - *Implementation*: Update `docker-compose.yml` to mount external volume for `data/` directory. Add startup script that downloads data from S3/Azure if `DATA_SOURCE_URL` is set. Use `boto3` or `azure-storage-blob` for cloud storage. Implement checksum validation using SHA256 hashes stored in metadata file.
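
The checksum step can be sketched with the standard library alone. This assumes a hypothetical manifest file (for example `checksums.json`, kept outside `data/`) mapping paths relative to `data/` to SHA-256 hex digests; files are hashed in fixed-size blocks so large documents never have to fit in memory.

```python
from __future__ import annotations

import hashlib
import json
from pathlib import Path


def sha256_file(path: Path, block_size: int = 1 << 20) -> str:
    digest = hashlib.sha256()
    with path.open("rb") as f:
        for block in iter(lambda: f.read(block_size), b""):
            digest.update(block)
    return digest.hexdigest()


def write_manifest(data_dir: Path, manifest: Path) -> None:
    """Record a digest for every file under data_dir, keyed by POSIX relative path."""
    entries = {
        p.relative_to(data_dir).as_posix(): sha256_file(p)
        for p in sorted(data_dir.rglob("*")) if p.is_file()
    }
    manifest.write_text(json.dumps(entries, indent=2, sort_keys=True))


def verify_manifest(data_dir: Path, manifest: Path) -> list[str]:
    """Return a list of problems; an empty list means the dataset matches the manifest."""
    expected: dict[str, str] = json.loads(manifest.read_text())
    problems = []
    for rel_path, digest in sorted(expected.items()):
        path = data_dir / rel_path
        if not path.is_file():
            problems.append(f"missing: {rel_path}")
        elif sha256_file(path) != digest:
            problems.append(f"checksum mismatch: {rel_path}")
    actual = {p.relative_to(data_dir).as_posix() for p in data_dir.rglob("*") if p.is_file()}
    problems += [f"unexpected: {p}" for p in sorted(actual - set(expected))]
    return problems
```

The startup script would call `verify_manifest` after mounting or downloading the data and refuse to start indexing if the list is non-empty.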