<a href="https://colab.research.google.com/github/rubenapf/AI-for-Developers/blob/main/202509_practical_ai_development_w4_d1_start.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# RAG - v4

- Refactor Ingestion
  - Cleaning
  - Topic Detection
- Refactor Inference
  - Topic Classification
  - RAG Chain
- Gradio Chat History
  - format messages function

## Install Dependencies

In [None]:
# LangChain packages are pinned to exact versions.
%pip install "langchain==0.3.27" -qqq
%pip install "langchain-community==0.3.31" -qqq
%pip install "langchain-openai==0.3.35" -qqq
%pip install "langchain-chroma==0.2.6" -qqq
# NOTE(review): pypdf and gradio are not pinned, unlike the packages above - consider pinning them too.
%pip install pypdf -qqq
%pip install gradio -qqq

## Configuration

In [None]:
import os
from google.colab import userdata

# Copy the Colab secrets this notebook needs (OpenAI + Chroma) into environment variables.
os.environ.update({
    secret_name: userdata.get(secret_name)
    for secret_name in ("OPENAI_API_KEY", "CHROMA_API_KEY", "CHROMA_TENANT")
})

# Pipeline version; it is also embedded in the collection name below.
VERSION = "v4"

# Name of the vector database collection for this version.
COLLECTION_NAME = "bitcoin_docs_" + VERSION


## Global Variables

In [None]:
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_chroma import Chroma

# Embedding model handed to the vector store below.
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")

# Main chat model.
llm = ChatOpenAI(model="gpt-4o-mini")

# Dedicated classification model; temperature=0 keeps classification consistent.
classification_llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0)

# Vector store: the collection name is version-based, credentials come from the
# environment variables set in the configuration cell.
vectorstore = Chroma(
    collection_name=COLLECTION_NAME,
    embedding_function=embeddings,
    chroma_cloud_api_key=os.getenv("CHROMA_API_KEY"),
    tenant=os.getenv("CHROMA_TENANT"),
    database="code_for_all_rag_1",
)

## Ingestion

In [None]:
#@title Clean Text Function

import re

def clean_text(text: str) -> str:
    """
    TEXT PREPROCESSING - Clean PDF - NEW IN v4

    Steps, applied in order:
      1. Collapse every run of whitespace (spaces, tabs, newlines) into one space.
      2. Drop a trailing standalone page number: a run of digits that is the
         final token and follows a sentence-ending period.
      3. Strip leading and trailing whitespace.

    Args:
        text (str): Raw PDF text

    Returns:
        str: Cleaned text
    """
    # \s+ matches one or more whitespace characters and replaces them with a single space.
    text = re.sub(r'\s+', ' ', text)

    # The digits must be separated from the period by whitespace (\s+, not \s*).
    # Whitespace was collapsed above, so a page number that sat on its own line
    # always arrives as ". 12"; requiring the gap prevents a trailing decimal
    # such as "0.9" from being truncated to "0.".
    text = re.sub(r'(?<=\.)\s+\d+\s*$', '', text)

    return text.strip()

In [None]:
#@title Topic Detection Function

from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

def detect_document_topic(documents) -> str:
    """
    DETECT DOCUMENT TOPIC - NEW IN v4

    Asks the classification LLM for the primary topic of a document, based on
    a sample of its first pages.

    Args:
        documents (list): List of Document objects from PyPDFLoader

    Returns:
        str: Detected topic, stripped and lower-cased (e.g., 'bitcoin', 'ethereum')
    """

    # Sample: first 3 pages, each followed by a blank line, capped at 4000
    # characters to save costs.
    sample_content = "".join(
        page.page_content + "\n\n" for page in documents[:3]
    )[:4000]

    # Prompt for automatic topic detection
    prompt = ChatPromptTemplate.from_template(
      """
      Analyze the following document content and determine its primary topic.

      Document content:
      {content}

      Based on this content, what is the primary topic? Answer with a single word or short phrase (e.g., 'bitcoin', 'ethereum').

      Examples:
      If the document is about Bitcoin, answer: bitcoin
      If the document is about Ethereum, answer: ethereum
      If the document is about general crypto technology, answer: crypto

      Primary topic:
      """
    )

    # prompt -> classification model -> plain string
    chain = prompt | classification_llm | StrOutputParser()
    raw_topic = chain.invoke({"content": sample_content})

    return raw_topic.strip().lower()

In [None]:
#@title Ingest function

from langchain.document_loaders import PyPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter

