# Spark

In this exercise, I practice using Apache Spark by 1) finding all "reciprocal" relationships in a sample of emails uncovered during the investigation of Enron and 2) analyzing NYC high school SAT performance.

## Enron

**Reciprocal v Directed:**

If A emails B and B emails A, then the relationship between A and B is *reciprocal*.

If A emails B but B doesn’t email A, then the relationship between A and B is *directed*.
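
To make the two definitions concrete, here is a minimal pure-Python sketch (not part of the original notebook) that labels each pair of people in a small, hypothetical list of (sender, recipient) edges. The function name `classify_pairs`, the sample names, and the choice to ignore self-emails are illustrative assumptions.

```python
def classify_pairs(edges):
    """Label each unordered pair of people as 'reciprocal' or 'directed'.

    edges: iterable of (sender, recipient) tuples; duplicates are ignored.
    Returns a dict mapping a sorted (name1, name2) tuple to its label.
    """
    edge_set = set(edges)
    labels = {}
    for sender, recipient in edge_set:
        if sender == recipient:
            continue  # assumption: emails a person sends to themselves are ignored
        pair = tuple(sorted((sender, recipient)))
        # Reciprocal if the reverse edge also exists, otherwise directed
        labels[pair] = 'reciprocal' if (recipient, sender) in edge_set else 'directed'
    return labels


# Hypothetical example: A and B email each other, while A emails C but C never replies
edges = [('A', 'B'), ('B', 'A'), ('A', 'C'), ('A', 'B')]
print(sorted(classify_pairs(edges).items()))
# [(('A', 'B'), 'reciprocal'), (('A', 'C'), 'directed')]
```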

**Dataset:** A subset of the open [Enron Email Dataset](https://www.cs.cmu.edu/~./enron/ "Enron Email Dataset"), containing approximately 10,000 simplified email headers from the Enron Corporation. The file has three columns, *Date*, *From*, and *To*, described below:

|Column name|Description|
|--|--|
|Date |The date and time of the email, in the format YYYY-MM-DD hh:mm:ss, <br />e.g. "1998-10-30 07:43:00" |
|From |The sender email address, <br />e.g. "mark.taylor@enron.com" |
|To | A list of recipients' email addresses separated by semicolons ';', <br />e.g. "jennifer.fraser@enron.com;jeffrey.hodge@enron.com" |

Note that we only consider users employed by Enron, i.e. relationships in which both email addresses end with *'@enron.com'*. Although the dataset contains only email addresses, not actual names, we assume that the email aliases were created from people's names, so each alias can be converted back into a name. For example:

|Email Address|Converted Name|
|--|--|
|mark.taylor@enron.com|Mark Taylor|
|alan.aronowitz@enron.com|Alan Aronowitz|
|marc.r.cutler@enron.com|Marc R Cutler|
|hugh@enron.com|Hugh|
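
The filtering and name-conversion rules above can be sketched for a single CSV row in plain Python. This is an illustrative helper, not code from the notebook. `email_to_name`, `row_to_name_pairs`, and the sample row are hypothetical. Unlike the notebook, which only checks whether '@enron.com' appears anywhere in an address, this sketch uses `str.endswith`, which follows the wording above.

```python
import csv
import io


def email_to_name(address):
    """Convert an alias such as 'marc.r.cutler@enron.com' to 'Marc R Cutler'."""
    alias = address.strip().split('@')[0]
    return alias.replace('.', ' ').title()


def row_to_name_pairs(line):
    """Yield (sender name, recipient name) for one 'Date,From,To' CSV line.

    Only pairs where both addresses end with '@enron.com' are kept.
    """
    date, sender, recipients = next(csv.reader(io.StringIO(line)))
    if not sender.strip().endswith('@enron.com'):
        return
    for recipient in recipients.split(';'):
        if recipient.strip().endswith('@enron.com'):
            yield email_to_name(sender), email_to_name(recipient)


# Hypothetical row in the format described in the column table
row = '1998-10-30 07:43:00,mark.taylor@enron.com,"jennifer.fraser@enron.com;someone@example.com"'
print(list(row_to_name_pairs(row)))
# [('Mark Taylor', 'Jennifer Fraser')]
```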

```python
# Display the SparkContext that the PySpark notebook provides as `sc`
sc
```

```python
# Variable for original data
ENRON = 'data/enron_mails_small.csv'
```

```python
# Create an RDD with the target data
enron = sc.textFile(ENRON, use_unicode=True).cache()
```

```python
"""
Extract the senders and recipients from the Enron email data set, keep only pairs where both addresses contain
'@enron.com', take only the names, and capitalize them (k2 - v2: from - to).
"""
def extractNames(partitionId, rows):
    import csv
    # Skip the CSV header row, which only appears in the first partition
    if partitionId == 0:
        next(rows, None)
    # Parse the remaining lines as CSV
    reader = csv.reader(rows)
    for fields in reader:
        # Split the recipient email addresses
        for to in fields[2].strip().split(';'):
            # Keep the pair only if both sender and recipient are Enron addresses
            if '@enron.com' in to and '@enron.com' in fields[1]:
                # Yield one (sender, recipient) pair per recipient, converting each alias to a name
                yield (fields[1].split('@')[0].replace('.', ' ').title(),
                       to.split('@')[0].replace('.', ' ').title())

enronNames = enron.mapPartitionsWithIndex(extractNames)
enronNames.take(5)
```

```
[('Mark Taylor', 'Shari Stack'),
 ('Mark Taylor', 'Yao Apasu'),
 ('Mark Taylor', 'Paul Simons'),
 ('Mark Taylor', 'Justin Boyd'),
 ('Mark Taylor', 'Tana Jones')]
```
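
Every extraction function in this notebook uses the same idiom. `sc.textFile` splits the file into partitions, and only partition 0 starts with the CSV header, so only that partition calls `next(rows)`. This assumes the header is the first line of partition 0, which is the usual case when `textFile` reads a single file. The following is a minimal local simulation that models partitions as plain Python lists. The helper `simulate_map_partitions_with_index` is hypothetical and only mimics how `mapPartitionsWithIndex` passes a partition index and an iterator to the function.

```python
def skip_header_and_parse(partition_id, rows):
    """Drop the header in partition 0, then yield the remaining lines split on commas."""
    if partition_id == 0:
        next(rows, None)  # the default avoids StopIteration on an empty partition
    for line in rows:
        yield line.split(',')


def simulate_map_partitions_with_index(partitions, func):
    """Hypothetical local stand-in: call func(index, iterator) for each partition."""
    results = []
    for index, partition in enumerate(partitions):
        results.extend(func(index, iter(partition)))
    return results


partitions = [
    ['Date,From,To', '1998-10-30 07:43:00,a@enron.com,b@enron.com'],
    ['1998-10-31 08:00:00,b@enron.com,a@enron.com'],
]
print(simulate_map_partitions_with_index(partitions, skip_header_and_parse))
```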

```python
"""
Count all the reciprocal relationships

Steps:
1) Remove duplicate from-to pairs with distinct(), so each directed pair appears once
2) Sort each pair so that A -> B and B -> A share the same key, and give each pair a 1 (k3 - v3: sorted pair - 1)
3) Reduce the k3 - v3 pairs by key (k4 - v4: sorted pair - sum of v3)
4) Keep only the pairs where v4 > 1, i.e. where both directions occurred (k5 - v5: sorted pair - v4)
5) Format the result using map (k6 - v6: 'reciprocal' - 'name1 : name2')
6) Count
"""
enronNames.distinct() \
    .map(lambda x: (tuple(sorted(x)), 1)) \
    .reduceByKey(lambda x, y: x + y) \
    .filter(lambda x: x[1] > 1) \
    .map(lambda x: ('reciprocal', str(x[0][0]) + ' : ' + str(x[0][1]))) \
    .count()
```

```
35
```
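
Why does `v4 > 1` identify reciprocal pairs? After `distinct()`, each directed pair appears at most once. Sorting the two names then maps A → B and B → A to the same key, so a key's count reaches 2 exactly when both directions occurred. The following is a minimal pure-Python sketch of the same steps on hypothetical names, with `collections.Counter` standing in for `map`/`reduceByKey`. The function name `reciprocal_pairs` is illustrative.

```python
from collections import Counter


def reciprocal_pairs(name_pairs):
    """Mirror the Spark pipeline: distinct -> sort each pair -> count -> keep count > 1."""
    counts = Counter(tuple(sorted(pair)) for pair in set(name_pairs))
    return sorted(f'{a} : {b}' for (a, b), n in counts.items() if n > 1)


pairs = [
    ('Alice', 'Bob'),
    ('Bob', 'Alice'),
    ('Alice', 'Bob'),    # duplicate email, removed by set()
    ('Alice', 'Carol'),  # one direction only: directed
]
print(reciprocal_pairs(pairs))
# ['Alice : Bob']
```

Without the deduplication step, two separate Alice → Bob emails would also produce a count of 2 even if Bob never replied. This is why `distinct()` comes first in the pipeline.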

```python
"""
List all the reciprocal relationships

Steps:
1) Remove duplicate from-to pairs with distinct(), so each directed pair appears once
2) Sort each pair so that A -> B and B -> A share the same key, and give each pair a 1 (k3 - v3: sorted pair - 1)
3) Reduce the k3 - v3 pairs by key (k4 - v4: sorted pair - sum of v3)
4) Keep only the pairs where v4 > 1, i.e. where both directions occurred (k5 - v5: sorted pair - v4)
5) Format the result using map (k6 - v6: 'reciprocal' - 'name1 : name2')
6) Collect, then sort the list locally with sorted()
"""
sorted(enronNames.distinct()
    .map(lambda x: (tuple(sorted(x)), 1))
    .reduceByKey(lambda x, y: x + y)
    .filter(lambda x: x[1] > 1)
    .map(lambda x: ('reciprocal', str(x[0][0]) + ' : ' + str(x[0][1])))
    .collect())
```

```
[('reciprocal', 'Brenda Whitehead : Elizabeth Sager'),
 ('reciprocal', 'Carol Clair : Debra Perlingiere'),
 ('reciprocal', 'Carol Clair : Mark Taylor'),
 ('reciprocal', 'Carol Clair : Richard Sanders'),
 ('reciprocal', 'Carol Clair : Sara Shackleton'),
 ('reciprocal', 'Carol Clair : Tana Jones'),
 ('reciprocal', 'Debra Perlingiere : Kevin Ruscitti'),
 ('reciprocal', 'Drew Fossum : Susan Scott'),
 ('reciprocal', 'Elizabeth Sager : Janette Elbertson'),
 ('reciprocal', 'Elizabeth Sager : Mark Haedicke'),
 ('reciprocal', 'Elizabeth Sager : Mark Taylor'),
 ('reciprocal', 'Elizabeth Sager : Richard Sanders'),
 ('reciprocal', 'Eric Bass : Susan Scott'),
 ('reciprocal', 'Fletcher Sturm : Greg Whalley'),
 ('reciprocal', 'Fletcher Sturm : Sally Beck'),
 ('reciprocal', 'Gerald Nemec : Susan Scott'),
 ('reciprocal', 'Grant Masson : Vince Kaminski'),
 ('reciprocal', 'Greg Whalley : Richard Sanders'),
 ('reciprocal', 'Janette Elbertson : Mark Taylor'),
 ('reciprocal', 'Janette Elbertson : Richard Sa
```

## NYC SAT performance

The objective of this assignment was to find out whether average math SAT scores vary across the bus or subway lines that serve each school. To do this, I used two NYC open data sets, the SAT Results and the NYC High School Directory data sets, both of which can be downloaded from the links below:

**SAT_Results.csv**
Source: https://nycopendata.socrata.com/Education/SAT-Results/f9bf-2cp4  
Description: “The most recent school level results for New York City on the SAT. Results are available at the school level for the graduating seniors of 2012.”

**DOE_High_School_Directory_2014-2015.csv**
Source: https://data.cityofnewyork.us/Education/DOE-High-School-Directory-2014-2015/n3p6-zve2  
Description: “Directory of NYC High Schools.”

The results are two lists, each sorted by score in descending order:
1. A list of key/value pairs with bus lines as keys and average math SAT scores as values.
2. A list of key/value pairs with subway lines as keys and average math SAT scores as values.
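
Each line's value is a weighted average. Every school served by the line contributes its average math score, weighted by its number of test takers, which is what the `mapValues` and `reduceByKey` steps in the cells below compute. The symbols here are introduced only for illustration: $S_\ell$ is the set of schools served by line $\ell$, and $m_s$ and $n_s$ are school $s$'s average math score and number of test takers.

$$
\bar{m}_\ell = \frac{\sum_{s \in S_\ell} m_s \, n_s}{\sum_{s \in S_\ell} n_s}
$$

For example, suppose two schools have averages of 400 and 500, with 100 and 300 test takers respectively. The weighted average is $(400 \cdot 100 + 500 \cdot 300) / 400 = 475$, whereas an unweighted mean would give 450. Schools whose results are reported as 's' are dropped before the join, and the final value is truncated to an integer with `int()`.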

```python
# Store variables for the SAT results and school info
SAT_FN = 'data/SAT_Results.csv'
HSD_FN = 'data/DOE_High_School_Directory_2014-2015.csv'
```

```python
# Create an RDD of the SAT results
sat = sc.textFile(SAT_FN, use_unicode=True).cache()
```

```python
# Extract the DBN, average math SAT score, and number of test takers for each school
def extractScore(partitionId, rows):
    import csv
    # Skip the CSV header row, which only appears in the first partition
    if partitionId == 0:
        next(rows, None)
    # Parse the remaining lines as CSV
    reader = csv.reader(rows)
    for fields in reader:
        # Skip rows whose number of test takers is 's' (no numeric results reported)
        if fields[2] != 's':
            # Yield (DBN, (average math SAT score, number of test takers))
            yield (fields[0], (int(fields[4]), int(fields[2])))

satScores = sat.mapPartitionsWithIndex(extractScore)
satScores.take(5)
```

```
[('02M047', (400, 16)),
 ('21K410', (437, 475)),
 ('30Q301', (440, 98)),
 ('17K382', (374, 59)),
 ('18K637', (381, 35))]
```
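
Using `csv.reader` instead of `line.split(',')` keeps a quoted field that contains a comma in one piece. The following is a minimal sketch of the same parsing rule applied to hypothetical rows. The school names and numbers are made up, and the column order (DBN, school name, test takers, reading, math, writing) is an assumption consistent with the indices the notebook uses. `parse_sat_rows` is an illustrative name.

```python
import csv
import io


def parse_sat_rows(lines):
    """Yield (DBN, (math average, test takers)), skipping rows reported as 's'."""
    for fields in csv.reader(lines):
        if fields[2] == 's':
            continue  # no numeric results for this school
        yield fields[0], (int(fields[4]), int(fields[2]))


# Hypothetical rows: DBN, school name, test takers, reading, math, writing
sample = io.StringIO(
    '01X001,"EXAMPLE SCHOOL, CAMPUS A",20,410,420,400\n'
    '01X002,SUPPRESSED EXAMPLE,s,s,s,s\n'
)
print(list(parse_sat_rows(sample)))
# [('01X001', (420, 20))]
```

With a plain `split(',')`, the quoted name in the first row would be cut in two, shifting every later column by one.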

```python
# Create an RDD from the schools data
schools = sc.textFile(HSD_FN, use_unicode=True).cache()
```

```python
"""
Purpose of this code block: extract the bus lines that serve each school, by DBN.

Splitting the bus column on commas gives one record per bus line (k2 - v2: DBN - individual bus line).
"""
def extractBusGrades(partitionId, rows):
    import csv
    # Skip the CSV header row, which only appears in the first partition
    if partitionId == 0:
        next(rows, None)
    # Parse the remaining lines as CSV
    reader = csv.reader(rows)
    for fields in reader:
        # Only use rows that parsed into all 58 columns
        if len(fields) == 58:
            # Split the comma-separated bus lines (fields[10])
            for busline in fields[10].strip().split(','):
                busline = busline.strip()
                # Skip empty entries, e.g. when a school lists no bus lines
                if busline:
                    # Yield the DBN and bus line
                    yield (fields[0], busline)

busGrades = schools.mapPartitionsWithIndex(extractBusGrades)
busGrades.take(5)
```

```
[('01M292', 'B39'),
 ('01M292', 'M14A'),
 ('01M292', 'M14D'),
 ('01M292', 'M15'),
 ('01M292', 'M15-SBS')]
```
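
The bus column is a comma-separated string, so splitting and stripping is enough. One edge case needs care: `''.split(',')` returns `['']`, so an empty field would otherwise produce a bus line named `''`. The following is a minimal sketch on a hypothetical field value. `split_bus_lines` is an illustrative name, not part of the notebook.

```python
def split_bus_lines(field):
    """Split a comma-separated bus field into clean line names, dropping empty entries."""
    return [bus.strip() for bus in field.split(',') if bus.strip()]


print(''.split(','))
# ['']
print(split_bus_lines('B39, M14A, M14D, M15, M15-SBS'))
# ['B39', 'M14A', 'M14D', 'M15', 'M15-SBS']
print(split_bus_lines(''))
# []
```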

```python
"""
Purpose of this code block: extract the subway lines that serve each school, by DBN.

A series of splits separates out every subway line and removes the station names; each school is then yielded
as a DBN with each subway line that serves it (k2 - v2: DBN - individual subway line).
"""
def extractSubwayGrades(partitionId, rows):
    import csv
    # Skip the CSV header row, which only appears in the first partition
    if partitionId == 0:
        next(rows, None)
    # Parse the remaining lines as CSV
    reader = csv.reader(rows)
    for fields in reader:
        # Only use rows that parsed into all 58 columns
        if len(fields) == 58:
            # Split the subway column (fields[11]) into "lines to station" groups
            for subline_station in fields[11].split(';'):
                # Separate the lines from the station name that follows ' to'
                for sublines in subline_station.split(' to'):
                    # Split the comma-separated lines
                    for line in sublines.strip().split(','):
                        # Keep single-character tokens (line names) and drop station names
                        if len(line.strip()) == 1:
                            yield (fields[0], line.strip())

subwayGrades = schools.mapPartitionsWithIndex(extractSubwayGrades)
subwayGrades.take(5)
```

```
[('01M292', 'B'),
 ('01M292', 'D'),
 ('01M292', 'F'),
 ('01M292', 'J'),
 ('01M292', 'M')]
```
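
The nested splits are easier to follow on a single value. The following is a minimal sketch that applies the same rules to a hypothetical subway field, written in the "lines to station ; lines to station" style that the code expects. `split_subway_lines` and the field value are illustrative assumptions.

```python
def split_subway_lines(field):
    """Return the single-character subway lines in a 'B, D to Station ; F to Station' field."""
    lines = []
    for group in field.split(';'):            # one group per station
        for part in group.split(' to'):       # lines before ' to', station name after
            for token in part.strip().split(','):
                token = token.strip()
                if len(token) == 1:           # line names are one character; station names are longer
                    lines.append(token)
    return lines


field = 'B, D to Grand St ; F to Delancey St ; J, M, Z to Essex St'
print(split_subway_lines(field))
# ['B', 'D', 'F', 'J', 'M', 'Z']
```

The single-character rule is a heuristic. A line name longer than one character would be dropped, and a one-character station name would be kept as if it were a line.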

```python
"""
Average math SAT score per bus line, sorted in descending order

Step-by-step:
1) Join the bus lines for each school with the SAT scores (k3 - v3: DBN - (bus line, (avg math SAT score,
   # of test takers))), then keep only the values so that the bus line becomes the key
2) Weight each school's average by its number of test takers (k4 - v4: bus line - (avg score * test takers,
   test takers))
3) Reduce by bus line (k5 - v5: bus line - (sum of weighted scores, total test takers))
4) Divide by the total number of test takers to get each line's weighted average (k6 - v6: bus line -
   avg math SAT score)
5) Swap keys and values using map (k7 - v7: avg math SAT score - bus line)
6) Sort by key, i.e. by average math SAT score (k8 - v8: sorted avg math SAT score - bus line)
7) Swap keys and values back and truncate the average to an integer with int() (k9 - v9: bus line -
   avg math SAT score)
"""
busGrades.join(satScores) \
    .values() \
    .mapValues(lambda x: (x[0] * x[1], x[1])) \
    .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1])) \
    .mapValues(lambda x: x[0] / x[1]) \
    .map(lambda x: (x[1], x[0])) \
    .sortByKey(ascending=False) \
    .map(lambda x: (x[1], int(x[0]))) \
    .collect()
```

```
[('S1115', 612),
 ('M79', 594),
 ('Q42', 582),
 ('M22', 574),
 ('Bx3', 571),
 ('B52', 560),
 ('B63', 557),
 ('B69', 548),
 ('B54', 543),
 ('B25', 541),
 ('M20', 540),
 ('M9', 539),
 ('M86', 538),
 ('B65', 538),
 ('B45', 534),
 ('Bx10', 534),
 ('Bx26', 533),
 ('B103', 531),
 ('Q64', 529),
 ('Bx22', 525),
 ('M72', 523),
 ('B41', 520),
 ('B38', 520),
 ('M5', 520),
 ('Q35', 519),
 ('M66', 518),
 ('B62', 513),
 ('Q88', 508),
 ('Q30', 507),
 ('Q84', 507),
 ('S79-SBS', 505),
 ('Q20A', 505),
 ('Q31', 504),
 ('B11', 503),
 ('M35', 496),
 ('Q17', 495),
 ('M10', 495),
 ('Q28', 492),
 ('Q13', 492),
 ('S57', 490),
 ('M31', 490),
 ('Bx28', 489),
 ('B9', 489),
 ('Q76', 488),
 ('S74', 486),
 ('S76', 486),
 ('S78', 486),
 ('S55', 486),
 ('M57', 485),
 ('M21', 485),
 ('Q85', 485),
 ('B8', 485),
 ('M7', 483),
 ('Q23', 482),
 ('Q20B', 481),
 ('Q27', 481),
 ('Q60', 479),
 ('Q4', 479),
 ('M104', 477),
 ('M11', 477),
 ('B70', 476),
 ('S56', 476),
 ('M14A', 475),
 ('B100', 475),
 ('B31', 475),
 ('B2', 475),
```
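
The join, weighting, reduction, and descending sort can be checked on a small local example. The following is a minimal pure-Python sketch on hypothetical (school, line) and (school, (score, takers)) records. A dictionary stands in for `join` and `reduceByKey`, and `sorted(..., reverse=True)` stands in for the swap-sort-swap. `line_averages` is an illustrative name.

```python
from collections import defaultdict


def line_averages(school_lines, school_scores):
    """Return (line, weighted average math score) pairs, highest first.

    school_lines: iterable of (DBN, line) pairs
    school_scores: dict mapping DBN to (average math score, number of test takers)
    """
    totals = defaultdict(lambda: [0, 0])  # line -> [sum of score * takers, sum of takers]
    for dbn, line in school_lines:
        if dbn not in school_scores:
            continue  # inner join: schools without SAT results are dropped
        score, takers = school_scores[dbn]
        totals[line][0] += score * takers
        totals[line][1] += takers
    averages = sorted(((line, weighted / takers) for line, (weighted, takers) in totals.items()),
                      key=lambda kv: kv[1], reverse=True)
    # Sort on the exact average, then truncate with int() as the notebook does
    return [(line, int(avg)) for line, avg in averages]


# Hypothetical data
school_lines = [('S1', 'X1'), ('S2', 'X1'), ('S2', 'X2'), ('S3', 'X2')]
school_scores = {'S1': (400, 100), 'S2': (500, 300)}
print(line_averages(school_lines, school_scores))
# [('X2', 500), ('X1', 475)]
```

In PySpark, the swap-sort-swap could also be written as `.sortBy(lambda kv: kv[1], ascending=False)`, which sorts on the value directly.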
 

```python
"""
Average math SAT score per subway line, sorted in descending order

Step-by-step:
1) Join the subway lines for each school with the SAT scores (k3 - v3: DBN - (subway line, (avg math SAT score,
   # of test takers))), then keep only the values so that the subway line becomes the key
2) Weight each school's average by its number of test takers (k4 - v4: subway line - (avg score * test takers,
   test takers))
3) Reduce by subway line (k5 - v5: subway line - (sum of weighted scores, total test takers))
4) Divide by the total number of test takers to get each line's weighted average (k6 - v6: subway line -
   avg math SAT score)
5) Swap keys and values using map (k7 - v7: avg math SAT score - subway line)
6) Sort by key, i.e. by average math SAT score (k8 - v8: sorted avg math SAT score - subway line)
7) Swap keys and values back and truncate the average to an integer with int() (k9 - v9: subway line -
   avg math SAT score)
"""
subwayGrades.join(satScores) \
    .values() \
    .mapValues(lambda x: (x[0] * x[1], x[1])) \
    .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1])) \
    .mapValues(lambda x: x[0] / x[1]) \
    .map(lambda x: (x[1], x[0])) \
    .sortByKey(ascending=False) \
    .map(lambda x: (x[1], int(x[0]))) \
    .collect()
```

```
[('3', 513),
 ('C', 510),
 ('A', 510),
 ('R', 508),
 ('G', 503),
 ('D', 502),
 ('E', 501),
 ('1', 499),
 ('4', 495),
 ('N', 493),
 ('B', 491),
 ('2', 488),
 ('Q', 482),
 ('5', 461),
 ('7', 457),
 ('M', 454),
 ('F', 445),
 ('J', 439),
 ('Z', 438),
 ('6', 432),
 ('S', 427),
 ('L', 426)]
```