In [1]:
# Agentic RAG with Local Ollama Model
This notebook demonstrates how to build a Retrieval-Augmented Generation (RAG) agent using LangGraph, LangChain, and a local model run via Ollama.

Adapted from: https://langchain-ai.github.io/langgraph/tutorials/rag/langgraph_agentic_rag/

## Materials
This notebook and all materials referenced here can be found on Sol at `/data/sse/ai-accelerated-spark`.

SyntaxError: invalid syntax (1559956507.py, line 2)

## 1. Import libraries

In [None]:
from langchain_core.prompts import PromptTemplate
from langchain_core.messages import HumanMessage, AIMessage, BaseMessage
from langchain_core.tools import Tool
from langgraph.graph import Graph
from langchain.text_splitter import CharacterTextSplitter
from sentence_transformers import SentenceTransformer
from transformers import pipeline
import torch
from typing import List
from pydantic import BaseModel, Field
import os

# Export a browser-like User-Agent string through the process environment.
# NOTE(review): presumably read by the web loader used in section 2.1 when it
# fetches pages — confirm it has to be set before that loader is imported.
os.environ["USER_AGENT"] = "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:109.0) Gecko/20100101 Firefox/109.0"

## 2. Preprocess documents
### 2.1. Fetch documents

In [None]:
from langchain_community.document_loaders import WebBaseLoader

# Web articles that make up the knowledge base for the retriever.
urls = [
    "https://medium.com/cupy-team/announcing-cupy-v13-66979ee7fab0",
    "https://www.unum.cloud/blog/2022-01-26-cupy",
    "https://medium.com/rapids-ai/easy-cpu-gpu-arrays-and-dataframes-run-your-dask-code-where-youd-like-e349d92351d",
]

# One loader call per URL. Each call yields its own list of documents, so
# `docs` ends up as a list of lists (flattened later before splitting).
docs = [WebBaseLoader(page_url).load() for page_url in urls]

In [None]:
# Preview the first document loaded from the first URL: its text with
# surrounding whitespace stripped, truncated to 1000 characters.
docs[0][0].page_content.strip()[:1000]

### 2.2. Split the fetched documents into smaller chunks for indexing into the vectorstore

In [None]:
from langchain_text_splitters import RecursiveCharacterTextSplitter

# Collapse the per-URL lists in `docs` into one flat list of documents.
docs_list = [page for loaded_pages in docs for page in loaded_pages]

# Token-based recursive splitter: chunks of 100 with an overlap of 50.
text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
    chunk_size=100,
    chunk_overlap=50,
)

# Chunks that will be embedded and indexed in the vector store.
doc_splits = text_splitter.split_documents(docs_list)

In [None]:
# Inspect the text of the first chunk produced by the splitter.
doc_splits[0].page_content

## 3. Create a retriever tool
### 3.1. Use an in-memory vector store and the all-MiniLM-L6-v2 embeddings model

In [None]:
from langchain_core.vectorstores import InMemoryVectorStore
from langchain_huggingface import HuggingFaceEmbeddings

# Sentence-embedding model used to vectorise every chunk.
embedding_model = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2")

# Build the in-memory index directly from the chunks created in section 2.2.
vectorstore = InMemoryVectorStore.from_documents(
    documents=doc_splits,
    embedding=embedding_model,
)

# Retriever view over the vector store; wrapped as a tool in section 3.2.
retriever = vectorstore.as_retriever()

In [None]:
# TODO: Use ChromaDB for persistent vectorstore
# https://python.langchain.com/docs/integrations/vectorstores/

### 3.2. Create a retriever tool using LangChain's prebuilt `create_retriever_tool`

In [None]:
from langchain.tools.retriever import create_retriever_tool

# Expose the retriever as a named tool so the chat model can decide to call it.
# The name and description are the text the model sees when choosing a tool.
retriever_tool = create_retriever_tool(
    retriever,
    name="retrieve_python_gpu_acceleration",
    description="Search and return information about accelerating Python code using the GPU with RAPIDS and CuPy.",
)

### 3.3. Test the tool

In [None]:
# Call the tool directly with a sample query to check that retrieval works
# before wiring it into the graph.
retriever_tool.invoke({"query": "How can I create a CuPy-backed Dask array for random data?"})

## 4. Generate query
### 4.1. Load local LLM

Start ollama using the terminal:
```bash
module load ollama/0.9.0
export OLLAMA_MODELS=/data/datasets/community/ollama
ollama-start
```

