In [69]:
import pandas as pd
from sqlalchemy import create_engine
from dotenv import load_dotenv
import os
import time

load_dotenv(dotenv_path='/home/sibtain-reza/Desktop/my-learning/DE-Zoomcamp/module-1-intro-and-prereqs/.env')
u = os.getenv('uname')
p = os.getenv('password')

In [70]:
# !wget https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2021-01.parquet

In [71]:
df = pd.read_parquet('yellow_tripdata_2021-01.parquet',)

df.shape

(1369769, 19)

In [72]:
# putting data to the pg database running in docker
engine = engine = create_engine(f"postgresql+psycopg2://{u}:{p}@localhost:5432/ny_taxi")
engine.connect()

<sqlalchemy.engine.base.Connection at 0x70c71d9ac200>

In [73]:
print(pd.io.sql.get_schema(df, name='yellow_taxi_data', con=engine))


CREATE TABLE yellow_taxi_data (
	"VendorID" BIGINT, 
	tpep_pickup_datetime TIMESTAMP WITHOUT TIME ZONE, 
	tpep_dropoff_datetime TIMESTAMP WITHOUT TIME ZONE, 
	passenger_count FLOAT(53), 
	trip_distance FLOAT(53), 
	"RatecodeID" FLOAT(53), 
	store_and_fwd_flag TEXT, 
	"PULocationID" BIGINT, 
	"DOLocationID" BIGINT, 
	payment_type BIGINT, 
	fare_amount FLOAT(53), 
	extra FLOAT(53), 
	mta_tax FLOAT(53), 
	tip_amount FLOAT(53), 
	tolls_amount FLOAT(53), 
	improvement_surcharge FLOAT(53), 
	total_amount FLOAT(53), 
	congestion_surcharge FLOAT(53), 
	airport_fee FLOAT(53)
)




In [74]:
df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)
df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)

In [75]:
# create ddl in pg, then insert records 
df.head(n=0).to_sql(
    name='yellow_taxi_data',
    con=engine,
    if_exists='replace'
)

0

In [76]:
n_rows_total = df.shape[0]
chunk_size = 100000
counter = 0

start = time.time()

while counter < n_rows_total:

    # Determine the chunk's upper boundary, ensuring not to exceed the total number of rows
    chunk_end = min(counter + chunk_size, n_rows_total)
    
    # Get the chunk of data
    df_chunk = df[counter:chunk_end]
    
    # Insert the chunk into the database here (e.g., using SQLAlchemy or any method you're using)
    df_chunk.to_sql(
    name='yellow_taxi_data',
    con=engine,
    if_exists='append', index=False
    )
    
    print(f"Inserting rows {counter} to {chunk_end} (Chunk Size: {df_chunk.shape[0]})")

    # Update the counter to the next chunk
    counter += chunk_size

end = time.time() - start

print(f"Data insertion completed!. Took {end}")


Inserting rows 0 to 100000 (Chunk Size: 100000)
Inserting rows 100000 to 200000 (Chunk Size: 100000)
Inserting rows 200000 to 300000 (Chunk Size: 100000)
Inserting rows 300000 to 400000 (Chunk Size: 100000)
Inserting rows 400000 to 500000 (Chunk Size: 100000)
Inserting rows 500000 to 600000 (Chunk Size: 100000)
Inserting rows 600000 to 700000 (Chunk Size: 100000)
Inserting rows 700000 to 800000 (Chunk Size: 100000)
Inserting rows 800000 to 900000 (Chunk Size: 100000)
Inserting rows 900000 to 1000000 (Chunk Size: 100000)
Inserting rows 1000000 to 1100000 (Chunk Size: 100000)
Inserting rows 1100000 to 1200000 (Chunk Size: 100000)
Inserting rows 1200000 to 1300000 (Chunk Size: 100000)
Inserting rows 1300000 to 1369769 (Chunk Size: 69769)
Data insertion completed!. Took 102.58799600601196


In [77]:
# another chuck size method

df.to_sql(
    name='yellow_taxi_data',
    con=engine,
    if_exists='append', 
    index=False
    , chunksize=100000
)

13769