<a href="https://colab.research.google.com/github/springboardmentor891v/Email_Assistant_Using_LangGraph/blob/manoj_preetham/Email_Assistant.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
pip install langgraph langchain langchain-openai google-auth-oauthlib google-api-python-client


In [None]:
import os
import base64
from typing import TypedDict, Annotated, List, Literal
from email.mime.text import MIMEText

from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver

# Google API Imports
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
from googleapiclient.discovery import build

In [None]:
# 1. SETUP GMAIL AUTHENTICATION
# OAuth scopes requested for the user: Gmail modify access and Calendar
# event access. Both services below are built from the same credentials.
SCOPES = [
    'https://www.googleapis.com/auth/gmail.modify',
    'https://www.googleapis.com/auth/calendar.events'
]

def get_gmail_service():
    """Authenticate the user and build the Gmail and Calendar API clients.

    Credentials are loaded from ``token.json`` when that file exists. If they
    are missing or invalid they are either refreshed (expired credentials
    with a refresh token) or obtained through an interactive copy/paste
    consent flow driven by ``credential.json``. In both of those cases the
    resulting credentials are written back to ``token.json``.

    Returns:
        A ``(gmail_service, calendar_service)`` tuple: the Gmail ``v1`` and
        Calendar ``v3`` clients returned by ``build``.
    """
    creds = None

    # Reuse previously stored credentials if present.
    if os.path.exists('token.json'):
        creds = Credentials.from_authorized_user_file(
            'token.json', SCOPES
        )

    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            # Silent refresh: no user interaction needed.
            creds.refresh(Request())
        else:
            # NOTE(review): 'urn:ietf:wg:oauth:2.0:oob' is the manual
            # copy/paste (out-of-band) redirect, which Google has announced
            # as deprecated — confirm it still works for this OAuth client.
            flow = InstalledAppFlow.from_client_secrets_file(
                'credential.json',
                SCOPES,
                redirect_uri='urn:ietf:wg:oauth:2.0:oob'
            )

            auth_url, _ = flow.authorization_url(
                prompt='consent'
            )

            print("\nüîó OPEN THIS URL IN YOUR BROWSER:\n")
            print(auth_url)

            # The user pastes the code shown after granting consent.
            code = input("\nüìå Paste the authorization code here: ")
            flow.fetch_token(code=code)
            creds = flow.credentials

        # Persist refreshed/new credentials for the next run.
        with open('token.json', 'w') as token:
            token.write(creds.to_json())

    # Gmail client
    gmail_service = build('gmail', 'v1', credentials=creds)

    # Calendar client (same credentials)
    calendar_service = build('calendar', 'v3', credentials=creds)

    # Return both clients
    return gmail_service, calendar_service

# Module-level clients used by the cells below: `service` is the Gmail
# client, `calendar_service` the Calendar client.
service, calendar_service = get_gmail_service()

print("‚úÖ Gmail and Calendar services initialized successfully")


In [None]:
import os
from openai import OpenAI
from google.colab import userdata

# =========================
# OPENAI SETUP
# =========================
# Read the OpenAI key from Colab's secret store and fail fast when it is
# missing, so later cells do not run with an unusable client.
api_key = userdata.get("OPENAI_API_KEY")
if not api_key:
    raise ValueError("OPENAI_API_KEY not found in Colab secrets")

# Module-level client used by llm_call().
client = OpenAI(api_key=api_key)

def llm_call(prompt: str) -> str:
    """Send ``prompt`` as a single user message and return the reply text.

    Uses the module-level ``client`` with model ``gpt-4o-mini`` and
    temperature 0.3.

    Args:
        prompt: Full prompt text, sent as one ``user`` message.

    Returns:
        The first choice's message content with surrounding whitespace
        removed, or ``""`` when the response carries no text content.
    """
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        temperature=0.3
    )
    # ``content`` is nullable in the API response; calling .strip() on None
    # would raise AttributeError, so normalise to an empty string first.
    content = response.choices[0].message.content
    return (content or "").strip()

In [None]:
# =========================
# READ EMAIL
# =========================
def _decode_body_data(data):
    """Decode URL-safe base64 body data into text; "" when ``data`` is empty."""
    if not data:
        return ""
    # Re-add any stripped '=' padding so the decoder does not reject the input.
    padded = data + "=" * (-len(data) % 4)
    # errors='replace' keeps one badly encoded message from aborting the run.
    return base64.urlsafe_b64decode(padded).decode('utf-8', errors='replace')


def _find_plain_text(part):
    """Depth-first search for the first non-empty text/plain body in ``part``."""
    if part.get('mimeType') == 'text/plain':
        text = _decode_body_data(part.get('body', {}).get('data'))
        if text:
            return text
    for child in part.get('parts', []):
        text = _find_plain_text(child)
        if text:
            return text
    return ""


def extract_email_data(service, msg_id):
    """Fetch one message and return ``(subject, sender, date, body)``.

    Args:
        service: Gmail API client exposing ``users().messages().get``.
        msg_id: Id of the message to fetch.

    Returns:
        A 4-tuple of strings. Header fields that are absent come back as
        ``""``. ``body`` is the first non-empty ``text/plain`` part found
        anywhere in the (possibly nested) multipart tree; for a message
        without parts it is the decoded top-level body. ``""`` when nothing
        decodable is found.
    """
    message = service.users().messages().get(
        userId='me',
        id=msg_id,
        format='full'
    ).execute()

    payload = message.get('payload', {})

    # Header names are case-insensitive, so compare them lower-cased instead
    # of requiring the exact spellings 'Subject' / 'From' / 'Date'.
    wanted = {'subject': '', 'from': '', 'date': ''}
    for header in payload.get('headers', []):
        key = header.get('name', '').lower()
        if key in wanted:
            wanted[key] = header.get('value', '')

    if 'parts' in payload:
        # Text is often nested (e.g. multipart/alternative inside
        # multipart/mixed), so walk the whole tree rather than one level.
        body = _find_plain_text(payload)
    else:
        body = _decode_body_data(payload.get('body', {}).get('data'))

    return wanted['subject'], wanted['from'], wanted['date'], body
# =========================
# FETCH UNREAD
# =========================
# Ask Gmail for messages carrying the UNREAD label (one page, at most 100).
results = service.users().messages().list(
    userId='me',
    labelIds=['UNREAD'],
    maxResults=100
).execute()

