<a target="_blank" href="https://colab.research.google.com/github/PacktPublishing/Building-Agentic-AI-Systems/blob/main/Chapter_07_b.ipynb">
  <img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/>
</a>

# Chapter 7 (b) – Effective Agentic System Design Techniques _(continued)_
---

Install dependencies

In [None]:
!pip install crewai langchain-openai langmem langgraph

In [None]:
import getpass
import os

api_key = getpass.getpass(prompt="Enter OpenAI API Key: ")
os.environ["OPENAI_API_KEY"] = api_key

In [None]:
from crewai import Agent, Task, Crew, Process
from crewai.tools import tool
from langchain_openai import ChatOpenAI
from IPython.display import display, Markdown, HTML

llm = ChatOpenAI(model="gpt-4o")

# State spaces and environment modeling

In this section of Chapter 7, you read about environment modeling and the difference between static and dynamic environments. These concepts ultimately tie back to how you maintain agent memory throughout an agentic workflow. Let's look at examples of the three memory types we discussed:

- Short-term memory (working memory)
- Long-term memory (knowledge base)
- Episodic memory (interaction history)

Most agentic frameworks support some form of memory management (for example, CrewAI's [memory management](https://docs.crewai.com/concepts/memory)). Other frameworks, such as [LangMem](https://langchain-ai.github.io/langmem/), are purpose-built for memory management in agentic systems.

---

### Short-term memory

In this example, we use LangGraph's thread-scoped memory, which lets your application remember previous interactions within a single thread or conversation. A thread organizes multiple interactions in a session, similar to the way email groups messages in a single conversation. In the following example, we interact with the agent using two different `thread_id` values, and the agent keeps the memory separate for each thread rather than updating a global long-term memory.


In [None]:
from typing import TypedDict, List
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage, BaseMessage
from langgraph.graph import StateGraph, START
from langgraph.checkpoint.memory import MemorySaver
from langchain_openai import ChatOpenAI

# Define a simple state type for our travel agent
class TravelState(TypedDict):
    messages: List[BaseMessage]

# Define the travel agent response function
def generate_response(state: TravelState) -> TravelState:
    """Generate a response based on the conversation history."""
    # Create a system message with the travel agent prompt
    system_message = SystemMessage(content="""
    You are a helpful travel agent assistant. Use the conversation history to 
    remember the user's preferences and trip details. Be specific and reference 
    their previously mentioned preferences when making recommendations.
    """)
    
    # Combine the system message with the existing messages
    messages = [system_message] + state["messages"]
    
    # Generate a response
    llm = ChatOpenAI(model="gpt-4o")
    response = llm.invoke(messages)
    
    # Return the updated state with the new message
    return {"messages": state["messages"] + [response]}

# Build the graph
builder = StateGraph(TravelState)
builder.add_node("generate_response", generate_response)
builder.add_edge(START, "generate_response")
builder.set_finish_point("generate_response")

# Compile the graph with a memory checkpointer
checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)

# Function to process a user message
def chat_with_travel_agent(message: str, thread_id: str = "default"):
    """Process a user message and return the agent's response."""
    # Create the thread configuration
    config = {"configurable": {"thread_id": thread_id}}
    
    # Get the current state if it exists
    current_messages = []
    try:
        checkpoint = checkpointer.get_tuple(config)
        if checkpoint and "messages" in checkpoint.checkpoint.get("channel_values", {}):
            current_messages = checkpoint.checkpoint["channel_values"]["messages"]
    except Exception:
        # No readable checkpoint for this thread yet; start with an empty history
        pass
    
    # Add the new message
    current_messages.append(HumanMessage(content=message))
    
    # Process through the graph
    result = graph.invoke({"messages": current_messages}, config)
    
    # Return just the last message content (the response)
    return result["messages"][-1].content

# Example usage
if __name__ == "__main__":
    thread_id = "user_123"
    
    # First interaction
    print("User: I want to plan a trip to Japan next month.")
    response = chat_with_travel_agent("I want to plan a trip to Japan next month.", thread_id)
    print(f"Agent: {response}\n")
    
    # Second interaction - the agent should remember Japan
    print("User: I'm interested in traditional culture and my budget is $3000.")
    response = chat_with_travel_agent("I'm interested in traditional culture and my budget is $3000.", thread_id)
    print(f"Agent: {response}\n")
    
    # Third interaction - test memory of previous details
    print("User: What was my destination again?")
    response = chat_with_travel_agent("What was my destination again?", thread_id)
    print(f"Agent: {response}\n")
    
    # New conversation thread (should not know about Japan)
    new_thread = "user_456"
    print("=== New Conversation ===")
    print("User: What kind of budget would I need for a beach vacation?")
    response = chat_with_travel_agent("What kind of budget would I need for a beach vacation?", new_thread)
    print(f"Agent: {response}")
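
The thread isolation shown above can be pictured as a dictionary keyed by `thread_id`. The following is a minimal plain-Python sketch of that idea, not LangGraph's actual implementation; the `ThreadScopedMemory` class and its methods are hypothetical names introduced only for illustration.

```python
from collections import defaultdict
from typing import Dict, List, Tuple


class ThreadScopedMemory:
    """Toy stand-in for a checkpointer: one message history per thread_id."""

    def __init__(self) -> None:
        self._threads: Dict[str, List[Tuple[str, str]]] = defaultdict(list)

    def append(self, thread_id: str, role: str, content: str) -> None:
        # Messages are stored only under the thread they belong to
        self._threads[thread_id].append((role, content))

    def history(self, thread_id: str) -> List[Tuple[str, str]]:
        # Return a copy so callers cannot mutate stored state by accident
        return list(self._threads.get(thread_id, []))


memory = ThreadScopedMemory()
memory.append("user_123", "user", "I want to plan a trip to Japan next month.")
memory.append("user_123", "user", "My budget is $3000.")
memory.append("user_456", "user", "What budget would I need for a beach vacation?")

japan_thread = memory.history("user_123")
beach_thread = memory.history("user_456")
print(len(japan_thread), len(beach_thread))
```

Because each history lives under its own key, nothing said in `user_123` is visible when the agent answers in `user_456`, which is the behavior the travel agent example relies on.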

### Long-term Memory

In this example, we use LangMem to implement "memory collections", also known as long-term memory or a knowledge base. Here, memories are stored as individual documents or records. For each new conversation, the memory system can decide to insert new memories into the store. In this case, long-term memory is saved globally within a namespace named `travel_preferences`. Any agentic workflow defined with that namespace refers to the same globally stored preferences, regardless of the thread. This is also useful for storing information such as travel advisories or weather conditions that applies to all travel-related conversations and is not user-specific.

In [None]:
from langgraph.prebuilt import create_react_agent
from langgraph.store.memory import InMemoryStore
from langmem import create_manage_memory_tool, create_search_memory_tool
# from langchain_openai import ChatOpenAI

# Set up storage with vector embedding capabilities
store = InMemoryStore(
    index={
        "dims": 1536,
        "embed": "openai:text-embedding-3-small",
    }
)

# Custom tools for the travel agent
def search_flights(destination: str, dates: str, budget: str = None):
    """Search for flights based on destination, dates, and optional budget."""
    return f"Found several flight options to {destination} for {dates}. Prices range from $500-$1200 round trip."

def search_hotels(destination: str, dates: str, preferences: str = None):
    """Search for hotels based on destination, dates, and preferences."""
    return f"Found 15 hotels in {destination} for {dates}. Options include boutique hotels, chain hotels, and vacation rentals."

def search_activities(destination: str, interests: str = None):
    """Search for activities based on destination and interests."""
    return f"Popular activities in {destination} include museums, guided tours, outdoor activities, and local cuisine experiences."

# Create a travel agent with memory capabilities
travel_agent = create_react_agent(
    "gpt-4o",
    tools=[
        # Travel-specific tools
        search_flights,
        search_hotels,
        search_activities,
        
        # Memory management tools
        create_manage_memory_tool(
            namespace=("travel_preferences",),
            instructions="""
            Proactively call this tool when you:
            1. Learn about a new user preference for travel (budget, destinations, activities, diet, etc.)
            2. Receive an explicit request to remember specific trip details
            3. Need to record important context about their upcoming travel plans
            4. Need to update incorrect or outdated information about the user's travel preferences
            """
        ),
        create_search_memory_tool(
            namespace=("travel_preferences",),
        )
    ],
    store=store,
)

# Example function to handle travel planning conversation
def travel_planning_session(user_message, user_id="user123"):
    """
    Handle a user message in the travel planning conversation.
    
    Args:
        user_message: The user's message
        user_id: Unique identifier for the user (not used in this example;
            memories are stored in the shared `travel_preferences` namespace)
        
    Returns:
        The agent's response
    """
    # Set up the messages with the user's input
    messages = [{"role": "user", "content": user_message}]
    
    # Invoke the agent with the configured store for memory persistence
    response = travel_agent.invoke({"messages": messages})
    
    # Return the agent's response
    return response["messages"][-1].content

# Demonstrate usage with a typical travel planning conversation
if __name__ == "__main__":
    # First conversation establishing preferences
    print("User: I'm planning a trip to Barcelona in June for about a week. I love architecture and food.")
    response = travel_planning_session("I'm planning a trip to Barcelona in June for about a week. I love architecture and food.")
    print(f"Travel Agent: {response}\n")
    
    # Second message with budget information
    print("User: My budget is around $3000 for the entire trip, and I prefer staying in boutique hotels.")
    response = travel_planning_session("My budget is around $3000 for the entire trip, and I prefer staying in boutique hotels.")
    print(f"Travel Agent: {response}\n")
    
    # Asking about vegetarian restaurants (new information)
    print("User: I'm vegetarian. Can you recommend some good vegetarian restaurants in Barcelona?")
    response = travel_planning_session("I'm vegetarian. Can you recommend some good vegetarian restaurants in Barcelona?")
    print(f"Travel Agent: {response}\n")
    
    # Testing memory recall - should remember destination, budget, dietary preferences
    print("User: What was my budget again?")
    response = travel_planning_session("What was my budget again?")
    print(f"Travel Agent: {response}\n")
    
    # Testing memory recall - should remember destination, interests
    print("User: Can you suggest an architecture-focused itinerary?")
    response = travel_planning_session("Can you suggest an architecture-focused itinerary?")
    print(f"Travel Agent: {response}")

Let's test the Agent's memory one more time by asking if it remembers the travel budget given to it earlier.

In [None]:
print("User: I think I gave you a budget for my travel right?")
response = travel_planning_session("I think I gave you a budget for my travel right?")
print(f"Travel Agent: {response}\n")
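
To make the namespace idea concrete, here is a minimal plain-Python sketch of a store whose records are keyed by namespace rather than by thread. It is an illustration under stated assumptions, not how `InMemoryStore` works internally: the `NamespaceStore` class is hypothetical, and it swaps the embedding-based search for a naive keyword match.

```python
from typing import Dict, List, Tuple

Namespace = Tuple[str, ...]


class NamespaceStore:
    """Toy long-term store: records live under a namespace, not a thread."""

    def __init__(self) -> None:
        self._records: Dict[Namespace, Dict[str, str]] = {}

    def put(self, namespace: Namespace, key: str, text: str) -> None:
        # Writing an existing key overwrites it, which models updating a stale memory
        self._records.setdefault(namespace, {})[key] = text

    def search(self, namespace: Namespace, query: str) -> List[str]:
        # Naive keyword match; a real store would rank by embedding similarity
        words = query.lower().split()
        return [
            text
            for text in self._records.get(namespace, {}).values()
            if any(word in text.lower() for word in words)
        ]


store_sketch = NamespaceStore()
ns = ("travel_preferences",)
store_sketch.put(ns, "budget", "Budget is around $3000 for the entire trip")
store_sketch.put(ns, "diet", "User is vegetarian")

# Any conversation that uses the same namespace sees the same records
budget_hits = store_sketch.search(ns, "budget")
other_hits = store_sketch.search(("other_namespace",), "budget")
print(budget_hits, other_hits)
```

The lookup never mentions a thread, which is why the agent above can recall the budget even though every call to `travel_planning_session` starts with a fresh message list.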

### Episodic memory (interaction history)

Episodic memory preserves successful interactions as learning examples that guide future behavior. Unlike long-term memory, which stores facts, episodic memory captures the full context of an interaction: the situation, the thought process that led to success, and why that approach worked. These memories help the agent learn from experience, adapting its responses based on what has worked before. In the following example, we create a travel agent system with episodic memory using the `langmem` library. The code defines a `TravelEpisode` schema that captures four critical elements:

- the customer's travel request
- the agent's planning considerations
- the specific recommendation provided
- the positive outcome that resulted

When the system processes a conversation where a traveler expresses concerns about Italian crowds, it stores how the agent successfully redirected them to less-crowded alternatives while still satisfying their desire for art and cuisine. This structured approach to remembering successful interactions allows the system to develop a repository of effective strategies that can be applied to similar situations in the future.

In [None]:
from pydantic import BaseModel, Field
from langmem import create_memory_manager

class TravelEpisode(BaseModel):
    """An episode captures how to handle a specific travel planning situation, including the reasoning process
    and what made it successful."""

    customer_request: str = Field(
        ..., 
        description="The travel request and relevant customer context"
    )
    considerations: str = Field(
        ...,
        description="Key travel planning considerations and reasoning process"
    )
    recommendation: str = Field(
        ...,
        description="What travel options were recommended in response"
    )
    outcome: str = Field(
        ...,
        description="How the customer responded and why the recommendation worked"
    )

manager = create_memory_manager(
    "gpt-4o",
    schemas=[TravelEpisode],
    instructions="Extract examples of successful travel planning interactions. Include the customer context, consideration process, and why the recommendation satisfied the customer.",
    enable_inserts=True,
)

# Example conversation
conversation = [
    {"role": "user", "content": "I want to visit Italy in June but I'm worried about crowds. I love art and food but hate waiting in long lines."},
    {"role": "assistant", "content": "Instead of Rome and Florence which get extremely crowded in June, consider Bologna and Ravenna. Bologna offers amazing food (it's nicknamed 'La Grassa' or 'The Fat One') and beautiful medieval architecture without the overwhelming crowds. Ravenna has stunning Byzantine mosaics and rarely has long lines. You could spend 4 days in Bologna and 2 in Ravenna with a day trip to Modena for balsamic vinegar tasting."},
    {"role": "user", "content": "That sounds perfect! I never considered those cities but they seem to match exactly what I'm looking for. How would I get between them?"},
]

episodes = manager.invoke({"messages": conversation})

In [None]:
for episode in episodes:
    print(f"{episode.content.customer_request=}")
    print(f"{episode.content.considerations=}")
    print(f"{episode.content.recommendation=}")
    print(f"{episode.content.outcome=}")
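
One way stored episodes might guide future behavior is to append them to the system prompt as worked examples. The sketch below shows that idea in plain Python; it is an assumption about how you could use the extracted episodes, not something LangMem does for you. `EpisodeRecord` and `build_prompt_with_episodes` are hypothetical names, and the episode text is a hand-written summary of the conversation above.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class EpisodeRecord:
    """Hypothetical plain-Python mirror of the TravelEpisode fields."""
    customer_request: str
    considerations: str
    recommendation: str
    outcome: str


def build_prompt_with_episodes(base_prompt: str, past_episodes: List[EpisodeRecord]) -> str:
    """Append past successful episodes to a system prompt as worked examples."""
    if not past_episodes:
        return base_prompt
    sections = [base_prompt, "", "Examples of approaches that worked before:"]
    for i, ep in enumerate(past_episodes, start=1):
        sections.append(
            f"{i}. Request: {ep.customer_request}\n"
            f"   Reasoning: {ep.considerations}\n"
            f"   Recommendation: {ep.recommendation}\n"
            f"   Outcome: {ep.outcome}"
        )
    return "\n".join(sections)


example_episode = EpisodeRecord(
    customer_request="Italy in June, loves art and food, dislikes crowds",
    considerations="Major cities are crowded in June; smaller cities offer similar art and food",
    recommendation="Bologna and Ravenna with a day trip to Modena",
    outcome="Customer said the plan matched what they wanted",
)
prompt = build_prompt_with_episodes("You are a helpful travel agent assistant.", [example_episode])
print(prompt)
```

Keeping the reasoning and outcome next to the recommendation is what distinguishes an episode from a stored fact: the agent sees why the approach worked as well as what was suggested.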

### Homework

Can you implement the same examples of short-term memory, long-term memory, and episodic memory using CrewAI's memory management? Refer to the documentation for more details: https://docs.crewai.com/concepts/memory

---

# Sequential and parallel processing in agentic workflows

The good news is that we have already built parallel (that is, hierarchical) workflows in the `Chapter_06.ipynb` notebook using the CWD approach, where a Delegator Agent delegates tasks to multiple worker agents.

In contrast to hierarchical workflows, sequential workflows execute agents one after another and matter when one agent's output is required by the next. For example, the flight booking agent must finalize the flight dates before the hotel booking agent can book or recommend hotels. In CrewAI, a sequential workflow is specified with `Process.sequential`, for example:

```python
my_crew = Crew(
    agents=[agent_1, agent_2],
    tasks=[task_1, task_2],
    verbose=True,
    process=Process.sequential,
)
```
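
The difference between the two execution styles comes down to data dependencies. The plain-Python sketch below illustrates this without CrewAI: the three functions are hypothetical stand-ins for agents, the dates are placeholder values, and a thread pool stands in for parallel execution.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Dict


def find_flights(destination: str) -> Dict[str, str]:
    """Hypothetical flight step: fixes the travel dates (placeholder values)."""
    return {"destination": destination, "arrival": "2025-06-10", "departure": "2025-06-17"}


def find_hotels(flights: Dict[str, str]) -> Dict[str, str]:
    """Hypothetical hotel step: needs the dates chosen by the flight step."""
    return {"check_in": flights["arrival"], "check_out": flights["departure"]}


def find_activities(destination: str) -> str:
    """Hypothetical independent step: does not need the dates."""
    return f"Walking tours in {destination}"


# Sequential: the hotel step cannot start until the flight step has returned
flights = find_flights("Barcelona")
hotels = find_hotels(flights)

# Parallel: steps with no dependency on each other can run at the same time
with ThreadPoolExecutor(max_workers=2) as pool:
    flights_future = pool.submit(find_flights, "Barcelona")
    activities_future = pool.submit(find_activities, "Barcelona")
    parallel_results = (flights_future.result(), activities_future.result())

print(hotels)
print(parallel_results[1])
```

In the sequential half, `find_hotels` takes the output of `find_flights` as its input, which is the same ordering constraint that `Process.sequential` expresses through the order of the `tasks` list.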

### Homework

Can you implement a sequential agentic workflow for the following example?

- Two agents: a flight search agent and a hotel search agent
- Each agent uses its own tool
- The flight search agent must first finalize the flights and their dates
- The hotel search agent uses the flight arrival and departure dates to recommend hotels along with check-in and check-out dates