# Homework 4 - Spark

This homework gives you practice with Apache Spark.

You are required to turn in this notebook as BDM\_HW4\_Spark\_**NetId**.ipynb. You will be asked to complete each task using Apache Spark. Output can be printed in the notebook.

## Task 1 (5 points)

You are asked to implement Homework 3 using Spark. The description is provided below for your convenience.

You are asked to implement the Social Triangle example discussed in class. In particular, given the email dataset, please list all "reciprocal" relationships in the company. Recall that:

If A emails B and B emails A, then the relationship between A and B is *reciprocal*.

If A emails B but B doesn’t email A, then the relationship between A and B is *directed*.
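
To make the two definitions concrete, here is a minimal sketch in plain Python (no Spark). The `emails` set and the `classify` helper are hypothetical names used only for illustration; they are not part of the assignment.

```python
# Hypothetical directed edges: (sender, recipient).
emails = {("A", "B"), ("B", "A"), ("A", "C")}

def classify(a, b, edges):
    """Return 'reciprocal', 'directed', or None for the pair (a, b)."""
    forward = (a, b) in edges
    backward = (b, a) in edges
    if forward and backward:
        return "reciprocal"
    if forward or backward:
        return "directed"
    return None  # the two never emailed each other

print(classify("A", "B", emails))  # reciprocal
print(classify("A", "C", emails))  # directed
```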

**Dataset:** We will use a subset of the open [Enron Email Dataset](https://www.cs.cmu.edu/~./enron/ "Enron Email Dataset"), which contains approximately 10,000 simplified email headers from the Enron Corporation. You can download this dataset from NYU Classes as **enron_mails_small.csv**. The file contains three columns, *Date*, *From*, and *To*, described below:

|Column name|Description|
|--|--|
|Date |The date and time of the email, in the format YYYY-MM-DD hh:mm:ss, <br />e.g. "1998-10-30 07:43:00" |
|From |The sender email address, <br />e.g. "mark.taylor@enron.com" |
|To | A list of recipients' email addresses separated by semicolons ';', <br />e.g. "jennifer.fraser@enron.com;jeffrey.hodge@enron.com" |

Note that we only care about users employed by Enron, so keep only relationships in which both email addresses end with *'@enron.com'*.
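
As a rough illustration of the two rules above (one pair per recipient, Enron addresses only), the following plain-Python sketch expands a single row. The function name `enron_pairs` and the address `someone@example.com` are made up for this example.

```python
def enron_pairs(sender, recipients):
    """Yield (sender, recipient) pairs in which both addresses end with '@enron.com'."""
    suffix = "@enron.com"
    if not sender.endswith(suffix):
        return
    for recipient in recipients.split(";"):
        recipient = recipient.strip()
        if recipient.endswith(suffix):
            yield (sender, recipient)

# One row's From and To values; the middle recipient is a hypothetical outsider.
row = ("mark.taylor@enron.com",
       "jennifer.fraser@enron.com;someone@example.com;jeffrey.hodge@enron.com")
pairs = list(enron_pairs(*row))
print(pairs)
```

Using `endswith` rather than a substring test keeps addresses that merely contain `@enron.com` somewhere in the middle from slipping through.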

The expected output is also provided below. For each reciprocal relationship, please output a tuple consisting of two strings. The first is always **'reciprocal'**. The second is a string showing the names of the two people in the following format: **'Jane Doe : John Doe'**. The names should be presented in lexical order, i.e. there will not be a 'John Doe : Jane Doe', since 'Jane' is ordered before 'John'.
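
A small sketch of the output format, assuming the two names are already available as strings; `format_pair` is a hypothetical helper name.

```python
def format_pair(name_a, name_b):
    """Build the output tuple with the two names in lexical order."""
    first, second = sorted([name_a, name_b])
    return ("reciprocal", f"{first} : {second}")

print(format_pair("John Doe", "Jane Doe"))  # ('reciprocal', 'Jane Doe : John Doe')
```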

Although the dataset contains only email addresses, not actual names, we assume that each email alias was derived from the person's name. For example:

|Email Address|Converted Name|
|--|--|
|mark.taylor@enron.com|Mark Taylor|
|alan.aronowitz@enron.com|Alan Aronowitz|
|marc.r.cutler@enron.com|Marc R Cutler|
|hugh@enron.com|Hugh|
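
The conversion in the table can be sketched as follows, assuming the alias is everything before the `@` and that name parts are separated by dots. `email_to_name` is an illustrative name, not a required one.

```python
def email_to_name(address):
    """Turn 'mark.taylor@enron.com' into 'Mark Taylor'."""
    alias = address.split("@")[0]
    return " ".join(part.capitalize() for part in alias.split("."))

for address in ["mark.taylor@enron.com", "marc.r.cutler@enron.com", "hugh@enron.com"]:
    print(address, "->", email_to_name(address))
```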

Please fill the code block with a series of MapReduce jobs using your own mapper and reducer functions. Be sure to include the naming convention logic into one of your mappers and/or reducers.
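
One possible way to chain the jobs is sketched below in plain Python, so it can be run without a cluster. The `reduce_by_key` helper is a hypothetical stand-in for Spark's `reduceByKey`, and the names in `edges` are invented. The idea is to remove duplicate directed edges first, then key each edge by its sorted pair and count how many directions were seen.

```python
from collections import defaultdict
from functools import reduce

def reduce_by_key(pairs, func):
    """Tiny local stand-in for reduceByKey on a list of (key, value) pairs."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return [(key, reduce(func, values)) for key, values in grouped.items()]

# Hypothetical (sender, recipient) name pairs, including one duplicate.
edges = [("Ann Lee", "Bob Ray"), ("Bob Ray", "Ann Lee"),
         ("Ann Lee", "Bob Ray"), ("Ann Lee", "Cy Fox")]

# Job 1: collapse duplicate directed edges to a single record each.
unique = reduce_by_key([(edge, 1) for edge in edges], lambda x, y: 1)

# Job 2: key each edge by its sorted pair and count the directions seen.
counts = reduce_by_key([(tuple(sorted(edge)), 1) for edge, _ in unique],
                       lambda x, y: x + y)

# A pair seen in both directions has a count of 2.
reciprocal = sorted(pair for pair, n in counts if n == 2)
print(reciprocal)  # [('Ann Lee', 'Bob Ray')]
```

Without the first job, a person who emailed the same colleague twice would be counted twice and wrongly reported as reciprocal.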

In [1]:
EN_FN='enron_mails_small.csv'

In [7]:
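def oneToone(pid,rows):
    # Expand each (From, To) row into one (sender, receiver) pair per recipient.
    # spark.read with header=True has already consumed the header line, so no
    # row is skipped here; pid is kept only for mapPartitionsWithIndex.
    for row in rows:
        sender,receivers = row
        if sender is None or receivers is None:
            continue  # skip rows with a missing address field
        for receiver in receivers.split(';'):
            yield(sender,receiver)
def emailToName(x):
    # Convert both addresses to names, e.g. 'mark.taylor@enron.com' -> 'Mark Taylor'.
    tx,rx = x
    tx = tx.split('@')[0]
    rx = rx.split('@')[0]
    tx =' '.join(map(str.capitalize,tx.split('.')))
    rx =' '.join(map(str.capitalize,rx.split('.')))
    return((tx,rx),1)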

In [8]:
dfEmail = spark.read.load(EN_FN,format='csv',header = True,inferSchema=True)

In [9]:
dfEmail = dfEmail.select('From','To')
rddEmail = dfEmail.rdd.mapPartitionsWithIndex(oneToone)\
                 .filter(lambda x: '@enron.com' in x[0] and '.' in x[0].split('@')[0]  and ('@enron.com' in x[1] 
                    and '.' in x[1].split('@')[0]))\
                   .map(emailToName)\
                   .reduceByKey(lambda x,y:1)\
                   .map(lambda x: (tuple(sorted(x[0])),1))\
                   .reduceByKey(lambda x,y:x+y)\
                   .filter(lambda x:x[1]>1)\
                   .sortByKey()\
                   .map(lambda x:('reciprocal',x[0][0]+' : '+x[0][1]))
rddEmail.collect()

[('reciprocal', 'Brenda Whitehead : Elizabeth Sager'),
 ('reciprocal', 'Carol Clair : Debra Perlingiere'),
 ('reciprocal', 'Carol Clair : Mark Taylor'),
 ('reciprocal', 'Carol Clair : Richard Sanders'),
 ('reciprocal', 'Carol Clair : Sara Shackleton'),
 ('reciprocal', 'Carol Clair : Tana Jones'),
 ('reciprocal', 'Debra Perlingiere : Kevin Ruscitti'),
 ('reciprocal', 'Drew Fossum : Susan Scott'),
 ('reciprocal', 'Elizabeth Sager : Janette Elbertson'),
 ('reciprocal', 'Elizabeth Sager : Mark Haedicke'),
 ('reciprocal', 'Elizabeth Sager : Mark Taylor'),
 ('reciprocal', 'Elizabeth Sager : Richard Sanders'),
 ('reciprocal', 'Eric Bass : Susan Scott'),
 ('reciprocal', 'Fletcher Sturm : Greg Whalley'),
 ('reciprocal', 'Fletcher Sturm : Sally Beck'),
 ('reciprocal', 'Gerald Nemec : Susan Scott'),
 ('reciprocal', 'Grant Masson : Vince Kaminski'),
 ('reciprocal', 'Greg Whalley : Richard Sanders'),
 ('reciprocal', 'Janette Elbertson : Mark Taylor'),
 ('reciprocal', 'Janette El

## Task 2 (5 points)

You are asked to implement Task 2 of Lab 5. The description is provided below for your convenience.

We’ll be using two NYC open data sets: the SAT Results and the NYC High School Directory data sets. Both can be downloaded from the links below, or from online class resources.

**Dataset**: *Please note that each school is uniquely identified by a DBN code, which should be found in both data sets.*

**SAT_Results.csv**
Source: https://nycopendata.socrata.com/Education/SAT-Results/f9bf-2cp4  
Description: “The most recent school level results for New York City on the SAT. Results are available at the school level for the graduating seniors of 2012.”

**DOE_High_School_Directory_2014-2015.csv**
Source: https://data.cityofnewyork.us/Education/DOE-High-School-Directory-2014-2015/n3p6-zve2  
Description: “Directory of NYC High Schools.”
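
Because the DBN code appears in both files, the two data sets can be combined with an inner join on that key. The sketch below shows the idea with plain dictionaries; the miniature data is partly hypothetical (the bus list is abbreviated and `99X999` is an invented code).

```python
# Hypothetical miniature versions of the two data sets, keyed by DBN.
sat_scores = {"02M047": 400, "21K410": 437}             # DBN -> Math average
directory = {"02M047": "M101, M102", "99X999": "Bx1"}   # DBN -> bus lines

# Inner join: keep only the DBN codes present in both data sets.
joined = {dbn: (directory[dbn], sat_scores[dbn])
          for dbn in directory.keys() & sat_scores.keys()}
print(joined)  # {'02M047': ('M101, M102', 400)}
```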

We would like to know how the Math scores vary across the bus lines and subway lines serving the schools. Your task is to compute the average Math score of all schools along each bus line and each subway line. You can find the bus and subway lines serving each school in the `bus` and `subway` columns of the High School Directory.
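
Before averaging, each school's `bus` and `subway` values have to be split into individual lines. The following is a sketch only, under two assumptions that should be checked against the actual file: bus lines are comma-separated, and the subway value is a list of `LINES to STATION` entries separated by semicolons. The function names and the sample strings are illustrative.

```python
import re

def parse_bus(value):
    """Split a comma-separated bus value into clean line names."""
    lines = [line.strip() for line in value.split(",")]
    return [line for line in lines if line and line != "N/A"]

def parse_subway(value):
    """Collect the subway lines from every 'LINES to STATION' entry."""
    lines = []
    for entry in value.split(";"):  # assumed entry separator
        head = re.split(r"\s+to\s+", entry.strip(), maxsplit=1)[0]
        for line in head.split(","):
            line = line.strip()
            if line and line != "N/A" and line not in lines:
                lines.append(line)
    return lines

print(parse_bus("B39, M14A, M14D"))                            # ['B39', 'M14A', 'M14D']
print(parse_subway("B, D to Grand St ; F to East Broadway"))   # ['B', 'D', 'F']
```

Splitting on the whole word ` to ` rather than on individual characters avoids cutting a value at the first `t` or `o` it happens to contain.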

The expected results are two lists:
1. A list of key/value pairs with bus lines as keys and average Math scores as values.
2. A list of key/value pairs with subway lines as keys and average Math scores as values.
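
One way to obtain such a list is to accumulate a running sum and a school count per line, then divide. The sketch below uses invented line names and scores purely to show the arithmetic.

```python
from collections import defaultdict

# Hypothetical (line, math_score) records, one per school-line combination.
records = [("L1", 400), ("L1", 500), ("L2", 480)]

totals = defaultdict(lambda: [0, 0])  # line -> [sum of scores, number of schools]
for line, score in records:
    totals[line][0] += score
    totals[line][1] += 1

# Average = sum / count, sorted from highest to lowest average.
averages = sorted(((line, s / n) for line, (s, n) in totals.items()),
                  key=lambda kv: kv[1], reverse=True)
print(averages)  # [('L2', 480.0), ('L1', 450.0)]
```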

The top ten lines with the highest scores are shown below.

In [48]:
SAT_FN = 'SAT_Results.csv'
HSD_FN = 'DOE_High_School_Directory_2014-2015.csv'

In [49]:
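from pyspark.sql.functions import lit
dfScores = spark.read.load(SAT_FN,format='csv',header = True,inferSchema=True)
# Cast to int so that non-numeric entries become null, then drop those rows.
# The backticks are needed because the column name contains a dot.
dfScores = dfScores.select('DBN',
                          dfScores['Num of SAT Test Takers'].cast('int').alias('ntakers'),
                          dfScores['`SAT Math Avg. Score`'].cast('int').alias('score')
                          ).na.drop()
# Keep each school's Math average as 'total' and attach a per-school count of 1.
dfScores = dfScores.select('DBN',
                           (dfScores.score).alias('total'))

dfScores = dfScores.withColumn('count',lit(1).cast('int'))
dfScores.show()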

+------+-----+-----+
|   DBN|total|count|
+------+-----+-----+
|02M047|  400|    1|
|21K410|  437|    1|
|30Q301|  440|    1|
|17K382|  374|    1|
|18K637|  381|    1|
|32K403|  366|    1|
|09X365|  339|    1|
|11X270|  394|    1|
|05M367|  366|    1|
|14K404|  357|    1|
|30Q575|  492|    1|
|13K336|  374|    1|
|04M635|  369|    1|
|24Q264|  454|    1|
|17K408|  342|    1|
|19K618|  371|    1|
|27Q309|  379|    1|
|32K552|  364|    1|
|13K499|  364|    1|
|07X600|  400|    1|
+------+-----+-----+
only showing top 20 rows



In [50]:
dfSchools = spark.read.load(HSD_FN,format='csv',header = True,inferSchema=True)
dfSchools = dfSchools.na.drop(subset=['boro'])
dfSchools = dfSchools.select('dbn','bus','subway')
dfSchools.show()

+------+--------------------+--------------------+
|   dbn|                 bus|              subway|
+------+--------------------+--------------------+
|01M292|B39, M14A, M14D, ...|B, D to Grand St ...|
|01M448|M14A, M14D, M15, ...|F to East Broadwa...|
|01M450|M101, M102, M103,...|6 to Astor Place ...|
|01M509|B39, M103, M14A, ...|B, D to Grand St ...|
|01M539|B39, M14A, M14D, ...|F, J, M, Z to Del...|
|01M696|M14A, M14D, M21, ...|                 N/A|
|02M047|M101, M102, M14A,...|4, 5, Q to 14th S...|
|02M135|M10, M104, M11, M...|1, C, E to 50th S...|
|02M139|M103, M15, M22, M...|1 to Chambers St ...|
|02M260|M104, M11, M20, M...|1, 2, 3, A, C, E ...|
|02M280|M103, M15, M22, M...|1 to Chambers St ...|
|02M282|M103, M15, M22, M...|1 to Chambers St ...|
|02M288|M104, M11, M31, M...|     C, E to 50th St|
|02M294|B39, M103, M14A, ...|B, D to Grand St ...|
|02M296|M104, M11, M31, M...|     C, E to 50th St|
|02M298|B39, M103, M14A, ...|6, N, Q, R to Can...|
|02M300|M104, M11, M31, M...|  

In [98]:
dfResults = dfSchools.join(dfScores, dfSchools.dbn==dfScores.DBN,how = 'inner')
#dfResults.show(1)
dfResults.filter(dfResults['subway'].contains('SIR')).show()

+------+--------------------+----------------+------+-----+-----+
|   dbn|                 bus|          subway|   DBN|total|count|
+------+--------------------+----------------+------+-----+-----+
|31R450|S40, S42, S44, S4...|SIR to St George|31R450|  435|    1|
|31R455|  S55, S56, S59, S78| SIR to Huguenot|31R455|  486|    1|
|31R600|S40, S42, S44, S4...|SIR to St George|31R600|  422|    1|
|31R605|S57, S74, S76, S7...| SIR to New Dorp|31R605|  682|    1|
+------+--------------------+----------------+------+-----+-----+



## For Bus

In [70]:
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
dfResultsByBus = dfResults.drop('subway')
dfResultsByBus = dfResultsByBus.withColumn("Bus", explode(split(dfResultsByBus.bus, "[,]")))
dfResultsByBus.show()

+------+--------+------+-----+-----+
|   dbn|     Bus|   DBN|total|count|
+------+--------+------+-----+-----+
|01M292|     B39|01M292|  404|    1|
|01M292|    M14A|01M292|  404|    1|
|01M292|    M14D|01M292|  404|    1|
|01M292|     M15|01M292|  404|    1|
|01M292| M15-SBS|01M292|  404|    1|
|01M292|     M21|01M292|  404|    1|
|01M292|     M22|01M292|  404|    1|
|01M292|      M9|01M292|  404|    1|
|01M448|    M14A|01M448|  423|    1|
|01M448|    M14D|01M448|  423|    1|
|01M448|     M15|01M448|  423|    1|
|01M448|     M21|01M448|  423|    1|
|01M448|     M22|01M448|  423|    1|
|01M448|      M9|01M448|  423|    1|
|01M450|    M101|01M450|  402|    1|
|01M450|    M102|01M450|  402|    1|
|01M450|    M103|01M450|  402|    1|
|01M450|    M14A|01M450|  402|    1|
|01M450|    M14D|01M450|  402|    1|
|01M450|     M15|01M450|  402|    1|
+------+--------+------+-----+-----+
only showing top 20 rows



In [71]:
from pyspark.sql.functions import trim
dfResultsByBus = dfResultsByBus.withColumn("Bus", trim(dfResultsByBus.Bus))
dfResultsByBus.show()

+------+-------+------+-----+-----+
|   dbn|    Bus|   DBN|total|count|
+------+-------+------+-----+-----+
|01M292|    B39|01M292|  404|    1|
|01M292|   M14A|01M292|  404|    1|
|01M292|   M14D|01M292|  404|    1|
|01M292|    M15|01M292|  404|    1|
|01M292|M15-SBS|01M292|  404|    1|
|01M292|    M21|01M292|  404|    1|
|01M292|    M22|01M292|  404|    1|
|01M292|     M9|01M292|  404|    1|
|01M448|   M14A|01M448|  423|    1|
|01M448|   M14D|01M448|  423|    1|
|01M448|    M15|01M448|  423|    1|
|01M448|    M21|01M448|  423|    1|
|01M448|    M22|01M448|  423|    1|
|01M448|     M9|01M448|  423|    1|
|01M450|   M101|01M450|  402|    1|
|01M450|   M102|01M450|  402|    1|
|01M450|   M103|01M450|  402|    1|
|01M450|   M14A|01M450|  402|    1|
|01M450|   M14D|01M450|  402|    1|
|01M450|    M15|01M450|  402|    1|
+------+-------+------+-----+-----+
only showing top 20 rows



In [72]:
dfResultsByBus = dfResultsByBus.groupBy('Bus').sum('count','total').na.drop()
dfResultsByBus = dfResultsByBus.filter(dfResultsByBus['Bus']!='N/A')
dfResultsByBus.show()

+-------+----------+----------+
|    Bus|sum(count)|sum(total)|
+-------+----------+----------+
|   Bx33|        12|      4836|
|   Q101|        16|      7387|
|    Q65|        12|      5693|
|B44-SBS|        15|      6131|
|     Q7|         5|      2024|
|   Q110|         3|      1518|
|    B39|        14|      5853|
|    Q32|        15|      6817|
|   Q100|        10|      4505|
|    Q16|         3|      1347|
|   M104|        21|      8991|
|    Q50|         2|       778|
|    Q42|         2|      1105|
|   M103|        15|      6430|
|    Q28|         4|      1870|
|     M4|        17|      7178|
|    M66|        12|      5685|
|     M7|        26|     11626|
|    Bx7|        15|      5990|
|     B4|        13|      5708|
+-------+----------+----------+
only showing top 20 rows



In [73]:
dfResultsByBus = dfResultsByBus.withColumn('avg',dfResultsByBus['sum(total)']/dfResultsByBus['sum(count)'])\
              .select('Bus','avg')

In [83]:
from pyspark.sql.functions import desc
dfResultsByBus = dfResultsByBus.sort(desc('avg'))

In [84]:
dfResultsByBus.show()

+-------+------------------+
|    Bus|               avg|
+-------+------------------+
|    M79|             594.0|
|    Q42|             552.5|
|    Q64|             526.0|
|    Q35|             519.0|
|   Q20A|             518.5|
|    M86|             509.0|
|    S57|            507.75|
|     Q9|             506.0|
|   Q110|             506.0|
|    B11| 500.3333333333333|
|   Q20B|            497.75|
|    Q88|             496.8|
|S79-SBS|             495.5|
|    S74|             494.0|
|    S76|             494.0|
|    Q31|493.77777777777777|
|    S78|             492.4|
|    Q76|           488.125|
|    Q17| 487.9166666666667|
|    S55|             486.0|
+-------+------------------+
only showing top 20 rows



## For Subway

In [87]:
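dfResultsBySubway = dfResults.drop('bus')
# "[to]" is a regex character class, so this splits at the first 't' or 'o'.
# It isolates the leading line names only because they contain neither letter;
# any lines listed after the first station are ignored.
dfResultsBySubway = dfResultsBySubway.withColumn("subway", split(dfResultsBySubway.subway, "[to]")[0])
dfResultsBySubway.show()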

+------+--------------------+------+-----+-----+
|   dbn|              subway|   DBN|total|count|
+------+--------------------+------+-----+-----+
|01M292|               B, D |01M292|  404|    1|
|01M448|                  F |01M448|  423|    1|
|01M450|                  6 |01M450|  402|    1|
|01M509|               B, D |01M509|  433|    1|
|01M539|         F, J, M, Z |01M539|  574|    1|
|01M696|                 N/A|01M696|  604|    1|
|02M047|            4, 5, Q |02M047|  400|    1|
|02M288|               C, E |02M288|  393|    1|
|02M294|               B, D |02M294|  384|    1|
|02M296|               C, E |02M296|  375|    1|
|02M298|         6, N, Q, R |02M298|  438|    1|
|02M300|               C, E |02M300|  449|    1|
|02M303|               C, E |02M303|  358|    1|
|02M305|               B, D |02M305|  388|    1|
|02M308|               B, D |02M308|  392|    1|
|02M316|               1, R |02M316|  370|    1|
|02M374|      1, 2, 3, F, M |02M374|  391|    1|
|02M376|1, 6, A, J, 

In [88]:
dfResultsBySubway = dfResultsBySubway.withColumn("Subway", explode(split(dfResultsBySubway.subway, "[,]")))

In [90]:
dfResultsBySubway.show()

+------+------+------+-----+-----+
|   dbn|Subway|   DBN|total|count|
+------+------+------+-----+-----+
|01M292|     B|01M292|  404|    1|
|01M292|    D |01M292|  404|    1|
|01M448|    F |01M448|  423|    1|
|01M450|    6 |01M450|  402|    1|
|01M509|     B|01M509|  433|    1|
|01M509|    D |01M509|  433|    1|
|01M539|     F|01M539|  574|    1|
|01M539|     J|01M539|  574|    1|
|01M539|     M|01M539|  574|    1|
|01M539|    Z |01M539|  574|    1|
|01M696|   N/A|01M696|  604|    1|
|02M047|     4|02M047|  400|    1|
|02M047|     5|02M047|  400|    1|
|02M047|    Q |02M047|  400|    1|
|02M288|     C|02M288|  393|    1|
|02M288|    E |02M288|  393|    1|
|02M294|     B|02M294|  384|    1|
|02M294|    D |02M294|  384|    1|
|02M296|     C|02M296|  375|    1|
|02M296|    E |02M296|  375|    1|
+------+------+------+-----+-----+
only showing top 20 rows



In [92]:
dfResultsBySubway = dfResultsBySubway.withColumn("Subway", trim(dfResultsBySubway.Subway))

In [93]:
dfResultsBySubway = dfResultsBySubway.groupBy('Subway').sum('count','total')
dfResultsBySubway = dfResultsBySubway.filter(dfResultsBySubway['Subway']!='N/A')
dfResultsBySubway.show()

+------+----------+----------+
|Subway|sum(count)|sum(total)|
+------+----------+----------+
|     7|        17|      7427|
|     3|        32|     13319|
|     F|        16|      6879|
|     Q|        26|     11367|
|     E|        12|      5592|
|     B|        41|     16344|
|     5|        72|     29033|
|   SIR|         4|      2025|
|     M|        13|      5859|
|     L|        12|      4998|
|     6|        19|      7896|
|     D|        38|     15276|
|     C|        22|      9123|
|     J|        16|      6845|
|     Z|        12|      5319|
|     A|        22|      9316|
|     N|        16|      7201|
|     S|        10|      4042|
|     R|        27|     11930|
|     G|        15|      5891|
+------+----------+----------+
only showing top 20 rows



In [94]:
dfResultsBySubway = dfResultsBySubway.withColumn('avg',dfResultsBySubway['sum(total)']/dfResultsBySubway['sum(count)'])\
              .select('Subway','avg')

In [103]:
dfResultsBySubway = dfResultsBySubway.sort(desc('avg'))
dfResultsBySubway.show(40)

+------+------------------+
|Subway|               avg|
+------+------------------+
|   SIR|            506.25|
|     E|             466.0|
|     M| 450.6923076923077|
|     N|          450.0625|
|     Z|            443.25|
|     R|441.85185185185185|
|     1|440.78723404255317|
|     Q| 437.1923076923077|
|     7|436.88235294117646|
|     F|          429.9375|
|     J|          427.8125|
|     4| 423.6904761904762|
|     A|423.45454545454544|
|     L|             416.5|
|     3|         416.21875|
|     6|415.57894736842104|
|     C| 414.6818181818182|
|     S|             404.2|
|     5| 403.2361111111111|
|     D|             402.0|
|     2|399.93150684931504|
|     B| 398.6341463414634|
|     G|392.73333333333335|
+------+------------------+



In [99]:
listResultsByBus = dfResultsByBus.select('Bus','avg').rdd.map(lambda x: {x[0]:x[1]}).collect()
listResultsBySubway = dfResultsBySubway.select('Subway','avg').rdd.map(lambda x: {x[0]:x[1]}).collect()

In [100]:
listResultsBySubway

[{'7': 436.88235294117646},
 {'3': 416.21875},
 {'F': 429.9375},
 {'Q': 437.1923076923077},
 {'E': 466.0},
 {'B': 398.6341463414634},
 {'5': 403.2361111111111},
 {'SIR': 506.25},
 {'M': 450.6923076923077},
 {'L': 416.5},
 {'6': 415.57894736842104},
 {'D': 402.0},
 {'C': 414.6818181818182},
 {'J': 427.8125},
 {'Z': 443.25},
 {'A': 423.45454545454544},
 {'N': 450.0625},
 {'S': 404.2},
 {'R': 441.85185185185185},
 {'G': 392.73333333333335},
 {'1': 440.78723404255317},
 {'4': 423.6904761904762},
 {'2': 399.93150684931504}]

In [101]:
listResultsByBus

[{'M79': 594.0},
 {'Q42': 552.5},
 {'Q64': 526.0},
 {'Q35': 519.0},
 {'Q20A': 518.5},
 {'M86': 509.0},
 {'S57': 507.75},
 {'Q110': 506.0},
 {'Q9': 506.0},
 {'B11': 500.3333333333333},
 {'Q20B': 497.75},
 {'Q88': 496.8},
 {'S79-SBS': 495.5},
 {'S76': 494.0},
 {'S74': 494.0},
 {'Q31': 493.77777777777777},
 {'S78': 492.4},
 {'Q76': 488.125},
 {'Q17': 487.9166666666667},
 {'S55': 486.0},
 {'B9': 485.3333333333333},
 {'Q104': 483.25},
 {'S1115': 482.25},
 {'Q36': 482.1666666666667},
 {'Q112': 481.75},
 {'Q5': 481.0},
 {'Q30': 478.6666666666667},
 {'Q84': 478.0},
 {'Q25': 477.72727272727275},
 {'Q102': 476.3333333333333},
 {'B70': 476.0},
 {'B2': 475.0},
 {'B31': 475.0},
 {'B100': 475.0},
 {'Q65': 474.4166666666667},
 {'S54': 474.0},
 {'M66': 473.75},
 {'M22': 473.73333333333335},
 {'Q66': 473.6363636363636},
 {'M72': 472.0},
 {'Q41': 470.6},
 {'Q60': 468.6470588235294},
 {'M20': 468.0},
 {'Q28': 467.5},
 {'Q13': 467.5},
 {'Q46': 464.6666666666667},
 {'S53': 463.0},
 {'Bx10': 462.2},
 {'S59'