# Jupyter Notebook for Testing Email Analyzer Agent

This notebook allows for manual testing of the Email Analyzer agent (`classifier.py`). It performs the following steps:
1.  Installs and imports necessary libraries.
2.  Loads configuration and input data (products and emails) from the Google Spreadsheet provided in the assignment.
3.  Initializes the Email Analyzer agent.
4.  Processes each email using the agent to get its classification.
5.  Formats the classification results.
6.  Saves the results to a new Google Spreadsheet in the 'email-classification' sheet.

---

## 1. Setup and Installations

First, let's install the required Python packages.

In [None]:
%pip install pandas openai gspread gspread-dataframe google-auth google-auth-oauthlib google-auth-httplib2 pydantic langchain langchain-openai python-dotenv nest-asyncio

In [None]:
# Standard library imports
import asyncio
import os
import sys
from typing import Dict, List, Any, Optional

# Third-party imports
import pandas as pd
import gspread
from gspread_dataframe import set_with_dataframe
from google.colab import auth # For Colab authentication
from google.auth import default # For Colab authentication
from IPython.display import display
import nest_asyncio

# Apply nest_asyncio to allow running an asyncio event loop in Jupyter.
# This notebook later drives the analyzer coroutine with `asyncio.run(...)`
# from inside a cell, which is why the patch is applied up front.
nest_asyncio.apply()

# Make the `src` package importable: resolve the parent of the current working
# directory (the workspace root when the notebook is run from 'notebooks/')
# and register it on sys.path, skipping the append if it is already listed.
module_path = os.path.abspath(os.pardir)
if module_path not in sys.path:
    sys.path.append(module_path)

# Project-specific imports
from src.config import HermesConfig
from src.agents.classifier import analyze_email_node, EmailAnalysis
from src.state import HermesState

print("Setup complete. Libraries and paths configured.")

## 2. Configuration

Define constants for spreadsheet IDs, names, and initialize `HermesConfig`.
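Make sure you have a `.env` file in the root of your workspace containing your `OPENAI_API_KEY` (plus `OPENAI_BASE_URL` if you are using the Crossover-provided key), or your `GEMINI_API_KEY` if you are using Gemini. Note that the cell below explicitly passes only the `OPENAI_MODEL_NAME`, `OPENAI_API_KEY` and `OPENAI_BASE_URL` variables to `HermesConfig`.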

In [None]:
# Load environment variables from the `.env` file located in `module_path`
# (the workspace root resolved in the setup cell). `override=True` is passed so
# that values from `.env` take precedence over variables already present in
# the process environment.
from dotenv import load_dotenv
load_dotenv(dotenv_path=os.path.join(module_path, '.env'), override=True)

# Configuration for input and output
INPUT_SPREADSHEET_ID = '14fKHsblfqZfWj3iAaM2oA51TlYfQlFT4WKo52fVaQ9U' # From assignment
OUTPUT_SPREADSHEET_NAME = 'Hermes - Email Analyzer Test Output' # Name for the new spreadsheet

# Initialize HermesConfig
# Update llm_model_name, llm_api_key, llm_base_url as needed, or ensure they are in your .env.
# Only the OPENAI_* variables are read here; the model name falls back to "gpt-4o"
# when OPENAI_MODEL_NAME is unset, while the key and base URL are None when unset.
hermes_config = HermesConfig(
    llm_model_name=os.getenv("OPENAI_MODEL_NAME", "gpt-4o"), # or your preferred model
    llm_api_key=os.getenv("OPENAI_API_KEY"), # Loaded from .env
    llm_base_url=os.getenv("OPENAI_BASE_URL") # Loaded from .env for Crossover key
)

# Echo the effective settings (the API key is deliberately not printed).
print(f"Using LLM Model: {hermes_config.llm_model_name}")
if hermes_config.llm_base_url:
    print(f"Using LLM Base URL: {hermes_config.llm_base_url}")
print(f"Input Spreadsheet ID: {INPUT_SPREADSHEET_ID}")
print(f"Output Spreadsheet Name: {OUTPUT_SPREADSHEET_NAME}")

## 3. Helper Function to Read Google Sheets
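This function reads a single sheet of a Google Spreadsheet into a pandas DataFrame through the spreadsheet's CSV export link, and normalizes the column names (lowercase, spaces replaced with underscores). If the sheet cannot be read, it returns an empty DataFrame instead of raising.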

In [None]:
def read_data_from_gsheet(document_id: str, sheet_name: str) -> pd.DataFrame:
    """Read one sheet of a Google Spreadsheet into a pandas DataFrame.

    The sheet is downloaded through the spreadsheet's gviz CSV export URL and
    parsed with ``pd.read_csv``. Column names are normalized to lowercase with
    spaces replaced by underscores.

    Args:
        document_id: ID of the Google Spreadsheet (the part of its URL after ``/d/``).
        sheet_name: Name of the sheet (tab) to read.

    Returns:
        The sheet contents. If anything goes wrong while fetching or parsing,
        the error is printed and an empty DataFrame is returned instead: with
        the expected columns for the 'products' and 'emails' sheets, and with
        no columns for any other sheet name.
    """
    # Local import so the function stays self-contained within the notebook cell.
    from urllib.parse import quote

    # The sheet name goes into the query string, so it must be percent-encoded;
    # otherwise names containing spaces, '&' or '#' produce an invalid or
    # mis-parsed URL.
    encoded_sheet = quote(sheet_name, safe='')
    export_link = (
        f"https://docs.google.com/spreadsheets/d/{document_id}"
        f"/gviz/tq?tqx=out:csv&sheet={encoded_sheet}"
    )
    try:
        df = pd.read_csv(export_link)
        # Standardize column names: lowercase and replace spaces with underscores
        df.columns = [str(col).lower().replace(' ', '_') for col in df.columns]
        print(f"Successfully read {len(df)} rows from sheet: {sheet_name}")
        return df
    except Exception as e:
        # Deliberate best-effort: the notebook keeps running with an empty frame.
        print(f"Error reading Google Sheet {sheet_name} from document {document_id}: {e}")
        if sheet_name == 'products':
            return pd.DataFrame(columns=['product_id', 'name', 'category', 'stock_amount', 'description', 'price', 'season'])
        elif sheet_name == 'emails':
            return pd.DataFrame(columns=['email_id', 'subject', 'body'])
        return pd.DataFrame()

## 4. Load Input Data
Load the product catalog and emails from the input Google Spreadsheet.

In [None]:
# Load Product Catalog
print(f"Loading product catalog from Google Sheet ID: {INPUT_SPREADSHEET_ID}, sheet: products")
product_catalog_df = read_data_from_gsheet(INPUT_SPREADSHEET_ID, 'products')
if not product_catalog_df.empty:
    display(product_catalog_df.head(3))
else:
    print("Failed to load product catalog. Using an empty DataFrame.")

# Load Emails Data
# (the leading "\n" only adds a blank line before the message in the cell output)
print(f"\nLoading emails from Google Sheet ID: {INPUT_SPREADSHEET_ID}, sheet: emails")
emails_df = read_data_from_gsheet(INPUT_SPREADSHEET_ID, 'emails')
if not emails_df.empty:
    # Ensure required columns exist so the column selection below cannot fail
    if 'email_id' not in emails_df.columns:
        print("Error: 'email_id' column missing in emails sheet. Please check the input spreadsheet.")
        emails_df['email_id'] = None # Add dummy column to prevent errors later, though results will be affected
    if 'subject' not in emails_df.columns:
        emails_df['subject'] = "" # Add empty subject if missing
    if 'body' not in emails_df.columns:
        emails_df['body'] = "" # Add empty body if missing

    display(emails_df.head(3))
else:
    print("Failed to load emails. Using an empty list.")

# Prepare emails list for processing: one dict per email with keys 'id', 'subject', 'body'
sample_emails_list = []
if not emails_df.empty and 'email_id' in emails_df.columns:
    # Use standardized column names from read_data_from_gsheet
    column_mapping = {
        'email_id': 'id',
        'subject': 'subject',
        'body': 'body'
    }
    emails_df_renamed = emails_df[list(column_mapping.keys())].rename(columns=column_mapping)
    sample_emails_list = emails_df_renamed.to_dict(orient='records')
    print(f"\nPrepared {len(sample_emails_list)} emails for processing.")

## 5. Process Emails with Analyzer Agent

Iterate through the loaded emails, create an initial state for each one, and call `analyze_email_node` on it.

In [None]:
async def run_email_analysis(emails_to_process: List[Dict[str, str]], 
                             p_catalog_df: pd.DataFrame, 
                             config_obj: HermesConfig) -> List[Dict[str, Any]]:
    """
    Classify emails one at a time by awaiting `analyze_email_node` for each.

    Args:
        emails_to_process: Email records; the keys 'id', 'subject' and 'body' are read.
        p_catalog_df: Product catalog, placed into every initial state unchanged.
        config_obj: Configuration handed to the node under
            config['configurable']['hermes_config'].

    Returns:
        One record per email, in input order. Successful records carry
        'email_id', 'classification', 'classification_confidence',
        'classification_evidence', 'reasoning' and 'raw_analysis'. Records for
        a missing/empty analysis ('error_no_analysis') or for an exception
        raised while invoking the node or parsing its output
        ('error_exception') carry only 'email_id', 'classification',
        'reasoning' and 'raw_analysis' (None).
    """

    def failure_record(email_id, code, reason):
        # Shared shape of the two error outcomes; key order matches the original records.
        return {
            "email_id": email_id,
            "classification": code,
            "reasoning": reason,
            "raw_analysis": None
        }

    # The node receives HermesConfig through the 'configurable' section of its config.
    node_config = {"configurable": {"hermes_config": config_obj}}
    outcomes = []

    for position, message in enumerate(emails_to_process, start=1):
        print(f"Processing email {position}/{len(emails_to_process)}: ID {message.get('id', 'N/A')}")

        # Build the node input. The state object is turned into a plain dict
        # via __dict__; no vector store is supplied for this classifier-only run.
        # This happens outside the try block, so a failure here propagates.
        state = HermesState(
            email_id=str(message.get('id', f"unknown_email_{position - 1}")),
            email_subject=message.get('subject', ''),
            email_body=message.get('body', ''),
            product_catalog_df=p_catalog_df,
            vector_store=None
        )
        node_input = state.__dict__

        try:
            node_output = await analyze_email_node(node_input, config=node_config)

            # Guard: nothing usable came back from the node.
            if 'email_analysis' not in node_output or not node_output['email_analysis']:
                print(f"  -> Error: 'email_analysis' not found in node output or is empty.")
                outcomes.append(failure_record(
                    message.get('id'),
                    "error_no_analysis",
                    "No analysis returned by the agent node.",
                ))
                continue

            # Rebuild the model from its dict form for attribute access and validation.
            parsed = EmailAnalysis(**node_output['email_analysis'])
            record = {
                "email_id": message.get('id'),
                "classification": parsed.classification.value,
                "classification_confidence": parsed.classification_confidence,
                "classification_evidence": parsed.classification_evidence,
                "reasoning": parsed.reasoning,
                "raw_analysis": parsed.model_dump()
            }
            outcomes.append(record)
            print(f"  -> Classified as: {parsed.classification.value}")
        except Exception as exc:
            # Keep going with the remaining emails; the failure is recorded in the results.
            print(f"  -> Error processing email ID {message.get('id', 'N/A')}: {exc}")
            outcomes.append(failure_record(message.get('id'), "error_exception", str(exc)))

    return outcomes

# Run the analysis (this will execute the async function).
# asyncio.run is called from inside a notebook cell here, which is what the
# nest_asyncio.apply() call in the setup cell is meant to allow.
if sample_emails_list: # Only run if emails were loaded
    print(f"\nStarting email analysis for {len(sample_emails_list)} emails...")
    processed_email_results = asyncio.run(
        run_email_analysis(
            emails_to_process=sample_emails_list,
            p_catalog_df=product_catalog_df, # Pass the loaded product catalog
            config_obj=hermes_config
        )
    )
    print("\nEmail analysis complete.")

    # Display first few results; error records have no confidence, hence the 'N/A' default
    if processed_email_results:
        print("\nSample of Processed Results:")
        for res in processed_email_results[:3]:
            print(f"Email ID: {res['email_id']}, Classification: {res['classification']}, Confidence: {res.get('classification_confidence', 'N/A')}")
else:
    print("No emails to process.")
    # Define the name anyway so later cells can rely on it
    processed_email_results = []

## 6. Prepare Output Data for Google Sheets

Format the results into a DataFrame matching the 'email-classification' sheet structure required by the assignment.

In [None]:
# Reduce each result record to the two columns of the 'email-classification' sheet.
email_classification_data = []
if processed_email_results:
    for result in processed_email_results:
        email_classification_data.append({
            "email ID": result.get("email_id"),
            "category": result.get("classification", "unknown") # Ensure 'category' is the assignment col name
        })

# Pin the column names and order so the frame has the same schema even when
# there are no results (an empty list alone would give a column-less frame).
email_classification_df = pd.DataFrame(email_classification_data, columns=["email ID", "category"])

if not email_classification_df.empty:
    print("\nEmail Classification DataFrame for Output:")
    display(email_classification_df.head())
else:
    print("No classification results to output.")

## 7. Write Results to Google Sheets

This section authenticates with Google, creates a new spreadsheet (or opens an existing one), and writes the `email_classification_df` to the 'email-classification' sheet. 
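**Note:** As written, authentication goes through `google.colab.auth`, so this part is intended to run in Google Colab. To run it elsewhere, you need locally configured Google credentials (for example via the Google Cloud SDK) and must replace the Colab-specific authentication step.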

In [None]:
def authenticate_and_get_gspread_client():
    """Return an authorized gspread client, or None when authentication fails.

    Runs the Colab user-authentication flow, fetches the default Google
    credentials and authorizes gspread with them. Any exception raised along
    the way is printed together with a hint, and None is returned.
    """
    try:
        auth.authenticate_user()  # Colab-specific login step
        credentials, _ = default()
        client = gspread.authorize(credentials)
        print("Successfully authenticated with Google Sheets.")
    except Exception as auth_error:
        print(f"Google Sheets authentication failed: {auth_error}")
        print("Please ensure you are running this in Google Colab or have local authentication configured.")
        return None
    return client

def write_df_to_gsheet(gc: gspread.Client, spreadsheet_name: str, worksheet_name: str, df: pd.DataFrame, headers: List[str]):
    """Write a DataFrame to a worksheet of a Google Spreadsheet, replacing its contents.

    The spreadsheet is opened by name, or created when it is not found. The
    worksheet is opened and cleared, or created when it is not found. The
    header row is written to row 1 and the DataFrame values from row 2 on.

    Args:
        gc: Authorized gspread client.
        spreadsheet_name: Title of the spreadsheet to open or create.
        worksheet_name: Title of the worksheet (tab) to overwrite or create.
        df: Data to write; nothing is written when it is empty.
        headers: Header row written to row 1. The DataFrame's own column names
            are not written, so `headers` should match the column order of `df`.

    Returns:
        None. Errors are printed rather than raised.
    """
    if df.empty:
        print(f"DataFrame for {worksheet_name} is empty. Nothing to write.")
        return
    try:
        # Try to open the spreadsheet, create if not found
        try:
            spreadsheet = gc.open(spreadsheet_name)
            print(f"Opened existing spreadsheet: '{spreadsheet_name}'")
        except gspread.exceptions.SpreadsheetNotFound:
            print(f"Spreadsheet '{spreadsheet_name}' not found. Creating new one...")
            spreadsheet = gc.create(spreadsheet_name)
            print(f"Created new spreadsheet: '{spreadsheet_name}'")
            # NOTE(security): this makes the new spreadsheet readable by anyone
            # with the link. Kept as-is for easy access to test output; remove
            # or restrict it if the results are sensitive.
            spreadsheet.share('', perm_type='anyone', role='reader') # Share publicly for easy access
            print(f"Publicly shared link: https://docs.google.com/spreadsheets/d/{spreadsheet.id}")

        # Try to open the worksheet, create if not found
        try:
            worksheet = spreadsheet.worksheet(worksheet_name)
            print(f"Opened existing worksheet: '{worksheet_name}'")
            worksheet.clear() # Clear existing data before writing new data
            print(f"Cleared existing data from worksheet: '{worksheet_name}'")
        except gspread.exceptions.WorksheetNotFound:
            print(f"Worksheet '{worksheet_name}' not found. Creating new one...")
            # A worksheet needs at least one column, even if `headers` is empty.
            worksheet = spreadsheet.add_worksheet(title=worksheet_name, rows=1, cols=max(1, len(headers)))
            print(f"Created new worksheet: '{worksheet_name}'")

        # Write headers. Named arguments are used because the positional order
        # of Worksheet.update (range vs. values) differs between gspread 5.x and 6.x.
        worksheet.update(values=[headers], range_name='A1')
        # Write DataFrame content (excluding headers, starting from row 2)
        set_with_dataframe(worksheet, df, row=2, include_column_header=False)
        print(f"Successfully wrote {len(df)} rows to worksheet '{worksheet_name}' in spreadsheet '{spreadsheet_name}'.")
        print(f"Spreadsheet link: https://docs.google.com/spreadsheets/d/{spreadsheet.id}")

    except Exception as e:
        # Best-effort: report the failure without aborting the notebook run.
        print(f"Error writing to Google Sheet: {e}")

# Upload the classification table, but only when there is something to upload
# and a Google Sheets client could be obtained.
if email_classification_df.empty:
    print("No classification data to write to Google Sheets.")
else:
    gspread_client = authenticate_and_get_gspread_client()
    if gspread_client:
        # Header row for the 'email-classification' sheet, as per assignment
        classification_headers = ["email ID", "category"]
        write_df_to_gsheet(
            gc=gspread_client,
            spreadsheet_name=OUTPUT_SPREADSHEET_NAME,
            worksheet_name="email-classification",
            df=email_classification_df,
            headers=classification_headers,
        )

--- End of Notebook ---