In [6]:
# Load email and API credentials from the environment (.env file)

import os
from dotenv import load_dotenv
import logging # Import logging early

# Read the local .env file (via python-dotenv) before querying the environment.
load_dotenv()

# Credentials shared by the SMTP (sending) and IMAP (fetching) helpers.
SENDER_EMAIL = os.environ.get('sender')
APP_PASSWORD = os.environ.get('app_password')

# 'recipients1' holds a comma-separated address list; blank entries are dropped.
RECIPIENTS_STR = os.environ.get('recipients1', '')
RECIPIENTS = list(filter(None, map(str.strip, RECIPIENTS_STR.split(','))))

# IMAP login address: GMAIL_EMAIL when defined, otherwise the sender address.
GMAIL_IMAP_EMAIL = os.environ.get("GMAIL_EMAIL", SENDER_EMAIL)

# The OpenRouter API key is read separately in a later cell.



In [16]:
def first_env_value(*names):
    """Return the first non-blank value among the given environment variables.

    Args:
        *names: Environment variable names, checked in the order given.

    Returns:
        The first value that is non-empty after stripping surrounding
        whitespace, or None when every variable is unset or blank.
    """
    for name in names:
        value = (os.getenv(name) or "").strip()
        if value:
            return value
    return None


# OpenRouter key used by the ChatOpenAI clients. 'openai_api_key' is the
# original .env name and keeps priority; 'OPENROUTER_API_KEY' is accepted as
# a fallback. A blank value is treated the same as a missing one.
OPENROUTER_API_KEY = first_env_value('openai_api_key', 'OPENROUTER_API_KEY')

if OPENROUTER_API_KEY is None:
    # Without a key, OpenRouter answers every request with
    # "401 - No auth credentials found"; report the cause up front instead.
    logging.getLogger(__name__).warning(
        "OpenRouter API key not found: set 'openai_api_key' (or 'OPENROUTER_API_KEY') in your .env file."
    )

In [18]:
# Smoke test: build the classification model and send a trivial prompt to
# check that the OpenRouter credentials are accepted.
# ChatOpenAI is imported here because this cell sits above the notebook's main
# import cell; without it a fresh "Restart & Run All" stops with a NameError.
from langchain_openai import ChatOpenAI

model_filtering = ChatOpenAI(
    base_url="https://openrouter.ai/api/v1",
    model="deepseek/deepseek-chat",
    temperature=0.0,
    openai_api_key=OPENROUTER_API_KEY,
    max_retries=5
)


model_filtering.invoke("hi")

AuthenticationError: Error code: 401 - {'error': {'message': 'No auth credentials found', 'code': 401}}

#agents 


In [12]:
import imaplib
import email
from email.header import decode_header
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import smtplib
import ssl
import re
from typing import List, Dict, Any, TypedDict
import pandas as pd # For potential DataFrame use, although not strictly required by the core logic

# Langchain and LangGraph imports
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langgraph.graph import END, StateGraph


## Utils: Logger
def get_logger(name):
    """Return a DEBUG-level logger that writes formatted records to a stream.

    A single StreamHandler is attached the first time a given name is
    requested; later calls (for example when a notebook cell is re-run)
    reuse the logger without stacking another handler on it.

    Args:
        name: Logger name passed to ``logging.getLogger``.

    Returns:
        The configured ``logging.Logger`` instance.
    """
    log = logging.getLogger(name)
    log.setLevel(logging.DEBUG)

    if log.handlers:
        # Already configured: another handler would duplicate every record.
        return log

    stream_handler = logging.StreamHandler()
    stream_handler.setLevel(logging.DEBUG)
    stream_handler.setFormatter(
        logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    )
    log.addHandler(stream_handler)
    return log

logger = get_logger(__name__)


## Core: EmailState (TypedDict)
class EmailState(TypedDict):
    """
    Represents the state of an email as it moves through the processing pipeline.

    The same structure is passed into the LangGraph workflow and returned by
    every node; nodes update the dicts it holds in place.
    """
    # Email dicts available for processing (the runner passes the single selected email).
    emails: List[Dict[str, Any]]
    # Email currently in the pipeline; nodes add "classification", "summary" and "response" keys to it.
    current_email: Dict[str, Any]
    # One {"email_id", "response"} entry appended per email that reaches the response step.
    history: List[Dict[str, Any]]
    # Maps email id -> classification assigned by the filtering step.
    metadata: Dict[str, Any]


