In [None]:
from json import load
from dotenv import load_dotenv
import os
from langchain_community.document_loaders import WebBaseLoader
from supabase import create_client, Client
from langchain_openai import OpenAIEmbeddings

# Load variables from a local .env file (if any) into the process environment.
load_dotenv()

# Each lookup returns None when the variable is not set.
openai_api_key = os.environ.get("OPENAI_API_KEY")
supabase_url = os.environ.get("SUPABASE_URL")
supabase_key = os.environ.get("SUPABASE_KEY")

print("Loading environment variables...")

# Supabase client shared by the rest of the notebook.
supabase = create_client(supabase_url, supabase_key)

In [2]:
from typing import List, Iterator, Dict, Any, Optional
from pathlib import Path
from langchain_core.document_loaders import BaseLoader
from langchain_core.documents import Document
from langchain_text_splitters import MarkdownHeaderTextSplitter


class DocumentLoaderHelper:
    """Helper bound to a single file path.

    The file can be read either as a JSON list of URL groups
    (`get_urls_from_json`) or as plain text (`load_document`).
    """

    def __init__(self, file_path):
        # Path of the file that the methods below open.
        self.file_path = file_path

    def get_urls_from_json(self) -> list[str]:
        """Collect the URLs of all active source groups in the JSON file.

        The file is expected to hold a JSON list of objects. An object
        contributes its URLs only when its "active" value is truthy and its
        "urls" value is a list. Only string entries that start with "http"
        (after trimming whitespace) are kept.

        Returns:
            The trimmed URLs in file order, or an empty list when the
            top-level JSON value is not a list.

        Raises:
            OSError: if the file cannot be opened.
            json.JSONDecodeError: if the file is not valid JSON.
        """
        with open(self.file_path, "r", encoding="utf-8") as file:
            data = load(file)

        if not isinstance(data, list):
            print(f"Error: The '{self.file_path}' is not a JSON list.")
            return []

        urls = []
        for source_group in data:
            # Skip entries that are not JSON objects instead of crashing on .get().
            if not isinstance(source_group, dict):
                continue

            group_urls = source_group.get("urls")
            if not source_group.get("active") or not isinstance(group_urls, list):
                continue

            for url in group_urls:
                if isinstance(url, str) and url.strip().startswith("http"):
                    urls.append(url.strip())
        return urls

    def load_document(self):
        """Return the whole file as text.

        The file is decoded as UTF-8, matching `get_urls_from_json`, and is
        closed even if reading fails.
        """
        with open(self.file_path, "r", encoding="utf-8") as file:
            return file.read()


class SplitMarkdownDocumentsLoader(BaseLoader):
    """Loader that serves documents which are already held in memory."""

    def __init__(self, documents: List[Document]):
        # Documents are stored as given; nothing is copied or validated here.
        self.documents = documents

    def lazy_load(self) -> Iterator[Document]:
        """Yield the stored documents one by one, in their original order."""
        yield from self.documents


def extract_book_info(content, filepath):
    """Extract a book's title and author from its text, falling back to the filename.

    Only the first 10 lines of `content` are inspected:

    * the first line starting with "# " provides the title;
    * a line containing "author:" (any case) provides the author and
      overrides an author found earlier;
    * otherwise, while no author is known, a line containing the standalone
      word "by" (any case) provides the author as the text that follows it.

    Args:
        content: Full text of the book summary.
        filepath: Path of the file; its stem is used as the title fallback.

    Returns:
        A `(title, author)` tuple. The title falls back to the file stem with
        "_" and "-" replaced by spaces and title-cased. The author falls back
        to "Unknown Author".
    """
    import re  # local import: this cell does not import `re` itself

    title = author = None

    for raw_line in content.split("\n")[:10]:  # Check first 10 lines
        line = raw_line.strip()

        if line.startswith("# ") and not title:
            title = line[2:].strip()
            continue

        labelled = re.search(r"author:\s*(.*)", line, re.IGNORECASE)
        if labelled:
            author = labelled.group(1).strip()
            continue

        if not author:
            # \b keeps words such as "Standby" or "nearby" from matching, and
            # matching on the original line preserves the author's casing.
            byline = re.search(r"\bby\s+(.+)", line, re.IGNORECASE)
            if byline:
                author = byline.group(1).strip()

    # Fallback to filename
    if not title:
        filename = Path(filepath).stem
        title = filename.replace("_", " ").replace("-", " ").title()

    if not author:
        author = "Unknown Author"

    return title, author

Peter Attia site

In [None]:
from langchain_text_splitters import RecursiveCharacterTextSplitter
import bs4