# Default to an empty list if the response has no 'messages' key.
messages = results.get('messages', [])
print(f"üì¨ Unread emails found: {len(messages)}")
# =========================
# PREVIEW (YOUR STYLE)
# =========================
# Fetch each unread message in full, print a short preview, and collect
# (msg_id, thread_id, subject, sender, body) tuples in `emails`.
emails = []
for msg in messages:
    msg_id = msg['id']
    thread_id = msg['threadId']
    subject, sender, date, body = extract_email_data(service, msg_id)

    print("üìß SUBJECT:", subject)
    print("üë§ FROM:", sender)
    print("üìÖ DATE:", date)
    # Only the first 300 characters of the body are shown.
    print("üìù BODY (preview):", body[:300])
    print("-" * 60)

    emails.append((msg_id, thread_id, subject, sender, body))

In [None]:
import time
from openai import RateLimitError

# =====================================================
# CONFIGURATION (RATE LIMIT SAFE)
# =====================================================
REQUEST_DELAY = 1.2          # seconds between API calls (~50 RPM)
MAX_SUBJECT_CHARS = 300      # subject is cut to this many characters
MAX_BODY_CHARS = 3000        # body is cut to this many characters

# The only labels classify_email() may return.
VALID_CATEGORIES = [
    "Emergency", "Work", "Promotion",
    "Spam", "Not-Useful", "Personal"
]

# In-memory memo of earlier classifications, filled by classify_email().
classification_cache = {}

# =====================================================
# TEXT SAFETY (TOKEN CONTROL)
# =====================================================
def trim_text(text, max_chars):
    """Return the first ``max_chars`` characters of ``text``.

    Falsy input (``None`` or ``""``) yields ``""``.
    """
    return text[:max_chars] if text else ""

# =====================================================
# RULE-BASED FALLBACK (NO API COST)
# =====================================================
def rule_based_classifier(subject, body):
    """Classify an email by keyword lookup, without calling any API.

    The subject and body are joined and lower-cased, then checked against
    each category's keywords in priority order; the first category with a
    keyword occurring anywhere in the text wins.

    Returns:
        One of "Emergency", "Work", "Promotion", "Spam", "Personal", or
        "Not-Useful" when no keyword matches.
    """
    haystack = " ".join([subject, body]).lower()

    # Order matters: earlier rows take precedence over later ones.
    keyword_rules = (
        ("Emergency", ("urgent", "asap", "immediately", "hospital")),
        ("Work", ("meeting", "project", "deadline", "manager")),
        ("Promotion", ("offer", "sale", "discount", "deal")),
        ("Spam", ("lottery", "win money", "free prize")),
        ("Personal", ("family", "friend", "personal")),
    )

    for label, keywords in keyword_rules:
        for keyword in keywords:
            if keyword in haystack:
                return label

    return "Not-Useful"

# =====================================================
# SAFE EMAIL CLASSIFIER (RATE-LIMIT PROTECTED)
# =====================================================
def classify_email(subject, body):
    """Classify an email into one of ``VALID_CATEGORIES`` using the LLM.

    The subject and body are trimmed to ``MAX_SUBJECT_CHARS`` and
    ``MAX_BODY_CHARS`` first. Results are memoised in
    ``classification_cache`` so the same trimmed email is only sent once.

    Args:
        subject: Email subject (may be empty or None).
        body: Email body (may be empty or None).

    Returns:
        The first entry of ``VALID_CATEGORIES`` whose name occurs in the
        model's answer (case-insensitive), or "Not-Useful" when none does.
        On ``RateLimitError`` the result of ``rule_based_classifier`` is
        returned (and cached). On any other error "Not-Useful" is returned
        and not cached, so a later call can retry.
    """
    subject = trim_text(subject, MAX_SUBJECT_CHARS)
    body = trim_text(body, MAX_BODY_CHARS)

    # A tuple key keeps subject and body distinct; plain concatenation would
    # make ("ab", "c") and ("a", "bc") share one cache entry.
    cache_key = (subject, body)
    cached = classification_cache.get(cache_key)
    if cached is not None:
        return cached

    prompt = f"""
    Classify the following email into one of these categories:
    Emergency, Work, Promotion, Spam, Not-Useful, Personal.

    Return ONLY the category name.

    Email Subject: {subject}
    Email Body: {body}
    """

    try:
        time.sleep(REQUEST_DELAY)  # requests-per-minute protection
        answer = llm_call(prompt).strip().lower()
    except RateLimitError:
        category = rule_based_classifier(subject, body)
    except Exception as exc:
        # Report the failure instead of hiding it behind the default label.
        print(f"classify_email: LLM call failed ({exc!r}); using 'Not-Useful'")
        return "Not-Useful"
    else:
        category = next(
            (vc for vc in VALID_CATEGORIES if vc.lower() in answer),
            "Not-Useful",
        )

    classification_cache[cache_key] = category
    return category

# =====================================================
# FETCH UNREAD EMAILS (GMAIL API)
# =====================================================
# List unread messages again (one page, at most 100).
results = service.users().messages().list(
    userId='me',
    labelIds=['UNREAD'],
    maxResults=100   # safe limit (increase slowly)
).execute()

messages = results.get('messages', [])

# =====================================================
# SUMMARY STATS
# =====================================================
# Every counter starts at zero; a non-zero seed would make the printed
# totals disagree with the number of messages actually classified.
stats = {
    "Emergency": 0,
    "Work": 0,
    "Promotion": 0,
    "Spam": 0,
    "Not-Useful": 0,
    "Personal": 0
}

# (msg_id, thread_id, subject, sender, body, category) per message; the
# agent cells below iterate over this list.
preview_cache = []

# =====================================================
# MAIN LOOP (SAFE CLASSIFICATION)
# =====================================================
for msg in messages:
    msg_id = msg['id']

    # Must return: subject, sender, date, body
    subject, sender, date, body = extract_email_data(service, msg_id)

    category = classify_email(subject, body)

    stats[category] += 1
    preview_cache.append(
        (msg_id, msg.get("threadId"), subject, sender, body, category)
    )

# =====================================================
# OUTPUT
# =====================================================
print("\nüì¨ UNREAD EMAIL SUMMARY")
print("Total:", len(messages))
print("üö® Emergency:", stats["Emergency"])
print("üíº Work:", stats["Work"])
print("üì¢ Promotion:", stats["Promotion"])
print("üóë Spam:", stats["Spam"])
print("‚ùå Not Useful:", stats["Not-Useful"])
print("üë§ Personal:", stats["Personal"])


In [None]:
import time
import base64
from typing import TypedDict
from email.mime.text import MIMEText
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver

# =====================================================
# CONFIG
# =====================================================
# Signature appended to generated and hand-edited replies.
YOUR_NAME = "Manoj Preetham V K"

# Character limits applied to subject/body before they go into a prompt.
MAX_SUBJECT_CHARS = 300
MAX_BODY_CHARS = 3000

# =====================================================
# UTILS
# =====================================================
def trim_text(text, limit):
    """Cut ``text`` to at most ``limit`` characters; falsy input gives ""."""
    return text[:limit] if text else ""

def is_no_reply(body, sender):
    """Return True when the body or sender contains a no-reply marker.

    The check is a case-insensitive substring search over the body and
    sender joined by a space.
    """
    combined = " ".join((body, sender)).lower()
    markers = ("do not reply", "no-reply", "noreply", "automated")
    for marker in markers:
        if marker in combined:
            return True
    return False

# =====================================================
# LLM REPLY GENERATION (ONLY PLACE API IS USED)
# =====================================================
def generate_options(subject, sender, body):
    """Ask the LLM for three reply drafts (formal, friendly, short).

    The subject and body are trimmed to ``MAX_SUBJECT_CHARS`` and
    ``MAX_BODY_CHARS`` before being placed in the prompt.

    Returns:
        The text returned by ``llm_call``. On ``RateLimitError`` a fixed
        three-option template is returned instead; on any other exception a
        single-option template ("Option 1" only) is returned.
    """
    subject = trim_text(subject, MAX_SUBJECT_CHARS)
    body = trim_text(body, MAX_BODY_CHARS)

    # The "Option N:" labels in this prompt are the markers hitl_node()
    # later splits on to pick the chosen reply.
    prompt = f"""
You are an intelligent email assistant.

Email:
Subject: {subject}
From: {sender}
Body:
{body}

Generate THREE reply options.
Always sign every reply with this name: {YOUR_NAME}

Option 1:
Formal reply

Option 2:
Friendly reply

Option 3:
Short reply
"""

    try:
        return llm_call(prompt)

    except RateLimitError:
        # Hard fallback when the API quota is exhausted: canned replies.
        return f"""
Option 1:
Thank you for the update. I will review this and get back to you shortly.

{YOUR_NAME}

Option 2:
Thanks for letting me know! I‚Äôll take a look and respond soon üôÇ

{YOUR_NAME}

Option 3:
Got it, thanks!

{YOUR_NAME}
"""

    except Exception:
        # Any other failure: only "Option 1" is offered, so choosing 2 or 3
        # downstream finds no matching section.
        return f"""
Option 1:
Thank you.

{YOUR_NAME}
"""


# =====================================================
# GMAIL ACTION
# =====================================================
def send_reply(service, to, subject, reply_text, thread_id, in_reply_to=None):
    """Send ``reply_text`` as a plain-text reply inside a Gmail thread.

    Args:
        service: Gmail API client exposing ``users().messages().send``.
        to: Recipient address (the original sender).
        subject: Subject of the email being answered. "Re: " is prepended
            unless the subject already starts with "Re:" (any case).
        reply_text: Body of the reply.
        thread_id: Gmail thread id the reply is attached to.
        in_reply_to: Optional ``Message-ID`` of the email being answered.
            When given it is written to the ``In-Reply-To`` and
            ``References`` headers so mail clients can thread the reply.

    Returns:
        None. The message is sent as a side effect.
    """
    msg = MIMEText(reply_text)
    msg["to"] = to

    # Do not stack prefixes ("Re: Re: ...") when answering a reply.
    subject = subject or ""
    if subject.strip().lower().startswith("re:"):
        msg["subject"] = subject
    else:
        msg["subject"] = "Re: " + subject

    if in_reply_to:
        msg["In-Reply-To"] = in_reply_to
        msg["References"] = in_reply_to

    # The API expects the whole RFC 2822 message URL-safe base64 encoded.
    raw = base64.urlsafe_b64encode(msg.as_bytes()).decode()

    service.users().messages().send(
        userId="me",
        body={"raw": raw, "threadId": thread_id}
    ).execute()

# =====================================================
# STATE
# =====================================================
class EmailState(TypedDict):
    """State dictionary passed between the graph nodes for one email."""

    msg_id: str          # Gmail message id; used to clear the UNREAD label
    thread_id: str       # Gmail thread id the reply is sent into
    subject: str
    sender: str          # used as the reply recipient
    body: str
    category: str        # supplied by the classification cell above
    options: str         # reply drafts written by generate_node
    decision: str        # set by hitl_node ("send" or "skip")
    selected_reply: str  # reply text chosen or typed in hitl_node

# =====================================================
# GRAPH NODES (NO CLASSIFICATION HERE)
# =====================================================
def filter_node(state: EmailState):
    """Log emails classified as Spam / Not-Useful and pass the state on.

    The state itself is returned unchanged; routing away from filtered
    emails is done by the conditional edge attached to this node.
    """
    category = state["category"]
    if category == "Spam" or category == "Not-Useful":
        print(f"üóë Filtered: {state['subject']} ({state['category']})")
    return state

def generate_node(state: EmailState):
    """Graph node: produce reply drafts and store them under "options"."""
    return {
        "options": generate_options(
            state["subject"], state["sender"], state["body"]
        )
    }

def _extract_option(options, choice):
    """Return the text under ``Option <choice>:`` in ``options`` ("" if absent).

    The section ends at the next ``Option <n>:`` label or at the end of the
    text, so the plain word "Option" inside a reply does not cut it short.
    """
    import re  # local import keeps this block self-contained

    match = re.search(
        rf"Option {re.escape(choice)}:(.*?)(?=Option \d+:|\Z)",
        options or "",
        flags=re.DOTALL,
    )
    return match.group(1).strip() if match else ""


