# LangGraph: Agent Orchestration

#### By Pedro Izquierdo Lehmann

Welcome back! This last notebook is the second part of the LangChain notebook. We will explore the **LangGraph** library, which is built on top of LangChain and is useful for the controlled design of complex workflows, with or without loops, including **agent orchestration**.

Also, we will not use the OpenAI API; instead, we will query locally stored LLMs using Ollama.

---
## (0. Environment Setup)

Before starting, set up a Python virtual environment and install required dependencies.

#### 1. Create a Virtual Environment

```bash
python3 -m venv lang-graph
```

#### 2. Activate the Virtual Environment

**On macOS/Linux:**
```bash
source lang-graph/bin/activate
```

**On Windows:**
```bash
lang-graph\Scripts\activate
```

#### 3. Install Required Dependencies

```bash
pip install langchain langgraph langchain-ollama jupyter ipykernel
```

Install Ollama (required for local LLMs):

```bash
# macOS (Homebrew)
brew install ollama
brew services start ollama

# Linux
curl -fsSL https://ollama.com/install.sh | sh

# Windows: download the installer
# https://ollama.com/download
```

#### 4. Register the Kernel

```bash
python -m ipykernel install --user --name=lang-graph --display-name "Python (lang-graph)"
```

#### 5. Start Jupyter

```bash
jupyter notebook
```

#### 6. Deactivate

```bash
deactivate
```

#### Ollama Setup

All model calls in this notebook must run **locally** via Ollama.

1. Start the Ollama server (separate terminal if needed)
2. Pull a local model (you can change the model name later)
3. Verify that Ollama can list your models

Run the code cells below in order.

In [None]:
# Check that Ollama is installed and running
!ollama --version
!ollama list

# Pull a local model (edit if you want a different one)
OLLAMA_MODEL = "llama3.1"
!ollama pull {OLLAMA_MODEL}

# If Ollama is not running, start it in a separate terminal:
# !ollama serve

In [None]:
from typing import TypedDict, Annotated, Literal
from dataclasses import dataclass, field
import operator
import math

from langchain_ollama import ChatOllama
from langchain_core.messages import HumanMessage, SystemMessage, ToolMessage, BaseMessage
from langchain_core.tools import tool
from langchain.agents import create_agent

from langgraph.graph import StateGraph, START, END, add_messages
from langgraph.checkpoint.memory import InMemorySaver

from ollama import chat
from ollama import ChatResponse

In [9]:
response: ChatResponse = chat(model='llama3.1', messages=[
  {
    'role': 'user',
    'content': 'Please discuss the conversion of Phillips 66 from oil to renewable diesel.',
  },
])
print(response.message.content)

In 2018, Phillips 66, a leading American multinational energy company, announced its plans to convert its Ponca City, Oklahoma refinery into a renewable diesel production facility. This significant transformation marked a shift away from traditional fossil fuels and towards more sustainable, low-carbon options.

Here's an overview of the conversion:

**Background:**

The Ponca City Refinery, built in 1919, was one of Phillips 66's largest refineries, with a capacity to process approximately 130,000 barrels per day. The refinery primarily produced gasoline and diesel fuel from crude oil.

**Conversion plans:**

In 2018, Phillips 66 announced that it would invest around $360 million to convert the Ponca City Refinery into a renewable diesel facility. This project was expected to increase the refinery's capacity by about 50% while reducing greenhouse gas emissions by approximately 40%.

The conversion involved installing new equipment and modifying existing facilities to produce renewable

In [None]:
# LLM (local via Ollama)
OLLAMA_MODEL = "llama3.1"
llm = ChatOllama(model=OLLAMA_MODEL, temperature=0)

---

## 1. Graphs for Agentic AI

LangChain gives you **tools** and **agents**, but the control flow still lives inside the LLM loop, which is fine when the default ReAct pattern is sufficient. Effectively, LangChain chains are directed acyclic graphs (DAGs). LangGraph makes the graph explicit:

- **Nodes** are explicit steps (LLM calls or Python logic)
- **Edges** define how control flows between steps
- **State** is a typed object that persists across the graph

> **Note**: The graph state is the central, typed data structure that flows through nodes and is mutated by each node. It’s the single source of truth for the workflow at any step (messages, intermediate results, flags, etc.). Every node reads from it and returns partial updates to it.

> **Keep in mind**: The graph state, `ToolRuntime`, and `InMemorySaver` all relate to “runtime context,” but they operate at different layers.
> - The LangGraph state is the _actual working state_ your nodes read/write.
> - `ToolRuntime` is a _tool-level context injection_ (useful for user/session info), not a state container.
> - `InMemorySaver` is _storage_ so state can be resumed later; it does not define state, it just saves it.
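
The idea that nodes return *partial updates* which are merged into the state can be illustrated without any library. The sketch below is a toy model only: `apply_update` and `REDUCERS` are made-up names, not LangGraph APIs. In LangGraph itself the merge rule is attached to a state key through `Annotated[...]`, as with `add_messages` in the cells further down.

```python
import operator
from typing import Any, Callable

# Hypothetical reducer table: keys listed here are combined with the old value;
# every other key is simply overwritten by the node's update.
REDUCERS: dict[str, Callable[[Any, Any], Any]] = {"messages": operator.add}


def apply_update(state: dict, update: dict) -> dict:
    """Return a new state with a node's partial update merged in."""
    merged = dict(state)
    for key, value in update.items():
        if key in REDUCERS and key in merged:
            merged[key] = REDUCERS[key](merged[key], value)
        else:
            merged[key] = value
    return merged


state = {"messages": ["headlines received"], "risk_bucket": "LOW"}
state = apply_update(state, {"messages": ["summary written"]})  # appended
state = apply_update(state, {"risk_bucket": "HIGH"})            # overwritten
print(state)
```

Keys that were not part of an update keep their previous value, which is why a node only needs to return what it changed.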

With LangGraph, you explicitly construct a **graph** workflow that may contain **cycles**, which makes workflows more expressive. It also provides conditional routing along the graph, giving you much more control over the workflow.
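
To make cycles and conditional routing concrete, here is a minimal pure-Python sketch of a loop that a router function controls. Everything in it (`run`, `draft`, `route`, the `END` sentinel, and the halving rule) is invented for illustration and is not how LangGraph is implemented; it only shows the shape of the idea.

```python
from typing import Callable

END = "end"  # hypothetical sentinel for this sketch, not LangGraph's END


def draft(state: dict) -> dict:
    # Each pass halves the proposed size, standing in for an LLM revision.
    return {"size": state["size"] / 2, "passes": state["passes"] + 1}


def route(state: dict) -> str:
    # Conditional edge: loop back until the size is within the limit.
    return END if state["size"] <= state["limit"] else "draft"


def run(nodes: dict[str, Callable[[dict], dict]], router: Callable[[dict], str],
        start: str, state: dict, max_steps: int = 10) -> dict:
    current = start
    for _ in range(max_steps):  # guard against endless cycles
        state = {**state, **nodes[current](state)}
        current = router(state)
        if current == END:
            return state
    raise RuntimeError("max_steps reached without hitting END")


final = run({"draft": draft}, route, "draft", {"size": 0.4, "limit": 0.1, "passes": 0})
print(final)
```

With these inputs the loop runs twice before the router sends control to the end. The `max_steps` guard plays the same role as an explicit cycle cap in a real graph.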



The following is an example of an agentic workflow graph. Notice how we **separate reasoning from deterministic checks**, which is hard to guarantee with a pure agent loop.

In [None]:
# This graph takes macro headlines (text) and an asset's daily returns.
# It summarizes the headlines, computes the asset's annualized volatility,
# and then asks the LLM for a one-sentence implication for a long-only equity portfolio.
# Graph: summarize macro -> compute volatility -> final brief (linear DAG)
macro_headlines = [
    "CPI comes in below expectations; core inflation cools",
    "Fed signals pause; markets price in two cuts",
    "Oil spikes on supply concerns",
]

returns = [0.002, -0.004, 0.001, 0.003, -0.002, 0.0005, -0.001]

class MacroState(TypedDict):
    messages: Annotated[list[BaseMessage], add_messages]
    macro_summary: str
    realized_vol: float
    risk_bucket: str


def summarize_macro(state: MacroState):
    prompt = (
        "Summarize these headlines into a 2-sentence macro brief for an equity analyst:\n"
        + "\n".join(f"- {h}" for h in macro_headlines)
    )
    response = llm.invoke([SystemMessage(content=prompt)])
    return {"messages": [response], "macro_summary": response.content}


def compute_vol(state: MacroState):
    # Root mean square of the daily returns (assumes a zero mean)
    daily_vol = math.sqrt(sum(r * r for r in returns) / len(returns))
    # Annualize using 252 trading days
    annualized = daily_vol * math.sqrt(252)
    if annualized < 0.15:
        bucket = "LOW"
    elif annualized < 0.25:
        bucket = "MEDIUM"
    else:
        bucket = "HIGH"
    return {"realized_vol": annualized, "risk_bucket": bucket}


def final_brief(state: MacroState):
    content = (
        f"Macro brief: {state['macro_summary']}\n"
        f"Risk overlay: realized vol {state['realized_vol']:.2%} -> {state['risk_bucket']} risk.\n"
        "Provide a one-sentence implication for a long-only equity portfolio."
    )
    response = llm.invoke([HumanMessage(content=content)])
    return {"messages": [response]}


builder = StateGraph(MacroState)
builder.add_node("summarize_macro", summarize_macro)
builder.add_node("compute_vol", compute_vol)
builder.add_node("final_brief", final_brief)

builder.add_edge(START, "summarize_macro")
builder.add_edge("summarize_macro", "compute_vol")
builder.add_edge("compute_vol", "final_brief")
builder.add_edge("final_brief", END)

macro_graph = builder.compile()

result = macro_graph.invoke({"messages": []})
print(result["messages"][-1].content)
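
Because the volatility step is plain Python, it can be checked on its own, without a model or a graph. The sketch below repeats the arithmetic of `compute_vol` on the same `returns` list; the thresholds are copied from the cell above, and the 252-day annualization assumes independent daily returns.

```python
import math

returns = [0.002, -0.004, 0.001, 0.003, -0.002, 0.0005, -0.001]

# Zero-mean daily volatility: root mean square of the returns.
daily_vol = math.sqrt(sum(r * r for r in returns) / len(returns))
# Annualize assuming 252 trading days and independent daily returns.
annualized = daily_vol * math.sqrt(252)

bucket = "LOW" if annualized < 0.15 else "MEDIUM" if annualized < 0.25 else "HIGH"
print(f"daily={daily_vol:.4%} annualized={annualized:.2%} bucket={bucket}")
```

For these sample returns the annualized figure is roughly 3.6%, so the graph should land in the LOW bucket regardless of what the LLM writes.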

If you want to visualize the graph structure:

In [None]:
from IPython.display import Image, display

# Mermaid text diagram (paste into a Mermaid renderer if needed)
print(macro_graph.get_graph().draw_mermaid())
display(Image(macro_graph.get_graph().draw_mermaid_png()))

### Exercise 1: Add a Position Sizing Node

The example above uses a three-step flow: summarize macro, compute volatility, and final brief. Now add a **position sizing** node that sets a target exposure based on the risk bucket:

- LOW -> 1.0
- MEDIUM -> 0.6
- HIGH -> 0.3

Then include the target exposure in the final response.

In [None]:
# EXERCISE: Add a position sizing node
# 1. Create a node `position_size(state)` that returns target_exposure
# 2. Insert it between compute_vol and final_brief
# 3. Update final_brief to include target_exposure

class MacroState2(TypedDict):
    messages: Annotated[list[BaseMessage], add_messages]
    macro_summary: str
    realized_vol: float
    risk_bucket: str
    target_exposure: float


# TODO: define position_size
# def position_size(state: MacroState2):
#     ...


# TODO: update final_brief to use target_exposure
# def final_brief(state: MacroState2):
#     ...


# TODO: build and run the graph with the new node

## 2. Streaming

LangGraph supports streaming, allowing you to see agent responses in real time as they're generated. This is essential for building responsive user interfaces.

Below is a simple example that streams node updates as the graph executes. This is especially useful in finance dashboards where you want to show partial progress as each step completes.

In [None]:
# We will use the same macro_graph as before

def print_updates(event: dict):
    for node, update in event.items():
        if isinstance(update, dict) and update.get("messages"):
            print(f"[{node}] {update['messages'][-1].content}")
        else:
            print(f"[{node}] {update}")


for event in macro_graph.stream({"messages": []}, stream_mode="updates"):
    print_updates(event)
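
With `stream_mode="updates"`, each streamed event is a dict that maps a node name to the partial update that node returned. The sketch below feeds hand-built events through the same formatting logic so the shape is visible without calling a model. The event contents are made up, `format_updates` is a hypothetical variant of `print_updates` that returns strings, and `SimpleNamespace` is only a stand-in for a message object with a `.content` attribute.

```python
from types import SimpleNamespace


def format_updates(event: dict) -> list[str]:
    """Turn one 'updates' event into printable lines, one per node."""
    lines = []
    for node, update in event.items():
        if isinstance(update, dict) and update.get("messages"):
            lines.append(f"[{node}] {update['messages'][-1].content}")
        else:
            lines.append(f"[{node}] {update}")
    return lines


# Hand-built stand-ins for streamed events (made-up values, no model call).
fake_events = [
    {"summarize_macro": {"messages": [SimpleNamespace(content="Inflation cools.")]}},
    {"compute_vol": {"realized_vol": 0.0356, "risk_bucket": "LOW"}},
]
for event in fake_events:
    for line in format_updates(event):
        print(line)
```

Nodes that return no messages, such as `compute_vol`, fall through to the second branch and are printed as raw dicts.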

## 3. Conditional Routing (Risk Gates)

LangGraph lets you **route** based on state. This is a major difference from LangChain's agent loop, because the control flow is explicit and testable.

We will route to a hedge node only when volatility is high.

In [None]:
# This graph computes realized volatility and uses it as a risk gate to decide whether to propose a hedge or skip hedging.
# Graph: compute vol -> route to hedge or skip (conditional gate)
class HedgeState(TypedDict):
    messages: Annotated[list[BaseMessage], add_messages]
    realized_vol: float
    hedge_action: str


def compute_vol_only(state: HedgeState):
    daily_vol = math.sqrt(sum(r * r for r in returns) / len(returns))
    annualized = daily_vol * math.sqrt(252)
    return {"realized_vol": annualized}


def propose_hedge(state: HedgeState):
    action = f"Buy 1-month put spread; realized vol {state['realized_vol']:.2%}."
    return {"hedge_action": action}


def skip_hedge(state: HedgeState):
    return {"hedge_action": "No hedge needed."}


def hedge_router(state: HedgeState) -> Literal["propose_hedge", "skip_hedge"]:
    return "propose_hedge" if state["realized_vol"] > 0.25 else "skip_hedge"


hedge_builder = StateGraph(HedgeState)
hedge_builder.add_node("compute_vol", compute_vol_only)
hedge_builder.add_node("propose_hedge", propose_hedge)
hedge_builder.add_node("skip_hedge", skip_hedge)

hedge_builder.add_edge(START, "compute_vol")
hedge_builder.add_conditional_edges("compute_vol", hedge_router)
hedge_builder.add_edge("propose_hedge", END)
hedge_builder.add_edge("skip_hedge", END)

hedge_graph = hedge_builder.compile()

hedge_result = hedge_graph.invoke({"messages": []})
print(hedge_result["hedge_action"])
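
Since the router is an ordinary function of the state, it can be tested directly with hand-built states. The sketch below re-declares the router so the block runs on its own; `VOL_THRESHOLD` is a name introduced here for the 0.25 cutoff used above.

```python
from typing import Literal

VOL_THRESHOLD = 0.25  # same cutoff as hedge_router above


def hedge_router(state: dict) -> Literal["propose_hedge", "skip_hedge"]:
    return "propose_hedge" if state["realized_vol"] > VOL_THRESHOLD else "skip_hedge"


# Hand-built states: no graph, no model, just the routing decision.
cases = {0.10: "skip_hedge", 0.25: "skip_hedge", 0.30: "propose_hedge"}
for vol, expected in cases.items():
    assert hedge_router({"realized_vol": vol}) == expected
print("router checks passed")
```

Note the boundary case: a volatility of exactly 25% does not trigger the hedge, because the comparison is strict.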

### Exercise 2: Add a Second Risk Gate

The example above uses a volatility gate. Now build a graph with a **drawdown gate** that routes to a reduce-exposure step if the max drawdown exceeds 6%.
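
Max drawdown is not defined above, so here is one common definition as a starting point: the largest peak-to-trough loss of the compounded equity curve, expressed as a positive fraction. This is a sketch under that assumption; `max_drawdown` is a hypothetical helper, and other conventions (for example, cumulative sums of returns) are also used in practice.

```python
def max_drawdown(returns: list[float]) -> float:
    """Largest peak-to-trough loss of the compounded equity curve, as a positive fraction."""
    equity = 1.0
    peak = 1.0
    worst = 0.0
    for r in returns:
        equity *= 1.0 + r
        peak = max(peak, equity)
        worst = max(worst, 1.0 - equity / peak)
    return worst


returns_ex2 = [0.01, -0.02, 0.005, -0.03, 0.015, -0.01, 0.004]
mdd = max_drawdown(returns_ex2)
print(f"max drawdown = {mdd:.2%}, breach = {mdd > 0.06}")
```

Under this definition the sample returns give a drawdown of roughly 4.5%, so you may want to try a second return series that breaches the 6% gate to exercise both branches.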

In [None]:
# EXERCISE: Add a drawdown gate
# 1. Compute max drawdown from returns
# 2. Create a node reduce_exposure
# 3. Route to reduce_exposure when drawdown > 6%

returns_ex2 = [0.01, -0.02, 0.005, -0.03, 0.015, -0.01, 0.004]

class GateState(TypedDict):
    messages: Annotated[list[BaseMessage], add_messages]
    max_drawdown: float
    action: str

# TODO: implement max drawdown calculation
# def compute_drawdown(state: GateState):
#     ...

# TODO: implement reduce_exposure
# def reduce_exposure(state: GateState):
#     ...

# TODO: implement keep_exposure
# def keep_exposure(state: GateState):
#     ...

# TODO: implement router and graph

# TODO: display the graph

## 4. Durable Execution with Checkpoints

In finance, long-running analyses may need to **pause and resume**. LangGraph lets you **checkpoint** state and continue later using a `thread_id`.

In [None]:
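# This graph creates a single-step thesis draft and checkpoints it so the state can be retrieved or resumed later under the same thread_id.
# Note: InMemorySaver keeps checkpoints in process memory, so they are lost when the kernel restarts.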
# Graph: single LLM node with checkpointed state
class ResearchState(TypedDict):
    messages: Annotated[list[BaseMessage], add_messages]
    ticker: str
    thesis: str


def draft_thesis(state: ResearchState):
    prompt = (
        f"Write a 2-sentence investment thesis for {state['ticker']} as a large-cap stock."
    )
    response = llm.invoke([HumanMessage(content=prompt)])
    return {"messages": [response], "thesis": response.content}


research_builder = StateGraph(ResearchState)
research_builder.add_node("draft_thesis", draft_thesis)
research_builder.add_edge(START, "draft_thesis")
research_builder.add_edge("draft_thesis", END)

checkpointer = InMemorySaver()
research_graph = research_builder.compile(checkpointer=checkpointer)

config = {"configurable": {"thread_id": "research-1"}}

# First call
out1 = research_graph.invoke({"messages": [], "ticker": "AAPL"}, config=config)
print(out1["thesis"])

In [None]:
# Retrieve the latest checkpointed state
state = research_graph.get_state(config)
print(state.values["ticker"], "->", state.values["thesis"])
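
As a mental model for what a checkpointer does with `thread_id`, the toy class below keeps a list of state snapshots per thread and returns the newest one on request. This is an analogy only: `ToySaver`, `put`, and `latest` are invented names, and this is not how `InMemorySaver` is implemented.

```python
import copy
from typing import Optional


class ToySaver:
    """Toy stand-in for a checkpointer: keeps a history of states per thread id."""

    def __init__(self) -> None:
        self._history: dict[str, list[dict]] = {}

    def put(self, thread_id: str, state: dict) -> None:
        # Deep-copy so later mutations of `state` do not alter the snapshot.
        self._history.setdefault(thread_id, []).append(copy.deepcopy(state))

    def latest(self, thread_id: str) -> Optional[dict]:
        snapshots = self._history.get(thread_id)
        return copy.deepcopy(snapshots[-1]) if snapshots else None


saver = ToySaver()
saver.put("research-1", {"ticker": "AAPL", "thesis": "draft v1"})
saver.put("research-1", {"ticker": "AAPL", "thesis": "draft v2"})
saver.put("research-2", {"ticker": "MSFT", "thesis": "draft v1"})

print(saver.latest("research-1"))  # newest snapshot for that thread only
print(saver.latest("unknown"))     # no checkpoint yet
```

The point to take away is isolation: each thread id has its own history, so two analyses can share one graph without sharing state.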

### Exercise 3: Persistent Analyst Notes

The example above stored a thesis in memory. Now add a second node that **appends** a risk note and verify it persists via the state retrieval call.

In [None]:
# EXERCISE: Add persistent analyst notes
# 1. Extend the state with risk_note: str
# 2. Add a node add_risk_note that uses the LLM to write 1 sentence
# 3. Connect draft_thesis -> add_risk_note -> END
# 4. Use get_state to confirm risk_note is stored

# TODO: implement here

## 5. Agent Nodes with Tools

A LangGraph node can wrap a **LangChain agent**. This means you can keep tool-choosing autonomy **inside a node**, while still controlling the workflow **between nodes**.

Below we build a small cycle: an agent drafts a trade idea using two tools, then a validator checks the position size. If the size is too large, the graph loops back to the agent for revision.

> **Note**: A cycle like this is hard to express with LangChain chains alone, because chains are acyclic.

In [None]:
# This graph wraps an agent that drafts a trade with tools, validates position size, and loops with a max-cycle limit to avoid infinite revisions.
# Graph: agent drafts trade -> validate -> increment cycle -> loop with max cycles
import re

@tool
def get_price(ticker: str) -> float:
    """Get a rough price for a ticker."""
    prices = {"SPY": 540.0, "TLT": 92.0, "GLD": 215.0}
    return prices.get(ticker.upper(), 100.0)

@tool
def position_size_from_vol(vol: float, risk_budget: float = 0.01) -> float:
    """Simple position sizing from volatility and risk budget."""
    return min(0.15, risk_budget / max(vol, 1e-6))

@tool
def get_max_cycles(max_cycles: int = 3) -> int:
    """Return the maximum number of graph cycles allowed."""
    return max_cycles


class AgentNodeState(TypedDict):
    messages: Annotated[list[BaseMessage], add_messages]
    draft: str
    approved: bool
    max_position: float
    max_cycles: int
    cycle_count: int
    stop_reason: str


def set_max_cycles(state: AgentNodeState):
    requested = state.get("max_cycles", 3)
    return {
        "max_cycles": get_max_cycles.invoke({"max_cycles": requested}),
        "cycle_count": 0,
        "stop_reason": "",
    }


def agent_trade(state: AgentNodeState):
    agent = create_agent(
        model=llm,
        tools=[get_price, position_size_from_vol],
        system_prompt=(
            "You are a trading assistant. Use tools to get prices and position size. "
            "Always produce a trade with an explicit percent of NAV."
        ),
    )
    prompt = (
        f"Draft one trade idea for SPY or TLT. Max position is {state['max_position']:.0%} of NAV. "
        "Call both tools, then give a 2-3 sentence recommendation with a percent of NAV."
    )
    response = agent.invoke({"messages": [{"role": "user", "content": prompt}]})
    message = response["messages"][-1]
    return {"messages": [message], "draft": message.content}


def validate_draft(state: AgentNodeState):
    percents = [float(x) for x in re.findall(r"(\d+(?:\.\d+)?)%", state["draft"])]
    if not percents:
        approved = False
    else:
        approved = max(percents) <= state["max_position"] * 100
    return {"approved": approved}


def increment_cycle(state: AgentNodeState):
    count = state.get("cycle_count", 0) + 1
    stop_reason = ""
    # Report the cycle cap as the stop reason only when the draft was not approved
    if not state.get("approved") and count >= state["max_cycles"]:
        stop_reason = f"Reached max cycles ({state['max_cycles']})."
    return {"cycle_count": count, "stop_reason": stop_reason}


def revision_router(state: AgentNodeState) -> Literal["agent_trade", END]:
    if state["approved"]:
        return END
    if state["cycle_count"] >= state["max_cycles"]:
        return END
    return "agent_trade"


def print_state(state: AgentNodeState):
    print(
        f"cycle={state.get('cycle_count', 0)}/{state.get('max_cycles', '?')} "
        f"approved={state.get('approved')} stop_reason={state.get('stop_reason', '')}"
    )
    if state.get("draft"):
        print(f"draft: {state['draft']}")


agent_builder = StateGraph(AgentNodeState)
agent_builder.add_node("set_max_cycles", set_max_cycles)
agent_builder.add_node("agent_trade", agent_trade)
agent_builder.add_node("validate_draft", validate_draft)
agent_builder.add_node("increment_cycle", increment_cycle)

agent_builder.add_edge(START, "set_max_cycles")
agent_builder.add_edge("set_max_cycles", "agent_trade")
agent_builder.add_edge("agent_trade", "validate_draft")
agent_builder.add_edge("validate_draft", "increment_cycle")
agent_builder.add_conditional_edges("increment_cycle", revision_router)

agent_graph = agent_builder.compile()

last_state = None
for state in agent_graph.stream({"messages": [], "max_position": 0.1}, stream_mode="values"):
    last_state = state
    print_state(state)

if last_state and last_state.get("stop_reason"):
    print("Stop reason:", last_state["stop_reason"])

if last_state and last_state.get("draft"):
    print("Final draft:", last_state["draft"])
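
The validator's percent check can be exercised on sample strings to see what it accepts and where it is fragile. The sketch below reuses the regular expression from `validate_draft`; `is_within_limit` is a hypothetical standalone version of the same rule, and the sample drafts are made up.

```python
import re

PERCENT = re.compile(r"(\d+(?:\.\d+)?)%")  # same pattern as validate_draft


def is_within_limit(draft: str, max_position: float) -> bool:
    """Approve only if a percent is present and the largest one fits the limit."""
    percents = [float(x) for x in PERCENT.findall(draft)]
    return bool(percents) and max(percents) <= max_position * 100


samples = [
    "Buy SPY at 8% of NAV.",                         # approved
    "Buy TLT at 12.5% of NAV.",                      # too large
    "Buy SPY at 7.5% of NAV; realized vol is 18%.",  # rejected: 18% is not a size
    "Buy SPY in moderate size.",                     # no percent at all
]
for text in samples:
    print(is_within_limit(text, max_position=0.10), "-", text)
```

The third sample shows a limitation of this rule: any percentage in the draft counts, so a mention of volatility or yield can cause a rejection even when the position size itself is within the limit.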

## 6. Portfolio Rebalancing Agent

We will build a **multi-step agent** that:

1. Ingests market inputs (returns, volatility, rates)
2. Classifies the regime (LLM)
3. Proposes trades (agent node)
4. Runs risk/compliance checks (Python)
5. Revises if checks fail (loop)
6. Produces a final execution memo

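This showcases what LangGraph is great at: **explicit looping, validation gates, and stateful orchestration**. The cell below shows the core draft, validate, revise loop on a single trade; Exercise 4 extends it to the full workflow.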

In [None]:
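# This graph drafts a trade, validates it with a simple rule, and revises until the proposal passes.
# Note: there is no explicit cycle cap here; only LangGraph's recursion limit stops a run that never passes.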
# Graph: draft -> validate -> revise loop for position limits
class LoopState(TypedDict):
    messages: Annotated[list[BaseMessage], add_messages]
    proposal: str
    approved: bool


def draft_trade(state: LoopState):
    prompt = "Propose a single trade idea for SPY with a position size in percent of NAV."
    response = llm.invoke([HumanMessage(content=prompt)])
    return {"messages": [response], "proposal": response.content}


def validate_trade(state: LoopState):
    # Simple rule: any proposal containing '20%' is rejected
    approved = "20%" not in state["proposal"]
    return {"approved": approved}


def revise_trade(state: LoopState):
    prompt = (
        "Revise the proposal so position size <= 10% of NAV. "
        f"Original proposal: {state['proposal']}"
    )
    response = llm.invoke([HumanMessage(content=prompt)])
    return {"messages": [response], "proposal": response.content}


def loop_router(state: LoopState) -> Literal["revise_trade", END]:
    return "revise_trade" if not state["approved"] else END


loop_builder = StateGraph(LoopState)
loop_builder.add_node("draft_trade", draft_trade)
loop_builder.add_node("validate_trade", validate_trade)
loop_builder.add_node("revise_trade", revise_trade)

loop_builder.add_edge(START, "draft_trade")
loop_builder.add_edge("draft_trade", "validate_trade")
loop_builder.add_conditional_edges("validate_trade", loop_router)
loop_builder.add_edge("revise_trade", "validate_trade")

loop_graph = loop_builder.compile()

loop_out = loop_graph.invoke({"messages": []})
print(loop_out["proposal"])

### Exercise 4: Build the Portfolio Rebalancing Graph

Use the pattern above to build a full graph. Fill in the TODOs. The end result should produce a final execution memo.

You can keep the logic simple; what matters is the **graph structure**, **state**, and **risk gates**.

In [None]:
import json

market_inputs = {
    "returns_1m": -0.03,
    "vol_1m": 0.28,
    "rates_change_bps": -15,
    "sector_momentum": {"Tech": 0.04, "Financials": -0.02, "Energy": 0.01},
}

# Make inputs available to the agent tools
capstone_inputs = market_inputs

class RebalanceState(TypedDict):
    messages: Annotated[list[BaseMessage], add_messages]
    inputs: dict
    regime: str
    trades: list[dict]
    checks_passed: bool
    memo: str


def ingest_inputs(state: RebalanceState):
    return {"inputs": market_inputs}


def risk_checks(state: RebalanceState):
    # Simple deterministic checks: gross exposure and sector concentration
    gross = sum(abs(t.get("weight", 0.0)) for t in state["trades"])
    sector_counts = {}
    for t in state["trades"]:
        sector_counts[t.get("sector", "Unknown")] = sector_counts.get(t.get("sector", "Unknown"), 0) + 1
    max_sector = max(sector_counts.values()) if sector_counts else 0
    passed = gross <= 1.0 and max_sector <= 2
    return {"checks_passed": passed}


# TODO: classify_regime(state) -> set state["regime"]
# Hint: use llm.invoke with a prompt that maps inputs to a regime like
# "risk_on", "neutral", "risk_off"

# Tools for the capstone agent node
# (they read `capstone_inputs`, which was set to `market_inputs` above; do not reset it here)

@tool
def get_sector_momentum(sector: str) -> float:
    """Return recent momentum for a sector."""
    return capstone_inputs.get("sector_momentum", {}).get(sector, 0.0)

@tool
def risk_adjusted_weight(vol: float, base: float = 0.1) -> float:
    """Suggest a weight based on volatility and a base risk budget."""
    return min(0.2, max(0.02, base * (0.2 / max(vol, 1e-6))))
    
# TODO: propose_trades(state) -> set state["trades"]
# Hint: ask the LLM to return STRICT JSON list of trades like
# [{"ticker":"SPY","weight":-0.1,"sector":"Index","rationale":"..."}, ...]
# Then parse with json.loads

# TODO: revise_trades(state) -> adjust trades to pass checks
# Hint: tell the LLM to reduce gross exposure and diversify sectors

# TODO: final_memo(state) -> create a 5-7 sentence execution memo
# Include regime, key inputs, trade list, and risk summary

# TODO: router function that loops back to revise_trades if checks fail

# TODO: build and run the graph
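
For the JSON hint in the TODOs above, the parsing step is worth isolating, because model replies often wrap the JSON in extra prose. The sketch below is one possible helper, not a required solution: `parse_trades` is a hypothetical name, the required keys follow the example in the hint, and the reply text is made up.

```python
import json


def parse_trades(raw: str) -> list[dict]:
    """Parse a model reply that should contain a JSON list of trades."""
    text = raw.strip()
    # Models often wrap JSON in prose or a Markdown fence; keep only the bracketed list.
    start, end = text.find("["), text.rfind("]")
    if start == -1 or end < start:
        raise ValueError("no JSON list found in reply")
    trades = json.loads(text[start : end + 1])
    for trade in trades:
        if not isinstance(trade, dict) or not {"ticker", "weight", "sector"} <= trade.keys():
            raise ValueError(f"malformed trade: {trade!r}")
        trade["weight"] = float(trade["weight"])
    return trades


# Made-up reply text for illustration; a real reply comes from llm.invoke(...).
reply = """Here are the trades:
[{"ticker": "SPY", "weight": -0.10, "sector": "Index", "rationale": "trim beta"},
 {"ticker": "XLE", "weight": 0.05, "sector": "Energy", "rationale": "momentum"}]"""

trades = parse_trades(reply)
gross = sum(abs(t["weight"]) for t in trades)
print(len(trades), f"gross={gross:.2f}")
```

Raising `ValueError` on a bad reply gives the graph a clear signal to route back to a revision step. The bracket search is deliberately simple and would be confused by stray square brackets in the surrounding prose.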

## Congratulations!

You've completed the LangGraph tutorial! We covered:

- Explicit control flow as a graph
- Deterministic risk gates alongside LLM reasoning
- Durable state and resumable execution
- Validation loops for compliance-aware agents

### Possible next steps to explore
- **Retrieval**: Connect agents to vector databases for RAG
- **Multi-agent systems**: Agents that collaborate
- **LangSmith**: Observability and debugging tools

### Additional resources
- [LangGraph Blog](https://www.blog.langchain.com/langgraph/)
- [LangGraph Docs](https://docs.langchain.com/oss/python/langgraph/overview)
- [Ollama Docs](https://docs.ollama.com/)