<a href="https://colab.research.google.com/github/rashmishreev/gemini-ai-usecases/blob/main/Email_Parser_%26_Cleaner_(Regex).ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **A Python utility to clean and parse raw email data for NLP and GenAI applications using Regex.**

Import libraries

# Setup

Connect drive and load dataset

In [None]:
import re
import pandas as pd
from google.colab import drive
from google.colab import files

Mount Google Drive to access folders and files in drive

In [None]:
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


Uncomment the below cell to check the folders and files in your drive

In [1]:
# import os

# print("Files and folders in the root of your Google Drive:")
# for item in os.listdir('/content/drive/MyDrive/Colab Notebooks/):
#     print(item)

In [None]:
file_path = '/content/drive/MyDrive/Colab Notebooks/Data/applied_jobs_email.csv'  # File path of the dataset from google drive

try:
    df = pd.read_csv(file_path)
    #print(df.head(1)) # print the first 5 rows
    print(f"Shape of the DataFrame: {df.shape}") # print dataframe shape
except FileNotFoundError:
    print(f"Error: File not found at {file_path}")
except Exception as e:
    print(f"An error occurred: {e}")


Shape of the DataFrame: (358, 7)


# Pre-processing

Adding headers to the dataframe to define the column labels as the original dataset does not have column labels

In [None]:
# Add column headers
new_headers = ['subject_incomplete', 'sender', 'receiver', 'date', 'col5', 'col6', 'email_body'] #adjust the number of columns
df.columns = new_headers

In [None]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 358 entries, 0 to 357
Data columns (total 7 columns):
 #   Column              Non-Null Count  Dtype 
---  ------              --------------  ----- 
 0   subject_incomplete  358 non-null    object
 1   sender              358 non-null    object
 2   receiver            358 non-null    object
 3   date                358 non-null    object
 4   col5                358 non-null    object
 5   col6                358 non-null    int64 
 6   email_body          358 non-null    object
dtypes: int64(1), object(6)
memory usage: 19.7+ KB


In [None]:
# Drop col5 and col6; their original meaning in the raw data was unclear and they are not required
cols_to_drop = ['col5', 'col6']
df = df.drop(columns=cols_to_drop, errors='ignore') # errors='ignore' prevents error if column does not exist.

## Derived Columns

Extract email subject and date to separate columns

In [None]:
import re
import pandas as pd

def extract_subject(text):
    match = re.search(r"Subject:\s*(.*?)(?:\n|From:)", text, re.DOTALL)
    return match.group(1).strip() if match else None

def extract_email_datetime(text):
    match = re.search(r"Date:\s*(.*?)(?:\n|To:)", text, re.DOTALL)
    return match.group(1).strip() if match else None


In [None]:
# Extract subject and datetime from body
df['extracted_subject'] = df['email_body'].apply(extract_subject)
df['extracted_datetime'] = df['email_body'].apply(extract_email_datetime)

Convert to datetime type

In [None]:
df['parsed_datetime'] = pd.to_datetime(
    df['extracted_datetime'],
    format='%m/%d/%y, %I:%M %p',
    errors='coerce'
)

Extract only date to separate column

In [None]:
df['date_only'] = pd.to_datetime(df['parsed_datetime'].dt.date)

Derive week, month, year, and days_since (days since original email) columns from date

In [None]:
df['week'] = df['date_only'].dt.isocalendar().week

In [None]:
df['month'] = df['date_only'].dt.month
df['year'] = df['date_only'].dt.year

In [None]:
df['days_since'] = (pd.Timestamp.today() - df['date_only']).dt.days

## Pre-processing Function

In [None]:
import re

def remove_headers(text):
    """Removes standard email headers like Subject, From, To, and Date."""
    header_fields = ["Subject", "From", "To", "Date"]
    for field in header_fields:
        text = re.sub(rf"(?i)({field}:.*?\n)(.*?\n)?", "", text)
    return text

def remove_email_addresses(text):
    """Removes all email addresses from the text."""
    return re.sub(r"\b[\w.-]+@[\w.-]+\.\w+\b", "", text)

