Create a clean database
=======================

From my raw database `raw_stations` (table: `raw`), I can create a clean database with a table for each station and a table that holds the station information. I will combine stations by unit number and *try* to get a consistent time pattern for everything (00, 04, 08, 12, 16, 20h) to make comparison easier.

I know that some stations are offset from that pattern by an hour, and that there are additional measurements that fall between regular updates. Additionally, I need to clean the data to get actual ridership rather than cumulative counter values, handle the large faulty numbers, and deal with negative ridership (a counter counting down).

In [1]:
%matplotlib inline

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

from sqlalchemy import create_engine
from sqlalchemy_utils import database_exists, create_database
import psycopg2

In [2]:
# Connection settings for the raw (uncleaned) turnstile database.
user = 'mikemoran'
raw_database = 'raw_stations'
raw_table = 'raw'

# SQLAlchemy 1.4+ only accepts the 'postgresql' dialect name in URLs; the old
# 'postgres://' alias fails with NoSuchModuleError.
raw_engine = create_engine(f'postgresql://{user}@localhost/{raw_database}')
# Plain DB-API connection to the same database, passed to pd.read_sql.
raw_conn = psycopg2.connect(database=raw_database, user=user)

In [None]:
# Every distinct (unit, station, linename, c_a) combination in the raw table.
units = '''
    select distinct unit, station, linename, c_a from raw;
'''

station_details = pd.read_sql(sql=units, con=raw_conn)
station_details.head(n=5)

In [None]:
# Discard rows with any missing field; the c_a column is the key used to
# split the data into per-station tables further down.
station_details = station_details.dropna()
station_details.loc[station_details['c_a'] == 'PTH16']

We need the GPS coordinates of each station as well. Instead of worrying about the individual entrances and exits, we'll just use the general station coordinates from a separate dataset that I have.

In [None]:
# Station-level reference data; per the column names used below it includes
# Route1..Route11 and a Line column.
entrance_csv = 'other_data/NYC_Transit_Subway_Entrance_And_Exit_Data.csv'
known_stations = pd.read_csv(entrance_csv, header=0)
known_stations.head()

In [None]:
# Collapse the Route1..Route11 columns into one sorted route string per row.
routes = [f'Route{n}' for n in range(1, 12)]
known_stations.fillna('', inplace=True)


def _route_label(value):
    """Return *value* as a route string without a spurious '.0' suffix.

    pandas reads an integer-valued column that has missing entries as float,
    so a route stored as 1 would otherwise be rendered as '1.0'.
    """
    if isinstance(value, float) and value.is_integer():
        return str(int(value))
    return str(value)


known_stations['combined_routes'] = known_stations[routes].apply(
    lambda row: ''.join(sorted(_route_label(v) for v in row)), axis=1)

In [None]:
# Preview the stations without the per-route and 'Line' columns. The result is
# only displayed, not assigned, so known_stations itself keeps those columns.
known_stations.drop(columns=[*routes, 'Line'])

In [None]:
# Target database for the cleaned data: a station_info table plus (in the
# loop further down) one table per c_a value.
clean_database = 'stations'
station_info = 'station_info'

# 'postgresql' is the dialect name SQLAlchemy 1.4+ recognises.
info_engine = create_engine(f'postgresql://{user}@localhost/{clean_database}')
# Create the database on first run; check and create via the engine built
# just above.
if not database_exists(info_engine.url):
    create_database(info_engine.url)

station_details.to_sql(station_info, info_engine, if_exists='replace')

In [None]:
# Distinct c_a values; each one becomes its own table in the clean database.
ca_unique = station_details['c_a'].unique()
(ca_unique.shape, ca_unique)

In [None]:
# Parameterised query: psycopg2 binds %(ca)s itself, so a c_a value that
# contains a quote cannot break (or inject into) the SQL text.
query = '''
    select * from raw where c_a = %(ca)s;
'''

# Try the processing on a single c_a value before running the full loop.
ca = ca_unique[0]
print(ca)
df = pd.read_sql(query, raw_conn, params={'ca': str(ca)})

In [None]:
# Order readings chronologically inside each (unit, scp) group, then turn the
# running entries/exits counters into per-reading differences. The first
# reading of every group has no predecessor and becomes NaN.
df.sort_values(by=['c_a', 'unit', 'scp', 'date_time'], inplace=True)
per_device = df.groupby(['unit', 'scp'])
df[['dentries', 'dexits']] = per_device[['entries', 'exits']].diff()
df.head()

In [None]:
# Total the per-reading differences across all (unit, scp) devices at each
# timestamp, then bin them into 4-hour windows.
# Only the two difference columns are summed: summing every column would also
# try to add up the string/identifier columns, which is wasted work at best and
# a TypeError or warning depending on the pandas version.
# NOTE(review): resample needs date_time to be a datetime dtype — assumed to
# come back that way from Postgres; confirm.
test = df.groupby('date_time')[['dentries', 'dexits']].sum()
test_rs = test.resample('4H')
test_result = test_rs.sum()
test_result.head()

In [None]:
# Sanity check: binning into 4-hour windows should not change the total entries.
(test_result['dentries'].sum(), df['dentries'].sum())

In [None]:
# Parameterised query; psycopg2 binds %(ca)s so c_a values are never spliced
# into the SQL text.
query = '''
    select * from raw where c_a = %(ca)s;
'''

# Build one 4-hourly entries/exits table per c_a value in the clean database.
for ca in ca_unique:
    print(ca)
    df = pd.read_sql(query, raw_conn, params={'ca': str(ca)})
    df.sort_values(by=['c_a', 'unit', 'scp', 'date_time'], inplace=True)
    # Running counters -> per-reading differences within each (unit, scp).
    # NOTE(review): negative and very large differences (counter reversals or
    # resets) are not cleaned here yet.
    df[['dentries', 'dexits']] = (
        df.groupby(['unit', 'scp'])[['entries', 'exits']].diff())
    # Sum only the difference columns, not the identifier/string columns.
    df_ts = df.groupby('date_time')[['dentries', 'dexits']].sum()
    df_ts = df_ts.resample('4H').sum()
    df_ts.columns = ['entries', 'exits']
    # Persist the aggregated 4-hour series (not the row-level df): this
    # reduced table is what the loop exists to produce.
    df_ts.to_sql(ca, info_engine, if_exists='replace')

The above takes approximately 24 hrs to run on my machine, since few data-reduction steps are applied. Since the changepoints are what I'm interested in, the final database (if there is one) will be much smaller.