In [None]:
import pandas as pd 

# URL for taxi data
url = "https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2021-01.csv.gz"

# Extract file name from URL

csv_name = url.split("/")[-1]

# Read compressed CSV directly into a dataframe

df = pd.read_csv(csv_name, nrows=100)

print(df.head()) #print first 5 rows of dataframe

   VendorID tpep_pickup_datetime tpep_dropoff_datetime  passenger_count  \
0         1  2021-01-01 00:30:10   2021-01-01 00:36:12                1   
1         1  2021-01-01 00:51:20   2021-01-01 00:52:19                1   
2         1  2021-01-01 00:43:30   2021-01-01 01:11:06                1   
3         1  2021-01-01 00:15:48   2021-01-01 00:31:01                0   
4         2  2021-01-01 00:31:49   2021-01-01 00:48:21                1   

   trip_distance  RatecodeID store_and_fwd_flag  PULocationID  DOLocationID  \
0           2.10           1                  N           142            43   
1           0.20           1                  N           238           151   
2          14.70           1                  N           132           165   
3          10.60           1                  N           138           132   
4           4.94           1                  N            68            33   

   payment_type  fare_amount  extra  mta_tax  tip_amount  tolls_amount  \


In [None]:
#Use pandas get_schema to convert dataframe into DDL (data definition language)

print(pd.io.sql.get_schema(df, name='yellow_taxi_data'))

CREATE TABLE "yellow_taxi_data" (
"VendorID" INTEGER,
  "tpep_pickup_datetime" TEXT,
  "tpep_dropoff_datetime" TEXT,
  "passenger_count" INTEGER,
  "trip_distance" REAL,
  "RatecodeID" INTEGER,
  "store_and_fwd_flag" TEXT,
  "PULocationID" INTEGER,
  "DOLocationID" INTEGER,
  "payment_type" INTEGER,
  "fare_amount" REAL,
  "extra" REAL,
  "mta_tax" REAL,
  "tip_amount" REAL,
  "tolls_amount" REAL,
  "improvement_surcharge" REAL,
  "total_amount" REAL,
  "congestion_surcharge" REAL
)


In [None]:
# Convert pickup and dropoff columns from text to datetime objects

df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)
df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)

# Print updated SQL schema with the new datetime columns

print(pd.io.sql.get_schema(df, name='yellow_taxi_data'))   

CREATE TABLE "yellow_taxi_data" (
"VendorID" INTEGER,
  "tpep_pickup_datetime" TIMESTAMP,
  "tpep_dropoff_datetime" TIMESTAMP,
  "passenger_count" INTEGER,
  "trip_distance" REAL,
  "RatecodeID" INTEGER,
  "store_and_fwd_flag" TEXT,
  "PULocationID" INTEGER,
  "DOLocationID" INTEGER,
  "payment_type" INTEGER,
  "fare_amount" REAL,
  "extra" REAL,
  "mta_tax" REAL,
  "tip_amount" REAL,
  "tolls_amount" REAL,
  "improvement_surcharge" REAL,
  "total_amount" REAL,
  "congestion_surcharge" REAL
)


In [None]:
# Need to create a SQLAlchemy 'engine' for pandas to connect to Postgres container

from sqlalchemy import create_engine
engine = create_engine('postgresql://root:root@localhost:5432/nyc_taxi')

# Generate DDL (CREATE TABLE...) to send to postgres specifically 
print(pd.io.sql.get_schema(df, name='yellow_taxi_data', con=engine))

# This output is the SQL that Pandas will use to create the table in Postgres


CREATE TABLE yellow_taxi_data (
	"VendorID" BIGINT, 
	tpep_pickup_datetime TIMESTAMP WITHOUT TIME ZONE, 
	tpep_dropoff_datetime TIMESTAMP WITHOUT TIME ZONE, 
	passenger_count BIGINT, 
	trip_distance FLOAT(53), 
	"RatecodeID" BIGINT, 
	store_and_fwd_flag TEXT, 
	"PULocationID" BIGINT, 
	"DOLocationID" BIGINT, 
	payment_type BIGINT, 
	fare_amount FLOAT(53), 
	extra FLOAT(53), 
	mta_tax FLOAT(53), 
	tip_amount FLOAT(53), 
	tolls_amount FLOAT(53), 
	improvement_surcharge FLOAT(53), 
	total_amount FLOAT(53), 
	congestion_surcharge FLOAT(53)
)




In [None]:
#Define batch ingestion to Postgres: Read CSV in chunks of 100,000 rows to avoid overloading memory

df_iter = pd.read_csv(csv_name, iterator= True, chunksize=100000)

#Call next(df_iter) to get the first chunk of the iterator 
df = next(df_iter)

#print how many rows are in this chunk (length of the iterator)
print(len(df))

100000


In [None]:
# Take the schema only (no data yet) by using .head(0) to select first 0 rows
df.head(0)

# Call to_sql to create table in Postgres; if_exists='replace' drops the table if it already exists and creates a fresh one

df.head(0).to_sql(name= 'yellow_taxi_data', con= engine, if_exists= 'replace')

In [38]:
# Start to load data into the table 
df.to_sql(name= 'yellow_taxi_data', con= engine, if_exists= 'append')

1000

In [None]:
#Check if any duplicates since we just ran the last cell twice. 
dupes_count = df.duplicated().sum()
print(f"Total duplicate rows found: {dupes_count}")

Total duplicate rows found: 0


In [41]:
#Continue ingesting data in batches using for loop 
#time each ingestion and print how look it took

from time import time 

for i, df in enumerate(df_iter): 
    
    start_t = time()

    df.to_sql(name='yellow_taxi_data', con=engine, if_exists= 'append')

    end_t = time()
    print(f'Ingested chunk {i +1}, took {end_t - start_t:.3f}seconds')

Ingested chunk 1, took 3.712seconds
Ingested chunk 2, took 3.638seconds
Ingested chunk 3, took 3.489seconds
Ingested chunk 4, took 3.611seconds
Ingested chunk 5, took 3.638seconds
Ingested chunk 6, took 3.923seconds
Ingested chunk 7, took 3.600seconds
Ingested chunk 8, took 3.574seconds
Ingested chunk 9, took 3.591seconds
Ingested chunk 10, took 3.660seconds
Ingested chunk 11, took 3.622seconds


  for i, df in enumerate(df_iter):


Ingested chunk 12, took 3.548seconds
Ingested chunk 13, took 2.272seconds
