In [1]:
sc

# TASK 1:
You are asked to compute the average SAT Math score of all high schools with 500 students or more, for
each borough of the city. Meaning: what is the average SAT Math score of all high schools with 500 students
or more in Manhattan, in Brooklyn, in Queens, in Bronx and in Staten Island.
You must use Apache Spark for this lab. Both data sets must be loaded into RDDs, where all your
manipulations must be applied on, though you are free to transform these RDDs into Spark’s DataFrame or
SQL Context (though we have not covered this yet). The final result is expected to be a list of tuples borough
names as the first elements, and the average scores as the second.
Note 1: since the SAT Results also provide the number of test takers along with the average scores, you
should use this information in computing the exact average scores above.
Note 2: if a DBN in the SAT Results data set is not found in the High School Directory, you can safely ignore
the test scores for that school.

In [2]:
rdd = sc.textFile('/Applications/spark-2.3.1-bin-hadoop2.6/README.md')
rdd1 = sc.parallelize([1,2,3,4,5])

In [3]:
rdd1.collect()

[1, 2, 3, 4, 5]

In [4]:
rdd.take(5)

[u'# Apache Spark',
 u'',
 u'Spark is a fast and general cluster computing system for Big Data. It provides',
 u'high-level APIs in Scala, Java, Python, and R, and an optimized engine that',
 u'supports general computation graphs for data analysis. It also supports a']

In [5]:
rdd.flatMap(lambda line: line.split()).take(4)

[u'#', u'Apache', u'Spark', u'Spark']

In [6]:
rdd.flatMap(lambda line: line.split()) \
    .map(lambda x: (x.lower(),1)) \
    .groupByKey() \
    .mapValues(lambda x: sum(x)) \
    .take(5)

[(u'when', 1),
 (u'alternatively,', 1),
 (u'"local"', 1),
 (u'including', 4),
 (u'computation', 1)]

In [7]:
rdd.flatMap(lambda line: line.split()) \
    .map(lambda x: (x.lower(),1)) \
    .reduceByKey(lambda x,y: x+y) \
    .take(5)

[(u'when', 1),
 (u'alternatively,', 1),
 (u'"local"', 1),
 (u'including', 4),
 (u'computation', 1)]

In [8]:
rdd.flatMap(lambda line: line.split()) \
    .map(lambda x: (x.lower(),1)) \
    .reduceByKey(lambda x,y: x+y) \
    .top(5, key=lambda x: x[1])

[(u'the', 25), (u'to', 19), (u'spark', 16), (u'for', 15), (u'and', 10)]

In [9]:
SAT_FN = 'SAT_Results.csv'
HSD_FN = 'DOE_High_School_Directory_2014-2015.csv'

In [10]:
df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load(HSD_FN)

In [11]:
df.show()

+-------------------+--------------------+---------+-------------+------------+------------+--------------+--------------+-----------------+-----------------+--------------------+--------------------+----------------------+--------+----------+-----+--------------------+--------------+-----------+-----------------+--------------------+--------------------+--------------------+-------------------------+--------------------+-----------------------+--------------------------+--------------------+--------------------+----------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----------------+--------------------+--------------------+--------------------+----------+--------+--------------------+------------+--------------------------------+---------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+----------+-------

In [12]:
df.count()

1305

In [13]:
hsd = sc.textFile(HSD_FN, use_unicode=False).cache()

In [14]:
hsd.take(3)

