## Data Merging

This notebook covers how we gathered & cleaned our data from the various sources (New York MTA, Google Maps data, IRS Income). The code here is included in **clean2.py** in order to easily ready the data for analysis in other Jupyter Notebooks.

The intent here is to prepare this dataset for visualization, particularly to create our map figures.

It also pulls geolocation data from Google (using its ``geocode api``). This allows us to add ``zipcode`` to our data in order to filter by **AGI (Adjusted Gross Income)**, which is pulled from US Census today.

In [1]:
import numpy as np
import pandas as pd
import datetime as dt
import googlemaps
import requests

Download MTA Data

In [3]:
url = "http://web.mta.info/developers/data/nyct/turnstile/turnstile_{}.txt"

# gather the week numbers of the data we want to pull from above urls
week_nums=[191228, 191221, 191214, 191207, 191130, 191123, 191116, 191109]
dfs = []

for week_num in week_nums:
    file_url = url.format(week_num)
    dfs.append(pd.read_csv(file_url, parse_dates=[["DATE", "TIME"]], keep_date_col=True))
    
df_turnstiles = pd.concat(dfs)
df_turnstiles.head(5)

Unnamed: 0,DATE_TIME,C/A,UNIT,SCP,STATION,LINENAME,DIVISION,DATE,TIME,DESC,ENTRIES,EXITS
0,2019-12-21 03:00:00,A002,R051,02-00-00,59 ST,NQR456W,BMT,12/21/2019,03:00:00,REGULAR,7318040,2480587
1,2019-12-21 07:00:00,A002,R051,02-00-00,59 ST,NQR456W,BMT,12/21/2019,07:00:00,REGULAR,7318049,2480598
2,2019-12-21 11:00:00,A002,R051,02-00-00,59 ST,NQR456W,BMT,12/21/2019,11:00:00,RECOVR AUD,7318101,2480680
3,2019-12-21 15:00:00,A002,R051,02-00-00,59 ST,NQR456W,BMT,12/21/2019,15:00:00,REGULAR,7318263,2480763
4,2019-12-21 19:00:00,A002,R051,02-00-00,59 ST,NQR456W,BMT,12/21/2019,19:00:00,REGULAR,7318559,2480823


