In [None]:
# Prerequisite: install sqlalchemy and psycopg2 in the kernel's environment before running this notebook

In [31]:
import os
from time import time
from urllib.parse import quote_plus

import pandas as pd
from sqlalchemy import create_engine

In [11]:
# Connection settings are taken from the standard libpq environment variables
# when they are set, so real credentials never have to be written into the
# notebook. The fallbacks are the values this notebook used previously.
pg_user = os.environ.get('PGUSER', 'root')
pg_password = os.environ.get('PGPASSWORD', 'root')
pg_host = os.environ.get('PGHOST', 'localhost')
pg_port = os.environ.get('PGPORT', '5432')
pg_database = os.environ.get('PGDATABASE', 'ny_taxi')

# quote_plus escapes characters such as '@' or ':' in the user name or
# password that would otherwise corrupt the URL.
engine = create_engine(
    f'postgresql://{quote_plus(pg_user)}:{quote_plus(pg_password)}'
    f'@{pg_host}:{pg_port}/{pg_database}'
)

In [12]:
# Smoke-test connectivity: open a connection to confirm the database is
# reachable. The context manager closes it again on exit, so the connection is
# not left checked out for the rest of the kernel session.
with engine.connect() as connection:
    print(connection)

<sqlalchemy.engine.base.Connection at 0x73a843e30770>

In [13]:
# Trivial round-trip query to confirm pandas can read through the engine.
query = """
select 1 as number;
"""

pd.read_sql(sql=query, con=engine)

Unnamed: 0,number
0,1


In [14]:
# Equivalent of `\dt` in pgcli, run through SQLAlchemy instead:
# list every table that is not in one of the two system schemas.
query = """
select *
from pg_catalog.pg_tables
where schemaname != 'pg_catalog' 
and schemaname != 'information_schema';
"""

pd.read_sql(sql=query, con=engine)

Unnamed: 0,schemaname,tablename,tableowner,tablespace,hasindexes,hasrules,hastriggers,rowsecurity


In [4]:
# Load the whole CSV once so pandas can infer column types for the schema
# preview below. The path is relative to the notebook's working directory,
# matching the chunked read of the same file later in this notebook, rather
# than an absolute path that only exists on one machine.
df = pd.read_csv('yellow_tripdata_2021-01.csv', low_memory=False)

In [15]:
# Convert the pickup and drop-off timestamp columns with pd.to_datetime.
for timestamp_column in ('tpep_pickup_datetime', 'tpep_dropoff_datetime'):
    df[timestamp_column] = pd.to_datetime(df[timestamp_column])

In [17]:
# Preview the CREATE TABLE statement pandas generates for this frame
# against the connected database; nothing is executed here.
schema_ddl = pd.io.sql.get_schema(df, name='yellow_taxi_data', con=engine)
print(schema_ddl)


CREATE TABLE yellow_taxi_data (
	"VendorID" FLOAT(53), 
	tpep_pickup_datetime TIMESTAMP WITHOUT TIME ZONE, 
	tpep_dropoff_datetime TIMESTAMP WITHOUT TIME ZONE, 
	passenger_count FLOAT(53), 
	trip_distance FLOAT(53), 
	"RatecodeID" FLOAT(53), 
	store_and_fwd_flag TEXT, 
	"PULocationID" BIGINT, 
	"DOLocationID" BIGINT, 
	payment_type FLOAT(53), 
	fare_amount FLOAT(53), 
	extra FLOAT(53), 
	mta_tax FLOAT(53), 
	tip_amount FLOAT(53), 
	tolls_amount FLOAT(53), 
	improvement_surcharge FLOAT(53), 
	total_amount FLOAT(53), 
	congestion_surcharge FLOAT(53)
)




In [20]:
# df_iter is an iterator over DataFrame chunks, not a single DataFrame.
# The roughly 1.3 million rows are read 100000 at a time so they can be
# written to the database in batches instead of all at once.
df_iter = pd.read_csv(
    'yellow_tripdata_2021-01.csv',
    iterator=True,
    chunksize=100000,
)

In [21]:
# Display the reader object: its repr shows what type read_csv returned when given a chunksize
df_iter

<pandas.io.parsers.readers.TextFileReader at 0x73a8412287a0>

In [22]:
# next() pulls a single chunk out of the reader as a DataFrame.
# Only the first chunk is fetched here; the row count below confirms its size.
df = next(df_iter)
df.shape[0]

