# PST to DuckDB

Notebook to explore cleanly how to go from mbox file format to DuckDB database followed The Classes in models/models.py cleanly without different codes etc.
This has to be transformed into different function files later on.

## Imports

In [1]:
from __future__ import annotations
import mailbox
import pandas as pd
import email
import os
from email.header import decode_header
import datetime
from bs4 import BeautifulSoup
import re
from tqdm import tqdm
import sqlite3
import html
import sys
import argparse


import duckdb

import json
import uuid
from typing import List, Dict, Any, Optional, Tuple


### Importing classes

In [2]:
from models.models import EmailAddress, MailingList, Organisation, Position, Entity, Attachment, ReceiverEmail, SenderEmail

# Import Pydantic elements
from pydantic import BaseModel, Field

## Functions

### Mbox/Text Processing

In [3]:
def parse_email_address(address_str):
    """Parse a string containing email addresses into a list of Entity objects with better error handling"""
    if not address_str:
        return []

    entities = []
    # Simple regex to extract name and email from patterns like "Name <email@example.com>"
    email_pattern = re.compile(r'(.*?)\s*<([^>]+)>|([^,\s]+@[^,\s]+)')

    try:
        # Split by commas, but handle potential nested commas in quotes
        addresses = []
        in_quotes = False
        current_address = ""

        for char in address_str:
            if char == '"':
                in_quotes = not in_quotes
                current_address += char
            elif char == ',' and not in_quotes:
                addresses.append(current_address.strip())
                current_address = ""
            else:
                current_address += char

        # Add the last address if there is one
        if current_address.strip():
            addresses.append(current_address.strip())

        # If no addresses were found, try the whole string
        if not addresses:
            addresses = [address_str]

        for addr in addresses:
            addr = addr.strip()
            if not addr:
                continue

            try:
                match = email_pattern.search(addr)
                if match:
                    if match.group(2):  # Format: "Name <email@example.com>"
                        name = match.group(1).strip().strip('"')
                        email_addr = match.group(2).strip()
                    else:  # Format: "email@example.com"
                        email_addr = match.group(3).strip()
                        name = email_addr  # Use email as name if no name is provided

                    # Validate email format to a minimum degree
                    if '@' in email_addr:
                        # Create Entity with EmailAddress
                        try:
                            email_obj = EmailAddress(email=email_addr)
                            entity = Entity(
                                name=name,
                                email=email_obj,
                                is_physical_person=True  # Assuming default
                            )
                            entities.append(entity)
                        except Exception as e:
                            print(f"Error creating Entity for email {email_addr}: {e}")
                else:
                    # Try a more forgiving approach if the regex didn't match
                    parts = addr.split('@')
                    if len(parts) == 2 and '.' in parts[1]:
                        # Looks like a valid email
                        email_addr = addr.strip()
                        try:
                            email_obj = EmailAddress(email=email_addr)
                            entity = Entity(
                                name=email_addr,  # Use email as name
                                email=email_obj,
                                is_physical_person=True
                            )
                            entities.append(entity)
                        except Exception as e:
                            print(f"Error creating Entity for fallback email {email_addr}: {e}")
            except Exception as e:
                print(f"Error parsing address '{addr}': {e}")
    except Exception as e:
        print(f"Error parsing addresses string '{address_str}': {e}")

    return entities

In [4]:
def decode_str(s):
    """Decode encoded email header strings"""
    if s is None:
        return ""
    try:
        decoded_parts = decode_header(s)
        return ''.join([
            part.decode(encoding or 'utf-8', errors='replace') if isinstance(part, bytes) else part
            for part, encoding in decoded_parts
        ])
    except:
        return str(s)

def extract_clean_text_from_html(html_content):
    """
    Extract clean, readable text from HTML content.

    Args:
        html_content (str): HTML content to clean

    Returns:
        str: Clean text without HTML tags
    """
    if not html_content:
        return ""

    try:
        # Remove scripts, styles, and other tags that contain content we don't want
        html_content = re.sub(r'<(script|style|head).*?>.*?</\1>', ' ', html_content, flags=re.DOTALL)

        # Replace common block elements with newlines to preserve structure
        html_content = re.sub(r'</(p|div|h\d|tr|li)>', '\n', html_content)
        html_content = re.sub(r'<br[^>]*>', '\n', html_content)

        # Replace table cells with tab separation
        html_content = re.sub(r'</td>', '\t', html_content)

        # Remove all HTML tags
        text = re.sub(r'<[^>]+>', ' ', html_content)

        # Decode HTML entities (&nbsp;, &lt;, etc.)
        text = html.unescape(text)

        # Handle literal escape sequences that appear in the text
        # Replace literal "\xad" with empty string (remove soft hyphens)
        text = text.replace('\\xad', '')
        # Replace literal "\xa0" with a space (non-breaking spaces)
        text = text.replace('\\xa0', ' ')

        # Handle actual Unicode characters too
        # Remove soft hyphens (invisible hyphens used for word breaks)
        text = text.replace('\xad', '')
        # Replace non-breaking spaces with regular spaces
        text = text.replace('\xa0', ' ')
        # Remove other problematic control characters
        text = re.sub(r'[\x00-\x08\x0b-\x0c\x0e-\x1f\x7f]', '', text)

        # Clean up other escape sequences that might appear in text
        text = text.replace('\\\\', '\\')  # Double backslash to single
        text = text.replace("\\'", "'")    # Escaped single quote
        text = text.replace('\\"', '"')    # Escaped double quote
        text = text.replace('\\n', '\n')   # Literal \n to newline
        text = text.replace('\\t', '\t')   # Literal \t to tab

        # Remove remaining literal escape sequences like \x.. that weren't handled above
        text = re.sub(r'\\x[0-9a-fA-F]{2}', '', text)

        # Clean up whitespace (multiple spaces, tabs, newlines)
        text = re.sub(r'[ \t]+', ' ', text)
        text = re.sub(r'\n{3,}', '\n\n', text)

        # Final cleanup to remove leading/trailing whitespace
        return text.strip()
    except Exception as e:
        print(f"Error processing HTML: {e}")
        return f"Error processing HTML content: {str(e)}"