## Core: Email Fetching (Adapted to imaplib.IMAP4_SSL)
def decode_email_header(header):
    """Decode an RFC 2047 encoded header value into a plain string.

    Args:
        header: Raw header value as returned by ``Message.get``. ``None`` is
            treated as an empty header instead of raising.

    Returns:
        The decoded text. Byte chunks are decoded with their declared charset
        (UTF-8 when none is declared); when that charset is unknown or the
        bytes are invalid for it, the chunk is decoded as Latin-1 instead.
    """
    if header is None:
        return ""

    pieces = []
    for part, charset in decode_header(header):
        if isinstance(part, bytes):
            try:
                pieces.append(part.decode(charset or 'utf-8'))
            except (UnicodeDecodeError, LookupError):
                # Latin-1 maps every byte value, so this fallback cannot fail.
                pieces.append(part.decode('latin-1', errors='ignore'))
        else:
            # Headers without encoded words come back as plain str chunks.
            pieces.append(part)
    return "".join(pieces)

def _decode_payload(part) -> str:
    """Return the decoded text payload of a message part, or "" if it has none."""
    payload = part.get_payload(decode=True)
    if payload is None:
        # get_payload(decode=True) is documented to return None for multipart
        # containers; treat any missing payload as an empty body.
        return ""
    try:
        return payload.decode(part.get_content_charset() or 'utf-8', errors='ignore')
    except (UnicodeDecodeError, LookupError):
        # Unknown charset name: fall back to Latin-1, which accepts any byte.
        return payload.decode('latin-1', errors='ignore')


def _extract_plain_text_body(msg) -> str:
    """Return the first non-attachment text/plain body of a message ("" if none)."""
    if not msg.is_multipart():
        return _decode_payload(msg)

    for part in msg.walk():
        disposition = str(part.get('Content-Disposition'))
        if part.get_content_type() == 'text/plain' and 'attachment' not in disposition:
            return _decode_payload(part)
    return ""


def fetch_imap_emails(imap_username: str, app_password: str, num_emails: int = 5) -> list:
    """
    Fetches the most recent messages from the Gmail INBOX using imaplib.IMAP4_SSL.

    Args:
        imap_username: Account name used for the IMAP login.
        app_password: Password (Gmail app password) used for the IMAP login.
        num_emails: Maximum number of messages to fetch, taken from the end of
            the search result. Zero or a negative value fetches nothing.

    Returns:
        A list of dicts with the keys "id", "subject", "sender", "body" and
        "raw_message". IMAP and unexpected errors are logged rather than
        raised; whatever was collected before the error is returned.
    """
    emails = []
    IMAP_SERVER = "imap.gmail.com" # Hardcoded for Gmail

    if num_emails <= 0:
        # A non-positive count would slice from the wrong end below
        # (email_ids[-0:] is the whole mailbox), so stop before connecting.
        logger.info("No messages to fetch.")
        return emails

    mail = None
    try:
        logger.info(f"Connecting to IMAP server: {IMAP_SERVER} with username: {imap_username}")
        mail = imaplib.IMAP4_SSL(IMAP_SERVER)
        mail.login(imap_username, app_password)
        logger.info("IMAP login successful.")
        mail.select("inbox")
        logger.info("Selected INBOX folder.")

        status, messages = mail.search(None, "ALL")

        if status != "OK":
            logger.warning("No messages found in INBOX.")
            return []

        # NOTE: search() (unlike uid('search', ...)) yields message sequence
        # numbers; the "uid" naming below is kept from the original code.
        email_ids = messages[0].split()
        logger.debug(f"Found {len(email_ids)} messages.")

        # Latest N messages; a slice longer than the list simply returns it all.
        fetch_uids = email_ids[-num_emails:]

        if not fetch_uids:
            logger.info("No messages to fetch.")
            return []

        for uid in fetch_uids:
            status, msg_data = mail.fetch(uid, "(RFC822)")
            if status != "OK":
                logger.error(f"Failed to fetch email UID {uid}: {msg_data}")
                continue

            # Skip replies without a (header, bytes) tuple instead of letting
            # one bad message abort the whole batch.
            if not msg_data or not isinstance(msg_data[0], tuple):
                logger.error(f"Failed to fetch email UID {uid}: {msg_data}")
                continue

            raw_email = msg_data[0][1]
            msg = email.message_from_bytes(raw_email)

            emails.append({
                "id": uid.decode(), # Decode id from bytes to string
                "subject": decode_email_header(msg.get('Subject', 'No Subject')),
                "sender": decode_email_header(msg.get('From', 'Unknown Sender')),
                "body": _extract_plain_text_body(msg),
                "raw_message": raw_email
            })
        logger.info(f"Successfully fetched {len(emails)} emails.")

    except imaplib.IMAP4.error as e:
        logger.error(f"IMAP Error: {e}")
    except Exception as e:
        logger.error(f"An unexpected error occurred while fetching emails: {e}")
    finally:
        # Always release the connection, including when login itself failed
        # (state NONAUTH), which previously left the socket open.
        if mail is not None:
            try:
                if mail.state == 'SELECTED':
                    mail.close()
                mail.logout()
            except Exception as e:
                logger.warning(f"Error during IMAP logout: {e}")
    return emails


