In [1]:
import pandas as pd
from glob import iglob
from itertools import chain
from sqlalchemy import create_engine
# SQLite database (relative path) that holds the `raw`, `station_lookup`
# and `clean_data` tables built by the cells below.
MTA_DB_URL = 'sqlite:///mta.db'
mta_engine = create_engine(MTA_DB_URL)

In [2]:
def import_raw_post_20141018(engine=None, pattern='./data/*.txt', cutoff=141018):
    """Append every turnstile file dated on/after ``cutoff`` to the ``raw`` table.

    Files matching ``pattern`` are filtered by the yymmdd date embedded in
    their name (via ``strip_file_date``). Each kept file is read with
    ``pd.read_csv`` and appended unchanged, including the DataFrame index,
    which is written as an ``index`` column. The schema recorded for the
    resulting table is::

        CREATE TABLE raw (
            "index" BIGINT,
            "C/A" TEXT,
            "UNIT" TEXT,
            "SCP" TEXT,
            "STATION" TEXT,
            "LINENAME" TEXT,
            "DIVISION" TEXT,
            "DATE" TEXT,
            "TIME" TEXT,
            "DESC" TEXT,
            "ENTRIES" BIGINT,
            "EXITS" BIGINT
        );

    Rows are appended (``if_exists='append'``), so running this twice
    duplicates data.

    Parameters
    ----------
    engine : sqlalchemy Engine, optional
        Destination database. Defaults to a new engine on ``sqlite:///mta.db``.
    pattern : str
        Glob pattern for the weekly turnstile ``.txt`` files.
    cutoff : int
        yymmdd date. Files dated earlier use the old wide layout and are skipped.
    """
    if engine is None:
        engine = create_engine('sqlite:///mta.db')
    # Sorted so files are loaded in a reproducible (chronological within a
    # century) order rather than whatever order the filesystem returns.
    for data_file in sorted(iglob(pattern)):
        if strip_file_date(data_file) >= cutoff:
            print(data_file)
            df = pd.read_csv(data_file, low_memory=False)
            df.to_sql(name='raw', con=engine, if_exists='append')

In [3]:
def create_station_lookup(engine=None):
    """Create the ``station_lookup`` table of distinct (C/A, STATION) pairs.

    Executes ``CREATE TABLE IF NOT EXISTS ... AS SELECT DISTINCT ... FROM raw``.
    It therefore needs the ``raw`` table to exist, and it does nothing once
    ``station_lookup`` already exists.

    Parameters
    ----------
    engine : sqlalchemy Engine, optional
        Database containing ``raw``. Defaults to the notebook-level
        ``mta_engine``. The argument used to be accepted but ignored.
    """
    from sqlalchemy import text

    if engine is None:
        engine = mta_engine
    # NOTE(review): the author assumed C/A alone determines the station. If a
    # C/A maps to several stations, DISTINCT keeps one row per pair, and a
    # later merge on C/A will duplicate readings. Verify against the data.
    create_sql = text(
        'CREATE TABLE IF NOT EXISTS main.station_lookup AS '
        'SELECT DISTINCT raw."C/A", STATION FROM raw'
    )
    # engine.begin() commits on success. Engine.execute() no longer exists
    # in SQLAlchemy 2.0.
    with engine.begin() as conn:
        conn.execute(create_sql)

In [4]:
def main():
    """Load post-20141018 files into ``raw``, then derive ``station_lookup`` from it."""
    import_raw_post_20141018()
    # Pass the engine explicitly: create_station_lookup declares an `engine`
    # parameter, and calling it with no argument raised TypeError.
    create_station_lookup(mta_engine)

In [5]:
def fix_pre_20141018(csv_path, max_groups=8):
    """Reshape a pre-20141018 turnstile file into the long, post-20141018 layout.

    Old files are "wide". Each line holds the turnstile key (C/A, UNIT, SCP)
    followed by up to ``max_groups`` readings of DATE, TIME, DESC, ENTRIES,
    EXITS. This stacks those readings into one row per reading.

    The result has no STATION column. Callers attach it by merging on
    ``C/A`` with the ``station_lookup`` table.

    Parameters
    ----------
    csv_path : str or file-like
        Anything ``pd.read_csv`` accepts.
    max_groups : int
        Maximum readings per line. Defaults to 8, the layout this notebook
        assumes for the old files.

    Returns
    -------
    pandas.DataFrame
        Columns C/A, UNIT, SCP, DATE, TIME, DESC, ENTRIES, EXITS, with a fresh
        0..n-1 index. Readings are stacked group by group: all first
        readings, then all second readings, and so on.
    """
    key_cols = ['C/A', 'UNIT', 'SCP']
    reading_cols = ['DATE', 'TIME', 'DESC', 'ENTRIES', 'EXITS']
    # DATE1, TIME1, ..., EXITS1, DATE2, ... in file order.
    header_names = key_cols + [
        '{}{}'.format(col, i)
        for i in range(1, max_groups + 1)
        for col in reading_cols
    ]
    df = pd.read_csv(csv_path, header=None, names=header_names, low_memory=False)

    groups = []
    for i in range(1, max_groups + 1):
        numbered = ['{}{}'.format(col, i) for col in reading_cols]
        group = df[key_cols + numbered].rename(columns=dict(zip(numbered, reading_cols)))
        groups.append(group)
    df_long = pd.concat(groups, ignore_index=True)
    # read_csv pads short lines with NaN. Those empty groups are not readings.
    df_long = df_long.dropna(subset=reading_cols, how='all').reset_index(drop=True)
    return df_long

