In [1]:
import pandas as pd
import requests
import dlt
import io
from loguru import logger
from datetime import datetime
from dateutil.relativedelta import relativedelta

In [2]:
def get_months_between_dates(start_date: str, end_date: str):
    """Return every month label from ``start_date`` to ``end_date``, inclusive.

    Args:
        start_date: First month, formatted as ``'%Y-%m'`` (e.g. ``'2019-01'``).
        end_date: Last month, formatted as ``'%Y-%m'``.

    Returns:
        A list of ``'%Y-%m'`` strings in ascending order. The list is empty
        when ``start_date`` is later than ``end_date``.

    Raises:
        ValueError: If either argument does not match ``'%Y-%m'``.
    """
    month_format = '%Y-%m'
    cursor = datetime.strptime(start_date, month_format)
    last_month = datetime.strptime(end_date, month_format)
    one_month = relativedelta(months=1)

    labels = []
    while True:
        if cursor > last_month:
            break
        labels.append(cursor.strftime(month_format))
        cursor = cursor + one_month
    return labels

# Month labels ('%Y-%m') for each dataset's load window; both bounds are inclusive.
months_green_yellow = get_months_between_dates('2019-01', '2020-12')
months_fhv = get_months_between_dates('2019-01', '2019-12')

In [3]:
# URL templates for the monthly gzip-compressed CSV files, one per dataset.
# The `{month}` placeholder is filled with a '%Y-%m' label via `str.format(month=...)`.
green_taxi_url = 'https://github.com/DataTalksClub/nyc-tlc-data/releases/download/green/green_tripdata_{month}.csv.gz'
yellow_taxi_url = 'https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_{month}.csv.gz'
fhv_taxi_url = 'https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhv/fhv_tripdata_{month}.csv.gz'

In [11]:
def _iter_csv_chunks(payload: bytes, chunksize: int):
    """Yield DataFrames of at most ``chunksize`` rows parsed from a gzipped CSV payload."""
    yield from pd.read_csv(
        filepath_or_buffer=io.BytesIO(payload),
        compression='gzip',
        chunksize=chunksize,
    )


def load_data_from_url(url: str,
                       is_chunks: bool=False,
                       chunksize: int = 1000000):
    """Download a gzip-compressed CSV and parse it with pandas.

    Args:
        url: Location of the ``.csv.gz`` file.
        is_chunks: When True, return an iterator of DataFrame chunks instead
            of a single DataFrame.
        chunksize: Maximum number of rows per chunk; only used when
            ``is_chunks`` is True.

    Returns:
        A ``pandas.DataFrame`` with the whole file when ``is_chunks`` is
        False, otherwise an iterator yielding ``pandas.DataFrame`` chunks.

    Raises:
        requests.HTTPError: If the server answers with an error status.
    """
    response = requests.get(url)
    response.raise_for_status()
    if is_chunks:
        # The chunked path lives in its own generator: a `yield` in this body
        # would turn the whole function into a generator, and the non-chunked
        # `return` below would then hand callers an empty generator instead
        # of the DataFrame.
        return _iter_csv_chunks(response.content, chunksize)
    return pd.read_csv(filepath_or_buffer=io.BytesIO(response.content), compression='gzip')

In [12]:
# dlt pipeline shared by every upload cell below: it targets the `postgres`
# destination and writes into the `public` dataset.
# NOTE(review): no connection credentials are passed here — presumably they come
# from dlt's config/secrets or environment variables; confirm before running.
pipeline = dlt.pipeline(
    pipeline_name='taxi_data_to_postgres',
    destination='postgres',
    dataset_name='public'
)

In [None]:
# upload green_taxi data
# One pipeline run per month in `months_green_yellow`, appended to the `green_taxi` table.
# NOTE(review): this relies on load_data_from_url(..., is_chunks=False) handing back the
# full month's data — confirm, since that function's body contains a `yield`.

for month in months_green_yellow:
    df_green = load_data_from_url(green_taxi_url.format(month=month))
    
    pipeline.run(
        data=df_green,
        table_name='green_taxi',
        write_disposition='append'
        )
    logger.info(f'inserted green data for {month}')

In [None]:
# upload yellow_taxi data
# Each month is requested with is_chunks=True and every chunk is appended to the
# `yellow_taxi` table in its own pipeline run. A failing chunk is logged with its
# traceback and skipped, so the remaining chunks of the month are still loaded.