## Core: Email Sending (Adapted to smtplib.SMTP_SSL)
def send_email_smtp(subject: str, body: str, sender: str, recipients: List[str], password: str) -> bool:
    """
    Connects to Gmail's SMTP server using SMTP_SSL and sends a plain-text email.

    Args:
        subject: Value for the Subject header.
        body: Plain-text message body.
        sender: Address used for the From header and for the SMTP login.
        recipients: Destination addresses; must contain at least one entry.
        password: Password (Gmail app password) for the SMTP login.

    Returns:
        True when the message was handed to the server, False on any failure
        (missing recipients/credentials, connection, login or send error).
        Failures are logged, never raised.
    """
    # Fail fast on inputs that can never succeed, without opening a connection.
    if not recipients:
        logger.error("Failed to send email. Error: no recipients were provided.")
        return False
    if not sender or not password:
        logger.error("Failed to send email. Error: sender address or app password is missing.")
        logger.error("Please check: 1. Your 'sender' email and 'app_password' are correct. 2. Internet connection.")
        return False

    try:
        # Built inside the try so malformed inputs are reported like any
        # other send failure instead of escaping as an exception.
        msg = MIMEText(body)
        msg['Subject'] = subject
        msg['From'] = sender
        msg['To'] = ', '.join(recipients)

        with smtplib.SMTP_SSL('smtp.gmail.com', 465) as smtp_server:
            smtp_server.login(sender, password)
            smtp_server.sendmail(sender, recipients, msg.as_string())
        logger.info(f"Email sent from {sender} to {recipients} with subject '{subject}'.")
        return True
    except Exception as e:
        logger.error(f"Failed to send email. Error: {e}")
        logger.error("Please check: 1. Your 'sender' email and 'app_password' are correct. 2. Internet connection.")
        return False

# Re-implement send_email and send_draft_to_gmail to use the new send_email_smtp
def send_email(email_data: dict, your_name: str, sender_email_address: str, sender_app_password: str) -> bool:
    """Send the generated reply back to the original sender of an email.

    Args:
        email_data: Processed email dict; reads "sender" (a From header
            value), "subject" and "response".
        your_name: Unused here; kept so the signature matches the other
            sending wrappers.
        sender_email_address: Account the reply is sent from.
        sender_app_password: App password for that account.

    Returns:
        True if the reply was sent. False if sending failed or if no email
        address could be extracted from the "sender" field.
    """
    # Function-scope import so this block does not depend on other cells.
    from email.utils import parseaddr

    raw_sender = email_data.get("sender") or ""
    # parseaddr handles both "Name" + bracketed address and bare addresses.
    _display_name, recipient_email = parseaddr(str(raw_sender))
    if "@" not in recipient_email:
        # E.g. the fetcher's "Unknown Sender" placeholder: nothing to reply to.
        logger.error(f"Cannot reply: no valid email address found in sender '{raw_sender}'.")
        return False

    subject = f"Re: {email_data.get('subject', 'No Subject')}"
    body = email_data.get("response", "No response generated.")

    return send_email_smtp(subject, body, sender_email_address, [recipient_email], sender_app_password)