def remove_noisy_phrases(text):
    """Removes banners, legal footers, and social media links using pattern matching."""
    noisy_phrases = [
        r"(?i)logo.*",
        r"(?i)follow us.*",
        r"(?i)unsubscribe.*",
        r"(?i)privacy policy.*",
        r"(?i)cookie policy.*",
        r"(?i)this mailbox is not monitored.*",
        r"(?i)please do not reply to this email.*",
        r"(?i)help & support.*",
        r"(?i)explore our careers.*",
        r"(?i)linkedin|facebook|glassdoor|youtube|instagram",
        r"(?i)registered office.*",
        r"(?i)company number.*",
        r"(?i)©.*",
    ]
    for pattern in noisy_phrases:
        text = re.sub(pattern, "", text)
    return text

def truncate_after_signoff(text):
    """Truncates the email body after a sign-off phrase like 'Best regards'."""
    lines = text.splitlines()
    sign_off_keywords = [
        "sincerely", "regards", "best regards", "warm regards", "kind regards"
    ]
    cutoff_index = len(lines)
    for i, line in enumerate(lines):
        if any(kw in line.lower() for kw in sign_off_keywords):
            cutoff_index = min(i + 2, len(lines))  # Keep 1 line after sign-off (Company name)
            break
    return "\n".join(lines[:cutoff_index])

def remove_footer_keywords(text):
    """Removes any leftover content that contains common footer keywords."""
    footer_keywords = [
        "unsubscribe", "privacy policy", "cookie policy", "help & support",
        "follow us", "linkedin", "glassdoor", "youtube", "facebook", "instagram",
        "do not reply", "data privacy"
    ]
    footer_pattern = r"|".join(footer_keywords)
    return re.split(rf"(?i){footer_pattern}", text)[0]

def clean_whitespace(text):
    """Cleans up excessive whitespace and line breaks."""
    text = re.sub(r"\n{2,}", "\n", text)    # Collapse multiple newlines
    text = re.sub(r"[ \t]{2,}", " ", text)  # Collapse multiple spaces/tabs
    return text.strip()

def remove_zero_width_chars(text: str) -> str:
    """
    Removes zero-width characters such as:
    - Zero Width Space (\u200b)
    - Zero Width Non-Joiner (\u200c)
    - Zero Width Joiner (\u200d)
    - Zero Width No-Break Space / BOM (\uFEFF)
    """
    return re.sub(r'[\u200b\u200c\u200d\uFEFF]', '', text)


def clean_email_body(text):
    """
    Cleans an email body by removing headers, email addresses, banners/logos,
    footers, and anything after sign-offs. Keeps only the meaningful message.

    Args:
        text (str): Raw email content.

    Returns:
        str: Cleaned email body.
    """
    text = remove_headers(text)
    text = remove_email_addresses(text)
    text = remove_noisy_phrases(text)
    text = truncate_after_signoff(text)
    text = remove_footer_keywords(text)
    text = clean_whitespace(text)
    text = remove_zero_width_chars(text)
    return text

In [None]:
sample_email = df.loc[15, 'email_body']  # Change 15 to the index of any email you want to test
cleaned_sample = clean_email_body(sample_email)
print(cleaned_sample)

Hi Rashmi Shree,
Thank you for applying to Version 1. This email confirms that we have received your application for our Data Storytelling Analyst role. There are many great companies out there, so we appreciate your interest in joining our growing team. 
So, what happens next? Our Talent Acquisition team will assess your experience and skills to see if they are a match for the role that you applied for, as well as for others that may be a fit. Please know we are committed to getting back to you with an update as soon as we can, but due to the volume of applications, it can sometimes take us longer to get in touch. In the meantime please check out our Hiring FAQs about our recruitment process. 
Please 
Kind Regards,
Version 1 Talent Acquisition Team


In [None]:
# Apply cleaning function to the 'email_body' column
df['cleaned_body'] = df['email_body'].apply(clean_email_body)

