# Lab 3.5: Advanced LangGraph Concepts (Optional)

This optional lab covers advanced patterns that are essential for building production-grade agents.

### Learning Objectives
1. **Production Persistence**: Moving beyond in-memory checkpointers to SQLite.
2. **Time Travel**: Rewinding the graph to a previous state and exploring alternative paths ("forking").
3. **Advanced Flow Control**: Using the `Command` class for dynamic, edgeless routing.
4. **Complex Workflow**: Building a LinkedIn Post Generator with iterative human feedback.

In [None]:
# 1. Install Dependencies
print("Installing dependencies...")
# We strictly need langgraph 0.2.x+ for the Command class
%pip install -qU "langgraph>=0.2.0" langchain-groq langchain-community langgraph-checkpoint-sqlite
print("Dependencies installed.")

In [None]:
# 2. Setup API Keys
from google.colab import userdata
import os

os.environ["GROQ_API_KEY"] = userdata.get('GROQ_API_KEY')
# Optional for LangSmith Tracking
os.environ["LANGSMITH_API_KEY"] = userdata.get('LANGSMITH_API_KEY')
os.environ["LANGSMITH_TRACING_V2"] = "true"
os.environ["LANGSMITH_ENDPOINT"] = "https://api.smith.langchain.com"
os.environ["LANGSMITH_PROJECT"] = "Advanced LangGraph"

## Part 1: Production Persistence (SQLite)

In previous labs, we used `MemorySaver`. This is great for testing but loses all data if the program restarts.
For production, we use persistent databases like Postgres or SQLite.

Here, we will use `SqliteSaver` to write valid checkpoints to a local file (`checkpoints.sqlite`).

In [None]:
import sqlite3
from langgraph.checkpoint.sqlite import SqliteSaver

print("Connecting to SQLite database...")
# Connect to a local SQLite database file
# check_same_thread=False is needed for some Jupyter environments to avoid threading errors
conn = sqlite3.connect("checkpoints.sqlite", check_same_thread=False)
memory_sqlite = SqliteSaver(conn)
print("SQLite checkpointer ready.")

## Part 2: Time Travel (Rewinding and Forking)

Time travel allows you to:
1. **View History**: See what the state was 3 steps ago.
2. **Rewind**: Replay the agent from a previous step.
3. **Fork**: Go back to a previous step, provide *different* input, and take a new path.

We will build a simple "Counter" graph to demonstrate this.

In [None]:
from typing import TypedDict, Literal
from langgraph.graph import StateGraph, START, END

class CounterState(TypedDict):
    count: int
    last_action: str

def step_one(state: CounterState):
    print("  [Executing Step One]")
    new_count = state["count"] + 1
    return {"count": new_count, "last_action": "Step One"}

def step_two(state: CounterState):
    print("  [Executing Step Two]")
    new_count = state["count"] * 2
    return {"count": new_count, "last_action": "Step Two"}

# Build the graph
builder = StateGraph(CounterState)
builder.add_node("step_1", step_one)
builder.add_node("step_2", step_two)

builder.add_edge(START, "step_1")
builder.add_edge("step_1", "step_2")
builder.add_edge("step_2", END)

# interrupt_before step_2 to let us pause and inspect
graph = builder.compile(checkpointer=memory_sqlite, interrupt_before=["step_2"])
print("Graph compiled with checkpointer and interrupt.")

In [None]:
config = {"configurable": {"thread_id": "counter-demo-1"}}

# Run the graph. It should pause before step_2.
print("--- Starting Run (Should pause before Step 2) ---")
graph.invoke({"count": 10, "last_action": "Init"}, config=config)

# Check current state
state = graph.get_state(config)
print(f"Current State Values: {state.values}")
print(f"Next Step to Execute: {state.next}")

### Viewing History
Let's look at the history of this thread.

