In [1]:
#!pip install deltalake
#!pip install duckdb
#!pip install pandas
#!pip install numpy

In [2]:
import duckdb
from deltalake.writer import write_deltalake
from deltalake import DeltaTable
from deltalake import DataCatalog

In [16]:
aws_access_key_id=""
aws_secret_access_key=""
aws_session_token=""
aws_region=""
src_s3_uri=""
target_s3_uri=""
table_name = "tbl_nyc_taxi_trips"

In [12]:
storage_options = {
    "AWS_REGION": aws_region,
    'AWS_ACCESS_KEY_ID': aws_access_key_id,
    'AWS_SECRET_ACCESS_KEY': aws_secret_access_key,
    'AWS_SESSION_TOKEN': aws_session_token,
    'AWS_S3_ALLOW_UNSAFE_RENAME': 'true'
}

In [13]:
create_secret_ddl = f"""
    CREATE OR REPLACE SECRET my_ephem_sts_token 
        ( 
            TYPE S3,
            KEY_ID '{aws_access_key_id}',
            SECRET '{aws_secret_access_key}',
            SESSION_TOKEN '{aws_session_token}',
            REGION '{aws_region}'
        );"""

duckdb.sql(create_secret_ddl).show()

┌─────────┐
│ Success │
│ boolean │
├─────────┤
│ true    │
└─────────┘



In [32]:
src_green_sql = f"""
    SELECT
      gr.vendorid,
      gr.lpep_pickup_datetime as pickup_datetime,
      gr.lpep_dropoff_datetime as dropoff_datetime,
      gr.ratecodeid,
      gr.PULocationID as pickup_location_id,
      gr.DOLocationID as drop_off_location_id,
      gr.passenger_count,
      gr.trip_distance,
      gr.fare_amount,
      gr.extra,
      gr.mta_tax,
      gr.tip_amount,
      gr.tolls_amount,
      gr.ehail_fee,
      gr.improvement_surcharge,
      gr.total_amount,
      gr.payment_type,
      gr.trip_type,
      gr.congestion_surcharge,
      0.0 as airport_fee,
      'green' as cab_type,
      '2024' as trip_year,
      'January' as trip_month
    FROM 
      '{src_s3_uri}/green_tripdata_2024-01.parquet' as gr;
"""

jan_green_cab_trips = duckdb.sql(src_green_sql)

In [33]:
# test AWS connection
jan_green_cab_trips.df().info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 56551 entries, 0 to 56550
Data columns (total 23 columns):
 #   Column                 Non-Null Count  Dtype         
---  ------                 --------------  -----         
 0   VendorID               56551 non-null  int32         
 1   pickup_datetime        56551 non-null  datetime64[us]
 2   dropoff_datetime       56551 non-null  datetime64[us]
 3   RatecodeID             53136 non-null  float64       
 4   pickup_location_id     56551 non-null  int32         
 5   drop_off_location_id   56551 non-null  int32         
 6   passenger_count        53136 non-null  float64       
 7   trip_distance          56551 non-null  float64       
 8   fare_amount            56551 non-null  float64       
 9   extra                  56551 non-null  float64       
 10  mta_tax                56551 non-null  float64       
 11  tip_amount             56551 non-null  float64       
 12  tolls_amount           56551 non-null  float64       
 13  e

In [34]:
write_deltalake(
    table_or_uri=f"{target_s3_uri}/{table_name}",
    storage_options=storage_options,
    data=jan_green_cab_trips.df(),
    mode='append',
    name=table_name
)

### Load 2nd file with Yellow Taxi Data for Jan, 2024

In [37]:
src_yellow_sql = f"""
    SELECT
      yl.vendorid,
      yl.tpep_pickup_datetime as pickup_datetime,
      yl.tpep_dropoff_datetime as dropoff_datetime,
      yl.ratecodeid,
      yl.PULocationID as pickup_location_id,
      yl.DOLocationID as drop_off_location_id,
      yl.passenger_count,
      yl.trip_distance,
      yl.fare_amount,
      yl.extra,
      yl.mta_tax,
      yl.tip_amount,
      yl.tolls_amount,
      0.0 as ehail_fee,
      yl.improvement_surcharge,
      yl.total_amount,
      yl.payment_type,
      0 as trip_type,
      yl.congestion_surcharge,
      yl.Airport_fee as airport_fee,
      'yellow' as cab_type,
      '2024' as trip_year,
      'January' as trip_month
    FROM 
      '{src_s3_uri}/yellow_tripdata_2024-01.parquet' as yl;
"""
yellow_cab_trips = duckdb.sql(src_yellow_sql)

In [38]:
# test AWS connection
yellow_cab_trips.df().info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2964624 entries, 0 to 2964623
Data columns (total 23 columns):
 #   Column                 Dtype         
---  ------                 -----         
 0   VendorID               int32         
 1   pickup_datetime        datetime64[us]
 2   dropoff_datetime       datetime64[us]
 3   RatecodeID             float64       
 4   pickup_location_id     int32         
 5   drop_off_location_id   int32         
 6   passenger_count        float64       
 7   trip_distance          float64       
 8   fare_amount            float64       
 9   extra                  float64       
 10  mta_tax                float64       
 11  tip_amount             float64       
 12  tolls_amount           float64       
 13  ehail_fee              float64       
 14  improvement_surcharge  float64       
 15  total_amount           float64       
 16  payment_type           int64         
 17  trip_type              int32         
 18  congestion_surcharge  

In [39]:
write_deltalake(
    table_or_uri=f"{target_s3_uri}/{table_name}",
    storage_options=storage_options,
    data=yellow_cab_trips.df(),
    mode='append',
    name=table_name
)