In [None]:

def get_email_body(message):
    """Extract body text from email message, handling HTML correctly"""
    body_text = ""
    body_html = ""

    if message.is_multipart():
        for part in message.walk():
            content_type = part.get_content_type()
            content_disposition = str(part.get("Content-Disposition") or "")

            # Skip attachments
            if "attachment" in content_disposition:
                continue

            try:
                payload = part.get_payload(decode=True)
                if payload is None:
                    continue

                charset = part.get_content_charset() or 'utf-8'
                decoded_payload = payload.decode(charset, errors='replace')

                if content_type == "text/plain":
                    body_text += decoded_payload
                elif content_type == "text/html":
                    body_html += decoded_payload
            except:
                continue
    else:
        # Not multipart - get payload directly
        try:
            content_type = message.get_content_type()
            payload = message.get_payload(decode=True)
            if payload:
                charset = message.get_content_charset() or 'utf-8'
                decoded_payload = payload.decode(charset, errors='replace')

                if content_type == "text/plain":
                    body_text = decoded_payload
                elif content_type == "text/html":
                    body_html = decoded_payload
        except:
            pass

    # Prefer HTML content but fall back to plain text
    if body_html:
        return {
            "html": body_html,
            "text": extract_clean_text_from_html(body_html),
            "has_html": True
        }
    else:
        return {
            "html": "",
            "text": body_text,
            "has_html": False
        }

def extract_attachments_info(message):
    """Extract information about attachments with better error handling"""
    attachments = []

    if not message.is_multipart():
        return attachments

    try:
        for part in message.walk():
            try:
                content_disposition = str(part.get("Content-Disposition") or "")

                if "attachment" in content_disposition:
                    try:
                        filename = part.get_filename()
                        if filename:
                            try:
                                filename = decode_str(filename)
                            except Exception as e:
                                print(f"Error decoding attachment filename: {e}")
                                filename = "unknown_filename"
                        else:
                            filename = "unnamed_attachment"

                        content_type = part.get_content_type() or 'application/octet-stream'

                        # Get content safely
                        try:
                            content = part.get_payload(decode=True)
                            # Ensure content is bytes
                            if content is None:
                                content = b''
                            elif not isinstance(content, bytes):
                                content = str(content).encode('utf-8', errors='replace')
                        except Exception as e:
                            print(f"Error getting attachment content: {e}")
                            content = b''

                        size = len(content)

                        attachments.append({
                            "filename": filename,
                            "content_type": content_type,
                            "size": size,
                            "content": content
                        })
                    except Exception as e:
                        print(f"Error processing individual attachment: {e}")
            except Exception as e:
                print(f"Error walking email part: {e}")
    except Exception as e:
        print(f"Error in attachment extraction: {e}")

    return attachments

def extract_recipients(message):
    """Extract all recipients (To, CC, BCC) as Entity objects with better error handling"""
    to_str = decode_str(message.get('to') or "")
    cc_str = decode_str(message.get('cc') or "")
    bcc_str = decode_str(message.get('bcc') or "")
    reply_to_str = decode_str(message.get('reply-to') or "")

    # Parse with error handling
    try:
        to_entities = parse_email_address(to_str)
    except Exception as e:
        print(f"Error parsing 'to' field: {e}, value: {to_str}")
        to_entities = []

    try:
        cc_entities = parse_email_address(cc_str)
    except Exception as e:
        print(f"Error parsing 'cc' field: {e}, value: {cc_str}")
        cc_entities = []

    try:
        bcc_entities = parse_email_address(bcc_str)
    except Exception as e:
        print(f"Error parsing 'bcc' field: {e}, value: {bcc_str}")
        bcc_entities = []

    reply_to_entity = None
    try:
        reply_to_entities = parse_email_address(reply_to_str)
        if reply_to_entities and len(reply_to_entities) > 0:
            reply_to_entity = reply_to_entities[0]
    except Exception as e:
        print(f"Error parsing 'reply-to' field: {e}, value: {reply_to_str}")

    return {
        "to": to_entities,
        "cc": cc_entities,
        "bcc": bcc_entities,
        "reply_to": reply_to_entity
    }

