In [10]:
import os
import time

import fsspec
import pandas as pd
import polars as pl
import psycopg2
import pyarrow.parquet as pq
from sqlalchemy import create_engine
from tqdm.auto import tqdm
# Read a sample of the data
# prefix = 'https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/'
# df = pd.read_csv(prefix + '/yellow_tripdata_2021-01.csv.gz', nrows=100)

In [3]:
# Base URL for the yellow-taxi CSV files; note that it already ends with "/".
prefix = 'https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/'
# Optional: read a 100-row sample of the data (currently disabled)
# df = pd.read_csv(prefix + '/yellow_tripdata_2021-01.csv.gz', nrows=100)

In [4]:
# Explicit pandas dtypes for the CSV columns. The capitalised "Int64" is the
# nullable integer dtype, so integer columns can hold missing values.
dtype = dict(
    VendorID="Int64",
    passenger_count="Int64",
    trip_distance="float64",
    RatecodeID="Int64",
    store_and_fwd_flag="string",
    PULocationID="Int64",
    DOLocationID="Int64",
    payment_type="Int64",
    fare_amount="float64",
    extra="float64",
    mta_tax="float64",
    tip_amount="float64",
    tolls_amount="float64",
    improvement_surcharge="float64",
    total_amount="float64",
    congestion_surcharge="float64",
)

# Timestamp columns that read_csv should parse as datetimes.
parse_dates = ["tpep_pickup_datetime", "tpep_dropoff_datetime"]

# Lazily read the January 2021 CSV in chunks of 100,000 rows so the whole file
# never has to sit in memory at once. `df_iter` is a one-shot iterator: once it
# has been consumed, re-run this cell to get a fresh one.
#
# rstrip('/') joins the URL with exactly one "/" whether or not `prefix` ends
# with a slash (plain concatenation produced ".../yellow//yellow_tripdata...").
df_iter = pd.read_csv(
    prefix.rstrip('/') + '/yellow_tripdata_2021-01.csv.gz',
    dtype=dtype,
    parse_dates=parse_dates,
    iterator=True,
    chunksize=100000
)

In [5]:
# Postgres connection URL. Set NY_TAXI_DB_URL to point at a different database
# or to supply real credentials without committing them to the notebook; the
# fallback is the local development database used so far.
# The same URL is passed to SQLAlchemy, polars (ADBC) and psycopg2 in this
# notebook, so keep it a plain "postgresql://" URI without a driver suffix.
db = os.environ.get(
    "NY_TAXI_DB_URL",
    'postgresql://root:root@localhost:5432/ny_taxi',
)
engine = create_engine(db)

In [6]:
# Stream the CSV chunks from `df_iter` into Postgres, one INSERT batch per chunk.
#
# NOTE: `df_iter` is a one-shot iterator. Re-running this cell without first
# re-creating `df_iter` iterates over nothing and reports 0 records.
# NOTE: to_sql is left at its default index=True, so each chunk's row index is
# stored as an extra "index" column in the table.
table_name = "yellow_taxi_data_202101"  # single source of truth for both to_sql calls

first = True
# perf_counter() is monotonic, so the elapsed time cannot be skewed by
# system clock adjustments the way time.time() can.
start = time.perf_counter()
records = 0
for df_chunk in df_iter:
    if first:
        # Create table schema (no data); "replace" drops any previous table
        df_chunk.head(0).to_sql(
            name=table_name,
            con=engine,
            if_exists="replace"
        )
        first = False
        print("Table created")

    # Insert chunk
    df_chunk.to_sql(
        name=table_name,
        con=engine,
        if_exists="append"
    )

    print("Inserted:", len(df_chunk))
    records += len(df_chunk)
print(f'{time.perf_counter()-start}s spent inserting {records} records.')

Table created
Inserted: 100000
Inserted: 100000
Inserted: 100000
Inserted: 100000
Inserted: 100000
Inserted: 100000
Inserted: 100000
Inserted: 100000
Inserted: 100000
Inserted: 100000
Inserted: 100000
Inserted: 100000
Inserted: 100000
Inserted: 69765
109.30260610580444s spent inserting 1369765 records.


In [7]:
# Read Parquet data with polars
url = 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-11.parquet'

# Target polars dtype for each column, declared directly so this cell does not
# depend on the pandas `dtype` dict defined in another cell. Only the columns
# named here are passed to cast().
pl_dtype = {
    "VendorID": pl.Int64,
    "passenger_count": pl.Int64,
    "trip_distance": pl.Float64,
    "RatecodeID": pl.Int64,
    "store_and_fwd_flag": pl.String,
    "PULocationID": pl.Int64,
    "DOLocationID": pl.Int64,
    "payment_type": pl.Int64,
    "fare_amount": pl.Float64,
    "extra": pl.Float64,
    "mta_tax": pl.Float64,
    "tip_amount": pl.Float64,
    "tolls_amount": pl.Float64,
    "improvement_surcharge": pl.Float64,
    "total_amount": pl.Float64,
    "congestion_surcharge": pl.Float64,
}

# Download the whole file into memory, then normalise the column dtypes.
df = pl.read_parquet(url).cast(pl_dtype)

In [8]:
# Bulk-load the polars frame into Postgres and report how long it took.
# perf_counter() is monotonic, so the elapsed time cannot be skewed by
# system clock adjustments the way time.time() can.
start = time.perf_counter()
df.write_database(
    table_name="yellow_taxi_data_202511_pl",
    connection=db,
    if_table_exists="replace", # or "append"
    engine="adbc"              # Faster engine for Postgres
)
print(f'{time.perf_counter()-start}s spent inserting {len(df)} records.')

32.673789978027344s spent inserting 4181444 records.


In [12]:
# Run VACUUM ANALYZE on the freshly loaded table over a raw psycopg2 connection.
conn = psycopg2.connect(db)
try:
    conn.set_session(autocommit=True) # REQUIRED for VACUUM
    # The cursor context manager closes the cursor even if execute() raises.
    with conn.cursor() as cur:
        cur.execute("VACUUM ANALYZE yellow_taxi_data_202511_pl;")
finally:
    # Always release the connection, including when the statement fails.
    conn.close()

In [13]:
# Dispose of the SQLAlchemy engine created earlier now that loading is done,
# so it does not keep pooled database connections open.
engine.dispose()