# Splitter for the web articles. Separators are listed from coarsest to finest.
# NOTE(review): chunk_size / chunk_overlap are presumably measured in
# characters (the splitter's default length function) - confirm.
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=1300,
    chunk_overlap=300,
    separators=[
        "\n\n",  # Paragraph breaks
        "\n",  # Line breaks
        ". ",  # Sentence endings
        "! ",  # Exclamations
        "? ",  # Questions
        " ",  # Word boundaries
        "",  # Character level (fallback)
    ],
)

# The URLs come from the active groups in config/web_sources.json.
# Parsing is limited to elements with the "entry-content" CSS class. Note that
# ("entry-content") is a parenthesised string, not a one-element tuple.
web_loader = WebBaseLoader(
    web_path=DocumentLoaderHelper("config/web_sources.json").get_urls_from_json(),
    bs_kwargs={
        "parse_only": bs4.SoupStrainer(class_=("entry-content")),
    },
    bs_get_text_kwargs={"separator": " ", "strip": True},
)

# Accumulates the chunks of every loaded page; read again by later cells.
web_documents = []
# Top-level `async for` is not valid in a plain Python script; this cell
# relies on the notebook kernel accepting top-level async code.
async for document in web_loader.alazy_load():
    chunks = text_splitter.split_documents([document])
    web_documents.extend(chunks)

print(f"Loaded {len(web_documents)} documents")

Book summaries

In [None]:
from langchain.schema import Document
from pathlib import Path
import glob
from datetime import datetime
from langchain_text_splitters import MarkdownHeaderTextSplitter
import re

# Markdown header levels to split on, and the metadata key each level is stored under.
headers_to_split_on = [("#", "Title"), ("##", "Section"), ("###", "Subsection")]

# Create the markdown splitter once; strip_headers=True drops the header
# lines from the chunk text (they are kept in the chunk metadata instead).
md_splitter = MarkdownHeaderTextSplitter(
    headers_to_split_on=headers_to_split_on,
    strip_headers=True,
)

book_documents = []
# glob returns files in file-system order; sort so that the processing order
# (and therefore the order of book_documents) is the same on every run.
book_files = sorted(glob.glob("downloads/books/*.md"))

for filepath in book_files:
    with open(filepath, "r", encoding="utf-8") as f:
        content = f.read()

    # Extract book metadata
    title, author = extract_book_info(content, filepath)

    # Split by markdown headers - pass content string, not Document object
    chunks = md_splitter.split_text(content)

    print(f"\nProcessing: {title} by {author}")
    print(f"Found {len(chunks)} sections")

    for i, chunk in enumerate(chunks):
        # chunk is already a Document object from MarkdownHeaderTextSplitter

        # Strip every line first, so that whitespace-only lines become empty...
        cleaned_content = "\n".join(
            line.strip() for line in chunk.page_content.strip().split("\n")
        )
        # ...and only then collapse runs of 3+ newlines into one blank line.
        # Collapsing before stripping would miss runs separated by spaces/tabs.
        cleaned_content = re.sub(r"\n{3,}", "\n\n", cleaned_content)

        # Update the chunk content
        chunk.page_content = cleaned_content

        # Prefer the "##" header, then the "#" header, as the section title.
        section_title = chunk.metadata.get(
            "Section", chunk.metadata.get("Title", "Unknown Section")
        )

        # Update metadata with book information
        chunk.metadata.update(
            {
                "source": filepath,
                "source_type": "book_summary",
                "book_title": title,
                "author": author,
                "section_title": section_title,
                "chunk_index": i,
                "total_chunks": len(chunks),
                "processed_at": datetime.now().isoformat(),
            }
        )

        book_documents.append(chunk)

        # Print section info
        print(f"  Section {i + 1}: {section_title}")
        print(f"    Content preview: {chunk.page_content[:100]}...")

print(f"\nLoaded {len(book_documents)} book summary documents")

# Combine with web documents (new list; neither input list is modified)
all_documents = web_documents + book_documents

print(f"Total documents: {len(all_documents)}")
print(f"Web articles: {len(web_documents)} chunks")
print(f"Book summaries: {len(book_documents)} chunks")

Document Embedding

In [5]:
from langchain_community.vectorstores import SupabaseVectorStore

# Embedding model used both for indexing here and for queries later on.
embeddings_model = OpenAIEmbeddings(model="text-embedding-3-small")

# Embed all chunks and store them through the Supabase client.
# NOTE(review): re-running this cell presumably inserts the same documents
# again - confirm before executing it more than once.
vector_store = SupabaseVectorStore.from_documents(
    all_documents,
    embedding=embeddings_model,
    client=supabase,
)

Chat

In [None]:
import gradio
from langchain_openai import ChatOpenAI
from langchain_core.messages import SystemMessage, HumanMessage, AIMessage


def promptBuilder(message):
    """Build a bulleted context string from the 3 closest vector-store matches.

    Each match contributes one line of the form "* <page_content>", and every
    line ends with a newline. With no matches the result is an empty string.
    """
    hits = vector_store.similarity_search(message, k=3)
    return "".join(f"* {hit.page_content}\n" for hit in hits)


