# MongoDB Short-Term Memory for Rental Agents

This notebook demonstrates how to use AsyncMongoDBSaver with the Tenant and Landlord agents in our rental negotiation system.

In [174]:
from langchain_core.messages import HumanMessage
from langgraph.checkpoint.mongodb.aio import AsyncMongoDBSaver

from app.config import config
from app.conversation_service.tenant_workflow.graph import create_tenant_workflow_graph
from app.conversation_service.landlord_workflow.graph import create_landlord_workflow_graph
from app.conversation_service.meta_controller import create_meta_controller_graph

## Create AsyncMongoDBSaver

First, we'll create an instance of AsyncMongoDBSaver to handle state management.

In [175]:
# Get MongoDB configuration
mongodb_config = config.mongodb
conn_string = mongodb_config.connection_string
db_name = mongodb_config.database
checkpoint_collection = mongodb_config.mongo_state_checkpoint_collection
writes_collection = mongodb_config.mongo_state_writes_collection

print(f"Connection String: {conn_string}")
print(f"Database: {db_name}")
print(f"Checkpoint Collection: {checkpoint_collection}")
print(f"Writes Collection: {writes_collection}")

Connection String: mongodb://localhost:27017/rental_agent
Database: rental_agent
Checkpoint Collection: state_checkpoints
Writes Collection: state_writes


## Functions for Testing Tenant Agent Memory

In [176]:
async def generate_tenant_response_with_memory(tenant_id: str, messages: list):
    """Generate a response from the tenant agent using MongoDB memory."""
    async with AsyncMongoDBSaver.from_conn_string(
        conn_string=conn_string,
        db_name=db_name,
        checkpoint_collection_name=checkpoint_collection,
        writes_collection_name=writes_collection,
    ) as checkpointer:
        graph_builder = create_tenant_workflow_graph()
        graph = graph_builder.compile(checkpointer=checkpointer)
        
        # Basic tenant data
        tenant_data = {
            "tenant_id": tenant_id,
            "name": "Alex Smith",
            "annual_income": 45000,
            "max_budget": 1500,
            "has_guarantor": False,
            "min_bedrooms": 1,
            "max_bedrooms": 2,
            "preferred_locations": ["Manchester City Centre"],
            "is_student": False,
            "has_pets": False,
            "is_smoker": False,
            "num_occupants": 1,
            "conversation_context": "",
            "summary": ""
        }
        
        config = {
            "configurable": {"thread_id": f"tenant_{tenant_id}"},
        }
        
        output_state = await graph.ainvoke(
            input={
                "messages": messages,
                **tenant_data
            },
            config=config,
        )
        
        if output_state.get("messages") and len(output_state["messages"]) > 0:
            last_message = output_state["messages"][-1]
            return last_message
        return None

## Functions for Testing Landlord Agent Memory

In [177]:
async def generate_landlord_response_with_memory(landlord_id: str, messages: list):
    """Generate a response from the landlord agent using MongoDB memory."""
    async with AsyncMongoDBSaver.from_conn_string(
        conn_string=conn_string,
        db_name=db_name,
        checkpoint_collection_name=checkpoint_collection,
        writes_collection_name=writes_collection,
    ) as checkpointer:
        graph_builder = create_landlord_workflow_graph()
        graph = graph_builder.compile(checkpointer=checkpointer)
        
        # Basic landlord data
        landlord_data = {
            "landlord_id": landlord_id,
            "name": "John Wilson",
            "branch_name": "City Living Properties",
            "conversation_context": "",
            "summary": "",
            "preferences": {
                "min_tenant_income": 35000,
                "accepts_students": True,
                "accepts_pets": False,
                "accepts_smokers": False
            }
        }
        
        # Add a property to discuss
        property_data = {
            "property_id": "prop123",
            "address": "15 Oxford Road, Manchester",
            "monthly_rent": 1200,
            "bedrooms": 2,
            "bathrooms": 1,
            "available_from": "2023-07-01",
            "furnished": True,
            "description": "Modern apartment in city center with great amenities"
        }
        
        config = {
            "configurable": {"thread_id": f"landlord_{landlord_id}"},
        }
        
        output_state = await graph.ainvoke(
            input={
                "messages": messages,
                **landlord_data,
                "properties": [property_data]
            },
            config=config,
        )
        
        if output_state.get("messages") and len(output_state["messages"]) > 0:
            last_message = output_state["messages"][-1]
            return last_message
        return None

## Testing Tenant Agent with Memory

In [178]:
messages = [
    HumanMessage(content="Hello, I'm interested in renting a property in Manchester. My name is Sarah.")
]
# await generate_tenant_response_with_memory("tenant123", messages)

In [179]:
messages = [
    HumanMessage(content="Can you remember my name?")
]
# await generate_tenant_response_with_memory("tenant123", messages)

## Testing Landlord Agent with Memory

In [180]:
messages = [
    HumanMessage(content="Hello, I'm looking for tenants for my property at 15 Oxford Road. My name is John from City Living Properties.")
]
# await generate_landlord_response_with_memory("landlord456", messages)

In [181]:
messages = [
    HumanMessage(content="Do you recall the property address I mentioned earlier?")
]
# await generate_landlord_response_with_memory("landlord456", messages)

In [182]:
from langchain_core.messages import AIMessage
from langgraph.graph import StateGraph, END
from typing import Dict, List, TypedDict

