In [2]:
import pandas as pd 

In [3]:
# Load the first 100 rows
df = pd.read_csv('yellow_tripdata_2021.csv', nrows=100)

# Display the loaded DataFrame
df.dtypes

VendorID                   int64
tpep_pickup_datetime      object
tpep_dropoff_datetime     object
passenger_count            int64
trip_distance            float64
RatecodeID                 int64
store_and_fwd_flag        object
PULocationID               int64
DOLocationID               int64
payment_type               int64
fare_amount              float64
extra                    float64
mta_tax                  float64
tip_amount               float64
tolls_amount             float64
improvement_surcharge    float64
total_amount             float64
congestion_surcharge     float64
dtype: object

In [4]:
# we need to load the data into postgress
print(pd.io.sql.get_schema(df, name="yellow_tripdata_2021"))

CREATE TABLE "yellow_tripdata_2021" (
"VendorID" INTEGER,
  "tpep_pickup_datetime" TEXT,
  "tpep_dropoff_datetime" TEXT,
  "passenger_count" INTEGER,
  "trip_distance" REAL,
  "RatecodeID" INTEGER,
  "store_and_fwd_flag" TEXT,
  "PULocationID" INTEGER,
  "DOLocationID" INTEGER,
  "payment_type" INTEGER,
  "fare_amount" REAL,
  "extra" REAL,
  "mta_tax" REAL,
  "tip_amount" REAL,
  "tolls_amount" REAL,
  "improvement_surcharge" REAL,
  "total_amount" REAL,
  "congestion_surcharge" REAL
)


In [5]:
#convert the two first two columns to Datetime format 
# Load the first 100 rows
df = pd.read_csv('yellow_tripdata_2021.csv', nrows=100)

df['tpep_pickup_datetime'] = pd.to_datetime(df['tpep_pickup_datetime'], errors='coerce')
df['tpep_dropoff_datetime'] = pd.to_datetime(df['tpep_dropoff_datetime'], errors='coerce')

# Generate SQL schema
sql_schema = pd.io.sql.get_schema(df, name="yellow_tripdata_2021")
print(sql_schema)

CREATE TABLE "yellow_tripdata_2021" (
"VendorID" INTEGER,
  "tpep_pickup_datetime" TIMESTAMP,
  "tpep_dropoff_datetime" TIMESTAMP,
  "passenger_count" INTEGER,
  "trip_distance" REAL,
  "RatecodeID" INTEGER,
  "store_and_fwd_flag" TEXT,
  "PULocationID" INTEGER,
  "DOLocationID" INTEGER,
  "payment_type" INTEGER,
  "fare_amount" REAL,
  "extra" REAL,
  "mta_tax" REAL,
  "tip_amount" REAL,
  "tolls_amount" REAL,
  "improvement_surcharge" REAL,
  "total_amount" REAL,
  "congestion_surcharge" REAL
)


In [6]:
#Establish connection to DB 
from sqlalchemy import create_engine

engine = create_engine('postgresql://root:root@localhost:5432/ny_taxi')
engine.connect()

<sqlalchemy.engine.base.Connection at 0x11f7ac03eb0>

In [7]:
#convert this to the schema for the database which postgress will recognise
print(pd.io.sql.get_schema(df, name="yellow_tripdata_2021", con=engine))


CREATE TABLE yellow_tripdata_2021 (
	"VendorID" BIGINT, 
	tpep_pickup_datetime TIMESTAMP WITHOUT TIME ZONE, 
	tpep_dropoff_datetime TIMESTAMP WITHOUT TIME ZONE, 
	passenger_count BIGINT, 
	trip_distance FLOAT(53), 
	"RatecodeID" BIGINT, 
	store_and_fwd_flag TEXT, 
	"PULocationID" BIGINT, 
	"DOLocationID" BIGINT, 
	payment_type BIGINT, 
	fare_amount FLOAT(53), 
	extra FLOAT(53), 
	mta_tax FLOAT(53), 
	tip_amount FLOAT(53), 
	tolls_amount FLOAT(53), 
	improvement_surcharge FLOAT(53), 
	total_amount FLOAT(53), 
	congestion_surcharge FLOAT(53)
)




In [8]:
# We need to batch the Upload now 

df_iter = pd.read_csv('yellow_tripdata_2021.csv', iterator = True, chunksize=100000)

df = next(df_iter)

