In [1]:
import io
import os
import sys
from pathlib import Path

import dlt
import duckdb
import numpy as np
import pandas as pd

In [2]:
# Project root: the parent of the directory the notebook is run from.
MAIN_DIR = Path.cwd().parent

# Folder holding the local NYC taxi parquet files (kept as a plain string path).
data_path = os.path.join(MAIN_DIR, "datasets", "NYC_taxi")

In [54]:
# Month to load; change these two values to pick a different local file.
year = "2023"
month = "january"
parquet_file = os.path.join(data_path, year, f"{year}-{month}.parquet")

In [55]:
# Read the selected month's trips into a DataFrame.
df = pd.read_parquet(path=parquet_file, engine="pyarrow")

In [56]:
# Convert the DataFrame into a list of row dicts, the record format handed to dlt here.
data = df.to_dict(orient="records")

# Pipeline that loads into a local DuckDB database, under the "green_trip" dataset.
pipeline = dlt.pipeline(
    pipeline_name="green_taxi_pipeline",
    destination="duckdb",
    dataset_name="green_trip",
)

# "replace" makes this cell safe to re-run: the DuckDB file persists between kernel
# sessions, and without it the default disposition (append) would load the same month
# again on every execution, duplicating the rows in january_2023.
load_info = pipeline.run(
    data,
    table_name="january_2023",
    write_disposition="replace",
)

### Inspecting what the DuckDB database contains

In [57]:
# Open the DuckDB database file named after the pipeline ("green_taxi_pipeline").
conn = duckdb.connect("green_taxi_pipeline.duckdb")

# Make the pipeline's dataset ("green_trip") the default schema so tables can be
# referenced without a prefix, then list the tables it contains.
conn.sql("SET search_path = 'green_trip'")
print('Loaded tables: ')
display(conn.sql("show tables"))


Loaded tables: 


┌─────────────────────┐
│        name         │
│       varchar       │
├─────────────────────┤
│ _dlt_loads          │
│ _dlt_pipeline_state │
│ _dlt_version        │
│ january_2023        │
└─────────────────────┘

In [58]:
# Preview the loaded data: pull up to 3000 rows of january_2023 into a pandas
# DataFrame and display it. There is no ORDER BY, so row order is whatever DuckDB returns.

print("\n\n\n january_2023 table below:")

rides = conn.sql("""
    SELECT * FROM january_2023 
    LIMIT 3000      
                 """).df()
display(rides)




 january_2023 table below:


Unnamed: 0,vendor_id,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,ratecode_id,pu_location_id,do_location_id,passenger_count,trip_distance,fare_amount,...,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,payment_type,trip_type,congestion_surcharge,_dlt_load_id,_dlt_id
0,2,2023-01-01 00:26:10+00:00,2023-01-01 00:37:11+00:00,N,1.0,166,143,1.0,2.58,14.9,...,0.5,4.03,0.0,1.0,24.18,1.0,1.0,2.75,1707911647.7917085,xodM+iJG4t0DFw
1,2,2023-01-01 00:51:03+00:00,2023-01-01 00:57:49+00:00,N,1.0,24,43,1.0,1.81,10.7,...,0.5,2.64,0.0,1.0,15.84,1.0,1.0,0.00,1707911647.7917085,sWRdKkekpus+7g
2,2,2023-01-01 00:35:12+00:00,2023-01-01 00:41:32+00:00,N,1.0,223,179,1.0,0.00,7.2,...,0.5,1.94,0.0,1.0,11.64,1.0,1.0,0.00,1707911647.7917085,kYR0fitHJYfqAw
3,1,2023-01-01 00:13:14+00:00,2023-01-01 00:19:03+00:00,N,1.0,41,238,1.0,1.30,6.5,...,1.5,1.70,0.0,1.0,10.20,1.0,1.0,0.00,1707911647.7917085,RKqUkxjHuLlNmA
4,1,2023-01-01 00:33:04+00:00,2023-01-01 00:39:02+00:00,N,1.0,41,74,1.0,1.10,6.0,...,1.5,0.00,0.0,1.0,8.00,1.0,1.0,0.00,1707911647.7917085,O4pkJrCBlC+TRw
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2995,2,2023-01-03 08:50:43+00:00,2023-01-03 08:56:49+00:00,N,1.0,74,43,6.0,1.35,8.6,...,0.5,2.02,0.0,1.0,12.12,1.0,1.0,0.00,1707911647.7917085,GeXAHCxs8s0hyQ
2996,2,2023-01-03 08:26:15+00:00,2023-01-03 08:36:43+00:00,N,1.0,74,75,1.0,1.95,12.1,...,0.5,0.00,0.0,1.0,13.60,2.0,1.0,0.00,1707911647.7917085,JPv1I92b8mau+A
2997,1,2023-01-03 08:35:35+00:00,2023-01-03 08:41:03+00:00,N,1.0,41,151,3.0,0.90,7.2,...,1.5,2.15,0.0,1.0,10.85,1.0,1.0,0.00,1707911647.7917085,0XrxAZPbvTyBOQ
2998,1,2023-01-03 08:49:42+00:00,2023-01-03 09:04:11+00:00,N,1.0,41,75,1.0,1.60,12.1,...,1.5,0.00,0.0,1.0,13.60,2.0,1.0,0.00,1707911647.7917085,OEmwvYqcUYBRPw


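Notice that dlt has normalized the column names to snake_case: uppercase letters are lowercased and word boundaries are separated by underscores (e.g. `VendorID` → `vendor_id`, `PULocationID` → `pu_location_id`).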

In [7]:
# Summarize the previewed rows: column names, non-null counts and dtypes.
rides.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3000 entries, 0 to 2999
Data columns (total 21 columns):
 #   Column                 Non-Null Count  Dtype                  
---  ------                 --------------  -----                  
 0   vendor_id              3000 non-null   int64                  
 1   lpep_pickup_datetime   3000 non-null   datetime64[us, Etc/UTC]
 2   lpep_dropoff_datetime  3000 non-null   datetime64[us, Etc/UTC]
 3   store_and_fwd_flag     3000 non-null   object                 
 4   ratecode_id            3000 non-null   float64                
 5   pu_location_id         3000 non-null   int64                  
 6   do_location_id         3000 non-null   int64                  
 7   passenger_count        3000 non-null   float64                
 8   trip_distance          3000 non-null   float64                
 9   fare_amount            3000 non-null   float64                
 10  extra                  3000 non-null   float64                
 11  mta_

### Continuing to use the pipeline

In [59]:
# dlt's requests helper; from here on the name `requests` refers to this helper.
from dlt.sources.helpers import requests

# URL of the February 2023 green-taxi trip data, published as a parquet file
url = "https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2023-02.parquet"

# Download the file and raise an HTTP error if the request was not successful
response = requests.get(url)
response.raise_for_status()

In [60]:
# Display the response object to confirm the HTTP status of the download.
response

<Response [200]>

In [61]:
# Parse the parquet bytes already downloaded into `response` rather than passing the
# URL to pandas, which would fetch the same file from the network a second time.
df2 = pd.read_parquet(io.BytesIO(response.content), engine="pyarrow")

In [62]:
# Peek at the last five rows of the February data.
df2.tail(5)

Unnamed: 0,VendorID,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,RatecodeID,PULocationID,DOLocationID,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type,congestion_surcharge
64804,2,2023-02-28 21:49:00,2023-02-28 22:04:00,,,7,263,,3.83,16.7,0.0,0.0,2.0,0.0,,1.0,22.45,,,
64805,2,2023-02-28 21:05:00,2023-02-28 21:08:00,,,41,42,,0.67,12.95,0.0,0.0,2.79,0.0,,1.0,16.74,,,
64806,2,2023-02-28 22:42:00,2023-02-28 22:59:00,,,166,141,,3.91,19.6,0.0,0.0,3.5,0.0,,1.0,26.85,,,
64807,2,2023-02-28 23:21:00,2023-02-28 23:38:00,,,41,244,,4.54,19.36,0.0,0.0,4.07,0.0,,1.0,24.43,,,
64808,2,2023-02-28 23:10:00,2023-02-28 23:15:00,,,75,140,,2.61,13.18,0.0,0.0,3.39,0.0,,1.0,20.32,,,


In [63]:
# Load February into its own table with the existing DuckDB pipeline.
# "replace" keeps the cell idempotent: without it the default disposition (append)
# would add the whole month again every time the cell is re-run.
load_info_feb = pipeline.run(
    df2.to_dict(orient="records"),
    table_name="february_2023",
    write_disposition="replace",
)

In [65]:
# List the dataset's tables again to confirm that february_2023 was created.
conn.sql("SET search_path = 'green_trip'")
print('Loaded tables: ')
display(conn.sql("show tables"))

Loaded tables: 


┌─────────────────────┐
│        name         │
│       varchar       │
├─────────────────────┤
│ _dlt_loads          │
│ _dlt_pipeline_state │
│ _dlt_version        │
│ february_2023       │
│ january_2023        │
└─────────────────────┘

In [81]:
# Month to load from the local dataset folder; change these to pick a different file.
year, month = "2023", "march"
parquet_file = os.path.join(data_path, year, f"{year}-{month}.parquet")
df3 = pd.read_parquet(parquet_file, engine="pyarrow")

# "replace" keeps the cell idempotent: without it the default disposition (append)
# would add the whole month to march_2023 again every time the cell is re-run.
load_info_mar = pipeline.run(
    df3.to_dict(orient="records"),
    table_name="march_2023",
    write_disposition="replace",
)

In [82]:
# List the dataset's tables again to confirm that march_2023 was created.
conn.sql("SET search_path = 'green_trip'")
print('Loaded tables: ')
display(conn.sql("show tables"))

Loaded tables: 


┌─────────────────────┐
│        name         │
│       varchar       │
├─────────────────────┤
│ _dlt_loads          │
│ _dlt_pipeline_state │
│ _dlt_version        │
│ february_2023       │
│ january_2023        │
│ march_2023          │
└─────────────────────┘

In [83]:
# Preview up to 3000 rows of march_2023.
# NOTE: this rebinds `rides`, which previously held the January preview.
print("\n\n\n march_2023 table below:")

rides = conn.sql("""
    SELECT * FROM march_2023 
    LIMIT 3000    
                 """).df()
display(rides)




 march_2023 table below:


Unnamed: 0,vendor_id,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,ratecode_id,pu_location_id,do_location_id,passenger_count,trip_distance,fare_amount,...,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,payment_type,trip_type,congestion_surcharge,_dlt_load_id,_dlt_id
0,2,2023-03-01 00:25:10+00:00,2023-03-01 00:35:47+00:00,N,1.0,82,196,1.0,2.36,13.5,...,0.5,0.00,0.0,1.0,16.00,2.0,1.0,0.00,1707912047.8666115,KkuPkt0arM6hHQ
1,2,2023-03-01 00:14:29+00:00,2023-03-01 00:25:04+00:00,N,1.0,7,7,1.0,0.78,-6.5,...,-0.5,0.00,0.0,-1.0,-9.00,3.0,1.0,0.00,1707912047.8666115,NeHoYShtckc6lQ
2,2,2023-03-01 00:14:29+00:00,2023-03-01 00:25:04+00:00,N,1.0,7,7,1.0,0.78,6.5,...,0.5,0.00,0.0,1.0,9.00,3.0,1.0,0.00,1707912047.8666115,KffejtHi8K5+JQ
3,2,2023-02-28 22:59:46+00:00,2023-02-28 23:08:38+00:00,N,1.0,166,74,1.0,1.66,11.4,...,0.5,2.78,0.0,1.0,16.68,1.0,1.0,0.00,1707912047.8666115,+eLdW8GovsiLOg
4,2,2023-03-01 00:54:03+00:00,2023-03-01 01:03:14+00:00,N,1.0,236,229,1.0,3.14,15.6,...,0.5,4.17,0.0,1.0,25.02,1.0,1.0,2.75,1707912047.8666115,ZIJmMp/ao2Apvg
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2995,2,2023-03-02 08:55:55+00:00,2023-03-02 09:10:38+00:00,N,1.0,130,218,1.0,2.35,15.6,...,0.5,5.13,0.0,1.0,22.23,1.0,1.0,0.00,1707912047.8666115,sM07JaQnXO3hpQ
2996,2,2023-03-02 09:04:21+00:00,2023-03-02 09:18:56+00:00,N,1.0,75,237,1.0,2.21,14.2,...,0.5,1.00,0.0,0.3,18.75,1.0,1.0,2.75,1707912047.8666115,Tt00hfmiXrJIgA
2997,2,2023-03-02 09:49:35+00:00,2023-03-02 10:03:44+00:00,N,1.0,41,238,1.0,2.43,15.6,...,0.5,4.79,0.0,0.3,23.94,1.0,1.0,2.75,1707912047.8666115,QA0TX/EeRLUg3Q
2998,2,2023-03-02 09:08:13+00:00,2023-03-02 09:13:14+00:00,N,1.0,41,43,1.0,1.27,7.9,...,0.5,1.88,0.0,1.0,11.28,1.0,1.0,0.00,1707912047.8666115,fE9gAAYg5chuDw


### Load data to PostgreSQL

In [75]:
# Connection string for Postgres. It is read from the environment so that real
# credentials never have to be committed with the notebook; the fallback is the
# local development instance this tutorial was written against.
postgres_credentials = os.environ.get(
    "DESTINATION__POSTGRES__CREDENTIALS",
    "postgresql://postgres:postgres@localhost:5432/postgres",
)

# Second pipeline: same kind of load, but targeting Postgres and its own dataset.
pipeline_postgres = dlt.pipeline(
    pipeline_name='pipeline_postgres',
    destination=dlt.destinations.postgres(credentials=postgres_credentials),
    dataset_name='green_trip_postgres'
)

In [76]:
# Try the Postgres pipeline on a small sample: only the first 10 March rows.
load_info_postgre = pipeline_postgres.run(
    df3.head(10).to_dict(orient="records"),
    table_name="march_2023",
)

In [84]:
# Month to load from the local dataset folder; change these to pick a different file.
year = "2023"
month = "april"
parquet_file = os.path.join(data_path, year, f"{year}-{month}.parquet")
df4 = pd.read_parquet(path=parquet_file, engine="pyarrow")

# Peek at the last five rows.
df4.tail(5)

Unnamed: 0,VendorID,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,RatecodeID,PULocationID,DOLocationID,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type,congestion_surcharge
65387,2,2023-04-30 22:02:00,2023-04-30 22:12:00,,,42,168,,1.97,11.41,0.0,0.0,3.48,0.0,,1.0,15.89,,,
65388,2,2023-04-30 23:16:00,2023-04-30 23:43:00,,,130,170,,12.97,41.96,0.0,0.0,2.61,6.55,,1.0,54.87,,,
65389,2,2023-04-30 23:25:00,2023-04-30 23:40:00,,,36,112,,3.44,13.5,0.0,0.0,3.9,0.0,,1.0,18.4,,,
65390,2,2023-04-30 23:36:00,2023-04-30 23:57:00,,,112,236,,5.49,25.02,0.0,0.0,0.0,0.0,,1.0,28.77,,,
65391,2,2023-04-30 23:24:00,2023-04-30 23:44:00,,,80,74,,9.23,30.51,0.0,0.0,7.61,6.55,,1.0,45.67,,,


In [85]:
# Load the first 10 April rows into Postgres (no write disposition is passed here).
load_info_postgre = pipeline_postgres.run(
    df4.head(10).to_dict(orient="records"),
    table_name="april_2023",
)

#### Using `replace` to avoid duplicating the data

In [128]:
# Load the first 20 April rows into the same table, this time passing
# write_disposition="replace" instead of relying on the default as the previous cell did.
load_info_replace = pipeline_postgres.run(
    df4.head(20).to_dict(orient="records"),
    table_name="april_2023",
    write_disposition="replace",
)


#### Load only new data (incremental loading)

In [123]:
# Preview rows 20-24 of the April data (positional slice).
df4.iloc[20:25]

Unnamed: 0,VendorID,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,RatecodeID,PULocationID,DOLocationID,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type,congestion_surcharge
20,2,2023-04-01 00:03:42,2023-04-01 00:18:09,N,1.0,244,151,1.0,3.74,19.1,1.0,0.5,2.0,0.0,,1.0,23.6,1.0,1.0,0.0
21,2,2023-04-01 00:22:00,2023-04-01 00:36:41,N,1.0,97,89,1.0,3.02,12.5,0.5,0.5,0.0,0.0,,0.3,13.8,1.0,1.0,0.0
22,2,2023-04-01 00:56:20,2023-04-01 01:13:41,N,1.0,75,229,2.0,2.85,18.4,1.0,0.5,0.0,0.0,,1.0,23.65,2.0,1.0,2.75
23,2,2023-04-01 00:19:01,2023-04-01 00:25:29,N,1.0,255,256,1.0,1.16,7.9,1.0,0.5,2.6,0.0,,1.0,13.0,1.0,1.0,0.0
24,2,2023-04-01 00:46:55,2023-04-01 01:25:23,N,1.0,112,244,1.0,10.93,51.3,1.0,0.5,4.0,0.0,,1.0,60.55,1.0,1.0,2.75


In [134]:
@dlt.resource(table_name="april_2023", write_disposition="append")
def get_new_records():
    """Yield rows 25-39 of ``df4`` as dicts, to be appended to ``april_2023``.

    Yields:
        dict: one record per row, keyed by column name.
    """
    # Build the records with DataFrame.to_dict(orient="records"), the same conversion
    # used by every other load in this notebook. Iterating with iterrows() routes each
    # row through a Series, which is slower and does not preserve dtypes across a row.
    # NOTE(review): the slice starts at 25, so rows 20-24 (previewed in the cell
    # above) are not covered by this load -- confirm that gap is intentional.
    yield from df4[25:40].to_dict(orient="records")

In [135]:
# Run the resource through the Postgres pipeline; the table name and write
# disposition come from its @dlt.resource decorator, so none are passed here.
load_info = pipeline_postgres.run(get_new_records)

In [136]:
# Keep the normalize-step info from the pipeline's last trace; it is printed in the next cell.
row_counts = pipeline_postgres.last_trace.last_normalize_info

In [137]:
# Print the normalize summary and the load summary, separated by a divider line.
print(row_counts, "------", load_info, sep="\n")

Normalized data for the following tables:
- april_2023: 15 row(s)

Load package 1707937276.143141 is NORMALIZED and NOT YET LOADED to the destination and contains no failed jobs
------
Pipeline pipeline_postgres load step completed in 0.23 seconds
1 load package(s) were loaded to destination postgres and into dataset green_trip_postgres
The postgres destination used postgresql://postgres:***@localhost:5432/postgres location to store data
Load package 1707937276.143141 is LOADED and contains no failed jobs
