# Executive AI Assistant - System Walkthrough
# =========================================

# [MARKDOWN CELL]
# # 1. Setup and Configuration
# -------------------------
# This section sets up our environment and imports necessary libraries.


In [1]:
import os
from pathlib import Path
from datetime import datetime, timedelta
import base64
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
import email.utils
import pytz
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
from googleapiclient.discovery import build


In [2]:
from dotenv import load_dotenv

# load_dotenv() copies the values from the local .env file into os.environ,
# so the API keys need no further assignment. Do NOT re-assign them with
# `os.environ[key] = os.getenv(key)`: os.environ only accepts strings, so that
# raises TypeError as soon as a key is missing.
load_dotenv()

# Report missing keys explicitly instead of crashing; nothing in the cells
# visible in this walkthrough reads them directly.
for _required_key in ("OPENAI_API_KEY", "ANTHROPIC_API_KEY"):
    if not os.getenv(_required_key):
        print(f"Warning: {_required_key} is not set")


In [7]:
# # 2. Gmail Authentication
# ----------------------
# This section handles the Gmail OAuth2 flow and service creation.

# [CODE CELL]
def setup_gmail_credentials():
    """Return an authorised Gmail API (v1) client.

    A cached token is read from ``eaia/.secrets/token.json`` when that file
    exists. If the cached credentials are missing or not valid they are either
    refreshed (expired, with a refresh token) or obtained from scratch through
    the installed-app OAuth flow on local port 54191, and the resulting token
    is written back to ``token.json``.

    Returns:
        The object produced by ``build("gmail", "v1", ...)``.
    """
    # Scopes requested during the OAuth consent flow
    oauth_scopes = [
        "https://www.googleapis.com/auth/gmail.modify",
        "https://www.googleapis.com/auth/calendar",
    ]

    # Locations of the OAuth client secrets and of the cached user token
    base_dir = Path("eaia/.secrets")
    client_secrets_file = base_dir / "secrets.json"
    cached_token_file = base_dir / "token.json"

    # Start from the cached token when one is available
    credentials = (
        Credentials.from_authorized_user_file(str(cached_token_file))
        if cached_token_file.exists()
        else None
    )

    # Fast path: the cached credentials are still usable
    if credentials and credentials.valid:
        return build("gmail", "v1", credentials=credentials)

    refreshable = credentials and credentials.expired and credentials.refresh_token
    if refreshable:
        credentials.refresh(Request())
    else:
        oauth_flow = InstalledAppFlow.from_client_secrets_file(
            str(client_secrets_file), oauth_scopes
        )
        credentials = oauth_flow.run_local_server(port=54191)

    # Persist the new or refreshed token for the next run
    with open(cached_token_file, "w") as token_file:
        token_file.write(credentials.to_json())

    return build("gmail", "v1", credentials=credentials)


In [55]:
# # 3. Email Retrieval
# -----------------
# This section handles fetching and processing emails from Gmail.

# [CODE CELL]
def _find_message_body(part):
    """Return the first decoded text body in *part*'s MIME subtree, or None.

    Returning None (rather than a placeholder string) lets the caller keep
    searching sibling parts when one branch has no text body.
    """
    if part.get("mimeType") in ("text/plain", "text/html"):
        body_data = part.get("body", {}).get("data")
        if body_data:
            # The payload is URL-safe base64; restore any stripped "=" padding
            # so the decoder does not fail with "Incorrect padding".
            padded = body_data + "=" * (-len(body_data) % 4)
            # errors="replace": a body in a non-UTF-8 charset yields lossy
            # text instead of raising UnicodeDecodeError.
            return base64.urlsafe_b64decode(padded).decode("utf-8", errors="replace")

    for child in part.get("parts") or []:
        found = _find_message_body(child)
        if found:
            return found
    return None


