# Homework 4 - MapReduce

### DUE: 03/02/2017 before class at 10:30am

In this homework, we are practicing the MapReduce programming paradigm. 

You are required to turn in this notebook as BDM\_HW4\_**NetId**.ipynb. You will be asked to complete each task using the accompanying *mapreduce* package (provided as **mapreduce.py**) with one or more MapReduce "jobs". For each job (a call to mr.run()), you are expected to supply a mapper and, if needed, a reducer. Sample usage of the package is shown below:

```python
    # Run on input1 using your mapper1 and reducer1 function
    output = list(mr.run(input1, mapper1, reducer1))

    # Run on input2 using only your mapper2, no reduce phase
    output = list(mr.run(input2, mapper2))
    
    # Run on input3 using 2 nested MapReduce jobs
    output = list(mr.run(mr.run(input3, mapper3, reducer3), mapper4, reducer4))
```
    
Please note that the output of mr.run() is always a **generator**. You have to convert it to a list if you'd like to view, index, or print it.
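
The generator behavior can be illustrated with a small stand-in. The sketch below is a hypothetical `run_sketch` function, not the course's **mapreduce.py**; it assumes that a mapper yields `(key, value)` pairs, that a reducer receives one `(key, values)` tuple per key, and that keys are processed in sorted order. It shows that a generator can be consumed only once, which is why the samples above wrap each call in `list()`.

```python
from collections import defaultdict

def run_sketch(records, mapper, reducer=None):
    """Hypothetical stand-in for mr.run(); NOT the actual mapreduce.py."""
    # Map phase: each record may yield zero or more items.
    mapped = (item for record in records for item in mapper(record))
    if reducer is None:
        # Map-only job: pass the mapper output straight through.
        for item in mapped:
            yield item
        return
    # Shuffle phase: group values by key.
    groups = defaultdict(list)
    for key, value in mapped:
        groups[key].append(value)
    # Reduce phase: one call per key, in sorted key order (an assumption).
    for key in sorted(groups):
        yield reducer((key, groups[key]))

def count_mapper(word):
    yield (word, 1)

def count_reducer(item):
    word, ones = item
    return (word, sum(ones))

result = run_sketch(['b', 'a', 'b'], count_mapper, count_reducer)
first_pass = list(result)   # consumes the generator
second_pass = list(result)  # the generator is exhausted, so this is empty
print(first_pass)           # [('a', 1), ('b', 2)]
print(second_pass)          # []
```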

**NOTE:** If you are using Python 3 (though I still encourage you to use Python 2 for this class), please download the file *mapreduce.py* from NYU Classes again.

## Task (10 points)

There is only one task in this homework. You are asked to implement the Social Triangle example discussed in class. In particular, given the email dataset, please list all "reciprocal" relationships in the company. Recall that:

If A emails B and B emails A, then A and B are *reciprocal*.

If A emails B but B doesn’t email A, then A and B are *directed*.
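
As a minimal sketch of these two definitions, the snippet below labels each unordered pair in a toy edge list. The function name `classify` and the toy data are illustrative assumptions; this is plain Python rather than a MapReduce job.

```python
def classify(edges):
    """Label each unordered pair as 'reciprocal' or 'directed'."""
    edge_set = set(edges)
    labels = {}
    for sender, recipient in edge_set:
        if sender == recipient:
            continue  # self-emails are ignored in this sketch
        pair = tuple(sorted((sender, recipient)))
        if (recipient, sender) in edge_set:
            labels[pair] = 'reciprocal'
        else:
            labels[pair] = 'directed'
    return labels

# Toy data: A and B email each other; A emails C but C never replies.
toy_edges = [('A', 'B'), ('B', 'A'), ('A', 'C')]
labels = classify(toy_edges)
print(sorted(labels.items()))
# [(('A', 'B'), 'reciprocal'), (('A', 'C'), 'directed')]
```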

**Dataset:** We will use a subset of the open [Enron Email Dataset](https://www.cs.cmu.edu/~./enron/ "Enron Email Dataset"), which contains approximately 10,000 simplified email headers from the Enron Corporation. You can download this dataset from NYU Classes as **enron_mails_small.csv**. The file contains 3 columns: *Date*, *From*, and *To*. They are described below:

|Column name|Description|
|--|--|
|Date |The date and time of the email, in the format YYYY-MM-DD hh:mm:ss, <br />e.g. "1998-10-30 07:43:00" |
|From |The sender email address, <br />e.g. "mark.taylor@enron.com" |
|To | A list of recipients' email addresses separated by semicolons ';', <br />e.g. "jennifer.fraser@enron.com;jeffrey.hodge@enron.com" |

Note that we only care about users employed by Enron, i.e. only relationships in which both email addresses end with *'@enron.com'*.
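
The column layout and the domain filter can be sketched as follows. The rows in `sample_lines` are made up for illustration (they are not taken from **enron_mails_small.csv**), and the helper name `enron_edges` is an assumption. Including the `'@'` in the suffix matters: an address such as `outsider@not-enron.com` ends with `'enron.com'` but not with `'@enron.com'`.

```python
import csv

DOMAIN = '@enron.com'

def enron_edges(row):
    """Return (sender, recipient) pairs where both addresses are at Enron."""
    sender = row['From']
    if not sender.endswith(DOMAIN):
        return []
    return [(sender, recipient)
            for recipient in row['To'].split(';')
            if recipient.endswith(DOMAIN) and recipient != sender]

# Made-up rows in the same three-column layout as the dataset.
sample_lines = [
    'Date,From,To',
    '1998-10-30 07:43:00,mark.taylor@enron.com,jennifer.fraser@enron.com;jeffrey.hodge@enron.com',
    '1998-10-30 08:00:00,mark.taylor@enron.com,someone@example.com;mark.taylor@enron.com',
    '1998-10-30 09:00:00,outsider@not-enron.com,mark.taylor@enron.com',
]

edges = [edge
         for row in csv.DictReader(sample_lines)
         for edge in enron_edges(row)]
print(edges)
```

Only the two edges from the first data row survive: the second row has an external recipient and a self-email, and the third row has an external sender.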

The expected output is also provided below. For each reciprocal relationship, please output a tuple of two strings. The first is always **'reciprocal'**. The second gives the names of the two people in the following format: **'Jane Doe : John Doe'**. The names should appear in lexical order, i.e. there will be no 'John Doe : Jane Doe', since 'Jane' sorts before 'John'.
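
A minimal sketch of this output format, assuming the two names have already been converted from email addresses (the helper name `format_relationship` is hypothetical). Sorting the two names before formatting makes the result independent of the order in which the pair was encountered.

```python
def format_relationship(name1, name2):
    """Build the ('reciprocal', 'First : Second') tuple in lexical order."""
    first, second = sorted((name1, name2))
    return ('reciprocal', '%s : %s' % (first, second))

forward = format_relationship('Jane Doe', 'John Doe')
backward = format_relationship('John Doe', 'Jane Doe')
print(forward)   # ('reciprocal', 'Jane Doe : John Doe')
print(backward)  # ('reciprocal', 'Jane Doe : John Doe')
```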

Though the dataset only contains email addresses, not actual names, we assume that each email alias was derived from its owner's name. For example:

|Email Address|Converted Name|
|--|--|
|mark.taylor@enron.com|Mark Taylor|
|alan.aronowitz@enron.com|Alan Aronowitz|
|marc.r.cutler@enron.com|Marc R Cutler|
|hugh@enron.com|Hugh|
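
The conversion in the table can be sketched as below: take the part before the `'@'`, split it on dots, and capitalize each piece. The function name `format_name` is an assumption, and this is only one possible way to implement the convention.

```python
def format_name(email):
    """Convert an alias such as 'mark.taylor@enron.com' to 'Mark Taylor'."""
    alias = email.split('@')[0]
    return ' '.join(part.capitalize() for part in alias.split('.'))

# The four rows of the table above.
examples = {
    'mark.taylor@enron.com': 'Mark Taylor',
    'alan.aronowitz@enron.com': 'Alan Aronowitz',
    'marc.r.cutler@enron.com': 'Marc R Cutler',
    'hugh@enron.com': 'Hugh',
}
converted = {email: format_name(email) for email in examples}
print(converted == examples)  # True
```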

Please fill in the code block with a series of MapReduce jobs using your own mapper and reducer functions. Be sure to include the name-conversion logic in one of your mappers and/or reducers.

In [77]:
import csv

def streamProcess(reader):
    # Single pass over the rows without MapReduce, tracking directed edges.
    reciprocal = set()
    connected = set()  # directed (sender, recipient) pairs seen so far
    for row in reader:
        sender = row['From']
        if sender.endswith('@enron.com'):
            for recipient in row['To'].split(';'):
                if recipient.endswith('@enron.com') and sender!=recipient:
                    pair = (sender, recipient)
                    reverse = (recipient, sender)
                    if reverse in connected:
                        # Store each pair once, in sorted order.
                        reciprocal.add(min(pair, reverse))
                    connected.add(pair)
    return list(reciprocal)

def formatName(email):
    return email.split('@')[0].replace('.', ' ').title()

def formatRelationship((person1, person2)):
    return ('reciprocal', '%s : %s' % (formatName(person1), formatName(person2)))
    
with open('enron_mails_small.csv', 'r') as fi:
    reader = csv.DictReader(fi)
    output1 = map(formatRelationship, sorted(streamProcess(reader)))

print(len(output1))
output1

35


[('reciprocal', 'Brenda Whitehead : Elizabeth Sager'),
 ('reciprocal', 'Carol Clair : Debra Perlingiere'),
 ('reciprocal', 'Carol Clair : Mark Taylor'),
 ('reciprocal', 'Carol Clair : Richard Sanders'),
 ('reciprocal', 'Carol Clair : Sara Shackleton'),
 ('reciprocal', 'Carol Clair : Tana Jones'),
 ('reciprocal', 'Debra Perlingiere : Kevin Ruscitti'),
 ('reciprocal', 'Drew Fossum : Susan Scott'),
 ('reciprocal', 'Elizabeth Sager : Janette Elbertson'),
 ('reciprocal', 'Elizabeth Sager : Mark Haedicke'),
 ('reciprocal', 'Elizabeth Sager : Mark Taylor'),
 ('reciprocal', 'Elizabeth Sager : Richard Sanders'),
 ('reciprocal', 'Eric Bass : Susan Scott'),
 ('reciprocal', 'Fletcher Sturm : Greg Whalley'),
 ('reciprocal', 'Fletcher Sturm : Sally Beck'),
 ('reciprocal', 'Gerald Nemec : Susan Scott'),
 ('reciprocal', 'Grant Masson : Vince Kaminski'),
 ('reciprocal', 'Greg Whalley : Richard Sanders'),
 ('reciprocal', 'Janette Elbertson : Mark Taylor'),
 ('reciprocal', 'Janette Elbertson : Richard Sa

In [76]:
import csv
import mapreduce as mr

def mapper1(row):
    # Emit (sender, [recipients]) for Enron-to-Enron emails, skipping self-emails.
    sender = row['From']
    if sender.endswith('@enron.com'):
        recipients = filter(lambda x: x.endswith('@enron.com') and x!=sender,
                            row['To'].split(';'))
        if recipients:
            yield (sender, recipients)

def reducer1((sender, recipients)):
    return (sender, list(set().union(*recipients)))

def mapper2(row):
    # Emit the reverse edge (recipient, sender) for each Enron-to-Enron email.
    sender = row['From']
    if sender.endswith('@enron.com'):
        recipients = filter(lambda x: x.endswith('@enron.com') and x!=sender,
                            row['To'].split(';'))
        for recipient in recipients:
            yield (recipient, sender)

def reducer2((recipient, senders)):
    return (recipient, list(set(senders)))

def mapper3(row):
    yield row

def reducer3((person, edges)):
    if len(edges)==2:
        return [(person, reciprocal) for reciprocal in (set(edges[0]) & set(edges[1]))]
    return []

def mapper4(pairs):
    for (p1,p2) in pairs:
        yield (tuple(sorted([p1, p2])), None)

def formatName(email):
    return email.split('@')[0].replace('.', ' ').title()

def reducer4(((person1, person2), __unused)):
    return ('reciprocal', '%s : %s' % (formatName(person1), formatName(person2)))
    
with open('enron_mails_small.csv', 'r') as fi:
    reader = csv.DictReader(fi)
    data = list(reader)
    output1 = list(mr.run(data, mapper1, reducer1))
    output2 = list(mr.run(data, mapper2, reducer2))
    output3 = list(mr.run(mr.run(output1+output2, mapper3, reducer3), mapper4, reducer4))

print(len(output3))
output3

35


[('reciprocal', 'Brenda Whitehead : Elizabeth Sager'),
 ('reciprocal', 'Carol Clair : Debra Perlingiere'),
 ('reciprocal', 'Carol Clair : Mark Taylor'),
 ('reciprocal', 'Carol Clair : Richard Sanders'),
 ('reciprocal', 'Carol Clair : Sara Shackleton'),
 ('reciprocal', 'Carol Clair : Tana Jones'),
 ('reciprocal', 'Debra Perlingiere : Kevin Ruscitti'),
 ('reciprocal', 'Drew Fossum : Susan Scott'),
 ('reciprocal', 'Elizabeth Sager : Janette Elbertson'),
 ('reciprocal', 'Elizabeth Sager : Mark Haedicke'),
 ('reciprocal', 'Elizabeth Sager : Mark Taylor'),
 ('reciprocal', 'Elizabeth Sager : Richard Sanders'),
 ('reciprocal', 'Eric Bass : Susan Scott'),
 ('reciprocal', 'Fletcher Sturm : Greg Whalley'),
 ('reciprocal', 'Fletcher Sturm : Sally Beck'),
 ('reciprocal', 'Gerald Nemec : Susan Scott'),
 ('reciprocal', 'Grant Masson : Vince Kaminski'),
 ('reciprocal', 'Greg Whalley : Richard Sanders'),
 ('reciprocal', 'Janette Elbertson : Mark Taylor'),
 ('reciprocal', 'Janette El

In [86]:
import csv
import mapreduce as mr

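def mapper1(row):
    sender = row['From']
    if sender.endswith('@enron.com'):
        recipients = filter(lambda x: x.endswith('@enron.com') and x!=sender,
                            row['To'].split(';'))
        if recipients:
            # Tag 0: the people this sender wrote to (outgoing edges).
            yield (sender, (0, recipients))
            # Tag 1: emit the reverse edge so each recipient sees its senders.
            for recipient in recipients:
                yield (recipient, (1, sender))

def reducer1((person, others)):
    # Split the tagged values back into incoming senders and outgoing recipients.
    senders = map(lambda x: x[1], filter(lambda x: x[0]==1, others))
    recipients = map(lambda x: x[1], filter(lambda x: x[0]==0, others))
    # A relationship is reciprocal when the other person appears on both sides.
    reciprocals = set(senders) & (set().union(*recipients))
    return [(person, person2) for person2 in reciprocals]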

def mapper4(pairs):
    for (p1,p2) in pairs:
        yield (tuple(sorted([p1, p2])), None)

def formatName(email):
    return email.split('@')[0].replace('.', ' ').title()

def reducer4(((person1, person2), __unused)):
    return ('reciprocal', '%s : %s' % (formatName(person1), formatName(person2)))
    
with open('enron_mails_small.csv', 'r') as fi:
    reader = csv.DictReader(fi)
    data = list(reader)
    output1 = list(mr.run(data, mapper1, reducer1))
    output3 = list(mr.run(output1, mapper4, reducer4))

print(len(output3))
output3

35


[('reciprocal', 'Brenda Whitehead : Elizabeth Sager'),
 ('reciprocal', 'Carol Clair : Debra Perlingiere'),
 ('reciprocal', 'Carol Clair : Mark Taylor'),
 ('reciprocal', 'Carol Clair : Richard Sanders'),
 ('reciprocal', 'Carol Clair : Sara Shackleton'),
 ('reciprocal', 'Carol Clair : Tana Jones'),
 ('reciprocal', 'Debra Perlingiere : Kevin Ruscitti'),
 ('reciprocal', 'Drew Fossum : Susan Scott'),
 ('reciprocal', 'Elizabeth Sager : Janette Elbertson'),
 ('reciprocal', 'Elizabeth Sager : Mark Haedicke'),
 ('reciprocal', 'Elizabeth Sager : Mark Taylor'),
 ('reciprocal', 'Elizabeth Sager : Richard Sanders'),
 ('reciprocal', 'Eric Bass : Susan Scott'),
 ('reciprocal', 'Fletcher Sturm : Greg Whalley'),
 ('reciprocal', 'Fletcher Sturm : Sally Beck'),
 ('reciprocal', 'Gerald Nemec : Susan Scott'),
 ('reciprocal', 'Grant Masson : Vince Kaminski'),
 ('reciprocal', 'Greg Whalley : Richard Sanders'),
 ('reciprocal', 'Janette Elbertson : Mark Taylor'),
 ('reciprocal', 'Janette Elbertson : Richard Sa

In [79]:
import csv
import mapreduce as mr

def mapper1(row):
    sender = row['From']
    if sender.endswith('@enron.com'):
        for recipient in row['To'].split(';'):
            if recipient.endswith('@enron.com'):
                if sender<recipient:
                    yield ((sender, recipient), (1,0))
                else:
                    yield ((recipient, sender), (0,1))
                ## the same as:
                # yield tuple(zip(*sorted([(sender, 1), (recipient, 0)])))

def reducer1((pair, counts)):
    return (pair, reduce(lambda x,y: (x[0]+y[0], x[1]+y[1]), counts))

def formatName(email):
    return email.split('@')[0].replace('.', ' ').title()

def mapper2(((person1, person2), total)):
    if total[0]*total[1]>0:
        yield ('reciprocal', '%s : %s' % (formatName(person1), formatName(person2)))
    
with open('enron_mails_small.csv', 'r') as fi:
    reader = csv.DictReader(fi)
    output1 = list(mr.run(mr.run(reader, mapper1, reducer1), mapper2))

print(len(output1))
output1

35


[('reciprocal', 'Brenda Whitehead : Elizabeth Sager'),
 ('reciprocal', 'Carol Clair : Debra Perlingiere'),
 ('reciprocal', 'Carol Clair : Mark Taylor'),
 ('reciprocal', 'Carol Clair : Richard Sanders'),
 ('reciprocal', 'Carol Clair : Sara Shackleton'),
 ('reciprocal', 'Carol Clair : Tana Jones'),
 ('reciprocal', 'Debra Perlingiere : Kevin Ruscitti'),
 ('reciprocal', 'Drew Fossum : Susan Scott'),
 ('reciprocal', 'Elizabeth Sager : Janette Elbertson'),
 ('reciprocal', 'Elizabeth Sager : Mark Haedicke'),
 ('reciprocal', 'Elizabeth Sager : Mark Taylor'),
 ('reciprocal', 'Elizabeth Sager : Richard Sanders'),
 ('reciprocal', 'Eric Bass : Susan Scott'),
 ('reciprocal', 'Fletcher Sturm : Greg Whalley'),
 ('reciprocal', 'Fletcher Sturm : Sally Beck'),
 ('reciprocal', 'Gerald Nemec : Susan Scott'),
 ('reciprocal', 'Grant Masson : Vince Kaminski'),
 ('reciprocal', 'Greg Whalley : Richard Sanders'),
 ('reciprocal', 'Janette Elbertson : Mark Taylor'),
 ('reciprocal', 'Janette Elbertson : Richard Sa