# Trips and Stations
* Create yearly trip parquet files
* Create bike dock stations parquet file

In [None]:
import json
import logging
import os
import re

import dask.dataframe as dd
import pandas as pd
import pyarrow as pa
import requests
from geopy.extra.rate_limiter import RateLimiter
from geopy.geocoders import Nominatim

In [None]:
# Directory layout: every path below is relative to the working directory
# and keeps its trailing slash so file names can simply be appended.
DATA_DIR = "data/"
# CSV_DIR = DATA_DIR + "tripdata_csv/"
PARQUET_DIR = DATA_DIR + "tripdata_parquet/"
NY_DIR = PARQUET_DIR + "NY/"
NJ_DIR = PARQUET_DIR + "NJ/"
STATIONS_DIR = DATA_DIR + "stations/"
PARQUET_EXTENSION = ".parquet"
STATION_INFO_URL = "https://gbfs.citibikenyc.com/gbfs/en/station_information.json"

# NOTE: at WARNING level the logging.info / logging.debug messages in this
# notebook are suppressed; lower the level here to see them.
logging.basicConfig(level=logging.WARNING)


def _count_entries(directory: str) -> int:
    """
    Counts the entries of a directory
    :param directory: directory path
    :return: number of entries, 0 if the directory does not exist
    """
    try:
        return len(os.listdir(directory))
    except FileNotFoundError:
        # a missing data directory must not abort the setup cell
        return 0


logging.info(
    "%d Jersey City files and %d New York City files",
    _count_entries(NJ_DIR),
    _count_entries(NY_DIR),
)

# target schema (column name -> dtype) for the trip parquet files
TRIPDATA_COLUMN_DTYPES = {
    "tripduration": "int32",
    # the unit is spelled out because pandas >= 2.0 rejects a unit-less
    # "datetime64" in astype
    "starttime": "datetime64[ns]",
    "stoptime": "datetime64[ns]",
    "startstationid": "category",
    "startstationname": "category",
    "startstationlatitude": "category",
    "startstationlongitude": "category",
    "endstationid": "category",
    "endstationname": "category",
    "endstationlatitude": "category",
    "endstationlongitude": "category",
    "bikeid": "category",
    "usertype": "category",
    "birthyear": "category",
    "gender": "category",
}

In [None]:
def _list_monthly_files(directory: str, year=None) -> list:
    """
    Lists the monthly trip files (named YYYY-MM.parquet) found in a directory
    :param directory: directory to scan, with trailing slash
    :param year: only return files of this year; None returns every monthly file
    :return: sorted list of file paths (directory + file name)
    """
    # Matching the full monthly name keeps previously merged outputs
    # (e.g. 2019.parquet, alltrips.parquet) and unrelated files out of the merge.
    month_file = re.compile(
        r"(\d{4})-(0[1-9]|1[0-2])" + re.escape(PARQUET_EXTENSION)
    )
    wanted_year = None if year is None else str(year)

    paths = []
    for name in sorted(os.listdir(directory)):
        match = month_file.fullmatch(name)
        if match and wanted_year in (None, match.group(1)):
            paths.append(directory + name)
    return paths


def merge_monthly_trips(year, directory: str) -> None:
    """
    Creates a merged parquet dataset from the monthly parquet files in a directory.
    The result is written to <directory><year>.parquet, or
    <directory>alltrips.parquet when year is None.
    :param year: the year (int) to merge monthly data for. if None, then merge all monthly files in directory
    :param directory: a directory (with trailing slash) containing monthly parquet files
        named YYYY-MM.parquet with identical schema (column names) across files
    :return: None
    """
    month_files = _list_monthly_files(directory, year)
    filename = str(year) if year is not None else "alltrips"

    if not month_files:
        # dd.concat fails on an empty list; skip with a visible message instead
        logging.warning(
            "no monthly trip files found in %s for %s, nothing merged",
            directory,
            filename,
        )
        return

    parquet_ddfs: list[dd.DataFrame] = []
    for month_file in month_files:
        ddf = dd.read_parquet(month_file)
        # Column dtypes are taken as stored in the monthly files; only
        # birthyear is normalised. It is forced to string here and declared
        # as string in the output schema below (the original author reports
        # "some issue with birthyear in particular").
        ddf["birthyear"] = ddf["birthyear"].astype("str")
        parquet_ddfs.append(ddf)

    all_trips = dd.concat(parquet_ddfs)
    all_trips.to_parquet(
        directory + filename + PARQUET_EXTENSION,
        schema={"birthyear": pa.string()},
        engine="pyarrow",
    )