for month in months_green_yellow:
    count_chunks = 0   # chunks loaded successfully for this month
    failed_chunks = 0  # chunks whose pipeline run raised
    for chunk_index, chunk in enumerate(
        load_data_from_url(
            url=yellow_taxi_url.format(month=month),
            is_chunks=True
        )
    ):
        try:
            pipeline.run(
                data=chunk,
                table_name='yellow_taxi',
                write_disposition='append',
            )
            count_chunks += 1
        except Exception:
            failed_chunks += 1
            # logger.exception records the traceback; chunk_index is the position of
            # the failing chunk within the month, not the count of successful chunks.
            logger.exception(f'Error loading chunk : {chunk_index} for {month}')
    if failed_chunks:
        # Do not report the month as inserted when part of it is missing.
        logger.warning(
            f'yellow data for {month} loaded partially, '
            f'chunks_count: {count_chunks}, failed_chunks: {failed_chunks}'
        )
    else:
        logger.info(f'inserted yellow data for {month}, chunks_count: {count_chunks}')

[32m2025-02-23 12:30:40.286[0m | [1mINFO    [0m | [36m__main__[0m:[36m<module>[0m:[36m13[0m - [1minserted yellow data for 2019-01, chunks_count: 8[0m
[32m2025-02-23 12:33:32.781[0m | [1mINFO    [0m | [36m__main__[0m:[36m<module>[0m:[36m13[0m - [1minserted yellow data for 2019-02, chunks_count: 8[0m
[32m2025-02-23 12:36:52.022[0m | [1mINFO    [0m | [36m__main__[0m:[36m<module>[0m:[36m13[0m - [1minserted yellow data for 2019-03, chunks_count: 8[0m
[32m2025-02-23 12:39:57.616[0m | [1mINFO    [0m | [36m__main__[0m:[36m<module>[0m:[36m13[0m - [1minserted yellow data for 2019-04, chunks_count: 8[0m
[32m2025-02-23 12:43:37.142[0m | [1mINFO    [0m | [36m__main__[0m:[36m<module>[0m:[36m13[0m - [1minserted yellow data for 2019-05, chunks_count: 8[0m
[32m2025-02-23 12:46:06.788[0m | [1mINFO    [0m | [36m__main__[0m:[36m<module>[0m:[36m13[0m - [1minserted yellow data for 2019-06, chunks_count: 7[0m
  for chunk in pd.read_csv(f

In [None]:
# upload fhv dataset
# One pipeline run per month in `months_fhv`, appended to the `fhv_taxi` table.
# NOTE(review): this relies on load_data_from_url(..., is_chunks=False) handing back the
# full month's data — confirm, since that function's body contains a `yield`.

for month in months_fhv:
    df_fhv = load_data_from_url(fhv_taxi_url.format(month=month))

    pipeline.run(
        data=df_fhv,
        table_name='fhv_taxi',
        write_disposition='append'
        )
    logger.info(f'inserted fhv data for {month}')

[32m2025-02-23 01:11:00.231[0m | [1mINFO    [0m | [36m__main__[0m:[36m<module>[0m:[36m9[0m - [1minserted fhv data for 2019-01[0m
[32m2025-02-23 01:11:29.282[0m | [1mINFO    [0m | [36m__main__[0m:[36m<module>[0m:[36m9[0m - [1minserted fhv data for 2019-02[0m
[32m2025-02-23 01:11:49.786[0m | [1mINFO    [0m | [36m__main__[0m:[36m<module>[0m:[36m9[0m - [1minserted fhv data for 2019-03[0m
[32m2025-02-23 01:12:09.709[0m | [1mINFO    [0m | [36m__main__[0m:[36m<module>[0m:[36m9[0m - [1minserted fhv data for 2019-04[0m
[32m2025-02-23 01:12:34.463[0m | [1mINFO    [0m | [36m__main__[0m:[36m<module>[0m:[36m9[0m - [1minserted fhv data for 2019-05[0m
[32m2025-02-23 01:13:15.690[0m | [1mINFO    [0m | [36m__main__[0m:[36m<module>[0m:[36m9[0m - [1minserted fhv data for 2019-06[0m
[32m2025-02-23 01:14:05.124[0m | [1mINFO    [0m | [36m__main__[0m:[36m<module>[0m:[36m9[0m - [1minserted fhv data for 2019-07[0m
[32m2025-02-

In [None]:
# upload taxi_zone_lookup
# better use dbt seed
# The lookup file is read from a path relative to the notebook's working directory.

df = pd.read_csv(
    filepath_or_buffer='../zoomcamp/seeds/taxi_zone_lookup.csv'
)

# 'replace' instead of 'append': this is a static lookup table, so re-running the
# cell must overwrite it rather than duplicate every zone row.
pipeline.run(
    data=df,
    table_name='taxi_zone_lookup',
    write_disposition='replace'
    )