# Gmail Email Fetcher

This notebook fetches emails from the Gmail API and stores them in the `dev.core.emails` table.

**Setup Requirements:**
1. Create a Google Cloud Project with Gmail API enabled
2. Create OAuth 2.0 credentials (Desktop app type)
3. Store credentials in Databricks Secrets:
   - `gmail/client_id`
   - `gmail/client_secret`
   - `gmail/refresh_token` (obtained after first OAuth flow)

In [0]:
# Install required packages
# The google-auth* packages back the `google.oauth2` credentials import and
# google-api-python-client provides the `googleapiclient` modules imported below.
%pip install --quiet google-auth google-auth-oauthlib google-auth-httplib2 google-api-python-client
# Restart the Python process after the install (presumably so the freshly
# installed packages become importable in later cells — confirm against the
# Databricks documentation).
dbutils.library.restartPython()

In [0]:
# Import libraries
import base64
import json
from datetime import datetime, timezone
from typing import List, Dict, Any, Optional
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import col, current_timestamp

print("✓ Libraries imported")

In [None]:
# Configuration
# Credentials may be hardcoded while testing locally; production runs should
# read them from Databricks Secrets instead.

# Option 1 (recommended for production): Databricks Secrets
# CLIENT_ID = dbutils.secrets.get(scope="gmail", key="client_id")
# CLIENT_SECRET = dbutils.secrets.get(scope="gmail", key="client_secret")
# REFRESH_TOKEN = dbutils.secrets.get(scope="gmail", key="refresh_token")

# Option 2 (testing only, NOT RECOMMENDED for production): hardcoded values.
# Never commit real credentials in place of these placeholders.
CLIENT_ID = "YOUR_CLIENT_ID_HERE"
CLIENT_SECRET = "YOUR_CLIENT_SECRET_HERE"
REFRESH_TOKEN = "YOUR_REFRESH_TOKEN_HERE"

# Run settings
TABLE_NAME = "dev.core.emails"  # Delta table the emails are written to
MAX_RESULTS = 100  # Number of emails to fetch per run

# Echo the effective settings, one per line.
print(
    "✓ Configuration loaded",
    f"  - Fetching up to {MAX_RESULTS} emails",
    f"  - Target table: {TABLE_NAME}",
    sep="\n",
)


In [None]:
# Authenticate with Gmail API
def get_gmail_service():
    """Build a Gmail v1 API client from the configured OAuth refresh token.

    Reads the module-level CLIENT_ID, CLIENT_SECRET and REFRESH_TOKEN and
    requests the read-only Gmail scope.

    Returns:
        The service object produced by ``googleapiclient.discovery.build``.
    """
    # NOTE(review): the credentials start with token=None, so an access token
    # is presumably only obtained on the first API request — confirm.
    oauth_settings = {
        "token": None,
        "refresh_token": REFRESH_TOKEN,
        "token_uri": "https://oauth2.googleapis.com/token",
        "client_id": CLIENT_ID,
        "client_secret": CLIENT_SECRET,
        "scopes": ["https://www.googleapis.com/auth/gmail.readonly"],
    }
    return build('gmail', 'v1', credentials=Credentials(**oauth_settings))

# Create the shared Gmail client; report the outcome and let any failure
# propagate so the notebook run stops here.
try:
    gmail_service = get_gmail_service()
except Exception as auth_error:
    print(f"✗ Authentication failed: {auth_error}")
    raise
else:
    print("✓ Gmail API authenticated successfully")


In [None]:
# Helper functions to parse email data
def get_header(headers: List[Dict], name: str) -> Optional[str]:
    """Return the value of the first header whose name matches ``name``.

    Header names are compared case-insensitively. Returns None when no
    header matches (or when the matching entry has no 'value' key).
    """
    matching_values = (
        entry.get('value')
        for entry in headers
        if entry.get('name', '').lower() == name.lower()
    )
    return next(matching_values, None)

def parse_email_addresses(header_value: Optional[str]) -> Optional[List[str]]:
    """Extract every email address found in a recipient header value.

    Args:
        header_value: Raw header text such as ``'Ann <ann@x.com>, bob@y.org'``,
            or None when the header is absent.

    Returns:
        The addresses in the order they appear, or None when the header is
        missing/empty or contains no address.
    """
    if not header_value:
        return None
    # Kept local so the helper does not rely on a module-level `re` import.
    import re
    # The local part must accept '+' so plus-addressed mailboxes
    # (user+tag@example.com) are not truncated to 'tag@example.com'.
    emails = re.findall(r'[\w.+-]+@[\w.-]+\.\w+', header_value)
    return emails or None

