In [1]:
import glob
import pandas as pd
import email
from email import policy
from src.parser.MailTextParser import parse_mime_tree
from src.parser.MailSubjectParser import parse_mime_subject
from src.parser.MailSenderParser import parse_mime_sender
from typing import Dict, List, Optional, Tuple

## TODO

1. Mail Distribution YEAR, MONTH
2. SUBJECT ANALYSIS
3. SENDER ANALYSIS (cleaning first)
4. TEXT ANALYSIS (cleaning first)

## Helper stuff


In [2]:
MONTH_MAPPING = {
    "01": "JAN",
    "02": "FEB",
    "03": "MAR",
    "04": "APR",
    "05": "MAI",
    "06": "JUN",
    "07": "JUL",
    "08": "AUG",
    "09": "SEP",
    "10": "OCT",
    "11": "NOV",
    "12": "DEC"
}

In [3]:
def guess_encoding(file_path):
    encodings = ['utf-8', 'iso-8859-1', 'windows-1252']  # Add more as needed
    for enc in encodings:
        try:
            with open(file_path, encoding=enc) as file:
                file.read()
                return enc
        except UnicodeDecodeError:
            continue
    return None

## MAIN FUNCTIONS

In [19]:
def get_emails_from_folder(year_range: Tuple[int,int]) -> Dict[int, Dict[str, Dict[int, email.message.Message]]]:
    '''Get all emails from email folders stored under data/.

        Parameters
        ----------
            year_range: Tuple(int,int)
                The year range, where starting year is _inclusive_ and end year is _exclusive_.

        Returns
        -------
            (Dict) 
            
            {
                YEAR: {
                    MONTH: {
                        1: <email.message.Message>
                    }
                    ...
                }
            }
    '''
    emails = dict()
    counter = 0
    for year in range(year_range[0],year_range[1]):
        emails[year] = dict()
        for month in MONTH_MAPPING:
            emails[year][month] = dict()
            for idx, file in enumerate(glob.glob(f"data/{year}/yggdrasill_{year}-{month}/*")):
                # guess encoding first, since there are quite a few non-UTF-8 mails
                enc = guess_encoding(file)
                with open(file, "r", encoding=enc) as f:
                    emails[year][month][idx] = email.message_from_file(f, policy=policy.default) # the policy-default is very important for subject titles
                    counter += 1
    print(f"Received {counter} emails from {year_range[0]} to {year_range[1]-1}.") 
    return emails

In [5]:
emails = get_emails_from_folder((1997,2000))

Received 598 emails from 1997 to 1999.


In [None]:
#emails

In [20]:
def get_content_from_emails(email_dict: Dict[int, Dict[str, Dict[int, email.message.Message]]]) -> Dict:
    '''Extract CONTENT, SUBJECT, SENDER from emails retrieved via get_emails_from_folder().

        Parameters
        ----------
            email_dict: Dict[str, Dict[str, Dict[int, email.message.Message]]
                The dictionary with emails, retrieved using get_emails_from_folder().

        Returns
        -------
            (Dict) 
            
            {
                YEAR: {
                    MONTH: {
                        1: {
                            "subject": "<SUBJECT>",
                            "text": "<TEXT>",
                            "sender": "<SENDER>"
                        }
                        ...
                        "counter": <INT>
                    }
                    ...
                    "counter": <INT>
                }
                "counter": <INT>
            }
    '''
    emails = dict()
    counter = 0
    for year in email_dict:
        counter_year = 0
        emails[year] = dict()
        for month in email_dict[year]:
            counter_month = 0
            emails[year][month] = dict()
            for idx, mail in email_dict[year][month].items():
                text = parse_mime_tree(mail)
                sender = parse_mime_sender(mail)
                subject = parse_mime_subject(mail)
                emails[year][month][idx] = {
                    "text": text,
                    "subject": subject,
                    "sender": sender
                }
                # incrementing counters
                counter += 1
                counter_year += 1
                counter_month += 1
            emails[year][month]["counter"] = counter_month
        emails[year]["counter"] = counter_year
    emails["counter"] = counter
    return emails

In [14]:
content = get_content_from_emails(emails)

In [26]:
content[1999]["counter"]

127

In [33]:
# sanity check
sum_ = 0
for y in content:
    if type(y) == int:
        for m in content[y]:
            if m != "counter":
                sum_ += content[y][m]["counter"]

assert sum_ == content["counter"]

598