In [1]:
from google.colab import drive
drive.mount('/content/drive', force_remount=True)

Mounted at /content/drive


In [2]:
cd drive/MyDrive/IE7374_Group14/colab/

/content/drive/MyDrive/IE7374_Group14/colab


In [3]:
import pandas as pd
import numpy as np

import os
import re
import random
from multiprocessing import Pool

import email
from email import policy
from email.policy import default

from email.parser import BytesParser
from email import message_from_string

import spacy


# Load Dataset

In [4]:
df = pd.read_csv('emails.csv')

In [5]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 517401 entries, 0 to 517400
Data columns (total 2 columns):
 #   Column   Non-Null Count   Dtype 
---  ------   --------------   ----- 
 0   file     517401 non-null  object
 1   message  517401 non-null  object
dtypes: object(2)
memory usage: 7.9+ MB


In [6]:
duplicates = df[df.duplicated(subset='message', keep=False)]
print(f"Number of duplicate messages: {duplicates.shape[0]}")

Number of duplicate messages: 0


# Split Message-ID

In [7]:
df_split1 = df.copy()

In [8]:
def split_metadata_and_body(email_text):

    pattern = r'(Message-ID:.*?X-FileName:.*?\n)'
    match = re.search(pattern, email_text, re.DOTALL | re.IGNORECASE)
    if match:
        metadata = match.group(1).strip()
        body = email_text[match.end():].strip()
    else:
        metadata, body = '', email_text.strip()
    return metadata, body


In [9]:
df_split1[['metadata_block', 'message_body']] = df_split1['message'].apply(
    lambda x: pd.Series(split_metadata_and_body(x))
)


# Clean Dataset

In [10]:
df_split1['message_length'] = df_split1['message_body'].apply(len)

In [11]:
# Calculate Q1 and Q3
Q1 = df_split1['message_length'].quantile(0.25)
Q3 = df_split1['message_length'].quantile(0.75)
IQR = Q3 - Q1

# Set boundaries (we can't have messages with len lower than 1)
lower_bound = max(0, Q1 - 1.5 * IQR)
upper_bound = Q3 + 1.5 * IQR

print(f"Lower Outlier Limit: {lower_bound}")
print(f"Upper Outlier Limit: {upper_bound}")


Lower Outlier Limit: 0
Upper Outlier Limit: 3952.0


In [12]:
df_clean2 = df_split1[df_split1['message_body'].str.len() < 3952].copy()

In [15]:
df_clean2.head(3)


Unnamed: 0,file,message,metadata_block,message_body,message_length
0,allen-p/_sent_mail/1.,Message-ID: <18782981.1075855378110.JavaMail.e...,Message-ID: <18782981.1075855378110.JavaMail.e...,Here is our forecast,20
1,allen-p/_sent_mail/10.,Message-ID: <15464986.1075855378456.JavaMail.e...,Message-ID: <15464986.1075855378456.JavaMail.e...,Traveling to have a business meeting takes the...,785
2,allen-p/_sent_mail/100.,Message-ID: <24216240.1075855687451.JavaMail.e...,Message-ID: <24216240.1075855687451.JavaMail.e...,test successful. way to go!!!,30


#Preprocess

In [24]:
df_processed3 = df_clean2.copy()

In [30]:
def anonymize_pii(text):
    if not text or not isinstance(text, str):
        return ""

    doc = nlp(text)
    anonymized_tokens = []

    for token in doc:
        if token.ent_type_ == "PERSON":
            anonymized_tokens.append("<ANON_NAME>")
        elif "@" in token.text and any(t.like_email for t in doc):
            anonymized_tokens.append("<ANON_EMAIL>")
        else:
            anonymized_tokens.append(token.text)

    anonymized_text = " ".join(anonymized_tokens)

    # Replace phone numbers
    anonymized_text = re.sub(
        r'(\b(\+?\d{1,2}[-.\s]?)?\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4}\b)',
        '<ANON_PHONE>',
        anonymized_text
    )

    return anonymized_text


In [27]:

def get_text_payload(msg):

    if not msg.is_multipart():
        if msg.get_content_type() == 'text/plain':
            payload = msg.get_payload(decode=True)
            charset = msg.get_content_charset() or 'ascii'
            try:
                return payload.decode(charset, errors='replace')
            except (LookupError, TypeError):
                return payload.decode('ascii', errors='replace')
        else:
            return ""

    text_parts = []
    for part in msg.walk():
        payload = get_text_payload(part)
        if payload:
            text_parts.append(payload)

    return "\n".join(text_parts)

