<a href="https://colab.research.google.com/github/upashanadutta23/gmail-triage-bot/blob/main/gmail_triage_bot.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install --upgrade google-auth-oauthlib



In [2]:
#IMPORTS
import os
import json
import base64
import datetime
import pandas as pd
from bs4 import BeautifulSoup

import google.generativeai as genai
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from IPython.display import display


#1. USER CONFIGURATION
# SECURITY: never commit API keys with the notebook. The Gemini key is read
# from the GEMINI_API_KEY environment variable (empty string when unset); set
# it in the session before running this cell. The key that used to be
# hard-coded on this line was published with the notebook and should be
# revoked in the Google Cloud / AI Studio console.
GEMINI_API_KEY = os.environ.get("GEMINI_API_KEY", "")
# Spreadsheet that receives logged expenses. Override it with the
# GOOGLE_SHEET_ID environment variable; the default is the original sheet.
GOOGLE_SHEET_ID = os.environ.get(
    "GOOGLE_SHEET_ID", "1cKUCuYb9VY6NS8kvAHx_Cf_PHURWgfVthS8Ev95waVQ"
)

#2. AUTHENTICATION MODULE

# OAuth scopes the stored token is loaded with: Gmail read + modify, Tasks,
# Calendar and Sheets. Every scope shares the same URL prefix.
_SCOPE_PREFIX = "https://www.googleapis.com/auth/"
SCOPES = [
    _SCOPE_PREFIX + scope_name
    for scope_name in (
        "gmail.readonly",
        "gmail.modify",
        "tasks",
        "calendar",
        "spreadsheets",
    )
]
# OAuth client-secrets file and cached user-token file, both resolved
# relative to the current working directory.
CREDENTIALS_PATH = "credentials.json"
TOKEN_PATH = "token.json"

def authenticate():
    """Load the cached OAuth user token and return usable credentials.

    The token stored at ``TOKEN_PATH`` is loaded with ``SCOPES``. If it is
    expired and carries a refresh token it is refreshed, and the refreshed
    token is written back to ``TOKEN_PATH``.

    No interactive consent flow is started here: both interactive calls were
    already disabled (commented out) in this function, which left a
    pre-generated token file as the only working credential source. The old
    fallback merely re-read the token file, so it crashed with
    ``FileNotFoundError`` when the file was missing and reported success for a
    token that was still invalid. Those cases now return ``None``.

    Returns:
        A valid ``Credentials`` object, or ``None`` (after printing the
        reason) when no valid credentials could be obtained.
    """
    if not os.path.exists(TOKEN_PATH):
        print(
            f"FATAL: '{TOKEN_PATH}' not found. Create it by running the OAuth "
            f"consent flow with '{CREDENTIALS_PATH}' on a machine with a "
            "browser, then upload it to the Colab session."
        )
        return None

    try:
        creds = Credentials.from_authorized_user_file(TOKEN_PATH, SCOPES)
    except (OSError, ValueError) as e:
        # ValueError also covers malformed JSON (json.JSONDecodeError).
        print(f"FATAL: could not load '{TOKEN_PATH}': {e}")
        return None

    if not creds.valid and creds.expired and creds.refresh_token:
        try:
            print("Refreshing access token...")
            creds.refresh(Request())
        except Exception as e:
            # The token file is deliberately kept: the failure may be
            # transient (e.g. network), and there is no interactive flow
            # that could recreate the file.
            print(f"Token refresh failed: {e}.")
            return None
        try:
            with open(TOKEN_PATH, "w") as token:
                token.write(creds.to_json())
        except OSError as e:
            # The in-memory credentials are still usable for this run.
            print(f"Warning: could not save refreshed token to '{TOKEN_PATH}': {e}")

    if not creds.valid:
        print(
            f"FATAL: the token in '{TOKEN_PATH}' is not valid and cannot be "
            "refreshed. Generate a new token file and upload it."
        )
        return None

    print("Authentication successful.")
    return creds