def send_draft_to_gmail(email_data: dict, your_name: str, gmail_address: str, gmail_app_password: str) -> bool:
    """Mail the generated reply to the user's own address, labelled as a draft.

    The message is an ordinary email sent from ``gmail_address`` to
    ``gmail_address`` with a "DRAFT: Re: " subject prefix; it is not stored
    in the Gmail Drafts folder. ``your_name`` is accepted but not used.

    Returns:
        The boolean result of ``send_email_smtp``.
    """
    logger.warning("Sending draft to Gmail address as a regular email. For true 'Drafts' functionality, Gmail API is generally required.")

    original_subject = email_data.get('subject', 'No Subject')
    draft_body = email_data.get("response", "No response generated.")

    return send_email_smtp(
        f"DRAFT: Re: {original_subject}",
        draft_body,
        gmail_address,
        [gmail_address],
        gmail_app_password,
    )


## Agents: Filtering
# Chat model used by filter_email; routed through OpenRouter's
# OpenAI-compatible endpoint with temperature pinned to 0.0.
model_filtering = ChatOpenAI(
    model="deepseek/deepseek-chat",
    base_url="https://openrouter.ai/api/v1",
    openai_api_key=OPENROUTER_API_KEY,
    temperature=0.0,
    max_retries=5,
)

def filter_email(email: dict) -> str:
    """
    Classifies an email into Spam, Urgent, Needs Review, or Informational categories.

    Args:
        email: Email dict; reads "subject", "body" and (for logging) "id".

    Returns:
        One of "spam", "urgent", "needs_review" or "informational".
        "needs_review" is also returned when the model call fails or its
        answer cannot be mapped onto a known category.
    """
    subject = email.get("subject", "")
    body = email.get("body", "")
    valid_categories = {"spam", "urgent", "needs_review", "informational"}

    prompt = ChatPromptTemplate.from_messages([
        ("system", """You are an expert email classifier. Your task is to categorize emails into one of the following:
        - 'spam': Unsolicited commercial email, phishing attempts, or suspicious content.
        - 'urgent': Emails requiring immediate attention or action.
        - 'needs_review': Emails that require human discretion, contain ambiguous information, or might be important but not urgent.
        - 'informational': Emails that are for general information, newsletters, or routine updates, requiring no immediate action.

        Analyze the subject and body of the email to make an accurate classification.
        Output only the category name (e.g., 'spam', 'urgent', 'needs_review', 'informational')."""),
        ("human", "Subject: {subject}\n\nBody: {body}")
    ])

    chain = prompt | model_filtering | StrOutputParser()

    try:
        raw_answer = chain.invoke({"subject": subject, "body": body}).strip()

        # The prompt shows the categories in quotes, so answers such as
        # "'spam'", "Spam." or "needs review" are plausible. Collapse every
        # run of non-letters to "_" and trim it so these still match.
        classification = re.sub(r"[^a-z]+", "_", raw_answer.lower()).strip("_")
        logger.info(f"Email ID: {email.get('id', 'unknown')} classified as: {classification}")

        if classification not in valid_categories:
            logger.warning(f"Unexpected classification received: {raw_answer}. Defaulting to 'needs_review'.")
            return "needs_review"

        return classification
    except Exception as e:
        logger.error(f"Error classifying email {email.get('id', 'unknown')}: {e}")
        return "needs_review"


## Agents: Summarization
# Chat model used by summarize_email; same OpenRouter endpoint and model as
# the classifier, with temperature 0.7.
model_summarization = ChatOpenAI(
    model="deepseek/deepseek-chat",
    base_url="https://openrouter.ai/api/v1",
    openai_api_key=OPENROUTER_API_KEY,
    temperature=0.7,
    max_retries=5,
)

def summarize_email(email: dict) -> str:
    """
    Produces a short summary of an email using the summarization model.

    Args:
        email: Email dict; reads "subject", "body" and (for logging) "id".

    Returns:
        The model's summary with surrounding whitespace removed, or the
        string "Could not generate summary." when the model call fails.
    """
    email_id = email.get('id', 'unknown')
    chain_inputs = {
        "subject": email.get("subject", ""),
        "body": email.get("body", ""),
    }

    summary_prompt = ChatPromptTemplate.from_messages([
        ("system", """You are a concise email summarizer. Your task is to provide a brief,
        accurate summary of the email's key points. Focus on the main message and any
        actionable items. Keep the summary to 2-3 sentences."""),
        ("human", "Subject: {subject}\n\nBody: {body}")
    ])
    summarizer = summary_prompt | model_summarization | StrOutputParser()

    try:
        result = summarizer.invoke(chain_inputs).strip()
        logger.info(f"Email ID: {email_id} summarized.")
        return result
    except Exception as e:
        # Any failure (network, auth, parsing) degrades to a fixed placeholder.
        logger.error(f"Error summarizing email {email_id}: {e}")
        return "Could not generate summary."