def final_scrub_text(text):

    # Remove forwarded message headers that might have been missed
    lines = text.split('\n')
    first_real_line_index = 0
    for i, line in enumerate(lines):
        if re.match(r'^\s*(from|sent|to|cc|subject|date|forwarded):', line, re.IGNORECASE):
            continue
        # Stop at the first line that is not a header-like line.
        if line.strip() not in ('', '>'):
            first_real_line_index = i
            break
    text = '\n'.join(lines[first_real_line_index:])

    text = anonymize_pii(text)

    # Remove other common email artifacts
    # URLs
    text = re.sub(r'https?://\S+|www\.\S+', '<ANON_URL>', text)
    # Quoted reply lines (e.g., "> blah blah blah")
    text = re.sub(r'^\s*>\s?.*$', '', text, flags=re.MULTILINE)
    # MIME encoding artifacts (e.g., "=20", "=0A")
    text = re.sub(r'=[0-9A-F]{2}', '', text)
    # Horizontal lines/separators
    text = re.sub(r'[-_*=]{3,}', '', text)

    # Final whitespace cleanup
    text = re.sub(r'\n\s*\n', '\n', text)
    return text.strip()


In [28]:

def format_and_split_thread(body_text):
    """
    Splits an email thread, tags replies and the original message.
    """
    blocks = re.split(r'-{5,}\s*Original Message\s*-{5,}', body_text, flags=re.IGNORECASE)

    # Filter out any empty blocks that might result from the split
    blocks = [block.strip() for block in blocks if block.strip()]

    if not blocks:
        return []

    tagged_messages = []
    num_blocks = len(blocks)
    for i, block in enumerate(blocks):
        # The first block is the newest message, the last is the oldest.
        if i == num_blocks - 1:
            tag = '<|original|>'
        else:
            tag = f'<|reply{num_blocks - 1 - i}|>'

        # Apply the final, aggressive scrub to each individual message block
        clean_text = final_scrub_text(block)

        # Only add the message if it's not empty after cleaning
        if clean_text:
            tagged_messages.append({'tag': tag, 'text': clean_text})

    return tagged_messages

def process_email_row(row):
    message_id = row['metadata_block'].split('\n')[0]  # Or however you extract it
    body_text = row['message_body']

    threaded_messages = format_and_split_thread(body_text)

    entries = []
    for part in threaded_messages:
        entries.append({
            'message_id': message_id,
            'tag': part['tag'],
            'clean_message': part['text']
        })

    return entries



In [31]:

if __name__ == '__main__':

    with Pool() as pool:
        results = pool.map(process_email_row, df_processed3.to_dict('records'))

    # Flatten list of lists
    cleaned = [item for sublist in results for item in sublist]
    df_processed3 = pd.DataFrame(cleaned)

    df_processed3.to_csv('enron_cleaned_v8.csv', index=False)
    print(df_processed3.head())


                                          message_id           tag  \
0  Message-ID: <18782981.1075855378110.JavaMail.e...  <|original|>   
1  Message-ID: <15464986.1075855378456.JavaMail.e...  <|original|>   
2  Message-ID: <24216240.1075855687451.JavaMail.e...  <|original|>   
3  Message-ID: <13505866.1075863688222.JavaMail.e...  <|original|>   
4  Message-ID: <30922949.1075863688243.JavaMail.e...  <|original|>   

                                       clean_message  
0                               Here is our forecast  
1  Traveling to have a business meeting takes the...  
2                test successful .   way to go ! ! !  
3  <ANON_NAME> , \n  Can you send me a schedule o...  
4                Let 's shoot for Tuesday at 11:45 .  


# Other Code

In [16]:
from multiprocessing import Pool, cpu_count
from functools import partial
import logging

In [17]:
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

In [18]:
try:
    nlp = spacy.load("en_core_web_sm", disable=["parser", "tagger", "lemmatizer"])
    # Only keep NER pipeline component for better performance
    nlp.max_length = 2000000  # Increase max length if needed
except OSError:
    logger.error("spaCy model not found. Install with: python -m spacy download en_core_web_sm")
    nlp = None

def get_text_payload(msg):
    """
    Walks through a multipart email message and extracts the first text/plain payload.
    This is the most reliable way to get the email body while ignoring attachments.
    """
    if not msg.is_multipart():
        if msg.get_content_type() == 'text/plain':
            payload = msg.get_payload(decode=True)
            charset = msg.get_content_charset() or 'ascii'
            try:
                return payload.decode(charset, errors='replace')
            except (LookupError, TypeError):
                return payload.decode('ascii', errors='replace')
        else:
            return ""

    text_parts = []
    for part in msg.walk():
        payload = get_text_payload(part)
        if payload:
            text_parts.append(payload)

    return "\n".join(text_parts)


