# NY yellow taxi data loading

In [1]:
import pandas as pd
from time import time
from sqlalchemy import create_engine

### load csv file to df

In [2]:
df = pd.read_csv('/workspaces/DE-Zoomcamp-2024/data/ny_taxi.csv' , nrows = 100)
df.head(5)

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge
0,1,01/01/2021 12:30:10 AM,01/01/2021 12:36:12 AM,1,2.1,1,N,142,43,2,8.0,3.0,0.5,0.0,0.0,0.3,11.8,2.5
1,1,01/01/2021 12:51:20 AM,01/01/2021 12:52:19 AM,1,0.2,1,N,238,151,2,3.0,0.5,0.5,0.0,0.0,0.3,4.3,0.0
2,1,01/01/2021 12:43:30 AM,01/01/2021 01:11:06 AM,1,14.7,1,N,132,165,1,42.0,0.5,0.5,8.65,0.0,0.3,51.95,0.0
3,1,01/01/2021 12:15:48 AM,01/01/2021 12:31:01 AM,0,10.6,1,N,138,132,1,29.0,0.5,0.5,6.05,0.0,0.3,36.35,0.0
4,2,01/01/2021 12:31:49 AM,01/01/2021 12:48:21 AM,1,4.94,1,N,68,33,1,16.5,0.5,0.5,4.06,0.0,0.3,24.36,2.5


### Edit datatime format

In [24]:
df['tpep_pickup_datetime'] = pd.to_datetime(df['tpep_pickup_datetime'] , format= '%m/%d/%Y %I:%M:%S %p') 
df['tpep_dropoff_datetime'] = pd.to_datetime(df['tpep_dropoff_datetime'] , format= '%m/%d/%Y %I:%M:%S %p') 

### Create db engine

In [25]:
engine = create_engine('postgresql://root:root@localhost:5432/ny_taxi')

### Get  DDL from the dataframe to create the table

In [38]:
print(pd.io.sql.get_schema(df , 'ny_yellow_taxi_data' ,con = engine))


CREATE TABLE ny_yellow_taxi_data (
	"VendorID" BIGINT, 
	tpep_pickup_datetime TIMESTAMP WITHOUT TIME ZONE, 
	tpep_dropoff_datetime TIMESTAMP WITHOUT TIME ZONE, 
	passenger_count BIGINT, 
	trip_distance FLOAT(53), 
	"RatecodeID" BIGINT, 
	store_and_fwd_flag TEXT, 
	"PULocationID" BIGINT, 
	"DOLocationID" BIGINT, 
	payment_type BIGINT, 
	fare_amount FLOAT(53), 
	extra FLOAT(53), 
	mta_tax FLOAT(53), 
	tip_amount FLOAT(53), 
	tolls_amount FLOAT(53), 
	improvement_surcharge FLOAT(53), 
	total_amount FLOAT(53), 
	congestion_surcharge FLOAT(53)
)




### Split the data into chunks

In [33]:
df_iter = pd.read_csv('/workspaces/DE-Zoomcamp-2024/data/ny_taxi.csv' , iterator = True , chunksize = 100000)

In [None]:
df = next(df_iter)

### Create the table

In [35]:
df.head(0).to_sql('ny_yellow_taxi', engine, if_exists='replace', index=False)

0

### Iterate over chunks and load into PostgreSQL

In [37]:

for i, chunk in enumerate(df_iter):
    start_time = time()
    
    chunk.to_sql('ny_yellow_taxi', engine, if_exists='append', index=False)
    
    end_time = time()
    elapsed_time = end_time - start_time

    print(f'Chunk {i + 1} loaded into PostgreSQL. Elapsed Time: {elapsed_time:.2f} seconds')

print('Data loading complete.')