def extract_message_data(message, folder_name):
    """Extract comprehensive email data to match Pydantic models"""
    # Generate a unique ID
    email_id = str(uuid.uuid4())

    # Extract basic headers
    subject = decode_str(message.get('subject') or "")
    from_str = decode_str(message.get('from') or "")
    date_str = message.get('date')
    message_id = decode_str(message.get('message-id') or "")
    in_reply_to = decode_str(message.get('in-reply-to') or "")
    references = decode_str(message.get('references') or "")

    # Parse date
    try:
        timestamp = email.utils.parsedate_to_datetime(date_str)
    except:
        timestamp = datetime.datetime.now()  # Fallback to current time

    # Get sender entity
    try:
        sender_entities = parse_email_address(from_str)
        if sender_entities and len(sender_entities) > 0:
            sender_entity = sender_entities[0]
        else:
            # Create a fallback entity if parsing failed
            sender_entity = Entity(
                name="Unknown",
                email=EmailAddress(email="unknown@example.com"),
                is_physical_person=True
            )
    except Exception as e:
        print(f"Error parsing sender: {e}, from_str: {from_str}")
        sender_entity = Entity(
            name="Unknown",
            email=EmailAddress(email="unknown@example.com"),
            is_physical_person=True
        )

    # Get recipients
    try:
        recipients = extract_recipients(message)
    except Exception as e:
        print(f"Error extracting recipients: {e}")
        # Create empty recipients if extraction fails
        recipients = {
            "to": [],
            "cc": [],
            "bcc": [],
            "reply_to": None
        }

    # Get body content
    try:
        body_content = get_email_body(message)
    except Exception as e:
        print(f"Error extracting body: {e}")
        body_content = {
            "text": "",
            "html": "",
            "has_html": False
        }

    # Get attachment info with careful error handling
    attachments = []
    try:
        attachments_data = extract_attachments_info(message)
        for att in attachments_data:
            if "filename" in att and att["filename"]:
                try:
                    # Create a safe version of the content
                    content = att.get("content", b'')
                    if not isinstance(content, bytes):
                        content = b''

                    attachment = Attachment(
                        filename=att["filename"],
                        content=content
                    )

                    # Add optional metadata safely
                    attachment.content_type = att.get('content_type', 'application/octet-stream')
                    attachment.size = att.get('size', len(content))

                    attachments.append(attachment)
                except Exception as e:
                    print(f"Error creating attachment object: {e}")
    except Exception as e:
        print(f"Error extracting attachments: {e}")

    # Check if this is potentially a mailing list
    list_id = decode_str(message.get('list-id') or "")
    list_unsubscribe = decode_str(message.get('list-unsubscribe') or "")
    is_mailing_list = bool(list_id or list_unsubscribe)

    # Create a mailing list object if applicable
    mailing_list = None
    if is_mailing_list and list_id:
        try:
            # Extract name from list-id which often looks like "List Name <listname.example.com>"
            # list_name_match = re.search(r'<([^>]+)>|([^,\s]+)', list_id)
            # list_name = list_name_match.group(1) if list_name_match else "Unknown List"
            list_name_match = re.search(r'<([^>]+)>|([^,\s]+)', list_id)

            if list_name_match:
                # Check both capture groups
                list_name = list_name_match.group(1) or list_name_match.group(2) or "Unknown List"
            else:
                list_name = "Unknown List"

            # Try to find a list email address
            list_email = "list@example.com"  # Default
            list_email_match = re.search(r'([a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+)', list_unsubscribe)
            if list_email_match:
                list_email = list_email_match.group(1)

            mailing_list = MailingList(
                id=str(uuid.uuid4()),
                name=list_name,
                description=f"Mailing list extracted from {list_id}",
                email_address=EmailAddress(email=list_email)
            )
        except Exception as e:
            print(f"Error creating mailing list: {e}")

    # Create a SenderEmail object
    sender_email_id = str(uuid.uuid4())
    sender_email = SenderEmail(
        id=sender_email_id,
        sender=sender_entity,
        body=body_content["text"],
        timestamp=timestamp
    )

    # Create a ReceiverEmail object - safely handle the recipients
    receiver_email = ReceiverEmail(
        id=email_id,
        sender_email=sender_email,
        sender=sender_entity,
        to=recipients.get("to") if recipients.get("to") else None,
        reply_to=recipients.get("reply_to"),
        cc=recipients.get("cc") if recipients.get("cc") else None,
        bcc=recipients.get("bcc") if recipients.get("bcc") else None,
        timestamp=timestamp,
        subject=subject,
        body=body_content["text"],
        attachments=attachments if attachments else None,
        is_deleted=False,
        folder=folder_name,
        is_spam=False,
        mailing_list=mailing_list,
        importance_score=0,  # Default value
        mother_email=None,  # Will be linked later based on in_reply_to
        children_emails=None
    )

    # Create a data dictionary for our normalized database tables
    email_data = {
        'id': email_id,
        'sender_email_id': sender_email.id,
        'sender_id': None,  # Will be filled in by the processing function
        'reply_to_id': None,  # Will be filled in by the processing function
        'timestamp': timestamp,
        'subject': subject,
        'body': body_content["text"],
        'body_html': body_content["html"] if body_content["has_html"] else None,
        'has_html': body_content["has_html"],
        'is_deleted': False,
        'folder': folder_name,
        'is_spam': False,
        'mailing_list_id': mailing_list.id if mailing_list else None,
        'importance_score': 0,
        'mother_email_id': None,  # Will be updated later based on in_reply_to
        'message_id': message_id,
        'references': references,
        'in_reply_to': in_reply_to
    }

    return email_data, receiver_email