def ingest_documents(
    document_path: str,
    access_level: str = "public"
):
    """
    INGESTION PIPELINE - VERSION v4

    Loads a PDF, detects its topic, cleans the page text, splits it into
    chunks, tags every chunk with topic/access metadata and stores the chunks
    in the vector store.

    Args:
        document_path (str): URL or local path to PDF
        access_level (str): "public" or "premium" (for access control demo)

    Returns:
        tuple: (num_chunks, detected_topic) - Number of chunks and detected topic
    """

    print("-" * 80)
    print(f"STARTING INGESTION PIPELINE - VERSION {VERSION}")
    print("-" * 80)

    #--------------------------------------------------------------------------------
    # STEP 1: LOAD DOCUMENTS
    #--------------------------------------------------------------------------------
    print("\n[1/6] Loading file from URL...")

    loader = PyPDFLoader(document_path)
    documents = loader.load()

    print(f"âœ“ Loaded {len(documents)} pages from file")

    #--------------------------------------------------------------------------------
    # STEP 2: AUTO-DETECT TOPIC
    #--------------------------------------------------------------------------------
    print(f"\n[2/6] Auto-detecting document topic...")

    # The topic is stored on every chunk (step 5); inference filters on it,
    # so it must be a real value rather than an empty string.
    detected_topic = detect_document_topic(documents)

    print(f"âœ“ Topic auto-detected: '{detected_topic}'")

    #--------------------------------------------------------------------------------
    # STEP 3: PREPROCESSING
    #--------------------------------------------------------------------------------
    print(f"\n[3/6] Applying text preprocessing...")

    # Clean each page in place before chunking so chunks are built from
    # normalized text.
    for document in documents:
        document.page_content = clean_text(document.page_content)

    print(f"âœ“ Cleaned {len(documents)} pages")

    #--------------------------------------------------------------------------------
    # STEP 4: CHUNK DOCUMENTS
    #--------------------------------------------------------------------------------
    print(f"\n[4/6] Chunking documents...")

    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=1000,
        chunk_overlap=200,
    )

    chunks = text_splitter.split_documents(documents)

    print(f"âœ“ Split into {len(chunks)} chunks")

    #--------------------------------------------------------------------------------
    # STEP 5: ADD METADATA
    #--------------------------------------------------------------------------------
    print(f"\n[5/6] Enriching chunks with metadata...")

    for chunk in chunks:
        # update() keeps the other metadata keys already on the chunk; only
        # same-named keys ('topic', 'access_level') would be replaced.
        chunk.metadata.update({
            'topic': detected_topic,
            'access_level': access_level,   # Used for access control
        })

    print(f"âœ“ Metadata enriched for all chunks:")

    #--------------------------------------------------------------------------------
    # STEP 6: CREATE EMBEDDINGS AND STORE IN CHROMA
    #--------------------------------------------------------------------------------
    print(f"\n[6/6] Creating embeddings and storing in Chroma...")
    print(f"  Collection name: {COLLECTION_NAME}")

    vectorstore.add_documents(
        documents=chunks
    )

    print(f"âœ“ Embeddings created and stored")

    return len(chunks), detected_topic




### Test Ingestion

In [None]:
# Bitcoin whitepaper loaded from its URL, with the default access_level ("public").
ingest_documents("https://bitcoin.org/bitcoin.pdf")

# Local PDF ingested as "premium" content.
# NOTE(review): assumes ethereum.pdf was uploaded to /content beforehand - confirm.
ingest_documents("/content/ethereum.pdf", "premium")

## Inference

In [None]:
#@title Topic Classification Chain Creation

from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableSequence

def create_classification_chain() -> RunnableSequence:
    """
    CREATE CLASSIFICATION CHAIN

    Builds the chain that decides which topic a user question is about, taking
    the conversation history into account so follow-up questions can be
    classified too.

    OUTCOME:
     - Determine whether a user query is about Bitcoin, Ethereum, or both.
     - Enable intelligent routing to relevant documents only.

    The chain expects an input dict with two keys:
     - "history": the formatted conversation so far (plain text)
     - "question": the current user question

    Returns:
        RunnableSequence: Classification chain producing a plain string
    """

    classification_template = ChatPromptTemplate.from_template(
      """
      Classify this question as about 'bitcoin', 'ethereum', or 'both'.

      Use the conversation history to resolve follow-up questions: if the question does not name a topic itself, classify it by the topic the conversation was already about.

      Conversation history:
      {history}

      Question: {question}

      Answer with exactly one word: bitcoin, ethereum, or both.

      Classification:
      """
    )

    # prompt -> deterministic classification model -> plain string
    classification_chain = classification_template | classification_llm | StrOutputParser()

    return classification_chain

