https://data.ny.gov/Transportation/MTA-Subway-Hourly-Ridership-Beginning-February-202/wujg-7c2s/about_data

In [17]:
import polars as pl
import re

In [18]:
def add_dec_prec(row):
    latitude = row["latitude"]
    longitude = row["longitude"]
    # Assuming latitude and longitude are strings; extract the precision part
    lat_precision = len(latitude.split(".")[-1]) if "." in latitude else 0
    long_precision = len(longitude.split(".")[-1]) if "." in longitude else 0
    # Return the total precision as the sum of both
    return lat_precision + long_precision

In [19]:
def clean_column_name(name):
    return re.sub(r"[ \-&]", "_", name).replace("___", "_").replace("__","_").lower()

In [20]:
#Ridership table
columns_to_keep = ["transit_timestamp", "station_complex_id", "fare_class_category", "ridership"]
original_ridership = pl.read_parquet("data/hist.parquet", columns=columns_to_keep, low_memory=True)

ridership_wide = original_ridership.with_columns(
    [pl.col("transit_timestamp").str.strptime(pl.Datetime, format="%m/%d/%Y %I:%M:%S %p"),
     pl.col("ridership").cast(pl.Int16)]
).pivot(
    index=["transit_timestamp", "station_complex_id"],
    columns="fare_class_category",
    values="ridership",
    aggregate_function="sum",
    sort_columns=True
).sort(
    ["transit_timestamp", "station_complex_id"], descending=[False, False]
).fill_null(0)

ridership_columns = [col for col in ridership_wide.columns if "Metrocard" in col or "OMNY" in col]
ridership = ridership_wide.with_columns(
    total_ridership=pl.sum_horizontal(col for col in ridership_columns)
)

rename_mapping = {col: clean_column_name(col) for col in ridership.columns}
ridership = ridership.rename(rename_mapping)

#Since we got rid of the shuttle and TRAM lines, we filter them out here too.
ridership = ridership.filter(~pl.col("station_complex_id").str.contains("TRAM")).filter(~pl.col("station_complex_id").str.contains("141"))
ridership

transit_timestamp,station_complex_id,metrocard_fair_fare,metrocard_full_fare,metrocard_other,metrocard_seniors_disability,metrocard_students,metrocard_unlimited_30_day,metrocard_unlimited_7_day,omny_full_fare,omny_other,omny_seniors_disability,total_ridership
datetime[μs],str,i64,i64,i64,i64,i64,i64,i64,i64,i64,i64,i64
2022-02-01 00:00:00,"""1""",0,10,0,0,0,3,4,0,0,0,17
2022-02-01 00:00:00,"""10""",2,41,2,10,0,16,29,0,0,0,100
2022-02-01 00:00:00,"""100""",0,6,0,0,0,1,3,0,0,0,10
2022-02-01 00:00:00,"""101""",4,13,3,1,0,2,2,0,0,0,25
2022-02-01 00:00:00,"""103""",0,8,0,0,0,6,3,0,0,0,17
…,…,…,…,…,…,…,…,…,…,…,…,…
2024-03-16 23:00:00,"""95""",2,6,0,0,0,2,9,17,0,0,36
2024-03-16 23:00:00,"""96""",5,4,2,1,0,3,7,23,0,0,45
2024-03-16 23:00:00,"""97""",6,10,3,0,0,12,4,104,0,0,139
2024-03-16 23:00:00,"""98""",1,7,5,1,0,5,8,24,0,0,51


In [66]:
#Subset of the data for the other tables in the schema
df = pl.read_parquet("data/hist.parquet", n_rows=30_000_000, low_memory=True)

In [67]:
#Stations table
stations = df.select(["station_complex_id", "station_complex", "borough", "latitude", "longitude"]).unique()

df_with_precision = stations.with_columns(
    [(pl.struct(["latitude", "longitude"]).map_batches(
        lambda batch: batch.map_elements(add_dec_prec, return_dtype=pl.Int64)
    )).alias("total_precision"),
     pl.col("latitude").cast(pl.Float64),
     pl.col("longitude").cast(pl.Float64)]
)
df_with_precision = df_with_precision.sort(['station_complex_id', 'total_precision'], descending=[False, True]).unique(subset=["station_complex_id"])

stations = df_with_precision.select(["station_complex_id", "station_complex", "borough", "latitude", "longitude"]).unique().sort("station_complex_id")

stations = stations.with_columns(
    pl.col("station_complex")
    .str.replace_all(r"\,S", "")
    .str.replace_all(r"\(110 St\)", "- 110 St")
    .str.replace_all(r"\/Botanic Garden \(S\)", "")
    .str.strip_chars()
    ).filter(~pl.col("station_complex_id").str.contains("TRAM")
    ).filter(~pl.col("station_complex").str.contains(r"\(S\)")
    ).sort("station_complex_id")
stations

station_complex_id,station_complex,borough,latitude,longitude
str,str,str,f64,f64
"""1""","""Astoria-Ditmar…","""Queens""",40.775036,-73.912033
"""10""","""49 St (N,R,W)""","""Manhattan""",40.759899,-73.984138
"""100""","""Hewes St (M,J)…","""Brooklyn""",40.706871,-73.95343
"""101""","""Marcy Av (M,J,…","""Brooklyn""",40.708359,-73.957756
"""103""","""Bowery (J,Z)""","""Manhattan""",40.72028,-73.993912
…,…,…,…,…
"""95""","""Gates Av (J,Z)…","""Brooklyn""",40.689629,-73.922272
"""96""","""Kosciuszko St …","""Brooklyn""",40.69334,-73.928818
"""97""","""Myrtle Av (M,J…","""Brooklyn""",40.697208,-73.935654
"""98""","""Flushing Av (M…","""Brooklyn""",40.70026,-73.941124


: 

In [104]:
#Routes table
station_list = stations["station_complex"].to_list()
regex_pattern = r"\(([^)]+)\)"

unique_train_lines = set()

for station in station_list:
    matches = re.findall(regex_pattern, station)
    if matches:
        for line in matches[0].split(','):
            unique_train_lines.add(line.strip())


routes = pl.DataFrame({
    "route_name": sorted(list(unique_train_lines))
})
routes

route_name
str
"""1"""
"""2"""
"""3"""
"""4"""
"""5"""
…
"""N"""
"""Q"""
"""R"""
"""W"""


In [65]:
#Station_routes table
station_routes = stations.with_columns(
    pl.col("station_complex")
    .str.extract_all(r"\((.*?)\)")
    .map_elements(lambda groups: ','.join(groups), return_dtype=str)
    .str.replace_all(r"\(", "")
    .str.replace_all(r"\)", "")
    .str.split(",")
    .alias("route_list"))

# Step 3: Explode the list into separate rows
stations_exploded = station_routes.explode("route_list")

# Step 4: Select and rename columns to fit the SQL schema, remove duplicates
station_routes = stations_exploded.select([
    pl.col("station_complex_id"),
    pl.col("route_list").alias("route_name")
]).unique()

station_routes = station_routes.sort("station_complex_id")
station_routes

station_complex_id,route_name
str,str
"""1""","""W"""
"""1""","""N"""
"""10""","""R"""
"""10""","""N"""
"""10""","""W"""
…,…
"""97""","""J"""
"""98""","""J"""
"""98""","""M"""
"""99""","""M"""