In [6]:
def collect_email_data(directory, include_html=True, include_attachments=True):
    """Process all mbox files and return a list of email data"""
    all_emails = []

    # Get list of mbox files
    mbox_files = [f for f in os.listdir(directory) if f.endswith('.mbox')]
    print(f"Found {len(mbox_files)} mbox files")

    # Process each file
    for filename in mbox_files:
        file_path = os.path.join(directory, filename)
        folder_name = os.path.basename(file_path).replace('.mbox', '')
        mbox = mailbox.mbox(file_path)

        # Count messages for progress bar
        total_messages = len(mbox)
        print(f"Processing {folder_name} ({total_messages} messages)")

        for message in tqdm(mbox, total=total_messages, desc=folder_name):
            email_data = extract_message_data(message, folder_name)

            # Optionally exclude HTML content to reduce data size
            if not include_html:
                email_data.pop('body_html', None)

            # Optionally simplify attachment info to reduce data size
            if not include_attachments:
                email_data['attachment_count'] = len(email_data.get('attachments', []))
                email_data.pop('attachments', None)

            all_emails.append(email_data)

    return all_emails

### Setup Database

In [7]:
def setup_database(db_path):
    """Set up the DuckDB database schema with proper types and indexes"""
    # Connect to DuckDB database
    conn = duckdb.connect(db_path)

    # Create tables for each Pydantic model

    # Organizations table
    conn.execute("""
    CREATE TABLE IF NOT EXISTS organizations (
        id VARCHAR PRIMARY KEY,
        name VARCHAR,
        description VARCHAR,
        email_address VARCHAR
    )
    """)

    # Positions table
    conn.execute("""
    CREATE TABLE IF NOT EXISTS positions (
        id VARCHAR PRIMARY KEY,
        name VARCHAR,
        start_date TIMESTAMP,
        end_date TIMESTAMP,
        description VARCHAR,
        organization_id VARCHAR,
        FOREIGN KEY (organization_id) REFERENCES organizations(id)
    )
    """)

    # Entities table (for senders and recipients)
    conn.execute("""
    CREATE TABLE IF NOT EXISTS entities (
        id VARCHAR PRIMARY KEY,
        name VARCHAR,
        email VARCHAR,
        alias_names JSON,
        is_physical_person BOOLEAN
    )
    """)

    # Entity alias emails table
    conn.execute("""
    CREATE TABLE IF NOT EXISTS entity_alias_emails (
        id VARCHAR PRIMARY KEY,
        entity_id VARCHAR,
        email VARCHAR,
        FOREIGN KEY (entity_id) REFERENCES entities(id)
    )
    """)

    # Entity positions table
    conn.execute("""
    CREATE TABLE IF NOT EXISTS entity_positions (
        entity_id VARCHAR,
        position_id VARCHAR,
        PRIMARY KEY (entity_id, position_id),
        FOREIGN KEY (entity_id) REFERENCES entities(id),
        FOREIGN KEY (position_id) REFERENCES positions(id)
    )
    """)

    # Mailing lists table
    conn.execute("""
    CREATE TABLE IF NOT EXISTS mailing_lists (
        id VARCHAR PRIMARY KEY,
        name VARCHAR,
        description VARCHAR,
        email_address VARCHAR
    )
    """)

    # Sender emails table
    conn.execute("""
    CREATE TABLE IF NOT EXISTS sender_emails (
        id VARCHAR PRIMARY KEY,
        sender_id VARCHAR,
        body TEXT,
        timestamp TIMESTAMP,
        FOREIGN KEY (sender_id) REFERENCES entities(id)
    )
    """)

    # Receiver emails table
    conn.execute("""
    CREATE TABLE IF NOT EXISTS receiver_emails (
        id VARCHAR PRIMARY KEY,
        sender_email_id VARCHAR,
        sender_id VARCHAR,
        reply_to_id VARCHAR,
        timestamp TIMESTAMP,
        subject VARCHAR,
        body TEXT,
        body_html TEXT,
        has_html BOOLEAN,
        is_deleted BOOLEAN DEFAULT FALSE,
        folder VARCHAR DEFAULT 'inbox',
        is_spam BOOLEAN DEFAULT FALSE,
        mailing_list_id VARCHAR,
        importance_score INTEGER DEFAULT 0,
        mother_email_id VARCHAR,
        message_id VARCHAR,
        "references" TEXT,
        in_reply_to VARCHAR,
        FOREIGN KEY (sender_email_id) REFERENCES sender_emails(id),
        FOREIGN KEY (sender_id) REFERENCES entities(id),
        FOREIGN KEY (reply_to_id) REFERENCES entities(id),
        FOREIGN KEY (mailing_list_id) REFERENCES mailing_lists(id),
        FOREIGN KEY (mother_email_id) REFERENCES receiver_emails(id)
    )
    """)

    # Email recipients tables (to, cc, bcc)
    conn.execute("""
    CREATE TABLE IF NOT EXISTS email_recipients_to (
        email_id VARCHAR,
        entity_id VARCHAR,
        PRIMARY KEY (email_id, entity_id),
        FOREIGN KEY (email_id) REFERENCES receiver_emails(id),
        FOREIGN KEY (entity_id) REFERENCES entities(id)
    )
    """)

    conn.execute("""
    CREATE TABLE IF NOT EXISTS email_recipients_cc (
        email_id VARCHAR,
        entity_id VARCHAR,
        PRIMARY KEY (email_id, entity_id),
        FOREIGN KEY (email_id) REFERENCES receiver_emails(id),
        FOREIGN KEY (entity_id) REFERENCES entities(id)
    )
    """)

    conn.execute("""
    CREATE TABLE IF NOT EXISTS email_recipients_bcc (
        email_id VARCHAR,
        entity_id VARCHAR,
        PRIMARY KEY (email_id, entity_id),
        FOREIGN KEY (email_id) REFERENCES receiver_emails(id),
        FOREIGN KEY (entity_id) REFERENCES entities(id)
    )
    """)

    # Attachments table
    conn.execute("""
    CREATE TABLE IF NOT EXISTS attachments (
        id VARCHAR PRIMARY KEY,
        email_id VARCHAR,
        filename VARCHAR,
        content BLOB,
        content_type VARCHAR,
        size INTEGER,
        FOREIGN KEY (email_id) REFERENCES receiver_emails(id)
    )
    """)

    # Create child email relationships table
    conn.execute("""
    CREATE TABLE IF NOT EXISTS email_children (
        parent_id VARCHAR,
        child_id VARCHAR,
        PRIMARY KEY (parent_id, child_id),
        FOREIGN KEY (parent_id) REFERENCES receiver_emails(id),
        FOREIGN KEY (child_id) REFERENCES receiver_emails(id)
    )
    """)

    # Create indexes
    conn.execute('CREATE INDEX IF NOT EXISTS idx_receiver_emails_timestamp ON receiver_emails(timestamp)')
    conn.execute('CREATE INDEX IF NOT EXISTS idx_receiver_emails_folder ON receiver_emails(folder)')
    conn.execute('CREATE INDEX IF NOT EXISTS idx_receiver_emails_subject ON receiver_emails(subject)')
    conn.execute('CREATE INDEX IF NOT EXISTS idx_receiver_emails_message_id ON receiver_emails(message_id)')
    conn.execute('CREATE INDEX IF NOT EXISTS idx_entities_email ON entities(email)')
    conn.execute('CREATE INDEX IF NOT EXISTS idx_attachments_email_id ON attachments(email_id)')

    return conn

