In [190]:
import os
import time
from datetime import datetime
import sys

sys.path.append(os.path.abspath(os.path.join(os.getcwd(), '..')))

from dotenv import load_dotenv
_ = load_dotenv()

In [191]:
# openai_api_key = os.getenv("OPENAI_API_KEY")

# from langchain.chat_models import init_chat_model

# llm = init_chat_model("openai:gpt-4o-mini", temperature=0.0)

In [192]:
from pydantic import BaseModel, Field
from typing_extensions import TypedDict, Literal, Annotated
from langchain_ollama import ChatOllama
from langchain_core.tools import tool

from langgraph.store.memory import InMemoryStore

from prompts import triage_system_prompt, triage_user_prompt, agent_system_prompt

In [193]:
# ---- Local chat model (Ollama) ----
# Chat model used by the rest of this notebook: it is wrapped for structured
# triage output and handed to the ReAct response agent. It is the Ollama model
# "llama3.2:latest"; temperature=0.0 is presumably chosen to keep outputs as
# repeatable as possible.
# This line is already active. To use OpenAI instead, comment it out and enable
# the commented-out init_chat_model cell earlier in this notebook.
llm = ChatOllama(model="llama3.2:latest", temperature=0.0)
# ---- end local chat model ----

In [194]:
# Identity of the person the assistant works for. The three keys are used as
# placeholder values when the system prompts are rendered.
profile = dict(
    name="John",
    full_name="John Doe",
    user_profile_background="Senior software engineer leading a team of 5 developers",
)

# Tunable prompt text: one rule string per triage label, plus the free-form
# instructions injected into the response agent's system prompt.
prompt_instructions = dict(
    triage_rules=dict(
        ignore="Marketing newsletters, spam emails, mass company announcements",
        notify="Team member out sick, build system notifications, project status updates",
        respond="Direct questions from team members, meeting requests, critical bug reports",
    ),
    agent_instructions="Use these tools when appropriate to help manage John's tasks efficiently.",
)

# Sample inbound email kept as a standalone example.
# NOTE(review): no visible cell reads `email`; the graph further down is driven
# by `email_input`, which uses the keys "author"/"email_thread" rather than
# "from"/"body" — confirm whether this dict is still needed.
email = {
    "from": "Alice Smith <alice.smith@company.com>",
    "to": "John Doe <john.doe@company.com>",
    "subject": "Quick question about API documentation",
    "body": """
Hi John,

I was reviewing the API documentation for the new authentication service and noticed a few endpoints seem to be missing from the specs. Could you help clarify if this was intentional or if we should update the docs?

Specifically, I'm looking at:
- /auth/refresh
- /auth/validate

Thanks!
Alice""",
}

In [195]:
# Structured-output schema for the triage step.
# NOTE(review): the class docstring and the Field descriptions below are
# presumably forwarded to the model as part of the schema, so treat them as
# prompt text rather than as ordinary documentation — confirm before editing.
class Router(BaseModel):
    """Analyze the unread email and route it according to its content."""

    # Free-text justification; declared before the label.
    reasoning: str = Field(
        description="Step-by-step reasoning behind the classification."
    )
    # Exactly one of three labels; triage_router branches on this value.
    classification: Literal["ignore", "respond", "notify"] = Field(
        description="The classification of an email: 'ignore' for irrelevant emails, "
        "'notify' for important information that doesn't need a response, "
        "'respond' for emails that need a reply",
    )

# Chat model bound to the Router schema; triage_router reads `.classification`
# from the object this returns.
llm_router = llm.with_structured_output(Router)

In [196]:
@tool
def write_email(to: str, subject: str, content: str) -> str:
    """Write and send an email."""
    # The docstring above is left untouched on purpose: it is presumably what
    # the @tool decorator exposes to the model as the tool description.
    # Placeholder implementation: nothing is sent and `content` is not used;
    # a real app would hand the message to a mail service here.
    time.sleep(0.1)  # stand-in for processing latency
    return f"Email sent to {to} with subject '{subject}'"

@tool
def schedule_meeting(
    attendees: list[str],
    subject: str,
    duration_minutes: int,
    preferred_day: str,
) -> str:
    """Schedule a calendar meeting."""
    # The docstring above is left untouched on purpose: it is presumably what
    # the @tool decorator exposes to the model as the tool description.
    # Placeholder implementation: no calendar is consulted and
    # `duration_minutes` is not used; only a confirmation string is produced.
    time.sleep(0.2)  # stand-in for processing latency
    headcount = len(attendees)
    return f"Meeting '{subject}' scheduled for {preferred_day} with {headcount} attendees"

@tool
def check_calendar_availability(day: str) -> str:
    """Check calendar availability for a given day."""
    # The docstring above is left untouched on purpose: it is presumably what
    # the @tool decorator exposes to the model as the tool description.
    # Placeholder implementation: the same three slots are reported for every
    # day; a real app would query an actual calendar.
    time.sleep(0.15)  # stand-in for processing latency
    return f"Available times on {day}: 9:00 AM, 2:00 PM, 4:00 PM"