def extract_user_messages_from_history(history):
    """Return the non-empty user messages found in a Gradio chat history.

    Two entry shapes are understood:

    * dicts with "role" / "content" keys (only role "user" is kept);
    * lists or tuples whose first element is the user message.

    Anything else, and any entry with an empty user message, is ignored.
    A falsy `history` yields an empty list.
    """
    if not history:
        return []

    collected = []
    for entry in history:
        try:
            if isinstance(entry, dict):
                # Dict format (type="messages")
                if entry.get("role") != "user":
                    continue
                text = entry.get("content", "")
            elif isinstance(entry, (list, tuple)) and len(entry) >= 1:
                # Pair format: [user_msg, ai_msg]
                text = entry[0]
            else:
                continue

            if text:
                collected.append(text)
        except (IndexError, KeyError, TypeError):
            # Malformed entry: skip it
            continue

    return collected


def build_conversation_history(history):
    """Turn a Gradio chat history into a list of LangChain messages.

    Dict entries become a HumanMessage (role "user") or an AIMessage
    (role "assistant") when their content is non-empty. List/tuple entries
    with at least two elements are read as (user_msg, ai_msg) and add one
    message per non-empty element, user first. Everything else is ignored.
    A falsy `history` yields an empty list.
    """
    converted = []

    for entry in history or []:
        try:
            if isinstance(entry, dict):
                # Dict format (type="messages")
                role = entry.get("role", "")
                text = entry.get("content", "")

                if role == "user" and text:
                    converted.append(HumanMessage(content=text))
                elif role == "assistant" and text:
                    converted.append(AIMessage(content=text))

            elif isinstance(entry, (list, tuple)) and len(entry) >= 2:
                # Pair format: [user_msg, ai_msg]
                human_text, ai_text = entry[0], entry[1]

                if human_text:
                    converted.append(HumanMessage(content=human_text))
                if ai_text:
                    converted.append(AIMessage(content=ai_text))

        except (IndexError, KeyError, TypeError):
            # Malformed entry: skip it
            continue

    return converted


def echo_with_context_aware_retrieval(message, history):
    """Answer `message` using retrieved context and the chat history.

    Steps:
      1. Build a retrieval query from the last three user messages in
         `history` plus the current `message`.
      2. Fetch context for that query with `promptBuilder`.
      3. Send the system prompt (file content + instructions + context), the
         last `MAX_HISTORY_TURNS` history entries and the current message to
         the chat model.

    Args:
        message: The current user message.
        history: Gradio chat history (dict or pair entries); None or empty
            is treated as "no history".

    Returns:
        The content of the model's reply.
    """
    llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.2)

    # Treat a missing history as an empty one so slicing below is always safe.
    history = history or []

    # Keep only user messages first and take the last three afterwards.
    # Slicing the raw history first would let assistant entries (present in
    # the role/content format) use up most of the three-entry window.
    # Non-string content is dropped because it cannot be joined into a query.
    recent_questions = [
        question
        for question in extract_user_messages_from_history(history)
        if isinstance(question, str)
    ][-3:]
    conversation_context = " ".join(recent_questions)

    # Enhanced query for vector search
    enhanced_query = f"{conversation_context} {message}".strip()

    # Get context from vector store using enhanced query
    context = promptBuilder(enhanced_query)

    # Load system message
    system_content = DocumentLoaderHelper("config/system_message.txt").load_document()

    enhanced_system_message = f"""{system_content}

Use the context provided below to answer the question.
You will answer always in English based in European units (metric system).
Consider the conversation history when answering to maintain context and continuity.

CONTEXT: {context}"""

    # Build messages
    messages = [SystemMessage(content=enhanced_system_message)]

    # Add conversation history (limited to the most recent entries).
    # A negative slice already returns the whole list when it is shorter.
    MAX_HISTORY_TURNS = 10
    recent_history = history[-MAX_HISTORY_TURNS:]
    messages.extend(build_conversation_history(recent_history))

    # Add current user message
    messages.append(HumanMessage(content=message))

    # Debug info (optional)
    print(f"Enhanced query: {enhanced_query}")
    print(f"Total messages: {len(messages)}")

    # Get response
    response = llm.invoke(messages)
    return response.content


print("Starting Gradio demo...")

# Chat UI backed by the retrieval-aware handler. type="messages" selects the
# dict-based (role/content) history format.
demo = gradio.ChatInterface(
    title="Longevity AI",
    type="messages",
    fn=echo_with_context_aware_retrieval,
    examples=[
        "How can I improve my sleep?",
        "How can I improve my VO2 max?",
        "What consist the zone 2 training approach?",
    ],
)

# Start the app with debug output enabled.
demo.launch(debug=True)