# Data Preparation for RAG Pipeline

This notebook demonstrates the process of preparing documents for the RAG pipeline, including:
1. Loading and parsing documents
2. Text cleaning and preprocessing
3. Document chunking strategies
4. Metadata extraction and enrichment
5. Saving the processed chunks to disk
6. Analyzing statistics of the processed chunks

These steps are critical for ensuring high-quality retrieval in the RAG system.

In [1]:
import os
import re
import json
import glob
import nltk
import numpy as np
import pandas as pd
from pathlib import Path
from tqdm.notebook import tqdm
from typing import List, Dict, Any, Tuple, Optional

# Download NLTK resources if needed.
# 'punkt_tab' is the sentence-tokenizer data that recent NLTK releases look up
# in sent_tokenize(); 'punkt' is kept for older releases. nltk.download() skips
# packages that are already up to date.
for resource in ('punkt', 'punkt_tab', 'stopwords'):
    nltk.download(resource)

# Add the parent directory of the working directory to the module search path.
# NOTE(review): the original comment said "src directory", but the path appended
# below is '..' — confirm which location was intended.
import sys
sys.path.append(os.path.abspath('..'))

# Locations of the raw input documents and of the processed output,
# all expressed relative to the notebook's working directory.
DATA_DIR = Path("..") / "data"
REAL_DOCS_DIR = DATA_DIR.joinpath("real_docs")
PROCESSED_DIR = DATA_DIR.joinpath("processed")

## 1. Document Loading

First, we'll load all the plain-text (`.txt`) documents from our `real_docs` directory.

In [2]:
def load_documents(directory: Path) -> List[Dict[str, Any]]:
    """Load every ``*.txt`` file directly inside *directory* as a document.

    Args:
        directory: Folder to scan. Only its immediate ``*.txt`` children are read.

    Returns:
        One dict per file, in sorted path order, with the keys ``id`` (file
        stem), ``title``, ``content`` (the full file text) and ``metadata``
        (source path, filename, ``created_at`` timestamp and size in bytes).

    Raises:
        UnicodeDecodeError: If a file is not valid UTF-8.
    """
    documents = []

    # glob() yields entries in arbitrary order; sorting keeps the document
    # order (and everything derived from it) reproducible across runs.
    files = sorted(directory.glob("*.txt"))

    for file_path in tqdm(files, desc="Loading documents"):
        content = file_path.read_text(encoding='utf-8')

        # Use a leading markdown heading as the title. Only the leading '#'
        # markers are removed so a title such as "C#" keeps its trailing '#';
        # an empty heading falls back to the file stem.
        first_line = content.split('\n', 1)[0]
        if first_line.startswith('#'):
            title = first_line.lstrip('#').strip() or file_path.stem
        else:
            title = file_path.stem

        documents.append({
            'id': file_path.stem,
            'title': title,
            'content': content,
            'metadata': {
                'source': str(file_path),
                'filename': file_path.name,
                # NOTE: getctime() is the creation time on Windows but the last
                # metadata-change time on Unix.
                'created_at': os.path.getctime(file_path),
                'file_size': os.path.getsize(file_path)
            }
        })

    return documents

# Read the raw corpus from disk and report how many files were found
documents = load_documents(REAL_DOCS_DIR)
print(f"Loaded {len(documents)} documents")

# Tabulate id, title and character length of every loaded document
doc_info = pd.DataFrame([
    dict(id=loaded['id'], title=loaded['title'], length=len(loaded['content']))
    for loaded in documents
])
doc_info

## 2. Text Cleaning and Preprocessing

Now we'll clean and preprocess the document text to improve retrieval quality.

