In [None]:
%%shell
# Fetch the notebook's input files from Google Drive, one gdown call per file ID.
for file_id in 1ay5DcH64Qao1HR7CQnR6Cl1hbBMgGqXj \
               13BozEl3JtS43Xuu2Ek9IwMULpWjPH4VC \
               1It6GP8O2JqkmUtZKbYp1kpwpuwOXlLps; do
  gdown --quiet "$file_id"
done
# Install PySpark into the runtime (NOTE(review): version is not pinned).
pip --quiet install pyspark

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.4/281.4 MB[0m [31m5.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 KB[0m [31m15.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Building wheel for pyspark (setup.py) ... [?25l[?25hdone




# Social Triangle Using Apache Spark

The goal of this notebook is to implement a Social Triangle algorithm using Spark. For example, given the email dataset, we will list all "reciprocal" relationships in the company. More specifically:

If A emails B and B emails A, then the relationship between A and B is *reciprocal*.

If A emails B but B doesn’t email A, then the relationship between A and B is *directed*.

**Dataset:** We will use a subset of the open [Enron Email Dataset](https://www.cs.cmu.edu/~./enron/ "Enron Email Dataset"), which contains approximately 10,000 simplified email headers from the Enron Corporation. A subset of the data is available as **enron_mails_small.csv**.

The file contains 3 columns *Date*, *From*, and *To*. Their description is as follows:

|Column name|Description|
|--|--|
|Date |The date and time of the email, in the format YYYY-MM-DD hh:mm:ss, <br />e.g. "1998-10-30 07:43:00" |
|From |The sender email address, <br />e.g. "mark.taylor@enron.com" |
|To | A list of recipients' email addresses separated by semicolons ';', <br />e.g. "jennifer.fraser@enron.com;jeffrey.hodge@enron.com" |

Note that we only care about users employed by Enron, i.e. only relationships where both email addresses end with *'@enron.com'*.

The expected output is also provided below. For each reciprocal relationship, please output a tuple consisting of two strings. The first one is always **'reciprocal'**. The second one is a string showing the names of the two people in the following format: **'Jane Doe : John Doe'**. The names should be presented in lexical order, i.e. there will not be a 'John Doe : Jane Doe', since 'Jane' is ordered before 'John'.

Though the dataset only contains email addresses, not actual names, we're assuming that the email aliases were created based on their name. For example:

|Email Address|Converted Name|
|--|--|
|mark.taylor@enron.com|Mark Taylor|
|alan.aronowitz@enron.com|Alan Aronowitz|
|marc.r.cutler@enron.com|Marc R Cutler|
|hugh@enron.com|Hugh|

### You can choose to use either Spark's RDD or Spark's DataFrame in this task.
Regardless of your choice, you must perform all computation using Spark's transformations, i.e. data must be read directly from the input file into your RDD or DataFrame and stay there for the entire computation.

### Using RDD

In [None]:
from pyspark import SparkContext

# `sc` is not created anywhere earlier in the notebook; getOrCreate() returns the
# running context if one already exists, so re-running this cell is safe.
sc = SparkContext.getOrCreate()

mail_csv = 'enron_mails_small.csv'
mail = sc.textFile(mail_csv, use_unicode=True).cache()
# Show the header with column positions to confirm the indices used when parsing.
list(enumerate(mail.first().split(',')))

[(0, 'Date'), (1, 'From'), (2, 'To')]

In [None]:
def extractEmails(partId, rows):
    """Yield (sender, recipient) Enron address pairs from one partition of raw CSV lines.

    Meant to be passed to ``RDD.mapPartitionsWithIndex``. Partition 0 starts
    with the ``Date,From,To`` header line, which is skipped.

    Args:
        partId: Index of the partition being processed.
        rows: Iterator over the raw text lines of that partition.

    Yields:
        ``(from_address, to_address)`` tuples, one per recipient. Only addresses
        that end with ``'@enron.com'`` and contain no digits are kept.
    """
    # Imported here so the function is self-contained when Spark ships it to workers.
    import csv

    def is_wanted(address):
        # Enron-only, and skip aliases containing digits (same rule as before).
        return address.endswith('@enron.com') and not any(ch.isdigit() for ch in address)

    if partId == 0:
        # The default avoids StopIteration (a RuntimeError inside a generator)
        # when partition 0 is empty.
        next(rows, None)
    for fields in csv.reader(rows):
        # Blank lines or rows without a recipient column cannot form a relationship.
        if len(fields) < 3:
            continue
        sender = fields[1].strip()
        if not is_wanted(sender):
            continue
        # Filter each recipient individually: an external address in "To"
        # must not discard the Enron recipients of the same email.
        for recipient in fields[2].split(';'):
            recipient = recipient.strip()
            if is_wanted(recipient):
                yield (sender, recipient)

# Flatten every email into (sender, recipient) address records.
A2 = mail.mapPartitionsWithIndex(extractEmails, preservesPartitioning=False)
A2.count()

NameError: ignored

In [None]:
def extract_name(email):
    """Turn a tuple of e-mail addresses into a tuple of display names.

    The alias before ``@`` has its dots replaced by spaces and is title-cased,
    e.g. ``'marc.r.cutler@enron.com'`` -> ``'Marc R Cutler'``.

    Args:
        email: Iterable of address strings (in this notebook, a (from, to) pair).

    Returns:
        Tuple of names in the same order as the input addresses.
    """
    converted = []
    for address in email:
        alias, _, _ = address.partition('@')
        converted.append(alias.replace('.', ' ').title())
    return tuple(converted)
# Translate both addresses of every record into display names.
B2 = A2.map(extract_name)
B2.take(5)

[('Mark Taylor', 'Shari Stack'),
 ('Mark Taylor', 'Yao Apasu'),
 ('Mark Taylor', 'Paul Simons'),
 ('Mark Taylor', 'Justin Boyd'),
 ('Mark Taylor', 'Tana Jones')]

In [None]:
# Keep one copy of each directed (sender, recipient) name pair, so each
# direction of a relationship is counted at most once downstream.
C2 = B2.distinct(numPartitions=None)
C2.count()

3390

In [None]:
# Order the directed name pairs by sender (sortByKey compares only the first element).
D2 = C2.sortByKey(ascending=True)
D2.take(5)

[('Andrew Lewis', 'Derek Davies'),
 ('Angela Mcculloch', 'Ione Irvine'),
 ('Angela Mcculloch', 'Attila Pazmandi'),
 ('Angela Mcculloch', 'Dan Dietrich'),
 ('Angela Mcculloch', 'Tara Sweitzer')]

In [None]:
def interactions(names):
    """Key an interaction by its participants in lexical order, paired with a count of 1.

    Sorting makes (A, B) and (B, A) share the same key, so summing the counts
    per key tells how many directions of the relationship were observed.

    Args:
        names: Iterable of participant names (here, a (sender, recipient) pair).

    Returns:
        ``(sorted_names_tuple, 1)``.
    """
    key = tuple(sorted(names))
    count = 1
    return key, count

# Count in how many directions each unordered pair occurs. This uses the
# de-duplicated C2 directly: sorting first (D2) is wasted work, because
# reduceByKey re-shuffles by key and does not preserve input order.
E2 = C2.map(interactions) \
       .reduceByKey(lambda left, right: left + right)
E2.take(5)

[(('Andrew Lewis', 'Derek Davies'), 1),
 (('Angela Mcculloch', 'Ione Irvine'), 1),
 (('Angela Mcculloch', 'Attila Pazmandi'), 1),
 (('Angela Mcculloch', 'Dan Dietrich'), 1),
 (('Angela Mcculloch', 'Tara Sweitzer'), 1)]

In [None]:
# Your code to read and process data into rddTask2
# A pair is reciprocal when both directions survived de-duplication (count 2).
# sortBy on the whole tuple orders by Person 1 and then Person 2; sortByKey
# compares only the first name and leaves ties in arbitrary order.
rddTask2 = E2.filter(lambda kv: kv[1] >= 2) \
       .keys() \
       .sortBy(lambda pair: pair)

# DO NOT EDIT BELOW
rddTask2.collect()

[('Brenda Whitehead', 'Elizabeth Sager'),
 ('Carol Clair', 'Sara Shackleton'),
 ('Carol Clair', 'Debra Perlingiere'),
 ('Carol Clair', 'Tana Jones'),
 ('Carol Clair', 'Mark Taylor'),
 ('Carol Clair', 'Richard Sanders'),
 ('Debra Perlingiere', 'Kevin Ruscitti'),
 ('Drew Fossum', 'Susan Scott'),
 ('Elizabeth Sager', 'Janette Elbertson'),
 ('Elizabeth Sager', 'Mark Haedicke'),
 ('Elizabeth Sager', 'Mark Taylor'),
 ('Elizabeth Sager', 'Richard Sanders'),
 ('Eric Bass', 'Susan Scott'),
 ('Fletcher Sturm', 'Greg Whalley'),
 ('Fletcher Sturm', 'Sally Beck'),
 ('Gerald Nemec', 'Susan Scott'),
 ('Grant Masson', 'Vince Kaminski'),
 ('Greg Whalley', 'Richard Sanders'),
 ('Janette Elbertson', 'Mark Taylor'),
 ('Janette Elbertson', 'Richard Sanders'),
 ('Liz Taylor', 'Mark Haedicke'),
 ('Mark Haedicke', 'Mark Taylor'),
 ('Mark Haedicke', 'Richard Sanders'),
 ('Mark Haedicke', 'Twanda Sweet'),
 ('Mark Haedicke', 'Michelle Cash'),
 ('Mark Taylor', 'Tana Jones'),
 ('Mark Taylor', 'Sara Shackleton'),
 

## Using DataFrame

In [None]:
# Your code to read and process data into dfTask2
from pyspark.sql import SparkSession, functions as F

# Reuses the SparkContext already running for the RDD section, if any.
spark = SparkSession.builder.getOrCreate()

# Same alias -> name rule as extract_name(), so both sections produce identical
# names (Spark's initcap only capitalises after whitespace, unlike str.title()).
to_name = F.udf(lambda address: address.split('@')[0].replace('.', ' ').title())

# Enron-only addresses without (ASCII) digits, mirroring the RDD filter.
is_wanted = lambda c: c.endswith('@enron.com') & ~c.rlike('[0-9]')

directed = (
    spark.read.csv(mail_csv, header=True)
    # One row per recipient; a null "To" yields no rows from explode.
    .select(F.trim('From').alias('src'),
            F.explode(F.split('To', ';')).alias('dst'))
    .withColumn('dst', F.trim('dst'))
    .filter(is_wanted(F.col('src')) & is_wanted(F.col('dst')))
    .select(to_name('src').alias('a'), to_name('dst').alias('b'))
    .distinct()
)

dfTask2 = (
    directed
    # Put each pair in lexical order; a reciprocal pair then appears exactly twice.
    .select(F.least('a', 'b').alias('Person 1'), F.greatest('a', 'b').alias('Person 2'))
    .groupBy('Person 1', 'Person 2').count()
    .filter(F.col('count') == 2)
    .select('Person 1', 'Person 2')
    .orderBy('Person 1', 'Person 2')
)

# DO NOT EDIT BELOW
dfTask2.show(n=50)

+--------------------+-----------------+
|            Person 1|         Person 2|
+--------------------+-----------------+
|    Brenda Whitehead|  Elizabeth Sager|
|         Carol Clair|Debra Perlingiere|
|         Carol Clair|      Mark Taylor|
|         Carol Clair|  Richard Sanders|
|         Carol Clair|  Sara Shackleton|
|         Carol Clair|       Tana Jones|
|   Debra Perlingiere|   Kevin Ruscitti|
|         Drew Fossum|      Susan Scott|
|     Elizabeth Sager|Janette Elbertson|
|     Elizabeth Sager|    Mark Haedicke|
|     Elizabeth Sager|      Mark Taylor|
|     Elizabeth Sager|  Richard Sanders|
|           Eric Bass|      Susan Scott|
|      Fletcher Sturm|     Greg Whalley|
|      Fletcher Sturm|       Sally Beck|
|        Gerald Nemec|      Susan Scott|
|        Grant Masson|   Vince Kaminski|
|        Greg Whalley|  Richard Sanders|
|   Janette Elbertson|      Mark Taylor|
|   Janette Elbertson|  Richard Sanders|
|          Liz Taylor|    Mark Haedicke|
|       Mark Hae