Chunk 1 loaded into PostgreSQL. Elapsed Time: 7.79 seconds
Chunk 2 loaded into PostgreSQL. Elapsed Time: 8.06 seconds
Chunk 3 loaded into PostgreSQL. Elapsed Time: 7.88 seconds
Chunk 4 loaded into PostgreSQL. Elapsed Time: 7.85 seconds
Chunk 5 loaded into PostgreSQL. Elapsed Time: 8.28 seconds
Chunk 6 loaded into PostgreSQL. Elapsed Time: 7.59 seconds
Chunk 7 loaded into PostgreSQL. Elapsed Time: 7.75 seconds
Chunk 8 loaded into PostgreSQL. Elapsed Time: 7.98 seconds
Chunk 9 loaded into PostgreSQL. Elapsed Time: 8.29 seconds
Chunk 10 loaded into PostgreSQL. Elapsed Time: 7.84 seconds
Chunk 11 loaded into PostgreSQL. Elapsed Time: 7.67 seconds


  for i, chunk in enumerate(df_iter):


Chunk 12 loaded into PostgreSQL. Elapsed Time: 7.96 seconds


  for i, chunk in enumerate(df_iter):


Chunk 13 loaded into PostgreSQL. Elapsed Time: 7.32 seconds
Chunk 14 loaded into PostgreSQL. Elapsed Time: 7.89 seconds
Chunk 15 loaded into PostgreSQL. Elapsed Time: 7.78 seconds
Chunk 16 loaded into PostgreSQL. Elapsed Time: 8.19 seconds
Chunk 17 loaded into PostgreSQL. Elapsed Time: 7.89 seconds
Chunk 18 loaded into PostgreSQL. Elapsed Time: 7.92 seconds
Chunk 19 loaded into PostgreSQL. Elapsed Time: 8.14 seconds
Chunk 20 loaded into PostgreSQL. Elapsed Time: 7.74 seconds
Chunk 21 loaded into PostgreSQL. Elapsed Time: 7.95 seconds
Chunk 22 loaded into PostgreSQL. Elapsed Time: 7.77 seconds
Chunk 23 loaded into PostgreSQL. Elapsed Time: 8.40 seconds
Chunk 24 loaded into PostgreSQL. Elapsed Time: 8.21 seconds
Chunk 25 loaded into PostgreSQL. Elapsed Time: 7.67 seconds


  for i, chunk in enumerate(df_iter):


Chunk 26 loaded into PostgreSQL. Elapsed Time: 7.62 seconds


  for i, chunk in enumerate(df_iter):


Chunk 27 loaded into PostgreSQL. Elapsed Time: 7.90 seconds
Chunk 28 loaded into PostgreSQL. Elapsed Time: 7.92 seconds
Chunk 29 loaded into PostgreSQL. Elapsed Time: 7.79 seconds
Chunk 30 loaded into PostgreSQL. Elapsed Time: 7.85 seconds
Chunk 31 loaded into PostgreSQL. Elapsed Time: 8.64 seconds
Chunk 32 loaded into PostgreSQL. Elapsed Time: 8.14 seconds
Chunk 33 loaded into PostgreSQL. Elapsed Time: 7.75 seconds
Chunk 34 loaded into PostgreSQL. Elapsed Time: 8.03 seconds
Chunk 35 loaded into PostgreSQL. Elapsed Time: 8.99 seconds
Chunk 36 loaded into PostgreSQL. Elapsed Time: 8.58 seconds
Chunk 37 loaded into PostgreSQL. Elapsed Time: 8.01 seconds
Chunk 38 loaded into PostgreSQL. Elapsed Time: 8.34 seconds
Chunk 39 loaded into PostgreSQL. Elapsed Time: 8.18 seconds
Chunk 40 loaded into PostgreSQL. Elapsed Time: 8.12 seconds
Chunk 41 loaded into PostgreSQL. Elapsed Time: 7.99 seconds
Chunk 42 loaded into PostgreSQL. Elapsed Time: 8.31 seconds
Chunk 43 loaded into PostgreSQL. Elapsed

  for i, chunk in enumerate(df_iter):


Chunk 45 loaded into PostgreSQL. Elapsed Time: 8.04 seconds


  for i, chunk in enumerate(df_iter):