#3. GMAIL CLIENT MODULE
def _decode_body_data(encoded):
    """Decode a base64url ``body.data`` string into text.

    Missing padding is restored and bytes that are not valid UTF-8 are
    replaced, so one oddly encoded message cannot abort the whole fetch.
    Returns "" for empty or undecodable input.
    """
    if not encoded:
        return ""
    try:
        raw = base64.urlsafe_b64decode(encoded + "=" * (-len(encoded) % 4))
    except ValueError:  # binascii.Error is a ValueError subclass
        return ""
    return raw.decode("utf-8", errors="replace")


def _html_to_text(html):
    """Strip markup from an HTML body, one text node per line."""
    return BeautifulSoup(html, "html.parser").get_text(separator='\n', strip=True)


def _iter_leaf_parts(part):
    """Yield the non-container MIME parts below ``part`` in document order."""
    children = part.get("parts")
    if not children:
        yield part
        return
    for child in children:
        yield from _iter_leaf_parts(child)


def _header_value(headers, name, default):
    """Return the value of the first header called ``name`` (case-insensitive)."""
    wanted = name.lower()
    for header in headers:
        if header.get("name", "").lower() == wanted:
            return header.get("value", default)
    return default


def _extract_body(payload):
    """Pick the best text body from a message payload.

    A payload without parts is decoded directly (HTML is converted to text).
    Otherwise the first non-empty ``text/plain`` part anywhere in the MIME
    tree wins; if there is none, the first non-empty ``text/html`` part is
    converted to text. Parts carrying a filename are skipped as attachments.
    """
    if not payload.get("parts"):
        text = _decode_body_data(payload.get("body", {}).get("data", ""))
        if text and payload.get("mimeType") == "text/html":
            return _html_to_text(text)
        return text

    html_fallback = ""
    for part in _iter_leaf_parts(payload):
        mime_type = part.get("mimeType")
        if mime_type not in ("text/plain", "text/html") or part.get("filename"):
            continue
        text = _decode_body_data(part.get("body", {}).get("data", ""))
        if not text:
            continue
        if mime_type == "text/plain":
            return text
        if not html_fallback:
            html_fallback = _html_to_text(text)
    return html_fallback


def get_unread_emails(creds, max_results=50):
    """Fetch unread Gmail messages and reduce each to id/subject/sender/body.

    Args:
        creds: Credentials passed to the Gmail API client.
        max_results: Upper bound on the number of messages requested.

    Returns:
        A list of dicts with the keys ``"id"``, ``"subject"``, ``"sender"``
        and ``"body"`` (plain text, possibly ""). The list is empty when
        nothing is unread or when listing the mailbox fails. A message whose
        individual download fails is skipped instead of discarding the
        messages already fetched.
    """
    try:
        service = build("gmail", "v1", credentials=creds)
        print(f"\nFetching up to {max_results} unread emails...")
        listing = service.users().messages().list(userId="me", q="is:unread", maxResults=max_results).execute()
    except HttpError as error:
        print(f"An error occurred with the Gmail API: {error}")
        return []

    messages = listing.get("messages", [])
    if not messages:
        print(" No unread messages found.")
        return []

    emails = []
    for msg in messages:
        try:
            msg_data = service.users().messages().get(userId="me", id=msg["id"], format="full").execute()
        except HttpError as error:
            # e.g. the message was deleted between list() and get()
            print(f"An error occurred with the Gmail API: {error}")
            continue

        payload = msg_data.get("payload", {})
        headers = payload.get("headers", [])
        emails.append({
            "id": msg_data.get("id"),
            "subject": _header_value(headers, "Subject", "No Subject"),
            "sender": _header_value(headers, "From", "Unknown Sender"),
            "body": _extract_body(payload),
        })

    print(f"Successfully fetched {len(emails)} emails.")
    return emails

def mark_email_as_read(creds, msg_id):
    """Strip the UNREAD label from one Gmail message.

    Any failure is printed rather than raised, so callers can keep going.
    """
    label_change = {'removeLabelIds': ['UNREAD']}
    try:
        gmail = build("gmail", "v1", credentials=creds)
        request = gmail.users().messages().modify(userId='me', id=msg_id, body=label_change)
        request.execute()
        print(f" Marked email {msg_id[:10]}... as read.")
    except Exception as e:
        print(f"Error marking email as read: {e}")

