In [1]:
# Setup Keys: load variables (e.g. API keys) from a local .env file into the
# process environment so the clients created later can read them.
from dotenv import load_dotenv
load_dotenv()

True

In [None]:
from google.colab import auth
from googleapiclient.discovery import build

# Run Colab's Google sign-in for the notebook user.
auth.authenticate_user()
# Module-level Gmail API (v1) client; send_email() below sends through it.
# NOTE(review): no credentials are passed explicitly - presumably build() picks
# up the ones established by the sign-in above; confirm.
gmail_service = build('gmail', 'v1')

In [None]:
from IPython.display import Image, display
from langchain.agents import create_agent
from langchain.chat_models import init_chat_model
from langgraph.checkpoint.memory import InMemorySaver
from langchain.messages import AnyMessage, HumanMessage, AIMessage, SystemMessage, ToolMessage
from langchain.tools import tool
from langgraph.graph import StateGraph, END
from langgraph.types import interrupt, Command
from typing import TypedDict, Annotated, List
from langchain_community.tools.tavily_search import TavilySearchResults
import re
import ast
import json
from pprint import pprint

import gspread
from google.auth import default
import pandas as pd
import chromadb
from chromadb.config import Settings
import base64
from email.mime.text import MIMEText

# Initialize ChromaDB client
# Telemetry is disabled and a persist_directory under /content is requested.
# NOTE(review): depending on the installed chromadb version, chromadb.Client()
# with these Settings may still be in-memory only (newer releases provide
# chromadb.PersistentClient for on-disk storage) - confirm what is intended.
client = chromadb.Client(Settings(
    persist_directory="/content/chroma_db",
    anonymized_telemetry=False
))

# Create or get collection
# Module-level handle to the "user_personas" collection used by the persona
# sync / lookup code further down in this file.
collection = client.get_or_create_collection(
    name="user_personas",
    metadata={"description": "User persona and preference data"}
)

###########
# UTILITY #
###########
def extract_dict_from_string(text: str) -> dict:
    """
    Extract and convert a dictionary from a string that may contain extra text.

    Candidate substrings are tried in this order:
      1. the content of the first markdown code fence (```json ... ``` or a
         bare ``` ... ```; an unclosed fence runs to the end of the text),
      2. the whole stripped text,
      3. the span from the first '{' to the last '}'.

    Each candidate is parsed with JSON first, then with ast.literal_eval
    (which accepts Python-style dicts: single quotes, None/True/False).

    Args:
        text: Raw text, typically an LLM response.

    Returns:
        The first candidate that parses to a dict, or {} when nothing does
        (a diagnostic line is printed in that case).
    """
    if not isinstance(text, str):
        # e.g. None or a list of content blocks - nothing we can parse.
        print(f"❌ Failed to extract dict from response:\n{str(text)[:200]}")
        return {}

    cleaned = text.strip()
    candidates = []

    # Fenced block, with or without a "json" tag; tolerate a missing closing fence.
    fenced = re.search(r"```(?:json)?\s*(.*?)(?:```|$)", cleaned, re.DOTALL | re.IGNORECASE)
    if fenced:
        candidates.append(fenced.group(1).strip())

    candidates.append(cleaned)

    # Dict embedded in surrounding prose: take the outermost brace span.
    start, end = cleaned.find("{"), cleaned.rfind("}")
    if 0 <= start < end:
        candidates.append(cleaned[start:end + 1])

    for candidate in candidates:
        for parse in (json.loads, ast.literal_eval):
            try:
                result = parse(candidate)
            except (ValueError, SyntaxError, TypeError, RecursionError):
                # json.JSONDecodeError is a ValueError; literal_eval raises
                # ValueError/SyntaxError on anything that is not a literal.
                continue
            if isinstance(result, dict):
                return result

    print(f"❌ Failed to extract dict from response:\n{text[:200]}")
    return {}