Chunk 46 loaded into PostgreSQL. Elapsed Time: 7.77 seconds
Chunk 47 loaded into PostgreSQL. Elapsed Time: 7.90 seconds
Chunk 48 loaded into PostgreSQL. Elapsed Time: 7.86 seconds
Chunk 49 loaded into PostgreSQL. Elapsed Time: 8.03 seconds
Chunk 50 loaded into PostgreSQL. Elapsed Time: 7.96 seconds
Chunk 51 loaded into PostgreSQL. Elapsed Time: 7.85 seconds
Chunk 52 loaded into PostgreSQL. Elapsed Time: 7.88 seconds
Chunk 53 loaded into PostgreSQL. Elapsed Time: 8.85 seconds
Chunk 54 loaded into PostgreSQL. Elapsed Time: 7.94 seconds
Chunk 55 loaded into PostgreSQL. Elapsed Time: 7.87 seconds
Chunk 56 loaded into PostgreSQL. Elapsed Time: 8.01 seconds
Chunk 57 loaded into PostgreSQL. Elapsed Time: 7.85 seconds
Chunk 58 loaded into PostgreSQL. Elapsed Time: 8.12 seconds
Chunk 59 loaded into PostgreSQL. Elapsed Time: 7.80 seconds
Chunk 60 loaded into PostgreSQL. Elapsed Time: 8.50 seconds
Chunk 61 loaded into PostgreSQL. Elapsed Time: 8.01 seconds
Chunk 62 loaded into PostgreSQL. Elapsed

  for i, chunk in enumerate(df_iter):


Chunk 67 loaded into PostgreSQL. Elapsed Time: 8.12 seconds


  for i, chunk in enumerate(df_iter):


Chunk 68 loaded into PostgreSQL. Elapsed Time: 8.78 seconds
Chunk 69 loaded into PostgreSQL. Elapsed Time: 8.04 seconds
Chunk 70 loaded into PostgreSQL. Elapsed Time: 8.02 seconds
Chunk 71 loaded into PostgreSQL. Elapsed Time: 7.89 seconds
Chunk 72 loaded into PostgreSQL. Elapsed Time: 8.07 seconds
Chunk 73 loaded into PostgreSQL. Elapsed Time: 7.65 seconds
Chunk 74 loaded into PostgreSQL. Elapsed Time: 7.70 seconds
Chunk 75 loaded into PostgreSQL. Elapsed Time: 7.81 seconds
Chunk 76 loaded into PostgreSQL. Elapsed Time: 7.90 seconds
Chunk 77 loaded into PostgreSQL. Elapsed Time: 7.86 seconds
Chunk 78 loaded into PostgreSQL. Elapsed Time: 7.81 seconds
Chunk 79 loaded into PostgreSQL. Elapsed Time: 7.87 seconds
Chunk 80 loaded into PostgreSQL. Elapsed Time: 7.89 seconds
Chunk 81 loaded into PostgreSQL. Elapsed Time: 7.65 seconds
Chunk 82 loaded into PostgreSQL. Elapsed Time: 8.26 seconds
Chunk 83 loaded into PostgreSQL. Elapsed Time: 9.05 seconds
Chunk 84 loaded into PostgreSQL. Elapsed

  for i, chunk in enumerate(df_iter):


Chunk 92 loaded into PostgreSQL. Elapsed Time: 7.26 seconds


  for i, chunk in enumerate(df_iter):


Chunk 93 loaded into PostgreSQL. Elapsed Time: 7.75 seconds
Chunk 94 loaded into PostgreSQL. Elapsed Time: 8.62 seconds
Chunk 95 loaded into PostgreSQL. Elapsed Time: 11.85 seconds
Chunk 96 loaded into PostgreSQL. Elapsed Time: 8.35 seconds
Chunk 97 loaded into PostgreSQL. Elapsed Time: 7.93 seconds
Chunk 98 loaded into PostgreSQL. Elapsed Time: 8.12 seconds
Chunk 99 loaded into PostgreSQL. Elapsed Time: 8.59 seconds
Chunk 100 loaded into PostgreSQL. Elapsed Time: 8.43 seconds
Chunk 101 loaded into PostgreSQL. Elapsed Time: 8.15 seconds
Chunk 102 loaded into PostgreSQL. Elapsed Time: 7.99 seconds
Chunk 103 loaded into PostgreSQL. Elapsed Time: 8.76 seconds
Chunk 104 loaded into PostgreSQL. Elapsed Time: 9.81 seconds
Chunk 105 loaded into PostgreSQL. Elapsed Time: 8.03 seconds
Chunk 106 loaded into PostgreSQL. Elapsed Time: 8.16 seconds
Chunk 107 loaded into PostgreSQL. Elapsed Time: 8.05 seconds
Chunk 108 loaded into PostgreSQL. Elapsed Time: 7.81 seconds
Chunk 109 loaded into PostgreS

  for i, chunk in enumerate(df_iter):