## Pipeline

In [None]:
def process_mbox_to_duckdb(mbox_path, conn, batch_size=100, entity_cache=None):
    """Process mbox file directly to DuckDB in batches with progress bar"""
    if entity_cache is None:
        entity_cache = {}  # Cache to store entities we've already seen

    mbox = mailbox.mbox(mbox_path)
    folder_name = os.path.basename(mbox_path).replace('.mbox', '')

    # Count messages for progress bar
    total_messages = len(mbox)
    print(f"Processing {folder_name} ({total_messages} messages)")

    # Process in batches for each table
    entity_batch = []
    entity_alias_emails_batch = []
    mailing_list_batch = []
    sender_email_batch = []
    receiver_email_batch = []
    to_recipients_batch = []
    cc_recipients_batch = []
    bcc_recipients_batch = []
    attachments_batch = []

    for i, message in enumerate(tqdm(mbox, total=total_messages, desc=folder_name)):
        try:
            email_data, receiver_email = extract_message_data(message, folder_name)

            # Process sender entity
            sender = receiver_email.sender
            if sender.email.email not in entity_cache:
                entity_id = str(uuid.uuid4())
                entity_cache[sender.email.email] = entity_id

                # Add to entities batch
                entity_batch.append({
                    'id': entity_id,
                    'name': sender.name,
                    'email': sender.email.email,
                    'alias_names': json.dumps(sender.alias_names) if sender.alias_names else None,
                    'is_physical_person': sender.is_physical_person
                })

                # Process alias emails if any
                if sender.alias_emails:
                    for alias_email in sender.alias_emails:
                        entity_alias_emails_batch.append({
                            'id': str(uuid.uuid4()),
                            'entity_id': entity_id,
                            'email': alias_email.email
                        })
            else:
                # Get cached entity ID
                entity_id = entity_cache[sender.email.email]

            # Process sender email
            sender_email = receiver_email.sender_email
            sender_email_batch.append({
                'id': sender_email.id,
                'sender_id': entity_id,  # Use cached or new entity ID
                'body': sender_email.body,
                'timestamp': sender_email.timestamp
            })

            # Process receiver email
            reply_to_id = None
            if receiver_email.reply_to:
                reply_to_email = receiver_email.reply_to.email.email
                if reply_to_email not in entity_cache:
                    reply_to_id = str(uuid.uuid4())
                    entity_cache[reply_to_email] = reply_to_id

                    entity_batch.append({
                        'id': reply_to_id,
                        'name': receiver_email.reply_to.name,
                        'email': reply_to_email,
                        'alias_names': None,
                        'is_physical_person': True
                    })
                else:
                    reply_to_id = entity_cache[reply_to_email]

            # Add mailing list if present
            mailing_list_id = None
            if receiver_email.mailing_list:
                mailing_list_id = receiver_email.mailing_list.id
                mailing_list_batch.append({
                    'id': mailing_list_id,
                    'name': receiver_email.mailing_list.name,
                    'description': receiver_email.mailing_list.description,
                    'email_address': receiver_email.mailing_list.email_address.email
                })

            # Add receiver email
            receiver_email_batch.append({
                'id': receiver_email.id,
                'sender_email_id': sender_email.id,
                'sender_id': entity_id,
                'reply_to_id': reply_to_id,
                'timestamp': receiver_email.timestamp,
                'subject': receiver_email.subject,
                'body': receiver_email.body,
                'body_html': email_data.get('body_html'),
                'has_html': email_data.get('has_html', False),
                'is_deleted': receiver_email.is_deleted,
                'folder': receiver_email.folder,
                'is_spam': receiver_email.is_spam,
                'mailing_list_id': mailing_list_id,
                'importance_score': receiver_email.importance_score,
                'mother_email_id': None,  # Will be updated later
                'message_id': email_data.get('message_id'),
                'references': email_data.get('references'),
                'in_reply_to': email_data.get('in_reply_to')
            })

            # Process recipients (to, cc, bcc)
            if receiver_email.to:
                for entity in receiver_email.to:
                    if entity.email.email not in entity_cache:
                        to_entity_id = str(uuid.uuid4())
                        entity_cache[entity.email.email] = to_entity_id

                        entity_batch.append({
                            'id': to_entity_id,
                            'name': entity.name,
                            'email': entity.email.email,
                            'alias_names': json.dumps(entity.alias_names) if entity.alias_names else None,
                            'is_physical_person': entity.is_physical_person
                        })

                        # Process alias emails
                        if entity.alias_emails:
                            for alias_email in entity.alias_emails:
                                entity_alias_emails_batch.append({
                                    'id': str(uuid.uuid4()),
                                    'entity_id': to_entity_id,
                                    'email': alias_email.email
                                })
                    else:
                        to_entity_id = entity_cache[entity.email.email]

                    # Add to recipients relationship
                    to_recipients_batch.append({
                        'email_id': receiver_email.id,
                        'entity_id': to_entity_id
                    })

            # Process CC recipients
            if receiver_email.cc:
                for entity in receiver_email.cc:
                    if entity.email.email not in entity_cache:
                        cc_entity_id = str(uuid.uuid4())
                        entity_cache[entity.email.email] = cc_entity_id

                        entity_batch.append({
                            'id': cc_entity_id,
                            'name': entity.name,
                            'email': entity.email.email,
                            'alias_names': json.dumps(entity.alias_names) if entity.alias_names else None,
                            'is_physical_person': entity.is_physical_person
                        })
                    else:
                        cc_entity_id = entity_cache[entity.email.email]

                    # Add cc recipients relationship
                    cc_recipients_batch.append({
                        'email_id': receiver_email.id,
                        'entity_id': cc_entity_id
                    })

            # Process BCC recipients
            if receiver_email.bcc:
                for entity in receiver_email.bcc:
                    if entity.email.email not in entity_cache:
                        bcc_entity_id = str(uuid.uuid4())
                        entity_cache[entity.email.email] = bcc_entity_id

                        entity_batch.append({
                            'id': bcc_entity_id,
                            'name': entity.name,
                            'email': entity.email.email,
                            'alias_names': json.dumps(entity.alias_names) if entity.alias_names else None,
                            'is_physical_person': entity.is_physical_person
                        })
                    else:
                        bcc_entity_id = entity_cache[entity.email.email]

                    # Add bcc recipients relationship
                    bcc_recipients_batch.append({
                        'email_id': receiver_email.id,
                        'entity_id': bcc_entity_id
                    })

            # Process attachments
            if receiver_email.attachments:
                for attachment in receiver_email.attachments:
                    content_type = getattr(attachment, 'content_type', 'application/octet-stream')
                    size = getattr(attachment, 'size', len(attachment.content) if attachment.content else 0)

                    attachments_batch.append({
                        'id': str(uuid.uuid4()),
                        'email_id': receiver_email.id,
                        'filename': attachment.filename,
                        'content': attachment.content,
                        'content_type': content_type,
                        'size': size
                    })
        except Exception as e:
            print(f"Error processing message: {e}")
            continue

        # Process batch when it reaches the batch size or on the last message
        if len(receiver_email_batch) >= batch_size or i == total_messages - 1:
            try:
                # Insert entities
                if entity_batch:
                    entities_df = pd.DataFrame(entity_batch)
                    conn.execute("""
                    INSERT OR IGNORE INTO entities
                    SELECT * FROM entities_df
                    """)
                    entity_batch = []

                # Insert entity alias emails
                if entity_alias_emails_batch:
                    alias_emails_df = pd.DataFrame(entity_alias_emails_batch)
                    conn.execute("""
                    INSERT OR IGNORE INTO entity_alias_emails
                    SELECT * FROM alias_emails_df
                    """)
                    entity_alias_emails_batch = []

                # Insert mailing lists
                if mailing_list_batch:
                    mailing_lists_df = pd.DataFrame(mailing_list_batch)
                    conn.execute("""
                    INSERT OR IGNORE INTO mailing_lists
                    SELECT * FROM mailing_lists_df
                    """)
                    mailing_list_batch = []

                # Insert sender emails
                if sender_email_batch:
                    sender_emails_df = pd.DataFrame(sender_email_batch)
                    conn.execute("""
                    INSERT OR IGNORE INTO sender_emails
                    SELECT * FROM sender_emails_df
                    """)
                    sender_email_batch = []

                # Insert receiver emails
                if receiver_email_batch:
                    receiver_emails_df = pd.DataFrame(receiver_email_batch)
                    conn.execute("""
                    INSERT OR IGNORE INTO receiver_emails
                    SELECT * FROM receiver_emails_df
                    """)
                    receiver_email_batch = []

                # Insert recipient relationships
                if to_recipients_batch:
                    to_recipients_df = pd.DataFrame(to_recipients_batch)
                    conn.execute("""
                    INSERT OR IGNORE INTO email_recipients_to
                    SELECT * FROM to_recipients_df
                    """)
                    to_recipients_batch = []

                if cc_recipients_batch:
                    cc_recipients_df = pd.DataFrame(cc_recipients_batch)
                    conn.execute("""
                    INSERT OR IGNORE INTO email_recipients_cc
                    SELECT * FROM cc_recipients_df
                    """)
                    cc_recipients_batch = []

                if bcc_recipients_batch:
                    bcc_recipients_df = pd.DataFrame(bcc_recipients_batch)
                    conn.execute("""
                    INSERT OR IGNORE INTO email_recipients_bcc
                    SELECT * FROM bcc_recipients_df
                    """)
                    bcc_recipients_batch = []

                # Insert attachments
                if attachments_batch:
                    attachments_df = pd.DataFrame(attachments_batch)
                    conn.execute("""
                    INSERT OR IGNORE INTO attachments
                    SELECT * FROM attachments_df
                    """)
                    attachments_batch = []

                # Commit to save progress
                conn.commit()
            except Exception as e:
                print(f"Error inserting batch into database: {e}")
                # Continue processing even if one batch fails
                entity_batch = []
                entity_alias_emails_batch = []
                mailing_list_batch = []
                sender_email_batch = []
                receiver_email_batch = []
                to_recipients_batch = []
                cc_recipients_batch = []
                bcc_recipients_batch = []
                attachments_batch = []

    # At the end of the function, add:
    print(f"Completed processing {folder_name} ({total_messages} messages)")

    return entity_cache