An important note about this dataFrame:
- The ``ENTRIES`` and ``EXITS`` columns are **cumulative**. In order for us to analyze AMPM traffic, we need to convert these columns from cumulative to hourly (in 4 hour increments, since that's how the data is formatted).
    - before we can do that, we need to remove some rows that were added as a result of an audit (can think of these as corrections). There is a relatively small amount of them (about 7k of 1.63M rows using the default weeks.)


In [4]:
mask = df_turnstiles.DESC == 'RECOVR AUD'
df_turnstiles[mask].shape, df_turnstiles.shape

((7102, 12), (1649098, 12))

Now let's get rid of the rows where ``turnstiles_df.DESC == 'RECOVR AUD'``. To do this, we will sort the rows so that the correct entry will come before the 'RECOVR AUD' entry. Then, we can apply the ``drop_duplicates`` dataFrame method to get rid of those rows.

Then, since we've removed all rows where ``DESC`` is not ``'REGULAR'``, we can drop the column entirely.

In [5]:
df_turnstiles.sort_values(
            ["C/A", "UNIT", "SCP", "STATION", "DATE_TIME"],
            inplace=True,
            ascending=False)

# keeps top row, deletes others
df_turnstiles.drop_duplicates(
    subset=["C/A", "UNIT", "SCP", "STATION", "DATE_TIME"], inplace=True
)

# remove DESC column
df_turnstiles = df_turnstiles.drop(["DESC"], axis=1, errors="ignore")

df_turnstiles.head(5)

Unnamed: 0,DATE_TIME,C/A,UNIT,SCP,STATION,LINENAME,DIVISION,DATE,TIME,ENTRIES,EXITS
206706,2019-12-27 20:00:00,TRAM2,R469,00-05-01,RIT-ROOSEVELT,R,RIT,12/27/2019,20:00:00,5554,420
206705,2019-12-27 16:00:00,TRAM2,R469,00-05-01,RIT-ROOSEVELT,R,RIT,12/27/2019,16:00:00,5554,420
206704,2019-12-27 12:00:00,TRAM2,R469,00-05-01,RIT-ROOSEVELT,R,RIT,12/27/2019,12:00:00,5554,420
206703,2019-12-27 08:00:00,TRAM2,R469,00-05-01,RIT-ROOSEVELT,R,RIT,12/27/2019,08:00:00,5554,420
206702,2019-12-27 04:00:00,TRAM2,R469,00-05-01,RIT-ROOSEVELT,R,RIT,12/27/2019,04:00:00,5554,420


Great, now we can see that there is no more DESC column. 

As you can see in the output of the below cell, the ``EXITS`` column has a lot of spaces after it. Let's fix that so we can easily select this column in the future.

In [6]:
df_turnstiles.columns

Index(['DATE_TIME', 'C/A', 'UNIT', 'SCP', 'STATION', 'LINENAME', 'DIVISION',
       'DATE', 'TIME', 'ENTRIES',
       'EXITS                                                               '],
      dtype='object')

In [7]:
df_turnstiles.rename(columns={"EXITS                                                               ": "EXITS"},
            inplace=True)

Now, let's add the AMPM column.

**NOTE**: This cell takes a while, so avoid re-running if possible.

In [8]:
df_turnstiles["AMPM"] = (pd.DatetimeIndex(df_turnstiles["TIME"]).strftime("%r").str[-2:])
df_turnstiles.head(3)

Unnamed: 0,DATE_TIME,C/A,UNIT,SCP,STATION,LINENAME,DIVISION,DATE,TIME,ENTRIES,EXITS,AMPM
206706,2019-12-27 20:00:00,TRAM2,R469,00-05-01,RIT-ROOSEVELT,R,RIT,12/27/2019,20:00:00,5554,420,PM
206705,2019-12-27 16:00:00,TRAM2,R469,00-05-01,RIT-ROOSEVELT,R,RIT,12/27/2019,16:00:00,5554,420,PM
206704,2019-12-27 12:00:00,TRAM2,R469,00-05-01,RIT-ROOSEVELT,R,RIT,12/27/2019,12:00:00,5554,420,PM


And day name.

In [9]:
df_turnstiles["DAY_NAME"] = pd.to_datetime(df_turnstiles["DATE"]).dt.day_name()
df_turnstiles.head(3)

Unnamed: 0,DATE_TIME,C/A,UNIT,SCP,STATION,LINENAME,DIVISION,DATE,TIME,ENTRIES,EXITS,AMPM,DAY_NAME
206706,2019-12-27 20:00:00,TRAM2,R469,00-05-01,RIT-ROOSEVELT,R,RIT,12/27/2019,20:00:00,5554,420,PM,Friday
206705,2019-12-27 16:00:00,TRAM2,R469,00-05-01,RIT-ROOSEVELT,R,RIT,12/27/2019,16:00:00,5554,420,PM,Friday
206704,2019-12-27 12:00:00,TRAM2,R469,00-05-01,RIT-ROOSEVELT,R,RIT,12/27/2019,12:00:00,5554,420,PM,Friday


We eventually want to add the adjusted gross income of each station. To get that, we need zipcode. And to get zipcode, we will to take the lat/lon data provided in MTA's station data and pass it to Google's **geocode** API.

In [10]:
# Read in mta station's zipcode and income data into ``df_turnstiles``
mta_station_info = pd.read_csv("http://web.mta.info/developers/data/nyct/subway/Stations.csv")
mta_station_info.rename(columns={'Stop Name': 'STATION', 'GTFS Latitude': 'Lat', 'GTFS Longitude': 'Lon'}, inplace=True)

mta_station_info.head()

Unnamed: 0,Station ID,Complex ID,GTFS Stop ID,Division,Line,STATION,Borough,Daytime Routes,Structure,Lat,Lon,North Direction Label,South Direction Label,ADA,ADA Notes
0,1,1,R01,BMT,Astoria,Astoria-Ditmars Blvd,Q,N W,Elevated,40.775036,-73.912034,,Manhattan,0,
1,2,2,R03,BMT,Astoria,Astoria Blvd,Q,N W,Elevated,40.770258,-73.917843,Ditmars Blvd,Manhattan,1,
2,3,3,R04,BMT,Astoria,30 Av,Q,N W,Elevated,40.766779,-73.921479,Astoria - Ditmars Blvd,Manhattan,0,
3,4,4,R05,BMT,Astoria,Broadway,Q,N W,Elevated,40.76182,-73.925508,Astoria - Ditmars Blvd,Manhattan,0,
4,5,5,R06,BMT,Astoria,36 Av,Q,N W,Elevated,40.756804,-73.929575,Astoria - Ditmars Blvd,Manhattan,0,


Nice! Now we have the station data...but still no zipcode. Let's get the station names and let Google do the rest.


**NOTE 1**: This part requires a key for Google's geocode API. The below key will be deactivated before it's made public.

**NOTE 2**: This will take some time. Go get yourself a drink!

In [22]:
import googlemaps

# you will need to get your own API Key, this API key will not work for you.
# get your own at: https://developers.google.com/maps/documentation/geocoding/start
gmaps = googlemaps.Client(key='AIzaSyCGo0NcvTdM9fgFx7y8BzShf5OJLHH562U') 

station_zips = {}
mta_station_names = list(mta_station_info.STATION.unique())

for station in mta_station_names:
    # we want to query geocode with the most accurate address possible
    address = station + ' Station New York City, NY'
    geocode_result = gmaps.geocode(address)
    try:
        # due to the funky json returned from geocode we need to access zip code like this
        zipcode = geocode_result[0]['address_components'][6]['long_name']
        if len(zipcode) == 5:
            station_zips[station.upper()] = str(zipcode) 
    except:
        continue

Let's take a look at what we just generated.

In [38]:
station_zips

{'ASTORIA-DITMARS BLVD': '11105',
 '30 AV': '11102',
 '36 AV': '11106',
 '39 AV-DUTCH KILLS': '11101',
 'LEXINGTON AV/59 ST': '10065',
 '5 AV/59 ST': '10019',
 '57 ST-7 AV': '10106',
 '49 ST': '10019',
 'TIMES SQ-42 ST': '10018',
 '34 ST-HERALD SQ': '10001',
 '28 ST': '10001',
 '23 ST': '10011',
 '14 ST-UNION SQ': '10003',
 '8 ST-NYU': '10003',
 'PRINCE ST': '10012',
 'CANAL ST': '10013',
 'CITY HALL': '10013',
 'CORTLANDT ST': '10007',
 'RECTOR ST': '10006',
 'COURT ST': '11201',
 'JAY ST-METROTECH': '11201',
 'DEKALB AV': '11217',
 'UNION ST': '11215',
 '4 AV-9 ST': '11215',
 'PROSPECT AV': '11215',
 '36 ST': '10012',
 '53 ST': '10022',
 'BAY RIDGE AV': '11220',
 'BAY RIDGE-95 ST': '11209',
 '7 AV': '10019',
 'PARKSIDE AV': '11226',
 'CHURCH AV': '11226',
 'BEVERLEY RD': '11226',
 'CORTELYOU RD': '11226',
 'NEWKIRK PLAZA': '11226',
 'AVENUE H': '11230',
 'AVENUE J': '11230',
 'AVENUE M': '11230',
 'KINGS HWY': '11229',
 'AVENUE U': '11223',
 'NECK RD': '11229',
 'SHEEPSHEAD BAY': '11

Alright, now that we already have this data, let's save it to the data folder. That way, we don't need to ping Google's API every time we run ``clean2.py``.

In [36]:
import json
out_file = open("data/station_zips.json", "w") 
  
json.dump(station_zips, out_file) 
  
out_file.close() 

Now let's test out that we can load this back into memory by loading it back.

In [37]:
new_dict = json.load(open("data/station_zips.json", 'r'))
new_dict

{'ASTORIA-DITMARS BLVD': '11105',
 '30 AV': '11102',
 '36 AV': '11106',
 '39 AV-DUTCH KILLS': '11101',
 'LEXINGTON AV/59 ST': '10065',
 '5 AV/59 ST': '10019',
 '57 ST-7 AV': '10106',
 '49 ST': '10019',
 'TIMES SQ-42 ST': '10018',
 '34 ST-HERALD SQ': '10001',
 '28 ST': '10001',
 '23 ST': '10011',
 '14 ST-UNION SQ': '10003',
 '8 ST-NYU': '10003',
 'PRINCE ST': '10012',
 'CANAL ST': '10013',
 'CITY HALL': '10013',
 'CORTLANDT ST': '10007',
 'RECTOR ST': '10006',
 'COURT ST': '11201',
 'JAY ST-METROTECH': '11201',
 'DEKALB AV': '11217',
 'UNION ST': '11215',
 '4 AV-9 ST': '11215',
 'PROSPECT AV': '11215',
 '36 ST': '10012',
 '53 ST': '10022',
 'BAY RIDGE AV': '11220',
 'BAY RIDGE-95 ST': '11209',
 '7 AV': '10019',
 'PARKSIDE AV': '11226',
 'CHURCH AV': '11226',
 'BEVERLEY RD': '11226',
 'CORTELYOU RD': '11226',
 'NEWKIRK PLAZA': '11226',
 'AVENUE H': '11230',
 'AVENUE J': '11230',
 'AVENUE M': '11230',
 'KINGS HWY': '11229',
 'AVENUE U': '11223',
 'NECK RD': '11229',
 'SHEEPSHEAD BAY': '11

``station_zips`` now contains the zip codes for each station. Some are missing - that's because some zip codes in New York are so small that Google doesn't have the data for them.

Now let's add that to our dataFrames!

In [14]:
mta_station_info['ZIPCODE'] = mta_station_info['STATION'].str.upper().map(station_zips)
df_turnstiles['ZIPCODE'] = df_turnstiles['STATION'].map(station_zips)
df_turnstiles.sample(10)

Unnamed: 0,DATE_TIME,C/A,UNIT,SCP,STATION,LINENAME,DIVISION,DATE,TIME,ENTRIES,EXITS,AMPM,DAY_NAME,ZIPCODE
146055,2019-12-05 16:00:00,R172,R192,00-00-03,CATHEDRAL PKWY,1,IRT,12/05/2019,16:00:00,279680,88961,PM,Thursday,
116398,2019-12-02 07:34:51,PTH02,R544,00-03-00,HARRISON,1,PTH,12/02/2019,07:34:51,9192,1169,AM,Monday,
118629,2019-12-17 18:35:11,PTH03,R552,00-01-05,JOURNAL SQUARE,1,PTH,12/17/2019,18:35:11,61691,23710,PM,Tuesday,
196389,2019-12-18 15:00:00,R601A,R108,02-05-01,BOROUGH HALL,2345R,IRT,12/18/2019,15:00:00,5,1927,PM,Wednesday,
23505,2019-11-13 07:00:00,C023,R213,00-03-00,BAY RIDGE AV,R,BMT,11/13/2019,07:00:00,5983918,2982779,AM,Wednesday,11220.0
112120,2019-11-19 11:00:00,N607,R025,01-00-01,JAMAICA CENTER,EJZ,IND,11/19/2019,11:00:00,86310,99878,AM,Tuesday,
71485,2019-11-15 15:00:00,N191,R335,00-05-01,BEACH 67 ST,A,IND,11/15/2019,15:00:00,0,0,PM,Friday,11692.0
26208,2019-11-17 20:00:00,D006,R398,01-06-01,NEW UTRECHT AV,ND,BMT,11/17/2019,20:00:00,2611017,2733813,PM,Sunday,11219.0
159662,2019-11-30 19:00:00,R238,R046,00-00-06,GRD CNTRL-42 ST,4567S,IRT,11/30/2019,19:00:00,86167,136987,PM,Saturday,
83291,2019-11-19 19:00:00,N324,R018,00-03-04,JKSN HT-ROOSVLT,EFMR7,IND,11/19/2019,19:00:00,2072027,242633,PM,Tuesday,


Now, we'll retrieve income information for the entire United States by zipcode (from the IRS)

In [15]:
us_zips_agi = pd.read_csv("https://www.irs.gov/pub/irs-soi/18zpallagi.csv")
us_zips_agi.rename(columns={'A00100':'adj_gross_inc'}, inplace=True) # in 18zpallagi.csv, A00100 stands for AGI
us_zips_agi = us_zips_agi[['zipcode','adj_gross_inc']].groupby('zipcode').agg(sum) 

We can sort this zipcode/agi data into NYC zipcodes by joining the data with a list of nyc_zipcodes (ny_zips.csv)

In [16]:
nyc_zips = pd.read_csv("data/ny_zips.csv")
nyc_zips.dropna(axis=1, how='all', inplace=True)
nyc_agi_by_zip = nyc_zips.join(us_zips_agi, how='inner', on='zipcode')

# must capitalize col name & change from dtype 'object' to 'str' in order to merge into df_turnstiles
nyc_agi_by_zip.columns = nyc_agi_by_zip.columns.str.upper()
nyc_agi_by_zip['ZIPCODE'] = nyc_agi_by_zip.ZIPCODE.astype(str)

nyc_agi_by_zip.head(10)


Unnamed: 0,ZIPCODE,AREA,ADJ_GROSS_INC
0,10001,Manhattan,2906435.0
1,10002,Manhattan,2718913.0
2,10003,Manhattan,8191737.0
3,10004,Manhattan,944925.0
4,10005,Manhattan,2603668.0
5,10006,Manhattan,577145.0
6,10007,Manhattan,2910802.0
7,10009,Manhattan,2948597.0
8,10010,Manhattan,4542337.0
9,10011,Manhattan,9331779.0


Great! Now we can add AGI to our turnstiles DataFrames. Let's do that now.

In [18]:
# convert ZIPCODE and ADJ_GROSS_INC cols to dict{zipcode: AGI}
zipcode_agis = nyc_agi_by_zip[['ZIPCODE', 'ADJ_GROSS_INC']].set_index('ZIPCODE').to_dict()['ADJ_GROSS_INC']

# use the dictionary to add ZIPCODE_AGI column to df_turnstiles
df_turnstiles['ZIPCODE_AGI'] = df_turnstiles['ZIPCODE'].map(zipcode_agis)
df_turnstiles.sample(10)

Unnamed: 0,DATE_TIME,C/A,UNIT,SCP,STATION,LINENAME,DIVISION,DATE,TIME,ENTRIES,EXITS,AMPM,DAY_NAME,ZIPCODE,ZIPCODE_AGI
95041,2019-12-13 04:00:00,N420B,R317,00-03-01,CLINTON-WASH AV,G,IND,12/13/2019,04:00:00,20589,10459,AM,Friday,,
82103,2019-12-27 23:00:00,N314,R238,01-00-01,STEINWAY ST,MR,IND,12/27/2019,23:00:00,1435513,629484,PM,Friday,,
114884,2019-11-24 22:51:28,PTH01,R549,00-00-02,NEWARK HW BMEBE,1,PTH,11/24/2019,22:51:28,60620,1773,PM,Sunday,,
166103,2019-12-09 20:00:00,R256,R182,00-00-00,116 ST,6,IRT,12/09/2019,20:00:00,9649158,3725454,PM,Monday,10035.0,833400.0
131362,2019-12-26 00:00:00,R108A,R305,05-00-01,WTC-CORTLANDT,1,IRT,12/26/2019,00:00:00,29894,118031,AM,Thursday,,
8768,2019-11-11 12:00:00,A049,R088,02-03-02,CORTLANDT ST,RNW,BMT,11/11/2019,12:00:00,1174292,408841,PM,Monday,10007.0,2910802.0
53874,2019-11-12 03:00:00,N067,R012,00-03-05,34 ST-PENN STA,ACE,IND,11/12/2019,03:00:00,535592,251859,AM,Tuesday,,
71315,2019-12-03 07:00:00,N182,R414,00-05-01,HOWARD BCH JFK,A,IND,12/03/2019,07:00:00,3,5129,AM,Tuesday,,
162873,2019-11-17 23:00:00,R245A,R051,01-06-00,59 ST,456NQRW,IRT,11/17/2019,23:00:00,3386511,4793828,PM,Sunday,,
108603,2019-12-07 00:00:00,N558,R130,01-06-00,KINGS HWY,F,IND,12/07/2019,00:00:00,118061160,248833,AM,Saturday,11229.0,2423575.0


Cool, that worked! Now we have both zipcode and zipcode_agi in our dataFrame.

Now it's time to convert ENTRIES and EXITS columns from cumulative to it's change from previous value.

We do this by shifting the previous values forward, then subtracting the previous values from the current ones. For this to work, the data must be sorted by station & datetime (it already is).

In [20]:
# group data by AMPM, taking the maximum entries/exits for each date 
ampm_station_group = df_turnstiles.groupby(["C/A", "UNIT", "SCP", "STATION", "ZIPCODE", "ZIPCODE_AGI", "DATE", "AMPM", "DAY_NAME"],
    as_index=False)

df_ampm = ampm_station_group.ENTRIES.max()
ampm_station_exits = ampm_station_group.EXITS.max()
df_ampm["EXITS"] = ampm_station_exits["EXITS"]

# create prev_date and prev_entries cols by shifting these columns forward one day
# if shifting date and entries, don't group by date
df_ampm[["PREV_DATE", "PREV_ENTRIES", "PREV_EXITS"]] = df_ampm.groupby(
    ["C/A", "UNIT", "SCP", "STATION", "ZIPCODE", "ZIPCODE_AGI"]
)[["DATE", "ENTRIES", "EXITS"]].apply(lambda grp: grp.shift(1))

# Drop the rows for the earliest date in the df, which are now NaNs for prev_date and prev_entries cols
df_ampm.dropna(subset=["PREV_DATE"], axis=0, inplace=True)

df_ampm.head()

Unnamed: 0,C/A,UNIT,SCP,STATION,ZIPCODE,ZIPCODE_AGI,DATE,AMPM,DAY_NAME,ENTRIES,EXITS,PREV_DATE,PREV_ENTRIES,PREV_EXITS
1,A006,R079,00-00-00,5 AV/59 ST,10019,8005583.0,11/02/2019,PM,Saturday,4098923,7058586,11/02/2019,4097957.0,7057072.0
2,A006,R079,00-00-00,5 AV/59 ST,10019,8005583.0,11/03/2019,AM,Sunday,4099092,7058848,11/02/2019,4098923.0,7058586.0
3,A006,R079,00-00-00,5 AV/59 ST,10019,8005583.0,11/03/2019,PM,Sunday,4099979,7060287,11/03/2019,4099092.0,7058848.0
4,A006,R079,00-00-00,5 AV/59 ST,10019,8005583.0,11/04/2019,AM,Monday,4100128,7061222,11/03/2019,4099979.0,7060287.0
5,A006,R079,00-00-00,5 AV/59 ST,10019,8005583.0,11/04/2019,PM,Monday,4101641,7063433,11/04/2019,4100128.0,7061222.0


Alright, as we can see above, we now have ``PREV_ENTRIES`` and ``PREV_EXITS`` columns, and they seem mostly correct. However, there is a lot of data, and we noticed that sometimes ``PREV_ENTRIES`` > ``ENTRIES``. This shouldn't be...in fact, we determined that some stations were counting in reverse! In other cases, the variance to the previous count was hundreds of thousands, sometimes millions. We decided to cap these at 200,000 to maintain our data's integrity as best as possible. 

The below functions takes care of this for us for both ENTRIES and EXITS.

In [None]:
def add_counts(row, max_counter, column_name):
    """
    Takes:
        - max_counter is the maximum difference between entries/exits & their prev. row values that
    we will allow.
    column_name (string): which column to count
    """

    counter = row[column_name] - row[f"PREV_{column_name}"]
    if counter < 0:
        # Maybe counter is reversed?
        counter = -counter

    if counter > max_counter:
        # Maybe counter was reset to 0?
        # take the lower value as the counter for this row
        counter = min(row[column_name], row[f"PREV_{column_name}"])

    if counter > max_counter:
        # Check it again to make sure we're not still giving a counter that's too big
        return 0

    return counter
    

Alright, now let's apply this function to our dataFrames.

In [None]:
# we will use a 200k counter - anything more seems incorrect.
df_ampm["TMP_ENTRIES"] = df_ampm.apply(
    add_counts, axis=1, max_counter=200000, column_name='ENTRIES')

df_ampm["TMP_EXITS"] = df_ampm.apply(
    add_counts, axis=1, max_counter=200000, column_name='EXITS')

Let's make sure the new columns correctly account for AMPM...

In [None]:
mask = df_ampm.STATION == '50 ST'
df_ampm[mask].head(20)

That's all for the data cleaning! As I mentioned before, all of this code is scripted out in ``clean2.py``'s ``data_wrangling`` function. It returns a tuple of the dataFrames we created above, ``df_turnstiles`` and ``df_ampm``.

Now, you can open ``fresh_start.ipynb`` and try it out for yourself!