# CS5234J: Summative Group Assessment 2
**Goals**: In this assignment you will be practising basic tools and creating 
building blocks that will be useful in your final projects. The assignment requires
you to solve two realistic data processing problems on the Spark platform.

**Before you start:**
* This assignment is **summative** coursework.
* It constitutes 4% of the final course mark.
* It consists of 2 questions.
* The answers should be given by filling in blanks in the code cells of a copy of this 
notebook as instructed in the question descriptions and the comments in the code.
* Do **not** create your own cells as these will not be checked!
* The submission deadline is **7 June 2021, 10:00**.
* Submit a copy of this notebook with your answers by following the Assignment 2 
submission link on Moodle. For example, if viewing the notebook in Jupyter, select `File->Download as->Notebook (.ipynb)` to download a copy of the notebook.
* Please note that submitting anything other than a copy of this notebook (e.g., a PDF file
or a ZIP archive) will automatically result in your entire submission receiving a mark of 0. 
Likewise, any code cells that do not compile (for whatever reason, including
accidental comments, incorrect indentation, unbalanced parentheses, etc.) will be penalized by deducting the **entire** 
quantity of marks associated with the relevant question. This is in line with the requirements 
of the departmental policy for electronic submissions: 
https://intranet.royalholloway.ac.uk/computerscience/documents/pdf/electronicsubmissionstudentversion.pdf
* You can work in teams of **two** people. 
* If you formed a team for Assignment 1, you **must** work as part of the same team for this assignment and the final project.

**Running the code**
To run the code, we recommend using an instance of the Jupyter Notebook server integrated with 
PySpark, which can be accessed as follows:
* Start NoMachine, and log into `linux.cim.rhul.ac.uk`
* Open a terminal window
* At the prompt, type `ssh -X bigdata`. Note the `X` must be capitalized.
* Type `/home/local/ufac001/pyspark-jupyter.sh` and hit `enter` to launch a Jupyter Notebook server integrated with 
PySpark. If everything works as expected, this will open a tab in a web browser through which
you can load and work on the notebook.

As an alternative, you can also use the Databricks Community Edition cloud, but please be
aware that their automated notebook synchronisation may not always work as expected,
potentially resulting in the loss of work. One possible workaround is to connect your notebook
to a Git repository, and then use the provided 
commit interface to force synchronisation as necessary. If 
you would like to follow this route, and need help creating a private repository
on GitHub (available to all RHUL students), please contact the CS Helpdesk.

**Spark Restrictions**
Your solution should use PySpark and the RDD APIs. In particular, you should *not* use
DataFrames/DataSets or SparkSQL as part of your solution.

## Question 1: Regular Expressions (5%)
Write a regular expression pattern matching a _valid Email address_. For the purposes of this exercise, a valid Email address is any string of the form `local@domain` where 
    
* `local` is any combination of alphanumeric (i.e., both letters and numbers) characters in 
    either lower or upper case, dots (`.`) and the following characters 
    ``!#$%&'*+-/=?^_`{|}~``, and
    
* `domain` is a sequence of labels separated by a single `.` (dot) character where each label 
    is a combination of alphanumeric (i.e., both letters and numbers) characters in either lower 
    or upper case, and the rightmost label representing the top-level domain is not all numbers.

For example, all of the following strings are valid Emails: `joe@example.com`, 
`joe.doe@bigdata.cs.rhul.ac.uk`, `joe..doe123$@stratospheric`, and 
`j0e.==.D_OE@123dotcom.net`, whereas the strings 
`joe@doe.xxx@example.com`, `joe@.example.com`, `joe.doe@example.123`, 
and `joe.doe@example.123..com` are all invalid.

In addition, write the following lambda expression:
* `valid_email`: takes a string `s` as argument and returns `True` if `s` 
matches `email_regex`, and `False`, otherwise

Your solution for `email_regex` is correct if the value returned by `re.compile(email_regex).fullmatch(s)` is not
`None` for every string `s` that is a valid Email address according to 
the definition above, and is `None` for every other string.
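
As a sanity check, a candidate pattern can be run against the examples listed above. The sketch below is illustrative only: the names `LOCAL`, `LABEL`, `TLD`, and `candidate_regex` are assumptions introduced here, and the pattern is one possible reading of the definition rather than the model answer.

```python
import re

# Hypothetical building blocks (names are illustrative, not part of the assignment)
LOCAL = r"[A-Za-z0-9.!#$%&'*+\-/=?^_`{|}~]+"   # allowed local-part characters
LABEL = r"[A-Za-z0-9]+"                         # one alphanumeric domain label
TLD = r"[A-Za-z0-9]*[A-Za-z][A-Za-z0-9]*"       # last label: at least one letter
candidate_regex = rf"{LOCAL}@(?:{LABEL}\.)*{TLD}"

valid_examples = [
    "joe@example.com",
    "joe.doe@bigdata.cs.rhul.ac.uk",
    "joe..doe123$@stratospheric",
    "j0e.==.D_OE@123dotcom.net",
]
invalid_examples = [
    "joe@doe.xxx@example.com",
    "joe@.example.com",
    "joe.doe@example.123",
    "joe.doe@example.123..com",
]

pattern = re.compile(candidate_regex)
# Map every example to True/False depending on whether the whole string matches
results = {s: pattern.fullmatch(s) is not None
           for s in valid_examples + invalid_examples}
```

Every entry of `valid_examples` should map to `True` in `results`, and every entry of `invalid_examples` to `False`. Passing these eight cases does not prove a pattern correct; it only catches obvious mistakes.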

In [1]:
import re

# Put your pattern inside ''
email_regex = r'^([a-zA-Z0-9\!\#\$\%\&\'*\+\-\/\=\?\^\_\`\{\|\}\~\.]+)@(([a-zA-Z0-9]+\.)*(?=.*[a-zA-Z])([a-zA-Z0-9]+))$'

# Replace the right-hand side of the lambda with your code
valid_email = lambda s: re.compile(email_regex).fullmatch(s) is not None

## Question 1: Spark (80%) 
Write a function `proc_headers(lst)` that takes a list `lst` of Email headers, and returns a list of tuples `(E1, E2)` for _every_ Email transmission from `E1` to `E2`. Each header in `lst` is described by a tuple `(FROM, TO, CC, BCC)` where `FROM` is the Email address of the sender, and each of the `TO`, `CC`, and `BCC` is a string holding a list of comma-separated EMail addresses matching the `csv_regex` pattern from Assignment 1 Question 2 (A1-Q2).<p>
Your code should be written as a series of the following Spark transformations:
1. Use `sc.parallelize()` to create a base RDD from `lst`.
2. Apply a `filter()` transformation to the base RDD created in step 1 to exclude all tuples where `FROM` is not a valid Email address. Use the `valid_email` lambda from Q1 above.
3. Apply a `map()` transformation to the RDD produced at step 2 to convert each `(FROM, TO, CC, BCC)` tuple to a `(FROM, RECIPIENTS)` tuple where `RECIPIENTS` is a concatenation of `TO`, `CC`, and `BCC` obtained by a lambda which is a composition of two `concat_csv_strings` lambdas from A1-Q4.2.
4. Apply a `map()` transformation to the RDD produced at step 3 to convert each `(FROM, RECIPIENTS)` tuple to a `(FROM, EMAIL_SEQ)` tuple where `EMAIL_SEQ` is a sequence of EMail addresses in `RECIPIENTS` extracted using the helper generator function `gen_seq_from_csv_string()` below.
5. Apply a `flatMap()` transformation to the RDD produced at step 4 to convert each `(FROM, EMAIL_SEQ)` tuple to a sequence of tuples `(FROM, E)` for every Email `E` in `EMAIL_SEQ`. Use the `val_by_vec` lambda from A1-Q4.3.
6. Apply a `filter()` transformation to the result of step 5 to exclude all tuples with an invalid recipient address. Use the `valid_email` lambda from Q1 above.
7. Apply another `filter()` transformation to the outcome of step 6 to exclude all tuples having the same sender and recipient Emails. Use the `not_self_loop` lambda from A1-Q4.4.
8. Apply a `collect()` action to the RDD produced at step 7, and return the resulting list.
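
To make the intermediate shapes in steps 3 to 7 concrete, the following plain-Python walk-through traces a single made-up header without Spark. It is a sketch under stated assumptions: `split_csv` is a hypothetical stand-in for `gen_seq_from_csv_string()`, the addresses are invented, and the validity filters of steps 2 and 6 are omitted for brevity.

```python
import re

def split_csv(s):
    # Hypothetical stand-in for gen_seq_from_csv_string():
    # returns the non-empty, comma-separated tokens of s
    return re.findall(r'[^\s,]+', s)

# A made-up (FROM, TO, CC, BCC) header; CC is empty on purpose
header = ('a@x.com', 'b@x.com,c@x.com', '', 'a@x.com')

# Step 3: (FROM, RECIPIENTS), with TO, CC and BCC joined by commas
sender, recipients = header[0], ','.join(header[1:])

# Step 4: (FROM, EMAIL_SEQ); the empty CC field yields no token
email_seq = split_csv(recipients)

# Step 5: one (FROM, E) tuple per recipient
pairs = [(sender, e) for e in email_seq]

# Step 7: drop tuples where sender and recipient are the same address
pairs_no_loops = [(s, e) for s, e in pairs if s != e]
```

Here `recipients` is `'b@x.com,c@x.com,,a@x.com'`, `email_seq` holds three addresses, and `pairs_no_loops` keeps only the two tuples addressed to `b@x.com` and `c@x.com`. In the Spark version each of these list operations becomes a `map()`, `flatMap()`, or `filter()` on an RDD.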

In [2]:
def gen_seq_from_csv_string(s):
    # Raw string avoids the invalid-escape warning for \s; compile once and reuse
    csv_regex = re.compile(r'([^\s,]+),?')
    match = csv_regex.search(s, 0)
    while match:
        yield match.group(1)
        match = csv_regex.search(s, match.end())
    
def proc_headers(lst):
    '''
    lst: a list of tuples (FROM, TO, CC, BCC) 
    representing EMail headers
    Returns a list of tuples `(E1, E2)` for every Email transmission from `E1` to `E2`
    using a series of Spark operations as described in the question.

    Replace pass with your code. Use `sc` to reference the Spark context. 
    '''
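    # 1: base RDD of (FROM, TO, CC, BCC) tuples
    rdd = sc.parallelize(lst)
    # 2: keep only headers with a valid sender address
    data = rdd.filter(lambda x: valid_email(x[0]))
    # 3: merge TO, CC and BCC into a single comma-separated string
    concat_csv_strings = lambda s1, s2: f'{s1},{s2}'
    data = data.map(lambda x: (x[0], concat_csv_strings(x[1], concat_csv_strings(x[2], x[3]))))
    # 4: split the merged string into a list of addresses
    data = data.map(lambda x: (x[0], list(gen_seq_from_csv_string(x[1]))))
    # 5: emit one (FROM, E) tuple per recipient
    val_by_vec = lambda x, t: [(x, val) for val in t]
    data = data.flatMap(lambda x: val_by_vec(x[0], x[1]))
    # 6: drop tuples with an invalid recipient address
    data = data.filter(lambda x: valid_email(x[1]))
    # 7: drop tuples where sender and recipient coincide
    not_self_loop = lambda t: t[0] != t[1]
    data = data.filter(not_self_loop)
    # 8: bring the results back to the driver as a list
    return data.collect()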

You can use the following code to test your implementation of `proc_headers()`:

In [3]:
header1 = ('bill.cordes@enron.com', 
           'mike.mcconnell@enron.com,cathy.phillips@enron.com,john.haggerty@enron.com',
           'george.mcclellan@enron.com,tom.kearney@enron.com',
           'tom.kearney@enron.com,cathy.phillips@enron.com'
          )
header2 = ('mike.mcconnell@enron..com', 
           'bill.cordes@enron.com,tom.kearney@enro@n.com,cathy.phillips@enron.com,john.haggerty@enron.com',
           'george.mcclellan@enron.com',
           'mike.mcconnell@enron.com'
          )
header3 = ('stuart.staley@enron.com',
           'mike.mcconnell@enron.com,jeffrey.shankman@enron..com',
           'bill.cordes@enron.com,tom.kearney@enron.com,cathy.phillips@en@ron.com',
           'george.mcclellan@enron.com,stuart.staley@enron.com'
          )
print('\n'.join(str(t) for t in proc_headers([header1, header2, header3])))
'''
The output produced by the line above when executed with the model implementation
of proc_headers() was as follows:

('bill.cordes@enron.com', 'mike.mcconnell@enron.com')
('bill.cordes@enron.com', 'cathy.phillips@enron.com')
('bill.cordes@enron.com', 'john.haggerty@enron.com')
('bill.cordes@enron.com', 'george.mcclellan@enron.com')
('bill.cordes@enron.com', 'tom.kearney@enron.com')
('bill.cordes@enron.com', 'tom.kearney@enron.com')
('bill.cordes@enron.com', 'cathy.phillips@enron.com')
('stuart.staley@enron.com', 'mike.mcconnell@enron.com')
('stuart.staley@enron.com', 'bill.cordes@enron.com')
('stuart.staley@enron.com', 'tom.kearney@enron.com')
('stuart.staley@enron.com', 'george.mcclellan@enron.com')
'''

('bill.cordes@enron.com', 'mike.mcconnell@enron.com')
('bill.cordes@enron.com', 'cathy.phillips@enron.com')
('bill.cordes@enron.com', 'john.haggerty@enron.com')
('bill.cordes@enron.com', 'george.mcclellan@enron.com')
('bill.cordes@enron.com', 'tom.kearney@enron.com')
('bill.cordes@enron.com', 'tom.kearney@enron.com')
('bill.cordes@enron.com', 'cathy.phillips@enron.com')
('stuart.staley@enron.com', 'mike.mcconnell@enron.com')
('stuart.staley@enron.com', 'bill.cordes@enron.com')
('stuart.staley@enron.com', 'tom.kearney@enron.com')
('stuart.staley@enron.com', 'george.mcclellan@enron.com')

## Question 2: Spark (20%)
Write a function `get_top_emails(lst, N)` that applies further transformations to the dataset
returned by the `proc_headers(lst)` function from the previous question to identify the `N` Email
addresses that are _most popular_ in terms of the number of Emails that were sent to them. <p>
The output must be a list of `N` tuples `(n, E)` where `n` is the number of Email transmissions having `E` as their recipient address. The list must be sorted in the _descending_ lexicographical order, that is, `(n1, E1) > (n2, E2)` if and only if either `n1 > n2`, or `n1 == n2` and `E1 > E2`.<p>
_Hint_: Use a `map`/`reduceByKey` pattern as in the word count example to pair Email addresses with their popularity counts, the `sortBy` transformation to sort them in the descending lexicographical order, and the `take()` action to extract the top-N records.
<p>

Note that calling `proc_headers()` first is only needed to prevent any errors 
in your solution to the previous question from propagating to the solution of this 
question: this way, we will be able to use the model implementation 
of `proc_headers()` for testing.
A more efficient solution would avoid materializing the results
of `proc_headers()` in the driver, and instead directly extend the processing 
steps of the previous question with further operations. Make sure you understand why 
this is important!
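
The required ordering relies on the fact that Python compares tuples lexicographically. The following Spark-free sketch illustrates the count, swap, sort, and take pattern on a small made-up list of `(sender, recipient)` pairs. The data and the variable names are assumptions for illustration only, and `collections.Counter` stands in for the `map`/`reduceByKey` step.

```python
from collections import Counter

# Made-up (sender, recipient) pairs, shaped like the output of proc_headers()
pairs = [
    ('a@x.com', 't@x.com'),
    ('b@x.com', 't@x.com'),
    ('a@x.com', 'm@x.com'),
    ('b@x.com', 'g@x.com'),
    ('c@x.com', 'm@x.com'),
    ('c@x.com', 'g@x.com'),
    ('c@x.com', 'k@x.com'),
]

# Count how many transmissions each recipient received
counts = Counter(recipient for _, recipient in pairs)

# Swap to (n, E) and sort descending: tuples compare by n first, then by E
ranked = sorted(((n, e) for e, n in counts.items()), reverse=True)

# Equivalent of take(N) with N = 2
top_2 = ranked[:2]
```

Both `t@x.com` and `m@x.com` are tied on a count of 2 with `g@x.com`, so the tie is broken by the address itself: `top_2` is `[(2, 't@x.com'), (2, 'm@x.com')]`, because `'t' > 'm' > 'g'`.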

In [4]:
def get_top_emails(lst, N):
    '''
    lst: a list of tuples (FROM, TO, CC, BCC) 
    representing EMail headers
    N: a positive integer
    Returns a list of N tuples `(n, E)` sorted in the descending lexicographical order
    representing the top N most popular EMail destinations as described in the 
    question.
    '''
    rdd = sc.parallelize(proc_headers(lst))
    # Insert your code after this line
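    # Count transmissions per recipient, then swap each pair to (count, email)
    data = (rdd.map(lambda x: (x[1], 1))
               .reduceByKey(lambda x, y: x + y)
               .map(lambda x: (x[1], x[0])))
    # Tuples compare lexicographically: by count first, then by email
    data = data.sortBy(lambda x: x, ascending=False)
    return data.take(N)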

You can use the following code to test your implementation of `get_top_emails()`:

In [5]:
print('\n'.join(str(t) for t in get_top_emails([header1, header2, header3], 3)))
'''
The output produced by the line above when executed with the model implementation
of get_top_emails() was as follows:

(3, 'tom.kearney@enron.com')
(2, 'mike.mcconnell@enron.com')
(2, 'george.mcclellan@enron.com')
'''

(3, 'tom.kearney@enron.com')
(2, 'mike.mcconnell@enron.com')
(2, 'george.mcclellan@enron.com')