## Agents: Response Generation
# Chat model used by generate_response; same OpenRouter endpoint and model as
# the other agents, with temperature 0.8.
model_response = ChatOpenAI(
    model="deepseek/deepseek-chat",
    base_url="https://openrouter.ai/api/v1",
    openai_api_key=OPENROUTER_API_KEY,
    temperature=0.8,
    max_retries=5,
)

def format_email_response_text(original_subject: str, recipient_name: str, response_text: str, your_name: str) -> str:
    """Wrap a reply body in a subject line, greeting and sign-off.

    Layout: a "Subject: Re: ..." line, a blank line, "Hi NAME,", a blank
    line, the reply text, a blank line, then "Best regards," followed by the
    sender's name on the next line (no trailing newline).
    """
    lines = [
        f"Subject: Re: {original_subject}",
        "",
        f"Hi {recipient_name},",
        "",
        f"{response_text}",
        "",
        "Best regards,",
        f"{your_name}",
    ]
    return "\n".join(lines)

def generate_response(email: dict, summary: str, recipient_name: str, your_name: str) -> str:
    """
    Generates a suitable email reply based on the email summary and original details.

    Args:
        email: Email dict; reads "subject", "sender", "classification" and
            (for logging) "id".
        summary: Summary of the email, passed to the model in place of the body.
        recipient_name: Name used in the greeting of the formatted reply.
        your_name: Name used in the sign-off of the formatted reply.

    Returns:
        The reply formatted by ``format_email_response_text``. When the model
        output contains a question mark or the phrase "I am unsure" (any
        letter case), the reply text is prefixed with "[[REVIEW REQUIRED]] ".
        If the model call fails, a formatted apology message is returned.
    """
    subject = email.get("subject", "")
    sender = email.get("sender", "The sender")

    prompt = ChatPromptTemplate.from_messages([
        ("system", """You are an AI email assistant. Your task is to craft a polite, relevant, and concise email reply.
        Consider the email's subject, sender, and the provided summary.

        If the email is 'informational', a brief acknowledgment or "thank you" is usually sufficient.
        If it's 'urgent' or 'needs_review', suggest next steps or ask clarifying questions.

        Aim for a professional and helpful tone. Do not include subject lines or greetings/signatures in your direct response,
        just the body of the reply."""),
        # The system prompt branches on the classification, so the model
        # has to be told what it is.
        ("human", "Original Subject: {subject}\nFrom: {sender}\nClassification: {classification}\nSummary of email: {summary}\n\nGenerate a suitable reply:")
    ])

    chain = prompt | model_response | StrOutputParser()

    try:
        response_content = chain.invoke({
            "subject": subject,
            "sender": sender,
            "classification": email.get("classification", "unknown"),
            "summary": summary
        }).strip()

        # Compare against a lowercase needle: the haystack is lowercased, so
        # the previous mixed-case literal could never match.
        if "?" in response_content or "i am unsure" in response_content.lower():
            logger.info(f"Response for email ID: {email.get('id', 'unknown')} flagged for human review (uncertainty detected).")
            response_content = "[[REVIEW REQUIRED]] " + response_content

        formatted_response = format_email_response_text(subject, recipient_name, response_content, your_name)
        logger.info(f"Email ID: {email.get('id', 'unknown')} response generated.")
        return formatted_response
    except Exception as e:
        logger.error(f"Error generating response for email {email.get('id', 'unknown')}: {e}")
        return format_email_response_text(subject, recipient_name, "I apologize, but I was unable to generate a response at this time. Please review the original email.", your_name)


## Agents: Human Review
def review_email(email: dict, current_response: str) -> str:
    """
    Interactive console review of an AI-generated reply.

    Prints the original subject, sender, summary and the proposed reply, then
    keeps prompting until the user chooses edit, accept or reject.

    Returns:
        The text typed by the user (edit), ``current_response`` unchanged
        (accept), or an empty string (reject).
    """
    logger.info(f"Email ID: {email.get('id', 'unknown')} flagged for human review.")
    for line in (
        "\n--- Human Review Required ---",
        f"Original Subject: {email.get('subject', 'N/A')}",
        f"Original Sender: {email.get('sender', 'N/A')}",
        f"Summary: {email.get('summary', 'N/A')}",
        "\nAI Generated Response:",
        current_response,
    ):
        print(line)

    question = "\nDo you want to (e)dit, (a)ccept, or (r)eject this response? (e/a/r): "
    choice = input(question).strip().lower()
    while choice not in {'e', 'a', 'r'}:
        print("Invalid option. Please choose 'e', 'a', or 'r'.")
        choice = input(question).strip().lower()

    if choice == 'a':
        logger.info("Human accepted the response as is.")
        return current_response

    if choice == 'r':
        logger.warning("Human rejected the response. No response will be sent.")
        return ""

    # Remaining option: 'e' - replace the reply with the user's own text.
    modified_response = input("Enter your modified response: \n")
    logger.info("Human edited the response.")
    return modified_response