In [None]:
#@title RAG Chain Creation

from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableSequence

def create_rag_chain() -> RunnableSequence:
    """
    CREATE RAG CHAIN

    Builds the answer-generation chain. Compared with a history-less prompt it:
    - Includes the conversation history in the prompt
    - Instructs the model how to handle follow-up questions

    OUTCOME:
    - More natural conversational flow

    The chain expects an input dict with three keys:
    - "history": the formatted conversation so far (plain text)
    - "context": the retrieved document excerpts
    - "question": the current user question

    Returns:
        RunnableSequence: RAG chain producing the answer as a plain string
    """

    rag_template = ChatPromptTemplate.from_template(
      """
      You are a helpful assistant answering questions about cryptocurrency whitepapers.

      Answer using ONLY the context below. If the context does not contain the answer, say that you don't know instead of guessing.

      The conversation history is provided so that you can resolve follow-up questions (for example, a question like "How does it work?" refers to the subject of the previous turns).

      Conversation history:
      {history}

      Context:
      {context}

      Question: {question}

      Answer:
      """
    )

    # prompt -> main chat model -> plain string
    rag_chain = rag_template | llm | StrOutputParser()

    return rag_chain

In [None]:
#@title Format Chat History Helper

def format_chat_history(history: list, max_turns: int = 5) -> str:
    """
    FORMAT CHAT HISTORY

    Convert Gradio's messages-format history into a plain-text transcript with
    one "User: ..." / "Assistant: ..." line per message.

    WHY LIMIT TURNS?
    - Token limits: Long conversations exceed context window
    - Relevance: Recent turns are more relevant
    - Cost: Fewer tokens = lower cost
    - Performance: Faster processing

    GRADIO MESSAGES FORMAT:
    [
        {"role": "user", "content": "What is Bitcoin?"},
        {"role": "assistant", "content": "Bitcoin is..."},
        {"role": "user", "content": "How does it work?"},
        {"role": "assistant", "content": "It works by..."}
    ]

    Args:
        history (list): Gradio chat history in messages format (None or an
            empty list yields an empty string)
        max_turns (int): Maximum number of conversation turns to include; one
            turn is counted as a user message plus an assistant message

    Returns:
        str: Formatted conversation history, or "" when there is nothing to format
    """
    # Guard max_turns <= 0 explicitly: history[-0:] would return the whole list.
    if not history or max_turns <= 0:
        return ""

    role_labels = {"user": "User", "assistant": "Assistant"}

    # Keep only the most recent messages (2 messages per turn).
    recent_messages = history[-(max_turns * 2):]

    lines = []
    for message in recent_messages:
        if not isinstance(message, dict):
            continue

        label = role_labels.get(message.get("role"))
        content = message.get("content")

        # Skip unknown roles and non-text content; only plain text can be
        # placed in the prompt.
        if label is None or not isinstance(content, str):
            continue

        lines.append(f"{label}: {content}")

    formatted_history = "\n".join(lines)

    return formatted_history

In [None]:
#@title Inference Function