Check the available list of models using `ollama list`. Let me know via Slack if you would like to use and test other models.

In [None]:
from langchain_ollama import ChatOllama
import socket
from langchain_ollama.llms import OllamaLLM
from langchain.chat_models import init_chat_model

import getpass

# The Ollama server is addressed on the node this notebook runs on, port 11434.
host_node = socket.gethostname()

# Use the account running the notebook in the URL instead of one person's
# hard-coded username, so the shared notebook works unchanged for every user.
ollama_user = getpass.getuser()

# Chat model used by every node of the graph; temperature 0 for stable output.
llm_model = init_chat_model(
    "ollama:qwen3:14b",
    temperature=0,
    base_url=f"http://{ollama_user}@{host_node}:11434/",
)

### 4.2. Build a `generate_query_or_respond` node

In [None]:
from langgraph.graph import MessagesState
import re

def generate_query_or_respond(state: MessagesState):
    """Ask the model to either call the retriever tool or answer directly.

    The model is invoked on the full message history with ``retriever_tool``
    bound, so it may reply with a tool call (retrieve) or with plain text.

    Args:
        state: Graph state; ``state["messages"]`` is the conversation so far.

    Returns:
        A state update ``{"messages": [response]}`` containing the model reply
        with any ``<think>...</think>`` spans removed from its text content.
    """
    response = llm_model.bind_tools([retriever_tool]).invoke(state["messages"])

    # Remove reasoning text. The match is non-greedy so that each
    # <think>...</think> span is removed on its own; a greedy match would also
    # delete any answer text sitting between two spans. Non-string content is
    # left untouched instead of raising inside re.sub.
    if isinstance(response.content, str):
        response.content = re.sub(
            r"<think>.*?</think>", "", response.content, flags=re.DOTALL
        ).strip()

    return {"messages": [response]}

### 4.3. Try a random input

In [None]:
# A question unrelated to the indexed articles, to see how the node responds.
# The state gets its own name so the builtin `input()` is not shadowed for the
# rest of the kernel session.
smalltalk_state = {
    "messages": [{"role": "user", "content": "Hello! What is the color of the sky?"}]
}
generate_query_or_respond(smalltalk_state)["messages"][-1].pretty_print()

### 4.4. Try a semantic search question

In [None]:
# A question about the indexed articles, to see how the node responds.
# The state gets its own name so the builtin `input()` is not shadowed for the
# rest of the kernel session.
retrieval_state = {
    "messages": [
        {
            "role": "user",
            "content": "How can I create a CuPy-backed Dask array for random data?",
        }
    ]
}
generate_query_or_respond(retrieval_state)["messages"][-1].pretty_print()

## 5. Grade documents
### 5.1. Add conditional edge `grade_documents` to determine the relevance of retrieved documents

In [None]:
from pydantic import BaseModel, Field
from typing import Literal

# Prompt template for the relevance grader. It has two placeholders,
# {context} (the retrieved text) and {question} (the user question), which
# `grade_documents` fills in with str.format.
GRADE_PROMPT = (
    "You are a grader assessing relevance of a retrieved document to a user question. \n "
    "Here is the retrieved document: \n\n {context} \n\n"
    "Here is the user question: {question} \n"
    "If the document contains keyword(s) or semantic meaning related to the user question, grade it as relevant. \n"
    "Give a binary score 'yes' or 'no' score to indicate whether the document is relevant to the question."
)

# Structured-output schema requested from the model inside `grade_documents`.
# NOTE(review): the docstring and the Field description below are presumably
# forwarded to the model as part of the schema — treat them as prompt text
# when editing.
class GradeDocuments(BaseModel):
    """Grade documents using a binary score for relevance check."""

    # Expected to hold "yes" or "no"; the annotation is a plain str, so the
    # schema itself does not restrict the value to those two strings.
    binary_score: str = Field(
        description="Relevance score: 'yes' if relevant, or 'no' if not relevant"
    )


def grade_documents(
    state: MessagesState,
) -> Literal["generate_answer", "rewrite_question"]:
    """Decide the next node based on whether the retrieved text is relevant.

    The first message in the state is taken as the user question and the last
    message as the retrieved context. The model grades relevance through the
    ``GradeDocuments`` structured-output schema.

    Args:
        state: Graph state; ``state["messages"]`` must contain at least the
            user question (first) and the retrieved context (last).

    Returns:
        ``"generate_answer"`` when the grade is "yes", otherwise
        ``"rewrite_question"``.
    """
    question = state["messages"][0].content
    context = state["messages"][-1].content

    prompt = GRADE_PROMPT.format(question=question, context=context)
    grader = llm_model.with_structured_output(GradeDocuments)
    response = grader.invoke([{"role": "user", "content": prompt}])

    # Normalise before comparing: the model may answer "Yes" or " yes", and the
    # structured-output call may yield no parsed object at all. Anything that
    # is not a clear "yes" falls back to rewriting the question.
    score = (getattr(response, "binary_score", None) or "").strip().lower()

    if score == "yes":
        return "generate_answer"
    return "rewrite_question"

