In [1]:
# Install necessary libraries
%pip install -U langchain langchain-openai langgraph faiss-cpu tiktoken

Defaulting to user installation because normal site-packages is not writeable
Looking in links: /usr/share/pip-wheels
Collecting langgraph
  Downloading langgraph-0.5.2-py3-none-any.whl.metadata (6.7 kB)
Collecting faiss-cpu
  Downloading faiss_cpu-1.11.0-cp310-cp310-manylinux_2_28_x86_64.whl.metadata (4.8 kB)
Collecting langgraph-checkpoint<3.0.0,>=2.1.0 (from langgraph)
  Downloading langgraph_checkpoint-2.1.0-py3-none-any.whl.metadata (4.2 kB)
Collecting langgraph-prebuilt<0.6.0,>=0.5.0 (from langgraph)
  Downloading langgraph_prebuilt-0.5.2-py3-none-any.whl.metadata (4.5 kB)
Collecting langgraph-sdk<0.2.0,>=0.1.42 (from langgraph)
  Downloading langgraph_sdk-0.1.72-py3-none-any.whl.metadata (1.5 kB)
Collecting xxhash>=3.5.0 (from langgraph)
  Downloading xxhash-3.5.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (12 kB)
Collecting ormsgpack>=1.10.0 (from langgraph-checkpoint<3.0.0,>=2.1.0->langgraph)
  Downloading ormsgpack-1.10.0-cp310-cp310-manylinux_2_17_x8

In [None]:
import os
import sys
from openai import OpenAI
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_core.prompts import ChatPromptTemplate, SystemMessagePromptTemplate, HumanMessagePromptTemplate
from langchain_core.messages import SystemMessage, HumanMessage
from langchain_community.document_loaders import TextLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import FAISS
from langchain_core.runnables import RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated, List
import operator


# --- Set your OpenAI API Key ---
# It's highly recommended to set this as an environment variable for security.
# You can do this in your terminal before starting Jupyter:
# export OPENAI_API_KEY='your_api_key_here' (Linux/macOS)
# $env:OPENAI_API_KEY='your_api_key_here' (PowerShell)
#
# If you must set it directly in the notebook (NOT recommended for production):
# os.environ["OPENAI_API_KEY"] = "YOUR_ACTUAL_OPENAI_API_KEY"

# Verify API key is set
if "OPENAI_API_KEY" not in os.environ:
    print("WARNING: OPENAI_API_KEY environment variable not set.")
    print("Please set it before proceeding, or uncomment the line above to set it directly (not recommended).")
else:
    print("OPENAI_API_KEY is set.")

# Print the Python executable path to help debug environment issues
print(f"Python executable: {sys.executable}")

# Initialize the ChatOpenAI model for generation
llm = ChatOpenAI(model="gpt-4", temperature=0.7) # Adjust temperature for creativity (0.0 for deterministic)

# Initialize OpenAIEmbeddings for RAG
embeddings = OpenAIEmbeddings()

print("LangChain, LangGraph, and OpenAI setup complete.")
print("\nIf you still encounter 'ModuleNotFoundError' after running this cell, please try:")
print("1. Restarting your Jupyter kernel (Kernel -> Restart Kernel...)")
print("2. Running this setup cell again.")

OPENAI_API_KEY is set.
Python executable: /opt/conda/envs/anaconda-ai-2024.04-py310/bin/python
LangChain, LangGraph, and OpenAI setup complete.

If you still encounter 'ModuleNotFoundError' after running this cell, please try:
1. Restarting your Jupyter kernel (Kernel -> Restart Kernel...)
2. Running this setup cell again.


In [3]:
# --- RAG Setup ---
rag_file_path = "pisa_history.txt"

# 1. Load the document
try:
    loader = TextLoader(rag_file_path, encoding="utf-8")
    documents = loader.load()
    print(f"Successfully loaded RAG document from '{rag_file_path}'")
except FileNotFoundError:
    print(f"Error: The RAG file '{rag_file_path}' was not found. Please create it.")
    exit()
except Exception as e:
    print(f"An error occurred while loading the RAG document: {e}")
    exit()

# 2. Split the document into chunks
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
splits = text_splitter.split_documents(documents)
print(f"Split document into {len(splits)} chunks.")

# 3. Create a FAISS vector store from the chunks and embeddings
vectorstore = FAISS.from_documents(documents=splits, embedding=embeddings)
print("FAISS vector store created.")

