<b>Lab 2 | Week 4 Day 3</b>

Going Deep

In [None]:
from dotenv import load_dotenv
from pydantic import BaseModel
from typing import Annotated, TypedDict
from IPython.display import Image, display

from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.prebuilt import ToolNode, tools_condition
from langchain_openai import ChatOpenAI

import random, requests, os
import gradio as gr

In [None]:
# Load environment variables
load_dotenv(override=True)

<b> Set up LangSmith  </b>

https://smith.langchain.com

<b> Next, here is a useful function in LangChain community:</b>

In [None]:
from langchain_community.utilities import GoogleSerperAPIWrapper

serper = GoogleSerperAPIWrapper()
serper.run("When is the next Cricket worldcup?")

<b>A LangChain wrapper class for converting functions into Tools</b>

In [None]:
from langchain.agents import Tool

tool_search = Tool(
    name="search",
    func=serper.run,
    description="Useful when you need more information from an online search"
)

Now we can try out the tool the LangChain way:

In [None]:
tool_search.invoke("When is the next Cricket worldcup?")

<b> Let's write a tool ourselves</b>

-- pick a familiar one

In [None]:
pushover_token = os.getenv("PUSHOVER_TOKEN")
pushover_user = os.getenv("PUSHOVER_USER")
pushover_url = "https://api.pushover.net/1/messages.json"

def push(text: str):
    """Send a push notification to the user"""
    requests.post(pushover_url, data = {"token": pushover_token, "user": pushover_user, "message": text})

In [None]:
tool_push = Tool(
        name="send_push_notification",
        func=push,
        description="useful for when you want to send a push notification"
    )

tool_push.invoke("Hello, me")

In [None]:
# Bring the tools together
tools = [tool_search, tool_push]

<b> Back to the Graph from yesterday </b>

One small change - using <b> TypedDict </b> instead of BaseModel for the State object

When we implemented tools, we always needed to make 2 changes to the code:

1. Changes to provide the tools to OpenAI as JSON when we make the call

2. Changes to handle the results back: look for the model saying that the finish_reason=="tool_calls" and then retrieve the call, run the function, provide the results.
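
For comparison, the two manual changes listed above can be sketched in plain Python. This is an illustration only: `TOOL_REGISTRY`, `handle_tool_calls`, the stand-in `search` function and the hand-written `fake_choice` are hypothetical names, and no real model call is made.

```python
import json

# Hypothetical stand-in for a real search function.
def search(query: str) -> str:
    return f"results for {query!r}"

# Hypothetical registry mapping tool names to Python functions.
TOOL_REGISTRY = {"search": search}

# Change 1: describe the tools to the model as JSON.
tools_json = [{
    "type": "function",
    "function": {
        "name": "search",
        "description": "Useful when you need more information from an online search",
        "parameters": {
            "type": "object",
            "properties": {"query": {"type": "string"}},
            "required": ["query"],
        },
    },
}]

# Change 2: when the model asks for a tool, run it and hand back the result.
def handle_tool_calls(choice: dict) -> list:
    """Return one 'tool' message per requested call, or [] if none were requested."""
    if choice["finish_reason"] != "tool_calls":
        return []
    results = []
    for call in choice["message"]["tool_calls"]:
        func = TOOL_REGISTRY[call["function"]["name"]]
        arguments = json.loads(call["function"]["arguments"])
        results.append({
            "role": "tool",
            "tool_call_id": call["id"],
            "content": func(**arguments),
        })
    return results

# A hand-written response in the shape of a chat completion choice (not real model output).
fake_choice = {
    "finish_reason": "tool_calls",
    "message": {
        "role": "assistant",
        "tool_calls": [{
            "id": "call_1",
            "type": "function",
            "function": {"name": "search", "arguments": '{"query": "cricket"}'},
        }],
    },
}

tool_messages = handle_tool_calls(fake_choice)
```

In LangGraph, `bind_tools` takes over the first change, and the conditional edge plus the tool node take over the second.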

Below, we can see how LangGraph handles these cases

In [None]:
# Step 1: Define the State object
class State(TypedDict):
    messages: Annotated[list, add_messages]
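
The `Annotated[list, add_messages]` annotation attaches a reducer to the `messages` field: a function that decides how a node's returned value is combined with the value already in the state. The sketch below imitates that idea with a simplified, hypothetical reducer (`append_messages`) and a hypothetical `apply_update` helper. It is not LangGraph's implementation; the real `add_messages` also converts messages to message objects and replaces messages that share an ID.

```python
from typing import Annotated, TypedDict, get_type_hints

def append_messages(existing: list, new: list) -> list:
    """Simplified stand-in for add_messages: return a new list with the updates appended."""
    return existing + new

class SketchState(TypedDict):
    messages: Annotated[list, append_messages]

def apply_update(state: dict, update: dict) -> dict:
    """Merge a node's return value into the state using each field's reducer."""
    hints = get_type_hints(SketchState, include_extras=True)
    merged = dict(state)
    for key, value in update.items():
        reducer = hints[key].__metadata__[0]  # the function stored in Annotated[...]
        merged[key] = reducer(state.get(key, []), value)
    return merged

state = {"messages": [{"role": "user", "content": "Hi"}]}
state = apply_update(state, {"messages": [{"role": "assistant", "content": "Hello!"}]})
```

After the update, `state["messages"]` holds both messages: the node's return value was appended instead of overwriting the list.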

In [None]:
# Step 2: Start the Graph Builder with this State class
graph_builder = StateGraph(State)

In [None]:
# This is different: bind the llm with tools before using in the node in Step 3
llm = ChatOpenAI(model="gpt-4o-mini")
llm_with_tools = llm.bind_tools(tools) # This is the first scenario in tool use, mentioned above

In [None]:
# Step 3: Create Nodes (the chatbot and the tool node)

def chatbot(state: State):
    return {"messages": [llm_with_tools.invoke(state["messages"])]}

graph_builder.add_node("chatbot", chatbot)
graph_builder.add_node("tools", ToolNode(tools=tools))

In [None]:
# Step 4: Create Edges

# Entry point
graph_builder.add_edge(START, "chatbot")

# Add a conditional edge from the chatbot to the tool node (the if statement)
# This is the 2nd scenario mentioned above in tool use
graph_builder.add_conditional_edges("chatbot", tools_condition, "tools") 

# Any time a tool is called, the control has to return to the chatbot to decide the next step. 
# The order here is important 
graph_builder.add_edge("tools", "chatbot")


In [None]:
# Step 5: Compile the Graph
graph = graph_builder.compile()

display(Image(graph.get_graph().draw_mermaid_png()))

The END node appears in the graph display because `tools_condition` routes to END whenever the chatbot's latest message contains no tool calls.
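
The routing decision made on the conditional edge can be sketched as a small function. This is a simplified illustration that uses plain dicts for messages, and `route_after_chatbot` is a hypothetical name. The prebuilt `tools_condition` works on message objects, but the decision has the same shape: go to the tool node if the last message requests tools, otherwise end.

```python
def route_after_chatbot(state: dict) -> str:
    """Return the name of the next node based on the last message in the state."""
    last_message = state["messages"][-1]
    if last_message.get("tool_calls"):
        return "tools"    # the model asked for at least one tool
    return "__end__"      # nothing left to run, so the graph finishes

# Hand-written states for illustration only.
wants_tool = {"messages": [{"role": "assistant", "content": "",
                            "tool_calls": [{"name": "search", "args": {"query": "cricket"}}]}]}
plain_reply = {"messages": [{"role": "assistant", "content": "Hello!"}]}

next_for_tool = route_after_chatbot(wants_tool)
next_for_reply = route_after_chatbot(plain_reply)
```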

#### That's it! And, let's do this:

In [None]:
def chat(user_input: str, history):
    result = graph.invoke({"messages": [{"role": "user", "content": user_input}]})
    return result["messages"][-1].content

In [None]:
gr.ChatInterface(chat, type="messages").launch()

<b> It's time to add Memory! (Checkpointing) </b>

 BUT WAIT!

We have this whole Graph maintaining the state and appending to the state. 
Why isn't this handling memory?

### This is a crucial point for understanding LangGraph

> A super-step can be considered a single iteration over the graph nodes. Nodes that run in parallel are part of the same super-step, while nodes that run sequentially belong to separate super-steps.


One "Super-Step" of the graph represents one invocation of passing messages between agents.

In idiomatic LangGraph, you call invoke to run your graph for each super-step; for each interaction.

The reducer handles state updates automatically within one super-step, but not between them.

That is what checkpointing achieves.
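
A toy model of this idea, in plain Python: without a checkpointer every call starts from an empty state, while with one the previous state is loaded by thread ID before the new message is added. Everything here (`DictCheckpointer`, `fake_graph_run`, `invoke`) is a hypothetical stand-in, not LangGraph code.

```python
class DictCheckpointer:
    """Hypothetical stand-in for a checkpointer: keeps the latest state per thread_id."""

    def __init__(self):
        self._store = {}

    def load(self, thread_id: str) -> dict:
        return self._store.get(thread_id, {"messages": []})

    def save(self, thread_id: str, state: dict) -> None:
        self._store[thread_id] = state


def fake_graph_run(state: dict) -> dict:
    """Stand-in for the graph: replies with how many messages it can see."""
    count = len(state["messages"])
    reply = {"role": "assistant", "content": f"I can see {count} message(s)"}
    return {"messages": state["messages"] + [reply]}


def invoke(user_input: str, thread_id: str, checkpointer=None) -> dict:
    """One interaction: load prior state (if any), add the user message, run, save."""
    previous = checkpointer.load(thread_id) if checkpointer else {"messages": []}
    state = {"messages": previous["messages"] + [{"role": "user", "content": user_input}]}
    result = fake_graph_run(state)
    if checkpointer:
        checkpointer.save(thread_id, result)
    return result


# Without a checkpointer, the second call has forgotten the first.
invoke("My name is Sam", thread_id="1")
forgetful = invoke("What is my name?", thread_id="1")

# With a checkpointer, the second call sees the whole thread.
memory_sketch = DictCheckpointer()
invoke("My name is Sam", thread_id="1", checkpointer=memory_sketch)
remembering = invoke("What is my name?", thread_id="1", checkpointer=memory_sketch)
```

The last reply in `forgetful` reports one visible message, while the last reply in `remembering` reports three.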

<b>In-memory storage</b>

In [None]:
from langgraph.checkpoint.memory import MemorySaver

memory = MemorySaver()

Now, all 5 steps in one go:

In [None]:
# Steps 1 and 2
graph_builder = StateGraph(State)


# Step 3
llm = ChatOpenAI(model="gpt-4o-mini")
llm_with_tools = llm.bind_tools(tools)

def chatbot(state: State):
    print(state)
    return {"messages": [llm_with_tools.invoke(state["messages"])]}

graph_builder.add_node("chatbot", chatbot)
graph_builder.add_node("tools", ToolNode(tools=tools))

# Step 4
graph_builder.add_conditional_edges( "chatbot", tools_condition, "tools")
graph_builder.add_edge("tools", "chatbot")

graph_builder.add_edge(START, "chatbot")

# Step 5
graph = graph_builder.compile(checkpointer=memory) # Add checkpointer during compilation
# display(Image(graph.get_graph().draw_mermaid_png()))

In [None]:
# Define the Gradio chat function with configuration
config = {"configurable": {"thread_id": "1"}}

def chat(user_input: str, history):
    result = graph.invoke({"messages": [{"role": "user", "content": user_input}]}, config=config)
    return result["messages"][-1].content


In [None]:
graph.get_state(config)

In [None]:
gr.ChatInterface(chat, type="messages").launch()

In [None]:
graph.get_state(config)

In [None]:
# Most recent first

list(graph.get_state_history(config))

### LangGraph gives us tools to set the state back to a prior point in time, to branch off:

```
config = {"configurable": {"thread_id": "1", "checkpoint_id": ...}}
graph.invoke(None, config=config)
```

And this allows you to build stable systems that can be recovered and rerun from any prior checkpoint.
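
One way to find a checkpoint to return to is to scan the state history and reuse the `config` of the snapshot you want. The helper below is a sketch, and `config_for_step` is a hypothetical name. It only assumes that each snapshot exposes `values` and `config`, as the objects returned by `graph.get_state_history(config)` do. The stand-in snapshots exist so that the block runs on its own; their checkpoint IDs are made up.

```python
from types import SimpleNamespace

def config_for_step(history, messages_seen: int):
    """Return the config of the first snapshot holding exactly `messages_seen` messages."""
    for snapshot in history:
        if len(snapshot.values.get("messages", [])) == messages_seen:
            return snapshot.config
    return None

# Stand-in snapshots, most recent first (made-up IDs, for illustration only).
fake_history = [
    SimpleNamespace(values={"messages": ["q1", "a1", "q2", "a2"]},
                    config={"configurable": {"thread_id": "1", "checkpoint_id": "id-c"}}),
    SimpleNamespace(values={"messages": ["q1", "a1"]},
                    config={"configurable": {"thread_id": "1", "checkpoint_id": "id-b"}}),
    SimpleNamespace(values={"messages": []},
                    config={"configurable": {"thread_id": "1", "checkpoint_id": "id-a"}}),
]

rewind_config = config_for_step(fake_history, messages_seen=2)

# With the real graph, this would look like:
# rewind_config = config_for_step(graph.get_state_history(config), messages_seen=2)
# graph.invoke(None, config=rewind_config)
```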

In [None]:
config = {"configurable": {"thread_id": "1", "checkpoint_id": '1f0f1e98-882c-67f8-8007-04b447626bcf'}}
graph.invoke(None, config=config)

<b>Persistent Storage - Store in SQL</b>

> And this is the power of LangGraph

In [None]:
import sqlite3
from langgraph.checkpoint.sqlite import SqliteSaver

db_path = "memory.db"
conn = sqlite3.connect(db_path, check_same_thread=False)

sql_memory = SqliteSaver(conn)

In [None]:
# Steps 1 and 2
graph_builder = StateGraph(State)


# Step 3
llm = ChatOpenAI(model="gpt-4o-mini")
llm_with_tools = llm.bind_tools(tools)

def chatbot(state: State):
    print(state)
    return {"messages": [llm_with_tools.invoke(state["messages"])]}

graph_builder.add_node("chatbot", chatbot)
graph_builder.add_node("tools", ToolNode(tools=tools))

# Step 4
graph_builder.add_conditional_edges( "chatbot", tools_condition, "tools")
graph_builder.add_edge("tools", "chatbot")
graph_builder.add_edge(START, "chatbot")

# Step 5
graph = graph_builder.compile(checkpointer=sql_memory) # Add sql memory checkpointer during compilation
 

In [None]:
display(Image(graph.get_graph().draw_mermaid_png()))

Gradio interface

In [None]:
config = {"configurable": {"thread_id": "3"}}

def chat(user_input: str, history):
    result = graph.invoke({"messages": [{"role": "user", "content": user_input}]}, config=config)
    return result["messages"][-1].content

In [None]:
gr.ChatInterface(chat, type="messages").launch()

That brings the Memory section to an END and wraps up Day 3.