### Load emails from a mailbox folder and dump the extracted metadata and text body to a tabular format for processing.

In [1]:
import os
import re
import imaplib
import email
from datetime import datetime
from dotenv import load_dotenv, find_dotenv

from tqdm import tqdm
import pandas as pd

# Load variables from the .env file located by find_dotenv() into the
# process environment before reading the credentials below.
load_dotenv(find_dotenv())

# The regular account password could be used in place of an app password,
# but an app password is safer because it can be revoked easily.
EMAIL = os.getenv("EMAIL")
APP_PASSWORD = os.getenv("APP_PASSWORD")

In [2]:
def login_to_gmail(email: str, app_password: str) -> "imaplib.IMAP4_SSL | None":
    """Open an IMAP-over-SSL session with Gmail and authenticate.

    Args:
        email: Address of the account to log in to. (Inside this function the
            name shadows the ``email`` module, which is not used here.)
        app_password: Password (or app password) for the account.

    Returns:
        The authenticated connection, or ``None`` when the server rejects the
        connection or the credentials (``imaplib.IMAP4.error``). Other errors,
        such as network failures, propagate to the caller.
    """
    mail = None
    try:
        mail = imaplib.IMAP4_SSL("imap.gmail.com")
        mail.login(email, app_password)
    except imaplib.IMAP4.error:
        print("Login failed")
        # The socket is already open when login() is rejected; release it
        # instead of leaking the connection.
        if mail is not None:
            mail.shutdown()
        return None
    print("Login successful")
    return mail


def list_mailboxes(mail: imaplib.IMAP4_SSL) -> None:
    """Print every mailbox reported by ``mail.list()`` as ``<name>: <flags>``.

    Each LIST response line has the shape ``(<flags>) "<delimiter>" <name>``.
    The flags are printed without their leading backslashes; the name is
    printed exactly as the server sent it (including surrounding quotes).
    Nothing beyond the header line is printed when the status is not ``"OK"``.

    Args:
        mail: An authenticated IMAP connection.
    """
    print("List of Mailboxes:")
    status, mailbox_list = mail.list()
    if status != "OK":
        return

    # Parse the three fields explicitly instead of splitting on a hard-coded
    # '"/"' so that other hierarchy delimiters work and characters such as
    # parentheses inside the mailbox name are left untouched.
    list_line = re.compile(
        r'\((?P<flags>[^)]*)\)\s+(?P<delim>"(?:[^"\\]|\\.)*"|NIL)\s+(?P<name>.*)'
    )
    for mailbox in mailbox_list:
        line = mailbox.decode()
        match = list_line.match(line)
        if match is None:
            # Unrecognised response shape: show it verbatim rather than fail.
            print(line)
            continue
        flags = match.group("flags").replace("\\", "").strip()
        name = match.group("name").strip()
        print(f"{name}: {flags}")


def _decode_part(part):
    """Return the decoded text of one message part, or ``None`` if unusable."""
    payload = part.get_payload(decode=True)
    # Container parts and empty parts yield None / b"" and carry no text.
    if not payload:
        return None
    # Prefer the charset declared by the part; fall back to UTF-8, which is
    # what was always assumed before.
    for charset in (part.get_content_charset(), "utf-8"):
        if charset is None:
            continue
        try:
            return payload.decode(charset)
        except (LookupError, UnicodeDecodeError):
            # Unknown codec name or bytes that do not fit it: try the next.
            continue
    return None


def get_body_text(msg):
    """Extract the text of an email message as a single string.

    For a multipart message every part from ``msg.walk()`` is considered,
    except parts whose Content-Disposition is ``attachment``. For a
    single-part message the message itself is decoded. Parts that cannot be
    decoded with their declared charset or UTF-8 are skipped.

    Args:
        msg: An ``email.message.Message`` (e.g. from
            ``email.message_from_bytes``).

    Returns:
        The decoded parts joined with newlines; an empty string when nothing
        could be decoded.
    """
    if msg.is_multipart():
        # get_content_disposition() is lower-cased and looks only at the
        # disposition type, so "Attachment" is caught and an inline part whose
        # filename merely contains "attachment" is not dropped.
        parts = [
            part
            for part in msg.walk()
            if part.get_content_disposition() != "attachment"
        ]
    else:
        parts = [msg]

    decoded_parts = (_decode_part(part) for part in parts)
    return "\n".join(text for text in decoded_parts if text is not None)


