# Explore Email Dataset

This notebook trims the 517k-message Enron corpus into a workshop-ready pool of threads.
We focus on three steps:
- profile the raw email table and capture baseline metrics
- parse headers into structured columns we can filter on
- apply time, keyword, and thread-length filters to surface high-signal conversations


In [2]:
import math
import os
import multiprocessing as mp
import re
from email.utils import parseaddr, getaddresses
from pathlib import Path

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from IPython.display import Markdown, display
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures.process import BrokenProcessPool
from pprint import pprint

pd.set_option('display.max_columns', None)
pd.set_option('display.max_colwidth', 160)

DATA_PATH = Path('../data/emails.csv')
TIME_WINDOW = (pd.Timestamp('2001-03-01'), pd.Timestamp('2001-06-30'))
THREAD_LEN_RANGE = (2, 8)
ACTION_KEYWORDS = [
    'deadline',
    'deliver',
    'deliverable',
    'please review',
    'fyi',
    'action item',
    'follow up',
    'schedule',
    'update',
    'approve',
]
ACTION_PATTERN = re.compile('|'.join(re.escape(k) for k in ACTION_KEYWORDS), flags=re.IGNORECASE)
plt.style.use('seaborn-v0_8')


## Load raw email table

`emails.csv` stores each raw MIME message alongside its source file path. We keep the full table in
memory for now so downstream filters can operate on the entire corpus before narrowing to the time window.

In [3]:
raw_df = pd.read_csv(DATA_PATH, usecols=['file', 'message']).rename(columns={'message': 'raw_message'})
raw_shape = raw_df.shape
raw_mem_mb = raw_df.memory_usage(deep=True).sum() / 1024 ** 2
print(f'Loaded {raw_shape[0]:,} rows – {raw_shape[1]} columns (~{raw_mem_mb:,.1f} MB)')
display(raw_df.head(3))


Loaded 517,401 rows – 2 columns (~1,403.6 MB)


Unnamed: 0,file,raw_message
0,allen-p/_sent_mail/1.,"Message-ID: <18782981.1075855378110.JavaMail.evans@thyme>\nDate: Mon, 14 May 2001 16:39:00 -0700 (PDT)\nFrom: phillip.allen@enron.com\nTo: tim.belden@enron...."
1,allen-p/_sent_mail/10.,"Message-ID: <15464986.1075855378456.JavaMail.evans@thyme>\nDate: Fri, 4 May 2001 13:51:00 -0700 (PDT)\nFrom: phillip.allen@enron.com\nTo: john.lavorato@enro..."
2,allen-p/_sent_mail/100.,"Message-ID: <24216240.1075855687451.JavaMail.evans@thyme>\nDate: Wed, 18 Oct 2000 03:00:00 -0700 (PDT)\nFrom: phillip.allen@enron.com\nTo: leah.arsdall@enro..."


In [4]:
raw_df.sample(1)['raw_message'].values[0]

'Message-ID: <2332992.1075859168847.JavaMail.evans@thyme>\nDate: Mon, 10 Dec 2001 08:30:18 -0800 (PST)\nFrom: ecn38c2.conf.@enron.com\nTo: karen.gruesen@enron.com, jim.coffey@enron.com, lou.stoler@enron.com, \n\tstuart.zisman@enron.com, brian.redmond@enron.com, \n\tgerald.nemec@enron.com\nSubject: Gerald Nemec would like to meet re Bridgeline\nMime-Version: 1.0\nContent-Type: text/plain; charset=us-ascii\nContent-Transfer-Encoding: 7bit\nX-From: Conf. Room ECN38C2 </O=ENRON/OU=NA/CN=RECIPIENTS/CN=MBX_CRECN38C2>\nX-To: Gruesen, Karen </O=ENRON/OU=NA/CN=RECIPIENTS/CN=Kgruese>, Coffey Jr., Jim </O=ENRON/OU=NA/CN=RECIPIENTS/CN=Jcoffey>, Stoler, Lou </O=ENRON/OU=NA/CN=RECIPIENTS/CN=Lstolle>, Zisman, Stuart </O=ENRON/OU=NA/CN=RECIPIENTS/CN=Szisman>, Redmond, Brian </O=ENRON/OU=NA/CN=RECIPIENTS/CN=Bredmon>, Nemec, Gerald </O=ENRON/OU=NA/CN=RECIPIENTS/CN=Gnemec>\nX-cc: \nX-bcc: \nX-Folder: \\Gerald_Nemec_Jan2002\\Nemec, Gerald\\Inbox\nX-Origin: Nemec-G\nX-FileName: gnemec (Non-Privileged).pst\

In [5]:
# Render a sampled raw message as a layperson-friendly preview
sample_row = raw_df.sample(1).iloc[0]
raw_message = sample_row['raw_message']

header_lines: list[str] = []
body_lines: list[str] = []
found_break = False
for line in raw_message.splitlines():
    if not found_break and line.strip() == '':
        found_break = True
        continue
    if found_break:
        body_lines.append(line.rstrip())
    else:
        header_lines.append(line.rstrip())

header_preview = '\n'.join(header_lines)
body_preview = '\n'.join(body_lines[:40]).strip()
if len(body_lines) > 40:
    body_preview += '\n...'
body_preview = body_preview or '[empty body]'

email_title = sample_row['file']

markdown = (
    f"### Sample email: {email_title}\n\n"
    "**Header preview**\n\n"
    "```\n"
    f"{header_preview}\n"
    "```\n\n"
    "**Body preview**\n\n"
    "```\n"
    f"{body_preview}\n"
    "```"
)

display(Markdown(markdown))


### Sample email: skilling-j/deleted_items/343.

**Header preview**

```
Message-ID: <19499770.1075852655147.JavaMail.evans@thyme>
Date: Sun, 17 Jun 2001 14:55:02 -0700 (PDT)
From: 40enron@enron.com
Subject: Credit Union Offers Flood Relief Loans
Mime-Version: 1.0
Content-Type: text/plain; charset=ANSI_X3.4-1968
Content-Transfer-Encoding: 7bit
X-From: Enron Federal Credit Union@ENRON <IMCEANOTES-Enron+20Federal+20Credit+20Union+40ENRON@ENRON.com>
X-To: All Enron Downtown@ENRON
X-cc:
X-bcc:
X-Folder: \JSKILLIN (Non-Privileged)\Deleted Items
X-Origin: Skilling-J
X-FileName: JSKILLIN (Non-Privileged).pst
```

**Body preview**

```
Credit Union Offers Flood Relief Loans
Our thoughts and support go out to all Enron employees who have been affected by our recent flooding.  Enron Federal Credit Union would like to assist those employees who may be in need of financial assistance with a Signature Loan or Auto Loan.

Two types of special Flood Relief Loans* are available including:

?	Signature Loans* for a 6 month term, single pay plan, at a 5.00% Fixed APR, and $10,000 maximum.

?	Auto Loans** are available at 6.49% Fixed APR on all conventional loan terms, with a 90-Day No Payment Option.

Apply on-line, 24x7 at enronfcu.com, then select Anytime Loans and the loan type.  Please type Flood Relief in the Purpose of Loan Field.  Or, call Anytime Loans at 800.235.8763 and reference our Flood Relief Loan.  For more information, please e-mail efcu@enron.com or call our Lending Department at x30578.

*Must be a Credit Union member to apply. Unpaid loans must pay interest due after 6 months, and renew on monthly payments at the regular lobby rate at time of renewal. **Term will be extended and finance charges will continue to accrue during the 90-day period. Subject to normal underwriting requirements. Auto Balloon Notes and current loans are excluded from this offer. No other discounts apply. EFCU is an Equal Housing Lender & NCUA Insured. Offers end July 15, 2001.


Water Damaged Vehicle Inspection Service Available
Due to the recent flooding in our area, many of you may now be in the market to buy a new or used vehicle. To help ensure you do not unknowingly purchase a new or used vehicle that has been flood damaged, please call our service partner, Auto Exam. Auto Exam will send a  technician to your location to perform a bumper-to-bumper inspection on the vehicle, and provide you with a detailed report on the condition of the vehicle. Complete inspections are available for $109.00. Call 713.944.2944 for an appointment today, or visit our web site at enronfcu.com and select Auto Center for more information.
```

In [6]:
# Render the message properly with pprint


## Extract structured headers

We split the RFC-822 header block from the body, handle folded header lines, and derive a
normalized subject for thread grouping.
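
As a cross-check on the hand-rolled parser, the same fields can be pulled out with the standard library's `email` package. This is a minimal sketch, not the notebook's method; the sample message and the `parse_with_stdlib` helper are made up for illustration.

```python
import re
from email import message_from_string
from email.utils import getaddresses


def parse_with_stdlib(raw: str) -> dict:
    """Hypothetical helper: parse one raw message with the stdlib email package."""
    msg = message_from_string(raw)
    # A folded header keeps its line break in the raw value; getaddresses treats it as whitespace.
    recipients = [addr.lower() for _, addr in getaddresses([msg.get('To', '')]) if addr]
    # Collapse runs of whitespace in the subject.
    subject = re.sub(r'\s+', ' ', msg.get('Subject', '')).strip()
    return {'to': recipients, 'subject': subject, 'body': msg.get_payload()}


# Made-up sample with a folded To: header (the continuation line starts with a tab).
sample = (
    "Message-ID: <123.JavaMail.evans@thyme>\n"
    "To: a@enron.com, \n"
    "\tb@enron.com\n"
    "Subject: RE: Fw: Status   update\n"
    "\n"
    "Body text.\n"
)
parsed = parse_with_stdlib(sample)
print(parsed)
```

Comparing a handful of rows parsed both ways is a cheap way to catch edge cases in the folding logic.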

In [10]:
os.cpu_count()

10

In [None]:

HEADER_BREAK = re.compile(r'?
?
')
SUBJECT_PREFIX_RE = re.compile(r'^(re|fw|fwd):\s*', flags=re.IGNORECASE)
MESSAGE_ID_RE = re.compile(r'<([^>]+)>')
WHITESPACE_RE = re.compile(r'\s+')


def clean_message_id(value: str | None) -> str | None:
    if not value:
        return None
    value = value.strip()
    if value.startswith('<') and value.endswith('>'):
        value = value[1:-1]
    value = value.strip()
    if not value:
        return None
    return value.lower()


def extract_message_ids(raw: str | None) -> list[str]:
    if not raw:
        return []
    matches = MESSAGE_ID_RE.findall(raw)
    if not matches:
        tokens = re.split(r'[\s,]+', raw)
        matches = [token for token in tokens if token]
    cleaned: list[str] = []
    for candidate in matches:
        clean = clean_message_id(candidate)
        if clean:
            cleaned.append(clean)
    return cleaned


def split_headers_body(raw: str) -> tuple[str, str]:
    if not isinstance(raw, str):
        return '', ''
    parts = HEADER_BREAK.split(raw, maxsplit=1)
    header_block = parts[0] if parts else ''
    body = parts[1] if len(parts) > 1 else ''
    return header_block, body


def parse_header_block(block: str) -> dict[str, str]:
    headers: dict[str, str] = {}
    current_key: str | None = None
    for line in block.splitlines():
        if not line:
            current_key = None
            continue
        if line.startswith((' ', '\t')) and current_key:
            headers[current_key] += f' {line.strip()}'
            continue
        if ':' not in line:
            current_key = None
            continue
        key, value = line.split(':', 1)
        current_key = key.strip().lower()
        headers[current_key] = value.strip()
    return headers


def normalize_subject(subject: str) -> str:
    if not subject:
        return ''
    cleaned = subject
    for _ in range(5):
        updated = SUBJECT_PREFIX_RE.sub('', cleaned)
        if updated == cleaned:
            break
        cleaned = updated
    cleaned = WHITESPACE_RE.sub(' ', cleaned)
    return cleaned.strip().lower()


def parse_email_fields(raw: str) -> dict[str, object]:
    header_block, body = split_headers_body(raw)
    headers = parse_header_block(header_block)

    subject = headers.get('subject', '').strip()
    normalized_subject = normalize_subject(subject)

    message_id = clean_message_id(headers.get('message-id'))
    in_reply_to = clean_message_id(headers.get('in-reply-to'))
    references = extract_message_ids(headers.get('references'))

    date_raw = headers.get('date', '').strip() or None
    from_raw = headers.get('from', '').strip() or None
    from_email = parseaddr(from_raw)[1].lower() if from_raw else None

    to_raw = headers.get('to', '').strip() or None
    to_emails = [addr.lower() for _, addr in getaddresses([to_raw])] if to_raw else []
    cc_raw = headers.get('cc', '').strip() or None
    cc_emails = [addr.lower() for _, addr in getaddresses([cc_raw])] if cc_raw else []

    participants = set()
    if from_email:
        participants.add(from_email)
    participants.update(to_emails)
    participants.update(cc_emails)
    participants_key = ';'.join(sorted(participants)) if participants else None

    keyword_text = f"{subject}\n{body}" if subject or body else ''
    action_hit = bool(ACTION_PATTERN.search(keyword_text))

    return {
        'subject': subject or None,
        'normalized_subject': normalized_subject or None,
        'from_raw': from_raw,
        'from_email': from_email,
        'from_domain': from_email.split('@')[-1] if from_email and '@' in from_email else None,
        'to_raw': to_raw,
        'to_emails': ';'.join(to_emails) or None,
        'cc_raw': cc_raw,
        'cc_emails': ';'.join(cc_emails) or None,
        'date_raw': date_raw,
        'body': body.strip(),
        'body_char_len': len(body),
        'action_hit': action_hit,
        'message_id': message_id,
        'in_reply_to': in_reply_to,
        'references': references or None,
        'participants_key': participants_key,
    }


def assign_thread_ids(df: pd.DataFrame) -> pd.Series:
    id_to_parent: dict[str, str | None] = {}
    index_to_mid: dict[int, str | None] = {}
    for row in df[['message_id', 'in_reply_to']].itertuples():
        mid = row.message_id
        parent = row.in_reply_to
        if mid:
            id_to_parent[mid] = parent if parent else None
        index_to_mid[row.Index] = mid

    root_cache: dict[str, str] = {}

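    def find_root(mid: str) -> str:
        trail: list[str] = []
        visited: set[str] = set()
        current = mid
        while current:
            if current in root_cache:
                root = root_cache[current]
                break
            trail.append(current)
            visited.add(current)
            parent = id_to_parent.get(current)
            # Follow the parent only if it is a known message we have not already
            # visited; the visited check stops a reply cycle from looping forever.
            if parent and parent in id_to_parent and parent not in visited:
                current = parent
                continue
            root = current
            break
        else:
            root = mid
        for node in trail:
            root_cache[node] = root
        return root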

    thread_ids: list[str] = []
    for row in df.itertuples():
        mid = index_to_mid[row.Index]
        thread_id = None
        if mid:
            root_id = find_root(mid)
            thread_id = f'mid::{root_id}'
        if not thread_id:
            sent_at_value = row.sent_at
            week_bucket = 'unknown-week'
            if pd.notna(sent_at_value):
                sent_ts = pd.Timestamp(sent_at_value)
                week_bucket = sent_ts.to_period('W').start_time.strftime('%Y-%m-%d')
            parts = [part for part in ((row.normalized_subject or '').strip(), (row.participants_key or '').strip(), week_bucket) if part]
            fallback = 'heuristic::' + '|'.join(parts)
            if fallback == 'heuristic::':
                fallback = f'row::{row.Index}'
            thread_id = fallback
        thread_ids.append(thread_id)
    return pd.Series(thread_ids, index=df.index, name='thread_id')


def parse_email_fields_parallel(messages: pd.Series, *, max_workers: int | None = None, chunksize: int = 2000) -> pd.DataFrame:
    total = len(messages)
    if total == 0:
        return pd.DataFrame(index=messages.index)
    cpu_count = os.cpu_count() or 1
    if max_workers is None:
        max_workers = max(1, min(cpu_count, 8)) if cpu_count else 1
    message_list = messages.tolist()
    try:
        ctx = mp.get_context('fork')
    except (AttributeError, ValueError):
        ctx = mp.get_context()
    try:
        with ProcessPoolExecutor(max_workers=max_workers, mp_context=ctx) as executor:
            parsed_iter = executor.map(parse_email_fields, message_list, chunksize=max(1, chunksize))
            parsed_records = list(parsed_iter)
    except BrokenProcessPool:
        parsed_records = [parse_email_fields(raw) for raw in message_list]
    return pd.DataFrame(parsed_records, index=messages.index)


In [13]:
parsed_records = parse_email_fields_parallel(raw_df['raw_message'])
parsed_df = pd.concat([raw_df.drop(columns=['raw_message']), parsed_records], axis=1)
parsed_df['sent_at'] = pd.to_datetime(parsed_df['date_raw'], errors='coerce', utc=True).dt.tz_localize(None)
parsed_df = parsed_df.dropna(subset=['sent_at', 'normalized_subject'])
parsed_df = parsed_df[parsed_df['normalized_subject'].str.len() > 0]
parsed_df = parsed_df.drop_duplicates(subset=['file', 'sent_at', 'from_email', 'subject', 'body'])
parsed_df['thread_id'] = assign_thread_ids(parsed_df)

display(parsed_df[['file', 'sent_at', 'from_email', 'subject']].head(5))
print(f'Parsed rows: {len(parsed_df):,} (unique threads: {parsed_df["normalized_subject"].nunique():,})')



Unnamed: 0,file,sent_at,from_email,subject
2,allen-p/_sent_mail/100.,2000-10-18 10:00:00,phillip.allen@enron.com,Re: test
4,allen-p/_sent_mail/1001.,2000-08-31 12:07:00,phillip.allen@enron.com,Re: Hello
5,allen-p/_sent_mail/1002.,2000-08-31 11:17:00,phillip.allen@enron.com,Re: Hello
7,allen-p/_sent_mail/1004.,2000-07-14 13:59:00,phillip.allen@enron.com,Re: PRC review - phone calls
8,allen-p/_sent_mail/101.,2000-10-17 09:26:00,phillip.allen@enron.com,Re: High Speed Internet Access


Parsed rows: 484,164 (unique threads: 126,481)


In [14]:
parsed_df.iloc[0]

file                                allen-p/_sent_mail/100.
subject                                            Re: test
normalized_subject                                     test
from_raw                            phillip.allen@enron.com
from_email                          phillip.allen@enron.com
from_domain                                       enron.com
to_raw                               leah.arsdall@enron.com
date_raw              Wed, 18 Oct 2000 03:00:00 -0700 (PDT)
body                         test successful.  way to go!!!
body_char_len                                            30
action_hit                                            False
sent_at                                 2000-10-18 10:00:00
Name: 2, dtype: object
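
The `sent_at` value above is the `Date` header converted to UTC with the timezone information dropped. The sketch below reproduces that conversion for this one row using only the standard library. It illustrates the arithmetic and assumes nothing about how `pd.to_datetime` handles the rest of the corpus.

```python
from datetime import timezone
from email.utils import parsedate_to_datetime

date_raw = 'Wed, 18 Oct 2000 03:00:00 -0700 (PDT)'

# Parse the RFC 2822 date into a timezone-aware datetime (offset -07:00).
aware = parsedate_to_datetime(date_raw)

# Shift to UTC, then drop tzinfo to get a naive timestamp comparable to sent_at.
sent_at = aware.astimezone(timezone.utc).replace(tzinfo=None)
print(sent_at)  # 2000-10-18 10:00:00
```

Because every timestamp is normalized to UTC, the time-window filter compares like with like even when senders sit in different time zones.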

## Apply message-level filters

We focus on a high-signal four-month window (1 March to 30 June 2001, set by `TIME_WINDOW`) and on emails that
contain action cues or concrete commitments in the subject or body.
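
The keyword filter relies on plain substring matching, so a keyword such as `approve` also fires inside `approver`. The sketch below contrasts that behavior with a word-boundary variant. `bounded_pattern` is an assumption introduced here for comparison, not something the notebook uses, and the keyword list is a subset of `ACTION_KEYWORDS`.

```python
import re

# Subset of the notebook's ACTION_KEYWORDS, kept short for the example.
keywords = ['deadline', 'fyi', 'update', 'approve']
alternation = '|'.join(re.escape(k) for k in keywords)

# Substring matching, built the same way as ACTION_PATTERN.
substring_pattern = re.compile(alternation, flags=re.IGNORECASE)
# Hypothetical stricter variant: keywords must appear as whole words.
bounded_pattern = re.compile(rf'\b(?:{alternation})\b', flags=re.IGNORECASE)

samples = [
    'FYI - the deadline moved to Friday',
    'The approver is out this week',
    'Lunch on Thursday?',
]
results = {
    text: (bool(substring_pattern.search(text)), bool(bounded_pattern.search(text)))
    for text in samples
}
for text, (substring_hit, bounded_hit) in results.items():
    print(f'substring={substring_hit!s:<5} bounded={bounded_hit!s:<5} {text}')
```

The stricter variant trades recall for precision: it would also skip inflected forms such as "approved" or "updates", which may be exactly the messages worth keeping.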

In [16]:
def snapshot(df: pd.DataFrame, stage: str) -> dict[str, object]:
    return {
        'stage': stage,
        'emails': len(df),
        'threads': df['thread_id'].nunique(),
    }

filtered_df = parsed_df.copy()
progress = [snapshot(filtered_df, 'parsed')]

if TIME_WINDOW:
    start, end = TIME_WINDOW
    time_mask = filtered_df['sent_at'].between(start, end)
    filtered_df = filtered_df[time_mask]
    progress.append(snapshot(filtered_df, f'time window {start.date()}→{end.date()}'))

filtered_df = filtered_df[filtered_df['body_char_len'] > 0]
progress.append(snapshot(filtered_df, 'non-empty body'))

filtered_df = filtered_df[filtered_df['action_hit']]
progress.append(snapshot(filtered_df, 'keyword match'))

progress_df = pd.DataFrame(progress)
display(progress_df)


Unnamed: 0,stage,emails,threads
0,parsed,484164,126481
1,time window 2001-03-01→2001-06-30,113187,27600
2,non-empty body,113187,27600
3,keyword match,40200,9991


## Evaluate thread structure

Group by the metadata-derived `thread_id` (Message-ID chains with a subject/participant fallback), then keep those with 2–8 messages and at least two distinct senders.

In [None]:
thread_stats = (
    filtered_df.groupby('thread_id')
    .agg(
        subject=('subject', 'first'),
        normalized_subject=('normalized_subject', 'first'),
        message_count=('thread_id', 'size'),
        first_sent=('sent_at', 'min'),
        last_sent=('sent_at', 'max'),
        keyword_hits=('action_hit', 'sum'),
        unique_senders=('from_email', lambda s: s.dropna().nunique()),
    )
    .reset_index()
)

valid_threads = thread_stats[
    (thread_stats['message_count'].between(*THREAD_LEN_RANGE))
    & (thread_stats['keyword_hits'] > 0)
    & (thread_stats['unique_senders'] >= 2)
]

candidate_df = filtered_df[filtered_df['thread_id'].isin(valid_threads['thread_id'])]
progress.append(snapshot(candidate_df, 'thread filters applied'))
progress_df = pd.DataFrame(progress)

preview_cols = ['thread_id', 'subject', 'message_count', 'unique_senders', 'first_sent', 'last_sent']
display(valid_threads[preview_cols].sort_values('message_count', ascending=False).head(10))
display(progress_df)
print(f'Candidate threads: {len(valid_threads):,}')


NameError: name 'filtered_df' is not defined

In [None]:
if not valid_threads.empty:
    counts = valid_threads['message_count'].value_counts().sort_index()
    ax = counts.plot(kind='bar', color='#1f77b4', figsize=(6, 3))
    ax.set_title('Thread length distribution (filtered)')
    ax.set_xlabel('Messages per thread')
    ax.set_ylabel('Thread count')
    plt.tight_layout()
    plt.show()
else:
    print('No threads matched the current filter settings.')


NameError: name 'valid_threads' is not defined

## Preview sample threads

Spot-checking a few threads helps ensure the filters captured decisions and commitments
that will make the manual labeling exercise engaging.

In [None]:
def render_thread(thread_id: str, df: pd.DataFrame, body_lines: int = 10) -> None:
    thread = df[df['thread_id'] == thread_id].sort_values('sent_at')
    if thread.empty:
        return
    subject = thread['subject'].iloc[0] or '(no subject)'
    normalized_subject = thread['normalized_subject'].iloc[0] or '(no normalized subject)'
    md_lines: list[str] = [
        f"### {subject}",
        f"- Thread ID: `{thread_id}`",
        f"- Normalized subject: `{normalized_subject}`",
        f"- Messages: {len(thread)}",
        f"- Date span: {thread['sent_at'].min():%Y-%m-%d} → {thread['sent_at'].max():%Y-%m-%d}",
    ]
    for row in thread.itertuples():
        # Show only the first `body_lines` lines of each message body.
        preview = '\n'.join(row.body.splitlines()[:body_lines]).strip()
        if not preview:
            preview = '*no body text*'
        elif len(row.body.splitlines()) > body_lines:
            preview += '\n...'
        sender = row.from_raw or row.from_email or 'unknown sender'
        md_lines.append(f"**{sender}** - {row.sent_at:%Y-%m-%d %H:%M}\n\n{preview}")
    display(Markdown('\n\n'.join(md_lines)))

if not valid_threads.empty:
    sample_threads = (
        valid_threads
        .sort_values(['message_count', 'keyword_hits'], ascending=[False, False])
        .head(3)['thread_id']
    )
    for thread_id in sample_threads:
        render_thread(thread_id, candidate_df)
else:
    print('No threads available to preview.')

## Next steps

- Export `candidate_df` once you are happy with the filters (e.g., `candidate_df.to_json('data/threads_raw.jsonl', orient='records', lines=True)`).
- Manually review the candidates to curate ~40 golden threads, balanced across the categories in your failure taxonomy.
- Move parsing helpers into `workshop_utils.py` when finalizing the data pipeline.