In [None]:
def process_mbox_files(directory, output_path=None):
    """
    Process mbox files and save to DuckDB format with normalized tables

    Parameters:
    - directory: Directory containing .mbox files
    - output_path: Output file path (default: emails.duckdb)
    """
    # Set default output path if not provided
    if output_path is None:
        output_path = 'emails.duckdb'
    elif not output_path.endswith('.duckdb'):
        output_path = f"{output_path}.duckdb"

    # Setup database
    conn = setup_database(output_path)

    # Get list of mbox files
    mbox_files = [f for f in os.listdir(directory) if f.endswith('.mbox')]
    print(f"Found {len(mbox_files)} mbox files")

    # Entity cache to avoid duplicates across files
    entity_cache = {}

    # Process each file
    for filename in mbox_files:
        file_path = os.path.join(directory, filename)
        try:
            entity_cache = process_mbox_to_duckdb(file_path, conn, entity_cache=entity_cache)
        except Exception as e:
            print(f"Error processing {filename}: {e}")
            # Continue with next file

    try:
        # Create relationships between emails (mother/child relationships)
        print("Creating email thread relationships...")
        conn.execute("""
        UPDATE receiver_emails
        SET mother_email_id = (
            SELECT r2.id
            FROM receiver_emails r2
            WHERE r2.message_id = receiver_emails.in_reply_to
            LIMIT 1
        )
        WHERE in_reply_to IS NOT NULL
        """)

        # Populate the children relationships table
        print("Populating child email relationships...")