# Preview the results
df[['email_body', 'cleaned_body']].head()

Unnamed: 0,email_body,cleaned_body
0,Subject:\r\nDecision Scientist at Tesco: we’ve...,"Hello Rashmi Shree, \n \nWe’ve received your a..."
1,"Subject:\r\nRashmi Shree, your application was...",Your application was sent to Healthify\n͏ ͏ ͏ ...
2,Subject:\r\nThank you for Applying to Amazon!\...,Thank you for Applying to Amazon!\nAmazon.jobs...
3,Subject:\r\nYou have successfully submitted yo...,IBM Careers\nIBM Careers IBM \n \nDear Rashmi ...
4,Subject:\r\nYour IBM Application: Next Steps\r...,IBM Careers\nIBM Careers IBM \n \nRef: 14499 -...


The generated output will contain exclusively the email body, processed to remove all other components.

In [None]:
df['cleaned_body'][8]

'Rashmi Shree,\nThank you for your interest in our Operations Associate role at Coursera!\nWe have received your application and our team will review it promptly. If your skills and experience align with the requirements of the position, we will be in touch with you soon.\nRegards,\nCoursera'

### **Checkpoint:** Save the df of cleaned email bodies to a CSV



In [None]:
# Save to CSV
df.to_csv("cleaned_emails.csv", index=False)

# Copy the cleaned df to a new df
clean_emails = df.copy()

# Uncomment to download the csv
# files.download('cleaned_emails.csv')



---



## Additional Pre-processing to remove PII

PII replacement with "Michael Gary Scott".

In [None]:
# Subset first 10 rows for testing PII replacement
clean_emails_subset = clean_emails.head(10)

In [None]:
clean_emails_subset.loc[:, 'cleaned_body'] = clean_emails_subset['cleaned_body'].str.replace(
    r'\bRashmi(?:\sShree)?(?:\sVeeraiah)?\b',
    'Micheal Gary Scott',
    regex=True
)

In [None]:
clean_emails_subset['cleaned_body'][0]

'Hello Micheal Gary Scott, \n \nWe’ve received your application for the Decision Scientist position at Tesco. Thank you for taking the time to apply and for considering joining us. \n \nSo, what happens next? \n \nFirst of all, our recruitment team will take a look at your application – if your skills and experience match the position, we’ll get in touch to arrange next steps. \n \nWe know how much effort it takes to put together an application, so we review each one carefully. As soon as we’ve got any news for you, we’ll get in touch via email.\n \nWe are proud to have an inclusive culture at Tesco where everyone truly feels able to be themselves. This is because we not only celebrate diversity, but recognise the value and opportunity it brings. Therefore, we are committed to creating a workplace where differences are valued, and make sure that all colleagues are given the same opportunities. We’re proud to have been accredited Disability Confident Leader and we’re committed to provid

Anonymize email function by redacting original names.

The function will be applied to the entirety of the dataset to generate the result as previously described.

In [None]:
import pandas as pd
import re

def anonymize_name_from_email_body(df, column='cleaned_body', replacement='Michael Gary Scott'):
    """
    Anonymizes specific name patterns (e.g., 'Rashmi', 'RashmiShree', 'Rashmi Veeraiah')
    from the given text column in a DataFrame. Handles cases where names are written with
    or without spaces.

    Args:
        df (pd.DataFrame): Input DataFrame with a column containing email body text.
        column (str): Column name that contains the text to anonymize.
        replacement (str): Name to replace all matches with.

    Returns:
        pd.DataFrame: Copy of the input DataFrame with an added 'anonymized_email_body' column.
    """

    # Match patterns like 'John', 'JohnLam', 'John Lam', 'JohnPrinceLam', 'John Prince Lam'.
    name_pattern = r'\bRashmi(?:\s?Shree)?(?:\s?Veeraiah)?\b'

    df = df.copy()
    df['anonymized_email_body'] = df[column].str.replace(name_pattern, replacement, regex=True)
    return df

In [None]:
clean_emails = anonymize_name_from_email_body(clean_emails, column='cleaned_body')