def hitl_node(state: EmailState):
    """Human-in-the-loop node: show the drafts and ask the user what to send.

    Prints the email header and the generated options, then reads a choice
    from stdin: ``1``/``2``/``3`` selects a draft, ``e`` lets the user type
    a reply (signed with ``YOUR_NAME`` if the signature is missing), ``s``
    or anything else skips.

    Returns:
        ``{"decision": "skip"}`` or
        ``{"decision": "send", "selected_reply": <text>}``. No-reply /
        automated emails and drafts that cannot be located (or are blank)
        are skipped.
    """
    print("\n" + "=" * 70)
    print("üìß SUBJECT :", state["subject"])
    print("üë§ FROM    :", state["sender"])
    print("üè∑ CATEGORY:", state["category"])
    print("=" * 70)

    if is_no_reply(state["body"], state["sender"]):
        print("üö´ No-reply / automated email detected")
        return {"decision": "skip"}

    print(state["options"])

    choice = input("\nChoose (1/2/3), e=edit, s=skip: ").strip().lower()

    if choice == "s":
        return {"decision": "skip"}

    if choice == "e":
        text = input("\n‚úèÔ∏è Enter your reply:\n")
        if not text.strip().endswith(YOUR_NAME):
            text += f"\n\n{YOUR_NAME}"
        return {"decision": "send", "selected_reply": text}

    if choice not in ["1", "2", "3"]:
        return {"decision": "skip"}

    reply = _extract_option(state["options"], choice)
    if not reply:
        # Missing or blank section: never send an empty email.
        return {"decision": "skip"}

    return {"decision": "send", "selected_reply": reply}

def action_node(state: EmailState):
    """Graph node: send the chosen reply and mark the email as read.

    Does nothing (returns None) unless ``state["decision"]`` is "send".
    Uses the module-level Gmail ``service``.
    """
    if state["decision"] == "send":
        send_reply(
            service,
            state["sender"], state["subject"],
            state["selected_reply"], state["thread_id"],
        )

        # Removing the UNREAD label marks the original message as read.
        unread_off = {"removeLabelIds": ["UNREAD"]}
        service.users().messages().modify(
            userId="me", id=state["msg_id"], body=unread_off
        ).execute()

        print("üì§ Reply sent & email marked as read")

# =====================================================
# GRAPH BUILD (STARTS FROM FILTER)
# =====================================================
# Wire the workflow: filter -> generate -> hitl -> action -> END.
graph = StateGraph(EmailState)

graph.add_node("filter", filter_node)
graph.add_node("generate", generate_node)
graph.add_node("hitl", hitl_node)
graph.add_node("action", action_node)

graph.set_entry_point("filter")

# Spam / Not-Useful emails stop right after the filter; everything else
# continues to reply generation.
graph.add_conditional_edges(
    "filter",
    lambda s: END if s["category"] in ["Spam", "Not-Useful"] else "generate"
)

graph.add_edge("generate", "hitl")
graph.add_edge("hitl", "action")
graph.add_edge("action", END)

# Compile with an in-memory checkpointer.
agent = graph.compile(checkpointer=MemorySaver())

# =====================================================
# RUN (USES CLASSIFIED OUTPUT FROM FIRST CODE)
# preview_cache = (msg_id, thread_id, subject, sender, body, category)
# =====================================================
# Run the agent once per classified email from preview_cache.
# NOTE(review): the checkpointer key is the Gmail thread id, so two unread
# messages from the same thread share one checkpoint — confirm that is
# intended.
for msg_id, thread_id, subject, sender, body, category in preview_cache:
    agent.invoke(
        {
            "msg_id": msg_id,
            "thread_id": thread_id,
            "subject": subject,
            "sender": sender,
            "body": body,
            "category": category,   # reused from the classification cell, not recomputed
        },
        config={"configurable": {"thread_id": thread_id}}
    )


In [None]:
import os
from google.colab import userdata

# üîê Read LangSmith API key from Colab Secrets
# The key is stored in Colab Secrets under the name "langsmith".
LANGSMITH_API_KEY = userdata.get("langsmith")

if not LANGSMITH_API_KEY:
    raise ValueError("‚ùå LangSmith API key not found in Colab Secrets")

# Tracing is configured through environment variables: turn tracing on,
# supply the key, and name the project the runs are filed under.
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = LANGSMITH_API_KEY
os.environ["LANGCHAIN_PROJECT"] = "Email-Agent-Milestone-1"

print("‚úÖ LangSmith tracing enabled successfully")


In [None]:
# =====================================================
# CONFIG: QUOTA SAFE MODE
# =====================================================
# When False, generate_options() and judge_reply() return canned values
# instead of calling the OpenAI API.
USE_LLM = False   # set True only if OpenAI API quota exists

# Signature for replies and prompt-size limits (characters).
YOUR_NAME = "Manoj Preetham V K"
MAX_SUBJECT_CHARS = 300
MAX_BODY_CHARS = 3000

# =====================================================
# LANGSMITH SETUP (COLAB SECRET: "langsmith")
# =====================================================
import os
from google.colab import userdata

LANGSMITH_API_KEY = userdata.get("langsmith")
if not LANGSMITH_API_KEY:
    raise ValueError("‚ùå LangSmith API key not found in Colab Secrets")

# Enable tracing and file this cell's runs under the Milestone-2 project.
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = LANGSMITH_API_KEY
os.environ["LANGCHAIN_PROJECT"] = "Email-Agent-Milestone-2"

# =====================================================
# IMPORTS
# =====================================================
import time
import base64
from typing import TypedDict
from email.mime.text import MIMEText
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver
from langsmith import traceable

# =====================================================
# UTILS
# =====================================================
def trim_text(text, limit):
    """Return ``text`` cut to ``limit`` characters, or "" for falsy input."""
    if text:
        return text[:limit]
    return ""

def is_no_reply(body, sender):
    """Tell whether the body or sender carries a no-reply marker.

    Matching is a case-insensitive substring test on ``body + " " + sender``.
    """
    haystack = f"{body + ' ' + sender}".lower()
    for marker in ("do not reply", "no-reply", "noreply", "automated"):
        if marker in haystack:
            return True
    return False

def triage_label(category):
    """Map an email category to a triage action.

    Returns "ignore" for Spam / Not-Useful, "notify_human" for Emergency,
    and "respond" for everything else.
    """
    if category == "Emergency":
        return "notify_human"
    return "ignore" if category in ("Spam", "Not-Useful") else "respond"

# =====================================================
# AGENT ACTION: REPLY GENERATION
# =====================================================
def generate_options(subject, sender, body):
    """Return three reply drafts labelled "Option 1:" .. "Option 3:".

    The subject and body are trimmed to ``MAX_SUBJECT_CHARS`` and
    ``MAX_BODY_CHARS``. When ``USE_LLM`` is false a fixed template is
    returned without calling the API; otherwise the drafts come from
    ``llm_call``.

    Returns:
        A single string containing the labelled drafts.
    """
    subject = trim_text(subject, MAX_SUBJECT_CHARS)
    body = trim_text(body, MAX_BODY_CHARS)

    if not USE_LLM:
        return f"""
Option 1:
Thank you for your email. I will review this and get back to you shortly.

{YOUR_NAME}

Option 2:
Thanks for letting me know! I‚Äôll check and respond soon üôÇ

{YOUR_NAME}

Option 3:
Got it, thanks!

{YOUR_NAME}
"""

    # hitl_node() selects a draft by splitting on "Option N:", so the model
    # must be told to use exactly those labels.
    return llm_call(f"""
Generate THREE email replies.

Subject: {subject}
From: {sender}
Body: {body}

Label the replies exactly "Option 1:", "Option 2:" and "Option 3:",
each label on its own line.
Sign replies with: {YOUR_NAME}
""")