In [None]:
# Get all state snapshots for this thread
history = list(graph.get_state_history(config))
print(f"Found {len(history)} checkpoints.")

print("--- History (Newest First) ---")
for i, snapshot in enumerate(history):
    print(f"[{i}] ID: {snapshot.config['configurable']['checkpoint_id']} | Count: {snapshot.values.get('count')}")

### The "Fork" (Time Travel)

We are currently paused before Step 2. Step 1 added 1 to our count (10 -> 11).
Step 2 is about to multiply by 2 (11 * 2 = 22).

**Goal**: Let's go BACK to before Step 1, change the input to 50, and re-run.

Note: To do this, we need the `checkpoint_id` of the state *before* step 1 ran. Usually this is the last one in the history list (the initialization).

In [None]:
# We grab the configuration of the initial state (the last item in our history list from above)
initial_checkpoint_config = history[-1].config

print("Rewinding to checkpoint:", initial_checkpoint_config['configurable']['checkpoint_id'])

# We update the state at that past point with NEW values
# This effectively creates a "Fork" in the history
graph.update_state(
    initial_checkpoint_config,
    {"count": 50, "last_action": "Init (Forked)"} 
)
print("State updated at past checkpoint.")

In [None]:
# Now we resume execution from THAT point. 
# IMPORTANT: Use config=config (current thread). The graph keys off the Thread ID,
# but since we updated a past state, it knows to branch off from there.
print("--- Resuming from Fork (Count should start at 50) ---")
for event in graph.stream(None, config=config):
    print(event)

print("\n--- Resuming from Interrupt (Step 2) ---")
# Since we are at step_2 (from the fork), this runs step_2
for event in graph.stream(None, config=config):
    print(event)

## Part 3: Advanced Flow Control (The `Command` Class)

Normally, we define edges explicitly (`add_edge("a", "b")`).
The `Command` class allows nodes to decide *dynamically* where to go next, and update the state at the same time.
This works great for multi-agent routing.

In [None]:
from langgraph.types import Command

class AgentState(TypedDict):
    sentiment: str
    decision: str
    msg: str

# A simple routing node
def oracle(state: AgentState) -> Command[Literal["happy_path", "sad_path"]]:
    sentiment = state.get("sentiment", "neutral")
    print(f"Oracle sees sentiment: {sentiment}")
    
    if sentiment == "happy":
        # Go to 'happy_path' node and update state simultaneously
        return Command(goto="happy_path", update={"decision": "Stay Positive!"})
    else:
        return Command(goto="sad_path", update={"decision": "Cheer Up!"})

def happy_path(state: AgentState):
    return {"msg": "Sunshine and rainbows!"}

def sad_path(state: AgentState):
    return {"msg": "Rainy days..."}

# Build Graph
edgeless_builder = StateGraph(AgentState)
edgeless_builder.add_node("oracle", oracle)
edgeless_builder.add_node("happy_path", happy_path)
edgeless_builder.add_node("sad_path", sad_path)

edgeless_builder.add_edge(START, "oracle")
# Note: No edges defined from 'oracle' to others! The Command handles it.

edgeless_graph = edgeless_builder.compile()
print("Edgeless graph compiled.")

In [None]:
# Test the graph dynamically
print("--- Test 1: Happy ---")
result_happy = edgeless_graph.invoke({"sentiment": "happy"})
print("Result:", result_happy["msg"])

print("\n--- Test 2: Sad ---")
result_sad = edgeless_graph.invoke({"sentiment": "sad"})
print("Result:", result_sad["msg"])

## Part 4: Comprehensive Example - LinkedIn Post Generator

We will combine these concepts into a workflow:
1. **Draft**: Agent drafts a post.
2. **Interrupt**: Human reviews it.
3. **Feedback**: Human provides feedback.
4. **Iterate**: Agent rewrites based on feedback (Loop back to 1).

In [None]:
from langchain_core.messages import HumanMessage, AIMessage
from typing import Annotated
from langgraph.graph.message import add_messages
from langchain_groq import ChatGroq

