In [4]:
import sys
import os
from pathlib import Path

# Resolve the repository root: this notebook lives in "notebooks/", so the
# root is one directory up from the current working directory.
root_dir = os.path.abspath(os.path.join(os.getcwd(), ".."))

# Make the repo root importable so `pipeline.*` resolves. Guard the append so
# re-running this cell in the same kernel does not keep adding duplicate entries.
if root_dir not in sys.path:
    sys.path.append(root_dir)

# Now you can import using the module path
from pipeline.utils.duckdb_wrapper import DuckDBWrapper


In [5]:
# Initialize the DuckDBWrapper (in-memory DuckDB instance when no path is given).
# To work against a persistent DuckDB file instead, pass its path, e.g.
#     con = DuckDBWrapper("path/to/database.duckdb")
# NOTE(review): assumes the constructor accepts the database file path as its
# first positional argument — confirm against pipeline/utils/duckdb_wrapper.py.
con = DuckDBWrapper()

In [None]:
# Define the repository root and the data directory (relative to the repo root).
repo_root = Path.cwd().resolve().parents[0]  # notebooks/ -> repo root
base_path = "data/opendata"

# Non-partitioned datasets, registered with the "*.parquet" wildcard.
# daily_weather_asset is required by the weekly ridership/weather query later
# in this notebook; without it that cell fails on a fresh kernel.
# NOTE(review): confirm daily_weather_asset is stored as non-partitioned parquet.
bulk_table_names = [
    "daily_weather_asset",
]

# Partitioned datasets, laid out as year=*/month=*/*.parquet.
partitioned_table_names = [
    "mta_subway_hourly_ridership",
    "mta_subway_origin_destination_2023",
    "mta_subway_origin_destination_2024",
    "crime_nypd_arrests",
    "nyc_threeoneone_requests",
]

# Register normal files as views (skipped when there is nothing to register).
if bulk_table_names:
    con.bulk_register_data(
        repo_root=repo_root,
        base_path=base_path,
        table_names=bulk_table_names,
        wildcard="*.parquet",
        as_table=False,  # Default, registers as views
        show_tables=False,
    )

# Register partitioned files as views and list what is now registered.
con.bulk_register_partitioned_data(
    repo_root=repo_root,
    base_path=base_path,
    table_names=partitioned_table_names,
    wildcard="year=*/month=*/*.parquet",
    as_table=False,  # Default, registers as views
    show_tables=True,  # Show the registered tables
)

In [7]:
# Preview a sample of the hourly ridership view. The query has no
# interpolated values, so a plain string is used (not an f-string).
query = """

SELECT * from mta_subway_hourly_ridership limit 20000

"""

result = con.run_query(query)

# Leave the frame as the last expression so Jupyter renders it with rich display.
result


shape: (20_000, 13)
┌──────────────┬───────────┬──────────────┬─────────────┬───┬───────────┬───────────┬───────┬──────┐
│ fare_class_c ┆ latitude  ┆ transit_mode ┆ station_com ┆ … ┆ transfers ┆ borough   ┆ month ┆ year │
│ ategory      ┆ ---       ┆ ---          ┆ plex_id     ┆   ┆ ---       ┆ ---       ┆ ---   ┆ ---  │
│ ---          ┆ f64       ┆ str          ┆ ---         ┆   ┆ f64       ┆ str       ┆ str   ┆ i64  │
│ str          ┆           ┆              ┆ str         ┆   ┆           ┆           ┆       ┆      │
╞══════════════╪═══════════╪══════════════╪═════════════╪═══╪═══════════╪═══════════╪═══════╪══════╡
│ OMNY - Full  ┆ 40.763973 ┆ subway       ┆ 224         ┆ … ┆ 0.0       ┆ Manhattan ┆ 03    ┆ 2022 │
│ Fare         ┆           ┆              ┆             ┆   ┆           ┆           ┆       ┆      │
│ OMNY - Full  ┆ 40.745907 ┆ subway       ┆ 165         ┆ … ┆ 2.0       ┆ Manhattan ┆ 03    ┆ 2022 │
│ Fare         ┆           ┆              ┆             ┆   ┆          