#4. COGNITIVE & WORKSPACE MODULE
def configure_gemini(api_key):
    """Set the Gemini API key and build the generative model.

    Returns the model object, or None if configuration raised (the error is
    printed).
    """
    ready_model = None
    try:
        genai.configure(api_key=api_key)
        candidate = genai.GenerativeModel('models/gemini-pro-latest')
        print("Gemini API configured successfully.")
        ready_model = candidate
    except Exception as e:
        print(f"Error configuring Gemini: {e}")
    return ready_model

def get_email_intent(model, email_body):
    """Classify an email into one of the pipeline's intent labels via Gemini.

    Only the first 2000 characters of the body are sent to the model.

    The model's answer is normalised before it is returned, because the
    pipeline matches the label exactly: surrounding whitespace, quotes,
    backticks, asterisks and full stops are stripped and the text is
    lower-cased. If the answer is not exactly a known label but mentions
    exactly one, that label is used.

    Args:
        model: Object exposing ``generate_content(prompt)`` whose result has
            a ``text`` attribute.
        email_body: Plain-text email body.

    Returns:
        One of the known intent labels; the stripped raw model answer when no
        single known label can be recognised; or ``"error"`` when the model
        call fails.
    """
    known_intents = (
        "task_delegation",
        "scheduling_request",
        "receipt_invoice",
        "information_query",
        "no_action_needed",
    )
    prompt = f"""Analyze the email and classify its intent. Choose one: task_delegation, scheduling_request, receipt_invoice, information_query, no_action_needed. Email: --- {email_body[:2000]} --- Return only the intent name."""
    try:
        response = model.generate_content(prompt)
        raw_answer = response.text.strip()
    except Exception as e:
        print(f"Error classifying intent: {e}")
        return "error"

    normalized = raw_answer.strip("`'\"*. \t\r\n").lower()
    if normalized in known_intents:
        return normalized

    # Answers such as "Intent: receipt_invoice" still name exactly one label.
    lowered = raw_answer.lower()
    mentioned = [label for label in known_intents if label in lowered]
    if len(mentioned) == 1:
        return mentioned[0]
    return raw_answer

def extract_information(model, email_body, intent):
    """Ask Gemini for the structured fields that belong to ``intent``.

    Only the first 4000 characters of the body are sent. The reply is parsed
    from its first ``{`` to its last ``}``, which tolerates Markdown code
    fences as well as any prose the model puts around the JSON object.

    Args:
        model: Object exposing ``generate_content(prompt)`` whose result has
            a ``text`` attribute.
        email_body: Plain-text email body.
        intent: Intent label; only ``task_delegation``,
            ``scheduling_request`` and ``receipt_invoice`` are extractable.

    Returns:
        The parsed JSON object as a dict, or ``None`` when the intent has no
        extraction schema, the model call fails, or the reply contains no
        parseable JSON object (the reason is printed).
    """
    prompt_template = "Extract {details} from the email. Return a clean JSON object with the specified keys. Email: --- {email} ---"
    details_map = {
        "task_delegation": 'task details with keys: "task_description", "due_date" (YYYY-MM-DD or null), "priority" ("High", "Medium", "Low")',
        "scheduling_request": 'event details with keys: "event_title", "attendees" (list of emails), "proposed_time" (YYYY-MM-DDTHH:MM:SS or null)',
        "receipt_invoice": 'expense details with keys: "vendor_name", "total_amount" (float), "purchase_date" (YYYY-MM-DD)'
    }
    if intent not in details_map:
        return None

    prompt = prompt_template.format(details=details_map[intent], email=email_body[:4000])
    try:
        response = model.generate_content(prompt)
        reply = response.text
    except Exception as e:
        print(f"Error extracting information: {e}")
        return None

    # Keep only the outermost {...} span; this drops fences and chatter
    # without touching backticks that may occur inside JSON string values.
    first_brace = reply.find("{")
    last_brace = reply.rfind("}")
    if first_brace == -1 or last_brace < first_brace:
        print("Error extracting information: no JSON object in model response")
        return None

    try:
        return json.loads(reply[first_brace:last_brace + 1])
    except ValueError as e:  # json.JSONDecodeError is a ValueError subclass
        print(f"Error extracting information: {e}")
        return None

