In [34]:
import os
import requests
import pandas as pd
import pyarrow.parquet as pq

In [41]:
from sqlalchemy import create_engine
from tqdm.auto import tqdm

In [54]:
# Remote sources for this ingestion run, plus the local cache name for the
# downloaded trip data.
taxi_url = (
    "https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2025-11.parquet"
)
zone_url = (
    "https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv"
)
pq_file_name = "green_trip_data.parquet"

In [55]:
# SQLAlchemy engine for the target Postgres database.
# The connection URL (which embeds credentials) can be supplied through the
# NY_TAXI_DB_URL environment variable so real secrets never have to live in the
# notebook; the fallback is the local development database used so far.
engine = create_engine(
    os.environ.get('NY_TAXI_DB_URL', 'postgresql://root:root@localhost:5432/ny_taxi')
)

In [56]:
# Target pandas dtype for every green-taxi column, used with DataFrame.astype.
# Integer columns use the nullable 'Int64' dtype so rows with missing values
# can still be cast. The text column uses the nullable 'string' dtype rather
# than 'str': on pandas < 3, astype('str') converts missing values into the
# literal text 'None'/'nan' instead of keeping them missing.
dtypes = {
    'VendorID': 'Int64',
    'lpep_pickup_datetime': 'datetime64[ns]',
    'lpep_dropoff_datetime': 'datetime64[ns]',
    'store_and_fwd_flag': 'string',
    'RatecodeID': 'Int64',
    'PULocationID': 'Int64',
    'DOLocationID': 'Int64',
    'passenger_count': 'Int64',
    'trip_distance': 'float64',
    'fare_amount': 'float64',
    'extra': 'float64',
    'mta_tax': 'float64',
    'tip_amount': 'float64',
    'tolls_amount': 'float64',
    'ehail_fee': 'float64',
    'improvement_surcharge': 'float64',
    'total_amount': 'float64',
    'payment_type': 'Int64',
    'trip_type': 'Int64',
    'congestion_surcharge': 'float64',
    'cbd_congestion_fee': 'float64'
}

In [57]:
# Download the trip-data Parquet file unless a local copy already exists.
# Uses the names defined in the configuration cell (taxi_url / pq_file_name)
# so the cell also works on a fresh kernel.
if not os.path.exists(pq_file_name):
    print(f'Downloading {taxi_url}')
    # Write to a temporary name and rename at the end, so an interrupted
    # download is never mistaken for a complete file on the next run.
    partial_name = f'{pq_file_name}.part'

    # stream=True avoids holding the whole file in memory; the timeout keeps
    # the cell from hanging forever on a stalled connection.
    with requests.get(taxi_url, stream=True, timeout=60) as response:
        # Check if download was successful
        if response.status_code == 200:
            with open(partial_name, 'wb') as f:
                for block in response.iter_content(chunk_size=1024 * 1024):
                    f.write(block)
            os.replace(partial_name, pq_file_name)
            print('Download complete!')
        else:
            print(f'Failed to download. Status code: {response.status_code}')
else:
    print(f'File: {pq_file_name} already exists. Skipping download.')

File: green_trip_data.parquet already exists. Skipping download.


In [58]:
# Open the downloaded Parquet file from disk. pq_file_name is the local file
# name set in the configuration cell; batches are read from this handle later.
pq_file = pq.ParquetFile(pq_file_name)

In [59]:
# Load the Parquet file into the green_taxi table one 10,000-row batch at a
# time: the first batch (re)creates the table, every batch is then appended.
make_taxi_table = True

# astype('str') turns missing values into the literal text 'None'/'nan' on
# pandas < 3, so text columns are cast to the nullable 'string' dtype instead.
cast_map = {col: 'string' if kind == 'str' else kind for col, kind in dtypes.items()}

pq_iter = pq_file.iter_batches(batch_size=10000)
for chunk in tqdm(pq_iter):
    # 1. Transform data types. astype returns a new frame, so the result must
    #    be kept; casting before the schema step means the table is created
    #    with the intended column types rather than the raw Parquet ones.
    df = chunk.to_pandas().astype(cast_map)

    # 2. Create table schema for taxi data (replacing any existing table).
    #    head(0) carries the columns and dtypes but no rows.
    if make_taxi_table:
        df.head(0).to_sql(
            name='green_taxi',
            con=engine,
            if_exists='replace'
        )
        make_taxi_table = False
        print('Table name: green_taxi created')

    # 3. Load the batch to sql.
    # NOTE: to_sql writes the DataFrame index as an `index` column by default;
    # that is left unchanged here so the table layout stays as before.
    print(f"Processing a chunk of {len(df)} rows...")
    df.to_sql(name='green_taxi', con=engine, if_exists='append')

0it [00:00, ?it/s]

Table name: green_taxi created
Processing a chunk of 10000 rows...
Processing a chunk of 10000 rows...
Processing a chunk of 10000 rows...
Processing a chunk of 10000 rows...
Processing a chunk of 6912 rows...


In [60]:
# Read the taxi zone lookup CSV straight from its URL and write it to the
# `zones` table, replacing any table of that name that already exists.
df_zones = pd.read_csv(zone_url)
zone_row_count = len(df_zones)
df_zones.to_sql('zones', engine, if_exists='replace')
print(f'Loaded {zone_row_count} rows to zones')

Loaded 265 rows to zones


In [61]:
# Shell escape: export this notebook (data_ingestion.ipynb) as a plain Python
# script using nbconvert's `script` exporter.
!jupyter nbconvert --to script data_ingestion.ipynb

[NbConvertApp] Converting notebook data_ingestion.ipynb to script
[NbConvertApp] Writing 2770 bytes to data_ingestion.py