# =====================================================
# MILESTONE 2: LLM-AS-A-JUDGE (QUOTA SAFE)
# =====================================================
@traceable(name="reply_quality_judge")
def judge_reply(subject, body, agent_reply):
    """Score a reply for helpfulness, tone and correctness.

    When ``USE_LLM`` is false, fixed scores of 4 are returned without any
    API call. Otherwise the LLM is asked for JSON scores and its answer is
    parsed into a dict, because the summary code indexes the result by key.

    Returns:
        A dict with keys "helpfulness", "tone" and "correctness". With the
        LLM enabled the values are floats; a score that is missing or
        unparseable is reported as ``nan`` rather than invented.
    """
    if not USE_LLM:
        return {
            "helpfulness": 4,
            "tone": 4,
            "correctness": 4
        }

    import json  # local imports keep this block self-contained
    import re

    raw = llm_call(f"""
Evaluate this reply on a scale of 1‚Äì5.

Email:
{subject}
{body}

Reply:
{agent_reply}

Return JSON with keys:
helpfulness, tone, correctness
""")

    # The answer may be wrapped in prose or code fences, so pick out the
    # outermost {...} span before parsing.
    found = re.search(r"\{.*\}", raw or "", flags=re.DOTALL)
    try:
        parsed = json.loads(found.group(0)) if found else {}
    except json.JSONDecodeError:
        parsed = {}

    scores = {}
    for key in ("helpfulness", "tone", "correctness"):
        try:
            scores[key] = float(parsed[key])
        except (KeyError, TypeError, ValueError):
            scores[key] = float("nan")
    return scores

# =====================================================
# GMAIL SEND
# =====================================================
def send_reply(service, to, subject, reply_text, thread_id, in_reply_to=None):
    """Send ``reply_text`` as a plain-text reply inside a Gmail thread.

    Args:
        service: Gmail API client exposing ``users().messages().send``.
        to: Recipient address (the original sender).
        subject: Subject of the email being answered. "Re: " is prepended
            unless the subject already starts with "Re:" (any case).
        reply_text: Body of the reply.
        thread_id: Gmail thread id the reply is attached to.
        in_reply_to: Optional ``Message-ID`` of the email being answered;
            written to ``In-Reply-To`` and ``References`` when given.

    Returns:
        None. The message is sent as a side effect.
    """
    msg = MIMEText(reply_text)
    msg["to"] = to

    # Do not stack prefixes ("Re: Re: ...") when answering a reply.
    subject = subject or ""
    if subject.strip().lower().startswith("re:"):
        msg["subject"] = subject
    else:
        msg["subject"] = "Re: " + subject

    if in_reply_to:
        msg["In-Reply-To"] = in_reply_to
        msg["References"] = in_reply_to

    raw = base64.urlsafe_b64encode(msg.as_bytes()).decode()

    service.users().messages().send(
        userId="me",
        body={"raw": raw, "threadId": thread_id}
    ).execute()

# =====================================================
# AGENT STATE
# =====================================================
class EmailState(TypedDict):
    """State dictionary passed between the graph nodes for one email."""

    msg_id: str          # Gmail message id; used to clear the UNREAD label
    thread_id: str       # Gmail thread id the reply is sent into
    subject: str
    sender: str          # used as the reply recipient
    body: str
    category: str        # label from the classification cell
    options: str         # reply drafts written by generate_node
    decision: str        # set by hitl_node ("send" or "skip")
    selected_reply: str  # reply text chosen or typed in hitl_node

# =====================================================
# GRAPH NODES
# =====================================================
def filter_node(state: EmailState):
    """Log Spam / Not-Useful emails; always hand the state back unchanged."""
    filtered = state["category"] in ("Spam", "Not-Useful")
    if filtered:
        print(f"üóë Filtered: {state['subject']} ({state['category']})")
    return state

def generate_node(state: EmailState):
    """Graph node: build reply drafts and return them under "options"."""
    drafts = generate_options(state["subject"], state["sender"], state["body"])
    return {"options": drafts}

def _extract_option(options, choice):
    """Return the text under ``Option <choice>:`` in ``options`` ("" if absent).

    The section ends at the next ``Option <n>:`` label or at the end of the
    text, so the plain word "Option" inside a reply does not cut it short.
    """
    import re  # local import keeps this block self-contained

    match = re.search(
        rf"Option {re.escape(choice)}:(.*?)(?=Option \d+:|\Z)",
        options or "",
        flags=re.DOTALL,
    )
    return match.group(1).strip() if match else ""


def hitl_node(state: EmailState):
    """Human-in-the-loop node: show the drafts and ask the user what to send.

    Reads a choice from stdin: ``1``/``2``/``3`` selects a draft, ``e``
    lets the user type a reply (signed with ``YOUR_NAME`` if the signature
    is missing), ``s`` or anything else skips.

    Returns:
        ``{"decision": "skip"}`` or
        ``{"decision": "send", "selected_reply": <text>}``. No-reply emails
        and drafts that cannot be located (or are blank) are skipped
        instead of raising.
    """
    print("\n" + "="*70)
    print("üìß SUBJECT :", state["subject"])
    print("üë§ FROM    :", state["sender"])
    print("üè∑ CATEGORY:", state["category"])
    print("="*70)

    if is_no_reply(state["body"], state["sender"]):
        print("üö´ No-reply email")
        return {"decision": "skip"}

    print(state["options"])
    # strip() so that input such as " 1" or "1 " is still recognised.
    choice = input("\nChoose (1/2/3), e=edit, s=skip: ").strip().lower()

    if choice == "s":
        return {"decision": "skip"}

    if choice == "e":
        text = input("\n‚úèÔ∏è Enter reply:\n")
        if not text.strip().endswith(YOUR_NAME):
            text += f"\n\n{YOUR_NAME}"
        return {"decision": "send", "selected_reply": text}

    if choice not in ["1", "2", "3"]:
        return {"decision": "skip"}

    reply = _extract_option(state["options"], choice)
    if not reply:
        # The drafts may lack the expected label (free-form LLM output);
        # skip rather than crash or send an empty email.
        return {"decision": "skip"}

    return {"decision": "send", "selected_reply": reply}