def create_google_task(creds, details):
    """Create a Google Task in the default task list from extracted details.

    Args:
        creds: Credentials passed to the Tasks API client.
        details: Dict with ``"task_description"`` (required, used as the
            title) and optionally ``"due_date"`` (``YYYY-MM-DD``).

    Returns:
        None. Nothing is sent when the description is missing or empty. API
        errors are printed, not raised.
    """
    title = details.get("task_description")
    if not title:
        # Validate before building the API client; nothing to create.
        return

    task = {'title': title}
    raw_due = details.get("due_date")
    if raw_due:
        # The due date comes from a language model. Only a real calendar date
        # is forwarded, so a malformed value costs the due date rather than
        # the whole task.
        try:
            due_day = datetime.date.fromisoformat(str(raw_due)[:10])
        except ValueError:
            print(f" Ignoring unparseable due date: {raw_due!r}")
        else:
            task['due'] = f'{due_day.isoformat()}T00:00:00.000Z'

    try:
        service = build("tasks", "v1", credentials=creds)
        res = service.tasks().insert(tasklist='@default', body=task).execute()
        print(f" Successfully created task: '{res['title']}'")
    except HttpError as e:
        print(f" Error creating Google Task: {e}")

def create_calendar_event(creds, details, time_zone="UTC"):
    """Create a one-hour event on the primary Google Calendar.

    Args:
        creds: Credentials passed to the Calendar API client.
        details: Dict with ``"event_title"`` and ``"proposed_time"`` (ISO
            8601, e.g. ``YYYY-MM-DDTHH:MM:SS``) and optionally
            ``"attendees"`` (list of email addresses).
        time_zone: IANA time zone name attached to the start and end when
            ``proposed_time`` carries no UTC offset.
            NOTE(review): "UTC" is a neutral default; pass the mailbox
            owner's zone if naive times should be read as local time.

    Returns:
        None. Nothing is sent when the title or time is missing or null, or
        when the time cannot be parsed. API errors are printed, not raised.
    """
    title = details.get("event_title")
    raw_time = details.get("proposed_time")
    if not title or not raw_time:
        # The extraction prompt allows "proposed_time": null, so a present
        # key is not enough; the value itself must be usable.
        return

    time_text = str(raw_time).strip()
    if time_text.endswith(("Z", "z")):
        # datetime.fromisoformat only accepts a trailing "Z" from Python 3.11.
        time_text = time_text[:-1] + "+00:00"
    try:
        start = datetime.datetime.fromisoformat(time_text)
    except ValueError:
        print(f"Error creating Google Calendar event: invalid proposed_time {raw_time!r}")
        return
    end = start + datetime.timedelta(hours=1)

    start_field = {'dateTime': start.isoformat()}
    end_field = {'dateTime': end.isoformat()}
    if start.tzinfo is None:
        # A dateTime without an offset needs an explicit timeZone.
        start_field['timeZone'] = time_zone
        end_field['timeZone'] = time_zone

    attendees = details.get("attendees") or []
    if isinstance(attendees, str):
        # A single address returned as a bare string instead of a list.
        attendees = [attendees]

    event = {
        'summary': title,
        'start': start_field,
        'end': end_field,
        'attendees': [{'email': address} for address in attendees if address],
    }
    try:
        service = build("calendar", "v3", credentials=creds)
        res = service.events().insert(calendarId='primary', body=event).execute()
        print(f"Successfully created event: '{res['summary']}'")
    except HttpError as e:
        print(f"Error creating Google Calendar event: {e}")