def extract_message_part(msg):
    """Extract the message body from an email payload.

    Walks the payload depth-first and returns the first non-empty
    ``text/plain`` or ``text/html`` body found. Parts without a text body
    (for example attachments) are skipped, so a text part that follows them
    is still found.

    Args:
        msg: A message payload / part dict with optional ``mimeType``,
            ``body`` (holding base64url ``data``) and nested ``parts``.

    Returns:
        str: The decoded body, or ``"No message body available."`` when no
        text part carries data.
    """
    return _find_message_body(msg) or "No message body available."



In [56]:
def _get_header(headers, name, default=""):
    """Return the value of the first header called *name*, else *default*.

    Header names are compared case-insensitively, as e-mail header names are
    not case-sensitive.
    """
    wanted = name.lower()
    return next(
        (
            header.get("value", default)
            for header in headers
            if header.get("name", "").lower() == wanted
        ),
        default,
    )


def _message_send_time(msg, headers):
    """Return the ISO-8601 send time of *msg*.

    Prefers the ``Date`` header, then the message's ``internalDate``
    (epoch milliseconds), and only falls back to the current time when
    neither can be parsed.
    """
    # Imported locally on purpose: notebook cells rebind the global name
    # `email` to a message dict, which would break `email.utils` lookups.
    from datetime import timezone
    from email.utils import parsedate_to_datetime

    date_header = _get_header(headers, "Date")
    if date_header:
        try:
            return parsedate_to_datetime(date_header).isoformat()
        except (TypeError, ValueError):
            pass  # malformed Date header; try internalDate next

    internal_ms = msg.get("internalDate")
    if internal_ms is not None:
        try:
            sent = datetime.fromtimestamp(int(internal_ms) / 1000, tz=timezone.utc)
            return sent.isoformat()
        except (TypeError, ValueError, OverflowError, OSError):
            pass

    return datetime.now().isoformat()


def _list_message_refs(service, query):
    """Return every message reference matching *query*, following all pages."""
    refs = []
    page_token = None
    while True:
        try:
            page = service.users().messages().list(
                userId="me",
                q=query,
                pageToken=page_token,
            ).execute()
        except Exception as e:
            print(f"Error fetching message list: {str(e)}")
            raise

        refs.extend(page.get("messages", []))
        page_token = page.get("nextPageToken")
        if not page_token:
            break

    # Only report "nothing found" when the whole result set is empty
    if not refs:
        print("No messages found matching the query")
    return refs


def _load_email(service, message):
    """Fetch one message and convert it to the dict yielded by fetch_emails."""
    print(f"Processing message ID: {message['id']}")
    msg = service.users().messages().get(
        userId="me",
        id=message["id"],
    ).execute()

    payload = msg["payload"]
    headers = payload.get("headers", [])

    # A body that cannot be extracted should not discard the whole message
    try:
        body = extract_message_part(payload)
    except Exception as e:
        print(f"Error extracting message body: {str(e)}")
        body = "Error extracting message body"

    return {
        "id": message["id"],
        "thread_id": msg["threadId"],
        "subject": _get_header(headers, "Subject", "No Subject"),
        "from_email": _get_header(headers, "From").strip(),
        "to_email": _get_header(headers, "To").strip(),
        "page_content": body,
        "send_time": _message_send_time(msg, headers),
    }