In [13]:
## Core: Supervisor
def supervisor_langgraph(email: dict, state: EmailState, user_name: str, recipient_name: str) -> EmailState:
    """
    Processes an individual email using a LangGraph workflow.
    Each step (filtering, summarization, response generation) is a node.
    Conditional edges are used to exit early for spam or to continue processing.

    Args:
        email: Email dict to process; stored as state['current_email'].
        state: Pipeline state handed to the graph; updated in place by the nodes.
        user_name: Name passed to generate_response for the reply sign-off.
        recipient_name: Name passed to generate_response for the reply greeting.

    Returns:
        The state returned by the compiled graph's invoke() call.
    """

    state['current_email'] = email

    # Node 1: classify the email and record the result on the email dict and
    # in state['metadata'] (keyed by email id).
    def filtering_node(state: EmailState) -> EmailState:
        current_email = state['current_email']
        email_id = current_email.get("id", "unknown")
        logger.info(f'Filtering node started for email ID: {email_id}')

        classification = filter_email(current_email)
        current_email["classification"] = classification

        if 'metadata' not in state:
            state['metadata'] = {}
        state['metadata'][email_id] = classification

        logger.info(f'Email ID: {email_id} classified as: {classification}')
        return state

    # Node 2: attach a summary to the current email dict.
    def summarization_node(state: EmailState) -> EmailState:
        email = state['current_email']
        email_id = email.get("id", "unknown")
        logger.info(f'Summarization node started for email ID: {email_id}')

        summary = summarize_email(email)
        email["summary"] = summary

        logger.info(f'Email ID: {email_id} summarized.')
        return state

    # Node 3: generate the reply, route it through human review when needed,
    # then store it on the email dict and append it to state['history'].
    def response_node(state: EmailState) -> EmailState:
        email = state['current_email']
        email_id = email.get("id", "unknown")
        logger.info(f'Response generation node started for email ID: {email_id}')

        # recipient_name and user_name come from the enclosing function's arguments.
        response = generate_response(email, email.get("summary", ""), recipient_name, user_name)

        # generate_response marks uncertain replies with this tag.
        needs_review_by_ai = "[[REVIEW REQUIRED]]" in response

        if email.get("classification") == "needs_review" or needs_review_by_ai:
            logger.info(f"Email ID: {email_id} flagged for human review by supervisor logic.")
            # The tag is stripped before the reply is shown to the reviewer;
            # review_email returns "" when the reviewer rejects the reply.
            response = review_email(email, response.replace("[[REVIEW REQUIRED]]", "").strip())

        email["response"] = response

        if 'history' not in state:
            state['history'] = []
        state['history'].append({
            "email_id": email_id,
            "response": response
        })

        logger.info(f'Email ID: {email_id} response processed.')
        return state

    # The graph is built and compiled on every call to this function.
    graph_builder = StateGraph(EmailState)

    graph_builder.add_node("filtering", filtering_node)
    graph_builder.add_node("summarization", summarization_node)
    graph_builder.add_node("response", response_node)

    # Router after filtering: spam ends the run, anything else is summarized.
    def post_filtering(state: EmailState):
        email = state['current_email']
        if email.get("classification") == "spam":
            logger.info(f"Email ID: {email.get('id', 'unknown')} classified as SPAM. Terminating pipeline.")
            return END
        else:
            logger.info(f"Email ID: {email.get('id', 'unknown')} not spam. Proceeding to summarization.")
            return "summarization"

    # Wiring: filtering -> (END | summarization) -> response -> END
    graph_builder.add_conditional_edges("filtering", post_filtering)
    graph_builder.add_edge("summarization", "response")
    graph_builder.add_edge("response", END)

    graph_builder.set_entry_point("filtering")

    graph = graph_builder.compile()

    logger.info(f"Invoking LangGraph for email ID: {email.get('id', 'unknown')}")
    final_state = graph.invoke(state)
    logger.info(f"LangGraph processing finished for email ID: {email.get('id', 'unknown')}")
    return final_state

