# Multiprocessing with the Enron email corpus

# Introduction

## Who am I (and why do you care)?

* **Contact Info:**
    * Mike Busch
    * Regis University 
       * Assistant Professor - Data Science
       * Undergraduate Data Science Director
       * Data Science lab director
    * mbusch@regis.edu
<img align="right" style="padding-right:10px;" src="figures/symbiosdolphins.jpg" width=200><br>    
* **Background:**
    * Computers since 1983
        * Apple ]\[+, 48K RAM 
    * Work
        * Symbios Logic
    <img align="right" style="padding-right:10px;" src="figures/rogue-wave-software.svg" width=200><br>     
        * Rogue Wave Software    
        * McKesson medical software
        * Regis University   
    * Education
        * Doctorate - CTU
            * Research: Genome assembly using GPUs and Hadoop
        * Masters - Regis
        * Bachelors - Regis  
    <img align="right" style="padding-right:10px;" src="figures/regis-university.jpg" width=200>        
    * Python - Intermediate
        * Using since 2012
* **Current Research**
    * Computer vision and augmented reality of 3 dimensional objects
        * Hardware:
            * Microsoft Azure Kinect, HoloLens
        * Software:
            * OpenCV
            * Open3D
            * PyTorch
            * Torch Points 3D

## The what and why of multiprocessing

### A tale of two threads

#### Parallel execution

<img align="left" style="padding-right:10px;" src="figures/thistall.jpg" width=400>

<p style="text-align:left;">-- Dave Baron, Mozilla developer</p>

**Background info - multithreading and multiprocessing**

**Definitions**

* **Process**-- Generally, one program
* **Thread**-- "Lightweight" process that shares memory with other threads
* **Multiprocessing**-- One program with multiple parallel processes
* **Multithreading**-- Multiple threads sharing memory for parallel processing

<img align="right" style="padding-right:10px;" src="figures/Multithreaded_process.svg" width=400><br>
[**History of Multithreading**](https://people.cs.clemson.edu/~mark/multithreading.html#:~:text=Summary%3A%20Multithreading%20first%20appeared%20in,investigated%20by%20IBM%20in%201968.&text=Most%20attempts%20at%20a%20history,PPUs%20in%20the%20CDC%206600.)
Mark Smotherman, Nov. 2007

> Summary: Multithreading first appeared in the 1950s, and simultaneous multithreading (SMT) was investigated by IBM in 1968.

[Wikipedia: Thread (computing)](https://en.wikipedia.org/wiki/Thread_(computing))<br>

* **Thread (of execution)** -- The smallest sequence of programmed instructions that can be managed independently by a scheduler, which is typically part of the operating system.
    * A thread is a component of a process.
        * Multiple threads can exist within one process.
        * They execute concurrently and share resources such as memory.
            * Threads share the process's executable code.
            * They also share its dynamically allocated variables and non-thread-local global variables.
        
* **Process** -- One "package" of executable code, memory, etc.
    * Different processes do not share resources (usually).
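
To make the distinction concrete, here is a minimal sketch (not part of the original talk; the names `counter`, `bump`, `run_in_thread`, and `run_in_process` are made up for illustration). A thread changes the parent's data directly, while a child process works on its own copy.

```python
import threading
from multiprocessing import Process

# Shared state living in the parent process (hypothetical example data)
counter = {"value": 0}


def bump():
    """Increment the counter in whichever process this runs in."""
    counter["value"] += 1


def run_in_thread():
    # A thread runs inside the parent process and sees the same dict
    t = threading.Thread(target=bump)
    t.start()
    t.join()


def run_in_process():
    # A child process gets its own copy of the dict
    p = Process(target=bump)
    p.start()
    p.join()


if __name__ == "__main__":
    run_in_thread()
    print("after thread: ", counter["value"])   # the parent's counter changed
    run_in_process()
    print("after process:", counter["value"])   # the parent's counter did not change
```

The second number printed should equal the first, because the increment made in the child process never reaches the parent.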

<img align="center" style="padding-right:10px;" src="figures/Concepts-_Program_vs._Process_vs._Thread.jpg" width=800><br>

Many times, when we have large, slow tasks, we **break them up into smaller tasks and try to do more than one of them at once**. Although there are other definitions and uses, this is called **parallel processing**. 

Unfortunately, Python was designed before multi-core CPUs were commonplace. To sidestep the inherent problems of multithreading, the reference interpreter (CPython) took a shortcut: the **Global Interpreter Lock (GIL).** While one Python thread is executing Python code, the GIL locks out all other Python threads in that process. Also unfortunately, the GIL is such a fundamental part of the Python interpreter that it has proven very hard to remove, so threads alone cannot take full advantage of modern CPUs.

All hope is not lost, though. Python has no problem with multi-processing -- allowing one program to control multiple independent processes that do not share memory with each other.

Also, the GIL only comes into play when a slow task is **CPU-bound**, like a difficult math problem. I/O (input/output)-bound programs are much less affected, because a thread releases the GIL while it waits on I/O.
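
The I/O-bound case can be illustrated with a small sketch. It is not from the original talk: `fake_io` and `timed_threads` are hypothetical helpers, and `time.sleep` stands in for a blocking read. Because a sleeping thread does not hold the GIL, four waits overlap instead of running back to back.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_io(seconds):
    """Stand-in for a blocking read; time.sleep releases the GIL while waiting."""
    time.sleep(seconds)
    return seconds


def timed_threads(n_tasks=4, seconds=0.2):
    """Run n_tasks waits in a thread pool and return the elapsed wall time."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=n_tasks) as pool:
        list(pool.map(fake_io, [seconds] * n_tasks))
    return time.perf_counter() - start


elapsed = timed_threads()
print(f"4 waits of 0.2 s each finished in {elapsed:.2f} s")
```

Run serially, the same four waits would take about 0.8 s. A CPU-bound function in place of `fake_io` would not show this overlap under the GIL.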

Since each email message resides on disk completely separately from all other messages, there is nothing stopping us from reading several messages at once. Even though our mail parsing is I/O-bound, we will investigate multiprocessing to help speed up ingestion. **Note:** there are a few libraries and methods that make multiprocessing easier, but we will stay with tools from the Python standard library: **concurrent.futures** and **multiprocessing.**

# Concurrent Speedup

The original idea to work with the new(ish) `concurrent.futures` library came from the KDnuggets article [Get a 2–6x Speed-up on Your Data Pre-processing with Python](https://www.kdnuggets.com/2018/10/get-speed-up-data-pre-processing-python.html), which used OpenCV to resize images.

Unfortunately, Python 3.8 (which I am using) and concurrent.futures (for which the article was originally written) have a compatibility problem. Therefore, instead of using `ProcessPoolExecutor()` and its `map()` method from concurrent.futures, I will be using `Pool()` and its `map()` method from the `multiprocessing` library.

This investigation has been adapted to read and parse the Enron dataset using `email` library code from [Analysing the Enron Email Corpus](https://www.pythonforengineers.com/analysing-the-enron-email-corpus/).

## Standard image processing, using single process

```
import glob
import os
import cv2

### Loop through all jpg files in the current folder 
### Resize each one to size 600x600

resized_images = []
for image_filename in glob.glob("*.jpg"):

  ### Read in the image data
  img = cv2.imread(image_filename)
  
  ### Resize the image
  img = cv2.resize(img, (600, 600)) 

  ### Keep the result (`return` is only valid inside a function)
  resized_images.append(img)
```

## Adding Multiprocessing

```
import glob
import os
import cv2
#import concurrent.futures

from multiprocessing import Pool, Process, cpu_count


def load_and_resize(image_filename):

  ### Read in the image data
  img = cv2.imread(image_filename)
  
  ### Resize the image
  img = cv2.resize(img, (600, 600)) 
  return img
  

### Create a pool of processes. 
### By default, one is created for each 
### CPU in your machine.

workers = cpu_count()
with Pool(workers) as p:

    ### Get a list of files to process
    image_files = glob.glob("*.jpg")

    ### Process the list of files, 
    ### but split the work across the 
    ### process pool to use all CPUs.
    ### Each jpg file is resized to 600x600
    ### by load_and_resize.
    result = p.map(load_and_resize, image_files)
```

The line `workers = cpu_count()` gets a count of how many CPUs can (potentially) be used for processes.

The line<br>
`    with Pool(workers) as p:`<br>
starts as many Python processes as the machine has CPU cores.

Processing happens with this line:<br>
`    p.map(load_and_resize, image_files)`
    
**p.map()** takes as input the function you would like to run and a list in which each element is a **single input to our function.** It returns a list of results in the same order as the inputs.
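
Since `map()` passes exactly one argument per call, a worker that needs extra parameters has to be adapted first. The sketch below shows one common way to do that with `functools.partial`. It is an illustration only: `scale` is a hypothetical toy worker standing in for `load_and_resize`.

```python
from functools import partial
from multiprocessing import Pool


def scale(value, factor):
    """Toy two-argument worker (hypothetical, stands in for load_and_resize)."""
    return value * factor


if __name__ == "__main__":
    # Freeze `factor` so that map() only has to supply `value`
    triple = partial(scale, factor=3)
    with Pool(2) as p:
        print(p.map(triple, [1, 2, 3, 4]))  # results come back in input order
```

`Pool.starmap()` is another option when each input is already a tuple of arguments.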

The data you are processing also needs to be a type that Python knows how to “pickle”, because arguments and results are passed between processes in pickled form. Luckily, these types are quite common. From the official Python documentation:

**Pickle-able** types:
* None, True, and False
* integers, floating point numbers, complex numbers
* strings, bytes, bytearrays
* tuples, lists, sets, and dictionaries containing only picklable objects
* functions defined at the top level of a module (using def, not lambda)
* built-in functions defined at the top level of a module
* classes that are defined at the top level of a module
* instances of such classes whose `__dict__` or the result of calling `__getstate__()` is picklable (see section Pickling Class Instances for details). 
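
A quick way to check whether a particular object fits the list above is simply to try pickling it. The helper below is a minimal sketch; `is_picklable` is a made-up name, not part of the standard library.

```python
import pickle


def is_picklable(obj):
    """Return True if pickle.dumps() accepts obj, False otherwise."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False


print(is_picklable({"to": ["a@example.com"], "n": 3}))  # plain containers
print(is_picklable(len))                                 # built-in function
print(is_picklable(lambda x: x))                         # lambda
```

The first two checks should succeed and the lambda should fail, which is why worker functions passed to `map()` need to be defined with `def` at the top level of a module.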

# Our dataset

[Enron](https://en.wikipedia.org/wiki/Enron) was an energy company headquartered in Houston, Texas that was found in 2001 to be perpetrating a massive accounting fraud.

The corpus contains more than 500,000 email messages from Enron executives, totalling around 1.7 GB after decompression.

[email corpus home](https://www.cs.cmu.edu/~enron/)

## Enron Emails

First, let's make sure we can access the directories:

In [1]:
import os

rootdir = "data/maildir"

In [2]:
for directory, subdirectory, filenames in  os.walk(rootdir):
    print(directory, subdirectory, len(filenames))

data/maildir ['rodrique-r', 'shapiro-r', 'jones-t', 'lewis-a', 'lay-k', 'slinger-r', 'arnold-j', 'saibi-e', 'mckay-j', 'kitchen-l', 'mims-thurston-p', 'king-j', 'harris-s', 'storey-g', 'horton-s', 'love-p', 'benson-r', 'schwieger-j', 'crandell-s', 'mann-k', 'haedicke-m', 'allen-p', 'staab-t', 'stokley-c', 'lavorato-j', 'shankman-j', 'martin-t', 'whalley-l', 'weldon-c', 'quigley-d', 'shively-h', 'dean-c', 'white-s', 'quenet-j', 'merriss-s', 'mckay-b', 'gang-l', 'reitmeyer-j', 'germany-c', 'presto-k', 'ward-k', 'hodge-j', 'may-l', 'causholli-m', 'scholtes-d', 'heard-m', 'hernandez-j', 'shackleton-s', 'ermis-f', 'gilbertsmith-d', 'nemec-g', 'baughman-d', 'phanis-s', 'grigsby-m', 'cash-m', 'sturm-f', 'salisbury-h', 'richey-c', 'williams-j', 'hain-m', 'kean-s', 'sanders-r', 'perlingiere-d', 'lucci-p', 'davis-d', 'fossum-d', 'gay-r', 'neal-s', 'rogers-b', 'pimenov-v', 'thomas-p', 'lokey-t', 'symes-k', 'beck-s', 'hyvl-d', 'meyers-a', 'geaccone-t', 'donohoe-t', 'hendrickson-s', 'skilling-j', '

Now, let's parse a single email message:

In [3]:
from email.parser import Parser

file_to_read = rootdir + "/lay-k/all_documents/1."

with open(file_to_read, "r") as f:
    data = f.read()

#print(data)

email = Parser().parsestr(data)

print("\nTo: " , email['to'])
print("\n From: " , email['from'])

print("\n Subject: " , email['subject'])

print("\n \n Body: " , email.get_payload())


To:  mmilken@knowledgeu.com

 From:  rosalee.fleming@enron.com

 Subject:  Re: testing

 
 Body:  Hi -

We did receive the e-mail.

Rosalee for Ken Lay





"Michael Milken" <mmilken@knowledgeu.com> on 07/02/99 10:21:40 AM
To: Kenneth Lay/Corp/Enron@Enron
cc:  
Subject: testing









### The MailMessage dataclass

I'm going to try something else new - a data class.

In [4]:
from dataclasses import dataclass, field
from typing import List

In [5]:
@dataclass
class MailMessage:
    to_addr: List[str] = field(default_factory=list)
    from_addr: str = field(default_factory=str)
    subject: str = field(default_factory=str)
    body: str = field(default_factory=str)

#### Usage examples

In [6]:
# Single address
myMsg = MailMessage('mickey@mouse.com', 'daffy@duck.com', 'Lunch tomorrow?', 'Burgers at Five Guys around noon?')
print(myMsg)
myMsg.to_addr

MailMessage(to_addr='mickey@mouse.com', from_addr='daffy@duck.com', subject='Lunch tomorrow?', body='Burgers at Five Guys around noon?')


'mickey@mouse.com'

In [7]:
# multiple address
myMsg = MailMessage(['mickey@mouse.com', 'minnie@mouse.com'], 'daffy@duck.com', 'Lunch tomorrow?', 'Burgers at Five Guys around noon?')
print(myMsg)
myMsg.to_addr

MailMessage(to_addr=['mickey@mouse.com', 'minnie@mouse.com'], from_addr='daffy@duck.com', subject='Lunch tomorrow?', body='Burgers at Five Guys around noon?')


['mickey@mouse.com', 'minnie@mouse.com']
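
Both calls above are accepted even though `to_addr` is annotated as `List[str]`, because dataclasses do not check type hints at runtime. If a consistent type matters, one option is a `__post_init__` hook. The class below is a hypothetical variant for illustration, not the `MailMessage` used in the rest of this notebook.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class NormalizedMailMessage:
    """Hypothetical variant of MailMessage that always stores a list in to_addr."""
    to_addr: List[str] = field(default_factory=list)
    from_addr: str = ""
    subject: str = ""
    body: str = ""

    def __post_init__(self):
        # Type hints are not enforced, so wrap a bare string ourselves
        if isinstance(self.to_addr, str):
            self.to_addr = [self.to_addr]


single = NormalizedMailMessage("mickey@mouse.com", "daffy@duck.com")
print(single.to_addr)
```

With this variant, code that loops over `to_addr` sees a list whether one address or several were supplied.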

---
Hmmmm... What does a multiple to: line look like? By inspection in a text editor, it looks like #7 has multiples.

In [8]:
file_to_read = rootdir + "/lay-k/all_documents/7."

with open(file_to_read, "r") as f:
    data = f.read()

#print(data)

email = Parser().parsestr(data)

email['to']

'gary.fitch@enron.com, joan.richard@enron.com, steven.kean@enron.com, \n\tbill.donovan@enron.com, anna.harris@enron.com'

OK, I see ',' and '\n\t'. We might be able to make some assumptions about that structure to make code more efficient if this gets really painful.
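
As an alternative to splitting the string by hand, the standard library's `email.utils.getaddresses()` can parse an address list, including the folded continuation line. This is only a sketch of that option; the parsing function later in this notebook uses plain string replacement instead.

```python
from email.utils import getaddresses

# The folded header value shown above
raw_to = ('gary.fitch@enron.com, joan.richard@enron.com, steven.kean@enron.com, '
          '\n\tbill.donovan@enron.com, anna.harris@enron.com')

# getaddresses() takes a list of header values and returns (name, address) pairs
addresses = [addr for _name, addr in getaddresses([raw_to])]
print(addresses)
```

It also copes with display names such as `"Michael Milken" <mmilken@knowledgeu.com>`, which a plain split on commas would leave attached to the address.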

## A function to parse each email and return a MailMessage object:

In [9]:
def email_analyse(inputfile):
    """Parse one message file and return it as a MailMessage."""
    # ISO-8859-1 maps every byte to a character, so decoding never fails
    with open(inputfile, "r", encoding = "ISO-8859-1") as f:
        data = f.read()

    email = Parser().parsestr(data)
    
    if email['to']:
        # Strip the whitespace left by folded header lines
        email_to = email['to']
        email_to = email_to.replace("\n", "")
        email_to = email_to.replace("\t", "")
        email_to = email_to.replace(" ", "")

        # One list entry per recipient
        email_to = email_to.split(",")
    else:
        # No To: header at all
        email_to = []

    from_email = email['from']

    email_body = email.get_payload()
    email_subject = email['subject']
    return MailMessage(email_to, from_email, email_subject, email_body)

## Function to return a list of files with full path

In [21]:
import pathlib
def pathlist(rootdir):
    path_list = []
    for directory, subdirectory, filenames in  os.walk(rootdir):
        for name in filenames:
            if name != '.DS_Store':
                path_list.append(pathlib.PurePath(directory, name))
    return path_list
        
file_list = pathlist(rootdir)
print(file_list[0])
print(len(file_list))


data/maildir/rodrique-r/dublin/13.
517401


---
Now, let's test on our two cases above:

In [11]:
msg1 = email_analyse(rootdir + "/lay-k/all_documents/1.")
msg2 = email_analyse(rootdir + "/lay-k/all_documents/7.")
print(msg1)
print('\n\n')
print(msg2)

MailMessage(to_addr=['mmilken@knowledgeu.com'], from_addr='rosalee.fleming@enron.com', subject='Re: testing', body='Hi -\n\nWe did receive the e-mail.\n\nRosalee for Ken Lay\n\n\n\n\n\n"Michael Milken" <mmilken@knowledgeu.com> on 07/02/99 10:21:40 AM\nTo: Kenneth Lay/Corp/Enron@Enron\ncc:  \nSubject: testing\n\n\n\n\n\n\n')



MailMessage(to_addr=['gary.fitch@enron.com', 'joan.richard@enron.com', 'steven.kean@enron.com', 'bill.donovan@enron.com', 'anna.harris@enron.com'], from_addr='rosalee.fleming@enron.com', subject='Persons Authorized to Approve Use of the Company Planes', body='FURTHER CLARIFICATION TO THE MESSAGE BELOW, for PERSONAL AIRCRAFT USE, the \nmembers of the Office of the Chairman must approve the requests.  In addition \nto Ken and Jeff, Joe Sutton will also approve personal requests.\n\n\nThis is the revised list of persons authorized to approve use of the company \nplane.  If there is a duplicate request for a plane, one of the members of \nthe Office of the Chairman  

---

Looks like that worked pretty well. Let's try a run to see how long it takes. 

---

<div class="alert alert-block alert-danger">
<b>Important:</b> You may not want to run this as a live demo. It takes a minute or two.
</div>

In [12]:
%%time
mail_list = []
for fname in file_list:
    msg = email_analyse(fname)
    mail_list.append(msg)

CPU times: user 58.9 s, sys: 3.79 s, total: 1min 2s
Wall time: 1min 2s


**Note:** Copied from earlier run on 2017 MacBook Pro

```
CPU times: user 1min 47s, sys: 59.5 s, total: 2min 46s
Wall time: 5min 16s
```

**Current Frankenbox:**
```
CPU times: user 1min 6s, sys: 5.42 s, total: 1min 12s
Wall time: 1min 13s
```

## Pympler -- Showing object size

[Pympler docs](https://pympler.readthedocs.io/en/latest/)

>Pympler is a development tool to measure, monitor and analyze the memory behavior of Python objects in a running Python application.
>
>By pympling a Python application, detailed insight in the size and the lifetime of Python objects can be obtained. Undesirable or unexpected runtime behavior like memory bloat and other “pymples” can easily be identified.

In [15]:
# !pip install pympler

Collecting pympler
  Using cached Pympler-0.9-py3-none-any.whl
Installing collected packages: pympler
Successfully installed pympler-0.9


In [13]:
from pympler import asizeof
mail_size = asizeof.asizeof(mail_list)
print(f'{len(mail_list)} objects totalling {mail_size} bytes')
del mail_list

517401 objects totalling 1470658408 bytes
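
Pympler is useful here because the built-in `sys.getsizeof()` reports only the shallow size of a container, not the objects it refers to. The sketch below shows the difference with a hypothetical helper, `deep_size`. It is far cruder than `asizeof` and its numbers will not match Pympler's exactly.

```python
import sys


def deep_size(obj, seen=None):
    """Rough recursive size in bytes; a crude stand-in for pympler's asizeof."""
    seen = set() if seen is None else seen
    if id(obj) in seen:          # count each object only once
        return 0
    seen.add(id(obj))
    size = sys.getsizeof(obj)
    if isinstance(obj, dict):
        size += sum(deep_size(k, seen) + deep_size(v, seen) for k, v in obj.items())
    elif isinstance(obj, (list, tuple, set, frozenset)):
        size += sum(deep_size(item, seen) for item in obj)
    elif hasattr(obj, "__dict__"):
        size += deep_size(vars(obj), seen)
    return size


bodies = ["x" * 10_000 for _ in range(3)]   # hypothetical message bodies
print(sys.getsizeof(bodies), deep_size(bodies))
```

The first number covers only the list itself, while the second also includes the strings it holds.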


# Now, let's parallelize it

## Process Pool

In [15]:
from multiprocessing import Pool, Process, cpu_count

In [16]:
processes = []
workers = cpu_count()
workers

32

In [17]:
%%time
with Pool(workers) as p:
    result = p.map(email_analyse, file_list)
    
print(len(result))

517401
CPU times: user 4.97 s, sys: 1.77 s, total: 6.74 s
Wall time: 7.25 s
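
To put the two wall times side by side, the arithmetic below computes the speedup and the parallel efficiency from the numbers printed in this notebook. The variable names are made up for the calculation.

```python
serial_wall = 62.0      # seconds, "Wall time: 1min 2s" from the single-process loop
pool_wall = 7.25        # seconds, "Wall time: 7.25 s" from Pool.map
workers = 32            # value reported by cpu_count()

speedup = serial_wall / pool_wall
efficiency = speedup / workers

print(f"speedup:    {speedup:.2f}x")
print(f"efficiency: {efficiency:.1%}")
```

That works out to roughly an 8.6x speedup, or about 27% of the ideal 32x. Some gap is expected: starting the processes and pickling results back to the parent both cost time.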


In [18]:
result[0]

MailMessage(to_addr=['gary.stadler@enron.com', 'gabriel.monroy@enron.com', 'robin.rodrigue@enron.com'], from_addr='c.griffin@enron.com', subject='EES Volumetric Positions', body="Attached are  EES's Volumetric NYMEX and Basis positions for 7/17/2000.   \n\n\n\n\nRobin, I'm expecting our position to be a little shorter (by about 30 \ncontracts for Nymex).  Please contact me at x3-5070 if you have any questions.\n\nThanks,\n\nMonica for Kyle\n")

## Manually Multiprocessing

In [11]:
len (file_list)

517401

In [12]:
len(file_list) // workers # Integer division

16168

In [19]:
def process_files(file_list):
    for f in file_list:
        email_analyse(f)

In [20]:
%%time
for i in range(0, workers):
    sub_list = [file_list[j] for j in range(0, len(file_list)) if j % workers == i]
    if len(sub_list) > 0:
        p = Process(target=process_files, args=(sub_list,))
        p.start()
        processes.append(p)
    # input_file_list.put('STOP')


# Wait for processes to stop
for p in processes:
    p.join()

CPU times: user 1.45 s, sys: 1.48 s, total: 2.93 s
Wall time: 5.73 s


* **Numba**: A just-in-time compiler sponsored by Anaconda. It is generally used to compile CPU-intensive math functions in Python. Numba can also **release the GIL** inside compiled functions, parallelize loops, and leverage the GPU. 

* **concurrent.futures**: A higher-level standard library interface to thread pools and process pools (the latter built on `multiprocessing`). This article was originally written for concurrent.futures; however, a change in Python 3.8 affected the way it runs. If you are running a Python version **other** than 3.8, you may want to give it a try. 

* **Deco**: [Deco](https://github.com/alex-sherman/deco) (DEcorated COncurrency) is "A simplified parallel computing model for Python. DECO automatically parallelizes Python programs, and requires minimal modifications to existing serial programs." It suffers from the same Python 3.8 problem as concurrent.futures. 

# Demo: Practical application -- Extracting Point Clouds from Video

# Other alternatives
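
For readers who want to try `concurrent.futures`, here is a minimal sketch of the `ProcessPoolExecutor` equivalent of `Pool.map`. It is an illustration under assumptions: `word_count` and `count_all` are hypothetical names, the toy worker stands in for `email_analyse`, and whether it runs cleanly in a given notebook environment should be verified.

```python
from concurrent.futures import ProcessPoolExecutor


def word_count(text):
    """Toy worker (hypothetical stand-in for email_analyse)."""
    return len(text.split())


def count_all(texts, max_workers=2):
    """Map word_count over texts in a pool of worker processes."""
    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        # chunksize batches items per task to cut inter-process overhead
        return list(executor.map(word_count, texts, chunksize=2))


if __name__ == "__main__":
    print(count_all(["Hi -", "We did receive the e-mail.", "Rosalee for Ken Lay"]))
```

As with `Pool.map`, results come back in input order, and the worker must be a picklable top-level function.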