100000

In [23]:
# Convert the chunk's pickup and drop-off timestamp columns with pd.to_datetime.
for timestamp_column in ('tpep_pickup_datetime', 'tpep_dropoff_datetime'):
    df[timestamp_column] = pd.to_datetime(df[timestamp_column])

In [24]:
# Next step: create the table from the DataFrame's schema.
# head(n=0) keeps the column names but no rows, which is all that is needed to define the table.
df.head(n=0)

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge


In [25]:
# Write the zero-row frame so that only the table and its columns are created.
# Note if_exists='replace' here, whereas the cells that load the chunks use 'append'.
header_only = df.head(n=0)
header_only.to_sql(name='yellow_taxi_data', con=engine, if_exists='replace')

0

In [26]:
# List the non-system tables again to check that yellow_taxi_data was created.
query = """
select *
from pg_catalog.pg_tables
where schemaname != 'pg_catalog' 
and schemaname != 'information_schema';
"""

pd.read_sql(sql=query, con=engine)

Unnamed: 0,schemaname,tablename,tableowner,tablespace,hasindexes,hasrules,hastriggers,rowsecurity
0,public,yellow_taxi_data,root,,True,False,False,False


In [27]:
# Peek at the new table's columns. The LIMIT keeps this cell cheap even if it
# is re-run after the chunks have been appended, when an unbounded
# `select *` would pull every row into memory.
query = """
select * from yellow_taxi_data limit 5;
"""

pd.read_sql(query, con=engine)

Unnamed: 0,index,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge


In [28]:
#now we will update our table with information in the chunks. Removing df.head(0) - note `append`
%time df.to_sql(name= 'yellow_taxi_data', con = engine, if_exists = 'append')

CPU times: user 6.06 s, sys: 73.5 ms, total: 6.13 s
Wall time: 9.26 s


1000

In [29]:
# Count the rows now in the table to confirm the first chunk was written.
query = """
select count(*) from yellow_taxi_data;
"""

pd.read_sql(sql=query, con=engine)

Unnamed: 0,count
0,100000


In [32]:
# One chunk of 100000 rows has already been appended; stream the remaining
# chunks from df_iter and append each of them in turn.
#
# A plain `for` loop stops cleanly when the reader is exhausted. That avoids
# wrapping the conversion and insert code in `except StopIteration`, where a
# StopIteration raised by anything other than the reader would be mistaken
# for the end of the data.
t_start = time()  # started before the chunk is read, so read time is included
for df in df_iter:
    df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)
    df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)

    # if_exists='append' adds this chunk's rows to the existing table
    df.to_sql(name='yellow_taxi_data', con=engine, if_exists='append')

    duration = time() - t_start
    print('inserted another chunk... this chunk took %.3f seconds' % (duration))

    t_start = time()  # restart the timer so the next chunk's read is timed too

print('finished inserting all chunks.')



inserted another chunk... this chunk took 9.196 seconds
inserted another chunk... this chunk took 9.243 seconds
inserted another chunk... this chunk took 9.076 seconds
inserted another chunk... this chunk took 9.675 seconds
inserted another chunk... this chunk took 9.153 seconds
inserted another chunk... this chunk took 9.118 seconds
inserted another chunk... this chunk took 9.050 seconds
inserted another chunk... this chunk took 9.241 seconds
inserted another chunk... this chunk took 9.033 seconds
inserted another chunk... this chunk took 9.404 seconds
inserted another chunk... this chunk took 9.543 seconds


  df = next(df_iter)


inserted another chunk... this chunk took 9.683 seconds
inserted another chunk... this chunk took 5.697 seconds
finished inserting all chunks.


In [33]:
# Sanity check: count the rows in the table after all chunks have been inserted.
query = """
select count(*) from yellow_taxi_data;
"""

pd.read_sql(sql=query, con=engine)

Unnamed: 0,count
0,1369765


In [34]:
# Quick exploratory query: latest and earliest pickup timestamps and the largest total_amount.
query = """
select max(tpep_pickup_datetime) as pick_up_max, min(tpep_pickup_datetime) as pickup_min, max(total_amount) as total_amount_max
from yellow_taxi_data;
"""

pd.read_sql(sql=query, con=engine)

Unnamed: 0,pick_up_max,pickup_min,total_amount_max
0,2021-02-22 16:52:16,2008-12-31 23:05:14,7661.28