In [14]:
import json # Only needed if loading from JSON for testing

def process_email_action(email_to_process, your_name, sender_email_address, sender_app_password):
    """
    Ask the user whether to send the reply or mail it to themselves as a
    draft, then carry out that choice and log the outcome.

    Any answer other than "s" or "d" logs a warning and does nothing.
    """
    choice = input("Do you want to (s)end the email or (d)raft it to Gmail? (s/d): ").strip().lower()

    if choice not in ("s", "d"):
        logger.warning("Invalid option. No action taken.")
        return

    if choice == "s":
        # Reply goes to the original sender, using the configured account.
        sent = send_email(email_to_process, your_name, sender_email_address, sender_app_password)
        if sent:
            logger.info("Email sent successfully.")
        else:
            logger.warning("Failed to send email.")
        return

    # choice == "d": the "draft" is mailed to sender_email_address itself so
    # it can be reviewed from the user's own inbox.
    drafted = send_draft_to_gmail(email_to_process, your_name, sender_email_address, sender_app_password)
    if drafted:
        logger.info("Draft sent to Gmail successfully.")
    else:
        logger.warning("Failed to send draft to Gmail.")

def run_email_agent():
    """
    Interactive entry point for the email agent.

    Steps: ask for the signature and recipient names, load emails (live over
    IMAP or from a JSON file), let the user pick one, run it through
    supervisor_langgraph, print the generated reply and, if there is one,
    offer to send or draft it.

    Returns:
        None. Every failure path logs or prints a message and returns early.
    """
    logger.info("Starting AI Email Agent.")

    your_name = input("Please enter your name (for signature): ")
    recipient_name = input("Please enter the recipient's name (e.g., John Doe): ")

    emails = []

    data_source = input("Fetch emails live (l) or load from JSON (j)? (l/j): ").strip().lower()

    if data_source == 'l':
        if not (GMAIL_IMAP_EMAIL and APP_PASSWORD):
            logger.error("IMAP email or app password not configured. Cannot fetch live emails.")
            print("Please ensure GMAIL_EMAIL and app_password are set in your .env file.")
            return
        emails = fetch_imap_emails(GMAIL_IMAP_EMAIL, APP_PASSWORD, num_emails=5)
        logger.debug(f"Fetched {len(emails)} emails from IMAP.")
    elif data_source == 'j':
        json_file = input("Enter path to JSON test data file (e.g., test_emails.json): ")
        try:
            # Explicit UTF-8: the default text encoding is platform-dependent.
            with open(json_file, 'r', encoding='utf-8') as f:
                emails = json.load(f)
        except FileNotFoundError:
            logger.error(f"Error: JSON file not found at {json_file}")
            return
        except (json.JSONDecodeError, UnicodeDecodeError):
            logger.error(f"Error: Could not decode JSON from {json_file}")
            return
        except OSError as e:
            # E.g. a directory path or missing read permission.
            logger.error(f"Error: Could not read {json_file}: {e}")
            return

        # The rest of the function calls .get() on each item, so anything
        # other than a list of objects would crash further down.
        if not isinstance(emails, list) or not all(isinstance(item, dict) for item in emails):
            logger.error(f"Error: {json_file} must contain a JSON list of email objects.")
            return
        logger.debug(f"Loaded {len(emails)} emails from {json_file}.")
    else:
        logger.warning("Invalid data source option. Exiting.")
        return

    if not emails:
        logger.info("No emails found to process.")
        return

    print("\nSelect an email to process:")
    for idx, email_item in enumerate(emails):
        print(f"{idx + 1}. Subject: {email_item.get('subject', 'No Subject')} | From: {email_item.get('sender', 'Unknown')}")

    try:
        # The menu is 1-based; convert to a 0-based list index.
        choice = int(input("Enter the number of the email you want to choose: ")) - 1
        if choice < 0 or choice >= len(emails):
            print("Invalid choice. Exiting.")
            return
    except ValueError:
        print("Invalid input. Please enter a number. Exiting.")
        return

    selected_email = emails[choice]

    state = EmailState(
        emails=[selected_email],
        current_email=selected_email,
        history=[],
        metadata={}
    )

    logger.info(f"Processing selected email ID: {selected_email.get('id', 'unknown')}")
    final_state = supervisor_langgraph(selected_email, state, your_name, recipient_name)

    processed_email = final_state['current_email']

    print("\n--- Processing Complete ---")
    print("\nGenerated Response:\n")
    # No "response" key means the pipeline ended early (spam); an empty
    # string means the reviewer rejected the reply.
    generated_response = processed_email.get("response", "No response generated.")
    print(generated_response)

    if generated_response and "No response gener" not in generated_response:
        print("\n--- Action Options ---")
        # Use the SENDER_EMAIL and APP_PASSWORD from .env directly for sending/drafting
        process_email_action(processed_email, your_name, SENDER_EMAIL, APP_PASSWORD)
    else:
        print("No valid response to send or draft.")