def fetch_emails(service, email_address, minutes_since=240):
    """Fetch recent emails sent to or from an address.

    This is a generator: nothing is validated or requested until it is
    iterated. Messages that fail to load are reported and skipped.

    Args:
        service: Gmail API service instance
        email_address: Email address to fetch emails for
        minutes_since: How many minutes back to fetch emails (default 240)

    Yields:
        dict: Email data including id, thread_id, subject, from_email,
        to_email, page_content, send_time. ``send_time`` is the message's
        own date in ISO-8601 form, not the time of fetching.

    Raises:
        ValueError: If email_address is invalid (raised on first iteration)
        googleapiclient.errors.HttpError: For Gmail API errors while listing
        Exception: For unexpected errors while listing
    """
    if not email_address or '@' not in email_address:
        raise ValueError("Invalid email address provided")

    try:
        print("fetching emails")

        # Gmail's `after:` operator is given epoch seconds here
        after = int((datetime.now() - timedelta(minutes=minutes_since)).timestamp())
        query = f"(to:{email_address} OR from:{email_address}) after:{after}"
        print(f"Query: {query}")

        messages = _list_message_refs(service, query)
    except Exception as e:
        print(f"Fatal error in fetch_emails: {str(e)}")
        raise  # Re-raise the exception for the caller to handle

    for message in messages:
        try:
            record = _load_email(service, message)
        except Exception as e:
            print(f"Error processing message {message.get('id', 'unknown')}: {str(e)}")
            continue  # Skip this message and continue with the next one

        print(f"Successfully processed email: {record['subject']}")
        # Yield outside the try block so an exception raised by the consumer
        # is never mistaken for a message-processing failure.
        yield record

In [19]:
# # 4. Graph Creation and Processing
# ------------------------------
# This section creates the workflow graph for processing emails.

# [CODE CELL]
from langgraph.graph import StateGraph
from typing import TypedDict, Annotated
from typing_extensions import TypedDict

# Define our state
class State(TypedDict):
    """State dict handed from node to node in the email workflow graph.

    NOTE(review): all three keys are declared as required, yet the graph is
    invoked with only ``{"email": ...}`` and none of the placeholder nodes
    sets ``response`` or ``next``; reading those keys from a result raises
    ``KeyError`` until a node fills them in.
    """
    # The email being processed; presumably one dict yielded by fetch_emails — confirm
    email: dict
    # Drafted reply text, or None when there is no draft
    response: str | None
    # Routing hint read after the run (compared against "send" by the runner cell)
    next: str

# [CODE CELL]
# Define our nodes
def triage_email(state: State) -> State:
    """Workflow node: decide what should happen to the email.

    Placeholder only: it logs that it ran and hands the state back untouched.
    A real version would ask an LLM whether the email needs immediate
    attention, can be handled by the assistant, or should be ignored.
    """
    print("triage")
    return state

def draft_response(state: State) -> State:
    """Workflow node: write a reply to the email.

    Placeholder only: it logs that it ran and hands the state back untouched.
    A real version would have an LLM generate the response text.
    """
    print("draft")
    return state

def send_email(state: State) -> State:
    """Workflow node: deliver the drafted reply.

    Placeholder only: it logs that it ran and hands the state back untouched.
    A real version would send the message through the Gmail API.
    """
    print("send")
    return state

# [CODE CELL]
# Create the graph
def create_graph():
    """Build and compile the linear triage -> draft -> send workflow.

    Returns:
        The compiled graph produced by ``StateGraph(State).compile()``.
    """
    builder = StateGraph(State)

    # Register the node handlers in pipeline order
    handlers = (
        ("triage", triage_email),
        ("draft", draft_response),
        ("send", send_email),
    )
    for node_name, handler in handlers:
        builder.add_node(node_name, handler)

    # Chain the nodes one after another
    for source, target in (("triage", "draft"), ("draft", "send")):
        builder.add_edge(source, target)

    # Execution starts at the triage node
    builder.set_entry_point("triage")

    compiled = builder.compile()
    return compiled


In [8]:
# Setup Gmail service.
# When no valid cached token exists this starts the local OAuth flow, which
# prints an authorization URL to visit (see the cell output).
service = setup_gmail_credentials()


Please visit this URL to authorize this application: https://accounts.google.com/o/oauth2/auth?response_type=code&client_id=583696102042-si5jc6463o7h7ul03a3p4r4ljejnh9qu.apps.googleusercontent.com&redirect_uri=http%3A%2F%2Flocalhost%3A54191%2F&scope=https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fgmail.modify+https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fcalendar&state=cDjcPkHnO9JgI1XzGAu2LB87tXtTCD&access_type=offline


In [9]:


# Create the graph: compile the workflow once and reuse the compiled object
# for every email processed in the cells that follow.
graph = create_graph()