In [3]:
def clean_text(text: str) -> str:
    """Normalize whitespace and mask URLs and e-mail addresses.

    Line breaks are preserved so that paragraph boundaries (blank lines)
    survive cleaning; only horizontal whitespace is collapsed.

    Args:
        text: Raw document text.

    Returns:
        The cleaned text with runs of spaces/tabs collapsed to one space,
        spaces around line breaks removed, URLs replaced by ``[URL]``,
        e-mail addresses replaced by ``[EMAIL]``, runs of three or more
        newlines reduced to two, and leading/trailing whitespace stripped.
    """
    # Normalize Windows / old-Mac line endings to '\n'
    text = re.sub(r'\r\n?', '\n', text)

    # Collapse horizontal whitespace only. Collapsing '\s+' here would also
    # swallow every newline and erase the paragraph structure.
    text = re.sub(r'[^\S\n]+', ' ', text)

    # Drop the single spaces left next to line breaks, so whitespace-only
    # lines become truly empty lines
    text = re.sub(r' ?\n ?', '\n', text)

    # Mask URLs
    text = re.sub(r'https?://\S+|www\.\S+', '[URL]', text)

    # Mask email addresses
    text = re.sub(r'\S+@\S+', '[EMAIL]', text)

    # Limit consecutive blank lines to a single paragraph break
    text = re.sub(r'\n{3,}', '\n\n', text)

    return text.strip()