def sync_personas_from_sheets(url: str, df_append: "pd.DataFrame | None" = None):
    """Sync persona data from Google Sheets to ChromaDB.

    Reads every record of the first worksheet at ``url``, optionally appends
    the rows of ``df_append``, then drops and rebuilds the "user_personas"
    collection with one document per row.

    Args:
        url: URL of the Google Sheet holding the persona table.
        df_append: Optional extra rows to add after the sheet rows
            (None or an empty frame means "nothing to append").

    Returns:
        The freshly created ChromaDB collection. The module-level
        ``collection`` name is rebound to it as well, so code that reads the
        global does not keep a handle to the deleted collection.
    """
    global collection

    # Get latest data from sheet
    creds, _ = default()
    gc = gspread.authorize(creds)
    sheet = gc.open_by_url(url).sheet1
    df = pd.DataFrame(sheet.get_all_records())

    # Append new info (ignore_index keeps the generated ids unique)
    if df_append is not None and not df_append.empty:
        df = pd.concat([df, df_append], ignore_index=True)

    # Clear existing collection
    client.delete_collection("user_personas")
    collection = client.create_collection(
        name="user_personas",
        metadata={"description": "User persona and preference data"},
    )

    # Re-populate: one "key: value | key: value" document per row, with the
    # same fields stored as string metadata for filtering.
    documents = []
    metadatas = []
    ids = []
    for idx, row in df.iterrows():
        documents.append(" | ".join(f"{k}: {v}" for k, v in row.items()))
        metadatas.append({key: str(value) for key, value in row.items()})
        ids.append(f"user_{idx}")

    # Skip the add for an empty sheet rather than passing empty lists to Chroma.
    if documents:
        collection.add(documents=documents, metadatas=metadatas, ids=ids)
    print(f"✅ Synced {len(documents)} personas from Google Sheets")

    return collection

##########
# PROMPT #
##########
# Shared preamble for the other prompt templates (inserted through their
# {base_prompt} placeholder). str.format placeholders: {name}, {persona}.
BASE_PROMPT = """
  ROLE
  You are an intelligent Email Classification Agent designed to help busy professionals manage their inbox efficiently by categorizing incoming emails based on urgency, relevance, and required action.

  # BACKGROUND: USER PERSONA
  You are assisting {name}, a {persona}
"""

# Intent-routing prompt template. str.format placeholders: {base_prompt}, {message}.
# Asks the model for a JSON reply with the keys `name` and `user_intent`
# (email_workflow / general_query).
PROMPT_ROUTER = """
  {base_prompt}

  INSTRUCTION
  Classify user intent

  Use "email_workflow" if:
    - User provides an email to analyze/classify
    - User asks to draft/send an email
    - User mentions responding to an email

    Use "general_query" if:
    - User asks about calendar availability
    - User asks to schedule a meeting (without email context)
    - General questions or requests

  INPUT
  {message}

  OUTPUT, in json format
  ```json
  name : [The inferred user name]
  user_intent : [email_workflow/general_query]
  ```
"""

# Persona-inference prompt template. str.format placeholder: {conversation}.
# Asks the model for a free-text summary of the user's persona.
PROMPT_PERSONA = """
  ROLE
  You are an AI that can help inferred the user persona based on the chain of message asked by that user

  INPUT
  {conversation}

  OUTPUT
  Summary of the user persona
"""

# Email-triage prompt template. str.format placeholders: {base_prompt}, {email}.
# Asks the model for a JSON reply with the keys from, to, subject,
# email_content, email_classification (IGNORE/NOTIFY/RESPOND) and
# classification_reason.
# The category definitions refer to "the user" rather than a fixed person; the
# actual user is described by whatever is inserted as {base_prompt}.
PROMPT_TRIAGE = """
  {base_prompt}

  # INSTRUCTION
  For each incoming email, analyze its content, sender, subject, and context to categorize it into one of three actions: IGNORE, NOTIFY, or RESPOND.

  # RULES: EMAIL CATEGORY DEFINITIONS

  ## IGNORE
  Emails that require NO action and NO attention.

  **Criteria:**
  Marketing newsletters, spam emails, mass company announcements

  ## NOTIFY
  Emails that the user should be AWARE of but don't require immediate response.

  **Criteria:**
  Team member out sick, build system notifications, project status updates

  ## RESPOND
  Emails that require the user's direct action, response, or decision.

  **Criteria:**
  Direct questions from team members, meeting requests, critical bug reports

  INPUT
  Email : {email}

  OUTPUT, in json format
  ```json
  from : [The sender of the email]
  to : [The user that get the email]
  subject : [The subject of the email]
  email_content : [The content of the email]
  email_classification : [IGNORE/NOTIFY/RESPOND]
  classification_reason : [The reason behind the value of email_classification]
  ```
"""