In [71]:
graph.ainvoke({"email": email})

<coroutine object Pregel.ainvoke at 0x107627610>

In [57]:
# Mailbox whose sent and received messages are queried
email_address = "daniel.mitchell80@gmail.com"
# fetch_emails is a generator function: this line only creates the generator.
# No Gmail request is made until it is iterated.
emails = fetch_emails(service, email_address)

In [65]:

# Smoke test: iterate over messages from the last 600 minutes and print a
# marker for each one yielded.
# NOTE(review): the loop variable `email` rebinds the module-level name that
# `import email.utils` created; other cells read `email` afterwards, so a
# rename has to be done across all of them together.
for email in fetch_emails(service, email_address,600): 
    print("email")

fetching emails
Query: (to:daniel.mitchell80@gmail.com OR from:daniel.mitchell80@gmail.com) after:1749844048
Processing message ID: 1976bf479d2664fc
Successfully processed email: GoHenry payment received
email
Processing message ID: 1976ae1ab31ad851
Successfully processed email: Blood sugar 101
email


In [67]:
# Display the dict currently bound to `email`, i.e. whatever the most recently
# run loop cell left behind.
email


{'id': '1976bf479d2664fc',
 'thread_id': '1976bf479d2664fc',
 'subject': 'GoHenry payment received',
 'from_email': '"GoHenry <gohenry >" <help@gohenry.co.uk>',
 'to_email': 'Daniel Mitchell <daniel.mitchell80@gmail.com>',
 'page_content': "gohenry ( https://u12691297.ct.sendgrid.net/ls/click?upn=u001.kDtu8mhMMXNPo7lwWXePycCdJSy4-2Fe-2F7jU36tGcgLaQ-3Df49q_E3jX7UdwUvWW16GmiaKN7LXL8DKvWmCEscS6-2Bk2HuksqVhlP-2Ba4MRMviPAkRZCh2q5NfQQ36G4URzb8mLlG3jc-2BFQMM-2BmHeQDIwCI8yVQzMt-2Bonoe9Cpf-2F8Flsjd1TO6L9c7VXT9iQVUXGAO8-2Fwaiav-2FFB01DW-2BLnZJcYPGClqjRI-2F-2B9ynmxyzPY-2FEhnjF-2F2NqG3j-2BrRASwHXNj7sFK11mK9XdeP-2BwZhqrxIWdsvXWGt8K2O9RIwByKJv9q9rtUdeU-2FWPFlYupcJe1DKJBman0HwebExqj64zcwvB-2FnXeRvsPeptfgCHXl1E2QfD5Gr3-2FTWTrxFaJFk-2FxwV-2F2G4PksGigv8UcnKjuZ5nj-2B5IgPeiG8qmpqG3-2FCbvkhPHmad6 )\r\n\r\nHi Daniel,\r\n\r\nWe'd like to confirm receipt of the following funds to your GoHenry parent account:\r\n\r\nAmount Paid: £23.98\r\nService charge: £0.50\r\nTotal: £24.48\r\n\r\nYou can view this transact

In [72]:
# Run the compiled graph synchronously on a single email and display the
# returned state.
graph.invoke({"email": email})

{'email': {'id': '1976bf479d2664fc',
  'thread_id': '1976bf479d2664fc',
  'subject': 'GoHenry payment received',
  'from_email': '"GoHenry <gohenry >" <help@gohenry.co.uk>',
  'to_email': 'Daniel Mitchell <daniel.mitchell80@gmail.com>',
  'page_content': "gohenry ( https://u12691297.ct.sendgrid.net/ls/click?upn=u001.kDtu8mhMMXNPo7lwWXePycCdJSy4-2Fe-2F7jU36tGcgLaQ-3Df49q_E3jX7UdwUvWW16GmiaKN7LXL8DKvWmCEscS6-2Bk2HuksqVhlP-2Ba4MRMviPAkRZCh2q5NfQQ36G4URzb8mLlG3jc-2BFQMM-2BmHeQDIwCI8yVQzMt-2Bonoe9Cpf-2F8Flsjd1TO6L9c7VXT9iQVUXGAO8-2Fwaiav-2FFB01DW-2BLnZJcYPGClqjRI-2F-2B9ynmxyzPY-2FEhnjF-2F2NqG3j-2BrRASwHXNj7sFK11mK9XdeP-2BwZhqrxIWdsvXWGt8K2O9RIwByKJv9q9rtUdeU-2FWPFlYupcJe1DKJBman0HwebExqj64zcwvB-2FnXeRvsPeptfgCHXl1E2QfD5Gr3-2FTWTrxFaJFk-2FxwV-2F2G4PksGigv8UcnKjuZ5nj-2B5IgPeiG8qmpqG3-2FCbvkhPHmad6 )\r\n\r\nHi Daniel,\r\n\r\nWe'd like to confirm receipt of the following funds to your GoHenry parent account:\r\n\r\nAmount Paid: £23.98\r\nService charge: £0.50\r\nTotal: £24.48\r\n\r\nYou can vie

In [66]:
# # 5. Running the System
# --------------------
# This section ties everything together and runs the complete system.

# [CODE CELL]

# Fetch and process emails
email_address = "daniel.mitchell80@gmail.com"
for email in fetch_emails(service, email_address,600):
    # Process the email through our graph
    print("got email")
    print(email["subject"])
    result = await graph.ainvoke({"email": email})
    
    # Handle the result
    if result["next"] == "send":
        # Send the email
        pass



fetching emails
Query: (to:daniel.mitchell80@gmail.com OR from:daniel.mitchell80@gmail.com) after:1749844227
Processing message ID: 1976bf479d2664fc
Successfully processed email: GoHenry payment received
got email
GoHenry payment received


KeyError: 'next'

In [23]:
# Scratch cell: rebuild the Gmail search query step by step for inspection.
# Look-back window in minutes
minutes_since = 240   
# Calculate the timestamp for filtering: epoch seconds for "now minus the window"
after = int((datetime.now() - timedelta(minutes=minutes_since)).timestamp())

# Build the query: messages to or from the address, restricted by `after:`
query = f"(to:{email_address} OR from:{email_address}) after:{after}"
# Bare expression so the notebook displays the final query string
query 


'(to:daniel.mitchell80@gmail.com OR from:daniel.mitchell80@gmail.com) after:1749862380'

In [24]:


# Fetch messages: collect the message references for `query`, page by page
messages = []
# None requests the first page; later iterations pass the token from the previous page
next_page_token = None

while True:
    results = service.users().messages().list(
        userId="me",
        q=query,
        pageToken=next_page_token
    ).execute()
    
    # A page without a "messages" key contributes nothing to the list
    if "messages" in results:
        messages.extend(results["messages"])
    
    # Stop once the response carries no token for a further page
    next_page_token = results.get("nextPageToken")
    if not next_page_token:
        break

In [33]:
# Take the first listed message reference.
# NOTE(review): raises IndexError when the query matched no messages.
message = messages[0]

# Fetch the message itself using the id from the listing
msg = service.users().messages().get(
    userId="me",
    id=message["id"]
).execute()

In [36]:

# Scratch cell: pull the individual fields out of the fetched message `msg`
thread_id = msg["threadId"]
payload = msg["payload"]
# Headers are a list of {"name": ..., "value": ...} dicts; default to an empty list
headers = payload.get("headers", [])

# Extract email details: next(...) with a default returns the first matching
# header value, or the fallback when no header has that exact name
subject = next(
    (header["value"] for header in headers if header["name"] == "Subject"),
    "No Subject"
)
from_email = next(
    (header["value"] for header in headers if header["name"] == "From"),
    ""
).strip()
to_email = next(
    (header["value"] for header in headers if header["name"] == "To"),
    ""
).strip()

# Get the message body: decoded text taken from the payload's MIME parts
body = extract_message_part(payload)


