In [2]:
import os, sys, email
import numpy as np 
import pandas as pd

from datetime import datetime
import gc
import time

from my_functions import get_user_name

In [3]:
from multiprocessing import Pool
import multiprocessing as mp
from multiprocessing import get_start_method
# 'spawn' starts each worker in a fresh interpreter. A start method can only be set
# once per process, so a second call (e.g. re-running this cell) raises RuntimeError;
# in that case report the method already in effect instead of failing.
try:
    mp.set_start_method('spawn')
except RuntimeError:
    method = mp.get_start_method()
    print('process start method:', method)

# Number of worker processes used by the pools in this notebook.
nr_of_cpu_cores = mp.cpu_count()
print(nr_of_cpu_cores)

4


In [4]:
# helper functions
def get_text_from_email(msg):
    '''Return the concatenated ``text/plain`` payloads of an e-mail message.

    ``msg.walk()`` visits the message itself and, for multipart messages, every
    sub-part. The payload of each ``text/plain`` part is collected and all of
    them are joined with no separator. Payloads are taken from ``get_payload()``
    as-is (no transfer decoding is requested).

    Parameters
    ----------
    msg : email.message.Message
        Parsed message, e.g. from ``email.message_from_string``.

    Returns
    -------
    str
        The joined plain-text payloads, or ``''`` when the message has none.
    '''
    parts = []
    for part in msg.walk():
        if part.get_content_type() == 'text/plain':
            parts.append(part.get_payload())
    # Join only after the whole walk: returning inside the loop would stop at the
    # first part (the multipart container itself) and lose every sub-part's text.
    return ''.join(parts)
    
def split_email_addresses(line):
    '''Split a comma-separated address header into a set of addresses.

    Each comma-separated entry is stripped of surrounding whitespace and the
    entries are returned as a ``frozenset`` (so duplicates collapse). A falsy
    ``line`` (``None`` or ``''``) yields ``None``.
    '''
    if not line:
        return None
    return frozenset(entry.strip() for entry in line.split(','))

In [5]:
def execution_time_str(start_time: float, end_time: float) -> str:
    '''Format the duration between two timestamps (in seconds) as minutes and seconds.

    Parameters
    ----------
    start_time, end_time : float
        Timestamps in seconds, e.g. from ``time.time()`` or ``time.perf_counter()``.

    Returns
    -------
    str
        e.g. ``'2 mins and 5.50 seconds'`` for a 125.5 s duration; the seconds
        part is the remainder after whole minutes, shown with two decimals.
    '''
    mins, seconds = divmod(end_time - start_time, 60)
    return f'{int(mins)} mins and {seconds:.2f} seconds'

In [6]:
from contextlib import contextmanager

@contextmanager
def measurement_of_time(block_name: str=''):
    '''Context manager that prints how long its ``with`` body took.

    Parameters
    ----------
    block_name : str
        Label included in the printed message.

    The interval is measured with ``time.perf_counter()``, a monotonic clock that
    is not affected by system clock adjustments. The report is printed from a
    ``finally`` clause, so it also appears when the body raises; the exception
    itself still propagates to the caller.
    '''
    start_time = time.perf_counter()
    try:
        yield
    finally:
        end_time = time.perf_counter()
        print(f'block {block_name} executed in: {execution_time_str(start_time, end_time)}')

In [7]:
def extract_data_from_csv(file: str) -> pd.DataFrame:
    '''Parse a CSV of raw e-mails into a DataFrame of header fields, body text and user.

    The CSV is read with only its ``message`` (raw e-mail text) and ``file``
    columns. Messages are parsed in a multiprocessing pool of
    ``nr_of_cpu_cores`` workers.

    Parameters
    ----------
    file : str
        Path of the CSV file to read.

    Returns
    -------
    pd.DataFrame
        One row per e-mail with:

        - one column per header name of the *first* message; headers absent from
          a later message come back as None;
        - ``content``, the concatenated ``text/plain`` payloads;
        - ``user``, the result of ``get_user_name`` on the row's ``file`` value.

        ``To`` and ``From`` hold frozensets of addresses, or None when empty.

    Raises
    ------
    IndexError
        If the CSV has no rows (there is no first message to take fields from).
    KeyError
        If the first message has no ``To`` or ``From`` header.
    '''
    source_df = pd.read_csv(file, usecols=['message', 'file'], dtype={'message': str, 'file': str})
    messages_ls = source_df['message'].to_list()
    file_ls = source_df['file'].to_list()

    # The header names of the first message define the columns for every row.
    first_message = email.message_from_string(messages_ls[0])
    email_fields = first_message.keys()
    print(f'email fields:{email_fields}')

    # The context manager terminates the workers on exit, so everything that needs
    # the pool -- including draining the lazy imap iterator -- happens inside it.
    with Pool(processes=nr_of_cpu_cores) as pool:
        rows = []
        for msg in pool.imap(email.message_from_string, messages_ls, chunksize=2000):
            row = {key: msg[key] for key in email_fields}
            row['content'] = get_text_from_email(msg)
            row['To'] = split_email_addresses(row['To'])
            row['From'] = split_email_addresses(row['From'])
            rows.append(row)

        # chunksize must be at least 1: with an explicit chunksize of 0 (fewer files
        # than cores) CPython's Pool.map returns a list of None without calling
        # get_user_name.
        file_ls_chunk_sz = max(1, len(file_ls) // nr_of_cpu_cores)
        user_from_file_ls = pool.map(get_user_name, file_ls, chunksize=file_ls_chunk_sz)

    # 'content' is not a header name, so it must be listed explicitly; otherwise
    # DataFrame(columns=...) silently drops the extracted body text.
    emails_df = pd.DataFrame(rows, columns=[*email_fields, 'content'])
    emails_df['user'] = user_from_file_ls
    return emails_df