### 5.2. Try with irrelevant documents in the tool response

In [None]:
from langchain_core.messages import convert_to_messages

# Hand-built state: the user question, an assistant tool call, and a tool reply
# ("meow") unrelated to the question. The state gets its own name so the
# builtin `input()` is not shadowed.
irrelevant_state = {
    "messages": convert_to_messages(
        [
            {
                "role": "user",
                "content": "How can I create a CuPy-backed Dask array for random data?",
            },
            {
                "role": "assistant",
                "content": "",
                "tool_calls": [
                    {
                        "id": "1",
                        "name": "retrieve_python_gpu_acceleration",
                        "args": {"query": "creating CuPy-backed Dask arrays for random data"},
                    }
                ],
            },
            {"role": "tool", "content": "meow", "tool_call_id": "1"},
        ]
    )
}
grade_documents(irrelevant_state)

### 5.3. Try with relevant documents

In [None]:
# Hand-built state: the user question, an assistant tool call, and a tool reply
# whose text addresses the question. The state gets its own name so the
# builtin `input()` is not shadowed.
relevant_state = {
    "messages": convert_to_messages(
        [
            {
                "role": "user",
                "content": "How can I create a CuPy-backed Dask array for random data?",
            },
            {
                "role": "assistant",
                "content": "",
                "tool_calls": [
                    {
                        "id": "1",
                        "name": "retrieve_python_gpu_acceleration",
                        "args": {"query": "creating CuPy-backed Dask arrays for random data"},
                    }
                ],
            },
            {
                "role": "tool",
                "content": 'Now, we can leverage the array.backend configuration to create a CuPy-backed Dask array for random data:>>> with dask.config.set({“array.backend”: “cupy”}):…    darr = da.random.randint(0, 3, size=(10, 20), chunks=(2, 5)) #\n\n= rs.randint(0, 3, size=(10, 20), chunks=(2, 5))>>> darrdask.array<randint, shape=(10, 20), dtype=int64, chunksize=(2, 5), \\chunktype=cupy.ndarray>Now, we can leverage the array.backend configuration to create a CuPy-backed Dask array for random data:>>> with\n\nfor random array creation.',
                "tool_call_id": "1",
            },
        ]
    )
}
grade_documents(relevant_state)

## 6. Rewrite the question
### 6.1. Build the `rewrite_question` node

In [None]:
# Prompt template asking the model to reformulate the user question. Its only
# placeholder is {question}, filled in by `rewrite_question` with str.format.
REWRITE_PROMPT = (
    "Look at the input and try to reason about the underlying semantic intent / meaning.\n"
    "Here is the initial question:"
    "\n ------- \n"
    "{question}"
    "\n ------- \n"
    "Formulate an improved question:"
)


def rewrite_question(state: MessagesState):
    """Ask the model for an improved version of the original user question.

    Args:
        state: Graph state; the first entry of ``state["messages"]`` is taken
            as the original question.

    Returns:
        A state update ``{"messages": [{"role": "user", "content": ...}]}``
        whose single message is the rewritten question as a plain dict, with
        any ``<think>...</think>`` spans removed.
    """
    question = state["messages"][0].content
    prompt = REWRITE_PROMPT.format(question=question)
    response = llm_model.invoke([{"role": "user", "content": prompt}])

    # Remove reasoning text. The match is non-greedy so that each
    # <think>...</think> span is removed on its own; a greedy match would also
    # delete any text sitting between two spans. Non-string content is passed
    # through unchanged instead of raising inside re.sub.
    rewritten = response.content
    if isinstance(rewritten, str):
        rewritten = re.sub(
            r"<think>.*?</think>", "", rewritten, flags=re.DOTALL
        ).strip()

    return {"messages": [{"role": "user", "content": rewritten}]}
### 6.2. Test