# In-memory store that is passed to both the response agent and the compiled
# email graph. The index config requests "openai:text-embedding-3-small"
# embeddings.
# NOTE(review): the chat model in this notebook is a local Ollama model, but
# this embedding spec names an OpenAI model — presumably an OpenAI API key must
# still be available (e.g. from the .env loaded in the first cell); confirm.
store = InMemoryStore(
    index={"embed": "openai:text-embedding-3-small"},
)

from langmem import create_manage_memory_tool, create_search_memory_tool

# Both memory tools are configured with the same three-part namespace.
# "{langgraph_user_id}" is a placeholder: the notebook's `config` supplies
# langgraph_user_id="lance" and a later cell searches
# ("email_assistant", "lance", "collection"), so it is presumably filled in
# from config["configurable"] when a tool runs — confirm against langmem docs.
manage_memory_tool = create_manage_memory_tool(
    namespace=("email_assistant", "{langgraph_user_id}", "collection"),
)
search_memory_tool = create_search_memory_tool(
    namespace=("email_assistant", "{langgraph_user_id}", "collection"),
)

In [197]:
# System-prompt template for the response agent. It is rendered by
# create_prompt via str.format and has three placeholders: {full_name}, {name}
# and {instructions}. The text is sent to the model verbatim, so wording
# changes here are behavior changes.
agent_system_prompt_memory = """
< Role >
You are {full_name}'s executive assistant. You are a top-notch executive assistant who cares about {name} performing as well as possible.
</ Role >

< Tools >
You have access to the following tools to help manage {name}'s communications and schedule:

1. write_email(to, subject, content) - Send emails to specified recipients
2. schedule_meeting(attendees, subject, duration_minutes, preferred_day) - Schedule calendar meetings
3. check_calendar_availability(day) - Check available time slots for a given day
4. manage_memory - Store any relevant information about contacts, actions, discussion, etc. in memory for future reference
5. search_memory - Search for any relevant information that may have been stored in memory
</ Tools >

< Instructions >
{instructions}
</ Instructions >
"""

def create_prompt(state):
    """Build the full message list for the response agent's model call.

    Renders ``agent_system_prompt_memory`` with the agent instructions from
    ``prompt_instructions`` and the fields of ``profile``, then puts that
    system message in front of the conversation held in ``state['messages']``.

    Args:
        state: Mapping with a ``'messages'`` list.

    Returns:
        A new list: one system-message dict followed by the state's messages.
    """
    rendered = agent_system_prompt_memory.format(
        instructions=prompt_instructions["agent_instructions"],
        **profile
    )
    system_message = {"role": "system", "content": rendered}
    # List concatenation builds a fresh list; state['messages'] is not mutated.
    return [system_message] + state["messages"]

In [198]:
from langgraph.prebuilt import create_react_agent

# ReAct agent that drafts replies. It gets the shared chat model, the three
# placeholder productivity tools plus the two memory tools, a callable prompt
# (create_prompt) that prepends the system message, and the shared store.
response_agent = create_react_agent(
    llm,
    tools=[
        write_email, 
        schedule_meeting, 
        check_calendar_availability, 
        manage_memory_tool, 
        search_memory_tool
    ],
    prompt=create_prompt,
    # Same store instance that is later passed to the compiled email graph.
    store=store,
)

In [199]:
# Per-invocation settings passed to every .invoke() call below. The key
# "langgraph_user_id" has the same name as the "{langgraph_user_id}" placeholder
# in the memory-tool namespaces, so "lance" presumably selects that user's
# memory collection.
config = {"configurable": {"langgraph_user_id": "lance"}}

In [None]:
# First turn: give the agent a fact about "Jim" (the agent has a manage_memory
# tool available; whether it chooses to store the fact is up to the model).
response = response_agent.invoke(
    {"messages": [{"role": "user", "content": "Jim is my friend"}]},
    config=config # type: ignore
)

# Print every message of the run, including any tool calls and tool results.
for m in response["messages"]:
    m.pretty_print()

In [None]:
# Second, independent invocation: the message list starts fresh, so the earlier
# turn is not part of this conversation. Any knowledge about Jim would have to
# come from the memory tools.
response2 = response_agent.invoke(
    {"messages": [{"role":"user","content":"who is Jim?"}]},
    config=config # type: ignore
)

# Print every message of the run, including any tool calls and tool results.
for m in response2["messages"]:
    m.pretty_print()

In [None]:
# Inspect the store: the cell output is the result of list_namespaces().
store.list_namespaces()

In [None]:
# Query the memory collection for entries related to "jim" (cell output).
# The user segment of the namespace is read from `config` instead of repeating
# the literal "lance", so this lookup keeps pointing at the namespace the agent
# used if the user id in `config` is ever changed.
store.search(
    ("email_assistant", config["configurable"]["langgraph_user_id"], "collection"),
    query="jim",
)

In [None]:
from langgraph.graph import add_messages