In [8]:
# For a nicer rendered table, pass show_results=True. Keep the LIMIT small
# (about 50 rows or fewer) so the rendered output stays readable.
query = """

SELECT * from mta_subway_hourly_ridership limit 10
"""

result = con.run_query(query, show_results=True)


DuckDB Query Results,DuckDB Query Results,DuckDB Query Results,DuckDB Query Results,DuckDB Query Results,DuckDB Query Results,DuckDB Query Results,DuckDB Query Results,DuckDB Query Results,DuckDB Query Results,DuckDB Query Results,DuckDB Query Results,DuckDB Query Results
SELECT * from mta_subway_hourly_ridership limit ...,SELECT * from mta_subway_hourly_ridership limit ...,SELECT * from mta_subway_hourly_ridership limit ...,SELECT * from mta_subway_hourly_ridership limit ...,SELECT * from mta_subway_hourly_ridership limit ...,SELECT * from mta_subway_hourly_ridership limit ...,SELECT * from mta_subway_hourly_ridership limit ...,SELECT * from mta_subway_hourly_ridership limit ...,SELECT * from mta_subway_hourly_ridership limit ...,SELECT * from mta_subway_hourly_ridership limit ...,SELECT * from mta_subway_hourly_ridership limit ...,SELECT * from mta_subway_hourly_ridership limit ...,SELECT * from mta_subway_hourly_ridership limit ...
fare_class_category,latitude,transit_mode,station_complex_id,longitude,station_complex,payment_method,ridership,transit_timestamp,transfers,borough,month,year
OMNY - Full Fare,40.764,subway,224,−73.977,57 St (F),omny,21.0,2022-03-01 00:00:00.000000,0.0,Manhattan,3,2022.0
OMNY - Full Fare,40.746,subway,165,−73.998,"23 St (C,E)",omny,8.0,2022-03-01 00:00:00.000000,2.0,Manhattan,3,2022.0
Metrocard - Unlimited 7-Day,40.764,subway,224,−73.977,57 St (F),metrocard,24.0,2022-03-01 00:00:00.000000,0.0,Manhattan,3,2022.0
Metrocard - Full Fare,40.733,subway,118,−73.986,3 Av (L),metrocard,4.0,2022-03-01 00:00:00.000000,1.0,Manhattan,3,2022.0
Metrocard - Other,40.775,subway,1,−73.912,"Astoria-Ditmars Blvd (N,W)",metrocard,1.0,2022-03-01 00:00:00.000000,0.0,Queens,3,2022.0
Metrocard - Unlimited 7-Day,40.7,subway,279,−73.808,"Sutphin Blvd-Archer Av-JFK Airport (E,J,Z)",metrocard,14.0,2022-03-01 00:00:00.000000,0.0,Queens,3,2022.0
Metrocard - Fair Fare,40.726,subway,168,−74.004,"Spring St (C,E)",metrocard,3.0,2022-03-01 00:00:00.000000,0.0,Manhattan,3,2022.0
Metrocard - Other,40.714,subway,234,−73.990,East Broadway (F),metrocard,2.0,2022-03-01 00:00:00.000000,0.0,Manhattan,3,2022.0
OMNY - Full Fare,40.721,subway,169,−74.005,"Canal St (A,C,E)",omny,25.0,2022-03-01 00:00:00.000000,6.0,Manhattan,3,2022.0
Metrocard - Unlimited 30-Day,40.707,subway,100,−73.953,"Hewes St (M,J)",metrocard,1.0,2022-03-01 00:00:00.000000,0.0,Brooklyn,3,2022.0