def preprocess_documents(documents: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Return cleaned copies of *documents* without modifying the inputs.

    Args:
        documents: Document dicts that each carry ``content`` and ``metadata``.

    Returns:
        A new list with one new dict per input document. ``content`` is passed
        through ``clean_text`` and the metadata gains ``word_count``
        (whitespace-separated tokens of the cleaned text) and
        ``preprocessed=True``.
    """
    processed_docs = []

    for doc in tqdm(documents, desc="Preprocessing documents"):
        cleaned_content = clean_text(doc['content'])

        processed_docs.append({
            **doc,
            'content': cleaned_content,
            # Build a fresh metadata dict: a shallow doc.copy() would share
            # the nested dict and write these keys into the original document.
            'metadata': {
                **doc['metadata'],
                'word_count': len(cleaned_content.split()),
                'preprocessed': True,
            },
        })

    return processed_docs

# Run the cleaning step over every loaded document
processed_documents = preprocess_documents(documents)

# Side-by-side character lengths before and after cleaning, plus the word count
comparison = pd.DataFrame([
    dict(
        id=cleaned['id'],
        original_length=len(raw['content']),
        processed_length=len(cleaned['content']),
        word_count=cleaned['metadata']['word_count'],
    )
    for raw, cleaned in zip(documents, processed_documents)
])
comparison

## 3. Document Chunking

For effective retrieval, we'll chunk the documents into smaller, more focused pieces.

In [4]:
def _flush_chunk(chunks: List[str], chunk: str, min_chunk_size: int) -> None:
    """Store *chunk* in *chunks*, folding undersized pieces into the previous chunk."""
    chunk = chunk.strip()
    if not chunk:
        return
    if chunks and len(chunk.split()) < min_chunk_size:
        # Too small to stand alone: attach it to its predecessor instead of
        # throwing the text away.
        chunks[-1] = f"{chunks[-1]}\n\n{chunk}"
    else:
        chunks.append(chunk)


def chunk_document_by_paragraphs(doc: Dict[str, Any], min_chunk_size: int = 100, max_chunk_size: int = 512) -> List[Dict[str, Any]]:
    """Chunk a document by paragraphs with word-count constraints.

    Paragraphs (separated by blank lines) are packed greedily into chunks of
    at most ``max_chunk_size`` words. A paragraph longer than the maximum is
    split into sentences with ``nltk.sent_tokenize`` and packed the same way.

    No text is dropped. A piece shorter than ``min_chunk_size`` words is
    appended to the preceding chunk, which may then exceed ``max_chunk_size``
    by fewer than ``min_chunk_size`` words. If there is no preceding chunk it
    is emitted on its own. A single sentence longer than ``max_chunk_size``
    is kept whole.

    Args:
        doc: Document dict with ``id``, ``title``, ``content`` and ``metadata``.
        min_chunk_size: Target minimum number of words per chunk.
        max_chunk_size: Target maximum number of words per chunk.

    Returns:
        A list of chunk dicts. Each has its own ``id``/``title`` (1-based part
        number), the chunk ``content``, and a copy of the parent metadata
        extended with ``parent_id``, ``chunk_id``, ``chunk_count`` and the
        chunk's ``word_count``. Empty content yields an empty list.
    """
    content = doc['content']

    # Split by blank lines (paragraphs)
    paragraphs = [p.strip() for p in re.split(r'\n\s*\n', content) if p.strip()]

    chunks: List[str] = []
    current_chunk = ""
    current_words = 0

    for para in paragraphs:
        if len(para.split()) > max_chunk_size:
            # Oversized paragraph: pack sentence by sentence, space-joined
            units = [(" ", sent) for sent in nltk.sent_tokenize(para)]
        else:
            units = [("\n\n", para)]

        for separator, unit in units:
            unit_words = len(unit.split())
            # Close the running chunk when this unit would push it past the limit
            if current_chunk and current_words + unit_words > max_chunk_size:
                _flush_chunk(chunks, current_chunk, min_chunk_size)
                current_chunk, current_words = "", 0
            current_chunk = f"{current_chunk}{separator}{unit}" if current_chunk else unit
            current_words += unit_words

    # Store whatever is left over
    _flush_chunk(chunks, current_chunk, min_chunk_size)

    # Create chunk documents
    chunk_docs = []
    for i, chunk in enumerate(chunks, start=1):
        chunk_docs.append({
            'id': f"{doc['id']}_chunk_{i}",
            'title': f"{doc['title']} (Part {i})",
            'content': chunk,
            'metadata': {
                **doc['metadata'],
                'parent_id': doc['id'],
                'chunk_id': i,
                'chunk_count': len(chunks),
                # Overrides the parent's document-level word count
                'word_count': len(chunk.split())
            }
        })

    return chunk_docs

def chunk_all_documents(documents: List[Dict[str, Any]], min_size: int = 100, max_size: int = 512) -> List[Dict[str, Any]]:
    """Chunk every document and return all chunks in one flat list.

    Each document is passed to ``chunk_document_by_paragraphs`` with
    ``min_size``/``max_size`` as its size limits; the resulting chunks are
    concatenated in document order.
    """
    return [
        chunk
        for document in tqdm(documents, desc="Chunking documents")
        for chunk in chunk_document_by_paragraphs(
            document, min_chunk_size=min_size, max_chunk_size=max_size
        )
    ]

# Split the cleaned documents into chunks of roughly 100-300 words
chunked_documents = chunk_all_documents(processed_documents, min_size=100, max_size=300)
print(f"Created {len(chunked_documents)} chunks from {len(processed_documents)} documents")

# One row per chunk: its id, parent document, size and position
chunk_stats = pd.DataFrame([
    dict(
        chunk_id=chunk['id'],
        parent_id=chunk['metadata']['parent_id'],
        word_count=chunk['metadata']['word_count'],
        chunk_number=chunk['metadata']['chunk_id'],
        total_chunks=chunk['metadata']['chunk_count'],
    )
    for chunk in chunked_documents
])

# Summary statistics over the numeric columns
chunk_stats.describe()

## 4. Metadata Enrichment

Let's enrich our document metadata to improve retrieval relevance.

In [5]:
def extract_topics(text: str, n_keywords: int = 5) -> List[str]:
    """Extract the highest-weighted terms of *text* as topic keywords.

    The vectorizer is fitted on this single text only, so the inverse document
    frequency is the same for every term and the ranking is effectively by
    (normalized) term frequency after English stop-word removal.

    Args:
        text: Text to analyze.
        n_keywords: Maximum number of keywords to return.

    Returns:
        Up to ``n_keywords`` terms, highest score first. An empty list is
        returned when ``n_keywords`` is not positive or when the text contains
        no usable terms (e.g. it is empty or consists only of stop words).
    """
    from sklearn.feature_extraction.text import TfidfVectorizer
    from nltk.corpus import stopwords

    # A slice of [-0:] would select every term, so handle this case up front
    if n_keywords <= 0:
        return []

    # scikit-learn accepts 'english', a list, or None for stop_words; newer
    # releases reject a set during parameter validation.
    stop_words = list(stopwords.words('english'))

    # Create vectorizer
    vectorizer = TfidfVectorizer(stop_words=stop_words, max_features=1000)

    # Fit on the single document
    try:
        tfidf_matrix = vectorizer.fit_transform([text])
    except ValueError:
        # Raised for an empty vocabulary; treat it as "no topics" instead of
        # aborting the caller's whole run.
        return []

    # Get feature names
    feature_names = vectorizer.get_feature_names_out()

    # Indices of the n highest scores, best first
    tfidf_scores = tfidf_matrix.toarray()[0]
    top_indices = tfidf_scores.argsort()[-n_keywords:][::-1]

    return [feature_names[i] for i in top_indices]

# Keywords that vote for each category; the first category listed wins ties.
_CATEGORY_KEYWORDS: Dict[str, List[str]] = {
    'hrv': ['hrv', 'heart rate variability', 'rmssd', 'parasympathetic'],
    'training': ['training', 'workout', 'exercise', 'intensity', 'volume'],
    'recovery': ['recovery', 'rest', 'adaptation', 'supercompensation'],
    'sleep': ['sleep', 'rem', 'deep sleep', 'circadian'],
    'nutrition': ['nutrition', 'diet', 'protein', 'carbohydrate', 'hydration'],
    'stress': ['stress', 'overtraining', 'fatigue', 'burnout']
}

# Whole-word patterns (allowing a few common inflection endings) so that a
# keyword such as "rest" does not fire inside "interest", nor "rem" inside
# "remember".
_CATEGORY_PATTERNS = {
    category: [
        re.compile(rf"\b{re.escape(keyword)}(?:s|es|d|ed|ing)?\b")
        for keyword in keywords
    ]
    for category, keywords in _CATEGORY_KEYWORDS.items()
}


def categorize_document(text: str) -> str:
    """Assign *text* to the category whose keywords it mentions most.

    Matching is case-insensitive and on whole words. Each keyword counts at
    most once, however often it occurs.

    Args:
        text: Text to categorize.

    Returns:
        One of ``'hrv'``, ``'training'``, ``'recovery'``, ``'sleep'``,
        ``'nutrition'``, ``'stress'``, or ``'general'`` when no keyword
        matches. On a tie the category defined first wins.
    """
    text_lower = text.lower()

    # Number of distinct keywords found per category
    category_scores = {
        category: sum(1 for pattern in patterns if pattern.search(text_lower))
        for category, patterns in _CATEGORY_PATTERNS.items()
    }

    # max() returns the first maximal item, which keeps the tie-break stable
    best_category, best_score = max(category_scores.items(), key=lambda item: item[1])
    return best_category if best_score > 0 else 'general'

def enrich_document_metadata(doc: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of *doc* whose metadata is extended with derived fields.

    The input document and its metadata dict are left untouched.

    Args:
        doc: Document (or chunk) dict with ``content`` and ``metadata``.

    Returns:
        A new dict equal to *doc* except that ``metadata`` is a new dict that
        additionally holds ``topics`` (from ``extract_topics``), ``category``
        (from ``categorize_document``) and ``reading_time_minutes``.
    """
    content = doc['content']

    # Copy the nested dict as well: a shallow doc.copy() would share it and
    # the assignments below would modify the caller's document.
    metadata = dict(doc['metadata'])

    metadata['topics'] = extract_topics(content)
    metadata['category'] = categorize_document(content)

    # Reading time estimate (assuming 200 words per minute); fall back to
    # counting words when the metadata has no word_count yet
    word_count = metadata.get('word_count', len(content.split()))
    metadata['reading_time_minutes'] = round(word_count / 200, 1)

    return {**doc, 'metadata': metadata}

# Attach topics, category and reading time to every chunk
enriched_documents = [
    enrich_document_metadata(chunk)
    for chunk in tqdm(chunked_documents, desc="Enriching metadata")
]

# Table of the newly derived metadata fields, one row per chunk
metadata_df = pd.DataFrame([
    dict(
        id=enriched['id'],
        category=enriched['metadata']['category'],
        topics=', '.join(enriched['metadata']['topics']),
        reading_time=enriched['metadata']['reading_time_minutes'],
    )
    for enriched in enriched_documents
])

# Bar chart of how many chunks fall into each category, then a preview table
category_counts = metadata_df['category'].value_counts()
category_counts.plot(kind='bar', figsize=(10, 5))
metadata_df.head(10)

## 5. Save Processed Documents

Finally, let's save our processed documents for use in the RAG pipeline.

In [6]:
# Create processed directory (and any missing parents) if it doesn't exist
PROCESSED_DIR.mkdir(parents=True, exist_ok=True)

# Save all chunks as one pretty-printed JSON array. The encoding is stated
# explicitly so the output does not depend on the platform default and matches
# the UTF-8 used when the source documents were read.
with open(PROCESSED_DIR / "processed_chunks.json", 'w', encoding='utf-8') as f:
    json.dump(enriched_documents, f, indent=2)

# Also save as JSONL (one chunk per line) for easier streaming/loading
with open(PROCESSED_DIR / "processed_chunks.jsonl", 'w', encoding='utf-8') as f:
    f.writelines(json.dumps(doc) + '\n' for doc in enriched_documents)

print(f"Saved {len(enriched_documents)} processed document chunks to {PROCESSED_DIR}")

## 6. Document Statistics and Analysis

Let's analyze our processed documents to understand their characteristics.

In [7]:
import matplotlib.pyplot as plt
import seaborn as sns
from collections import Counter

# --- Word count distribution -------------------------------------------
word_counts = [chunk['metadata']['word_count'] for chunk in enriched_documents]
mean_words = np.mean(word_counts)
median_words = np.median(word_counts)

plt.figure(figsize=(10, 6))
sns.histplot(word_counts, bins=20)
plt.title('Document Chunk Word Count Distribution')
plt.xlabel('Word Count')
plt.ylabel('Frequency')
# Mark mean and median with dashed vertical lines
plt.axvline(mean_words, color='red', linestyle='--', label=f'Mean: {mean_words:.1f}')
plt.axvline(median_words, color='green', linestyle='--', label=f'Median: {median_words:.1f}')
plt.legend()
plt.show()

# --- Category distribution (most frequent category first) ---------------
category_series = metadata_df['category']
plt.figure(figsize=(10, 6))
sns.countplot(y=category_series, order=category_series.value_counts().index)
plt.title('Document Category Distribution')
plt.xlabel('Count')
plt.ylabel('Category')
plt.tight_layout()
plt.show()

# --- Topic analysis -----------------------------------------------------
# Gather the topic keywords of every chunk into one flat list and count them
all_topics = []
for chunk in enriched_documents:
    all_topics.extend(chunk['metadata']['topics'])
topic_counts = Counter(all_topics)

# Horizontal bar chart of the 20 most frequent topics
top_topics = pd.DataFrame(topic_counts.most_common(20), columns=['Topic', 'Count'])
plt.figure(figsize=(12, 6))
sns.barplot(x='Count', y='Topic', data=top_topics)
plt.title('Top 20 Topics Across All Documents')
plt.tight_layout()
plt.show()

## Conclusion

We've successfully prepared our documents for the RAG pipeline by:
1. Loading and parsing the raw documents
2. Cleaning and preprocessing the text
3. Chunking documents into appropriate sizes
4. Enriching metadata with topics, categories, and other useful information

These processed documents are now ready for embedding and indexing in our vector store.