# Building RAG Applications with LangChain

This notebook demonstrates how to build a Retrieval-Augmented Generation (RAG) application using LangChain. We will cover two main use cases: generating quizzes from web content and extracting information from runbooks.

## Setup

First, we need to set up our environment by loading API keys and importing necessary libraries.


In [80]:
import os
import bs4
import getpass
import requests
from langchain import hub
from bs4 import BeautifulSoup
from dotenv import load_dotenv
from langchain_core.documents import Document
from langgraph.graph import START, StateGraph
from typing_extensions import List, TypedDict
from langchain_community.document_loaders import PyPDFLoader, TextLoader, WebBaseLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter


load_dotenv()
        
if not os.environ.get("ANTHROPIC_API_KEY"):
    os.environ["ANTHROPIC_API_KEY"] = getpass.getpass("Enter API key for Anthropic: ")

from langchain.chat_models import init_chat_model

llm = init_chat_model("claude-3-5-sonnet-latest", model_provider="anthropic")

In [70]:
if not os.environ.get("OPENAI_API_KEY"):
    os.environ["OPENAI_API_KEY"] = getpass.getpass("Enter API key for OpenAI: ")

from langchain_openai import OpenAIEmbeddings

embeddings = OpenAIEmbeddings(model="text-embedding-3-large")

In [71]:
from langchain_chroma import Chroma

vector_store = Chroma(embedding_function=embeddings)

#### Explanation:

- We load the necessary API keys for Anthropic and OpenAI.
- We initialize the language model (LLM) and embeddings model.
- We set up Chroma as our vector store.
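
Before running the setup cells, it can help to see which keys are already present in the environment. The following is a minimal sketch; `missing_keys` and `REQUIRED_KEYS` are hypothetical names introduced here for illustration and are not part of LangChain. The helper only reads a mapping and never prompts for input.

```python
import os
from typing import Iterable, List, Mapping, Optional

# Hypothetical constant: the two keys this notebook asks for.
REQUIRED_KEYS = ("ANTHROPIC_API_KEY", "OPENAI_API_KEY")


def missing_keys(names: Iterable[str], env: Optional[Mapping[str, str]] = None) -> List[str]:
    """Return the names that are unset or empty in env (defaults to os.environ)."""
    env = os.environ if env is None else env
    return [name for name in names if not env.get(name)]


if __name__ == "__main__":
    # Report which keys would trigger a getpass prompt in the cells above.
    print("Keys still to be provided:", missing_keys(REQUIRED_KEYS))
```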

## Use Case 1: Generating Quizzes from Web Content

In this use case, we will fetch content from specified webpages, create a vector store, and generate a quiz based on the content.

### Fetching and Cleaning Web Content

In [72]:
def fetch_clean_web_content(url):
    """Fetches and extracts only the main content from a webpage."""
    try:
        response = requests.get(url, timeout=10)
        response.raise_for_status()
        
        soup = BeautifulSoup(response.text, "html.parser")
        
        # Remove unwanted elements
        for tag in ["script", "style", "nav", "footer", "header", "aside", "form", "noscript"]:
            for element in soup.find_all(tag):
                element.decompose()
        
        # Extract meaningful sections (modify selectors based on site structure)
        main_content = soup.find("article") or soup.find("main") or soup.find("div", class_="content") or soup.find("body")
        if not main_content:
            return None

        text = main_content.get_text(separator="\n", strip=True)

        # Ensure we get a minimum amount of text (to avoid indexing useless content)
        if len(text) < 300:  
            print(f"Skipped {url} due to insufficient content.")
            return None

        return Document(page_content=text, metadata={"source": url})
    
    except Exception as e:
        print(f"Failed to load {url}: {e}")
        return None


In [73]:
list_of_webpages = [
    "https://www.datastax.com/guides/hierarchical-navigable-small-worlds",
    "https://medium.com/@datastax/how-does-hierarchical-navigable-small-world-hnsw-power-genai-ee0ee24f8fce"
]

# Load and clean content from webpages
clean_docs = [fetch_clean_web_content(url) for url in list_of_webpages]
clean_docs = [doc for doc in clean_docs if doc]  # Remove None values

# Split the cleaned documents into overlapping chunks for vector indexing
# (RecursiveCharacterTextSplitter was already imported in the setup cell)

splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
all_splits = splitter.split_documents(clean_docs)

In [74]:
_ = vector_store.add_documents(all_splits)

#### Explanation:

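- We define a list of URLs to fetch content from.
- The `fetch_clean_web_content` function fetches each page, removes non-content HTML elements (scripts, navigation, footers, and so on), extracts the main text, and skips pages that yield fewer than 300 characters.
- We split the documents into chunks of up to 1,000 characters with a 200-character overlap, so that each embedding covers a focused passage.
- We add the chunks to our vector store.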
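
To make the interaction between `chunk_size` and `chunk_overlap` concrete, here is a simplified fixed-window sketch. It is not how `RecursiveCharacterTextSplitter` works internally: that splitter prefers to break on separators such as paragraph and line breaks, so its chunks vary in length. The function name `sliding_window_chunks` is hypothetical and exists only for this illustration.

```python
from typing import List


def sliding_window_chunks(text: str, chunk_size: int = 1000, chunk_overlap: int = 200) -> List[str]:
    """Cut text into fixed-size windows, each sharing chunk_overlap characters with the previous one."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError("chunk_overlap must be in the range [0, chunk_size)")

    step = chunk_size - chunk_overlap  # how far each window advances
    chunks: List[str] = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # this window already reaches the end of the text
    return chunks


if __name__ == "__main__":
    for chunk in sliding_window_chunks("abcdefghij", chunk_size=4, chunk_overlap=2):
        print(chunk)
```

With a window of 4 and an overlap of 2, each chunk repeats the last two characters of the previous one. That repetition is what lets a sentence straddling a chunk boundary still appear whole in at least one chunk.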

In [75]:
# # Debug
# retrieved_docs = vector_store.similarity_search("Generate me quiz questions on Nonissuers Communication?", k=5)
# for doc in retrieved_docs:
#     print(doc.page_content[:500])  # Print first 500 characters

### Generating the Quiz

In [76]:
# Define prompt for question-answering
prompt = hub.pull("rlm/rag-prompt")

# Define state for application
class State(TypedDict):
    question: str
    context: List[Document]
    quiz_questions: str
    structured_summary: str

# Define application steps
def retrieve(state: State):
    retrieved_docs = vector_store.similarity_search(state["question"], k=5)
    
    # Combine document content
    combined_content = "\n\n".join(doc.page_content for doc in retrieved_docs)
    
    # Validate content: fail fast, because "error" is not a State key and the
    # later steps cannot run with a missing context
    if len(combined_content.strip()) < 300:
        raise ValueError("Retrieved context is too fragmented or insufficient.")

    return {"context": retrieved_docs}

def summarize_context(state: State):
    docs_content = "\n\n".join(doc.page_content for doc in state["context"])
    
    summary_prompt = """
    Given the following text, extract the key concepts in a structured manner:
    
    Text:
    {context}

    Output the response as a structured summary.
    """
    
    response = llm.invoke(summary_prompt.format(context=docs_content))
    return {"structured_summary": response.content}

def generate_quiz(state: State):
    structured_summary = state["structured_summary"]
    
    quiz_prompt = """
    Based on the structured summary below, generate 5 multiple-choice quiz questions. 
    Each question should have 4 options with one correct answer.

    Structured Summary:
    {summary}

    Output Format:
    Q1: [Question here]
    A) Option 1
    B) Option 2
    C) Option 3
    D) Option 4
    Correct Answer: [Correct Option]
    """
    
    response = llm.invoke(quiz_prompt.format(summary=structured_summary))
    return {"quiz_questions": response.content}

In [77]:
graph_builder = StateGraph(State).add_sequence([retrieve, summarize_context, generate_quiz])

graph_builder.add_edge(START, "retrieve")
graph = graph_builder.compile()

In [78]:
response = graph.invoke({"question": "Generate a quiz on HNSW indexing"})

print(response["quiz_questions"])

Here are 5 multiple-choice questions based on the structured summary:

Q1: What is a key architectural component of HNSW?
A) Single-layer structure
B) Tiered architecture with multiple layers
C) Random node distribution
D) Fixed-path routing
Correct Answer: B

Q2: How has HNSW evolved from its predecessor NSW?
A) From logarithmic to linear complexity
B) From linear to exponential complexity
C) From polylogarithmic to logarithmic complexity
D) From exponential to linear complexity
Correct Answer: C

Q3: Which feature is NOT one of the main applications of HNSW?
A) Vector databases
B) Blockchain processing
C) Similarity search operations
D) AI and data science applications
Correct Answer: B

Q4: What is one of the primary implementation considerations for HNSW?
A) Network bandwidth requirements
B) CPU clock speed limitations
C) Practical limitations on memory usage
D) Graphics processing capabilities
Correct Answer: C

Q5: Which functionality is central to HNSW's design?
A) Data encrypti

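#### Explanation:

- We pull the `rlm/rag-prompt` question-answering prompt from the hub; the quiz pipeline itself uses the inline prompts defined in `summarize_context` and `generate_quiz`.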
- We create a state graph to manage the flow of our application.
- The retrieve function fetches relevant documents from the vector store.
- The summarize_context function generates a structured summary of the retrieved content.
- The generate_quiz function creates a quiz based on the summary.
- We invoke the graph with a question and print the generated quiz.
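
Because the quiz comes back as free text, a small parser can turn it into structured data and drop any question that arrived incomplete. This is a minimal sketch that assumes the model follows the `Q1:` / `A)` / `Correct Answer:` layout requested in the prompt. `parse_quiz` and `complete_questions` are hypothetical helpers, not part of LangChain or LangGraph.

```python
import re
from typing import Any, Dict, List

QUESTION_RE = re.compile(r"^Q(\d+):\s*(.+)$")
OPTION_RE = re.compile(r"^([A-D])\)\s*(.+)$")
ANSWER_RE = re.compile(r"^Correct Answer:\s*([A-D])\b")


def parse_quiz(text: str) -> List[Dict[str, Any]]:
    """Parse quiz text into a list of question dictionaries."""
    questions: List[Dict[str, Any]] = []
    current = None
    for raw_line in text.splitlines():
        line = raw_line.strip()
        match = QUESTION_RE.match(line)
        if match:
            current = {"number": int(match.group(1)), "question": match.group(2),
                       "options": {}, "answer": None}
            questions.append(current)
            continue
        if current is None:
            continue  # ignore any preamble before the first question
        match = OPTION_RE.match(line)
        if match:
            current["options"][match.group(1)] = match.group(2)
            continue
        match = ANSWER_RE.match(line)
        if match:
            current["answer"] = match.group(1)
    return questions


def complete_questions(questions: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Keep only questions that have four options and a valid answer letter."""
    return [q for q in questions
            if len(q["options"]) == 4 and q["answer"] in q["options"]]
```

A typical use would be `complete_questions(parse_quiz(response["quiz_questions"]))`, which filters out a trailing question if the model output was cut off before its answer line.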

## Use Case 2: Extracting Information from Runbooks

In this use case, we will extract information from PDF and text files in a specified directory.

In [58]:
def load_documents_from_directory(directory: str):
    """Loads all PDFs and text files from a given directory."""
    documents = []
    
    for filename in os.listdir(directory):
        file_path = os.path.join(directory, filename)
        
        if filename.endswith(".pdf"):
            print(f"Loading PDF: {filename}")
            loader = PyPDFLoader(file_path)
            docs = loader.load()
        
        elif filename.endswith(".txt"):
            print(f"Loading Text File: {filename}")
            loader = TextLoader(file_path)
            docs = loader.load()
        
        else:
            print(f"Skipping unsupported file: {filename}")
            continue
        
        # Attach metadata
        for doc in docs:
            doc.metadata["source"] = filename  # Track document source
        
        documents.extend(docs)
    
    return documents

# The documents are loaded, chunked, and indexed further below, once DATA_DIR
# and the helper functions have been defined.

#### Explanation:

- The `load_documents_from_directory` function loads every `.pdf` and `.txt` file in the directory, skips other file types, and records each file name in the `source` metadata field.
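
Note that `filename.endswith(".pdf")` is case-sensitive, so a file named `REPORT.PDF` would be skipped. The sketch below shows one way to group files by lowercase suffix with `pathlib` before handing them to a loader. `group_supported_files` and `SUPPORTED_SUFFIXES` are hypothetical names used for illustration only, and the function touches no LangChain loaders.

```python
from pathlib import Path
from typing import Dict, List

# Hypothetical constant mirroring the two file types the loader handles.
SUPPORTED_SUFFIXES = {".pdf", ".txt"}


def group_supported_files(directory: str) -> Dict[str, List[Path]]:
    """Group regular files in a directory by lowercase suffix, ignoring unsupported types."""
    grouped: Dict[str, List[Path]] = {suffix: [] for suffix in SUPPORTED_SUFFIXES}
    for path in sorted(Path(directory).iterdir()):
        suffix = path.suffix.lower()  # treat .PDF and .pdf the same way
        if path.is_file() and suffix in grouped:
            grouped[suffix].append(path)
    return grouped
```

Sorting the directory listing also makes the load order deterministic, which `os.listdir` does not guarantee.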


### Helper Functions

In [81]:
def chunk_documents(docs, chunk_size=500, chunk_overlap=200):
    """Splits documents into smaller chunks for better vector search."""
    splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
    return splitter.split_documents(docs)

def create_vector_store():
    """Initializes Chroma vector store."""
    return Chroma(embedding_function=embeddings)


def index_documents(runbook_vector_store, docs):
    """Adds chunked documents to the vector store."""
    runbook_vector_store.add_documents(docs)

def retrieve_similar_docs(runbook_vector_store, query, k=5):
    """Retrieves top-k similar documents for a given query."""
    return runbook_vector_store.similarity_search(query, k=k)


#### Explanation:

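- `chunk_documents`: Splits the documents into smaller chunks using `RecursiveCharacterTextSplitter` (by default, 500 characters with a 200-character overlap).
- `create_vector_store`: Initializes a Chroma vector store with the embeddings model defined in the setup.
- `index_documents`: Adds the document chunks to the vector store.
- `retrieve_similar_docs`: Returns the `k` chunks most similar to a query (5 by default).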
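
Conceptually, a similarity search embeds the query and returns the stored chunks whose vectors score highest against it. The brute-force sketch below illustrates that idea on plain Python lists, assuming cosine similarity as the scoring function. The vector store's actual metric and index structure may differ, and `cosine_similarity` and `top_k` are hypothetical helpers, not Chroma APIs.

```python
import math
from typing import List, Sequence, Tuple


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine of the angle between two vectors; 0.0 if either vector is all zeros."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)


def top_k(query: Sequence[float], vectors: Sequence[Sequence[float]], k: int = 5) -> List[Tuple[int, float]]:
    """Return (index, score) pairs for the k vectors most similar to the query."""
    scored = [(i, cosine_similarity(query, v)) for i, v in enumerate(vectors)]
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return scored[:k]
```

Scoring every stored vector is fine for a handful of runbooks but scales linearly with the collection size, which is why vector databases rely on approximate indexes instead.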

### Defining Runbook Graph

In [98]:
# Define prompt for question-answering
prompt = hub.pull("rlm/rag-prompt")

class RunbookState(TypedDict):
    question: str
    context: List[Document]
    answer: str

def rb_retrieve(state: RunbookState):
    retrieved_docs = runbook_vector_store.similarity_search(state["question"], k=5)
    return {"context": retrieved_docs}

def rb_generate(state: RunbookState):
    docs_content = "\n\n".join(doc.page_content for doc in state["context"])
    # Reuse the module-level `prompt` pulled above instead of fetching it again on every call
    generate_prompt = """
        You are an expert runbook assistant. Given the user's question and the relevant information from the runbooks, provide a clear and actionable response.
        
        User Question: {question}
                
        Ensure your response is accurate, practical, and easy to understand.
        
        Output your response in the following structured format:
        
        **Core Problem:** [Clearly state the problem]
        
        **Action Plan:**
        1.  [Action 1]
        2.  [Action 2]
        3.  [Action 3]
        
        **Key Information:**
        [Summarize relevant details from the runbook information]
        
        Remember, your goal is to provide fast, practical, and accurate assistance to the user.
        """
    messages = prompt.invoke({"question": generate_prompt.format(question=state['question']), "context": docs_content})
    response = llm.invoke(messages)
    return {"answer": response.content}

runbook_graph_builder = StateGraph(RunbookState).add_sequence([rb_retrieve, rb_generate])
runbook_graph_builder.add_edge(START, "rb_retrieve")
runbook_graph = runbook_graph_builder.compile()

#### Explanation:

- `RunbookState`: Defines the state for our runbook application.
- `rb_retrieve`: Retrieves relevant documents from the runbook vector store.
- `rb_generate`: Generates an answer based on the retrieved documents using the LLM.
- We build and compile the state graph for the runbook application.
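
The way state flows through a sequence of steps can be sketched without LangGraph: each step receives the current state and returns a partial update, which is merged back before the next step runs. The code below is a simplified stand-in for that behavior, not LangGraph's implementation. `run_sequence`, `fake_retrieve`, and `fake_generate` are hypothetical names used only for illustration.

```python
from typing import Any, Callable, Dict, List

StateDict = Dict[str, Any]
Step = Callable[[StateDict], StateDict]


def run_sequence(steps: List[Step], initial_state: StateDict) -> StateDict:
    """Run steps in order, merging each step's partial update into the state."""
    state = dict(initial_state)  # copy so the caller's dictionary is not mutated
    for step in steps:
        update = step(state)
        state.update(update)  # keys returned by the step overwrite earlier values
    return state


def fake_retrieve(state: StateDict) -> StateDict:
    # Stand-in for rb_retrieve: returns only the "context" key.
    return {"context": [f"doc about {state['question']}"]}


def fake_generate(state: StateDict) -> StateDict:
    # Stand-in for rb_generate: reads "context" and returns only the "answer" key.
    return {"answer": f"Answer built from {len(state['context'])} document(s)"}


if __name__ == "__main__":
    print(run_sequence([fake_retrieve, fake_generate], {"question": "kinesis"}))
```

The final state contains `question`, `context`, and `answer`, which mirrors why `runbook_response['answer']` is available after invoking the compiled graph.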

In [99]:
# Define directory containing runbooks (PDFs & Text)
DATA_DIR = "data/runbooks"

# Load documents
docs = load_documents_from_directory(DATA_DIR)

# Chunk documents
chunks = chunk_documents(docs)

# Create & populate vector store
# runbook_vector_store.delete_collection()
runbook_vector_store = create_vector_store()
index_documents(runbook_vector_store, chunks)

Loading Text File: AtherStream-Runbook.txt
Loading Text File: sample_runbook1.txt


### Querying the Runbook System

In [102]:
# Query the system

query = input('[Runbook expert] How can I help you today?')

runbook_response = runbook_graph.invoke({"question": query})
print(runbook_response['answer'])

[Runbook expert] How can I help you today? How to check for kinesis metrics for AetherStream?


**Core Problem:** 
Need to monitor Kinesis metrics for AetherStream data processing pipeline

**Action Plan:**
1. Access AWS CloudWatch console as AetherStream uses CloudWatch for monitoring
2. Navigate to Kinesis Data Streams metrics section
3. Monitor metrics related to AetherStream's Kinesis components through CloudWatch dashboard

**Key Information:**
- AetherStream uses AWS Kinesis Data Streams for high-volume data ingestion and processing
- System architecture flows from data sources through Kinesis to Lambda and then to downstream systems
- Monitoring is implemented through CloudWatch integration
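
If the answer needs to be consumed by other code, its bold section headers can be split into a dictionary. This is a minimal sketch that assumes the model follows the `**Title:**` layout requested in the prompt; `split_sections` is a hypothetical helper introduced here for illustration.

```python
import re
from typing import Dict, List

SECTION_RE = re.compile(r"^\*\*(.+?):\*\*\s*(.*)$")


def split_sections(answer: str) -> Dict[str, str]:
    """Split an answer with '**Title:**' headers into a title -> body mapping."""
    sections: Dict[str, List[str]] = {}
    current = None
    for raw_line in answer.splitlines():
        line = raw_line.strip()
        match = SECTION_RE.match(line)
        if match:
            current = match.group(1)
            # Keep any text that follows the header on the same line.
            sections[current] = [match.group(2)] if match.group(2) else []
        elif current is not None and line:
            sections[current].append(line)
    return {title: "\n".join(lines) for title, lines in sections.items()}
```

For example, `split_sections(runbook_response['answer'])["Action Plan"]` would return the numbered steps as a single string, provided the model kept to the requested format.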