In [9]:
# Weekly ridership per station complex joined with weekly weather aggregates.
# Requires both mta_subway_hourly_ridership and daily_weather_asset to be
# registered on `con`; otherwise DuckDB fails with an unknown-table error.
# NOTE(review): the '2024-09-17' cutoff is hard-coded — presumably the end of
# the available data; confirm and consider moving it to a config cell.
query = """

WITH weekly_ridership AS (
    SELECT 
        station_complex, 
        DATE_TRUNC('week', transit_timestamp) AS week_start,
        SUM(ridership) AS total_weekly_ridership,
        MIN(latitude) AS latitude,  -- Assuming latitude is the same for each station complex, use MIN() or MAX()
        MIN(longitude) AS longitude  -- Assuming longitude is the same for each station complex, use MIN() or MAX()
    FROM 
        mta_subway_hourly_ridership
    GROUP BY 
        station_complex, 
        DATE_TRUNC('week', transit_timestamp)
),
weekly_weather AS (
    SELECT 
        DATE_TRUNC('week', date) AS week_start,
        AVG(temperature_mean) AS avg_weekly_temperature,
        SUM(precipitation_sum) AS total_weekly_precipitation
    FROM 
        daily_weather_asset
    GROUP BY 
        DATE_TRUNC('week', date)
)
SELECT 
    wr.station_complex, 
    wr.week_start, 
    wr.total_weekly_ridership,
    wr.latitude,
    wr.longitude,
    ww.avg_weekly_temperature,
    ww.total_weekly_precipitation
FROM 
    weekly_ridership wr
LEFT JOIN 
    weekly_weather ww
ON 
    wr.week_start = ww.week_start
WHERE 
    wr.week_start < '2024-09-17'
ORDER BY 
    wr.station_complex, 
    wr.week_start

"""

result = con.run_query(query)

# Leave the frame as the last expression so Jupyter renders it with rich display.
result


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

shape: (57_349, 7)
┌───────────────┬────────────┬───────────────┬───────────┬───────────┬──────────────┬──────────────┐
│ station_compl ┆ week_start ┆ total_weekly_ ┆ latitude  ┆ longitude ┆ avg_weekly_t ┆ total_weekly │
│ ex            ┆ ---        ┆ ridership     ┆ ---       ┆ ---       ┆ emperature   ┆ _precipitati │
│ ---           ┆ date       ┆ ---           ┆ f64       ┆ f64       ┆ ---          ┆ on           │
│ str           ┆            ┆ f64           ┆           ┆           ┆ f64          ┆ ---          │
│               ┆            ┆               ┆           ┆           ┆              ┆ f64          │
╞═══════════════╪════════════╪═══════════════╪═══════════╪═══════════╪══════════════╪══════════════╡
│ 1 Av (L)      ┆ 2022-02-28 ┆ 87331.0       ┆ 40.730953 ┆ -73.98163 ┆ 37.042857    ┆ 5.8          │
│ 1 Av (L)      ┆ 2022-03-07 ┆ 102571.0      ┆ 40.730953 ┆ -73.98163 ┆ 40.0         ┆ 34.6         │
│ 1 Av (L)      ┆ 2022-03-14 ┆ 100996.0      ┆ 40.730953 ┆ -73.98163 ┆ 4

In [10]:
# Show the tables/views currently registered on the connection.
# Kept as the bare last expression so whatever it returns is rendered by Jupyter.
con.show_tables()


In [11]:
# Show the schema of a single registered table or view (here, the hourly
# ridership view). Kept as the bare last expression so Jupyter renders its result.
con.show_schema("mta_subway_hourly_ridership")

In [None]:
# Pull a small 2024 sample of hourly ridership. The `year` partition column is
# an integer (i64 in the earlier output), so it is compared to a bare 2024.
query = """

SELECT * from mta_subway_hourly_ridership where year=2024 limit 100 

"""

result = con.run_query(query)

# Export the sample as CSV under <repo root>/data/exports.
repo_root = Path.cwd().resolve().parents[0]  # notebooks/ -> repo root
# Use a dedicated name so the registration `base_path` ("data/opendata", a str)
# is not silently overwritten with a Path pointing somewhere else.
export_dir = repo_root / "data" / "exports"
export_dir.mkdir(parents=True, exist_ok=True)  # fresh clones may lack this dir
file_name = "mta_subway_hourly_ridership_data_sample"
file_type = "csv"

con.export(result, file_type=file_type, base_path=export_dir, file_name=file_name)

# Leave the frame as the last expression so Jupyter renders it with rich display.
result