# SAT Results & NYC High School Directory Analysis

The goal of this notebook is to analyze two large csv files using Apache Spark. The datasets used are below:

DATASET: Each school is uniquely identified by an DBN code, which should be found on both data sets.

SAT_Results.csv
* Source: https://nycopendata.socrata.com/Education/SAT-Results/f9bf-2cp4
* Description: “The most recent school level results for New York City on the SAT. Results are available at the
school level for the graduating seniors of 2012.”

DOE_High_School_Directory_2014-2015.csv
* Source: https://data.cityofnewyork.us/Education/DOE-High-School-Directory-2014-2015/n3p6-zve2
* Description: “Directory of NYC High Schools.”

### Tasks:

TASK 1:

* Compute the average SAT Math score of all high schools with 500 students or more, for
each borough of the city. Meaning: what is the average SAT Math score of all high schools with 500 students
or more in Manhattan, in Brooklyn, in Queens, in Bronx and in Staten Island. The final result is expected to be a list of tuples borough
names as the first elements, and the average scores as the second.
* Note 1: since the SAT Results also provide the number of test takers along with the average scores, you should
use this information in computing the exact average scores above.
* Note 2: if a DBN in the SAT Results data set is not found in the High School Directory, you can safely ignore
the test scores for that school.

TASK 2:
* We would like to know how the Math scores vary across bus lines or subway lines serving the schools. Compute the average Math scores of all schools along each bus line and subway line. You can find the
bus and subway lines serving each school in the High School Dictionary as bus and subway columns.
The expected results are two lists:
1. A list of key/value pairs: with bus line as keys, and the average Math scores as values.
2. A list of key/value pairs: with subway line as keys, and the average Math scores as values.

## Environment Setup

