# Social Triangle Using Apache Spark

The goal of this notebook is to implement a Social Triangle algorithm using Spark. For example, given the email dataset, we will list all "reciprocal" relationships in the company. More specifically:

If A emails B and B emails A, then the relationship between A and B is *reciprocal*.

If A emails B but B doesn't email A, then the relationship between A and B is *directed*.
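
The two definitions above can be sketched in plain Python, without Spark. This is only an illustration: `classify_pairs` and the toy edge list are hypothetical and not part of the notebook, and skipping self-addressed mail is an assumption.

```python
def classify_pairs(edges):
    """Label each unordered pair of users as 'reciprocal' or 'directed'."""
    edges = set(edges)  # each (sender, recipient) direction counts once
    labels = {}
    for sender, recipient in edges:
        if sender == recipient:
            continue  # assumption: self-addressed mail is not a relationship
        pair = tuple(sorted((sender, recipient)))
        # reciprocal only if the opposite direction is also present
        if (recipient, sender) in edges:
            labels[pair] = 'reciprocal'
        else:
            labels[pair] = 'directed'
    return labels

# Hypothetical toy data: A and B email each other, A emails C only.
toy_edges = [('A', 'B'), ('B', 'A'), ('A', 'C')]
labels = classify_pairs(toy_edges)
```

Here `labels` maps `('A', 'B')` to `'reciprocal'` and `('A', 'C')` to `'directed'`.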

**Dataset:** We will use a subset of the open [Enron Email Dataset](https://www.cs.cmu.edu/~./enron/ "Enron Email Dataset"), which contains approximately 10,000 simplified email headers from the Enron Corporation. The subset is available as **enron_mails_small.csv**.

The file contains three columns: *Date*, *From*, and *To*. They are described below:

|Column name|Description|
|--|--|
|Date |The date and time of the email, in the format YYYY-MM-DD hh:mm:ss, <br />e.g. "1998-10-30 07:43:00" |
|From |The sender email address, <br />e.g. "mark.taylor@enron.com" |
|To | A list of recipients' email addresses separated by semicolons ';', <br />e.g. "jennifer.fraser@enron.com;jeffrey.hodge@enron.com" |

Note that we only care about users employed by Enron, i.e. only relationships in which the email addresses end with *'@enron.com'*.
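
As a rough sketch of how one CSV row could be turned into Enron-only (sender, recipient) pairs, assuming the three-column layout described above: `enron_pairs` is a hypothetical helper, and the sample row is made up from addresses that appear in this notebook. It also checks the sender, which is one reading of the note above.

```python
import csv

ENRON_SUFFIX = '@enron.com'

def enron_pairs(line):
    """Yield (sender, recipient) pairs in which both addresses are Enron ones."""
    # csv.reader handles quoting; each row is Date, From, To
    _date, sender, recipients = next(csv.reader([line]))
    if not sender.endswith(ENRON_SUFFIX):
        return
    for recipient in recipients.split(';'):
        recipient = recipient.strip()
        # endswith is stricter than a substring test such as "'@enron.com' in x"
        if recipient.endswith(ENRON_SUFFIX):
            yield (sender, recipient)

# Hypothetical row for illustration
sample = ('1998-10-30 07:43:00,mark.taylor@enron.com,'
          'shari.stack@enron.com;marc.r.cutler@bankamerica.com')
pairs = list(enron_pairs(sample))
```

Only the pair with `shari.stack@enron.com` survives, because the other recipient is outside Enron.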

The expected output is also provided below. For each reciprocal relationship, please output a tuple consisting of two strings. The first one is always **'reciprocal'**. The second one is a string showing the names of the two people in the following format: **'Jane Doe : John Doe'**. The names should be presented in lexical order, i.e. there will not be a 'John Doe : Jane Doe', since 'Jane' is ordered before 'John'.
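
The output format described above could be produced along these lines. `format_reciprocal` is a hypothetical helper used only to illustrate the tuple shape and the lexical ordering; it is not defined elsewhere in this notebook.

```python
def format_reciprocal(name_a, name_b):
    """Build the ('reciprocal', 'First : Second') tuple with names in lexical order."""
    first, second = sorted((name_a, name_b))
    return ('reciprocal', f'{first} : {second}')

# The order of the arguments does not matter.
record = format_reciprocal('John Doe', 'Jane Doe')
# record == ('reciprocal', 'Jane Doe : John Doe')
```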

Though the dataset contains only email addresses, not actual names, we assume that each email alias was created from the person's name. For example:

|Email Address|Converted Name|
|--|--|
|mark.taylor@enron.com|Mark Taylor|
|alan.aronowitz@enron.com|Alan Aronowitz|
|marc.r.cutler@enron.com|Marc R Cutler|
|hugh@enron.com|Hugh|
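
A minimal sketch of this conversion, assuming the alias is everything before the `@` and that its parts are separated by dots. `email_to_name` is a hypothetical helper for illustration.

```python
def email_to_name(address):
    """Convert 'first.last@domain' style addresses to 'First Last'."""
    alias = address.split('@')[0]
    # works for any number of dot-separated parts, including just one
    return ' '.join(part.capitalize() for part in alias.split('.'))

names = [email_to_name(a) for a in (
    'mark.taylor@enron.com',
    'alan.aronowitz@enron.com',
    'marc.r.cutler@enron.com',
    'hugh@enron.com',
)]
```

Joining the capitalized parts, rather than unpacking exactly a first and a last name, keeps aliases such as `marc.r.cutler` and `hugh` from raising an error.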

## Environment Setup

In [None]:
%%shell
gdown --quiet 1ay5DcH64Qao1HR7CQnR6Cl1hbBMgGqXj
gdown --quiet 13BozEl3JtS43Xuu2Ek9IwMULpWjPH4VC
gdown --quiet 1It6GP8O2JqkmUtZKbYp1kpwpuwOXlLps
pip --quiet install pyspark

In [None]:
ENRON_FN = 'enron_mails_small.csv'

import pyspark
from pyspark.sql import SparkSession
sc = pyspark.SparkContext.getOrCreate()
spark = SparkSession.builder.getOrCreate()
spark

### Using RDD

In [None]:
#loading the enron data
enron = sc.textFile(ENRON_FN, use_unicode=True).cache()
#display header
enronHeader = enron.first().split(',')
display(list(enumerate(enronHeader)))

[(0, 'Date'), (1, 'From'), (2, 'To')]

In [None]:
#generator function to extract the From and To fields from the Enron file
def extractEnron(partId,rows):
  import csv
  #skip the header row, which is the first line of the first partition
  if partId == 0:
    next(rows)
  #yield a (From, To) tuple for each record
  for row in csv.reader(rows):
    yield (row[1],row[2])

enron_data = enron.mapPartitionsWithIndex(extractEnron)
enron_data.take(5)


[('mark.taylor@enron.com', 'marc.r.cutler@bankamerica.com'),
 ('mark.taylor@enron.com', 'marc.r.cutler@bankamerica.com'),
 ('mark.taylor@enron.com', 'shari.stack@enron.com'),
 ('mark.taylor@enron.com', 'marc.r.cutler@bankamerica.com'),
 ('mark.taylor@enron.com', 'yao.apasu@enron.com')]

In [None]:
#filter function for returning only Enron employees (addresses ending with '@enron.com')
def enron_filter(values):
  return [x for x in values if x.endswith('@enron.com')]

In [None]:
#testing enron_filter
test = ('phillip.love@enron.com',{'b.palmer@enron.com','bryan.hull@enron.com','ed.dowling@msl.redstone.army.mil','test.test@enron.com'})

enron_filter(test[1])

['bryan.hull@enron.com', 'b.palmer@enron.com', 'test.test@enron.com']

In [None]:
#function to expand (from, [to1, to2, ...]) into one (from, to) tuple per recipient
def list_to_tuples(tup):
  fro,l = tup
  return [(fro,x) for x in l]

In [None]:
test2 = ('rosalee.fleming@enron.com',['jeffrey.mcclellan@enron.com','sanjay.bhatnagar@enron.com'])

list_to_tuples(test2)

[('rosalee.fleming@enron.com', 'jeffrey.mcclellan@enron.com'),
 ('rosalee.fleming@enron.com', 'sanjay.bhatnagar@enron.com')]

In [None]:
#function to take a tuple of emails and return a tuple of names
def tup_to_names(tup):
  temp = []
  for email in tup:
    #split the alias (text before '@') on '.', so 'marc.r.cutler' and 'hugh' also work
    parts = email.split('@')[0].split('.')
    temp.append(' '.join(p.capitalize() for p in parts))
  return tuple(temp)

In [None]:
#testing function
tup_to_names((('sara.shackleton@enron.com', 'tana.jones@enron.com')))

('Sara Shackleton', 'Tana Jones')

In [None]:
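# Your code to read and process data into rddTask2
# Steps: merge each sender's recipient lists, de-duplicate them, keep Enron
# addresses, expand to (from, to) pairs, sort each pair so both directions
# share one key, count per key, and keep the pairs seen in both directions.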
rddTask2 = enron_data.reduceByKey(lambda x,y: x+';'+y) \
  .mapValues(lambda x: set(x.split(';'))) \
  .mapValues(enron_filter) \
  .flatMap(list_to_tuples) \
  .map(lambda x: (tuple(sorted(x)),1)) \
  .groupByKey() \
  .mapValues(lambda values: sum(values)) \
  .filter(lambda x: x[1]>1) \
  .map(lambda x: tup_to_names(x[0])).sortBy(lambda x: x[0])

# DO NOT EDIT BELOW
rddTask2.collect()

[('Brenda Whitehead', 'Elizabeth Sager'),
 ('Carol Clair', 'Mark Taylor'),
 ('Carol Clair', 'Richard Sanders'),
 ('Carol Clair', 'Debra Perlingiere'),
 ('Carol Clair', 'Tana Jones'),
 ('Carol Clair', 'Sara Shackleton'),
 ('Debra Perlingiere', 'Kevin Ruscitti'),
 ('Drew Fossum', 'Susan Scott'),
 ('Elizabeth Sager', 'Janette Elbertson'),
 ('Elizabeth Sager', 'Richard Sanders'),
 ('Elizabeth Sager', 'Mark Taylor'),
 ('Elizabeth Sager', 'Mark Haedicke'),
 ('Eric Bass', 'Susan Scott'),
 ('Fletcher Sturm', 'Greg Whalley'),
 ('Fletcher Sturm', 'Sally Beck'),
 ('Gerald Nemec', 'Susan Scott'),
 ('Grant Masson', 'Vince Kaminski'),
 ('Greg Whalley', 'Richard Sanders'),
 ('Janette Elbertson', 'Mark Taylor'),
 ('Janette Elbertson', 'Richard Sanders'),
 ('Liz Taylor', 'Mark Haedicke'),
 ('Mark Haedicke', 'Mark Taylor'),
 ('Mark Haedicke', 'Twanda Sweet'),
 ('Mark Haedicke', 'Michelle Cash'),
 ('Mark Haedicke', 'Richard Sanders'),
 ('Mark Taylor', 'Tana Jones'),
 ('Mark Taylor', 'Sara Shackleton'),
 