In [None]:
# Email Summarizer - Interactive Testing Notebook

This notebook provides an interactive environment for testing the Email Summarizer API locally with mock data.

## Prerequisites
- OpenAI API key set in the `.env` file
- Docker services running (Redis, PostgreSQL)
- Mock Graph API running on port 8001
- Email Summarizer API running on port 8000

Run `./scripts/local-test-setup.sh` to set everything up.


In [None]:
# Setup and imports
import requests
import json
import os
from datetime import datetime
from pprint import pprint
import pandas as pd
from IPython.display import display, Markdown, HTML

# API endpoints
# Ports match the notebook prerequisites: the Email Summarizer API is expected
# on 8000 and the Mock Graph API on 8001.
API_BASE = "http://localhost:8000"
MOCK_GRAPH_BASE = "http://localhost:8001"

# Helper function for pretty printing
def pretty_json(data):
    """Serialize ``data`` to a JSON string indented by two spaces.

    Values the JSON encoder cannot handle natively are converted with ``str``.
    """
    serialized = json.dumps(data, default=str, indent=2)
    return serialized

# Check API health
def check_services():
    """Print one status line per local service based on its ``/health`` endpoint.

    A service is reported as running when its health URL answers with HTTP 200,
    as not responding when it answers with another status or times out, and as
    not running when the request fails for any other transport-level reason.
    Nothing is returned; the result is only printed.
    """
    services = {
        "Email Summarizer API": f"{API_BASE}/health",
        "Mock Graph API": f"{MOCK_GRAPH_BASE}/health"
    }

    print("Checking services...")
    for name, url in services.items():
        try:
            # Bound the wait so a wedged service cannot hang the cell forever.
            resp = requests.get(url, timeout=5)
        except requests.Timeout:
            status = "❌ Not responding"
        except requests.RequestException:
            # Only transport failures are treated as "not running"; anything
            # else (including KeyboardInterrupt) is allowed to propagate.
            status = "❌ Not running"
        else:
            status = "✅ Running" if resp.status_code == 200 else "❌ Not responding"
        print(f"{name}: {status}")

# Run the checks right away so the status lines are printed when this cell executes.
check_services()


In [None]:
## 1. Test Mock Email Retrieval

First, let's fetch some mock emails from our Mock Graph API:


In [None]:
# Fetch mock emails
# Reset first: if the request below fails, later cells must see "no emails"
# rather than a stale list left in the kernel by an earlier run.
emails = []
headers = {"Authorization": "Bearer test-token"}
response = requests.get(
    f"{MOCK_GRAPH_BASE}/v1.0/me/messages",
    headers=headers,
    timeout=10,  # fail fast instead of hanging the cell if the mock is wedged
)

if response.status_code == 200:
    emails = response.json()["value"]
    print(f"Found {len(emails)} mock emails\n")

    # Display in a nice table
    PREVIEW_CHARS = 50
    email_data = []
    for email in emails[:5]:  # Show first 5
        body_text = email["body"]["content"]
        # Only mark the preview as truncated when text was actually cut off.
        preview = body_text[:PREVIEW_CHARS]
        if len(body_text) > PREVIEW_CHARS:
            preview += "..."
        email_data.append({
            "ID": email["id"],
            "Subject": email["subject"],
            "From": email["from"]["emailAddress"]["name"],
            "Time": email["sentDateTime"][:16],  # keep "YYYY-MM-DDTHH:MM" if ISO-formatted
            "Preview": preview
        })

    df = pd.DataFrame(email_data)
    display(df)
else:
    print(f"Error: {response.status_code}")
    print(response.text)


In [None]:
## 2. Test Email Search via Our API

Now let's test searching emails through our API (which will use the mock Graph API):


In [None]:
# Test email search endpoint
search_params = {
    "query": "budget",
    "limit": 5
}

response = requests.get(
    f"{API_BASE}/emails/search",
    params=search_params,
    timeout=30,  # bound the wait so the cell cannot hang indefinitely
)

if response.status_code == 200:
    results = response.json()
    # Echo the query that was actually sent, so editing search_params above
    # keeps the heading truthful.
    print(f"Search results for '{search_params['query']}':")
    if not results:
        print("(no matching emails)")
    for email in results:
        print(f"- {email['subject']} (from: {email['from']['emailAddress']['name']})")