In [None]:
# Hand-built state with an unhelpful tool reply ("meow"), used to exercise the
# question rewriter. The state gets its own name so the builtin `input()` is
# not shadowed.
rewrite_state = {
    "messages": convert_to_messages(
        [
            {
                "role": "user",
                "content": "How can I create a CuPy-backed Dask array for random data?",
            },
            {
                "role": "assistant",
                "content": "",
                "tool_calls": [
                    {
                        "id": "1",
                        "name": "retrieve_python_gpu_acceleration",
                        "args": {"query": "creating CuPy-backed Dask arrays for random data"},
                    }
                ],
            },
            {"role": "tool", "content": "meow", "tool_call_id": "1"},
        ]
    )
}

response = rewrite_question(rewrite_state)
# rewrite_question returns its message as a plain dict, hence key access here.
print(response["messages"][-1]["content"])

## 7. Generate an answer
### 7.1. Build `generate_answer` node

In [None]:
# Prompt template for the final answer. Placeholders: {question} (the user
# question) and {context} (the retrieved text), filled in by `generate_answer`
# with str.format.
GENERATE_PROMPT = (
    "You are an assistant for question-answering tasks. "
    "Use the following pieces of retrieved context to answer the question. "
    "If you don't know the answer, just say that you don't know. "
    "Use three sentences maximum and keep the answer concise.\n"
    "Question: {question} \n"
    "Context: {context}"
)


def generate_answer(state: MessagesState):
    """Answer the user question from the retrieved context.

    The first message in the state is taken as the question and the last
    message as the retrieved context.

    Args:
        state: Graph state; ``state["messages"]`` must contain at least the
            user question (first) and the retrieved context (last).

    Returns:
        A state update ``{"messages": [response]}`` containing the model reply
        with any ``<think>...</think>`` spans removed from its text content.
    """
    question = state["messages"][0].content
    context = state["messages"][-1].content
    prompt = GENERATE_PROMPT.format(question=question, context=context)
    response = llm_model.invoke([{"role": "user", "content": prompt}])

    # Remove reasoning text. The match is non-greedy so that each
    # <think>...</think> span is removed on its own; a greedy match would also
    # delete any answer text sitting between two spans. Non-string content is
    # left untouched instead of raising inside re.sub.
    if isinstance(response.content, str):
        response.content = re.sub(
            r"<think>.*?</think>", "", response.content, flags=re.DOTALL
        ).strip()

    return {"messages": [response]}

### 7.2. Test

In [None]:
# Hand-built state with a relevant tool reply, used to exercise answer
# generation. The state gets its own name so the builtin `input()` is not
# shadowed.
answer_state = {
    "messages": convert_to_messages(
        [
            {
                "role": "user",
                "content": "How can I create a CuPy-backed Dask array for random data?",
            },
            {
                "role": "assistant",
                "content": "",
                "tool_calls": [
                    {
                        "id": "1",
                        "name": "retrieve_python_gpu_acceleration",
                        "args": {"query": "creating CuPy-backed Dask arrays for random data"},
                    }
                ],
            },
            {
                "role": "tool",
                "content": 'Now, we can leverage the array.backend configuration to create a CuPy-backed Dask array for random data:>>> with dask.config.set({“array.backend”: “cupy”}):…    darr = da.random.randint(0, 3, size=(10, 20), chunks=(2, 5)) #\n\n= rs.randint(0, 3, size=(10, 20), chunks=(2, 5))>>> darrdask.array<randint, shape=(10, 20), dtype=int64, chunksize=(2, 5), \\chunktype=cupy.ndarray>Now, we can leverage the array.backend configuration to create a CuPy-backed Dask array for random data:>>> with\n\nfor random array creation.',
                "tool_call_id": "1",
            },
        ]
    )
}

response = generate_answer(answer_state)
response["messages"][-1].pretty_print()

## 8. Assemble the graph

In [None]:
from langgraph.graph import StateGraph, START, END
from langgraph.prebuilt import ToolNode
from langgraph.prebuilt import tools_condition

# Graph whose state is the running list of messages.
workflow = StateGraph(MessagesState)

# Define the nodes we will cycle between. Nodes added without an explicit name
# are referred to below by their function name.
workflow.add_node(generate_query_or_respond)
workflow.add_node("retrieve", ToolNode([retriever_tool]))
workflow.add_node(rewrite_question)
workflow.add_node(generate_answer)

# Every run starts by letting the model decide what to do with the question.
workflow.add_edge(START, "generate_query_or_respond")

# Decide whether to retrieve
workflow.add_conditional_edges(
    "generate_query_or_respond",
    # Assess LLM decision (call `retriever_tool` tool or respond to the user)
    tools_condition,
    {
        # Translate the condition outputs to nodes in our graph
        "tools": "retrieve",
        END: END,
    },
)