def mailbox_df(email_id_list, mail):
    """Fetch the given messages and collect them into a DataFrame.

    Each id in ``email_id_list`` is fetched in full (``RFC822``) through
    ``mail``. Messages whose fetch status is not ``"OK"`` are left out.

    Returns:
        A ``pandas.DataFrame`` with one row per fetched message and the
        columns ``subject``, ``from``, ``body`` and ``date`` (raw header
        values plus the text returned by ``get_body_text``).
    """
    records = []
    for message_id in tqdm(email_id_list):
        status, fetched = mail.fetch(message_id, "(RFC822)")
        if status != "OK":
            continue

        # The raw message bytes sit in the second slot of the first item.
        parsed = email.message_from_bytes(fetched[0][1])
        record = {
            "subject": parsed.get("Subject"),
            "from": parsed.get("From"),
            "body": get_body_text(parsed),
            "date": parsed.get("Date"),
        }
        records.append(record)
    return pd.DataFrame(records)

In [3]:
# Open the IMAP session; login_to_gmail returns None (after printing
# "Login failed") when the login is rejected.
mail = login_to_gmail(EMAIL, APP_PASSWORD)

Login successful


In [6]:
# Print the mailboxes the server reports for this session.
list_mailboxes(mail)

List of Mailboxes:
b'(\\HasNoChildren) "/" "INBOX"'
b'(\\HasChildren \\Noselect) "/" "Job"'
b'(\\HasChildren) "/" "Job/Interviews"'
b'(\\HasNoChildren) "/" "Job/Interviews/Interviews"'
b'(\\HasNoChildren) "/" "Job/Interviews/tests"'
b'(\\HasNoChildren) "/" "Mercor-Remotasks"'
b'(\\HasNoChildren) "/" "Notes"'
b'(\\HasChildren) "/" "Receipts"'
b'(\\HasNoChildren) "/" "Receipts/ITR"'
b'(\\HasNoChildren) "/" "Receipts/SIP"'
b'(\\HasNoChildren) "/" "Receipts/travel"'
b'(\\HasChildren \\Noselect) "/" "[Gmail]"'
b'(\\All \\HasNoChildren) "/" "[Gmail]/All Mail"'
b'(\\HasNoChildren \\Trash) "/" "[Gmail]/Bin"'
b'(\\Drafts \\HasNoChildren) "/" "[Gmail]/Drafts"'
b'(\\HasNoChildren \\Important) "/" "[Gmail]/Important"'
b'(\\HasNoChildren \\Sent) "/" "[Gmail]/Sent Mail"'
b'(\\HasNoChildren \\Junk) "/" "[Gmail]/Spam"'
b'(\\Flagged \\HasNoChildren) "/" "[Gmail]/Starred"'
b'(\\HasChildren) "/" "documents"'
b'(\\HasNoChildren) "/" "documents/Project samples"'
b'(\\HasNoChildren) "/" "documents/codvo.ai"'

In [11]:
# imaplib passes the mailbox argument to the server as-is, so a name that
# contains spaces (e.g. "[Gmail]/All Mail") has to be wrapped in double quotes
# to arrive as a single argument. Backslashes and double quotes inside the
# name are escaped so the quoted string stays well-formed.

mailbox = "documents/spam_samples"
quoted_mailbox = '"{}"'.format(mailbox.replace("\\", "\\\\").replace('"', '\\"'))
status, _ = mail.select(quoted_mailbox)
if status == "OK":
    print(f"{mailbox} Selected")
else:
    print(f"Error Selecting {mailbox} ")

documents/spam_samples Selected


In [12]:
# Ask the server for the ids of every message in the selected mailbox.
status, email_ids = mail.search(None, "ALL")
if status != "OK":
    # Without this check the error text would be split and treated as ids.
    raise RuntimeError(f"IMAP search failed ({status}): {email_ids}")

# An empty result can arrive as [b""] or [None]; both mean "no messages".
email_id_list = email_ids[0].split() if email_ids and email_ids[0] else []

In [13]:
from email.utils import parsedate_to_datetime


def _parse_email_date(value):
    """Convert a raw ``Date`` header into a ``datetime``; ``NaT`` if unusable.

    ``parsedate_to_datetime`` accepts the RFC 5322 variants a fixed
    ``strptime`` format rejects: a missing weekday, named zones such as
    ``GMT``, and trailing comments like ``(UTC)``.
    """
    if not value:
        # The message had no Date header at all.
        return pd.NaT
    try:
        return parsedate_to_datetime(str(value))
    except (TypeError, ValueError):
        # Malformed header: mark the row instead of aborting the whole load.
        return pd.NaT


email_df = mailbox_df(email_id_list, mail)
# An empty mailbox produces a frame without columns, so guard the lookup.
if "date" in email_df.columns:
    email_df["date"] = email_df["date"].apply(_parse_email_date)

100%|██████████| 34/34 [00:18<00:00,  1.84it/s]


In [14]:
# Close the currently selected mailbox, then log out to end the IMAP session.
mail.close()
mail.logout()

('BYE', [b'LOGOUT Requested'])