In [18]:
import pandas as pd
import numpy as np
from tqdm import tqdm
import re
import hashlib
from sqlalchemy import text

In [19]:
# https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-01.parquet
# https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2025-01.parquet

In [20]:
url = "https://d37ci6vzurychx.cloudfront.net/trip-data/"
color = "green"
year = 2025
month = 1
file_name = f"{color}_tripdata_{year}-{month:02d}.parquet"
source_url = f"{url}{file_name}"

df = pd.read_parquet(source_url)
df

Unnamed: 0,VendorID,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,RatecodeID,PULocationID,DOLocationID,passenger_count,trip_distance,fare_amount,...,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type,congestion_surcharge,cbd_congestion_fee
0,2,2025-01-01 00:03:01,2025-01-01 00:17:12,N,1.0,75,235,1.0,5.93,24.70,...,0.5,6.80,0.00,,1.0,34.00,1.0,1.0,0.00,0.0
1,2,2025-01-01 00:19:59,2025-01-01 00:25:52,N,1.0,166,75,1.0,1.32,8.60,...,0.5,0.00,0.00,,1.0,11.10,2.0,1.0,0.00,0.0
2,2,2025-01-01 00:05:29,2025-01-01 00:07:21,N,5.0,171,73,1.0,0.41,25.55,...,0.0,0.00,0.00,,1.0,26.55,2.0,2.0,0.00,0.0
3,2,2025-01-01 00:52:24,2025-01-01 01:07:52,N,1.0,74,223,1.0,4.12,21.20,...,0.5,6.13,6.94,,1.0,36.77,1.0,1.0,0.00,0.0
4,2,2025-01-01 00:25:05,2025-01-01 01:01:10,N,1.0,66,158,1.0,4.71,33.80,...,0.5,7.81,0.00,,1.0,46.86,1.0,1.0,2.75,0.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
48321,2,2025-01-31 19:36:00,2025-01-31 20:05:00,,,179,132,,13.99,55.61,...,0.5,11.42,0.00,,1.0,68.53,,,,
48322,2,2025-01-31 20:33:00,2025-01-31 20:41:00,,,166,75,,1.51,13.58,...,0.5,2.26,0.00,,1.0,17.34,,,,
48323,2,2025-01-31 21:09:00,2025-01-31 21:30:00,,,41,42,,2.90,30.89,...,0.5,0.00,0.00,,1.0,32.39,,,,
48324,2,2025-01-31 22:22:00,2025-01-31 22:25:00,,,75,43,,0.34,14.78,...,0.5,2.44,0.00,,1.0,18.72,,,,


In [21]:
df.dtypes

VendorID                          int32
lpep_pickup_datetime     datetime64[us]
lpep_dropoff_datetime    datetime64[us]
store_and_fwd_flag               object
RatecodeID                      float64
PULocationID                      int32
DOLocationID                      int32
passenger_count                 float64
trip_distance                   float64
fare_amount                     float64
extra                           float64
mta_tax                         float64
tip_amount                      float64
tolls_amount                    float64
ehail_fee                       float64
improvement_surcharge           float64
total_amount                    float64
payment_type                    float64
trip_type                       float64
congestion_surcharge            float64
cbd_congestion_fee              float64
dtype: object

In [5]:
unique_combined = (
    df['VendorID'].astype(str) + 
    df['lpep_pickup_datetime'].astype(str) + 
    df['lpep_dropoff_datetime'].astype(str) + 
    df['PULocationID'].astype(str) + 
    df['DOLocationID'].astype(str) + 
    df['fare_amount'].astype(str) + 
    df['trip_distance'].astype(str)
)

# Hash semuanya sekaligus
df['unique_id'] = unique_combined.apply(lambda x: hashlib.md5(x.encode()).hexdigest())

In [6]:
df['file_name'] =  file_name

In [7]:
df.sort_values(by=["lpep_dropoff_datetime"], ascending=True, inplace=True)
df

