# DATA PIPELINE

In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from datetime import datetime, timedelta

#disable chain assignment warning
pd.options.mode.chained_assignment = None

In [2]:
# Load the weekly turnstile files, walking backwards one week at a time from
# `startdate` until a file dated on or before `enddate` has been read.
print('LOADING DATA...')
startdate = datetime(2017, 9, 16)
enddate = datetime(2016, 9, 16)
#enddate = datetime(2015, 9, 16)

# Collect the weekly frames in a list and concatenate once at the end.
# Growing a DataFrame inside the loop re-copies every previously loaded row on
# each iteration, and DataFrame.append no longer exists in pandas >= 2.0.
weekly_frames = []
currdate = startdate
while True:  # do..while: the file for `currdate` is always read before the test
    datestring = currdate.strftime('%y%m%d')
    filename = 'turnstile_{}.txt'.format(datestring)
    print(filename)
    weekly_frames.append(pd.read_csv(filename))
    if currdate <= enddate:
        break
    currdate = currdate - timedelta(days=7)

# No ignore_index here: each file keeps its own row labels, exactly as the
# append-based version did, so downstream reset_index() behaves the same.
turnstile_df = pd.concat(weekly_frames)
print('DATA LOADED!')

LOADING DATA...
turnstile_170916.txt
turnstile_170909.txt
turnstile_170902.txt
turnstile_170826.txt
turnstile_170819.txt
turnstile_170812.txt
turnstile_170805.txt
turnstile_170729.txt
turnstile_170722.txt
turnstile_170715.txt
turnstile_170708.txt
turnstile_170701.txt
turnstile_170624.txt
turnstile_170617.txt
turnstile_170610.txt
turnstile_170603.txt
turnstile_170527.txt
turnstile_170520.txt
turnstile_170513.txt
turnstile_170506.txt
turnstile_170429.txt
turnstile_170422.txt
turnstile_170415.txt
turnstile_170408.txt
turnstile_170401.txt
turnstile_170325.txt
turnstile_170318.txt
turnstile_170311.txt
turnstile_170304.txt
turnstile_170225.txt
turnstile_170218.txt
turnstile_170211.txt
turnstile_170204.txt
turnstile_170128.txt
turnstile_170121.txt
turnstile_170114.txt
turnstile_170107.txt
turnstile_161231.txt
turnstile_161224.txt
turnstile_161217.txt
turnstile_161210.txt
turnstile_161203.txt
turnstile_161126.txt
turnstile_161119.txt
turnstile_161112.txt
turnstile_161105.txt
turnstile_161029.t

In [3]:
# Move the per-file row labels into a regular column (fresh 0..n-1 index) and
# trim surrounding whitespace from every column header.
df = turnstile_df.reset_index().rename(columns=str.strip)

In [17]:
#common functions:

#this function allows us to reduce down our dataset
def filter_series(df, control_area=None, unit=None, device_address=None, station=None):
    """Return the rows of ``df`` that satisfy every supplied filter.

    Each filter may be a single string (exact match) or a list of strings
    (match any of them). Falsy filters (``None``, ``''``, ``[]``) are skipped.
    A filter of any other type is reported with a printed warning and ignored.

    Parameters
    ----------
    df : DataFrame with the columns 'C/A', 'UNIT', 'SCP' and 'STATION'
        (only the columns for the filters actually supplied are read).
    control_area : str or list of str, matched against column 'C/A'.
    unit : str or list of str, matched against column 'UNIT'.
    device_address : str or list of str, matched against column 'SCP'.
    station : str or list of str, matched against column 'STATION'.

    Returns
    -------
    A new DataFrame; ``df`` itself is never modified. When no filter applies,
    a copy of ``df`` is returned.
    """
    column_filters = (
        ('C/A', control_area),
        ('UNIT', unit),
        ('SCP', device_address),
        ('STATION', station),
    )
    filtered = df
    for column, wanted in column_filters:
        if not wanted:
            continue
        # Build the mask from the frame as filtered so far, so mask and frame
        # always share the same index. A mask taken from the unfiltered input
        # only works through implicit index alignment and breaks when the
        # index has duplicate labels.
        if isinstance(wanted, str):
            keep = filtered[column] == wanted
        elif isinstance(wanted, list):
            keep = filtered[column].isin(wanted)
        else:
            print('Warning: {} cannot be used as a filter because it is not a string or list'.format(wanted))
            continue
        filtered = filtered.loc[keep]
    # Boolean selection already produces a new frame; copy only when nothing
    # was filtered, so the caller never receives an alias of its input.
    return filtered.copy() if filtered is df else filtered