['dbn,school_name,boro,building_code,phone_number,fax_number,grade_span_min,grade_span_max,expgrade_span_min,expgrade_span_max,bus,subway,primary_address_line_1,city,state_code,zip,website,total_students,campus_name,school_type,overview_paragraph,program_highlights,language_classes,advancedplacement_courses,online_ap_courses,online_language_courses,extracurricular_activities,psal_sports_boys,psal_sports_girls,psal_sports_coed,school_sports,partner_cbo,partner_hospital,partner_highered,partner_cultural,partner_nonprofit,partner_corporate,partner_financial,partner_other,addtl_info1,addtl_info2,start_time,end_time,se_services,ell_programs,school_accessibility_description,number_programs,priority01,priority02,priority03,priority04,priority05,priority06,priority07,priority08,priority09,priority10,Location 1',
 '01M292,Henry Street School for International Studies,Manhattan,M056,212-406-9411,212-406-9417,6,12,,,"B39, M14A, M14D, M15, M15-SBS, M21, M22, M9","B, D to Grand St ; F to East Broad

In [15]:
list(enumerate(hsd.first().split(',')))

[(0, 'dbn'),
 (1, 'school_name'),
 (2, 'boro'),
 (3, 'building_code'),
 (4, 'phone_number'),
 (5, 'fax_number'),
 (6, 'grade_span_min'),
 (7, 'grade_span_max'),
 (8, 'expgrade_span_min'),
 (9, 'expgrade_span_max'),
 (10, 'bus'),
 (11, 'subway'),
 (12, 'primary_address_line_1'),
 (13, 'city'),
 (14, 'state_code'),
 (15, 'zip'),
 (16, 'website'),
 (17, 'total_students'),
 (18, 'campus_name'),
 (19, 'school_type'),
 (20, 'overview_paragraph'),
 (21, 'program_highlights'),
 (22, 'language_classes'),
 (23, 'advancedplacement_courses'),
 (24, 'online_ap_courses'),
 (25, 'online_language_courses'),
 (26, 'extracurricular_activities'),
 (27, 'psal_sports_boys'),
 (28, 'psal_sports_girls'),
 (29, 'psal_sports_coed'),
 (30, 'school_sports'),
 (31, 'partner_cbo'),
 (32, 'partner_hospital'),
 (33, 'partner_highered'),
 (34, 'partner_cultural'),
 (35, 'partner_nonprofit'),
 (36, 'partner_corporate'),
 (37, 'partner_financial'),
 (38, 'partner_other'),
 (39, 'addtl_info1'),
 (40, 'addtl_info2'),
 (4

In [16]:
def extractSchools(index, lines):
    if index==0:
        next(lines)
    import csv
    reader = csv.reader(lines)
    for row in reader:
        if (len(row) == 57+1) and row[17].isdigit():
            if int(row[17]) > 500:
                yield (row[0], row[2])
schools = hsd.mapPartitionsWithIndex(extractSchools)
schools.take(5)

[('01M450', 'Manhattan'),
 ('01M539', 'Manhattan'),
 ('01M696', 'Manhattan'),
 ('02M374', 'Manhattan'),
 ('02M400', 'Manhattan')]

In [17]:
sat = sc.textFile(SAT_FN, use_unicode=False).cache()
list(enumerate(sat.first().split(',')))

[(0, 'DBN'),
 (1, 'SCHOOL NAME'),
 (2, 'Num of SAT Test Takers'),
 (3, 'SAT Critical Reading Avg. Score'),
 (4, 'SAT Math Avg. Score'),
 (5, 'SAT Writing Avg. Score')]

In [18]:
# extract DBN and #Test Takers, and the total scores
def extractScores(index, lines):
    if index==0:
        next(lines)
    import csv
    reader = csv.reader(lines)
    for row in reader:
        if row[2] != 's':
            yield (row[0], (int(row[2])*int(row[4]), int(row[2])))
scores = sat.mapPartitionsWithIndex(extractScores)
scores.take(4)

[('02M047', (6400, 16)),
 ('21K410', (207575, 475)),
 ('30Q301', (43120, 98)),
 ('17K382', (22066, 59))]

In [19]:
schools.join(scores).values().take(4)

[('Bronx', (23069, 59)),
 ('Staten Island', (52216, 107)),
 ('Bronx', (16317, 49)),
 ('Brooklyn', (33235, 85))]

In [20]:
schools.join(scores).values() \
    .reduceByKey(lambda x, y: (x[0]+y[0], x[1]+y[1])) \
    .mapValues(lambda x: x[0]/x[1]) \
    .collect()

[('Bronx', 470),
 ('Manhattan', 514),
 ('Brooklyn', 487),
 ('Staten Island', 477),
 ('Queens', 474)]

## TASK 2:
We would like to know how the Math scores vary across bus lines or subway lines serving the schools.
Your task is to compute the average Math scores of all schools along each bus line and subway line. You can
find the bus and subway lines serving each school in the High School Dictionary as bus and subway
columns.
The expected results are two lists:
1. A list of key/value pairs: with bus line as keys, and the average Math scores as values.
2. A list of key/value pairs: with subway line as keys, and the average Math scores as values

In [26]:
HSD_FN

'DOE_High_School_Directory_2014-2015.csv'

In [21]:
buses = "B39, M14A, M14D, M15, M15-SBS, M21, M9"
subways = "B, D to Grand St; F to East Broadway ; J, M, Z to Delancey St-Essex St"

In [23]:
lines = buses.split(', ')
lines = subways.split(' ; ')[1].split(' to ')[0].split(', ')
lines

['J', 'M', 'Z']

In [39]:
rowLen = len(schools.first().split(','))

def extractSchools2(partId, list_of_records):
    if partId==0: 
        next(list_of_records) # skipping the first line
    import csv
    reader = csv.reader(list_of_records)
    for row in reader:
        if len(row)==rowLen:
            (dbn, buses, subways) = (row[0], row[10], row[11])
            lines = buses.split(', ') + ', '.join([direction.split(' to' )[0]
                                                  for direction in subways.split(' ; ')]).splot(', ') 
    yield (dbn, lines)

schoolsWithLines = schools.mapPartitionsWithIndex(extractSchools2)

AttributeError: 'tuple' object has no attribute 'split'

In [34]:
schoolsWithLines.join(satScores).take(3)

NameError: name 'satScores' is not defined