# =====================================================
# ACTION + EVALUATION
# =====================================================
# Score records appended by action_node(), one per sent reply.
quality_log = []


def action_node(state: EmailState):
    """Graph node: send the chosen reply, mark the email read, score it.

    Does nothing (returns None) unless ``state["decision"]`` is "send".
    Uses the module-level Gmail ``service`` and appends the judge's scores
    to ``quality_log``.
    """
    if state["decision"] == "send":
        send_reply(
            service,
            state["sender"], state["subject"],
            state["selected_reply"], state["thread_id"],
        )

        # Removing the UNREAD label marks the original message as read.
        unread_off = {"removeLabelIds": ["UNREAD"]}
        service.users().messages().modify(
            userId="me", id=state["msg_id"], body=unread_off
        ).execute()

        scores = judge_reply(
            state["subject"], state["body"], state["selected_reply"]
        )
        quality_log.append(scores)
        print("üì§ Sent | üìä Scores:", scores)

# =====================================================
# GRAPH BUILD
# =====================================================
# Wire the workflow: filter -> generate -> hitl -> action -> END.
graph = StateGraph(EmailState)

graph.add_node("filter", filter_node)
graph.add_node("generate", generate_node)
graph.add_node("hitl", hitl_node)
graph.add_node("action", action_node)

graph.set_entry_point("filter")

# Spam / Not-Useful emails end right after the filter node.
graph.add_conditional_edges(
    "filter",
    lambda s: END if s["category"] in ["Spam", "Not-Useful"] else "generate"
)

graph.add_edge("generate", "hitl")
graph.add_edge("hitl", "action")
graph.add_edge("action", END)

# Compile with an in-memory checkpointer.
agent = graph.compile(checkpointer=MemorySaver())

# =====================================================
# RUN AGENT ON LIVE EMAILS
# preview_cache = (msg_id, thread_id, subject, sender, body, category)
# =====================================================
@traceable(name="email_agent_run")
def run_agent(state):
    """Invoke the compiled agent for one email.

    The Gmail thread id is used as the checkpointer thread id, and the
    category plus its triage label are attached as run metadata.

    Returns:
        Whatever ``agent.invoke`` returns.
    """
    configurable = {"thread_id": state["thread_id"]}
    category = state["category"]
    run_config = {
        "configurable": configurable,
        "metadata": {"category": category, "triage": triage_label(category)},
    }
    return agent.invoke(state, config=run_config)

# Run the traced agent once per classified email from preview_cache.
for msg_id, thread_id, subject, sender, body, category in preview_cache:
    run_agent({
        "msg_id": msg_id,
        "thread_id": thread_id,
        "subject": subject,
        "sender": sender,
        "body": body,
        "category": category
    })

# =====================================================
# FINAL AGENT QUALITY SCORE
# =====================================================
if quality_log:
    # Only dict-shaped records can be averaged. judge_reply() can hand back
    # raw LLM text when USE_LLM is enabled, which would break s[k] lookups.
    numeric_scores = [s for s in quality_log if isinstance(s, dict)]

    def avg(k):
        """Mean of score ``k`` across the dict-shaped score records."""
        return sum(s[k] for s in numeric_scores) / len(numeric_scores)

    if len(numeric_scores) < len(quality_log):
        print("Skipped unparseable score records:",
              len(quality_log) - len(numeric_scores))

    if numeric_scores:
        print("\nüéØ AGENT QUALITY SCORE")
        print("Helpfulness:", avg("helpfulness"))
        print("Tone:", avg("tone"))
        print("Correctness:", avg("correctness"))
        print("Overall:", (avg("helpfulness") + avg("tone") + avg("correctness")) / 3)


In [None]:
# =====================================================
# CONFIG (QUOTA SAFE)
# =====================================================
# When False, generate_options() and judge_reply() return canned values
# instead of calling the OpenAI API.
USE_LLM = False   # set True only if OpenAI quota exists

# Signature for replies and prompt-size limits (characters).
YOUR_NAME = "Manoj Preetham V K"
MAX_SUBJECT_CHARS = 300
MAX_BODY_CHARS = 3000

# =====================================================
# LANGSMITH SETUP (COLAB SECRET: "langsmith")
# =====================================================
import os
from google.colab import userdata

LANGSMITH_API_KEY = userdata.get("langsmith")
if not LANGSMITH_API_KEY:
    raise ValueError("LangSmith API key not found in Colab Secrets")

# Enable tracing and file this cell's runs under the Milestone-3 project.
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = LANGSMITH_API_KEY
os.environ["LANGCHAIN_PROJECT"] = "Email-Agent-Milestone-3"

# =====================================================
# IMPORTS
# =====================================================
import base64
from typing import TypedDict
from email.mime.text import MIMEText
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver
from langsmith import traceable

# =====================================================
# UTILS
# =====================================================
def trim_text(text, limit):
    """Limit ``text`` to ``limit`` characters; falsy input becomes ""."""
    if not text:
        return ""
    return text[:limit]

def is_no_reply(body, sender):
    """Return True if the body or sender contains a no-reply marker.

    The comparison is case-insensitive and looks for the markers anywhere
    in the body and sender joined by a space.
    """
    lowered = (body + " " + sender).lower()
    hits = [
        marker
        for marker in ("do not reply", "no-reply", "noreply", "automated")
        if marker in lowered
    ]
    return bool(hits)

def triage_label(category):
    """Translate an email category into a triage action.

    "Spam" / "Not-Useful" -> "ignore", "Emergency" -> "notify_human",
    anything else -> "respond".
    """
    actions = {
        "Spam": "ignore",
        "Not-Useful": "ignore",
        "Emergency": "notify_human",
    }
    for known, action in actions.items():
        if category == known:
            return action
    return "respond"

# =====================================================
# DANGEROUS TOOL IDENTIFICATION (MILESTONE 3)
# =====================================================
# Tools that must not run without explicit human approval.
DANGEROUS_TOOLS = ["send_email"]


def requires_human_approval(tool_name):
    """Return True when ``tool_name`` is listed in ``DANGEROUS_TOOLS``."""
    for dangerous in DANGEROUS_TOOLS:
        if tool_name == dangerous:
            return True
    return False