Unnamed: 0,VendorID,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,RatecodeID,PULocationID,DOLocationID,passenger_count,trip_distance,fare_amount,...,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type,congestion_surcharge,cbd_congestion_fee,unique_id,file_name
320,2,2024-12-25 23:13:15,2024-12-25 23:13:17,N,5.0,7,264,3.0,0.00,35.0,...,0.00,,1.0,43.20,1.0,2.0,0.00,0.0,ee5c8607547e4290679cfeaef18f7d17,green_tripdata_2025-01.parquet
4108,2,2024-12-29 01:23:41,2024-12-29 01:41:08,N,5.0,92,75,6.0,10.15,65.0,...,6.94,,1.0,72.94,2.0,2.0,0.00,0.0,5ac7ed2dfb7ddbe5896c8c74042e3c1d,green_tripdata_2025-01.parquet
251,2,2024-12-31 19:56:14,2024-12-31 20:03:33,N,1.0,82,129,1.0,1.00,8.6,...,0.00,,1.0,13.32,1.0,1.0,0.00,0.0,779622295c75105699e107d134cf11b9,green_tripdata_2025-01.parquet
9,2,2024-12-31 22:42:13,2024-12-31 22:42:31,N,3.0,74,74,1.0,0.06,23.0,...,0.00,,1.0,25.00,2.0,1.0,0.00,0.0,61ddebb2d9a897325eb5c518eb9aae5b,green_tripdata_2025-01.parquet
10,2,2024-12-31 23:01:11,2024-12-31 23:04:29,N,1.0,75,236,1.0,0.66,5.8,...,0.00,,1.0,13.81,1.0,1.0,2.75,0.0,19117780bd5ef061297ff9467bcc4355,green_tripdata_2025-01.parquet
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
44795,2,2025-02-05 14:07:59,2025-02-05 14:22:09,N,1.0,65,40,1.0,1.39,13.5,...,0.00,,1.0,16.00,2.0,1.0,0.00,0.0,2ad72e8bd7f2c7c4f683fb86098cd56a,green_tripdata_2025-01.parquet
44826,2,2025-02-05 15:19:41,2025-02-05 15:29:16,N,1.0,97,49,1.0,0.98,9.3,...,0.00,,1.0,11.80,2.0,1.0,0.00,0.0,a07c30bafc7d08e13071e0e90afd2f0f,green_tripdata_2025-01.parquet
44827,2,2025-02-05 15:55:48,2025-02-05 16:20:49,N,1.0,97,17,1.0,2.42,21.9,...,0.00,,1.0,29.28,1.0,1.0,0.00,0.0,78cb5e2e51947da9b4df1815bd86942c,green_tripdata_2025-01.parquet
44855,2,2025-02-05 17:18:31,2025-02-05 17:35:41,N,1.0,65,49,1.0,1.73,15.6,...,0.00,,1.0,21.72,1.0,1.0,0.00,0.0,2b20ac8621e74c66d1f0a5a4a84baab0,green_tripdata_2025-01.parquet


In [8]:
df.dtypes

VendorID                          int32
lpep_pickup_datetime     datetime64[us]
lpep_dropoff_datetime    datetime64[us]
store_and_fwd_flag               object
RatecodeID                      float64
PULocationID                      int32
DOLocationID                      int32
passenger_count                 float64
trip_distance                   float64
fare_amount                     float64
extra                           float64
mta_tax                         float64
tip_amount                      float64
tolls_amount                    float64
ehail_fee                       float64
improvement_surcharge           float64
total_amount                    float64
payment_type                    float64
trip_type                       float64
congestion_surcharge            float64
cbd_congestion_fee              float64
unique_id                        object
file_name                        object
dtype: object

In [9]:
df.shape

(48326, 23)

In [10]:
cast_data_types = {
    "VendorID": "Int64",
    "passenger_count": "Int64",
    "RatecodeID": "Int64",
    "store_and_fwd_flag": "string",
    "PULocationID": "Int64",
    "DOLocationID": "Int64",
    "payment_type": "Int64",
    "file_name": "string",
    "unique_id": "string"
}

In [11]:
df = df.astype(cast_data_types)

In [12]:
df.dtypes

VendorID                          Int64
lpep_pickup_datetime     datetime64[us]
lpep_dropoff_datetime    datetime64[us]
store_and_fwd_flag       string[python]
RatecodeID                        Int64
PULocationID                      Int64
DOLocationID                      Int64
passenger_count                   Int64
trip_distance                   float64
fare_amount                     float64
extra                           float64
mta_tax                         float64
tip_amount                      float64
tolls_amount                    float64
ehail_fee                       float64
improvement_surcharge           float64
total_amount                    float64
payment_type                      Int64
trip_type                       float64
congestion_surcharge            float64
cbd_congestion_fee              float64
unique_id                string[python]
file_name                string[python]
dtype: object

In [13]:
# 1. Cabut kolom dari posisi lama dan simpan ke variabel sementara
col_uid = df.pop('unique_id')
col_filename = df.pop('file_name')

# 2. Masukkan kembali ke posisi paling depan (index 0 dan 1)
df.insert(0, 'unique_id', col_uid)
df.insert(1, 'file_name', col_filename)

In [14]:
df.dtypes

unique_id                string[python]
file_name                string[python]
VendorID                          Int64
lpep_pickup_datetime     datetime64[us]
lpep_dropoff_datetime    datetime64[us]
store_and_fwd_flag       string[python]
RatecodeID                        Int64
PULocationID                      Int64
DOLocationID                      Int64
passenger_count                   Int64
trip_distance                   float64
fare_amount                     float64
extra                           float64
mta_tax                         float64
tip_amount                      float64
tolls_amount                    float64
ehail_fee                       float64
improvement_surcharge           float64
total_amount                    float64
payment_type                      Int64
trip_type                       float64
congestion_surcharge            float64
cbd_congestion_fee              float64
dtype: object