# Populate the children relationships table with additional safeguards
        conn.execute("""
        INSERT INTO email_children (parent_id, child_id)
        SELECT mother_email_id, id
        FROM receiver_emails
        WHERE mother_email_id IS NOT NULL
        AND mother_email_id IN (SELECT id FROM receiver_emails)
        AND id != mother_email_id  -- Prevent self-references
        """)
    except Exception as e:
        print(f"Warning: Error in relationship creation: {e}")
        print("Continuing with database optimization...")

    # Final optimization and cleanup
    print("Optimizing database...")
    conn.execute("PRAGMA enable_optimizer")
    conn.close()

    print(f"DuckDB database saved to {output_path}")
    print("""
Database structure:
- entities: Stores all senders and recipients
- entity_alias_emails: Stores alias emails for entities
- sender_emails: Stores email data from senders
- receiver_emails: Stores received email data
- email_recipients_to/cc/bcc: Links emails to recipient entities
- attachments: Stores email attachments
- email_children: Stores parent-child relationships between emails
- mailing_lists: Stores mailing list information
- organizations: Stores organization information
- positions: Stores position information
- entity_positions: Links entities to positions
""")

In [16]:
### Test des fonctions:

mbox_single_file_path = 'data/processed/mailbox_cecile/AG.mbox'
mbox_path = 'data/processed/mailbox_cecile/'
TEST_SAMPLE_PATH = 'data/processed/real_test_sample/'
db_path = 'data/database/database.duckdb'