In [9]:
#to create the schema we only need the column names and the data types associated with them. 
df.head(n=0)
df.head(n=0).to_sql(name='yellow_taxi_data', con = engine, if_exists='replace')

0

In [25]:
# Now We need to address  to add the chuncks to 
df['tpep_pickup_datetime'] = pd.to_datetime(df['tpep_pickup_datetime'], errors='coerce')
df['tpep_dropoff_datetime'] = pd.to_datetime(df['tpep_dropoff_datetime'], errors='coerce')

In [26]:
df.dtypes

VendorID                          int64
tpep_pickup_datetime     datetime64[ns]
tpep_dropoff_datetime    datetime64[ns]
passenger_count                   int64
trip_distance                   float64
RatecodeID                        int64
store_and_fwd_flag               object
PULocationID                      int64
DOLocationID                      int64
payment_type                      int64
fare_amount                     float64
extra                           float64
mta_tax                         float64
tip_amount                      float64
tolls_amount                    float64
improvement_surcharge           float64
total_amount                    float64
congestion_surcharge            float64
dtype: object

In [27]:
df.head(n=0)
df.head(n=0).to_sql(name='yellow_taxi_data', con = engine, if_exists='replace')

0

In [12]:
df.dtypes

VendorID                          int64
tpep_pickup_datetime     datetime64[ns]
tpep_dropoff_datetime    datetime64[ns]
passenger_count                   int64
trip_distance                   float64
RatecodeID                        int64
store_and_fwd_flag               object
PULocationID                      int64
DOLocationID                      int64
payment_type                      int64
fare_amount                     float64
extra                           float64
mta_tax                         float64
tip_amount                      float64
tolls_amount                    float64
improvement_surcharge           float64
total_amount                    float64
congestion_surcharge            float64
dtype: object

In [30]:
df_iter = pd.read_csv('yellow_tripdata_2021.csv', iterator = True, chunksize=100000)

In [31]:
df = next(df_iter)

df['tpep_pickup_datetime'] = pd.to_datetime(df['tpep_pickup_datetime'], errors='coerce')
df['tpep_dropoff_datetime'] = pd.to_datetime(df['tpep_dropoff_datetime'], errors='coerce')

In [32]:
%time df.to_sql(name='yellow_taxi_data', con = engine, if_exists='append')

CPU times: total: 9.53 s
Wall time: 37.8 s


1000

In [33]:
from time import time

# Assuming df_iter is your iterator for the chunks
while True:
        t_start = time()
        df = next(df_iter)  # Get the next chunk from the iterator
        df['tpep_pickup_datetime'] = pd.to_datetime(df['tpep_pickup_datetime'], errors='coerce')
        df['tpep_dropoff_datetime'] = pd.to_datetime(df['tpep_dropoff_datetime'], errors='coerce')
        
        # Insert data into the SQL table
        df.to_sql(name='yellow_taxi_data', con=engine, if_exists='append')
        
        t_end = time()

        print(f'Added another chunk to the DB...., {t_end - t_start:.3f} seconds')
  



Added another chunk to the DB...., 32.380 seconds
Added another chunk to the DB...., 29.717 seconds
Added another chunk to the DB...., 42.665 seconds
Added another chunk to the DB...., 30.284 seconds
Added another chunk to the DB...., 48.730 seconds
Added another chunk to the DB...., 45.552 seconds
Added another chunk to the DB...., 32.334 seconds
Added another chunk to the DB...., 86.808 seconds
Added another chunk to the DB...., 42.912 seconds
Added another chunk to the DB...., 56.757 seconds
Added another chunk to the DB...., 30.287 seconds


  df = next(df_iter)  # Get the next chunk from the iterator


Added another chunk to the DB...., 40.717 seconds
Added another chunk to the DB...., 45.160 seconds


StopIteration: 

In [17]:
df.dtypes 

VendorID                        float64
tpep_pickup_datetime     datetime64[ns]
tpep_dropoff_datetime    datetime64[ns]
passenger_count                 float64
trip_distance                   float64
RatecodeID                      float64
store_and_fwd_flag              float64
PULocationID                      int64
DOLocationID                      int64
payment_type                    float64
fare_amount                     float64
extra                           float64
mta_tax                         float64
tip_amount                      float64
tolls_amount                    float64
improvement_surcharge           float64
total_amount                    float64
congestion_surcharge            float64
dtype: object