# Advanced Routing and Customization of AI Agents

**Routing** in AI agents is the ability to direct user input through different paths, workflows, or tasks based on some criteria. It is especially useful in complex interactions, where the agent must decide which task or service to invoke for a given input. Without routing, an agent struggles to manage different types of tasks and often responds incorrectly. Routing ensures that each query is processed by the correct workflow or task.

## `tools_condition`

LangGraph provides a prebuilt function, `tools_condition`, for conditional routing around tools. It checks whether the most recent assistant message contains a tool call: if it does, the graph routes to the tools node; otherwise the run ends and the assistant's reply is returned directly.

In [1]:
from langchain_mistralai import ChatMistralAI
from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.prebuilt import ToolNode, tools_condition
from dotenv import load_dotenv
import os

load_dotenv()
api_key = os.getenv("MISTRAL_API_KEY")

llm = ChatMistralAI(model='mistral-large-latest')

def multiply(a: int, b: int) -> int:
    """
    Function to multiply two numbers.
    """
    return a * b

llm_with_tools = llm.bind_tools([multiply])

# Node to call LLM with tools
def tool_calling_llm(state: MessagesState):
    """
    Node that calls the LLM with tools bound
    """
    return {"messages": [llm_with_tools.invoke(state['messages'])]}

# Build workflow
builder = StateGraph(MessagesState)
builder.add_node("tool_calling_llm", tool_calling_llm)
builder.add_node("tools", ToolNode([multiply]))
builder.add_edge(START, "tool_calling_llm")
# Add conditional edge based on tool usage
builder.add_conditional_edges(
    "tool_calling_llm", tools_condition
)
builder.add_edge("tools", END)

graph = builder.compile()

# Simulate an invocation
def simulate():
    user_input = {
        "messages": [
            ("human", "Can you multiply 3 by 4?")
        ]
    }
    result = graph.invoke(user_input)
    # pretty_print() writes to stdout and returns None, so call it directly
    result['messages'][-1].pretty_print()

simulate()

Name: multiply

12
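
To make the routing decision concrete, the following is a minimal sketch of the kind of check `tools_condition` performs. It is a simplified re-implementation for illustration, not the library's code: `FakeAIMessage` and `simple_tools_condition` are hypothetical names, and the sketch assumes the state is a dict with a `messages` list.

```python
from dataclasses import dataclass, field

END = "__end__"  # assumed to match the sentinel string LangGraph uses for END


@dataclass
class FakeAIMessage:
    """Hypothetical stand-in for an assistant message; not a LangChain class."""
    content: str
    tool_calls: list = field(default_factory=list)


def simple_tools_condition(state: dict) -> str:
    """Return "tools" if the last message requests a tool call, else END."""
    last_message = state["messages"][-1]
    if getattr(last_message, "tool_calls", None):
        return "tools"
    return END


with_call = {
    "messages": [FakeAIMessage("", [{"name": "multiply", "args": {"a": 3, "b": 4}}])]
}
without_call = {"messages": [FakeAIMessage("Hello! How can I help?")]}

print(simple_tools_condition(with_call))
print(simple_tools_condition(without_call))
```

The returned string is the name of the next node, which is why the graph above registers the tool node under the name `"tools"`.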


## Custom Conditional Routing

Sometimes you need to invoke different tools or nodes depending on the user query. The example below routes weather-related queries to a weather node, calculation-related queries to a calculator node, and answers anything else with a default response.

1. Define the three nodes.

In [2]:
from langgraph.graph import StateGraph, MessagesState, START, END

def weather_node(state: MessagesState):
    return {"messages": ["It's a beautiful sunny day with 22C temperature"]}

def calculator_node(state: MessagesState):
    user_query = state['messages'][-1].content.lower()
    if "add" in user_query:
        numbers = [int(s) for s in user_query.split() if s.isdigit()]
        result = sum(numbers)
        return {"messages": [f"The result of addition is {result}."]}
    return {"messages": ["I can only perform addition for now."]}

def default_node(state: MessagesState):
    return {"messages": ["Sorry, I don't understand that request."]}

2. Next, define the routing function, which decides which node receives the user query based on its content. The `routing_function` inspects the last user message and routes to `weather_node` if the message contains the word 'weather', to `calculator_node` if it contains 'add' or 'calculate', and to `default_node` if the input matches none of these conditions.

In [3]:
def routing_function(state: MessagesState):
    last_message = state['messages'][-1].content.lower()
    if 'weather' in last_message:
        return "weather_node"
    elif 'add' in last_message or 'calculate' in last_message:
        return "calculator_node"
    return "default_node"

3. The next step is to build the workflow graph using conditional edges to dynamically route.

In [4]:
builder = StateGraph(MessagesState)
builder.add_node("weather_node", weather_node)
builder.add_node("calculator_node", calculator_node)
builder.add_node("default_node", default_node)
# The routing function is passed to add_conditional_edges; it is not registered as a node
builder.add_conditional_edges(START, routing_function)
builder.add_edge("weather_node", END)
builder.add_edge("calculator_node", END)
builder.add_edge("default_node", END)

app = builder.compile()

4. Create a function to simulate the user interaction for routing.

In [10]:
def simulate_interaction():
    while True:
        user_input = input("You: ")
        if user_input.lower() in ['exit', 'quit']:
            print("Exiting...")
            break
        input_message = {"messages": [
            ("human", user_input)
        ]}
        for result in app.stream(input_message, stream_mode='values'):
            result['messages'][-1].pretty_print()

# simulate_interaction()

You:  What is weather like?



What is weather like?

It's a beautiful sunny day with 22C temperature


You:  Add 2 and 3



Add 2 and 3

The result of addition is 5.


You:  What is time?



What is time?

Sorry, I don't understand that request.


You:  quit


Exiting...


Conditional routing lets an AI agent handle multiple types of queries with ease and keeps its behavior flexible and responsive.
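
One caveat of substring checks such as `'add' in last_message` is that they also match inside longer words (for example "address"). The sketch below shows one possible way to tighten the keyword routing with word boundaries. `ROUTES` and `route_by_keyword` are hypothetical names introduced here, and the function works on a plain string rather than on graph state.

```python
import re

# Each route pairs a node name with a pattern that matches whole words only.
ROUTES = [
    ("weather_node", re.compile(r"\bweather\b")),
    ("calculator_node", re.compile(r"\b(add|calculate)\b")),
]


def route_by_keyword(text: str) -> str:
    """Return the name of the first node whose keyword appears as a whole word."""
    lowered = text.lower()
    for node_name, pattern in ROUTES:
        if pattern.search(lowered):
            return node_name
    return "default_node"


for query in ["What is weather like?", "Add 2 and 3", "What is my address?"]:
    print(query, "->", route_by_keyword(query))
```

Inside a graph, the same function could be called on the content of the last message in the state.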

## Streaming

In LangGraph, streaming delivers real-time updates of the graph's state or node outputs while the workflow executes. This is useful for long-running processes or when continuous feedback is needed. The following streaming options are available:
1. *Full state streaming* streams the entire state of the graph after each node execution.
2. *Updates streaming* streams only the updates to the state after each node execution.
3. *LLM token streaming* streams the tokens produced by the LLM during its generation process.
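
The difference between the first two modes can be illustrated without LangGraph. The following toy runner is a sketch only: `run_nodes` is a hypothetical helper that executes nodes in sequence and appends their messages to the state, yielding either the whole state or just the per-node update.

```python
from typing import Callable, Iterator

Node = Callable[[dict], dict]


def run_nodes(
    nodes: list[tuple[str, Node]], state: dict, stream_mode: str
) -> Iterator[dict]:
    """Run nodes in order, yielding full state ("values") or per-node updates."""
    state = {"messages": list(state["messages"])}
    if stream_mode == "values":
        yield {"messages": list(state["messages"])}  # initial state comes first
    for name, node in nodes:
        update = node(state)
        state["messages"] = state["messages"] + update["messages"]
        if stream_mode == "values":
            yield {"messages": list(state["messages"])}
        else:
            yield {name: update}


nodes = [
    ("weather_node", lambda s: {"messages": ["sunny"]}),
    ("calculator_node", lambda s: {"messages": ["done"]}),
]

values_events = list(run_nodes(nodes, {"messages": ["hi"]}, "values"))
updates_events = list(run_nodes(nodes, {"messages": ["hi"]}, "updates"))
print(values_events)
print(updates_events)
```

In this sketch the "values" stream grows with every step, while the "updates" stream stays proportional to what each node returned.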

### 1. Full State Streaming
This is useful when you want to monitor all the changes in the graph after every node execution.

In [5]:
import operator
from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.prebuilt import ToolNode, tools_condition
from typing import Annotated
from typing_extensions import TypedDict
from dotenv import load_dotenv
import os

# Define the state schema
class State(TypedDict):
    messages: Annotated[list, operator.add]

load_dotenv()
api_key = os.getenv("MISTRAL_API_KEY")

llm = ChatMistralAI(model='mistral-large-latest')

def weather_node(state: State):
    return {"messages": ["It's a beautiful sunny day with 22C temperature"]}

def calculator_node(state: State):
    user_query = state['messages'][-1].lower()
    if "add" in user_query:
        numbers = [int(s) for s in user_query.split() if s.isdigit()]
        result = sum(numbers)
        return {"messages": [f"The result of addition is {result}."]}
    return {"messages": ["I can only perform addition for now."]}

builder = StateGraph(State)
builder.add_node("weather_node", weather_node)
builder.add_node("calculator_node", calculator_node)
builder.add_edge(START, "weather_node")
builder.add_edge("weather_node", "calculator_node")
builder.add_edge("calculator_node", END)

app = builder.compile()

def simulate_interaction():
    input_message = {"messages": [
        ("human", "Tell me the weather")
    ]}
    for result in app.stream(input_message, stream_mode='values'):
        print(result)

simulate_interaction()

{'messages': [('human', 'Tell me the weather')]}
{'messages': [('human', 'Tell me the weather'), "It's a beautiful sunny day with 22C temperature"]}
{'messages': [('human', 'Tell me the weather'), "It's a beautiful sunny day with 22C temperature", 'I can only perform addition for now.']}


### 2. Updates Streaming

This is efficient when you only need to see what has changed rather than the entire state.

In [17]:
import operator
from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.prebuilt import ToolNode, tools_condition
from typing import Annotated
from typing_extensions import TypedDict
from dotenv import load_dotenv
import os

# Define the state schema
class State(TypedDict):
    messages: Annotated[list, operator.add]

load_dotenv()
api_key = os.getenv("MISTRAL_API_KEY")

llm = ChatMistralAI(model='mistral-large-latest')

def weather_node(state: State):
    return {"messages": ["It's a beautiful sunny day with 22C temperature"]}

def calculator_node(state: State):
    user_query = state['messages'][-1].lower()
    if "add" in user_query:
        numbers = [int(s) for s in user_query.split() if s.isdigit()]
        result = sum(numbers)
        return {"messages": [f"The result of addition is {result}."]}
    return {"messages": ["I can only perform addition for now."]}

builder = StateGraph(State)
builder.add_node("weather_node", weather_node)
builder.add_node("calculator_node", calculator_node)
builder.add_edge(START, "weather_node")
builder.add_edge("weather_node", "calculator_node")
builder.add_edge("calculator_node", END)

app = builder.compile()

def simulate_interaction():
    input_message = {"messages": [
        ("human", "Tell me the weather")
    ]}
    for result in app.stream(input_message, stream_mode='updates'):
        print(result)

simulate_interaction()

{'weather_node': {'messages': ["It's a beautiful sunny day with 22C temperature"]}}
{'calculator_node': {'messages': ['I can only perform addition for now.']}}


### 3. LLM Token Streaming

This is useful when an LLM generates long responses. You can stream tokens as they are produced rather than waiting for the entire response, which makes the application feel more responsive.
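
The consumption pattern can be sketched with a plain async generator. Here `fake_token_stream` is a hypothetical stand-in for a model that emits one word at a time, and `consume` prints each piece as it arrives while gathering the full text. No real LLM is involved.

```python
import asyncio
from typing import AsyncIterator


async def fake_token_stream(text: str) -> AsyncIterator[str]:
    """Hypothetical token source: yields one word at a time."""
    for token in text.split(" "):
        await asyncio.sleep(0)  # hand control back to the event loop between tokens
        yield token


async def consume(text: str) -> str:
    """Print tokens as they arrive and return the reassembled text."""
    gathered = []
    async for token in fake_token_stream(text):
        print(token, end="|", flush=True)
        gathered.append(token)
    print()
    return " ".join(gathered)


result = asyncio.run(consume("Streaming shows partial output early"))
```

In a notebook, where an event loop is already running, `await consume(...)` would be used instead of `asyncio.run(...)`.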

In [23]:
import operator
from langgraph.graph import StateGraph, MessagesState, START, END
from langchain_mistralai import ChatMistralAI
from typing import Annotated
from typing_extensions import TypedDict
import asyncio
from langgraph.graph.message import add_messages
from langchain_core.messages import AIMessageChunk, HumanMessage

# Define the state schema
class State(TypedDict):
    messages: Annotated[list, add_messages]

# Initialize the LLM
model = ChatMistralAI(model="mistral-small-latest")

# Define a node to handle LLM queries
async def call_llm(state: State):
    messages = state["messages"]
    response = await model.ainvoke(messages)
    return {"messages": [response]}

# Define the graph
workflow = StateGraph(State)
workflow.add_node("call_llm", call_llm)
workflow.add_edge(START, "call_llm")
workflow.add_edge("call_llm", END)

app = workflow.compile()

# Simulate interaction and stream tokens
async def simulate_interaction():
    input_message = {"messages": [("human", "Tell me a very long joke")]}
    first = True
    # Stream LLM tokens
    async for msg, metadata  in app.astream(input_message, stream_mode="messages"):
        if msg.content and not isinstance(msg, HumanMessage):
            print(msg.content, end="|", flush=True)

        if isinstance(msg, AIMessageChunk):
            if first:
                gathered = msg
                first = False
            else:
                gathered = gathered + msg

            if msg.tool_call_chunks:
                print(gathered.tool_calls)

await simulate_interaction()

Of| course!| Here’s| a long|,| winding| joke for you—|it|’s a| classic "|light| bulb|" joke with| a lot| of setup|.| Buck|le up!

|---

|**Why| did it| take| so| long to change| the light| bulb?**

Because| the| light bulb was| in| a| haunted| house,| and it| needed| a team| of experts| to change| it.

First|, a| **|psych|ologist** came| in| and said|, *"|This| light| bulb is clearly| suffering| from a fear| of the| dark|.| We| need to address| its| anxiety| before| we| can proceed|."*

Then|, a **pl|umber** showed| up and| said, *"The| wiring| in| this| house| is all| wrong|.| The| light bulb is getting| mixed| signals|.| I|’ll| need to rer|oute the| electricity|."*

Next|, a **law|yer** arrived| and said, *"|Before| we change| the light bulb,| we need to establish| liability|. Who|’s| responsible| for the| darkness| in| this house?"*

|Then|, a **car|p|enter** came in| and said, *"|The fixture| is| too| loose.| I|’ll| need to reinforce| the socket| before| we can install|

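You can also combine several streaming modes in a single call by passing a list to `stream_mode`. Each streamed item is then a `(mode, chunk)` tuple.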

In [25]:
import operator
from langgraph.graph import StateGraph, MessagesState, START, END
from langchain_mistralai import ChatMistralAI
from typing import Annotated
from typing_extensions import TypedDict
import asyncio
from langgraph.graph.message import add_messages
from langchain_core.messages import AIMessageChunk, HumanMessage
 
# Define the state schema
class State(TypedDict):
    messages: Annotated[list, add_messages]
 
# Initialize the LLM with the correct model name and streaming enabled
model = ChatMistralAI(
    model="mistral-medium-latest",
    streaming=True
)
 
# Define a node to handle LLM queries
async def call_llm(state: State):
    messages = state["messages"]
    response = await model.ainvoke(messages)
    return {"messages": [response]}
 
# Define the graph
workflow = StateGraph(State)
workflow.add_node("call_llm", call_llm)
workflow.add_edge(START, "call_llm")
workflow.add_edge("call_llm", END)
 
app = workflow.compile()
 
# Simulate interaction and stream with two modes at once
async def simulate_interaction():
    input_message = {"messages": [HumanMessage(content="Tell me a very long joke")]}
    # With a list of stream modes, each item is a (mode, chunk) tuple
    async for mode, chunk in app.astream(input_message, stream_mode=["messages", "updates"]):
        # "updates" chunks are dicts keyed by node name; "messages" chunks are skipped here
        if mode == "updates" and 'call_llm' in chunk:
            # Extract the complete message returned by call_llm
            ai_message = chunk['call_llm']['messages'][0]
            if ai_message.content:
                print(ai_message.content, end="|", flush=True)

await simulate_interaction()

Here’s a long, winding joke for you—it’s a classic "shaggy dog" story with a slow build and a silly punchline. Grab a snack, get comfortable, and enjoy!

---

A man walks into a bar with a tiny alligator by his side. He puts the alligator up on the bar. He turns to the astonished patrons and says, "I'll make you a deal. I'll open this alligator's mouth and place my private parts inside. Then the gator will close his mouth for one minute. After that, I'll remove my unmentionables unharmed. In return for witnessing this spectacle, I hope that you fine people will buy me a drink."

The crowd murmurs their approval. The man stands up on the bar, drops his trousers, and places his family jewels into the alligator's open mouth. The gator chomps down—*CLAMP!*—and the man grins wildly. The crowd counts aloud to 60, and sure enough, the man removes his bits, totally unscathed. The crowd cheers, buys him drinks, and asks how he does it.

"Simple," he says. "The gator is a professional,

### Streaming Custom Data

In addition to streaming graph state or tokens, you can configure nodes to stream custom data that your application needs. The code below simulates progress reporting for a long-running task.
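
The underlying idea is that the node receives a callable and pushes intermediate events through it while it works. The sketch below shows that pattern in plain Python. `long_task` and `events` are hypothetical names, and the list's `append` method plays the role of the writer.

```python
from typing import Callable


def long_task(steps: int, writer: Callable[[dict], None]) -> dict:
    """Report progress through the writer, then return the final result."""
    for i in range(1, steps + 1):
        writer({"progress": f"Processing step {i}/{steps}"})
    return {"messages": ["Task completed!"]}


events: list[dict] = []
final = long_task(3, events.append)  # collect progress events in a list

for event in events:
    print(event)
print(final)
```

Because the task only depends on a callable, the same function can report to a list in tests and to a real stream in production.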

In [26]:
from langgraph.graph import StateGraph, MessagesState, START, END
from time import sleep
from langgraph.types import StreamWriter

# Define a custom node to simulate a long-running task
def long_running_node(state: MessagesState,  writer: StreamWriter):
    for i in range(1, 6):
        sleep(1)  # Simulate a delay
        writer({"progress": f"Processing step {i}/5"})

    return {"messages": ["Task completed!"]}

# Define the graph
workflow = StateGraph(MessagesState)
workflow.add_node("long_running_node", long_running_node)
workflow.add_edge(START, "long_running_node")
workflow.add_edge("long_running_node", END)

# Compile the graph
app = workflow.compile()

# Simulate interaction and stream custom progress updates
def simulate_interaction():
    input_message = {"messages": [("human", "Start the long-running task")]}
    
    # With a list of stream modes, each item is a (mode, chunk) tuple
    for mode, chunk in app.stream(input_message, stream_mode=["custom", "updates"]):
        # "custom" chunks carry the progress dicts; "updates" carries the final message
        print(chunk)

simulate_interaction()

{'progress': 'Processing step 1/5'}
{'progress': 'Processing step 2/5'}
{'progress': 'Processing step 3/5'}
{'progress': 'Processing step 4/5'}
{'progress': 'Processing step 5/5'}
{'long_running_node': {'messages': ['Task completed!']}}


You can disable streaming for models that do not support it by passing `streaming=False` to the model. With streaming disabled, the model returns the entire message at once instead of streaming tokens.

In [28]:
from langgraph.graph import StateGraph, MessagesState, START, END
from langchain_mistralai import ChatMistralAI
from langchain_core.callbacks import StreamingStdOutCallbackHandler

# First example with streaming enabled
llm_streaming = ChatMistralAI(
    model="mistral-small-latest", 
    temperature=1,
    streaming=True,
    callbacks=[StreamingStdOutCallbackHandler()]
)

# Second example with streaming disabled
llm_no_streaming = ChatMistralAI(
    model="mistral-small-latest", 
    temperature=1,
    streaming=False
)

def create_graph(llm):
    graph_builder = StateGraph(MessagesState)
    
    def chatbot(state: MessagesState):
        messages = state["messages"]
        if not isinstance(messages, list):
            messages = [messages]
        return {"messages": llm.invoke(messages)}

    graph_builder.add_node("chatbot", chatbot)
    graph_builder.add_edge(START, "chatbot")
    graph_builder.add_edge("chatbot", END)
    return graph_builder.compile()

# Named user_input to avoid shadowing the built-in input()
user_input = {
    "messages": [
        {
            "role": "user",
            "content": "how many r's are in strawberry? Explain in three paragraphs."
        }
    ]
}

print("With streaming enabled:")
graph_streaming = create_graph(llm_streaming)
for output in graph_streaming.stream(user_input):
    if isinstance(output, dict) and 'chatbot' in output:
        # Nothing to print here: StreamingStdOutCallbackHandler already writes the tokens
        pass

print("\n\nWith streaming disabled:")
graph_no_streaming = create_graph(llm_no_streaming)
for output in graph_no_streaming.stream(user_input):
    if isinstance(output, dict) and 'chatbot' in output:
        message = output['chatbot']['messages']
        print(message.content, end="", flush=True)

With streaming enabled:
To determine how many times the letter "r" appears in the word "strawberry," we can break the word down letter by letter. The word "strawberry" is spelled as S-T-R-A-W-B-E-R-R-Y. Let's count each occurrence of the letter "r" in this sequence.

Starting from the beginning, the first "r" appears as the third letter in "strawberry." The next "r" is the ninth letter, and there is no other "r" after that. Therefore, we have two "r" sounds in the word. However, it's important to note that the second "r" is actually a double "rr," which counts as two letters but is pronounced as a single sound in some dialects. For the purpose of counting individual letters, there are two "r"s in "strawberry."

In summary, the word "strawberry" contains two instances of the letter "r." The first "r" is in the middle of the word, and the second "r" is part of the double "rr" at the end. While the pronunciation may vary, the written form clearly shows two "r"s. Thus, the answer is that t

## Considerations for Streaming

1. For long tasks or real-time data, streaming provides continuous feedback, reducing the perceived latency for the user.
2. Streaming updates rather than full state can reduce overhead and bandwidth when processing large graphs or complex workflows.
3. Streaming custom data allows for flexible feedback such as showing progress or intermediate results during execution.
4. Disable streaming explicitly for models that do not support it, so the graph returns complete messages instead of attempting to stream.

## External API Integrations

API integration allows an AI agent to fetch live data, post data such as user information or actions to a web service, or interact with third-party tools. In LangGraph, API calls are made inside custom nodes, which process the API's response and can route the state based on the returned data.

1. Set up an API key for the OpenWeatherMap weather API.
2. Define a node that sends a request to the OpenWeatherMap API, parses the response, and returns a message with the weather data.

In [None]:
import requests
from dotenv import load_dotenv
import os

load_dotenv()
weather_api_key = os.getenv("OPENWEATHER_API_KEY")

# Node to fetch live weather data
def live_weather_node(state):
    city = "London"
    url = f"http://api.openweathermap.org/data/2.5/weather?q={city}&appid={weather_api_key}&units=metric"
    response = requests.get(url, timeout=10)  # avoid hanging if the API is slow
    if response.status_code == 200:
        data = response.json()
        temperature = data['main']['temp']
        description = data['weather'][0]['description']
        return {"messages": [f"The weather in {city} is {temperature} degree C with {description}."]}
    else:
        return {"messages": ["Sorry, I couldn't fetch the weather information."]}

3. Add this node to the graph.

In [24]:
from langgraph.graph import StateGraph, MessagesState, START, END

builder = StateGraph(MessagesState)
builder.add_node("live_weather_node", live_weather_node)
builder.add_edge(START, "live_weather_node")
builder.add_edge("live_weather_node", END)

app = builder.compile()

def simulate_interaction():
    input_message = {"messages": [
        ("human", "Tell me the weather in London")
    ]}
    # Process the input and stream the result
    for result in app.stream(input_message, stream_mode="values"):
        result['messages'][-1].pretty_print()

simulate_interaction()


Tell me the weather in London

The weather in London is 6.65 degree C with overcast clouds.
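
The parsing step can be separated from the HTTP call so that it is easy to test and tolerant of unexpected payloads. The sketch below is one way to do that. `format_weather` is a hypothetical helper, and `sample` is a hand-written, trimmed payload containing only the two fields the node reads, not a full API response.

```python
FALLBACK = "Sorry, I couldn't fetch the weather information."


def format_weather(city: str, data: dict) -> str:
    """Build the reply from a parsed response, falling back if fields are missing."""
    try:
        temperature = data["main"]["temp"]
        description = data["weather"][0]["description"]
    except (KeyError, IndexError, TypeError):
        return FALLBACK
    return f"The weather in {city} is {temperature} degree C with {description}."


# Hand-written sample with only the fields used above (an assumption, not real API output)
sample = {"main": {"temp": 6.65}, "weather": [{"description": "overcast clouds"}]}

print(format_weather("London", sample))
print(format_weather("London", {}))
```

A node could call `format_weather(city, response.json())` after checking the status code.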


You can also extract the city from the user input. To do this, modify `live_weather_node` to read the city from the user's query.

In [29]:
def live_weather_node(state):
    last_message = state['messages'][-1].content.lower()
    if "in" in last_message:
        city = last_message.split("in")[-1].strip()
    else:
        city = "London"
    url = f"http://api.openweathermap.org/data/2.5/weather?q={city}&appid={weather_api_key}&units=metric"
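    # Debug output only: the URL contains the API key, so remove this line in real code
    print(url)
    response = requests.get(url, timeout=10)  # avoid hanging if the API is slow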
    if response.status_code == 200:
        data = response.json()
        temperature = data['main']['temp']
        description = data['weather'][0]['description']
        return {"messages": [f"The weather in {city} is {temperature} degree C with {description}."]}
    else:
        return {"messages": ["Sorry, I couldn't fetch the weather information."]}

builder = StateGraph(MessagesState)
builder.add_node("live_weather_node", live_weather_node)
builder.add_edge(START, "live_weather_node")
builder.add_edge("live_weather_node", END)

app = builder.compile()

def simulate_interaction():
    input_message = {"messages": [
        ("human", "What's the weather in London?")
    ]}
    # Process the input and stream the result
    for result in app.stream(input_message, stream_mode="values"):
        result['messages'][-1].pretty_print()

simulate_interaction()


What's the weather in London?
http://api.openweathermap.org/data/2.5/weather?q=london?&appid=<OPENWEATHER_API_KEY>&units=metric

Sorry, I couldn't fetch the weather information.
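
The printed URL shows `q=london?`: the naive `split("in")` keeps the trailing question mark, which is the most likely reason the lookup fails. A plain `split("in")` would also break on any city whose name contains "in". The sketch below shows one possible, more careful extraction. `CITY_PATTERN` and `extract_city` are hypothetical names, and the pattern only handles simple "in <city>" phrasing.

```python
import re

# Match the standalone word "in" followed by letters, spaces, apostrophes or hyphens.
CITY_PATTERN = re.compile(r"\bin\s+([a-z][a-z\s'-]*)", re.IGNORECASE)


def extract_city(message: str, default: str = "London") -> str:
    """Return the text after a standalone "in", without trailing punctuation."""
    match = CITY_PATTERN.search(message)
    if not match:
        return default
    return match.group(1).strip().title()


for text in ["What's the weather in London?", "What is weather in Texas?", "Tell me the weather"]:
    print(text, "->", extract_city(text))
```

This remains a heuristic: anything after the city, such as "today", would still be captured, so a production system would likely use a proper entity extractor or let the LLM supply the city through a tool call.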


Similarly, you can integrate a calculator API that handles arithmetic operations, such as the one provided by `mathjs.org`.

In [30]:
import urllib.parse  # importing only `urllib` does not guarantee `urllib.parse` is loaded

def calculator_node(state):
    last_message = state['messages'][-1].content.lower()
    expression = last_message.split("calculate")[-1].strip()
    # URL encode the expression to ensure it's safe for use in the query string
    encoded_expression = urllib.parse.quote(expression)
    url = f"http://api.mathjs.org/v4/?expr={encoded_expression}"
    response = requests.get(url, timeout=10)  # avoid hanging if the API is slow
    if response.status_code == 200:
        result = response.text
        return {"messages": [f"The result of {expression} is {result}."]}
    else:
        return {"messages": ["Sorry, I couldn't calculate that."]}

builder = StateGraph(MessagesState)
builder.add_node("calculator_node", calculator_node)
builder.add_edge(START, "calculator_node")
builder.add_edge("calculator_node", END)
app = builder.compile()

def simulate_interaction():
    input_message = {"messages": [
        ("human", "Calculate 5 + 3 * 2")
    ]}
    for result in app.stream(input_message, stream_mode="values"):
        result["messages"][-1].pretty_print()

simulate_interaction()


Calculate 5 + 3 * 2

The result of 5 + 3 * 2 is 11.
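
The URL-building step can be checked on its own, without calling the service. The sketch below isolates it. `build_mathjs_url` is a hypothetical helper that reuses the endpoint from the node above and additionally trims sentence punctuation from the end of the expression.

```python
from urllib.parse import quote


def build_mathjs_url(message: str) -> str:
    """Extract the expression after "calculate" and percent-encode it for the query string."""
    expression = message.lower().split("calculate")[-1].strip()
    expression = expression.rstrip(".?!").strip()  # drop trailing sentence punctuation
    return f"http://api.mathjs.org/v4/?expr={quote(expression)}"


print(build_mathjs_url("Calculate 5 + 3 * 2"))
print(build_mathjs_url("Please calculate 5 * 3 + 2."))
```

Encoding matters here because a literal `+` in a query string is commonly decoded as a space, which would change the expression.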


You can combine both APIs in a single workflow:
1. Create a routing function that sends requests to either the calculator or the weather node.
2. Build the workflow.

In [32]:
def routing_function(state):
    last_message = state['messages'][-1].content.lower()
    if "weather" in last_message:
        return "live_weather_node"
    elif "calculate" in last_message:
        return "calculator_node"
    return "default_node"

builder = StateGraph(MessagesState)
builder.add_node("live_weather_node", live_weather_node)
builder.add_node("calculator_node", calculator_node)
builder.add_node("default_node", lambda state: {"messages": ["Sorry, I don't understand the request."]})
builder.add_conditional_edges(START, routing_function)
builder.add_edge("live_weather_node", END)
builder.add_edge("calculator_node", END)
builder.add_edge("default_node", END)

app = builder.compile()

def simulate_interaction():
    while True:
        user_input = input("You: ")
        if user_input.lower() in ['exit', 'quit']:
            print("Exiting...")
            break
        input_message = {"messages": [
            ("human", user_input)
        ]}
        for result in app.stream(input_message, stream_mode="values"):
            result["messages"][-1].pretty_print()

simulate_interaction()

You:  Please calculate 5 * 3 + 2.



Please calculate 5 * 3 + 2.

The result of 5 * 3 + 2. is 17.


You:  What is weather in Texas?



What is weather in Texas?
http://api.openweathermap.org/data/2.5/weather?q=texas?&appid=<OPENWEATHER_API_KEY>&units=metric

Sorry, I couldn't fetch the weather information.


You:  quit


Exiting...