In [11]:
duckdb_conn = setup_database(db_path)

In [13]:

setup_database(db_path)
duckdb_conn = duckdb.connect(db_path)
process_mbox_to_duckdb(mbox_single_file_path, duckdb_conn)

Processing AG (6 messages)


AG: 100%|██████████| 6/6 [00:02<00:00,  2.19it/s]


{'delegation_generale@archivistes.org': '91a688a1-d745-4144-91d9-b470fe8f0a7c',
 'celine.guyon@archivistes.org': '8b6aa1fe-11ca-45a4-a861-16c423b9fc2d',
 'catherine.bernard.aaf@gmail.com': '99b1fb03-165d-42b4-9b6b-3da7ed968fa8',
 'Laurent.Ducol@saint-gobain.com': '8d6d4310-d02c-4a8f-b6f7-66a9a41e8017',
 'vieassociative@archivistes.org': '1690f42c-29f3-4780-956e-5e12c434dae7',
 'aaf_adherents@listes.archivistes.org': '131ced62-b406-46e1-b829-a4d30a81f073',
 'aaf_bureau@listes.archivistes.org': 'f95a99ec-f961-40ce-af70-0a5e955d066a',
 'schirr@unistra.fr': '1f390472-aa50-4210-a442-874ca2f6870c',
 'laurent.ducol.aaf@gmail.com': 'f770392a-356c-4b61-85eb-0d9c53916f19'}

In [17]:
process_mbox_files(TEST_SAMPLE_PATH, db_path)

# mbox_path

Found 5 mbox files
Processing AG (6 messages)


AG: 100%|██████████| 6/6 [00:00<00:00, 35.47it/s]


Processing Ateliers (28 messages)


Ateliers: 100%|██████████| 28/28 [00:00<00:00, 107.85it/s]


Processing Instances (60 messages)


Instances: 100%|██████████| 60/60 [00:00<00:00, 62.07it/s]


Processing Plaidoyer (38 messages)


Plaidoyer: 100%|██████████| 38/38 [01:27<00:00,  2.30s/it] 


Processing RH (40 messages)


RH: 100%|██████████| 40/40 [00:00<00:00, 96.15it/s] 


Creating email thread relationships...
Error creating relationships: Constraint Error: Violates foreign key constraint because key "email_id: 509779ea-9914-4b3b-8a9a-edfc32498383" is still referenced by a foreign key in a different table. If this is an unexpected constraint violation, please refer to our foreign key limitations in the documentation
Optimizing database...


CatalogException: Catalog Error: Pragma Function with name optimize does not exist!
Did you mean "enable_optimizer"?

In [None]:
# setup_database()