def decode_body(part: Dict) -> Optional[str]:
    """Decode the base64url-encoded body data of a message part to text.

    Args:
        part: A message part dict; the encoded content is read from
            ``part['body']['data']``.

    Returns:
        The decoded text (undecodable UTF-8 bytes are dropped), or None when
        the part carries no body data or the data cannot be decoded.
    """
    data = (part.get('body') or {}).get('data')
    if data is None:
        return None
    try:
        if isinstance(data, (bytes, bytearray)):
            data = bytes(data).decode('ascii')
        # Restore any stripped '=' padding; without it the decoder rejects
        # otherwise valid base64url input.
        padded = data + '=' * (-len(data) % 4)
        return base64.urlsafe_b64decode(padded).decode('utf-8', errors='ignore')
    except (TypeError, ValueError):
        # binascii.Error and UnicodeDecodeError are ValueError subclasses;
        # TypeError covers non-string data. Best effort: treat as "no body".
        return None

def extract_body_parts(payload: Dict) -> tuple:
    """Extract the plain-text and HTML bodies from a message payload.

    Multipart containers are walked depth-first to any nesting level, in
    document order, and the first non-empty ``text/plain`` and ``text/html``
    parts win. A payload without a 'parts' key is treated as a single-part
    message.

    Args:
        payload: The message payload dict (with 'mimeType' and either
            'parts' or a 'body').

    Returns:
        A ``(text_body, html_body)`` tuple; either element is None when no
        matching part could be decoded.
    """
    def iter_leaf_parts(parts):
        # Yield non-multipart parts in document order, descending into every
        # nested multipart/* container rather than just the first level.
        for part in parts:
            if part.get('mimeType', '').startswith('multipart/'):
                yield from iter_leaf_parts(part.get('parts') or [])
            else:
                yield part

    if 'parts' in payload:
        candidates = iter_leaf_parts(payload['parts'])
    else:
        # Single part message
        candidates = [payload]

    text_body = None
    html_body = None
    for part in candidates:
        mime_type = part.get('mimeType', '')
        if mime_type == 'text/plain' and not text_body:
            text_body = decode_body(part)
        elif mime_type == 'text/html' and not html_body:
            html_body = decode_body(part)
        if text_body and html_body:
            # Both bodies found; later parts could not change the result.
            break

    return text_body, html_body

def extract_attachments(payload: Dict) -> List[Dict]:
    """Collect metadata for every named part beneath the payload's 'parts'.

    A part counts as an attachment when it has a non-empty 'filename'.
    Nested parts are visited depth-first in document order. Each entry holds
    'filename', 'mime_type', 'size_bytes' (0 when absent) and 'attachment_id'.

    Returns:
        The list of attachment dicts, or None when there are none.
    """
    found = []
    # Explicit stack; children are pushed reversed so they pop in order.
    pending = list(reversed(payload.get('parts', [])))

    while pending:
        node = pending.pop()
        name = node.get('filename')
        if name:
            body = node.get('body', {})
            found.append({
                'filename': name,
                'mime_type': node.get('mimeType'),
                'size_bytes': body.get('size', 0),
                'attachment_id': body.get('attachmentId'),
            })
        if 'parts' in node:
            pending.extend(reversed(node['parts']))

    return found or None

print("✓ Helper functions defined")


In [None]:
# Fetch emails from Gmail
def _split_from_header(from_header: Optional[str]) -> tuple:
    """Split a raw From header into ``(display_name, address)``.

    ``Name <addr>`` yields the name (surrounding double quotes stripped) and
    the address; any other non-empty value is returned whole as the address
    with a None name. A missing header yields ``(None, None)``.
    """
    if not from_header:
        return None, None
    # Kept local so the helper does not rely on a module-level `re` import.
    import re
    match = re.match(r'^(.*?)\s*<(.+?)>$', from_header)
    if match:
        return match.group(1).strip('"'), match.group(2)
    return None, from_header


def _parse_sent_at(date_header: Optional[str], fallback: datetime) -> Optional[datetime]:
    """Parse a Date header, returning ``fallback`` when it is unparseable.

    A missing header yields None. A naive result (a ``-0000`` zone, which
    the email.utils docs describe as UTC for RFC-conforming dates) is tagged
    as UTC so it is not later interpreted in the local time zone.
    """
    if not date_header:
        return None
    from email.utils import parsedate_to_datetime
    try:
        sent_at = parsedate_to_datetime(date_header)
    except (TypeError, ValueError, OverflowError):
        # Malformed date: best effort, fall back to the caller's timestamp.
        return fallback
    if sent_at.tzinfo is None:
        sent_at = sent_at.replace(tzinfo=timezone.utc)
    return sent_at