def inference(
    query: str,
    chat_history: list = None,
    user_access_level: str = "public"
) -> str:
    """
    INFERENCE PIPELINE - VERSION v4

    FLOW:
    1. Format conversation history
    2. Classify query as topic (using a topic classification chain)
    3. Build metadata filter (topic + access_level)
    4. Retrieve filtered documents from the vector store
    5. Format the retrieved documents as context
    6. Generate answer with the RAG chain

    KEY POINTS:
    - Uses chat_history for context
    - Classification considers history
    - RAG chain includes conversation history
    - Handles follow-up questions

    The chain factories are called with the input keys "history"/"question"
    (classification) and "history"/"context"/"question" (RAG); the prompts they
    build must declare matching variables.

    Args:
        query (str): User's question
        chat_history (list): Gradio chat history (None is treated as empty)
        user_access_level (str): "public" or "premium"

    Returns:
        str: Natural language answer ("" when no RAG chain is available)
    """

    print("="*80)
    print(f"RUNNING INFERENCE - VERSION {VERSION}")
    print("="*80)

    #--------------------------------------------------------------------------------
    # STEP 1: FORMAT CONVERSATION HISTORY
    #--------------------------------------------------------------------------------
    print(f"\n[1/6] Formatting conversation history...")

    formatted_history = format_chat_history(chat_history or [])

    # An explicit marker reads better in a prompt than an empty section.
    history_for_prompt = formatted_history or "(no previous conversation)"

    print(f"âœ“ History formatted ({len(chat_history or [])} messages)")

    #--------------------------------------------------------------------------------
    # STEP 2: TOPIC DETECTION
    #--------------------------------------------------------------------------------
    print(f"\n[2/6] Detecting document topic...")

    classification_chain = create_classification_chain()

    if classification_chain is None:
        # No classifier available: do not restrict the search to one topic.
        topic = "both"
    else:
        raw_topic = classification_chain.invoke({
            "history": history_for_prompt,
            "question": query
        })
        # Normalize the model output (stray quotes, trailing period, casing).
        topic = raw_topic.strip().strip("'\".").lower()
        if topic not in ("bitcoin", "ethereum"):
            # Anything else ("both" or an unexpected label) means: search everything.
            topic = "both"

    print(f"âœ“ Topic: '{topic}'")

    #--------------------------------------------------------------------------------
    # STEP 3: METADATA FILTER - topic + access_control
    #--------------------------------------------------------------------------------
    print(f"\n[3/6] Building metadata filter...")

    conditions = []

    if topic in ("bitcoin", "ethereum"):
        conditions.append({"topic": topic})
        print(f"  âœ“ Topic filter: {topic}")
    else:
        print(f"  âœ“ Topic filter: none (searching both)")

    if user_access_level == "public":
        conditions.append({"access_level": "public"})
        print(f"  âœ“ Access filter: public only")
    else:
        print(f"  âœ“ Access filter: all content")

    # Combine conditions properly for Chroma: "$and" only with two or more
    # conditions, the bare condition when there is one, None when there are none.
    filter_conditions = {"$and": conditions} if len(conditions) > 1 else (conditions[0] if conditions else None)

    print(f"  Final filter: {filter_conditions or 'None'}")

    #--------------------------------------------------------------------------------
    # STEP 4: SIMILARITY SEARCH
    #--------------------------------------------------------------------------------
    print(f"\n[4/6] Performing similarity search...")

    if filter_conditions:
        results = vectorstore.similarity_search(query, k=3, filter=filter_conditions)
    else:
        results = vectorstore.similarity_search(query, k=3)

    #--------------------------------------------------------------------------------
    # STEP 5: FORMAT CONTEXT
    #--------------------------------------------------------------------------------

    print(f"\n[5/6] Formatting context for LLM...")

    context = "\n\n".join([doc.page_content for doc in results])

    #--------------------------------------------------------------------------------
    # STEP 6: GENERATE ANSWER BY INVOKING CHAIN
    #--------------------------------------------------------------------------------
    print(f"\n[6/6] Invoking RAG chain...")

    rag_chain = create_rag_chain()

    if rag_chain is None:
        # No chain available: keep the pipeline running and return an empty answer.
        print("  RAG chain not available - returning empty response")
        response = ""
    else:
        response = rag_chain.invoke({
            "history": history_for_prompt,
            "context": context,
            "question": query
        })

    print("\n" + "="*80)
    print("INFERENCE COMPLETE")
    print("="*80)

    return response  # Returns string

In [None]:
# Smoke test: a single question with no chat history and the default "public" access level.
inference("What is Bitcoin?")

## Gradio Demo

In [None]:
def chat_inference(message, history):
  """
  Gradio ChatInterface wrapper

  Forwards the current message and the chat history to inference(), always
  with the "premium" access level.

  Args:
      message (str): Current user message
      history (list): Chat history, passed on to inference() as chat_history

  Returns:
      str: Bot response
  """
  # Debug aid: show the raw history object Gradio hands over.
  print(history)

  user_access_level = "premium"

  return inference(
      query=message,
      chat_history=history,
      user_access_level=user_access_level
  )

In [None]:
import gradio as gr

# Chat UI wired to chat_inference; with type="messages" the history is passed
# as a list of {"role": ..., "content": ...} dicts.
demo = gr.ChatInterface(
    fn=chat_inference,
    type="messages",
    # The title is derived from VERSION so it cannot drift from the pipeline
    # version (it previously still said "v2").
    title=f"ðŸª™ Bitcoin RAG Assistant ({VERSION})",
    description="Ask questions about crypto.",
    examples=[
        "What is Bitcoin?",
        "What is Ethereum?",
        "How does mining work?",
        "What is proof of work?",
        "Explain the blockchain structure",
        "How are transactions verified?",
        "What is the double-spending problem?"
    ],
)

# share=True requests a public share link for the app.
demo.launch(share=True, debug=True)