# Edges taken after the `retrieve` node is called.
workflow.add_conditional_edges(
    "retrieve",
    # `grade_documents` returns the name of the next node:
    # "generate_answer" or "rewrite_question".
    grade_documents,
)
# A generated answer ends the run; a rewritten question goes back to the model
# for another attempt.
workflow.add_edge("generate_answer", END)
workflow.add_edge("rewrite_question", "generate_query_or_respond")

# Compile
graph = workflow.compile()

In [None]:
from IPython.display import Image, display

# Render the compiled graph as a PNG of its Mermaid diagram and show it inline.
# NOTE(review): rendering the PNG may need network access — confirm this works
# on the node where the notebook runs.
display(Image(graph.get_graph().draw_mermaid_png()))

## 9. Run the agentic RAG

In [None]:
from langchain_core.messages import convert_to_messages

# Stream the run and print the newest message produced by each node.
for chunk in graph.stream(
    {
        "messages": [
            {
                "role": "user",
                "content": "How can I create a CuPy-backed Dask array for random data?",
            }
        ]
    }
):
    for node, update in chunk.items():
        print("Update from node", node)
        # Nodes may emit message objects or plain role/content dicts
        # (`rewrite_question` returns a dict). Normalise to a message object
        # first: calling .pretty_print() on a dict raises AttributeError.
        latest_message = convert_to_messages(update["messages"][-1:])[0]
        latest_message.pretty_print()
        print("\n\n")

## 10. Graphical User Interface using Gradio

In [None]:
import gradio as gr

def ask_graph(user_input, chat_history):
    """Run one question through the graph and append the exchange to the chat.

    Each call invokes the graph with only the new question; earlier turns in
    ``chat_history`` are displayed but not sent to the model.

    Args:
        user_input: Text typed by the user.
        chat_history: Existing chat as a list of ``{"role", "content"}`` dicts,
            or a falsy value when the chat is empty.

    Returns:
        A tuple ``("", history)``: an empty string to clear the textbox and the
        updated message list for the chatbot. The caller's list is not
        modified.
    """
    history = list(chat_history or [])

    # Nothing to ask: leave the conversation as it is rather than sending an
    # empty prompt to the model.
    if not user_input or not user_input.strip():
        return "", history

    result = graph.invoke({"messages": [{"role": "user", "content": user_input}]})
    answer = result["messages"][-1].content

    history.append({"role": "user", "content": user_input})
    history.append({"role": "assistant", "content": answer})
    return "", history

def clear_conversation():
    """Reset the UI: empty the textbox and the chat.

    Returns:
        A tuple ``("", [])``: an empty string for the textbox and an empty
        message list for the chatbot. The chatbot is created with
        ``type="messages"``, so its cleared value is a list, not a string.
    """
    return "", []

# Chat UI: a chatbot panel on top, and below it a text box next to the
# submit / clear buttons.
with gr.Blocks(fill_height=True, fill_width=True) as demo:
    gr.Markdown("### Agentic RAG")

    with gr.Column():

        # Conversation display; history is exchanged as role/content messages.
        with gr.Row():
            chatbot = gr.Chatbot(height=350, type="messages")

        with gr.Row():
            # Wide column (scale 4) for the question box.
            with gr.Column(scale=4):
                query_input = gr.Textbox(
                    label="Enter text here", placeholder="Ask something...", lines=1
                    )
            # Narrow column (scale 1) with the two buttons stacked.
            with gr.Column(scale=1):
                with gr.Row():
                    submit_btn = gr.Button("⬆")
                # Button that resets the text box and the chat.
                with gr.Row():
                    clear_btn = gr.Button("🧹 Clear Conversation")

        # Clicking the submit button and pressing Enter in the text box both
        # run `ask_graph`, which returns the new textbox value and chat history.
        submit_btn.click(
            fn=ask_graph,
            inputs=[query_input, chatbot],
            outputs=[query_input, chatbot],
        )

        query_input.submit(
            fn=ask_graph,
            inputs=[query_input, chatbot],
            outputs=[query_input, chatbot],
        )

        # `clear_conversation` takes no inputs and resets both components.
        clear_btn.click(
            fn=clear_conversation,
            outputs=[query_input, chatbot],
        )

# NOTE(review): share=True presumably requests a publicly reachable link —
# confirm that exposing the local model this way is acceptable on this system.
demo.launch(share=True)