Chunk 120 loaded into PostgreSQL. Elapsed Time: 7.66 seconds


  for i, chunk in enumerate(df_iter):


Chunk 121 loaded into PostgreSQL. Elapsed Time: 7.95 seconds
Chunk 122 loaded into PostgreSQL. Elapsed Time: 8.44 seconds
Chunk 123 loaded into PostgreSQL. Elapsed Time: 7.95 seconds
Chunk 124 loaded into PostgreSQL. Elapsed Time: 8.29 seconds
Chunk 125 loaded into PostgreSQL. Elapsed Time: 8.92 seconds
Chunk 126 loaded into PostgreSQL. Elapsed Time: 8.81 seconds
Chunk 127 loaded into PostgreSQL. Elapsed Time: 8.90 seconds
Chunk 128 loaded into PostgreSQL. Elapsed Time: 8.41 seconds
Chunk 129 loaded into PostgreSQL. Elapsed Time: 8.79 seconds
Chunk 130 loaded into PostgreSQL. Elapsed Time: 8.00 seconds
Chunk 131 loaded into PostgreSQL. Elapsed Time: 8.23 seconds
Chunk 132 loaded into PostgreSQL. Elapsed Time: 10.10 seconds
Chunk 133 loaded into PostgreSQL. Elapsed Time: 8.21 seconds
Chunk 134 loaded into PostgreSQL. Elapsed Time: 8.11 seconds
Chunk 135 loaded into PostgreSQL. Elapsed Time: 8.86 seconds
Chunk 136 loaded into PostgreSQL. Elapsed Time: 8.68 seconds
Chunk 137 loaded into P

  for i, chunk in enumerate(df_iter):


Chunk 148 loaded into PostgreSQL. Elapsed Time: 7.83 seconds
Chunk 149 loaded into PostgreSQL. Elapsed Time: 7.23 seconds
Chunk 150 loaded into PostgreSQL. Elapsed Time: 8.12 seconds
Chunk 151 loaded into PostgreSQL. Elapsed Time: 8.10 seconds
Chunk 152 loaded into PostgreSQL. Elapsed Time: 7.87 seconds
Chunk 153 loaded into PostgreSQL. Elapsed Time: 8.03 seconds
Chunk 154 loaded into PostgreSQL. Elapsed Time: 7.89 seconds
Chunk 155 loaded into PostgreSQL. Elapsed Time: 7.85 seconds
Chunk 156 loaded into PostgreSQL. Elapsed Time: 8.01 seconds
Chunk 157 loaded into PostgreSQL. Elapsed Time: 8.08 seconds
Chunk 158 loaded into PostgreSQL. Elapsed Time: 7.82 seconds
Chunk 159 loaded into PostgreSQL. Elapsed Time: 8.70 seconds
Chunk 160 loaded into PostgreSQL. Elapsed Time: 8.73 seconds
Chunk 161 loaded into PostgreSQL. Elapsed Time: 8.89 seconds
Chunk 162 loaded into PostgreSQL. Elapsed Time: 9.87 seconds
Chunk 163 loaded into PostgreSQL. Elapsed Time: 9.57 seconds
Chunk 164 loaded into Po

  for i, chunk in enumerate(df_iter):


Chunk 176 loaded into PostgreSQL. Elapsed Time: 8.05 seconds


  for i, chunk in enumerate(df_iter):