# 4. Create a retriever
retriever = vectorstore.as_retriever()
print("Retriever created.")


Successfully loaded RAG document from 'pisa_history.txt'
Split document into 4 chunks.
FAISS vector store created.
Retriever created.


In [4]:
# --- LangGraph Setup ---

# 1. Define Graph State
# This defines the object that is passed between nodes in the graph.
class GraphState(TypedDict):
    """
    Represents the state of our graph.

    Attributes:
        question: The user's question.
        context: Retrieved context from the RAG system.
        generation: The final generated answer from the LLM.
        is_historical_query: A flag to determine if the query is historical.
    """
    question: str
    context: Annotated[List[str], operator.add] # Context will be accumulated
    generation: str
    is_historical_query: bool # New field to control flow

# 2. Define Nodes (Functions)

# Node 1: Query Classifier
# This node determines if the user's question is within the historical expert's scope.
def query_classifier(state: GraphState):
    """
    Determines if the incoming query is a historical question.
    This helps in deciding whether to perform RAG or directly apply guardrails.
    """
    print("---CLASSIFYING QUERY---")
    question = state["question"]

    # Use a simpler LLM call for classification to save tokens/latency
    classifier_prompt = ChatPromptTemplate.from_messages([
        SystemMessage(content="You are a helpful assistant. Your task is to classify if a given user question is related to 'history', 'architecture', or 'engineering' of historical structures. Respond with 'YES' if it is, and 'NO' if it is not. Be strict with your classification. Examples of 'NO': current events, personal opinions, finance, medical advice, fictional scenarios."),
        HumanMessage(content=f"Is the following question historical/architectural/engineering-related? '{question}'")
    ])
    classifier_chain = classifier_prompt | llm | StrOutputParser()

    classification_result = classifier_chain.invoke({"question": question})
    is_historical = "YES" in classification_result.upper()

    print(f"Query Classification: {classification_result.strip()} (Is Historical: {is_historical})")
    return {"is_historical_query": is_historical}


# Node 2: Retrieve
def retrieve(state: GraphState):
    """
    Retrieves documents from the vector store based on the user's question.
    """
    print("---RETRIEVING CONTEXT---")
    question = state["question"]
    docs = retriever.invoke(question)
    context = [doc.page_content for doc in docs]
    print(f"Retrieved {len(context)} documents.")
    return {"context": context}

# Node 3: Generate
def generate(state: GraphState):
    """
    Generates a response using the LLM, incorporating retrieved context if available,
    and adhering to the system prompt with guardrails.
    """
    print("---GENERATING RESPONSE---")
    question = state["question"]
    context = state["context"]

    # Load system prompt content from file
    try:
        with open("prompt.txt", "r", encoding="utf-8") as file:
            system_prompt_content = file.read()
    except Exception as e:
        print(f"Error loading prompt.txt in generate node: {e}")
        system_prompt_content = "You are a helpful assistant." # Fallback

    # Construct the prompt for generation
    # If context is available, include it for RAG. Otherwise, the LLM will rely solely on its knowledge + system prompt.
    if context:
        template = (
            system_prompt_content + "\n\n"
            "Use the following retrieved context to answer the question. If the question cannot be answered from the provided context, state that you do not have sufficient information, but still adhere to your historical expert persona and guardrails.\n\n"
            "Context:\n{context}\n\n"
            "Question: {question}\n\n"
            "Answer:"
        )
    else:
        # For non-historical or general questions, rely only on the system prompt
        template = (
            system_prompt_content + "\n\n"
            "Question: {question}\n\n"
            "Answer:"
        )

    prompt = ChatPromptTemplate.from_messages([
        SystemMessage(content=template),
        HumanMessage(content="{question}")
    ])

    # Create the generation chain
    rag_chain = prompt | llm | StrOutputParser()

    # Prepare input for the chain
    input_data = {"question": question}
    if context:
        input_data["context"] = "\n".join(context)

    generation_result = rag_chain.invoke(input_data)
    print("Response generated.")
    return {"generation": generation_result}

# 3. Define Conditional Edge
def decide_to_retrieve(state: GraphState):
    """
    Decides whether to retrieve context based on the query classification.
    """
    print("---DECIDING TO RETRIEVE---")
    if state["is_historical_query"]:
        print("Decision: Query is historical, proceeding to retrieve.")
        return "retrieve"
    else:
        print("Decision: Query is not historical, skipping retrieval and directly generating (applying general guardrails).")
        return "generate" # Skip retrieval for non-historical questions