# =====================================================
# AGENT ACTION: REPLY GENERATION
# =====================================================
def generate_options(subject, sender, body):
    """Return three reply drafts labelled "Option 1:" .. "Option 3:".

    The subject and body are trimmed to ``MAX_SUBJECT_CHARS`` and
    ``MAX_BODY_CHARS``. When ``USE_LLM`` is false a fixed template is
    returned without calling the API; otherwise the drafts come from
    ``llm_call``.

    Returns:
        A single string containing the labelled drafts.
    """
    subject = trim_text(subject, MAX_SUBJECT_CHARS)
    body = trim_text(body, MAX_BODY_CHARS)

    if not USE_LLM:
        return f"""
Option 1:
Thank you for your email. I will review this and get back to you shortly.

{YOUR_NAME}

Option 2:
Thanks for letting me know! I‚Äôll check and respond soon üôÇ

{YOUR_NAME}

Option 3:
Got it, thanks!

{YOUR_NAME}
"""

    # hitl_node() selects a draft by splitting on "Option N:", so the model
    # must be told to use exactly those labels.
    return llm_call(f"""
Generate THREE email replies.

Subject: {subject}
From: {sender}
Body: {body}

Label the replies exactly "Option 1:", "Option 2:" and "Option 3:",
each label on its own line.
Sign replies with: {YOUR_NAME}
""")

# =====================================================
# MILESTONE 2: LLM-AS-A-JUDGE (QUOTA SAFE)
# =====================================================
@traceable(name="reply_quality_judge")
def judge_reply(subject, body, agent_reply):
    """Score a reply for helpfulness, tone and correctness.

    When ``USE_LLM`` is false, fixed scores of 4 are returned without any
    API call. Otherwise the LLM is asked for JSON scores and its answer is
    parsed into a dict, because the summary code indexes the result by key.

    Returns:
        A dict with keys "helpfulness", "tone" and "correctness". With the
        LLM enabled the values are floats; a score that is missing or
        unparseable is reported as ``nan`` rather than invented.
    """
    if not USE_LLM:
        return {"helpfulness": 4, "tone": 4, "correctness": 4}

    import json  # local imports keep this block self-contained
    import re

    raw = llm_call(f"""
Evaluate this reply from 1‚Äì5.

Email:
{subject}
{body}

Reply:
{agent_reply}

Return JSON with keys:
helpfulness, tone, correctness
""")

    # The answer may be wrapped in prose or code fences, so pick out the
    # outermost {...} span before parsing.
    found = re.search(r"\{.*\}", raw or "", flags=re.DOTALL)
    try:
        parsed = json.loads(found.group(0)) if found else {}
    except json.JSONDecodeError:
        parsed = {}

    scores = {}
    for key in ("helpfulness", "tone", "correctness"):
        try:
            scores[key] = float(parsed[key])
        except (KeyError, TypeError, ValueError):
            scores[key] = float("nan")
    return scores

# =====================================================
# DANGEROUS TOOL: SEND EMAIL
# =====================================================
def send_email(service, to, subject, reply_text, thread_id, in_reply_to=None):
    """Send ``reply_text`` as a plain-text reply inside a Gmail thread.

    This is the tool listed in ``DANGEROUS_TOOLS``: it actually sends mail.

    Args:
        service: Gmail API client exposing ``users().messages().send``.
        to: Recipient address (the original sender).
        subject: Subject of the email being answered. "Re: " is prepended
            unless the subject already starts with "Re:" (any case).
        reply_text: Body of the reply.
        thread_id: Gmail thread id the reply is attached to.
        in_reply_to: Optional ``Message-ID`` of the email being answered;
            written to ``In-Reply-To`` and ``References`` when given.

    Returns:
        None. The message is sent as a side effect.
    """
    msg = MIMEText(reply_text)
    msg["to"] = to

    # Do not stack prefixes ("Re: Re: ...") when answering a reply.
    subject = subject or ""
    if subject.strip().lower().startswith("re:"):
        msg["subject"] = subject
    else:
        msg["subject"] = "Re: " + subject

    if in_reply_to:
        msg["In-Reply-To"] = in_reply_to
        msg["References"] = in_reply_to

    raw = base64.urlsafe_b64encode(msg.as_bytes()).decode()

    service.users().messages().send(
        userId="me",
        body={"raw": raw, "threadId": thread_id}
    ).execute()

# =====================================================
# AGENT STATE
# =====================================================
class EmailState(TypedDict):
    """State dictionary passed between the graph nodes for one email."""

    msg_id: str              # Gmail message id; used to clear the UNREAD label
    thread_id: str           # Gmail thread id the reply is sent into
    subject: str
    sender: str              # used as the reply recipient
    body: str
    category: str            # label from the classification cell
    options: str             # reply drafts written by generate_node
    decision: str            # set by hitl_node ("approve" or "deny")
    selected_reply: str      # reply text chosen or typed in hitl_node
    requires_approval: bool  # set by generate_node from requires_human_approval()

# =====================================================
# GRAPH NODES
# =====================================================
def filter_node(state: EmailState):
    """Log emails labelled Spam / Not-Useful, then return the state as-is."""
    for ignored in ("Spam", "Not-Useful"):
        if state["category"] == ignored:
            print(f"üóë Filtered: {state['subject']} ({state['category']})")
            break
    return state

def generate_node(state: EmailState):
    """Graph node: draft replies and record that sending needs approval.

    Returns a partial state update with "options" (the drafts) and
    "requires_approval" (the policy for the "send_email" tool).
    """
    drafts = generate_options(state["subject"], state["sender"], state["body"])
    needs_approval = requires_human_approval("send_email")
    return {"options": drafts, "requires_approval": needs_approval}

# =====================================================
# HITL CHECKPOINT (MILESTONE 3 CORE)
# =====================================================
def _extract_option(options, choice):
    """Return the text under ``Option <choice>:`` in ``options`` ("" if absent).

    The section ends at the next ``Option <n>:`` label or at the end of the
    text, so the plain word "Option" inside a reply does not cut it short.
    """
    import re  # local import keeps this block self-contained

    match = re.search(
        rf"Option {re.escape(choice)}:(.*?)(?=Option \d+:|\Z)",
        options or "",
        flags=re.DOTALL,
    )
    return match.group(1).strip() if match else ""