else:
    print(f"Error: {response.status_code}")
    print(response.text)


In [None]:
## 3. Test Email Summarization

Let's test summarizing individual emails using OpenAI:


In [None]:
# Summarize the first fetched email and render the response as Markdown.
# `first_email_id` stays defined at cell level; the cache test reads it later.
if not ('emails' in locals() and emails):
    print("No emails available to summarize")
else:
    first_email_id = emails[0]["id"]

    print(f"Summarizing email: {emails[0]['subject']}")
    print("-" * 50)

    response = requests.post(f"{API_BASE}/messages/{first_email_id}/summary")

    if response.status_code != 200:
        print(f"Error: {response.status_code}")
        print(response.text)
    else:
        summary_data = response.json()

        summary_text = summary_data.get('summary', 'No summary available')
        display(Markdown(f"### Summary\n{summary_text}"))

        # Optional list-valued sections, each rendered only when the key is present.
        optional_sections = (
            ('key_points', "### Key Points"),
            ('action_items', "### Action Items"),
        )
        for section_key, heading in optional_sections:
            if section_key in summary_data:
                display(Markdown(heading))
                for entry in summary_data[section_key]:
                    display(Markdown(f"- {entry}"))


In [None]:
## 4. Test Bulk Summarization

Let's test summarizing multiple emails at once:


In [None]:
# Test bulk summarization
if 'emails' in locals() and len(emails) >= 3:
    # Work on the first 3 emails
    bulk_emails = emails[:3]
    email_ids = [email["id"] for email in bulk_emails]

    print("Summarizing multiple emails:")
    for position, email in enumerate(bulk_emails, start=1):
        print(f"{position}. {email['subject']}")
    print("-" * 50)

    # Request bulk summary
    response = requests.post(
        f"{API_BASE}/summaries/bulk",
        json={"message_ids": email_ids}
    )

    # Both 200 and 202 are treated as success by this cell.
    if response.status_code in (200, 202):
        result = response.json()

        if "task_id" in result:
            task_id = result["task_id"]
            print(f"Bulk summarization started. Task ID: {task_id}")
            # Interpolate the real task ID so the printed path is usable as-is
            # (the placeholder was previously printed literally).
            print(f"Check status at: GET /summaries/bulk/{task_id}")
        else:
            display(Markdown("### Bulk Summary Results"))
            display(Markdown(result.get('combined_summary', 'Processing...')))

            if 'individual_summaries' in result:
                display(Markdown("### Individual Summaries"))
                for email_id, summary in result['individual_summaries'].items():
                    display(Markdown(f"**{email_id}**: {summary}"))
    else:
        print(f"Error: {response.status_code}")
        print(response.text)
else:
    print("Not enough emails for bulk summarization")


In [None]:
## 5. Test RAG (Retrieval-Augmented Generation)

### First, let's ingest some emails into the vector database:


In [None]:
# Kick off ingestion of emails into the RAG vector store and report the outcome.
print("Ingesting emails into vector database...")

ingest_params = {"query": "from:company.com", "limit": 10}
response = requests.post(f"{API_BASE}/rag/ingest", params=ingest_params)

# This cell treats only 202 as success; any other status is printed as an error.
if response.status_code != 202:
    print(f"Error: {response.status_code}")
    print(response.text)
else:
    result = response.json()
    task_id = result.get('task_id', 'N/A')
    message = result.get('message', '')
    print("✅ Ingestion started successfully")
    print(f"Task ID: {task_id}")
    print(f"Message: {message}")


In [None]:
### Now let's query the RAG system:


In [None]:
# Send a handful of sample questions to the RAG endpoint and render each answer.
test_queries = [
    "What are the budget issues?",
    "Tell me about security updates",
    "What action items do I have?",
    "Summarize recent team updates"
]

for query in test_queries:
    print(f"\n📝 Query: {query}")
    print("-" * 50)

    response = requests.get(f"{API_BASE}/rag/query", params={"q": query})

    # Report the failure and move on to the next question.
    if response.status_code != 200:
        print(f"Error: {response.status_code}")
        print(response.text)
        continue

    result = response.json()

    answer = result.get('answer', 'No answer available')
    display(Markdown(f"**Answer:** {answer}"))

    # Sources are optional; skip the section when the key is missing or empty.
    sources = result.get('sources')
    if sources:
        display(Markdown("**Sources:**"))
        for source in sources:
            subject = source.get('subject', 'Unknown')
            score = source.get('score', 0)
            display(Markdown(f"- Email: {subject} (Score: {score:.2f})"))