In [15]:
# Start the interactive agent when this code runs as the main module
# (the notebook's log output shows __name__ is "__main__" in the kernel).
if __name__ == "__main__":
    run_email_agent()

2025-07-24 20:20:14,266 - __main__ - INFO - Starting AI Email Agent.
2025-07-24 20:20:42,153 - __main__ - INFO - Connecting to IMAP server: imap.gmail.com with username: ultopalto0@gmail.com
2025-07-24 20:20:43,216 - __main__ - INFO - IMAP login successful.
2025-07-24 20:20:43,792 - __main__ - INFO - Selected INBOX folder.
2025-07-24 20:20:44,238 - __main__ - DEBUG - Found 3086 messages.
2025-07-24 20:20:47,439 - __main__ - INFO - Successfully fetched 5 emails.
2025-07-24 20:20:48,087 - __main__ - DEBUG - Fetched 5 emails from IMAP.



Select an email to process:
1. Subject: Is 52,000 AED/month sufficient salary for a single man in Dubai? | From: Quora Digest <english-quora-digest@quora.com>
2. Subject: Email Subject | From: ultopalto0@gmail.com
3. Subject: Email Subject | From: ultopalto0@gmail.com
4. Subject: No Subject | From: Unknown Sender
5. Subject: Email Subject | From: ultopalto0@gmail.com


2025-07-24 20:30:31,825 - __main__ - INFO - Processing selected email ID: 3083
2025-07-24 20:30:31,828 - __main__ - INFO - Invoking LangGraph for email ID: 3083
2025-07-24 20:30:31,847 - __main__ - INFO - Filtering node started for email ID: 3083
2025-07-24 20:30:33,668 - __main__ - ERROR - Error classifying email 3083: Error code: 401 - {'error': {'message': 'No auth credentials found', 'code': 401}}
2025-07-24 20:30:33,669 - __main__ - INFO - Email ID: 3083 classified as: needs_review
2025-07-24 20:30:33,669 - __main__ - INFO - Email ID: 3083 not spam. Proceeding to summarization.
2025-07-24 20:30:33,670 - __main__ - INFO - Summarization node started for email ID: 3083
2025-07-24 20:30:34,112 - __main__ - ERROR - Error summarizing email 3083: Error code: 401 - {'error': {'message': 'No auth credentials found', 'code': 401}}
2025-07-24 20:30:34,112 - __main__ - INFO - Email ID: 3083 summarized.
2025-07-24 20:30:34,113 - __main__ - INFO - Response generation node started for email ID: 


--- Human Review Required ---
Original Subject: Email Subject
Original Sender: ultopalto0@gmail.com
Summary: Could not generate summary.

AI Generated Response:
Subject: Re: Email Subject

Hi rakesh,

I apologize, but I was unable to generate a response at this time. Please review the original email.

Best regards,
ujjwal


2025-07-24 20:30:55,811 - __main__ - INFO - Human accepted the response as is.
2025-07-24 20:30:55,812 - __main__ - INFO - Email ID: 3083 response processed.
2025-07-24 20:30:55,812 - __main__ - INFO - LangGraph processing finished for email ID: 3083



--- Processing Complete ---

Generated Response:

Subject: Re: Email Subject

Hi rakesh,

I apologize, but I was unable to generate a response at this time. Please review the original email.

Best regards,
ujjwal

--- Action Options ---


2025-07-24 20:31:12,195 - __main__ - INFO - Email sent from ultopalto0@gmail.com to ['ultopalto0@gmail.com'] with subject 'Re: Email Subject'.
2025-07-24 20:31:12,195 - __main__ - INFO - Email sent successfully.