class State(TypedDict):
    """State schema of the email-agent graph."""

    # The incoming email; triage_router reads the keys "author", "to",
    # "subject" and "email_thread" from it.
    email_input: dict
    # Conversation messages, annotated with the add_messages reducer
    # (presumably so node updates are merged into the list rather than
    # replacing it — confirm against LangGraph docs).
    messages: Annotated[list, add_messages]

In [None]:
from langgraph.graph import StateGraph, START, END
from langgraph.types import Command
from typing import Literal
from IPython.display import Image, display

In [None]:
def triage_router(state: State) -> Command[
    Literal["response_agent", "__end__"]
]:
    """Classify the incoming email and choose the next graph step.

    Reads ``author``, ``to``, ``subject`` and ``email_thread`` from
    ``state['email_input']``, fills the triage system and user prompt
    templates, and asks ``llm_router`` for a classification.

    Args:
        state: Graph state; must contain ``email_input`` with the four keys
            listed above.

    Returns:
        A ``Command`` that routes to ``"response_agent"`` with one new user
        message when the classification is ``"respond"``, or to ``END`` with no
        state update when it is ``"ignore"`` or ``"notify"``.

    Raises:
        ValueError: If the classification is none of the three known labels.
    """
    incoming = state["email_input"]
    # Read every field up front so a malformed email fails before any prompt is
    # built or any model call is made.
    sender, recipient, topic, thread = (
        incoming["author"],
        incoming["to"],
        incoming["subject"],
        incoming["email_thread"],
    )

    system_prompt = triage_system_prompt.format(
        full_name=profile["full_name"],
        name=profile["name"],
        user_profile_background=profile["user_profile_background"],
        triage_no=prompt_instructions["triage_rules"]["ignore"],
        triage_notify=prompt_instructions["triage_rules"]["notify"],
        triage_email=prompt_instructions["triage_rules"]["respond"],
        # NOTE(review): if the template has an {examples} placeholder, None is
        # rendered as the text "None" — confirm that is intended.
        examples=None,
    )
    user_prompt = triage_user_prompt.format(
        author=sender,
        to=recipient,
        subject=topic,
        email_thread=thread,
    )

    decision = llm_router.invoke(
        [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ]
    )
    label = decision.classification

    if label == "respond":
        print("📧 Classification: RESPOND - This email requires a response")
        # Hand the whole email dict to the response agent as a user message.
        handoff = {
            "messages": [
                {
                    "role": "user",
                    "content": f"Respond to the email {incoming}",
                }
            ]
        }
        return Command(goto="response_agent", update=handoff)

    if label == "ignore":
        print("🚫 Classification: IGNORE - This email can be safely ignored")
        return Command(goto=END, update=None)

    if label == "notify":
        # In a real deployment this branch would notify the user somehow.
        print("🔔 Classification: NOTIFY - This email contains important information")
        return Command(goto=END, update=None)

    raise ValueError(f"Invalid classification: {label}")

In [None]:
# Assemble and compile the email graph in one fluent chain.
# - triage_router is registered under its function name, which is the name the
#   START edge points at.
# - No explicit edge leaves triage_router: the Command it returns decides
#   whether to continue to "response_agent" or stop.
# - The compiled graph shares the same store as the response agent.
email_agent = (
    StateGraph(State)
    .add_node(triage_router)
    .add_node("response_agent", response_agent)
    .add_edge(START, "triage_router")
    .compile(store=store)
)

In [None]:
# Render the compiled graph as a Mermaid PNG and show it inline; xray=True is
# passed to get_graph (presumably to expand the nested response agent).
display(Image(email_agent.get_graph(xray=True).draw_mermaid_png()))

In [None]:
# First test email for the graph. The four keys are exactly the ones
# triage_router reads: "author", "to", "subject" and "email_thread".
email_input = {
    "author": "Alice Smith <alice.smith@company.com>",
    "to": "John Doe <john.doe@company.com>",
    "subject": "Quick question about API documentation",
    "email_thread": """Hi John,

I was reviewing the API documentation for the new authentication service and noticed a few endpoints seem to be missing from the specs. Could you help clarify if this was intentional or if we should update the docs?

Specifically, I'm looking at:
- /auth/refresh
- /auth/validate

Thanks!
Alice""",
}

In [None]:
# Run the full graph (triage, then possibly the response agent) on the email
# defined in the previous cell, using the shared per-user config.
response = email_agent.invoke( # type: ignore
    {"email_input": email_input},
    config=config
)

In [None]:
# Print each message produced by the graph run above.
for m in response["messages"]:
    m.pretty_print()

In [None]:
# Second test email: a follow-up from the same sender that only refers to a
# "previous ask" and carries no details itself. This rebinds `email_input`,
# replacing the first test email.
email_input = {
    "author": "Alice Smith <alice.smith@company.com>",
    "to": "John Doe <john.doe@company.com>",
    "subject": "Follow up",
    "email_thread": """Hi John,

Any update on my previous ask?""",
}

In [None]:
# Run the graph on the follow-up email with the same user config. This rebinds
# `response`, replacing the result of the earlier run.
response = email_agent.invoke({"email_input": email_input}, config=config)

In [None]:
# Print each message produced by the follow-up run.
for m in response["messages"]:
    m.pretty_print()