def _sheet_safe(value):
    """Keep formula-like text from being evaluated by the spreadsheet.

    Rows are appended with ``USER_ENTERED``, which treats a string starting
    with ``=``, ``+``, ``-`` or ``@`` like typed input. Such strings get a
    leading apostrophe so they are stored as text. Numeric strings (e.g.
    ``"-5.20"``) and non-string values pass through unchanged.
    """
    if not isinstance(value, str) or value[:1] not in ("=", "+", "-", "@"):
        return value
    try:
        float(value)
    except ValueError:
        return "'" + value
    return value


def log_expense_to_sheet(creds, sheet_id, details):
    """Append one expense row (date, vendor, amount) to ``Sheet1``.

    Args:
        creds: Credentials passed to the Sheets API client.
        sheet_id: Id of the target spreadsheet.
        details: Dict with ``"purchase_date"``, ``"vendor_name"`` and
            ``"total_amount"``; all three must be present and non-null.

    Returns:
        None. Nothing is sent when a required field is missing or null. API
        errors are printed, not raised.
    """
    required = ("purchase_date", "vendor_name", "total_amount")
    # "is None" rather than truthiness: an amount of 0 is a legitimate value.
    if any(details.get(key) is None for key in required):
        return

    # The values originate from email text, so neutralise formula prefixes.
    values = [[_sheet_safe(details[key]) for key in required]]
    try:
        service = build("sheets", "v4", credentials=creds)
        service.spreadsheets().values().append(spreadsheetId=sheet_id, range="Sheet1!A1", valueInputOption="USER_ENTERED", body={'values': values}).execute()
        print(f"Successfully logged expense for '{details['vendor_name']}'")
    except HttpError as e:
        print(f" Error logging expense: {e}")

# 5. MAIN PIPELINE EXECUTION
def run_pipeline():
    """Triage unread Gmail: classify, extract, act, mark read, summarise.

    Steps:
      1. Authenticate, configure Gemini and fetch up to 50 unread emails;
         stop quietly if any of these yields nothing.
      2. For each email with a non-empty body, classify its intent. For
         actionable intents, extract structured details and run the matching
         action (Google Task, Calendar event or expense row).
      3. Mark the email as read, unless classification itself failed; those
         emails stay unread so a later run can retry them.
      4. Display a summary table of subject, intent, action and details.

    An exception raised by one action is reported and recorded as
    ``"Failed: <action>"`` instead of aborting the remaining emails.
    NOTE: the action functions print API errors rather than raising them, so
    a plain action label means the action was attempted, not that the API
    call is confirmed to have succeeded.
    """
    creds = authenticate()
    if not creds:
        return

    gemini_model = configure_gemini(GEMINI_API_KEY)
    if not gemini_model:
        return

    emails = get_unread_emails(creds, max_results=50)
    if not emails:
        return

    # Built once. Every entry takes (creds, details); the sheet id is bound
    # here so the loop needs no per-intent special case.
    action_map = {
        "task_delegation": ("Created Google Task", create_google_task),
        "scheduling_request": ("Created Calendar Event", create_calendar_event),
        "receipt_invoice": (
            "Logged Expense to Sheet",
            lambda c, d: log_expense_to_sheet(c, GOOGLE_SHEET_ID, d),
        ),
    }
    non_actionable = ("no_action_needed", "information_query", "error")

    results = []
    print("\n--- Starting Email Processing Pipeline ---")
    for email in emails:
        print(f"\nProcessing: '{email['subject']}' from {email['sender']}")
        if not email["body"]:
            continue

        intent = get_email_intent(gemini_model, email["body"])
        print(f"  - Intent: {intent}")

        action_taken, details = "None", {}
        if intent and intent not in non_actionable:
            extracted_data = extract_information(gemini_model, email["body"], intent)
            print(f"  - Extracted: {extracted_data}")
            if extracted_data:
                details = extracted_data
                if intent in action_map:
                    label, action_func = action_map[intent]
                    try:
                        action_func(creds, extracted_data)
                        action_taken = label
                    except Exception as e:
                        # One malformed extraction must not stop the batch.
                        print(f"  - Action failed: {e}")
                        action_taken = f"Failed: {label}"

        if intent == "error":
            # Classification failed (e.g. the model call raised); keep the
            # email unread so it is picked up again on the next run.
            print("  - Classification failed; leaving email unread.")
        else:
            mark_email_as_read(creds, email["id"])
        results.append({'subject': email['subject'], 'intent': intent, 'action': action_taken, 'details': json.dumps(details)})

    print("\nPipeline Summary")
    if results:
        df = pd.DataFrame(results)
        display(df)