def _message_to_record(message: Dict[str, Any]) -> Dict[str, Any]:
    """Flatten one full-format message resource into an emails-table row dict."""
    payload = message['payload']
    headers = payload.get('headers', [])

    from_name, from_email = _split_from_header(get_header(headers, 'From'))
    references_header = get_header(headers, 'References')
    text_body, html_body = extract_body_parts(payload)
    attachments = extract_attachments(payload)

    # internalDate is divided by 1000 to turn it into epoch seconds.
    internal_date = datetime.fromtimestamp(
        int(message['internalDate']) / 1000,
        tz=timezone.utc,
    )
    label_ids = message.get('labelIds', [])

    return {
        'email_id': message['id'],
        'thread_id': message['threadId'],
        'subject': get_header(headers, 'Subject'),
        'from_name': from_name,
        'from_email': from_email,
        'reply_to': get_header(headers, 'Reply-To'),
        'in_reply_to': get_header(headers, 'In-Reply-To'),
        'references': references_header.split() if references_header else None,
        'to_recipients': parse_email_addresses(get_header(headers, 'To')),
        'cc_recipients': parse_email_addresses(get_header(headers, 'Cc')),
        'bcc_recipients': parse_email_addresses(get_header(headers, 'Bcc')),
        'sent_at': _parse_sent_at(get_header(headers, 'Date'), internal_date),
        'received_at': internal_date,
        'gmail_internal_date': internal_date,
        'received_date': internal_date.date(),
        'snippet': message.get('snippet'),
        'body_text': text_body,
        'body_html': html_body,
        'raw_headers': None,  # Can store json.dumps(headers) if needed
        'labels': message.get('labelIds'),
        'is_read': 'UNREAD' not in label_ids,
        'is_starred': 'STARRED' in label_ids,
        'importance': None,  # Can parse from headers if needed
        'spam_flag': 'SPAM' in label_ids,
        'message_size_bytes': message.get('sizeEstimate'),
        'has_attachments': attachments is not None,
        'attachments': attachments,
        'gmail_history_id': message.get('historyId'),
        'created_at': datetime.now(timezone.utc),
        'updated_at': None,
    }


def fetch_recent_emails(service, max_results=100):
    """Fetch recent INBOX emails from Gmail as a list of row dicts.

    Message ids are listed page by page until ``max_results`` ids have been
    collected or no further page exists, then each message is fetched in
    'full' format and flattened into a dict.

    Args:
        service: Gmail API service object (as returned by get_gmail_service).
        max_results: Maximum number of emails to return. None falls back
            to 100.

    Returns:
        A list of dicts, one per email, keyed by the emails-table columns.

    Raises:
        HttpError: Re-raised after being printed when an API call fails.
    """
    limit = 100 if max_results is None else max_results
    # messages.list accepts at most 500 ids per page (Gmail API reference).
    page_size_cap = 500

    try:
        # Get list of message IDs, following nextPageToken as needed
        messages = []
        page_token = None
        while len(messages) < limit:
            list_kwargs = {
                'userId': 'me',
                'maxResults': min(limit - len(messages), page_size_cap),
                'labelIds': ['INBOX'],  # You can modify this to fetch from different labels
            }
            if page_token:
                list_kwargs['pageToken'] = page_token
            results = service.users().messages().list(**list_kwargs).execute()

            messages.extend(results.get('messages', []))
            page_token = results.get('nextPageToken')
            if not page_token:
                break
        messages = messages[:limit]
        print(f"✓ Found {len(messages)} messages to process")

        email_data = []
        for position, msg in enumerate(messages, start=1):
            if position % 10 == 0:
                print(f"  Processing message {position}/{len(messages)}...")

            # Get full message details
            message = service.users().messages().get(
                userId='me',
                id=msg['id'],
                format='full'
            ).execute()
            email_data.append(_message_to_record(message))

        print(f"✓ Processed {len(email_data)} emails")
        return email_data

    except HttpError as error:
        print(f'✗ An error occurred: {error}')
        raise

# Run the fetch with the configured limit and report how many emails came back
emails = fetch_recent_emails(gmail_service, max_results=MAX_RESULTS)
fetched_total = len(emails)
print(f"\n✓ Fetched {fetched_total} emails from Gmail")