def hitl_node(state: EmailState):
    """Approval checkpoint: a human must approve before ``send_email`` runs.

    Reads a choice from stdin: ``1``/``2``/``3`` approves that draft, ``e``
    approves a hand-typed reply (signed with ``YOUR_NAME`` if the signature
    is missing), ``d`` denies. Any other input is treated as a denial.

    Returns:
        ``{"decision": "deny"}`` or
        ``{"decision": "approve", "selected_reply": <text>}``. No-reply
        emails and drafts that cannot be located (or are blank) are denied
        instead of raising.
    """
    print("\n" + "=" * 70)
    print("üìß SUBJECT :", state["subject"])
    print("üë§ FROM    :", state["sender"])
    print("üè∑ CATEGORY:", state["category"])
    print("‚ö†Ô∏è DANGEROUS TOOL: send_email")
    print("=" * 70)

    if is_no_reply(state["body"], state["sender"]):
        print("üö´ No-reply email detected")
        return {"decision": "deny"}

    print(state["options"])

    choice = input(
        "\nApprove (1/2/3) | Edit (e) | Deny (d): "
    ).strip().lower()

    if choice == "d":
        print("‚ùå Action denied by human")
        return {"decision": "deny"}

    if choice == "e":
        text = input("\n‚úèÔ∏è Enter edited reply:\n")
        if not text.strip().endswith(YOUR_NAME):
            text += f"\n\n{YOUR_NAME}"
        return {"decision": "approve", "selected_reply": text}

    if choice in ["1", "2", "3"]:
        reply = _extract_option(state["options"], choice)
        if reply:
            return {"decision": "approve", "selected_reply": reply}
        # Label missing or section blank: fall through to the denial path
        # rather than crash or approve an empty email.

    print("‚ö†Ô∏è Invalid input ‚Äî action denied")
    return {"decision": "deny"}

# =====================================================
# ACTION NODE (EXECUTES ONLY AFTER APPROVAL)
# =====================================================
# Score records appended by action_node(), one per sent reply.
quality_log = []


def action_node(state: EmailState):
    """Graph node: run ``send_email`` only after human approval.

    When ``state["decision"]`` is not "approve", a notice is printed and
    nothing is sent (returns None). Otherwise the reply is sent, the
    original message loses its UNREAD label, and the judge's scores are
    appended to ``quality_log``. Uses the module-level Gmail ``service``.
    """
    approved = state["decision"] == "approve"
    if not approved:
        print("‚èπ Execution halted ‚Äî no dangerous action taken")
        return

    send_email(
        service,
        state["sender"], state["subject"],
        state["selected_reply"], state["thread_id"],
    )

    # Removing the UNREAD label marks the original message as read.
    unread_off = {"removeLabelIds": ["UNREAD"]}
    service.users().messages().modify(
        userId="me", id=state["msg_id"], body=unread_off
    ).execute()

    scores = judge_reply(
        state["subject"], state["body"], state["selected_reply"]
    )
    quality_log.append(scores)
    print("üì§ Email sent | üìä Scores:", scores)

# =====================================================
# GRAPH BUILD
# =====================================================
# Wire the workflow: filter -> generate -> hitl (approval) -> action -> END.
graph = StateGraph(EmailState)

graph.add_node("filter", filter_node)
graph.add_node("generate", generate_node)
graph.add_node("hitl", hitl_node)
graph.add_node("action", action_node)

graph.set_entry_point("filter")

# Spam / Not-Useful emails end right after the filter node.
graph.add_conditional_edges(
    "filter",
    lambda s: END if s["category"] in ["Spam", "Not-Useful"] else "generate"
)

graph.add_edge("generate", "hitl")
graph.add_edge("hitl", "action")
graph.add_edge("action", END)

# Compile with an in-memory checkpointer.
agent = graph.compile(checkpointer=MemorySaver())

# =====================================================
# RUN AGENT (LIVE EMAILS)
# preview_cache = (msg_id, thread_id, subject, sender, body, category)
# =====================================================
@traceable(name="email_agent_run")
def run_agent(state):
    """Invoke the compiled agent for one email and tag the run metadata.

    The Gmail thread id is used as the checkpointer thread id. Metadata
    records the category, its triage label, and whether the run can reach
    a tool that needs human approval.

    Returns:
        Whatever ``agent.invoke`` returns.
    """
    triage = triage_label(state["category"])
    # The initial state built by the caller never carries
    # "requires_approval" (generate_node sets it later), so a plain
    # .get(..., False) always logs False. Fall back to the tool policy for
    # emails that are not filtered out before reply generation.
    default_approval = triage != "ignore" and requires_human_approval("send_email")
    return agent.invoke(
        state,
        config={
            "configurable": {"thread_id": state["thread_id"]},
            "metadata": {
                "category": state["category"],
                "triage": triage,
                "requires_approval": state.get(
                    "requires_approval", default_approval
                )
            }
        }
    )

# Run the traced agent once per classified email from preview_cache.
for msg_id, thread_id, subject, sender, body, category in preview_cache:
    run_agent({
        "msg_id": msg_id,
        "thread_id": thread_id,
        "subject": subject,
        "sender": sender,
        "body": body,
        "category": category
    })

# =====================================================
# FINAL QUALITY SUMMARY
# =====================================================
if quality_log:
    # Only dict-shaped records can be averaged. judge_reply() can hand back
    # raw LLM text when USE_LLM is enabled, which would break s[k] lookups.
    numeric_scores = [s for s in quality_log if isinstance(s, dict)]

    def avg(k):
        """Mean of score ``k`` across the dict-shaped score records."""
        return sum(s[k] for s in numeric_scores) / len(numeric_scores)

    if len(numeric_scores) < len(quality_log):
        print("Skipped unparseable score records:",
              len(quality_log) - len(numeric_scores))

    if numeric_scores:
        print("\nüéØ AGENT QUALITY SCORE")
        print("Helpfulness:", avg("helpfulness"))
        print("Tone:", avg("tone"))
        print("Correctness:", avg("correctness"))
        print("Overall:", (avg("helpfulness") + avg("tone") + avg("correctness")) / 3)


In [None]:
# CREATE EVENT
# Hard-coded 30-minute test event (10:00-10:30 on 2026-02-06, with the
# time zone given as Asia/Kolkata) used to check Calendar access.
event = {
    "summary": "Calendar Debug Test",
    "description": "Testing event creation",
    "start": {
        "dateTime": "2026-02-06T10:00:00",
        "timeZone": "Asia/Kolkata"
    },
    "end": {
        "dateTime": "2026-02-06T10:30:00",
        "timeZone": "Asia/Kolkata"
    }
}

# Insert the event into the authenticated user's primary calendar.
created_event = calendar_service.events().insert(
    calendarId="primary",
    body=event
).execute()

# Print the new event's id and link from the API response.
print("Event ID:", created_event["id"])
print("Event Link:", created_event["htmlLink"])