# --- Run the agent ---
# Guarded so that importing this code does not read and modify the mailbox
# as a side effect; it still runs when executed as a notebook cell or script.
if __name__ == "__main__":
    run_pipeline()

Authentication successful.
Gemini API configured successfully.

Fetching up to 50 unread emails...
Successfully fetched 50 emails.

--- Starting Email Processing Pipeline ---

Processing: 'Diwali Dazzle: TOP Brands, LUXE Offers! 🌟' from Shoppers Stop <shopping@mailer.shoppersstop.com>
  - Intent: no_action_needed
 Marked email 192b2cb211... as read.

Processing: 'Don't be obsolete in the AI age - Upgrade now' from Sakshi Raheja <sakshi.raheja@newsletters.analyticsvidhya.com>
  - Intent: no_action_needed
 Marked email 192ae55bf7... as read.

Processing: '😊👋 Say 🅷🅴🅻🅻🅾 to our new labels!' from Shoppers Stop <shopping@mailer.shoppersstop.com>
  - Intent: no_action_needed
 Marked email 192acf1502... as read.

Processing: 'Diwali Dhamaka Deals 🎉💥🛍' from Shoppers Stop <shopping@mailer.shoppersstop.com>
  - Intent: no_action_needed
 Marked email 192a2a5841... as read.

Processing: 'Fridays are for BIG savings 🤑' from Shoppers Stop <shopping@mailer.shoppersstop.com>
  - Intent: no_action_needed



  - Extracted: {'task_description': 'Provide feedback and suggestions by taking the survey to help improve the UrbanPro platform.', 'due_date': None, 'priority': 'Medium'}
 Error creating Google Task: <HttpError 403 when requesting https://tasks.googleapis.com/tasks/v1/lists/%40default/tasks?alt=json returned "Google Tasks API has not been used in project 1062956414385 before or it is disabled. Enable it by visiting https://console.developers.google.com/apis/api/tasks.googleapis.com/overview?project=1062956414385 then retry. If you enabled this API recently, wait a few minutes for the action to propagate to our systems and retry.". Details: "[{'message': 'Google Tasks API has not been used in project 1062956414385 before or it is disabled. Enable it by visiting https://console.developers.google.com/apis/api/tasks.googleapis.com/overview?project=1062956414385 then retry. If you enabled this API recently, wait a few minutes for the action to propagate to our systems and retry.', 'domain'

Unnamed: 0,subject,intent,action,details
0,"Diwali Dazzle: TOP Brands, LUXE Offers! 🌟",no_action_needed,,{}
1,Don't be obsolete in the AI age - Upgrade now,no_action_needed,,{}
2,😊👋 Say 🅷🅴🅻🅻🅾 to our new labels!,no_action_needed,,{}
3,Diwali Dhamaka Deals 🎉💥🛍,no_action_needed,,{}
4,Fridays are for BIG savings 🤑,no_action_needed,,{}
5,Curious about how AI can boost your career?,no_action_needed,,{}
6,📢 Big News! Exclusive Deals on Premium Brands! ✨,no_action_needed,,{}
7,Exclusivity Meets Affordability! ✨💎,no_action_needed,,{}
8,Sunburn Goa's Lineup Just Went NEXT LEVEL! 🚀,no_action_needed,,{}
9,Still Searching for Diwali Decor? 🤔🏠,no_action_needed,,{}


In [3]:
# Mount Google Drive into the Colab runtime at /content/drive.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