In [None]:
# Convert to Spark DataFrame
from pyspark.sql.types import StructType, StructField, StringType, BooleanType, LongType, TimestampType, DateType, ArrayType

# Schema matching the target table, written as (name, type, nullable) specs.
# Only email_id is non-nullable.
email_schema = StructType([
    StructField(*spec)
    for spec in [
        ("email_id", StringType(), False),
        ("thread_id", StringType(), True),
        ("subject", StringType(), True),
        ("from_name", StringType(), True),
        ("from_email", StringType(), True),
        ("reply_to", StringType(), True),
        ("in_reply_to", StringType(), True),
        ("references", ArrayType(StringType()), True),
        ("to_recipients", ArrayType(StringType()), True),
        ("cc_recipients", ArrayType(StringType()), True),
        ("bcc_recipients", ArrayType(StringType()), True),
        ("sent_at", TimestampType(), True),
        ("received_at", TimestampType(), True),
        ("gmail_internal_date", TimestampType(), True),
        ("received_date", DateType(), True),
        ("snippet", StringType(), True),
        ("body_text", StringType(), True),
        ("body_html", StringType(), True),
        ("raw_headers", StringType(), True),
        ("labels", ArrayType(StringType()), True),
        ("is_read", BooleanType(), True),
        ("is_starred", BooleanType(), True),
        ("importance", StringType(), True),
        ("spam_flag", BooleanType(), True),
        ("message_size_bytes", LongType(), True),
        ("has_attachments", BooleanType(), True),
        # One struct per attachment, mirroring the attachment metadata dicts
        ("attachments", ArrayType(StructType([
            StructField(*attachment_spec)
            for attachment_spec in [
                ("filename", StringType(), True),
                ("mime_type", StringType(), True),
                ("size_bytes", LongType(), True),
                ("attachment_id", StringType(), True),
            ]
        ])), True),
        ("gmail_history_id", StringType(), True),
        ("created_at", TimestampType(), True),
        ("updated_at", TimestampType(), True),
    ]
])

# Build the DataFrame from the fetched row dicts
df = spark.createDataFrame(emails, schema=email_schema)

# Report the row count, then show the schema and a small preview
row_total = df.count()
print(f"✓ Created Spark DataFrame with {row_total} rows")
df.printSchema()
display(df.select(["email_id", "subject", "from_email", "received_at", "is_read"]).limit(5))


In [None]:
# Upsert into Delta table (merge on email_id to avoid duplicates)
from delta.tables import DeltaTable

# Check if table exists
table_exists = spark.catalog.tableExists(TABLE_NAME)

if table_exists:
    print(f"✓ Table {TABLE_NAME} exists, performing MERGE (upsert)...")

    # Get existing table
    delta_table = DeltaTable.forName(spark, TABLE_NAME)

    # For rows that already exist, refresh every column except the merge key
    # and created_at (so the original insert time survives re-runs), and
    # stamp updated_at with the time of this merge. An update-all clause
    # would overwrite created_at and leave updated_at NULL forever.
    preserved_columns = {"email_id", "created_at", "updated_at"}
    update_assignments = {
        name: col(f"source.{name}")
        for name in df.columns
        if name not in preserved_columns
    }
    update_assignments["updated_at"] = current_timestamp()

    # Perform merge (upsert based on email_id)
    delta_table.alias("target").merge(
        df.alias("source"),
        "target.email_id = source.email_id"
    ).whenMatchedUpdate(
        set=update_assignments
    ).whenNotMatchedInsertAll(
    ).execute()

    print(f"✓ MERGE completed successfully")

else:
    print(f"✓ Table {TABLE_NAME} does not exist, creating and inserting data...")

    # Write as new table, partitioned by the received date
    df.write.format("delta") \
        .mode("overwrite") \
        .partitionBy("received_date") \
        .option("delta.columnMapping.mode", "name") \
        .option("delta.minReaderVersion", "2") \
        .option("delta.minWriterVersion", "5") \
        .saveAsTable(TABLE_NAME)

    print(f"✓ Table created and data inserted successfully")

# Show summary
result_count = spark.table(TABLE_NAME).count()
print(f"\n✓ Total emails in table: {result_count}")


In [None]:
# Verify the data: show the ten most recently received emails in the table
print("Sample of latest emails:")
newest_first = spark.table(TABLE_NAME).orderBy(col("received_at").desc())
display(
    newest_first
    .select("email_id", "subject", "from_name", "from_email", "received_at", "is_read", "is_starred")
    .limit(10)
)
