## PySpark

In [1]:
# Display the SparkContext. `sc` is not created in this notebook —
# presumably it is injected by the PySpark shell/kernel at start-up.
sc

<pyspark.context.SparkContext at 0x1029a8c18>

In [2]:
# Load a README file as an RDD of text lines.
# NOTE(review): absolute, machine-specific path (looks like a Homebrew
# install of Spark 2.1.0) — adjust it for your own installation.
rdd = sc.textFile('/usr/local/Cellar/apache-spark/2.1.0/README.md')
# Small in-memory RDD built from a Python list.
testrdd = sc.parallelize([1,2,3,4,5])

In [3]:
# Bring every element back to the driver (fine here: only five elements).
testrdd.collect()

[1, 2, 3, 4, 5]

In [4]:
rdd.take(10)  ## take(n) returns the first n elements (here: lines of text); top(n) would instead return the n largest elements

['# Apache Spark',
 '',
 'Spark is a fast and general cluster computing system for Big Data. It provides',
 'high-level APIs in Scala, Java, Python, and R, and an optimized engine that',
 'supports general computation graphs for data analysis. It also supports a',
 'rich set of higher-level tools including Spark SQL for SQL and DataFrames,',
 'MLlib for machine learning, GraphX for graph processing,',
 'and Spark Streaming for stream processing.',
 '',
 '<http://spark.apache.org/>']

In [5]:
# Word count, variant 1: emit (word, 1) for every whitespace-separated
# token, group the 1s by lower-cased word, then sum each group.
wc = (
    rdd.flatMap(lambda text_line: text_line.split())
       .map(lambda word: (word.lower(), 1))
       .groupByKey()
       .mapValues(sum)
)

wc.take(2)

[('guide,', 1), ('development', 1)]

In [6]:
# Word count, variant 2: same (word, 1) pairs, but the counts are added
# up pairwise per key with reduceByKey instead of grouping them first.
wc = (
    rdd.flatMap(lambda text_line: text_line.split())
       .map(lambda word: (word.lower(), 1))
       .reduceByKey(lambda left, right: left + right)
)
wc.take(2)

[('guide,', 1), ('development', 1)]

## Task 1: Compute the average SAT Math score of large schools (more than 500 students) in each borough

In [7]:
# Input CSV files, given as relative paths.
# Per-school SAT results (DBN, number of test takers, average scores).
Sat_FN = 'SAT_Results.csv'
# High school directory (DBN, borough, bus/subway lines, total students, ...).
HSD_FN = 'DOE_High_School_Directory_2014-2015.csv'

In [8]:
## Reference only: reading a CSV file directly into a Spark DataFrame.
# DataFrames are not covered yet, and `df` is not used by the cells shown
# below. The 'parserLib' option is what allows multi-line CSV fields to
# be read.
# NOTE(review): `spark` is not created in this notebook — presumably the
# SparkSession injected by the PySpark shell/kernel. The
# "com.databricks.spark.csv" format name comes from the external spark-csv
# package; confirm that it and 'parserLib' are honoured by the Spark
# version in use.
df = spark.read \
            .format("com.databricks.spark.csv") \
            .option("header", "true") \
            .option("inferSchema", "true") \
            .option("parserLib", "UNIVOCITY") \
            .load(HSD_FN)

In [9]:
# Read the SAT results into an RDD of raw text lines. If the file cannot
# be parsed because of a 'utf8' or 'ascii' decoding error, try flipping
# the use_unicode parameter. The RDD is cached because later cells read
# from it more than once.

sat = sc.textFile(Sat_FN, use_unicode=True).cache()

# Uncomment the line below to list the column indexes and names. For this
# task we need the number of test takers (#2) and the math score (#4).
#list(enumerate(sat.first().split(',')))

In [10]:
def extractScores(partId, records):
    '''
    Parse raw SAT CSV lines into (dbn, (score * takers, takers)) pairs.

    Written for RDD.mapPartitionsWithIndex over the SAT results file.

    partId:  index of the partition being processed. The CSV header is
             assumed to be the first line of partition 0.
    records: iterator over the raw text lines of the partition.

    Yields (dbn, (math_score * takers, takers)) so that a weighted average
    can later be obtained by summing both components and dividing.
    Rows whose taker count (column 2) or math score (column 4) is not an
    integer -- e.g. the 's' placeholder -- or that have too few columns
    are treated as bad-quality data and skipped.
    '''
    import csv  # local import keeps the function self-contained

    if partId == 0:
        # Skip the header line. next() with a default does not raise on an
        # empty partition (a StopIteration escaping a generator becomes a
        # RuntimeError on modern Python) and also works on Python 2.
        next(records, None)

    for row in csv.reader(records):
        try:
            takers, score = int(row[2]), int(row[4])
        except (IndexError, ValueError):
            continue  # bad-quality row: missing or non-numeric fields
        yield (row[0], (score * takers, takers))