class ExtendedMetaState(TypedDict):
    """State schema for the meta controller."""
    session_id: str
    messages: List[Dict]
    active_agent: str
    tenant_data: Dict
    landlord_data: Dict
    property_data: Dict
    is_terminated: bool
    termination_reason: str

async def call_tenant_from_meta(state: ExtendedMetaState) -> ExtendedMetaState:
    """Call the tenant agent and process its response."""
    # 准备消息和输入
    messages = state["messages"]
    tenant_id = state["tenant_data"]["tenant_id"]
    
    # 调用前面定义的生成函数
    response = await generate_tenant_response_with_memory(
        tenant_id=tenant_id, 
        messages=messages
    )
    
    # 更新状态
    if response:
        # 添加新消息到历史记录
        new_messages = state["messages"].copy()
        new_messages.append({
            "role": "assistant" if isinstance(response, AIMessage) else response.get("role", "assistant"),
            "content": response.content if isinstance(response, AIMessage) else response.get("content", ""),
            "from": "tenant"
        })
        
        # 更新状态，切换活动代理
        return {
            **state,
            "messages": new_messages,
            "active_agent": "landlord"  # 切换到房东
        }
    
    # 如果没有响应，返回终止状态
    return {
        **state,
        "is_terminated": True,
        "termination_reason": "Tenant agent did not respond"
    }

async def call_landlord_from_meta(state: ExtendedMetaState) -> ExtendedMetaState:
    """Call the landlord agent and process its response."""
    # 准备消息和输入
    messages = state["messages"]
    landlord_id = state["landlord_data"]["landlord_id"]
    
    # 调用前面定义的生成函数
    response = await generate_landlord_response_with_memory(
        landlord_id=landlord_id, 
        messages=messages
    )
    
    # 更新状态
    if response:
        # 添加新消息到历史记录
        new_messages = state["messages"].copy()
        new_messages.append({
            "role": "assistant" if isinstance(response, AIMessage) else response.get("role", "assistant"),
            "content": response.content if isinstance(response, AIMessage) else response.get("content", ""),
            "from": "landlord"
        })
        
        # 更新状态，切换活动代理
        return {
            **state,
            "messages": new_messages,
            "active_agent": "tenant"  # 切换回租客
        }
    
    # 如果没有响应，返回终止状态
    return {
        **state,
        "is_terminated": True,
        "termination_reason": "Landlord agent did not respond"
    }

def should_continue(state: ExtendedMetaState) -> str:
    """决定对话是否应该继续，或者应该终止。"""
    # 检查是否已经明确标记为终止
    if state.get("is_terminated", False):
        return "end"
    
    # 如果消息超过20条，为安全起见终止对话
    if len(state.get("messages", [])) > 20:
        return "end"
    
    # 检查最新消息是否包含终止信号词
    latest_messages = state.get("messages", [])
    if latest_messages:
        last_message = latest_messages[-1]
        content = last_message.get("content", "").lower()
        
        # 检查是否有结束对话的关键词
        termination_keywords = ["agreement reached", "deal confirmed", "thank you for your time", 
                              "not interested", "end of discussion", "conversation complete"]
        
        for keyword in termination_keywords:
            if keyword in content:
                return "end"
    
    # 默认继续对话
    return "continue"

def create_improved_meta_controller_graph():
    """创建一个改进的元控制器图，不依赖MongoDB记忆。"""
    controller = StateGraph(ExtendedMetaState)
    
    # 添加对话节点
    controller.add_node("call_tenant", call_tenant_from_meta)
    controller.add_node("call_landlord", call_landlord_from_meta)
    
    # 设置流程 - 租客和房东轮流对话
    controller.add_conditional_edges("call_tenant", should_continue, {
        "continue": "call_landlord",
        "end": END
    })
    
    controller.add_conditional_edges("call_landlord", should_continue, {
        "continue": "call_tenant",
        "end": END
    })
    
    # 设置入口点（通常由租客发起对话）
    controller.set_entry_point("call_tenant")
    
    return controller

In [None]:
async def stream_turns_from_meta_controller(session_id: str, num_turns: int = 3):
    """Simulate conversation turns in meta controller with streaming intermediate states.
    
    Args:
        session_id: Unique identifier for the session
        num_turns: Number of conversation turns to simulate
        
    Returns:
        Generator yielding intermediate states
    """

In [184]:
# 使用方法示例
session_state, all_turns = await stream_turns_from_meta_controller("test_session_123", num_turns=5)


===== 会话开始: test_session_123 =====
初始活动代理: tenant

----- 轮次 1 -----
当前活动代理: tenant
错误发生在轮次 1: Error code: 400 - {'error': {'message': "An assistant message with 'tool_calls' must be followed by tool messages responding to each 'tool_call_id'. The following tool_call_ids did not have response messages: call_cqa4fko61rFfRzo2UWgej2oH", 'type': 'invalid_request_error', 'param': 'messages.[42].role', 'code': None}}
Traceback (most recent call last):
  File "/tmp/ipykernel_64068/2547578779.py", line 71, in stream_turns_from_meta_controller
    state = await graph.ainvoke(state)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/yushiran/Rental_Agent/backend/.venv/lib/python3.13/site-packages/langgraph/pregel/__init__.py", line 2788, in ainvoke
    async for chunk in self.astream(
    ...<19 lines>...
            chunks.append(chunk)
  File "/home/yushiran/Rental_Agent/backend/.venv/lib/python3.13/site-packages/langgraph/pregel/__init__.py", line 2655, in astream
    async for _ in runner.