#generate a cleaned-up time series for a single turnstile's dataframe
def get_time_series(df):
    """Build a chronologically ordered delta series from raw counter rows.

    ``df`` must provide the columns 'STATION', 'C/A', 'UNIT', 'SCP',
    'DATE' (string, ``%m/%d/%Y``), 'TIME' (string, ``%H:%M:%S``), 'ENTRIES'
    and 'EXITS'. The deltas are plain row-to-row differences after sorting by
    timestamp, so the rows should belong to a single turnstile.

    Returns a new DataFrame with the columns
    ['STATION', 'C/A', 'UNIT', 'SCP', 'DATE', 'DATETIME', 'D_ENTRIES', 'D_EXITS'],
    where DATE and DATETIME are parsed timestamps and D_ENTRIES / D_EXITS hold
    the difference to the previous reading (NaN for the first row).

    The input frame is left untouched, so the function can safely be called
    more than once on the same data.
    """
    # Vectorized parsing with an explicit format: same strictness as
    # datetime.strptime, without a Python-level call per row.
    dates = pd.to_datetime(df['DATE'], format='%m/%d/%Y')
    datetimes = pd.to_datetime(df['DATE'] + ' ' + df['TIME'], format='%m/%d/%Y %H:%M:%S')

    # Work on a fresh frame holding only the needed columns instead of adding
    # columns to (and re-sorting) the caller's data in place.
    ordered = (
        df.loc[:, ['STATION', 'C/A', 'UNIT', 'SCP', 'ENTRIES', 'EXITS']]
        .assign(DATE=dates, DATETIME=datetimes)
        .sort_values('DATETIME')
    )
    # now that we are sorted, we can subtract rows
    ordered['D_ENTRIES'] = ordered['ENTRIES'].diff()
    ordered['D_EXITS'] = ordered['EXITS'].diff()
    return ordered.loc[:, ['STATION', 'C/A', 'UNIT', 'SCP', 'DATE', 'DATETIME', 'D_ENTRIES', 'D_EXITS']]

In [18]:
#DEBUG
# Choose the frame the rest of the pipeline works on. By default this is the
# full dataset; note that `df2 = df` binds a second name to the same object,
# it does not copy the data.
#(optionally reduce dataset by filtering it)
df2 = df
#df2 = filter_series(df, station=['14 ST-UNION SQ','TIMES SQ-42 ST','116 ST-COLUMBIA'])
# Last expression of the cell: shows (row count, column count).
df2.shape

(10568184, 13)

In [19]:
# print the keys for each turnstile (in case we need to access it later)
# A turnstile is identified by its (STATION, C/A, UNIT, SCP) combination;
# iterating `turnstiles` yields (key tuple, rows for that turnstile).
turnstiles = df2.groupby(['STATION','C/A','UNIT','SCP'])
# len() of the GroupBy is the number of distinct key combinations.
print('There are {} unique turnstiles.'.format(len(turnstiles)))
# for key, turnstile in turnstiles:
#     print(key)

There are 4772 unique turnstiles.


In [20]:
%%time

# iterate over the turnstiles and generate the time series for each
# this takes the longest
# Each iteration receives the group key and that turnstile's rows;
# get_time_series() converts the rows into per-reading entry/exit deltas.
# The results are collected in group order, one frame per turnstile.
turnstile_timeseries_list = []
for key, turnstile in turnstiles:
    print(key)  # progress trace: one line per turnstile
    turnstile_timeseries_list.append(get_time_series(turnstile))

CPU times: user 5min 29s, sys: 5.11 s, total: 5min 34s
Wall time: 5min 45s


In [21]:
#number of turnstiles we collected data for:
len(turnstile_timeseries_list)

4772

In [22]:
# pull together the full dataframe that we can aggregate over
def generate_full_df(timeseries_list):
    """Stack the per-turnstile frames into one DataFrame.

    Parameters
    ----------
    timeseries_list : iterable of DataFrame
        Frames to stack, in the order they should appear in the result.

    Returns
    -------
    A new DataFrame with a fresh 0..n-1 index holding all rows of all input
    frames, or ``None`` when ``timeseries_list`` is empty.
    """
    frames = list(timeseries_list)
    if not frames:
        # pd.concat raises on an empty sequence; keep returning None instead.
        return None
    # One concat over the whole list copies each row once. Concatenating
    # pairwise inside a loop re-copies the accumulated result every iteration.
    # This also gives a single input frame the same treatment as many frames:
    # a new object with a reset index rather than an alias of the input.
    return pd.concat(frames, ignore_index=True)

In [23]:
# Stack every per-turnstile time series into a single frame for aggregation.
all_turnstiles = generate_full_df(turnstile_timeseries_list)