In [15]:
def to_snake_case(name):
    # Tambahkan underscore sebelum huruf besar yang diawali huruf kecil (VendorID -> Vendor_ID)
    s1 = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', name)
    # Pisahkan singkatan besar (Vendor_ID -> vendor_id)
    return re.sub('([a-z0-9])([A-Z])', r'\1_\2', s1).lower()

In [16]:
df.columns = [to_snake_case(col) for col in df.columns]

In [17]:
df.dtypes

unique_id                string[python]
file_name                string[python]
vendor_id                         Int64
lpep_pickup_datetime     datetime64[us]
lpep_dropoff_datetime    datetime64[us]
store_and_fwd_flag       string[python]
ratecode_id                       Int64
pu_location_id                    Int64
do_location_id                    Int64
passenger_count                   Int64
trip_distance                   float64
fare_amount                     float64
extra                           float64
mta_tax                         float64
tip_amount                      float64
tolls_amount                    float64
ehail_fee                       float64
improvement_surcharge           float64
total_amount                    float64
payment_type                      Int64
trip_type                       float64
congestion_surcharge            float64
cbd_congestion_fee              float64
dtype: object

In [None]:
from sqlalchemy import create_engine

In [None]:
engine = create_engine('postgresql://dateng26:dateng26@localhost:5432/dateng')

In [None]:
# print(pd.io.sql.get_schema(df, name='yellow_taxi_data', con=engine))

In [None]:
query_create_staging = f"""
CREATE TABLE IF NOT EXISTS staging_{color}_taxi_data (
    unique_id TEXT,
    file_name TEXT,
    vendor_id INT,
    tpep_pickup_datetime TIMESTAMP,
    tpep_dropoff_datetime TIMESTAMP,
    passenger_count INT,
    trip_distance numeric(10,2),
    ratecode_id INT,
    store_and_fwd_flag TEXT,
    pu_location_id INT,
    do_location_id INT,
    payment_type INT,
    fare_amount numeric(10,2),
    extra numeric(10,2),
    mta_tax numeric(10,2),
    tip_amount numeric(10,2),
    tolls_amount numeric(10,2),
    improvement_surcharge numeric(10,2),
    total_amount numeric(10,2),
    congestion_surcharge numeric(10,2),
    airport_fee numeric(10,2),
    cbd_congestion_fee numeric(10,2)
)"""

In [None]:
with engine.begin() as conn:
    conn.execute(text(query_create_staging))
    print(f"Tabel staging_{color}_taxi_data created.")

In [None]:
chunk_size = 5000
table_name = f"staging_{color}_taxi_data"

total_chunks = int(np.ceil(len(df) / chunk_size))

with tqdm(total=total_chunks, desc="Load staging") as progressbar:
    for i, chunk in enumerate(range(0, len(df), chunk_size)):
        df_chunk = df.iloc[chunk : chunk + chunk_size]

        df_chunk.to_sql(
            name=table_name, con=engine, if_exists="append", index=False, method="multi"
        )

        progressbar.update(1)

print("Load data selesai!")

In [None]:
query_merge = f"""
MERGE INTO {color}_taxi_data AS S
USING staging_{color}_taxi_data AS T
ON S.unique_id = T.unique_id
WHEN NOT MATCHED THEN
    INSERT (unique_id, file_name, vendor_id, tpep_pickup_datetime, tpep_dropoff_datetime, passenger_count, trip_distance, ratecode_id, store_and_fwd_flag, pu_location_id, do_location_id, payment_type, fare_amount, extra, mta_tax, tip_amount, tolls_amount, improvement_surcharge, total_amount, congestion_surcharge, airport_fee, cbd_congestion_fee)
    VALUES (T.unique_id, T.file_name, T.vendor_id, T.tpep_pickup_datetime, T.tpep_dropoff_datetime, T.passenger_count, T.trip_distance, T.ratecode_id, T.store_and_fwd_flag, T.pu_location_id, T.do_location_id, T.payment_type, T.fare_amount, T.extra, T.mta_tax, T.tip_amount, T.tolls_amount, T.improvement_surcharge, T.total_amount, T.congestion_surcharge, T.airport_fee, T.cbd_congestion_fee)
    """
with engine.begin() as conn:
    result = conn.execute(text(query_merge))
    print(f"Berhasil memindahkan {result.rowcount} baris ke tabel target.")

In [None]:
query_truncate = f"TRUNCATE TABLE staging_{color}_taxi_data;"
with engine.begin() as conn:
    conn.execute(text(query_truncate))
    print(f"Tabel staging_{color}_taxi_data telah dikosongkan.")