# Reply-drafting prompt template. str.format placeholders: {base_prompt},
# {from}, {to}, {subject}, {email_content}.
# `from` is a Python keyword, so the values must be supplied by dict unpacking,
# e.g. PROMPT_RESPOND.format(**{"from": sender, ...}).
# The OUTPUT code fence is closed so the example block is well-formed markdown.
PROMPT_RESPOND = """
  {base_prompt}

  INSTRUCTION
  Draft and call send_email tool with :
  - to_email: recipient's email address
  - subject: email subject line
  - body: the complete email you drafted

  INPUT
  From : {from}
  To : {to}
  Subject : {subject}
  Content : {email_content}

  OUTPUT, in json format
  ```json
  respond_subject : [Subject of the respond email based on user persona]
  respond_content : [Content of the respond email based on user persona]
  ```
"""

########
# TOOL #
########
@tool
def send_email(to_email: str, subject: str, body: str) -> str:
  """
  Actually send an email via Gmail API.
  The LLM should provide the to_email, subject, and body.

  Args:
      to_email: Recipient email address
      subject: Email subject line
      body: Complete email body (already drafted by LLM)

  Returns:
      Confirmation message
  """
  # NOTE: the docstring above is kept verbatim on purpose - LangChain's @tool
  # exposes a function's docstring to the model as the tool description.
  try:
    # Assemble a plain-text MIME message addressed to the recipient.
    mime_message = MIMEText(body, 'plain')
    mime_message['to'] = to_email
    mime_message['subject'] = subject

    # The request body carries the whole message as URL-safe base64 text.
    encoded = base64.urlsafe_b64encode(mime_message.as_bytes())
    payload = {'raw': encoded.decode('utf-8')}

    # Send from the authenticated user's own mailbox ('me').
    messages_api = gmail_service.users().messages()
    send_message = messages_api.send(userId='me', body=payload).execute()

    return f"✅ Email sent successfully to {to_email}! Message ID: {send_message['id']}"
  except Exception as e:
    # Every failure is reported back as text instead of raising, so the
    # caller always receives a string result.
    return f"❌ Failed to send email: {str(e)}"


#########
# AGENT #
#########
class AgentState(TypedDict):
  """State schema for the EmailAgent graph (passed to StateGraph).

  EmailAgent.execute() initialises every field to '' except `task`.
  """
  # Raw user message handed to EmailAgent.execute(); inserted into PROMPT_ROUTER.
  task: str
  # User name inferred by the router step from the LLM's JSON reply.
  name: str
  # Persona description for the user - presumably filled by the persona lookup step.
  persona: str
  # Router result; PROMPT_ROUTER asks for "email_workflow" or "general_query".
  user_intent: str
  # The fields below have no visible writer in this file.
  # NOTE(review): they look like the outputs of PROMPT_TRIAGE - confirm once a triage node exists.
  sender: str
  email_subject: str
  email_content: str
  email_classification: str
  classification_reason: str
  # NOTE(review): presumably the drafted reply requested by PROMPT_RESPOND - confirm.
  email_respond_subject: str
  email_respond_content: str