print("Graph state and nodes defined.")

Graph state and nodes defined.


In [5]:
# --- Build the Graph ---
workflow = StateGraph(GraphState)

# Add nodes
workflow.add_node("classify_query", query_classifier)
workflow.add_node("retrieve_context", retrieve)
workflow.add_node("generate_response", generate)

# Set entry point
workflow.set_entry_point("classify_query")

# Add edges
workflow.add_conditional_edges(
    "classify_query",
    decide_to_retrieve,
    {
        "retrieve": "retrieve_context",
        "generate": "generate_response",
    },
)

# Add edge from retrieve to generate
workflow.add_edge("retrieve_context", "generate_response")

# Set end point
workflow.add_edge("generate_response", END)

# Compile the graph
app = workflow.compile()

print("LangGraph workflow compiled.")

LangGraph workflow compiled.


### 7. Run the Agent and Test Guardrails
Let's test our RAG-enabled, stateful Historical Expert with both an on-topic and an off-topic question.

Test Case 1: On-Topic Historical Question (RAG should activate)

In [7]:
print("\n--- Test Case 1: Asking an on-topic historical question (RAG expected) ---")
historical_question = "Why does the Leaning Tower of Pisa lean, and what was done to fix it?"
print(f"User Question: {historical_question}")

try:
    inputs = {"question": historical_question, "context": [], "generation": "", "is_historical_query": False}
    for s in app.stream(inputs):
        print(s)
        print("---")
    final_state = app.invoke(inputs)
    print("\nFinal Historical Expert's Response:")
    print(final_state["generation"])

except Exception as e:
    print(f"An error occurred during the historical question API call: {e}")


--- Test Case 1: Asking an on-topic historical question (RAG expected) ---
User Question: Why does the Leaning Tower of Pisa lean, and what was done to fix it?
---CLASSIFYING QUERY---
Query Classification: YES (Is Historical: True)
---DECIDING TO RETRIEVE---
Decision: Query is historical, proceeding to retrieve.
{'classify_query': {'is_historical_query': True}}
---
---RETRIEVING CONTEXT---
Retrieved 4 documents.
{'retrieve_context': {'context': ['Over the centuries, various efforts were made to correct or prevent the collapse of the tower. In 1838, architect Alessandro Gherardesca dug a pathway around the base to make the base visible, which caused the tower to lean even more. Benito Mussolini ordered that the tower be returned to a vertical position, and concrete was poured into the foundations in 1934, which also worsened the lean.\n\nThe most significant stabilization efforts took place from 1990 to 2001. An international committee of experts, led by Michele Jamiolkowski, undertook

### Test Case 2: Off-Topic Question (Guardrails should activate)

In [8]:
print("\n--- Test Case 2: Asking an off-topic question (Guardrails expected) ---")
off_topic_question = "Can you give me a detailed analysis of the current stock market trends for tech companies?"
print(f"User Question: {off_topic_question}")

try:
    inputs = {"question": off_topic_question, "context": [], "generation": "", "is_historical_query": False}
    for s in app.stream(inputs):
        print(s)
        print("---")
    final_state = app.invoke(inputs)
    print("\nFinal Historical Expert's Response:")
    print(final_state["generation"])

except Exception as e:
    print(f"An error occurred during the off-topic question API call: {e}")



--- Test Case 2: Asking an off-topic question (Guardrails expected) ---
User Question: Can you give me a detailed analysis of the current stock market trends for tech companies?
---CLASSIFYING QUERY---
Query Classification: NO (Is Historical: False)
---DECIDING TO RETRIEVE---
Decision: Query is not historical, skipping retrieval and directly generating (applying general guardrails).
{'classify_query': {'is_historical_query': False}}
---
---GENERATING RESPONSE---
Response generated.
{'generate_response': {'generation': "I'm sorry, but I can't assist without knowing the question. Can you please provide more details?"}}
---
---CLASSIFYING QUERY---
Query Classification: NO (Is Historical: False)
---DECIDING TO RETRIEVE---
Decision: Query is not historical, skipping retrieval and directly generating (applying general guardrails).
---GENERATING RESPONSE---
Response generated.

Final Historical Expert's Response:
Apologies for the confusion, but it seems your question got lost. Could you ple