In [None]:
## 6. Test Caching

Let's verify that Redis caching is working by making the same request twice:


In [None]:
import time

# Test caching by making the same request twice
if 'first_email_id' in locals():
    print("Testing cache performance...")
    summary_url = f"{API_BASE}/messages/{first_email_id}/summary"

    # First request (should hit OpenAI)
    # perf_counter is monotonic and high-resolution, so the interval cannot be
    # distorted by wall-clock adjustments or collapse to 0 on coarse clocks.
    start = time.perf_counter()
    response1 = requests.post(summary_url)
    time1 = time.perf_counter() - start

    print(f"First request: {time1:.2f} seconds")

    # Second request (should hit cache)
    start = time.perf_counter()
    response2 = requests.post(summary_url)
    time2 = time.perf_counter() - start

    print(f"Second request: {time2:.2f} seconds")
    # Guard the ratio so an unmeasurably fast second request cannot raise
    # ZeroDivisionError.
    if time2 > 0:
        print(f"Speed improvement: {time1/time2:.1f}x faster")
    else:
        print("Speed improvement: second request too fast to measure")

    # Verify same result -- but only when both calls succeeded. Two identical
    # error responses must not be reported as a working cache, and an error
    # body may not be JSON at all.
    if response1.status_code != 200 or response2.status_code != 200:
        print(f"❌ Request failed - status codes: {response1.status_code}, {response2.status_code}")
    elif response1.json() == response2.json():
        print("✅ Cache working correctly - same results returned")
    else:
        print("❌ Cache issue - different results returned")
else:
    print("No email ID available for cache testing")


In [None]:
## 7. Performance Testing

Let's run a simple performance test:


In [None]:
import concurrent.futures
import statistics

def make_request(email_id):
    """Make a single summarization request and time it.

    Args:
        email_id: ID of the message, interpolated into the
            ``/messages/{email_id}/summary`` path under ``API_BASE``.

    Returns:
        A ``(duration, status_code)`` tuple: the elapsed time of the POST in
        seconds and the HTTP status code of the response.
    """
    # perf_counter is a monotonic, high-resolution clock intended for measuring
    # intervals; time.time() can jump if the system clock is adjusted.
    start = time.perf_counter()
    response = requests.post(f"{API_BASE}/messages/{email_id}/summary")
    duration = time.perf_counter() - start
    return duration, response.status_code

# Fire 5 summarization requests in parallel and report timing statistics.
if not ('emails' in locals() and len(emails) >= 5):
    print("Not enough emails for performance testing")
else:
    print("Running performance test with 5 concurrent requests...")

    email_ids = [email["id"] for email in emails[:5]]

    # One worker per request so all five are in flight at the same time.
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        results = list(executor.map(make_request, email_ids))

    # Split the (duration, status_code) pairs into two parallel lists.
    durations = [elapsed for elapsed, _ in results]
    statuses = [code for _, code in results]
    ok_count = statuses.count(200)

    print("\nResults:")
    print(f"- Total requests: {len(results)}")
    print(f"- Successful: {ok_count}")
    print(f"- Failed: {len(statuses) - ok_count}")
    print(f"- Average time: {statistics.mean(durations):.2f}s")
    print(f"- Min time: {min(durations):.2f}s")
    print(f"- Max time: {max(durations):.2f}s")

    # Tabulate per-request durations (request numbers are 1-based).
    performance_data = pd.DataFrame({
        'Request': range(1, len(durations) + 1),
        'Duration (s)': durations
    })

    display(performance_data)


In [None]:
## 8. Custom Testing

Use this section to write your own test cases:


In [None]:
# Your custom test code here
# Example: Test a specific endpoint or scenario

# Test health endpoint with detailed info
response = requests.get(f"{API_BASE}/health")
if response.status_code == 200:
    health_data = response.json()
    print("API Health Status:")
    print(pretty_json(health_data))
else:
    # Surface failures instead of finishing silently, using the same
    # status-then-body format as the other cells in this notebook.
    print(f"Error: {response.status_code}")
    print(response.text)