In [19]:
def final_scrub_text(text):
    """
    Performs a final, aggressive cleaning of the text to remove common email artifacts,
    PII, and other noise. This is the key function to ensure high-quality output.
    """
    # 1. Remove forwarded message headers that might have been missed
    lines = text.split('\n')
    first_real_line_index = 0
    for i, line in enumerate(lines):
        if re.match(r'^\s*(from|sent|to|cc|subject|date|forwarded):', line, re.IGNORECASE):
            continue
        if line.strip() not in ('', '>'):
            first_real_line_index = i
            break
    text = '\n'.join(lines[first_real_line_index:])

    # 2. Remove legal disclaimers and confidentiality notices
    disclaimer_patterns = [
        r'\*+\s*original message\s*\*+',
        r'this e-mail is the property of enron corp\..*',
        r'the information contained in this communication is intended only for the use of the designated recipients.*',
        r'internet communications are not secure and therefore.*'
    ]
    for pattern in disclaimer_patterns:
        text = re.sub(pattern, '', text, flags=re.IGNORECASE | re.DOTALL)

    # 3. Fast regex-based PII anonymization (first pass)
    # Email addresses
    text = re.sub(r'\b[\w\.\-+=_%]+@[\w\.-]+\.\w{2,}\b', '[EMAIL]', text)
    # Phone/Fax numbers (various formats)
    text = re.sub(r'(\b(\+?\d{1,2}[-.\s]?)?\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4}\b)', '[PHONE]', text)

    # 4. Remove other common email artifacts
    # URLs
    text = re.sub(r'https?://\S+|www\.\S+', '[URL]', text)
    # Quoted reply lines (e.g., "> blah blah blah")
    text = re.sub(r'^\s*>\s?.*$', '', text, flags=re.MULTILINE)
    # MIME encoding artifacts (e.g., "=20", "=0A")
    text = re.sub(r'=[0-9A-F]{2}', '', text)
    # Horizontal lines/separators
    text = re.sub(r'[-_*=]{3,}', '', text)

    # 5. Final whitespace cleanup
    text = re.sub(r'\n\s*\n', '\n', text)
    return text.strip()

In [20]:
def fast_regex_name_anonymization(text):
    """
    Fast regex-based name anonymization as alternative to spaCy.
    This catches most common name patterns but may have false positives.
    """
    # Common name patterns
    patterns = [
        # "LastName, FirstName" format
        (r'\b[A-Z][a-z]{2,15},\s+[A-Z][a-z]{2,15}\b', '[NAME]'),
        # "FirstName LastName" format (Title Case)
        (r'\b[A-Z][a-z]{2,15}\s+[A-Z][a-z]{2,15}\b', '[NAME]'),
        # "FirstName MiddleInitial LastName" format
        (r'\b[A-Z][a-z]{2,15}\s+[A-Z]\.\s+[A-Z][a-z]{2,15}\b', '[NAME]'),
        # Common salutations
        (r'\b(Mr|Mrs|Ms|Dr|Prof)\.\s+[A-Z][a-z]+\b', '[NAME]'),
    ]

    for pattern, replacement in patterns:
        text = re.sub(pattern, replacement, text)

    return text

def optimized_spacy_anonymization(text, max_length=100000):
    """
    Optimized spaCy-based name anonymization with length limits and batching.
    """
    if not nlp:
        logger.warning("spaCy not available, falling back to regex")
        return fast_regex_name_anonymization(text)

    # Truncate very long texts to avoid memory issues
    if len(text) > max_length:
        text = text[:max_length] + "... [TRUNCATED]"

    try:
        # Process with spaCy
        doc = nlp(text)

        # Use character-based replacement for better performance
        replacements = []
        for ent in doc.ents:
            if ent.label_ in ["PERSON", "ORG"]:  # Include organizations too
                replacements.append((ent.start_char, ent.end_char, "[NAME]"))

        # Apply replacements in reverse order to maintain indices
        for start, end, replacement in sorted(replacements, reverse=True):
            text = text[:start] + replacement + text[end:]

        return text

    except Exception as e:
        logger.warning(f"spaCy processing failed: {e}, falling back to regex")
        return fast_regex_name_anonymization(text)

def process_single_message(message_body, anonymization_method="regex"):
    """
    Process a single message body with cleaning and anonymization.
    """
    try:
        # Clean the text first
        cleaned_text = final_scrub_text(message_body)

        if not cleaned_text.strip():
            return ""

        # Apply anonymization
        if anonymization_method == "spacy":
            anonymized_text = optimized_spacy_anonymization(cleaned_text)
        else:
            anonymized_text = fast_regex_name_anonymization(cleaned_text)

        return anonymized_text

    except Exception as e:
        logger.error(f"Error processing message: {e}")
        return ""