Chunk 177 loaded into PostgreSQL. Elapsed Time: 7.95 seconds
Chunk 178 loaded into PostgreSQL. Elapsed Time: 8.33 seconds
Chunk 179 loaded into PostgreSQL. Elapsed Time: 8.40 seconds
Chunk 180 loaded into PostgreSQL. Elapsed Time: 8.63 seconds
Chunk 181 loaded into PostgreSQL. Elapsed Time: 9.07 seconds
Chunk 182 loaded into PostgreSQL. Elapsed Time: 9.78 seconds
Chunk 183 loaded into PostgreSQL. Elapsed Time: 8.71 seconds
Chunk 184 loaded into PostgreSQL. Elapsed Time: 8.39 seconds
Chunk 185 loaded into PostgreSQL. Elapsed Time: 8.59 seconds
Chunk 186 loaded into PostgreSQL. Elapsed Time: 8.39 seconds
Chunk 187 loaded into PostgreSQL. Elapsed Time: 8.68 seconds
Chunk 188 loaded into PostgreSQL. Elapsed Time: 8.48 seconds
Chunk 189 loaded into PostgreSQL. Elapsed Time: 8.31 seconds
Chunk 190 loaded into PostgreSQL. Elapsed Time: 8.26 seconds
Chunk 191 loaded into PostgreSQL. Elapsed Time: 8.68 seconds
Chunk 192 loaded into PostgreSQL. Elapsed Time: 9.04 seconds
Chunk 193 loaded into Po

  for i, chunk in enumerate(df_iter):


Chunk 206 loaded into PostgreSQL. Elapsed Time: 7.88 seconds


  for i, chunk in enumerate(df_iter):


Chunk 207 loaded into PostgreSQL. Elapsed Time: 8.33 seconds
Chunk 208 loaded into PostgreSQL. Elapsed Time: 8.67 seconds
Chunk 209 loaded into PostgreSQL. Elapsed Time: 8.54 seconds
Chunk 210 loaded into PostgreSQL. Elapsed Time: 8.36 seconds
Chunk 211 loaded into PostgreSQL. Elapsed Time: 8.37 seconds
Chunk 212 loaded into PostgreSQL. Elapsed Time: 9.04 seconds
Chunk 213 loaded into PostgreSQL. Elapsed Time: 10.71 seconds
Chunk 214 loaded into PostgreSQL. Elapsed Time: 9.67 seconds
Chunk 215 loaded into PostgreSQL. Elapsed Time: 10.24 seconds
Chunk 216 loaded into PostgreSQL. Elapsed Time: 8.79 seconds
Chunk 217 loaded into PostgreSQL. Elapsed Time: 8.27 seconds
Chunk 218 loaded into PostgreSQL. Elapsed Time: 9.73 seconds
Chunk 219 loaded into PostgreSQL. Elapsed Time: 8.62 seconds
Chunk 220 loaded into PostgreSQL. Elapsed Time: 8.34 seconds
Chunk 221 loaded into PostgreSQL. Elapsed Time: 8.45 seconds
Chunk 222 loaded into PostgreSQL. Elapsed Time: 8.67 seconds
Chunk 223 loaded into 

  for i, chunk in enumerate(df_iter):


Chunk 240 loaded into PostgreSQL. Elapsed Time: 8.86 seconds
Chunk 241 loaded into PostgreSQL. Elapsed Time: 8.60 seconds
Chunk 242 loaded into PostgreSQL. Elapsed Time: 8.83 seconds
Chunk 243 loaded into PostgreSQL. Elapsed Time: 9.22 seconds
Chunk 244 loaded into PostgreSQL. Elapsed Time: 8.99 seconds
Chunk 245 loaded into PostgreSQL. Elapsed Time: 8.77 seconds
Chunk 246 loaded into PostgreSQL. Elapsed Time: 8.71 seconds
Chunk 247 loaded into PostgreSQL. Elapsed Time: 8.70 seconds
Chunk 248 loaded into PostgreSQL. Elapsed Time: 9.78 seconds
Chunk 249 loaded into PostgreSQL. Elapsed Time: 8.87 seconds
Chunk 250 loaded into PostgreSQL. Elapsed Time: 19.07 seconds
Chunk 251 loaded into PostgreSQL. Elapsed Time: 18.05 seconds
Chunk 252 loaded into PostgreSQL. Elapsed Time: 9.15 seconds
Chunk 253 loaded into PostgreSQL. Elapsed Time: 8.07 seconds
Chunk 254 loaded into PostgreSQL. Elapsed Time: 8.36 seconds
Chunk 255 loaded into PostgreSQL. Elapsed Time: 8.41 seconds
Chunk 256 loaded into 

  for i, chunk in enumerate(df_iter):


