In [1]:
sc

<pyspark.context.SparkContext at 0x6244828>

### Price hike Days

$2.25 base fare (December 30, 2010 – March 2, 2013)

$2.50 base fare (March 3, 2013 – March 21, 2015)

$2.75 base fare (March 22, 2015 – present)

#### Overall Ridership by price hike periods

In [6]:
def mapper4(index, data):
    if index == 0:
        data.next()
    import csv
    import datetime as dt
    reader = csv.reader(data)
    firstPriceHike = dt.datetime.strptime('12/30/2010','%m/%d/%Y')
    secndPriceHike = dt.datetime.strptime('03/03/2013','%m/%d/%Y')
    thirdPriceHike = dt.datetime.strptime('03/22/2015','%m/%d/%Y')
    for row in reader:
        date = dt.datetime.strptime(row[1],'%m/%d/%Y')
        numPeople = row[5].split('.')[0]
        if (date <= firstPriceHike):
            yield ('A',int(numPeople))
        if (date >= secndPriceHike and date < thirdPriceHike):
            yield ('B',int(numPeople))
        if (date >= thirdPriceHike):
            yield ('C',int(numPeople))
            
def mapper5(index,data):
    if index == 0:
        data.next()
    import csv
    import datetime as dt
    reader = csv.reader(data)
    firstPriceHike = dt.datetime.strptime('12/30/2010','%m/%d/%Y')
    secndPriceHike = dt.datetime.strptime('03/03/2013','%m/%d/%Y')
    thirdPriceHike = dt.datetime.strptime('03/22/2015','%m/%d/%Y')
    for row in reader:
        date = dt.datetime.strptime(row[1],'%m/%d/%Y')
        if (date <= firstPriceHike):
            yield (date,('A',1))
        if (date >= secndPriceHike and date < thirdPriceHike):
            yield (date,('B',1))
        if (date >= thirdPriceHike):
            yield (date,('C',1))
        
mtaData = sc.textFile('dataset/clean-mta-data/clean-mta-data.csv',use_unicode=False).cache()

# Gather the number of passengers for each price hike
# category => A == firstPriceHike, B == SecondPriceHike, C == ThirdPriceHike
# return (category, num_passengers)
rdd4 = mtaData.mapPartitionsWithIndex(mapper4) \
                .reduceByKey(lambda x,y: x+y)

    
# Categorize by price hikes then get unique days
# Re-format the data to (category, c)
# Perform summation to get number of days in each category
# return (category, num_days)
rdd5 = mtaData.mapPartitionsWithIndex(mapper5) \
                .distinct() \
                .map(lambda (d,(cat,c)): (cat, c)) \
                .reduceByKey(lambda x,y: x+y)
            
# Join the rdd's by category
# Get the average of number of people over the number of days
# return (category, avg)
rdd6 = rdd4.join(rdd5) \
            .map(lambda (cat,(s,c)): (cat, s/c)) \
            .sortByKey() \
            .collect()
            
rdd6

[('A', 3278021), ('B', 4695106L), ('C', 4711762L)]

#### Average ridership by month/year

In [5]:
import datetime as dt

def mapper1(index,data):
    # skip header row
    if index==0:
        data.next()
    import csv
    reader = csv.reader(data)
    for row in reader:
        date = row[1].split('/')[0] + '/' + row[1].split('/')[2]
        numPeople = row[5].split('.')[0]
        yield (date, int(numPeople))

def mapper2(index,data):
    # skip header row
    if index==0:
        data.next()
    import csv
    reader = csv.reader(data)
    for row in reader:
        date = row[1]
        yield date

mtaData = sc.textFile('dataset/clean-mta-data/clean-mta-data.csv',use_unicode=False).cache()

# For each month in 2011-2016, gather the number of passengers
# return (%m/%Y, num_passengers)
rdd1 = mtaData.mapPartitionsWithIndex(mapper1) \
                .reduceByKey(lambda x,y: x+y)
    
# Gather the number of days for each month (map+distinct)
# Reformat the %m/%d/%Y to %m/%Y (map)
# Perform a summation to get the total days per month/year (reduce)
# returns (%m/%Y, number of days in month)
rdd2 = mtaData.mapPartitionsWithIndex(mapper2) \
                .distinct() \
                .map(lambda x: (dt.datetime.strptime(x,'%m/%d/%Y').strftime('%m/%Y'),1)) \
                .reduceByKey(lambda x,y: x+y)
            
            
# Join the 2 rdds on date
# d = date, s = number_passengers_per_month, c = total_days_per_month
# Get the average number of people per day
# returns (date, avg)
rdd3 = rdd1.join(rdd2) \
            .map(lambda (d, (s, c)): (d, s/c)) \
            .sortByKey(True) \
            .collect()
            
rdd3

[('01/2011', 4228996),
 ('01/2012', 4440303),
 ('01/2013', 4590113),
 ('01/2014', 4543426),
 ('01/2015', 4381933),
 ('01/2016', 4410371),
 ('02/2011', 4542155),
 ('02/2012', 4723306),
 ('02/2013', 4778886),
 ('02/2014', 4742798),
 ('02/2015', 4734414),
 ('02/2016', 4709885),
 ('03/2011', 4689067),
 ('03/2012', 4759470),
 ('03/2013', 4867464),
 ('03/2014', 4760395),
 ('03/2015', 4840012),
 ('03/2016', 4936037),
 ('04/2011', 4432242),
 ('04/2012', 4611022),
 ('04/2013', 5047991),
 ('04/2014', 4833446),
 ('04/2015', 4875570),
 ('04/2016', 4727236),
 ('05/2011', 4466144),
 ('05/2012', 4900801),
 ('05/2013', 4944946),
 ('05/2014', 4779486),
 ('05/2015', 4746504),
 ('05/2016', 4720357),
 ('06/2011', 4555737),
 ('06/2012', 4822098),
 ('06/2013', 4769061),
 ('06/2014', 4773165),
 ('06/2015', 4921604),
 ('06/2016', 4816936),
 ('07/2011', 4085954),
 ('07/2012', 4514046),
 ('07/2013', 4570359),
 ('07/2014', 4593889),
 ('07/2015', 4606218),
 ('07/2016', 4290267),
 ('08/2011', 4056023),
 ('08/2012'