def process_batch_multiprocess(batch_data):
    """
    Process a batch of messages using multiprocessing.
    """
    message_bodies, anonymization_method = batch_data
    results = []

    for message_body in message_bodies:
        result = process_single_message(message_body, anonymization_method)
        results.append(result)

    return results

def process_dataframe_optimized(df, anonymization_method="regex", batch_size=1000, n_processes=None):
    """
    Process the entire dataframe with optimizations.

    Parameters:
    - df: DataFrame with 'message_body' column
    - anonymization_method: "regex" (fast) or "spacy" (more accurate)
    - batch_size: Number of messages to process in each batch
    - n_processes: Number of processes to use (None for auto-detect)
    """
    if n_processes is None:
        n_processes = min(cpu_count(), 8)  # Cap at 8 to avoid memory issues

    logger.info(f"Processing {len(df)} messages using {n_processes} processes")
    logger.info(f"Anonymization method: {anonymization_method}")

    # Split data into batches
    message_bodies = df['message_body'].tolist()
    batches = []

    for i in range(0, len(message_bodies), batch_size):
        batch_messages = message_bodies[i:i + batch_size]
        batches.append((batch_messages, anonymization_method))

    # Process batches in parallel
    if n_processes > 1:
        with Pool(processes=n_processes) as pool:
            batch_results = pool.map(process_batch_multiprocess, batches)
    else:
        # Single process for debugging
        batch_results = [process_batch_multiprocess(batch) for batch in batches]

    # Flatten results
    all_results = []
    for batch_result in batch_results:
        all_results.extend(batch_result)

    # Create new dataframe
    df_processed = df.copy()
    df_processed['anonymized_message_body'] = all_results

    return df_processed

# Example usage and testing functions
def test_anonymization_methods(sample_text):
    """
    Test different anonymization methods on sample text.
    """
    print("Original text:")
    print(sample_text)
    print("\n" + "="*50 + "\n")

    print("Regex anonymization:")
    regex_result = fast_regex_name_anonymization(sample_text)
    print(regex_result)
    print("\n" + "="*50 + "\n")

    if nlp:
        print("spaCy anonymization:")
        spacy_result = optimized_spacy_anonymization(sample_text)
        print(spacy_result)

def process_large_dataframe_in_chunks(df_path, chunk_size=10000, output_path=None):
    """
    Process a very large CSV file in chunks to manage memory usage.
    """
    if output_path is None:
        output_path = df_path.replace('.csv', '_anonymized.csv')

    # Process first chunk to get column structure
    first_chunk = True

    for chunk_df in pd.read_csv(df_path, chunksize=chunk_size):
        logger.info(f"Processing chunk of size {len(chunk_df)}")

        # Process the chunk
        processed_chunk = process_dataframe_optimized(
            chunk_df,
            anonymization_method="regex",  # Use regex for speed on large datasets
            batch_size=500,
            n_processes=4
        )

        # Save to file
        if first_chunk:
            processed_chunk.to_csv(output_path, index=False)
            first_chunk = False
        else:
            processed_chunk.to_csv(output_path, mode='a', header=False, index=False)

        logger.info(f"Chunk processed and saved")

In [23]:
if __name__ == "__main__":

    test_anonymization_methods(df_processed3['message_body'].iloc[12])

Original text:
Dave, 

 Here are the names of the west desk members by category.  The origination 
side is very sparse.  





Phillip


Regex anonymization:
[NAME] are the names of the west desk members by category.  The origination 
side is very sparse.  





Phillip


spaCy anonymization:
[NAME], 

 Here are the names of the west desk members by category.  The origination 
side is very sparse.  





[NAME]


In [25]:
df_processed = process_dataframe_optimized(
    df_processed3,
    anonymization_method="spacy",
    batch_size=500,
    n_processes=4
)


Process ForkPoolWorker-1:
Process ForkPoolWorker-3:
Process ForkPoolWorker-2:
Traceback (most recent call last):
Traceback (most recent call last):
Process ForkPoolWorker-4:
  File "/usr/lib/python3.11/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/usr/lib/python3.11/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/usr/lib/python3.11/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python3.11/multiprocessing/pool.py", line 125, in worker
    result = (True, func(*args, **kwds))
                    ^^^^^^^^^^^^^^^^^^^
Traceback (most recent call last):
Traceback (most recent call last):
  File "/usr/lib/python3.11/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python3.11/multiprocessing/pool.py", line 48, in mapstar
    return list(map(*args))
           ^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/multiproces

KeyboardInterrupt: 