Chunk 275 loaded into PostgreSQL. Elapsed Time: 7.81 seconds


  for i, chunk in enumerate(df_iter):


Chunk 276 loaded into PostgreSQL. Elapsed Time: 7.74 seconds
Chunk 277 loaded into PostgreSQL. Elapsed Time: 8.78 seconds
Chunk 278 loaded into PostgreSQL. Elapsed Time: 8.08 seconds
Chunk 279 loaded into PostgreSQL. Elapsed Time: 8.42 seconds
Chunk 280 loaded into PostgreSQL. Elapsed Time: 8.19 seconds
Chunk 281 loaded into PostgreSQL. Elapsed Time: 8.63 seconds
Chunk 282 loaded into PostgreSQL. Elapsed Time: 8.19 seconds
Chunk 283 loaded into PostgreSQL. Elapsed Time: 8.01 seconds
Chunk 284 loaded into PostgreSQL. Elapsed Time: 8.45 seconds
Chunk 285 loaded into PostgreSQL. Elapsed Time: 8.41 seconds
Chunk 286 loaded into PostgreSQL. Elapsed Time: 8.19 seconds
Chunk 287 loaded into PostgreSQL. Elapsed Time: 8.28 seconds
Chunk 288 loaded into PostgreSQL. Elapsed Time: 8.53 seconds
Chunk 289 loaded into PostgreSQL. Elapsed Time: 8.17 seconds
Chunk 290 loaded into PostgreSQL. Elapsed Time: 8.42 seconds
Chunk 291 loaded into PostgreSQL. Elapsed Time: 8.07 seconds
Chunk 292 loaded into Po

  for i, chunk in enumerate(df_iter):


Chunk 308 loaded into PostgreSQL. Elapsed Time: 7.26 seconds
Chunk 309 loaded into PostgreSQL. Elapsed Time: 0.53 seconds
Data loading complete.


### load_csv_to_postgres function

In [None]:
import pandas as pd
from sqlalchemy import create_engine, inspect
import time

def load_csv_to_postgres(engine, table_name, file_path, chunk_size=100000):
    # Check if table exists
    inspector = inspect(engine)
    if not inspector.has_table(table_name):
        print(f'Table {table_name} does not exist. Creating...')
        # Assume the first chunk has the column names
        first_chunk = pd.read_csv(file_path, nrows=1)
        first_chunk.to_sql(table_name, engine, index=False)

    # Read CSV in chunks
    df_iter = pd.read_csv(file_path, iterator=True, chunksize=chunk_size)

    # Iterate over chunks and load into PostgreSQL
    for i, chunk in enumerate(df_iter):
        start_time = time.time()

        # Modify chunk processing as needed
        chunk.to_sql(table_name, engine, if_exists='append', index=False)

        end_time = time.time()
        elapsed_time = end_time - start_time
        print(f'Chunk {i + 1} loaded into PostgreSQL. Elapsed Time: {elapsed_time:.2f} seconds')

    print('Data loading complete.')

# Example usage:
# Replace 'your_postgres_url' and 'your_table_name' with actual values
db_url = 'postgresql://username:password@host:port/database_name'
table_name = 'your_table_name'
file_path = '/workspaces/DE-Zoomcamp-2024/data/ny_taxi.csv'
engine = create_engine(db_url)

# Call the function
load_csv_to_postgres(engine, table_name, file_path)