In [None]:
%%shell
gdown --quiet 1ay5DcH64Qao1HR7CQnR6Cl1hbBMgGqXj
gdown --quiet 13BozEl3JtS43Xuu2Ek9IwMULpWjPH4VC
gdown --quiet 1It6GP8O2JqkmUtZKbYp1kpwpuwOXlLps
pip --quiet install pyspark

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.4/281.4 MB[0m [31m4.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 KB[0m [31m11.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Building wheel for pyspark (setup.py) ... [?25l[?25hdone




In [None]:
SAT_FN = 'SAT_Results.csv'
HSD_FN = 'DOE_High_School_Directory_2014-2015.csv'
ENRON_FN = 'enron_mails_small.csv'

import pyspark
from pyspark.sql import SparkSession
sc = pyspark.SparkContext.getOrCreate()
spark = SparkSession.builder.getOrCreate()
spark

### A. Load **SAT Scores** Dataset without Header

In [None]:
# We read the SAT score to our RDD.
sat = sc.textFile(SAT_FN, use_unicode=True).cache()
satHeader = sat.first().split(',')
A = sat.filter(lambda x: not x.startswith('DBN,SCHOOL'))

# This line for us to list the column index and column names to see
# which column we need to use for our task. In this case, we're
# interested in the number of test takers (#2) and the math score (#4).
display(list(enumerate(satHeader)))
A.take(3)

[(0, 'DBN'),
 (1, 'SCHOOL NAME'),
 (2, 'Num of SAT Test Takers'),
 (3, 'SAT Critical Reading Avg. Score'),
 (4, 'SAT Math Avg. Score'),
 (5, 'SAT Writing Avg. Score')]

['02M047,47 THE AMERICAN SIGN LANGUAGE AND ENGLISH SECONDARY SCHOOL,16,395,400,387',
 '21K410,ABRAHAM LINCOLN HIGH SCHOOL,475,396,437,393',
 '21K412/21K411,ABRAHAM LINCOLN YABC/LEARNING TO WORK GED AT ABRAHAM LINCOLN,s,s,s,s']

### B. Extract `DBN`, `Num of SAT Test Takers` and `SAT Math Avg. Score` columns

Here is a different approach to read the SAT score file using `map()` instead of `mapPartitionsWithIndex()`. Please note the use of `csv.reader` with a single input line.

In [None]:
import csv

B = A.map(lambda x: next(csv.reader([x]))) \
    .filter(lambda x: x[2]!='s') \
    .map(lambda x: (x[0], (int(x[2]), int(x[4]))))

B.take(3)

[('02M047', (16, 400)), ('21K410', (475, 437)), ('30Q301', (98, 440))]

### C. Load **High School Directory** Dataset and check its header

In [None]:
# Here we do the same thing with the school directory data
C = sc.textFile(HSD_FN, use_unicode=True).cache()
hsdHeader = C.first().split(',')
list(enumerate(hsdHeader))

[(0, 'dbn'),
 (1, 'school_name'),
 (2, 'boro'),
 (3, 'building_code'),
 (4, 'phone_number'),
 (5, 'fax_number'),
 (6, 'grade_span_min'),
 (7, 'grade_span_max'),
 (8, 'expgrade_span_min'),
 (9, 'expgrade_span_max'),
 (10, 'bus'),
 (11, 'subway'),
 (12, 'primary_address_line_1'),
 (13, 'city'),
 (14, 'state_code'),
 (15, 'zip'),
 (16, 'website'),
 (17, 'total_students'),
 (18, 'campus_name'),
 (19, 'school_type'),
 (20, 'overview_paragraph'),
 (21, 'program_highlights'),
 (22, 'language_classes'),
 (23, 'advancedplacement_courses'),
 (24, 'online_ap_courses'),
 (25, 'online_language_courses'),
 (26, 'extracurricular_activities'),
 (27, 'psal_sports_boys'),
 (28, 'psal_sports_girls'),
 (29, 'psal_sports_coed'),
 (30, 'school_sports'),
 (31, 'partner_cbo'),
 (32, 'partner_hospital'),
 (33, 'partner_highered'),
 (34, 'partner_cultural'),
 (35, 'partner_nonprofit'),
 (36, 'partner_corporate'),
 (37, 'partner_financial'),
 (38, 'partner_other'),
 (39, 'addtl_info1'),
 (40, 'addtl_info2'),
 (4

### D. Extract `DBN`, `bus` and `subway` columns from the HSD data

In [None]:
# Complete the extractFeatures function to get the desire output
def extractFeatures(partId, rows):
    #skip first row, which are the feature names 
    if partId == 0: 
      next(rows)
    
    #importing csv to read text data 
    import csv
    reader = csv.reader(rows)
    #looping through each row of the csv.reader results
    for row in reader:
      #check if there are no null values 
      if (len(row) == 58 and row[10]!='NA' and row[11]!='NA'):
        #yield the rows that we want 
        yield (row[0],row[10],row[11])


### DO NOT EDIT BELOW
D = C.mapPartitionsWithIndex(extractFeatures)
D.take(5)

[('01M292',
  'B39, M14A, M14D, M15, M15-SBS, M21, M22, M9',
  'B, D to Grand St ; F to East Broadway ; J, M, Z to Delancey St-Essex St'),
 ('01M448',
  'M14A, M14D, M15, M21, M22, M9',
  'F to East Broadway ; J, M, Z to Delancey St-Essex St'),
 ('01M450',
  'M101, M102, M103, M14A, M14D, M15, M15-SBS, M2, M23, M3, M8, M9',
  '6 to Astor Place ; L to 1st Ave'),
 ('01M509',
  'B39, M103, M14A, M14D, M15, M15-SBS, M21, M22, M8, M9',
  'B, D to Grand St ; F, J, M, Z to Delancey St-Essex St'),
 ('01M539',
  'B39, M14A, M14D, M21, M22, M8, M9',
  'F, J, M, Z to Delancey St-Essex St')]

### E. Convert the bus and subway description into the list of bus and subway lines

For the subway, we do not care about the direction of the lines, e.g. `B, D to Grand St` should be treated as just `B, D`.

In [None]:
#function for converting string of direction of lines to just subway lines
def funcE(row):
  from functools import reduce
  return list(map(lambda x: x[0],reduce(lambda x,y: x+y,list(map(lambda x: x.split(', '),row.split(' ; '))),[])))

In [None]:
#Testing Code 
row = 'B, D to Grand St ; F to East Broadway ; J, M, Z to Delancey St-Essex St'
funcE(row)

['B', 'D', 'F', 'J', 'M', 'Z']

In [None]:
# Derive E from above
E = D.map(lambda x: (x[0],(x[1].split(', '),funcE(x[2]))))

### DO NOT EDIT BELOW
E.take(5)

[('01M292',
  (['B39', 'M14A', 'M14D', 'M15', 'M15-SBS', 'M21', 'M22', 'M9'],
   ['B', 'D', 'F', 'J', 'M', 'Z'])),
 ('01M448',
  (['M14A', 'M14D', 'M15', 'M21', 'M22', 'M9'], ['F', 'J', 'M', 'Z'])),
 ('01M450',
  (['M101',
    'M102',
    'M103',
    'M14A',
    'M14D',
    'M15',
    'M15-SBS',
    'M2',
    'M23',
    'M3',
    'M8',
    'M9'],
   ['6', 'L'])),
 ('01M509',
  (['B39', 'M103', 'M14A', 'M14D', 'M15', 'M15-SBS', 'M21', 'M22', 'M8', 'M9'],
   ['B', 'D', 'F', 'J', 'M', 'Z'])),
 ('01M539',
  (['B39', 'M14A', 'M14D', 'M21', 'M22', 'M8', 'M9'], ['F', 'J', 'M', 'Z']))]

### F. Join `B` (SAT) and `E`(HSD) datasets so that the score is together with the list of bus lines and subway lines.

We no longer need to the DBN information after this.

In [None]:
F = B.join(E).values()
F.take(5)

[((59, 374),
  (['B12', 'B16', 'B35', 'B41', 'B44', 'B49'], ['2', '5', 'B', 'Q'])),
 ((135, 492),
  (['B62',
    'Q100',
    'Q101',
    'Q102',
    'Q103',
    'Q32',
    'Q39',
    'Q60',
    'Q66',
    'Q67',
    'Q69'],
   ['7', 'N', 'Q', 'E', 'M', 'R', 'F', 'G'])),
 ((228, 456),
  (['Bx15',
    'Bx19',
    'Bx33',
    'M10',
    'M100',
    'M101',
    'M104',
    'M11',
    'M2',
    'M3',
    'M4',
    'M5'],
   ['1', 'A', 'D', 'B', 'C'])),
 ((270, 441),
  (['M101', 'M102', 'M15', 'M2', 'M31', 'M50', 'M57', 'Q101', 'Q32', 'Q60'],
   ['4', '5', '6', 'N', 'Q', 'R', '6', 'E', 'M', 'F'])),
 ((60, 391), (['Bx12', 'Bx12-SBS', 'Bx26', 'Bx39', 'Bx8'], ['2', '5']))]

### G. Group the scores by the subway line and bus line so that we can compute the average by line
Note that, one school may belong to multiple lines, so we need to somehow replicate the school score for each bus line. The output of this step should be in the key/value pair format with the key is the line, and the value is the average score.

To make it easier, please store the bus and subway data in `G_bus` and `G_subway` respectively.

In [None]:
#function for taking a tuple of list of transporation and a tuple of number of test takers and score, 
#then returning list of tuples of each transp and a tuple of takers and scores
def funcG_trans_num_scores(tup):
  transpList,num_score = tup
  return list(map(lambda x: (x,num_score) ,transpList))


In [None]:
#testing funcG
funcG_trans_num_scores((['A','B'],(25,500)))

[('A', (25, 500)), ('B', (25, 500))]

In [None]:
#function for calculating the average from a tuple of number of test takers and the average sat score for those takers 
def funcG_avg(nums_scores):
  #separate nums and scores 
  nums, scores = zip(*nums_scores)
  #calculate the total score by multplying the number of takers and average score 
  tot_score = sum(map(lambda x,y: x*y,nums,scores))
  #total number of test takers 
  tot_nums = sum(nums)
  #return average 
  return tot_score/tot_nums

In [None]:
funcG_avg([(10,20),(10,50)])

35.0

In [None]:
# Create G_bus and G_subway here as RDDs
G_bus = F.map(lambda x: (x[1][0],x[0])) \
  .flatMap(funcG_trans_num_scores) \
  .groupByKey() \
  .mapValues(funcG_avg)

G_subway = F.map(lambda x: (x[1][1],x[0])) \
  .flatMap(funcG_trans_num_scores) \
  .groupByKey() \
  .mapValues(funcG_avg)

## DO NOT EDIT BELOW
display(G_bus.take(5))
display(G_subway.take(5))

[('B44', 465.0336676217765),
 ('Q66', 461.55149181905676),
 ('Q69', 444.03056234718827),
 ('M100', 427.83587443946186),
 ('M3', 432.531451213472)]

[('N', 483.49450806263144),
 ('4', 495.29238227146817),
 ('L', 426.3222871994802),
 ('J', 439.1299656694458),
 ('B', 491.95760524225574)]

### H. Get the line with the highest score

In [None]:
display(G_bus.max(lambda x: x[1]))
display(G_subway.max(lambda x: x[1]))

('S1115', 612.2545811518324)

('3', 513.4009556313994)