## (school DBN, (math score * test takers, test takers)) for every school with usable SAT data
satScores = sat.mapPartitionsWithIndex(extractScores)
satScores.take(5)

[('01M292', (11716, 29)),
 ('01M448', (38493, 91)),
 ('01M450', (28140, 70)),
 ('01M458', (2807, 7)),
 ('01M509', (19052, 44))]

In [11]:
# Alternative way to drop the header: filter out any line that starts
# with the header text.
# NOTE(review): noHeaderRDD is not used by the other cells shown here.
noHeaderRDD = sat.filter(lambda x: not x.startswith('DBN,SCHOOL'))
# Print the raw header line of the SAT file.
print (sat.first())
#print (noHeaderRDD.first())

DBN,SCHOOL NAME,Num of SAT Test Takers,SAT Critical Reading Avg. Score,SAT Math Avg. Score,SAT Writing Avg. Score


In [12]:
# Load the high school directory as raw text lines and cache it, since
# several later cells re-parse this RDD.
schools = sc.textFile(HSD_FN, use_unicode=True).cache()
# List column indexes and names by splitting the header line on commas.
list(enumerate(schools.first().split(',')))

[(0, 'dbn'),
 (1, 'school_name'),
 (2, 'boro'),
 (3, 'building_code'),
 (4, 'phone_number'),
 (5, 'fax_number'),
 (6, 'grade_span_min'),
 (7, 'grade_span_max'),
 (8, 'expgrade_span_min'),
 (9, 'expgrade_span_max'),
 (10, 'bus'),
 (11, 'subway'),
 (12, 'primary_address_line_1'),
 (13, 'city'),
 (14, 'state_code'),
 (15, 'zip'),
 (16, 'website'),
 (17, 'total_students'),
 (18, 'campus_name'),
 (19, 'school_type'),
 (20, 'overview_paragraph'),
 (21, 'program_highlights'),
 (22, 'language_classes'),
 (23, 'advancedplacement_courses'),
 (24, 'online_ap_courses'),
 (25, 'online_language_courses'),
 (26, 'extracurricular_activities'),
 (27, 'psal_sports_boys'),
 (28, 'psal_sports_girls'),
 (29, 'psal_sports_coed'),
 (30, 'school_sports'),
 (31, 'partner_cbo'),
 (32, 'partner_hospital'),
 (33, 'partner_highered'),
 (34, 'partner_cultural'),
 (35, 'partner_nonprofit'),
 (36, 'partner_corporate'),
 (37, 'partner_financial'),
 (38, 'partner_other'),
 (39, 'addtl_info1'),
 (40, 'addtl_info2'),
 (4

In [13]:
def extractSchools(partID, list_of_records):
    '''
    Extract (dbn, boro) for every large school in the school directory.

    Written for RDD.mapPartitionsWithIndex over the directory file.

    partID:          index of the partition being processed. The CSV
                     header is assumed to be the first line of partition 0.
    list_of_records: iterator over the raw text lines of the partition.

    Yields (dbn, boro) for each row that has exactly 58 columns, a purely
    numeric total_students field (column 17) and more than 500 students.
    Every other row is skipped.
    '''
    import csv  # local import keeps the function self-contained

    if partID == 0:
        # Skip the header line. next() with a default does not raise on an
        # empty partition and also works on Python 2.
        next(list_of_records, None)

    for row in csv.reader(list_of_records):
        # Keep only rows with the expected 58 columns and a numeric count.
        if len(row) == 58 and row[17].isdigit() and int(row[17]) > 500:
            yield (row[0], row[2])
        
        
# (dbn, boro) pairs for schools with more than 500 students.
largeSchools = schools.mapPartitionsWithIndex(extractSchools)
largeSchools.take(10)

[('09X327', 'Bronx'),
 ('28Q680', 'Queens'),
 ('14K474', 'Brooklyn'),
 ('02M420', 'Manhattan'),
 ('12X271', 'Bronx'),
 ('02M520', 'Manhattan'),
 ('24Q600', 'Queens'),
 ('10X368', 'Bronx'),
 ('06M348', 'Manhattan'),
 ('13K419', 'Brooklyn')]

In [14]:
# Inner-join large schools with their SAT scores on DBN, then drop the
# key, leaving (boro, (math score * takers, takers)) records.
scores = largeSchools.join(satScores).values()

In [15]:
# Peek at a few joined (boro, (math score * takers, takers)) records.
scores.take(3)

[('Brooklyn', (22066, 59)),
 ('Manhattan', (150864, 336)),
 ('Queens', (66420, 135))]

In [16]:
# Weighted average math score per borough: add up (score * takers) and
# takers for each borough, divide, and truncate to an integer.
# Note that `scores` is rebound here to the collected Python list.
scores = (
    largeSchools.join(satScores)
    .values()
    .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
    .mapValues(lambda totals: int(totals[0] / totals[1]))
    .collect()
)

In [17]:
# Average SAT math score per borough, as a list of (boro, average) pairs.
scores

[('Staten Island', 477),
 ('Bronx', 470),
 ('Brooklyn', 487),
 ('Manhattan', 514),
 ('Queens', 474)]

## Task 2: Compute the average SAT Math score for every bus line and subway line serving the schools

In [18]:
def extractSchools_busline(partId, list_of_records):
    '''
    Extract (dbn, bus lines) for schools in the school directory.

    Written for RDD.mapPartitionsWithIndex over the directory file.

    partId:          index of the partition being processed. The CSV
                     header is assumed to be the first line of partition 0.
    list_of_records: iterator over the raw text lines of the partition.

    Yields (dbn, bus) where bus is the raw, comma-separated bus-line field
    (column 10). A row is kept when it has exactly 58 columns, a purely
    numeric total_students field (column 17) and more than 1 student, so
    schools are not restricted to large ones here.
    '''
    import csv  # local import keeps the function self-contained

    if partId == 0:
        # Skip the header line. next() with a default does not raise on an
        # empty partition and works on both Python 2 and Python 3.
        next(list_of_records, None)

    for row in csv.reader(list_of_records):
        # Keep only rows with the expected 58 columns and a numeric count.
        if len(row) == 58 and row[17].isdigit() and int(row[17]) > 1:
            yield (row[0], row[10])

# NOTE: this rebinds `largeSchools`. It now holds (dbn, bus lines) for
# every school with more than 1 student, not the >500-student set above.
largeSchools = schools.mapPartitionsWithIndex(extractSchools_busline)
largeSchools.take(5)   ## (school id, comma-separated bus lines)

[('17K548', 'B41, B43, B44-SBS, B45, B48, B49, B69'),
 ('09X543', 'Bx13, Bx15, Bx17, Bx21, Bx35, Bx4, Bx41, Bx4A, Bx6'),
 ('09X327', 'Bx1, Bx11, Bx13, Bx18, Bx2, Bx3, Bx32, Bx35, Bx36'),
 ('28Q680', 'Q25, Q46, Q65'),
 ('08X348', 'Bx21, Bx24, Bx31, Bx4, Bx40, Bx42, Bx4A, Bx8')]

In [19]:
#Joining satscores to school dataset which has the bus line information. 
scores_buslines = largeSchools.join(satScores).values()
scores_buslines.take(2)     ## buslines and the sat scores

[('Bx12, Bx12-SBS, Bx26, Bx39, Bx8', (23460, 60)),
 ('Bx1, Bx10, Bx7, Bx9, M100', (21330, 54))]

In [20]:
def pairing(values):
    '''
    Expand (keys, score) records into individual (key, score) pairs.

    values: iterable of records whose first element is an iterable of
            keys and whose second element is the score shared by them.

    Yields exactly one list holding every (key, score) pair in input
    order. An empty input yields a single empty list. Callers flatten the
    list afterwards (e.g. with flatMap).
    '''
    yield [(key, value[1]) for value in values for key in value[0]]

In [21]:
# Average SAT math score per bus line.
# Each input record is ('B41, B43, ...', (math score * takers, takers)).
# One flatMap emits a (bus line, (math score * takers, takers)) pair for
# every line serving the school, without first materialising a whole
# partition as one list. Both components are then summed per line and
# divided to get the weighted average, truncated to an integer.
bus_avg_score = (
    scores_buslines
    .flatMap(lambda x: [(bus.strip(), x[1]) for bus in x[0].split(',')])
    .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
    .mapValues(lambda x: int(x[0] / x[1]))
)

In [22]:
# Collect the (bus line, average math score) pairs to the driver and sort
# them by bus line name (plain string order).
sorted(bus_avg_score.collect())

[('B1', 461),
 ('B100', 475),
 ('B103', 531),
 ('B11', 503),
 ('B12', 390),
 ('B13', 397),
 ('B14', 390),
 ('B15', 382),
 ('B16', 448),
 ('B17', 385),
 ('B2', 475),
 ('B20', 372),
 ('B24', 432),
 ('B25', 541),
 ('B26', 389),
 ('B3', 382),
 ('B31', 475),
 ('B32', 409),
 ('B35', 390),
 ('B36', 432),
 ('B38', 520),
 ('B39', 438),
 ('B4', 466),
 ('B41', 520),
 ('B42', 384),
 ('B43', 411),
 ('B44', 465),
 ('B44-SBS', 465),
 ('B45', 534),
 ('B46', 390),
 ('B47', 374),
 ('B48', 412),
 ('B49', 471),
 ('B52', 560),
 ('B54', 543),
 ('B57', 405),
 ('B6', 467),
 ('B60', 398),
 ('B61', 400),
 ('B62', 513),
 ('B63', 557),
 ('B64', 454),
 ('B65', 538),
 ('B67', 420),
 ('B68', 465),
 ('B69', 548),
 ('B7', 378),
 ('B70', 476),
 ('B8', 485),
 ('B82', 440),
 ('B83', 376),
 ('B84', 432),
 ('B9', 489),
 ('Bx1', 464),
 ('Bx10', 534),
 ('Bx11', 389),
 ('Bx12', 405),
 ('Bx12-SBS', 405),
 ('Bx13', 398),
 ('Bx15', 393),
 ('Bx17', 389),
 ('Bx18', 382),
 ('Bx19', 413),
 ('Bx2', 470),
 ('Bx20', 455),
 ('Bx21', 398

In [23]:
def extractSchools_subwayline(partId, list_of_records):
    '''
    Extract (dbn, subway lines) for schools in the school directory.

    Written for RDD.mapPartitionsWithIndex over the directory file.

    partId:          index of the partition being processed. The CSV
                     header is assumed to be the first line of partition 0.
    list_of_records: iterator over the raw text lines of the partition.

    Yields (dbn, subway) where subway is the raw subway field (column 11).
    A row is kept when it has exactly 58 columns, a purely numeric
    total_students field (column 17), more than 1 student and a subway
    field other than 'N/A', so schools are not restricted to large ones
    here.
    '''
    import csv  # local import keeps the function self-contained

    if partId == 0:
        # Skip the header line. next() with a default does not raise on an
        # empty partition and works on both Python 2 and Python 3.
        next(list_of_records, None)

    for row in csv.reader(list_of_records):
        # Keep only rows with the expected 58 columns and a numeric count.
        if len(row) != 58 or not row[17].isdigit():
            continue
        if int(row[17]) > 1 and row[11] != 'N/A':
            yield (row[0], row[11])

# NOTE: this rebinds `largeSchools` again. It now holds (dbn, subway lines)
# for every school with more than 1 student and a subway field other
# than 'N/A'.
largeSchools = schools.mapPartitionsWithIndex(extractSchools_subwayline)
largeSchools.take(2)

[('17K548', '2, 3, 4, 5, F, S to Botanic Garden ; B, Q to Prospect Park'),
 ('09X543', '2, 5 to Intervale Ave')]

In [24]:
# Join the SAT scores to the school records carrying subway-line
# information (inner join on DBN), then drop the DBN key.
# NOTE: this rebinds `scores`, which previously held the per-borough list.
scores = largeSchools.join(satScores).values()
scores.take(2)

[('2, 5 to Pelham Parkway', (23460, 60)),
 ('2, 5, B, Q to Church Ave', (22066, 59))]

In [25]:
# Average SAT math score per subway line.
# The subway field looks like '2, 3, 4 to Botanic Garden ; B, Q to Prospect Park':
# stops are separated by ' ; ', and each stop lists its lines before ' to'.
# One flatMap emits a (line, (math score * takers, takers)) pair for every
# line at every stop, without materialising whole partitions as lists.
# Both components are then summed per line and divided to get the weighted
# average, truncated to an integer.
# NOTE(review): a line listed at several stops of one school is emitted
# once per stop, so that school is counted more than once for the line.
subway_score = (
    scores
    .flatMap(lambda x: [(line.strip(), x[1])
                        for stop in x[0].split(' ; ')
                        for line in stop.split(' to')[0].split(',')])
    .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
    .mapValues(lambda x: int(x[0] / x[1]))
)

In [26]:
# Collect the (subway line, average math score) pairs to the driver and
# sort them by line name (plain string order).
sorted(subway_score.collect())

[('1', 499),
 ('2', 488),
 ('3', 513),
 ('4', 495),
 ('5', 461),
 ('6', 432),
 ('7', 457),
 ('A', 510),
 ('B', 491),
 ('C', 510),
 ('D', 502),
 ('E', 501),
 ('F', 445),
 ('G', 503),
 ('J', 439),
 ('L', 426),
 ('M', 454),
 ('N', 493),
 ('Q', 482),
 ('R', 508),
 ('S', 427),
 ('SIR', 498),
 ('Z', 438)]