In [None]:
%%time
# Create a single merged parquet dataset from all monthly trip data (NY).
# NOTE (original author): run this before the yearly-merge cell below if you
# want this large file; running it afterwards is reported not to work.
merge_monthly_trips(year=None, directory=NY_DIR)

['data/tripdata_parquet/NY/2013-06.parquet', 'data/tripdata_parquet/NY/2013-07.parquet', 'data/tripdata_parquet/NY/2013-08.parquet', 'data/tripdata_parquet/NY/2013-09.parquet', 'data/tripdata_parquet/NY/2013-10.parquet', 'data/tripdata_parquet/NY/2013-11.parquet', 'data/tripdata_parquet/NY/2013-12.parquet', 'data/tripdata_parquet/NY/2014-01.parquet', 'data/tripdata_parquet/NY/2014-02.parquet', 'data/tripdata_parquet/NY/2014-03.parquet', 'data/tripdata_parquet/NY/2014-04.parquet', 'data/tripdata_parquet/NY/2014-05.parquet', 'data/tripdata_parquet/NY/2014-06.parquet', 'data/tripdata_parquet/NY/2014-07.parquet', 'data/tripdata_parquet/NY/2014-08.parquet', 'data/tripdata_parquet/NY/2014-09.parquet', 'data/tripdata_parquet/NY/2014-10.parquet', 'data/tripdata_parquet/NY/2014-11.parquet', 'data/tripdata_parquet/NY/2014-12.parquet', 'data/tripdata_parquet/NY/2015-01.parquet', 'data/tripdata_parquet/NY/2015-02.parquet', 'data/tripdata_parquet/NY/2015-03.parquet', 'data/tripdata_parquet/NY/2015-

In [None]:
%%time
# Create one merged trip parquet dataset per year.
# range(2013, 2022) excludes its end value, so this covers 2013 through 2021.
for year in range(2013, 2022):
    merge_monthly_trips(year, NY_DIR)

CPU times: user 3min 29s, sys: 1min 16s, total: 4min 46s
Wall time: 3min 34s


In [None]:
# example: read a yearly parquet file (2019)

# Only the listed columns are loaded; the station name / latitude / longitude
# columns are left out here.
trip_columns = [
    "tripduration",
    "starttime",
    "stoptime",
    "startstationid",
    "endstationid",
    "bikeid",
    "usertype",
    "birthyear",
    "gender",
]  # specify columns you want to read
test = pd.read_parquet(
    NY_DIR + "2019.parquet", engine="pyarrow", columns=trip_columns
).reset_index(drop=True)  # discard the stored (dask) index, keep a plain 0..n-1 index
test

Unnamed: 0,tripduration,starttime,stoptime,startstationid,endstationid,bikeid,usertype,birthyear,gender
0,320,2019-01-01 00:01:47.4010,2019-01-01 00:07:07.5810,3160.0,3283.0,15839,Subscriber,1971,1
1,316,2019-01-01 00:04:43.7360,2019-01-01 00:10:00.6080,519.0,518.0,32723,Subscriber,1964,1
2,591,2019-01-01 00:06:03.9970,2019-01-01 00:15:55.4380,3171.0,3154.0,27451,Subscriber,1987,1
3,2719,2019-01-01 00:07:03.5450,2019-01-01 00:52:22.6500,504.0,3709.0,21579,Subscriber,1990,1
4,303,2019-01-01 00:07:35.9450,2019-01-01 00:12:39.5020,229.0,503.0,35379,Subscriber,1979,1
...,...,...,...,...,...,...,...,...,...
20551512,729,2019-10-31 23:59:12.1900,2019-11-01 00:11:21.4860,237.0,311.0,25725,Subscriber,1995,1
20551513,645,2019-10-31 23:59:17.0470,2019-11-01 00:10:02.9450,3259.0,461.0,39583,Customer,1969,0
20551514,257,2019-10-31 23:59:22.5140,2019-11-01 00:03:40.2600,3798.0,505.0,21240,Subscriber,1985,1
20551515,466,2019-10-31 23:59:23.1710,2019-11-01 00:07:09.2050,328.0,361.0,34916,Subscriber,1989,0


In [None]:
# Make sure the stations output directory exists; exist_ok makes the cell
# safe to re-run and avoids a separate existence check.
os.makedirs(STATIONS_DIR, exist_ok=True)

In [None]:
def create_stations(year, directory):
    """
    Creates station table for year, saves to parquet file.
    The table holds one row per station id, taken from the first trip that
    starts at that station. Only the start-station columns are used, so a
    station that appears solely as a trip destination is not captured.
    The trip data itself is left untouched.
    :param year: year to create stations for using trip data for that year
    :param directory: directory (with trailing slash) with the trip data parquet file
    :return: None
    """
    # trip column -> station table column
    col_rename = {
        "startstationid": "stationid",
        "startstationname": "stationname",
        "startstationlatitude": "latitude",
        "startstationlongitude": "longitude",
    }

    trip_filepath = directory + str(year) + PARQUET_EXTENSION
    stations = (
        # read only the four columns needed instead of the whole trip table
        pd.read_parquet(trip_filepath, engine="pyarrow", columns=list(col_rename))
        .reset_index(drop=True)  # discard the stored (dask) index
        # chained, non-inplace calls avoid pandas' SettingWithCopyWarning
        .rename(columns=col_rename)
        .drop_duplicates(subset=["stationid"])
    )

    stations_filepath = STATIONS_DIR + str(year) + PARQUET_EXTENSION
    stations.to_parquet(stations_filepath, engine="pyarrow")

In [None]:
%%time
# Build one station table per year from the yearly NY trip data.
# range(2013, 2022) excludes its end value, so this covers 2013 through 2021.
for year in range(2013, 2022):
    create_stations(year, NY_DIR)

A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  stations.rename(columns=col_rename, inplace=True)
A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  stations.drop_duplicates(subset=["stationid"], inplace=True)
A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  stations.rename(columns=col_rename, inplace=True)
A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  station

CPU times: user 2min 44s, sys: 3min 50s, total: 6min 35s
Wall time: 6min 8s


A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  stations.rename(columns=col_rename, inplace=True)
A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  stations.drop_duplicates(subset=["stationid"], inplace=True)


In [None]:
def merge_stations() -> pd.DataFrame:
    """
    Return merged yearly station files.
    Reads every parquet file in STATIONS_DIR except those whose name starts
    with "stations" (the combined output) and keeps one row per station id.
    Files are read in sorted name order, so for a station seen in several
    files the row from the first file in that order is the one kept.
    :return: combined station table
    :raises ValueError: if STATIONS_DIR holds no yearly station file
    """
    station_files = sorted(
        f
        for f in os.listdir(STATIONS_DIR)
        if f.endswith(PARQUET_EXTENSION) and not f.startswith("stations")
    )
    if not station_files:
        raise ValueError(f"no yearly station files found in {STATIONS_DIR}")

    stations_dfs = [pd.read_parquet(STATIONS_DIR + f) for f in station_files]
    all_stations = pd.concat(stations_dfs)

    return all_stations.drop_duplicates(subset=["stationid"])

In [None]:
def add_station_capacity(
    stations: pd.DataFrame, station_info: "dict | None" = None
) -> pd.DataFrame:
    """
    Adds station capacity info from Citibike GBFS feed.
    Capacity is matched on station name (left join), so stations whose name
    is not in the feed get a missing capacity.
    :param stations: station table with a "stationname" column
    :param station_info: already decoded station information feed
        (a dict with the station list under ["data"]["stations"]).
        If None, the feed is downloaded from STATION_INFO_URL
    :return: stations with capacity info
    :raises requests.HTTPError: if the feed download returns an error status
    """
    if station_info is None:
        # a timeout keeps a stalled connection from hanging the notebook
        response = requests.get(STATION_INFO_URL, timeout=30)
        response.raise_for_status()
        station_info = response.json()

    # extract capacity; one row per name so the merge cannot multiply stations
    station_details = (
        pd.DataFrame(station_info["data"]["stations"])
        .loc[:, ["name", "capacity"]]
        .rename(columns={"name": "stationname"})
        .drop_duplicates(subset="stationname")
    )

    # Drop a capacity column left over from an earlier run; otherwise the
    # merge would yield capacity_x / capacity_y.
    return stations.drop(columns=["capacity"], errors="ignore").merge(
        station_details, how="left", on="stationname"
    )

In [None]:
def add_station_geodata(stations: pd.DataFrame, reverse_geocoder=None) -> pd.DataFrame:
    """
    Adds station geodata info (neighbourhood, boro, zipcode) by reverse
    geocoding each station's coordinates.
    Stations for which the geocoder returns nothing, or whose address lacks
    a field, get missing values instead of aborting the whole run.
    :param stations: station table with "stationid", "latitude" and "longitude" columns
    :param reverse_geocoder: callable taking a "latitude, longitude" string and
        returning None or an object whose .raw dict holds an "address" dict.
        If None, Nominatim is used, rate limited to one request per second
    :return: station with geodata info
    """
    if reverse_geocoder is None:
        geolocator = Nominatim(user_agent="bikegeocode")
        reverse_geocoder = RateLimiter(geolocator.reverse, min_delay_seconds=1)

    logging.debug("reverse geocoding boro and neighbourhood, wait 15-20 mins...")
    addresses = []
    for latitude, longitude in zip(stations["latitude"], stations["longitude"]):
        location = reverse_geocoder(f"{latitude}, {longitude}")
        # no result for this coordinate: record an empty address
        addresses.append({} if location is None else location.raw.get("address", {}))
    logging.debug("geocode complete, merging...")

    locations = (
        pd.DataFrame(addresses, index=stations["stationid"])
        # reindex creates any address field that no station returned
        .reindex(columns=["neighbourhood", "suburb", "postcode"])
        .rename(columns={"suburb": "boro", "postcode": "zipcode"})
        # only the geodata columns become categorical; the merge key keeps its dtype
        .astype("category")
        .reset_index()
        .drop_duplicates(subset="stationid")
    )

    # Drop geodata columns left over from an earlier run; otherwise the merge
    # would yield _x / _y suffixed duplicates.
    return stations.drop(
        columns=["neighbourhood", "boro", "zipcode"], errors="ignore"
    ).merge(locations, how="left", on="stationid")

In [None]:
# Merge the yearly station tables into one table; the following cells add
# capacity and geodata to it and then save the result.
# TODO get elevation
stations = merge_stations()

In [None]:
# Add dock capacity from the station information feed (STATION_INFO_URL).
stations = add_station_capacity(stations)

In [None]:
%%time
# Reverse geocode every station; requests are rate limited to one per second,
# so this cell is slow.
stations = add_station_geodata(stations)



CPU times: user 4.66 s, sys: 713 ms, total: 5.37 s
Wall time: 23min 58s


In [None]:
# Save the combined station table as "stations.parquet" inside STATIONS_DIR.
stations.to_parquet(STATIONS_DIR + "stations" + PARQUET_EXTENSION, engine="pyarrow")

In [None]:
# TODO FIX two capacity columns
# (presumably capacity_x / capacity_y from merging capacity onto a table
# that already has a capacity column -- confirm)
# example: read stations (all stations seen across all years)
stations = pd.read_parquet(
    STATIONS_DIR + "stations" + PARQUET_EXTENSION, engine="pyarrow"
)
stations

Unnamed: 0,stationid,stationname,latitude,longitude,capacity,neighbourhood,boro,zipcode
0,455.0,1 Ave & E 44 St,40.750020,-73.969053,59.0,Turtle Bay,Manhattan,10017-6927
1,434.0,9 Ave & W 18 St,40.743174,-74.003664,60.0,Chelsea District,Manhattan,10019
2,491.0,E 24 St & Park Ave S,40.740964,-73.986022,,Manhattan Community Board 5,Manhattan,10010
3,384.0,Fulton St & Waverly Ave,40.683178,-73.965964,31.0,,Brooklyn,11238
4,474.0,5 Ave & E 29 St,40.745168,-73.986831,56.0,Midtown South,Manhattan,10035
...,...,...,...,...,...,...,...,...
1425,3685.0,Prospect Park - 5 Year Anniversary Celebration,40.660652,-73.964590,,,Brooklyn,11225
1426,3695.0,E 5 St & 2 Ave,40.726870,-73.989190,,East Village,Manhattan,10003
1427,3700.0,E 87 St & 3 Ave,40.779406,-73.953336,,Carnegie Hill,Manhattan,10028
1428,3805.0,E 80 St & Park Ave,40.776173,-73.959757,,Manhattan Community Board 8,Manhattan,10075