class EmailAgent:
    """LangGraph agent that routes a user request and loads the user's persona.

    Graph: ``router`` -> ``persona`` -> END.

    * ``router`` refreshes the persona store from Google Sheets and asks the
      LLM to infer the user's name and intent (PROMPT_ROUTER).
    * ``persona`` looks the inferred name up in the ChromaDB collection and
      falls back to a default persona when nothing is found.

    TODO: PROMPT_TRIAGE / PROMPT_RESPOND exist in this file but no node uses
    them yet; triage / respond nodes still have to be added after ``persona``.
    """

    # Google Sheet that holds the persona table mirrored into ChromaDB.
    PERSONA_SHEET_URL = "https://docs.google.com/spreadsheets/d/1Kg-j1_zjdk2UJdVNQanTsF_mLu8S4X9LuXYiBLOBWGA/edit?gid=0#gid=0"
    # Persona text used when the user is unknown.
    DEFAULT_PERSONA = 'User not specified, treat it as default persona'

    def __init__(self, tools, middleware=None):
        """Build the LLM, bind the tools and compile the graph.

        Args:
            tools: LangChain tools made available to the model.
            middleware: Stored on the instance; not used by the graph yet.
        """
        self.config = {'configurable': {'thread_id': 101}}
        self.checkpointer = InMemorySaver()
        self.middleware = middleware

        # Store tools as dictionary
        self.tools = {t.name: t for t in tools}

        # Persona store; starts as the module-level collection and is replaced
        # by the freshly synced one in call_agent_router().
        self.collection = collection

        # create LLM with tool binding
        self.llm = init_chat_model(
            model="claude-haiku-4-5-20251001",
            temperature=0,
        )

        # Bind tools to the model
        self.llm_with_tools = self.llm.bind_tools(tools)

        # Wire only the nodes this class actually implements.
        self.graph = StateGraph(AgentState)
        self.graph.add_node('router', self.call_agent_router)
        self.graph.add_node('persona', self.call_get_user_persona)
        self.graph.set_entry_point('router')
        self.graph.add_edge('router', 'persona')
        self.graph.add_edge('persona', END)

        self.graph_np = self.graph.compile(
            checkpointer=self.checkpointer,
        )

    def _update_state(self, new_dict: dict, state: "AgentState") -> dict:
        """Report which keys of an LLM JSON reply match the agent state.

        Args:
            new_dict: Parsed JSON reply from the LLM.
            state: Current agent state.

        Returns:
            The subset of ``new_dict`` whose keys exist in ``state``.
        """
        accepted = {}
        for key, value in new_dict.items():
            if key in state:
                accepted[key] = value
                print(f"  ✅ Updated '{key}'")
            else:
                print(f"  ⚠️  Unknown key '{key}' - skipped")
        return accepted

    def call_agent_router(self, state: "AgentState") -> dict:
        """Graph node: infer the user's name and intent from the task.

        Returns:
            State update with ``name`` and ``user_intent`` ('' when the LLM
            reply did not contain them).
        """
        # Initial sync of the user persona data; keep the returned handle so
        # the persona node queries the rebuilt collection.
        self.collection = sync_personas_from_sheets(self.PERSONA_SHEET_URL)

        # Name/persona are not known yet at this point, so fill the base
        # prompt's placeholders with neutral text instead of sending the raw
        # "{name}" / "{persona}" markers to the model.
        base_prompt = BASE_PROMPT.format(
            name=state.get('name') or 'the user',
            persona=state.get('persona') or 'persona not yet known',
        )
        message = HumanMessage(content=PROMPT_ROUTER.format(
            base_prompt=base_prompt,
            message=state['task'],
        ))

        print(f"\n{'='*60}")
        print("ROUTING")
        print(f"{'='*60}")

        # Chat models take a list of messages, not a bare message object.
        result = self.llm.invoke([message])
        json_result = extract_dict_from_string(result.content)
        self._update_state(json_result, state)

        # Coerce to str so a null/absent value becomes '' downstream.
        name = str(json_result.get('name') or '')
        user_intent = str(json_result.get('user_intent') or '')

        print(f"\nUser : {name}")
        print(f"\nTask Type : {user_intent}")

        return {
            'name': name,
            'user_intent': user_intent,
        }

    def call_get_user_persona(self, state: "AgentState") -> dict:
        """Graph node: retrieve user persona and preference information.

        Returns:
            State update with ``name`` and ``persona``. When no record matches
            (or no name was inferred) the default persona text is used.
        """
        name = state.get('name', '')

        metadatas = []
        if name:
            # Filter by specific user; Chroma returns the rows under 'metadatas'.
            result = self.collection.get(where={"name": name})
            metadatas = result.get('metadatas') or []

        if metadatas:
            print(f"✅ User Persona {name} found, proceed with it")
            metadata = metadatas[0]
            # The sheet's columns are not visible here: prefer a 'persona'
            # column, otherwise describe the user with the remaining fields.
            persona = metadata.get('persona') or " | ".join(
                f"{k}: {v}" for k, v in metadata.items() if k != 'name'
            )
            # Return only keys that exist in AgentState.
            return {'name': name, 'persona': persona or self.DEFAULT_PERSONA}

        print("❌ No user persona found, proceed with default")
        return {'name': name, 'persona': self.DEFAULT_PERSONA}

    def execute(self, message):
        """Run the agent graph for one user message.

        Args:
            message: The user's request (stored as ``task`` in the state).

        Returns:
            The final graph state.
        """
        self.input_message = message
        print(f"\n{'='*60}")
        print("TOPIC FROM THE USER")
        print(f"{'='*60}")
        pprint(f"{self.input_message}")

        # ✅ Initialize ALL required state fields
        initial_state = {
            'task': self.input_message,
            'name': '',
            'persona': '',
            'user_intent': '',
            'sender': '',
            'email_subject': '',
            'email_content': '',
            'email_classification': '',
            'classification_reason': '',
            'email_respond_subject': '',
            'email_respond_content': '',
        }

        # Run the graph
        result = self.graph_np.invoke(
            initial_state,
            config=self.config,
        )

        return result