# State
class LinkedInState(TypedDict):
    messages: Annotated[list, add_messages]
    # We track the 'next_action' to decide whether to finish or revise
    next_action: str 

print("Initializing Llama 3 for LinkedIn Bot...")
llm = ChatGroq(model_name="llama-3.3-70b-versatile", temperature=0.7)

# Node 1: Drafter
def drafter(state: LinkedInState):
    print("  [Drafter Node] Generating post...")
    messages = state["messages"]
    response = llm.invoke(messages)
    return {"messages": [response], "next_action": "review"}

# Node 2: Human Review (Simulated Logic)
# In a real app, this node might not exist; the interrupt happens, 
# and the human interaction happens via 'update_state' before resuming.
# But here we define a 'reviewer' node to handle the routing logic AFTER the human input.
def reviewer(state: LinkedInState) -> Command[Literal["drafter", END]]:
    # We check the LAST message. 
    # If it's from a Human, it means feedback was provided during interrupt.
    last_msg = state["messages"][-1]
    print(f"  [Reviewer Node] Checking last message type: {type(last_msg)}")
    
    if isinstance(last_msg, HumanMessage) and "looks good" in last_msg.content.lower():
        print("  [Reviewer Node] Approved.")
        return Command(goto=END)
    else:
        # If there's feedback, go back to drafter
        print("  [Reviewer Node] Feedback detected, sending back to Drafter.")
        return Command(goto="drafter")

# Build Graph
post_builder = StateGraph(LinkedInState)
post_builder.add_node("drafter", drafter)
post_builder.add_node("reviewer", reviewer)

post_builder.add_edge(START, "drafter")
post_builder.add_edge("drafter", "reviewer")

# Interrupt BEFORE the reviewer node runs, so the human can inject feedback.
post_graph = post_builder.compile(checkpointer=memory_sqlite, interrupt_before=["reviewer"])
print("LinkedIn Graph compiled.")

In [None]:
# 1. Initial Draft
config_li = {"configurable": {"thread_id": "li-post-1"}}
initial_prompt = "Draft a LinkedIn post about the importance of AI Agents in 2025."

print("--- Start Drafting ---")
for event in post_graph.stream({"messages": [("user", initial_prompt)]}, config=config_li):
    print(event)

# 2. Inspect Draft
state = post_graph.get_state(config_li)
print("\n--- Current Draft (Waiting for Review) ---")
print(state.values['messages'][-1].content)

### Providing Feedback
The graph is paused at `reviewer`. We will "inject" our feedback as a user message.

In [None]:
# 3. Human provides critical feedback
feedback = "Too generic. Mention 'LangGraph' specifically."
print(f"Injecting Feedback: '{feedback}'")

# We update the state by adding a HumanMessage.
# Note: 'as_node="reviewer"' tells the graph that this update is happening while we are waiting for 'reviewer' node.
post_graph.update_state(
    config_li,
    {"messages": [HumanMessage(content=feedback)]},
    as_node="reviewer"
)

# 4. Resume. 
# The 'reviewer' node will run now. It will see our feedback (HumanMessage), determine it's not approval, and routing back to 'drafter'.
print("--- Resuming with Feedback ---")
for event in post_graph.stream(None, config=config_li):
    print(event)

# 5. Check New Draft
state = post_graph.get_state(config_li)
print("\n--- New Draft (Waiting for Review) ---")
print(state.values['messages'][-1].content)

In [None]:
# 6. Final Approval
approval = "Looks good!"
print(f"Injecting Approval: '{approval}'")

post_graph.update_state(
    config_li,
    {"messages": [HumanMessage(content=approval)]},
    as_node="reviewer"
)

print("--- Resuming for Approval ---")
for event in post_graph.stream(None, config=config_li):
    print(event)

print("--- Workflow Complete ---")