In [24]:
#THIS ALLOWS US TO LOOK @ THE ORIGINAL DATA
# R148	R033	01-00-01	TIMES SQ-42 ST
#00-00-04	14 ST-UNION SQ	2017-09-12	2017-09-12 15:45:56
# df3 = filter_series(df, station='14 ST-UNION SQ', control_area='A035', unit='R170', device_address='00-00-04')
# df3 = filter_series(df, station='TIMES SQ-42 ST', control_area='R148', unit='R033', device_address='01-00-01')
# df3.head()

In [25]:
#wrong = all_turnstiles[(all_turnstiles['D_ENTRIES'] < 0) | (all_turnstiles['D_EXITS'] < 0)]
#wrong

In [26]:
# do some data exploration to make sure there's nothing off about it

#all_turnstiles.groupby(['STATION', 'DATE']).sum()
#all_turnstiles[(all_turnstiles.STATION == '14 ST-UNION SQ') & (all_turnstiles.D_ENTRIES > 10)]

In [27]:
# a bit of clean-up tasks for the dataframe
DELTA_COLUMNS = ['D_ENTRIES', 'D_EXITS']
# Deltas whose magnitude exceeds this are replaced by 0.
# NOTE(review): presumably these are counter resets/glitches rather than real
# traffic -- confirm the threshold against the raw data.
MAX_DELTA = 100000

# Rows without a delta (the first reading of each turnstile) carry no
# information and are dropped.
all_turnstiles = all_turnstiles.dropna(subset=DELTA_COLUMNS)
# Vectorized equivalent of the per-element lambdas: take the magnitude of each
# delta, then zero out the implausibly large ones. assign() builds a new frame
# instead of writing into the dropna() result.
all_turnstiles = all_turnstiles.assign(**{
    column: all_turnstiles[column].abs().mask(lambda magnitude: magnitude > MAX_DELTA, 0)
    for column in DELTA_COLUMNS
})

In [28]:
# Inspect the cleaned per-turnstile deltas.
all_turnstiles

Unnamed: 0,STATION,C/A,UNIT,SCP,DATE,DATETIME,D_ENTRIES,D_EXITS
1,1 AV,H007,R248,00-00-00,2016-09-03,2016-09-03 04:00:00,102.0,276.0
2,1 AV,H007,R248,00-00-00,2016-09-03,2016-09-03 08:00:00,86.0,259.0
3,1 AV,H007,R248,00-00-00,2016-09-03,2016-09-03 12:00:00,412.0,635.0
4,1 AV,H007,R248,00-00-00,2016-09-03,2016-09-03 16:00:00,528.0,879.0
5,1 AV,H007,R248,00-00-00,2016-09-03,2016-09-03 20:00:00,441.0,971.0
6,1 AV,H007,R248,00-00-00,2016-09-04,2016-09-04 00:00:00,328.0,694.0
7,1 AV,H007,R248,00-00-00,2016-09-04,2016-09-04 04:00:00,87.0,315.0
8,1 AV,H007,R248,00-00-00,2016-09-04,2016-09-04 08:00:00,45.0,181.0
9,1 AV,H007,R248,00-00-00,2016-09-04,2016-09-04 12:00:00,352.0,495.0
10,1 AV,H007,R248,00-00-00,2016-09-04,2016-09-04 16:00:00,503.0,725.0


In [29]:
# Daily totals per station. The delta columns are selected explicitly: summing
# every column relies on pandas silently dropping the non-numeric ones, which
# current pandas no longer does (string columns get concatenated and the
# DATETIME column cannot be summed).
all_turnstiles.groupby(['STATION','DATE'])[['D_ENTRIES','D_EXITS']].sum()

Unnamed: 0_level_0,Unnamed: 1_level_0,D_ENTRIES,D_EXITS
STATION,DATE,Unnamed: 2_level_1,Unnamed: 3_level_1
1 AV,2016-09-03,11258.0,11755.0
1 AV,2016-09-04,12951.0,13530.0
1 AV,2016-09-05,11376.0,13112.0
1 AV,2016-09-06,19007.0,20073.0
1 AV,2016-09-07,20756.0,21216.0
1 AV,2016-09-08,23347.0,23496.0
1 AV,2016-09-09,24124.0,24297.0
1 AV,2016-09-10,19432.0,19089.0
1 AV,2016-09-11,15778.0,16543.0
1 AV,2016-09-12,19249.0,19796.0


In [30]:
# Persist the cleaned per-turnstile deltas for downstream analysis. With the
# default to_csv settings the row index is written as the first, unnamed column.
all_turnstiles.to_csv('all_turnstiles_9-2016-9-2017.csv')