In [6]:
def strip_file_date(file_path):
    """Return the yymmdd date embedded in a turnstile file name, as an int.

    Expects names like ``./data/turnstile_100612.txt``. Only the base file
    name is inspected, so underscores in directory names do not matter.

    >>> strip_file_date('./data/turnstile_100612.txt') == 100612
    True
    """
    import os

    base_name = os.path.basename(file_path)
    # Text after the last '_' up to the first '.', e.g. '100612'.
    return int(base_name.rsplit('_', 1)[1].split('.')[0])

In [7]:
def clean_headers(headers):
    """Normalise column names.

    Strips leading and trailing whitespace and lower-cases each name.
    Returns a new list.
    """
    normalised = []
    for name in headers:
        normalised.append(name.strip().lower())
    return normalised

In [8]:
# C/A -> STATION pairs, used below to attach STATION to pre-20141018 files
# (they have no STATION column).
# NOTE(review): a C/A listed with several stations will duplicate rows on merge.
station_lookup = pd.read_sql(
    sql='select distinct "C/A", STATION from station_lookup',
    con=mta_engine,
)

In [11]:
def _read_turnstile_file(data_file):
    """Load one weekly turnstile file with stripped, lower-cased column names.

    Pre-20141018 files are reshaped with ``fix_pre_20141018``. They get
    STATION from ``station_lookup`` through an inner join on C/A, so readings
    whose C/A is missing from the lookup are dropped.
    """
    if strip_file_date(data_file) < 141018:
        df = fix_pre_20141018(data_file)
        # TODO: consider a left join so unmatched control areas are kept.
        df = pd.merge(df, station_lookup, on='C/A')
    else:
        df = pd.read_csv(data_file, low_memory=False)
    # Remove surrounding whitespace from the column names and lower-case them.
    df.columns = [c.strip().lower() for c in df.columns]
    return df


def _daily_station_totals(df):
    """Turn raw ENTRIES/EXITS readings into per-station daily totals.

    Steps:
    - Keep only ``desc == 'REGULAR'`` rows and build a ``ts`` timestamp from
      ``date`` + ``time``.
    - For each turnstile (station, c/a, unit, scp), in time order, take the
      difference between consecutive readings.
    - Drop each turnstile's first reading (no previous value) and any
      negative delta (presumably counter resets).
    - Drop deltas above the 99% quantile. The quantile is computed over the
      whole file, separately for entries and exits.
    - Sum per (station, ts), then per (station, calendar day).

    Returns a DataFrame indexed by (station, day) with columns entries, exits.
    """
    regular = df[df['desc'] == 'REGULAR']
    # TODO: (ts, c/a, unit, scp, station) is assumed unique but not checked.
    readings = (
        regular
        .assign(ts=pd.to_datetime(regular['date'] + ' ' + regular['time']))
        [['c/a', 'unit', 'scp', 'station', 'entries', 'exits', 'ts']]
        .set_index(['station', 'c/a', 'unit', 'scp', 'ts'])
        # sort_index instead of DataFrame.sort, which pandas 0.20 removed.
        .sort_index()
    )
    deltas = readings.groupby(level=['station', 'c/a', 'unit', 'scp']).diff()
    # The first reading per turnstile has no delta (NaN); mark it -1 so the
    # >= 0 filter removes it together with negative deltas.
    deltas = deltas.fillna(-1)
    entries = deltas.loc[deltas['entries'] >= 0, 'entries']
    exits = deltas.loc[deltas['exits'] >= 0, 'exits']
    # TODO: consider removing outliers per station instead of across the board.
    entries = entries[entries <= entries.quantile(.99)]
    exits = exits[exits <= exits.quantile(.99)]
    per_reading = pd.concat(
        [entries.groupby(level=['station', 'ts']).sum(),
         exits.groupby(level=['station', 'ts']).sum()],
        axis=1,
    ).reset_index()
    # Aggregate to the calendar day instead of per reading.
    per_reading['ts'] = per_reading['ts'].dt.date
    per_reading = per_reading.rename(columns={'ts': 'day'})
    return per_reading.groupby(['station', 'day']).sum()


# NOTE(review): results are appended per file. A (station, day) split across
# two weekly files may appear twice in clean_data; verify before analysis.
for data_file in sorted(iglob('./data/*.txt')):
    print(data_file)
    try:
        daily = _daily_station_totals(_read_turnstile_file(data_file))
        daily.to_sql(name='clean_data', con=mta_engine, if_exists='append')
    except Exception as e:
        # Best effort: record the failure and continue with the next file.
        with open('error.log', 'a') as errors:
            errors.write('{}: {}: {}\n'.format(data_file, type(e).__name__, e))

./data/turnstile_100505.txt
./data/turnstile_100508.txt
./data/turnstile_100515.txt
./data/turnstile_100522.txt
./data/turnstile_100605.txt
./data/turnstile_100612.txt
./data/turnstile_100619.txt
./data/turnstile_100626.txt
./data/turnstile_100703.txt
./data/turnstile_100710.txt
./data/turnstile_100717.txt
./data/turnstile_100724.txt
./data/turnstile_100731.txt
./data/turnstile_100807.txt
./data/turnstile_100814.txt
./data/turnstile_100821.txt
./data/turnstile_100828.txt
./data/turnstile_100904.txt
./data/turnstile_100911.txt
./data/turnstile_100918.txt
./data/turnstile_100925.txt
./data/turnstile_101002.txt
./data/turnstile_101009.txt
./data/turnstile_101016.txt
./data/turnstile_101023.txt
./data/turnstile_101030.txt
./data/turnstile_101106.txt
./data/turnstile_101113.txt
./data/turnstile_101120.txt
./data/turnstile_101127.txt
./data/turnstile_101204.txt
./data/turnstile_101211.txt
./data/turnstile_101218.txt
./data/turnstile_101225.txt
./data/turnstile_110101.txt
./data/turnstile_110