## Assignment - Single Machine Parallelization

In this assignment we will be investigating the Enron email corpus from the 2002 Federal Energy Regulatory Commission (FERB) investigation.  This corpus of emails is still used to train AI on topics related to corporate communication, as this is one of the few public datasets available. Though there are AI ethics questions if the data is used for language purposes without proper precautions. [Read this article](https://qz.com/work/1546565/the-emails-that-brought-down-enron-still-shape-our-daily-lives/) to learn more about the dataset and its implications for AI.

Start off by choosing the `ml.m5.xlarge` instance as your kernel. Then start conducting the lab.

---

### Task 0: Download dataset and explore it

The research dataset is located here, maintained by Carnegie Mellon University, https://www.cs.cmu.edu/~enron/. Though due to its size we have created a smaller version on S3. Follow these steps to get the file:

1. Copy the file from the class's public S3 bucket into your own S3 bucket. You must change the bucket name to yours with your net id. `aws s3 cp s3://bigdatateaching/enron/maildir.zip s3://[NET_ID]-labdata`

In [1]:
aws s3 cp s3://bigdatateaching/enron/maildir.zip s3://pm1178-labdata

SyntaxError: invalid syntax (<ipython-input-1-b53f475c3241>, line 1)

2. Copy the file from your own S3 bucket to SageMaker. Use the boto3 package for this. This will palce the `maildir` directory from the zip file into the main folder of your git repository.

In [13]:
import boto3
my_bucket = 'pm1178-labdata'
my_file = 'maildir.zip'

s3client = boto3.client('s3')
#s3.download_file('BUCKET_NAME', 'OBJECT_NAME', 'FILE_NAME')
s3client.download_file(my_bucket, my_file, 'maildir.zip')

3. Run the following command in a new cell to unzip the directory. This could take a while, there are almost 80,000 files!!

In [14]:
!conda install -y -c conda-forge unzip


Collecting package metadata (current_repodata.json): done
Solving environment: done

# All requested packages already installed.



In [None]:
!unzip ./maildir.zip

Archive:  ./maildir.zip
   creating: maildir/
   creating: maildir/allen-p/
   creating: maildir/allen-p/all_documents/
   creating: maildir/allen-p/contacts/
   creating: maildir/allen-p/deleted_items/
   creating: maildir/allen-p/discussion_threads/
   creating: maildir/allen-p/inbox/
   creating: maildir/allen-p/inbox/.ipynb_checkpoints/
  inflating: maildir/allen-p/inbox/.ipynb_checkpoints/14_-checkpoint  
  inflating: maildir/allen-p/inbox/10_  
  inflating: maildir/allen-p/inbox/11_  
  inflating: maildir/allen-p/inbox/12_  
  inflating: maildir/allen-p/inbox/13_  
  inflating: maildir/allen-p/inbox/14_  
  inflating: maildir/allen-p/inbox/15_  
  inflating: maildir/allen-p/inbox/16_  
  inflating: maildir/allen-p/inbox/17_  
  inflating: maildir/allen-p/inbox/18_  
  inflating: maildir/allen-p/inbox/19_  
  inflating: maildir/allen-p/inbox/1_  
  inflating: maildir/allen-p/inbox/20_  
  inflating: maildir/allen-p/inbox/21_  
  inflating: maildir/allen-p/inbox/22_  
  inflating: 

The bullets below represent the file structure of interest from the Enron dataset. To confirm that you have unzipped and placed the data in the correct location, click on the links of files, e.g. `1_`, `2_`, etc., which will open the text files for you to explore. Notice that these are all raw text files.

* [maildir](./maildir)
 * [allen-p](./maildir/allen-p)
   * [inbox](./maildir/allen-p/inbox)
       * [1_](./maildir/allen-p/inbox/1_)
       * [2_](./maildir/allen-p/inbox/2_)
       * [3_](./maildir/allen-p/inbox/3_)
       ...
   * [sent_items](./maildir/allen-p/sent_items)
       * [1_](./maildir/allen-p/sent_items/1_)
       * [2_](./maildir/allen-p/sent_items/2_)
       * [3_](./maildir/allen-p/sent_items/3_)
       * etc...
   * etc...
 * [arnold-j](./maildir/arnold-j)
   * [inbox](./maildir/arnold-j/inbox)
       * [1_](./maildir/arnold-j/inbox/1_)
       * [2_](./maildir/arnold-j/inbox/2_)
       * [3_](./maildir/arnold-j/inbox/3_)
       * etc ...
   * [sent_items](./maildir/arnold-j/sent_items)
       * [1_](./maildir/arnold-j/sent_items/1_)
       * [2_](./maildir/arnold-j/sent_items/2_)
       * [3_](./maildir/arnold-j/sent_items/3_)
       * etc...
   * etc...
 * [arora-h](./maildir/arora-h)
   * [inbox](./maildir/arora-h/inbox)
       * [1_](./maildir/arora-h/inbox/1_)
       * [2_](./maildir/arora-h/inbox/2_)
       * [3_](./maildir/arora-h/inbox/3_)
       * etc...
   * [sent_items](./maildir/arora-h/sent_items)
       * [1_](./maildir/arora-h/sent_items/1_)
       * [2_](./maildir/arora-h/sent_items/2_)
       * [3_](./maildir/arora-h/sent_items/3_)
       * etc...
   * etc...
 * etc... etc...

---------------------------

### Task #1: Collect all of the file paths of emails in the maildir (6 points)

The output object will be called `list_emails` should contain 79,429 items and look similar to the following:
```
['maildir\\allen-p\\inbox\\10_',
 'maildir\\allen-p\\inbox\\11_',
 'maildir\\allen-p\\inbox\\12_',
 'maildir\\allen-p\\inbox\\13_',
 'maildir\\allen-p\\inbox\\14_',
 'maildir\\allen-p\\inbox\\15_',
 'maildir\\allen-p\\inbox\\16_',
 'maildir\\allen-p\\inbox\\17_',
 'maildir\\allen-p\\inbox\\18_',
 'maildir\\allen-p\\inbox\\19_',
 .....
 ]
```

The broad strokes to creating this list goes as follows:

- use the command `os.listdir('maildir')` to see the folders in the mail directory
- then use the command `os.listdir('maildir/allen-p')` to see the folder in Allen P's folder
- then use the command `os.listdir('maildir/allen-p/inbox')` to see the contents in Allen P's inbox and collect the full file paths
- you need to also collect all the files in both the inbox and sent_items folders for Allen P, do not track emails from the other folders
- next move onto Arnold J!
- next move onto Arora H!
- ....

Light bulbs should start going off in your head at this point about ways to solve this problem. This is a repeated process that needs to happen many many times. A for loop is the simplest way to solve a repetitive task. **Use for loops** to collect all 79,429 file paths into a single list.

**Hint:** look up `os.path.join()` or f-strings if you are having trouble creating the appropriate argument for the `os.listdir()` command

In the following cell(s) produce the list of emails as directed and save the result to the object `list_email_paths`

In [7]:
import os.path
from os import path
list_emails=[]
directory = '/root/hw-3-assignment-prakhar7m/maildir'
for folder in os.listdir(directory):
    subdir=os.fsdecode(folder)
    inbox=os.path.join(directory, subdir, 'inbox')
    sent=os.path.join(directory, subdir, 'sent_items')
    if path.exists(inbox):
        for file in os.listdir(inbox):
            list_emails.append((os.path.join(inbox, file)))
    if path.exists(sent):
        for f2 in os.listdir(sent):
            list_emails.append((os.path.join(sent, f2)))
         # Do something
    #sent.append(os.path.join(directory, subdir, 'sent'))
    


In the following cell(s) print the first 10 items of the list as well as the length of the list

In [8]:
list_emails[0:10]

['/root/hw-3-assignment-prakhar7m/maildir/giron-d/inbox/395_',
 '/root/hw-3-assignment-prakhar7m/maildir/giron-d/inbox/197_',
 '/root/hw-3-assignment-prakhar7m/maildir/giron-d/inbox/278_',
 '/root/hw-3-assignment-prakhar7m/maildir/giron-d/inbox/100_',
 '/root/hw-3-assignment-prakhar7m/maildir/giron-d/inbox/179_',
 '/root/hw-3-assignment-prakhar7m/maildir/giron-d/inbox/1_',
 '/root/hw-3-assignment-prakhar7m/maildir/giron-d/inbox/89_',
 '/root/hw-3-assignment-prakhar7m/maildir/giron-d/inbox/377_',
 '/root/hw-3-assignment-prakhar7m/maildir/giron-d/inbox/10_',
 '/root/hw-3-assignment-prakhar7m/maildir/giron-d/inbox/458_']

In [9]:
len(list_emails)

79430

In [10]:
list_emails.remove('/root/hw-3-assignment-prakhar7m/maildir/allen-p/inbox/.ipynb_checkpoints')

--------------------

### Task 2: Read in one text email and parse out relevant information using a function (8 points)

Before you get to parallelizing, you need to build the function that will execute on every email in your list. The objective of this task is to build the function called `email_process()` which will take as an input the path of an email and output the following information in a dictionary:

* Email of the sender
* Email(s) of the recipient
* Email timestamp
* Email subject
* Email body

An example output from the function on the first email in Allen P's inbox looks like the following:

```
{'from': 'heather.dunton@enron.com',
 'to': 'k..allen@enron.com',
 'timestamp': 'Fri, 7 Dec 2001 10:06:42 -0800 (PST)'
 'subject': 'RE: West Position',
 'body': ' \nPlease let me know if you still need Curve Shift.\n\nThanks,\nHeather\n -----Original Message-----\nFrom: \tAllen, Phillip K.  \nSent:\tFriday, December 07, 2001 5:14 AM\nTo:\tDunton, Heather\nSubject:\tRE: West Position\n\nHeather,\n\nDid you attach the file to this email?\n\n -----Original Message-----\nFrom: \tDunton, Heather  \nSent:\tWednesday, December 05, 2001 1:43 PM\nTo:\tAllen, Phillip K.; Belden, Tim\nSubject:\tFW: West Position\n\nAttached is the Delta position for 1/16, 1/30, 6/19, 7/13, 9/21\n\n\n -----Original Message-----\nFrom: \tAllen, Phillip K.  \nSent:\tWednesday, December 05, 2001 6:41 AM\nTo:\tDunton, Heather\nSubject:\tRE: West Position\n\nHeather,\n\nThis is exactly what we need.  Would it possible to add the prior day for each of the dates below to the pivot table.  In order to validate the curve shift on the dates below we also need the prior days ending positions.\n\nThank you,\n\nPhillip Allen\n\n -----Original Message-----\nFrom: \tDunton, Heather  \nSent:\tTuesday, December 04, 2001 3:12 PM\nTo:\tBelden, Tim; Allen, Phillip K.\nCc:\tDriscoll, Michael M.\nSubject:\tWest Position\n\n\nAttached is the Delta position for 1/18, 1/31, 6/20, 7/16, 9/24\n\n\n\n << File: west_delta_pos.xls >> \n\nLet me know if you have any questions.\n\n\nHeather'
 }
```

To process the raw text file, we recommend that you use the `email.parser.Parser()` function. Read documentation on it [here](https://docs.python.org/3/library/email.parser.html).

The to field will need some cleaning. Remove any carriage returns, tabs, or spaces from the to field. 

It's OK that the email body is really messy right now. A future task is to clean it up.

In the following cell(s) produce the function `email_process()`

In [12]:
list_emails[0]


'/root/hw-3-assignment-prakhar7m/maildir/giron-d/inbox/395_'

In [2]:
def email_process(filepath):
    from email.parser import Parser
    final_dict ={}
    items = ["From", "To", "Subject", "Date"]
    data= open(filepath, encoding='latin-1').read()
    #print(data)
    email_parsed =Parser().parsestr(data, headersonly=False)     
    #print(email_message)
    """"for key, value in email_parsed.items(): 
        setattr(email_parsed,key,value)"""
    
    
    final_dict["From"]=email_parsed['From']
    final_dict["To"]=email_parsed['To']
    final_dict["Date"]=email_parsed['Date']
    final_dict["Subject"]=email_parsed['Subject']
    
    """for key, value in email_parsed.items(): 
        setattr(email_parsed,key,value)"""
        
    #print(email_parsed.Subject)
    
    """for key, value in email_parsed.items(): 
        if key in items:
            final_dict[key] = []
            final_dict[key].append(value)"""
            
    if email_parsed.is_multipart():
        for part in email_parsed.get_payload():
            final_dict["Body"] = (part.get_payload())
    else:
        final_dict["Body"] = (email_parsed.get_payload())
    
     
    
    return final_dict





    
    

----------------

In [4]:
email_1= email_process('/root/hw-3-assignment-prakhar7m/maildir/allen-p/inbox/1_')

In [5]:
print(email_1)

{'From': 'heather.dunton@enron.com', 'To': 'k..allen@enron.com', 'Date': 'Fri, 7 Dec 2001 10:06:42 -0800 (PST)', 'Subject': 'RE: West Position', 'Body': ' \nPlease let me know if you still need Curve Shift.\n\nThanks,\nHeather\n -----Original Message-----\nFrom: \tAllen, Phillip K.  \nSent:\tFriday, December 07, 2001 5:14 AM\nTo:\tDunton, Heather\nSubject:\tRE: West Position\n\nHeather,\n\nDid you attach the file to this email?\n\n -----Original Message-----\nFrom: \tDunton, Heather  \nSent:\tWednesday, December 05, 2001 1:43 PM\nTo:\tAllen, Phillip K.; Belden, Tim\nSubject:\tFW: West Position\n\nAttached is the Delta position for 1/16, 1/30, 6/19, 7/13, 9/21\n\n\n -----Original Message-----\nFrom: \tAllen, Phillip K.  \nSent:\tWednesday, December 05, 2001 6:41 AM\nTo:\tDunton, Heather\nSubject:\tRE: West Position\n\nHeather,\n\nThis is exactly what we need.  Would it possible to add the prior day for each of the dates below to the pivot table.  In order to validate the curve shift on 

### Task 3: Read in all emails using multiprocessing (8 points)

There are two ways to implement multiprocessing on a list of data:

1. **The vanilla approach to parallelization** involves getting a list of the items to process, then using the multiple worker processes to complete all the items in parallel like queue system. The list gets processes by workers until there are no items left.
2. **The splitting approach to parallelization** involves getting a list of items to process, splitting that list into a number of sublists, then split each sublist to a worker. Each worker then processes the sublist in serial.

For more background on these two ideas, check out this useful [medium post](https://medium.com/idealo-tech-blog/parallelisation-in-python-an-alternative-approach-b2749b49a1e).

**If you are using a Windows machine (not in this lab)**, you will be unable to save your function in a cell and then run with multiprocesing. Instead, save two scripts `mail_functions_vanilla.py` and `mail_functions_split.py` with the functions needed for each approach. Import the module that you will execute in parallel by calling `import mail_functions_vanilla` and then calling a function within that module, just like you would any other module in python.

Use the `time` module to track the time taken for each approach.

In the following cell(s), implement the vanilla approach to parallelization to process all emails using `pool.map()`. Save the output list to an object called `out_vanilla`.

In [13]:
from multiprocessing import Pool
import time



In [14]:
starttime = time.time()
pool = Pool(10)
out_vanilla=[]
out_vanilla=pool.map(email_process, list_emails)
pool.close()
endtime = time.time()
print(f"Time taken {endtime-starttime} seconds")

Time taken 52.80142879486084 seconds


In the following cell(s), implement the splitting approach to parallelization to process all emails using `pool.map()`. Save the output list to an object called `out_split`.

In [15]:
from multiprocessing import pool
import time
starttime = time.time()
pool = Pool(10)
out_split=[]
out_split=pool.map(email_process, list_emails, 1000)
pool.close()
endtime = time.time()
print(f"Time taken {endtime-starttime} seconds")

Time taken 13.847573518753052 seconds


In the following cell(s), confirm that the results from each method are the same. If they are not, then figure out how to make them the same.

In [16]:
if len(out_vanilla)==len(out_split):
    print(True)
else:
    print(False)

True


In [17]:
import pandas as pd
df_emails=pd.DataFrame(columns=['From','To','Date','Subject','Body'], dtype=object)
df_emails.reset_index()

Unnamed: 0,index,From,To,Date,Subject,Body


In the following cell(s), convert the list `out_split` to a Pandas DataFrame and save it as `df_emails`

In [18]:
for data in out_split:
    df_emails=df_emails.append(data, ignore_index=True)

In [19]:
df_emails.head()

Unnamed: 0,From,To,Date,Subject,Body
0,patrick.tucker@enron.com,"bryant.frihart@enron.com, c..giron@enron.com","Wed, 21 Nov 2001 12:43:53 -0800 (PST)",Wednesday's Activity Recap,Jim @ Kaztex bought 20/d NBPL @ Harper on our ...
1,dmoore21@charter.net,c..giron@enron.com,"Mon, 13 Aug 2001 04:05:04 -0700 (PDT)",Kirk,\nSo sorry that you and Kristi are having to g...
2,lisa.kinsey@enron.com,t..hodge@enron.com,"Thu, 29 Nov 2001 08:56:03 -0800 (PST)",FW: Immediate Suspension of Gas Deliveries,"\n\n-----Original Message-----\nFrom: Lenart, ..."
3,deirdre.mccaffrey@enron.com,c..giron@enron.com,"Wed, 23 Jan 2002 12:21:22 -0800 (PST)",RE: Peoples Cash Flow 1-22-02 - Russell.xls,thanks - it all makes sense!! Scary\n\nDeirdre...
4,greg.couch@enron.com,c..giron@enron.com,"Fri, 29 Jun 2001 12:04:12 -0700 (PDT)",FW: Life as a Guy,\n\n -----Original Message-----\nFrom: \tBaile...


In [20]:
df_emails['From'] =df_emails['From'].astype(str)

In [21]:
df_emails['To'] =df_emails['To'].astype(str)

### Task 4: Find the most frequent emailers in the dataset (8 points)

We want to know which people sent or received the most emails in the dataset. Use your Pandas dataframe fields `to` and `from` to count the number of emails where an email address appears in the data.

**You must use a dictionary** to keep track of the connections between email addresses. This will involve looping through each row and countering the dictionary up for each email instance found. For example, if we processed rows of the data and found that abc@enron.com sent one email to def@enron.com and ghi@enron.com, another sent email to jkl@enron.com as well as received an email from ghi@enron.com. This would result in a dictionary like so:

```
{
 'abc@enron.com' : 3,
 'ghi@enron.com' : 2,
 'def@enron.com' : 1,
 'jkl@enron.com' : 1
}
```

Note that we are counting the email address whether it is the recipient or the sender of the email. It is certainly possible that both sides of the email conversation appears in the dataset, but it is OK to count that twice. The `To` field can contain multiple emails. You have to parse and count each email recipient separately!

This process might require a lot of computing power to process every email! You must use `pool.map()` or another `multiprocessing` module function for the mapping part of the problem. The output will be a list of dictionaries like the example above. You will write a function to execute on a small chunk of the dataframe. After you have the list of dictionaries from each chunk, reduce them into a single dictionary.

**BONUS:** 3 points - implement the same map and reduce operations leveraging pandas vectorized operations and groupby-summarize functions.


**Hint:** break your dataframe into a list of smaller dataframes, and use that list to pass through `pool.map()`

Use the following cell(s) to answer this question:

In the following cell, save a dictionary with only the 20 email addresses with the most emails to the object `dict_top_addresses_sent`

**Hint:** The top email address should be `'jeff.dasovich@enron.com'` with 2,624 emails (though a slightly different number if OK)

In [22]:
ecount=[]
for x in df_emails['To']:
    to_list = x.split(",")
    for y in to_list:
        ecount.append(y)

for x in df_emails['From']:
    ecount.append(x)

In [23]:
##From_list= df_emails['From'].tolist() 
len(ecount)

540572

In [24]:
##

In [25]:
len(ecount)

540572

In [26]:
substring = 'jeff.dasovich@enron.com'
for i in range(0,len(ecount)):
    if substring in ecount[i]:
        ecount[i] = substring

In [27]:
'''
from collections import Counter
freq=Counter(ecount)
print(freq)
'''

'\nfrom collections import Counter\nfreq=Counter(ecount)\nprint(freq)\n'

In [28]:
import numpy as np
res2=np.array_split(ecount,100)

In [29]:
def to_frequency_table(data):
    frequencytable = {}
    for key in data:
        if key in frequencytable:
            frequencytable[key] = frequencytable[key] + 1
        else:
            frequencytable[key] = 1
    return frequencytable



In [30]:
from multiprocessing import Pool

pool = Pool(8)
frequency = pool.map(to_frequency_table, res2)
pool.close()

In [31]:
##frequency[:10]

In [32]:
final_dict = {}
for x in frequency:
    for key, value in x.items():
        if key in final_dict:
            final_dict[key] += value
        else:
            final_dict[key] = value

In [33]:
final_dict['jeff.dasovich@enron.com']


2624

### Final Task: Run the following cell so your outputs can be checked for accuracy - this is a requirement

In [35]:
grading_dict = {'len_paths' : len(list_emails),
 'email_1' : email_1,
 'vanilla_split_match' : out_vanilla == out_split,
 'df_emails' : df_emails.head(20),
 'dict_top_email_addresses' : final_dict['jeff.dasovich@enron.com']
 }

import pickle
pickle.dump(grading_dict, open('grading_objects.pkl', 'wb'))