In [2]:
# !pip install wget
# !pip install pandas
# !pip install numpy
# !pip install sqlalchemy
# !pip install psycopg2

In [3]:
import wget
import pandas as pd
from sqlalchemy import create_engine

# URL of the CSV file
url = 'https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2021-01.csv.gz'

wget.download(url, 'yellow_tripdata_2021-01.csv.gz')

df = pd.read_csv('yellow_tripdata_2021-01.csv.gz', compression='gzip', low_memory=False)
df.to_csv('yellow_tripdata_2021-01.csv')

100% [........................................................................] 25031880 / 25031880

In [4]:
df.info(verbose=True, show_counts=True)

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1369765 entries, 0 to 1369764
Data columns (total 18 columns):
 #   Column                 Non-Null Count    Dtype  
---  ------                 --------------    -----  
 0   VendorID               1271413 non-null  float64
 1   tpep_pickup_datetime   1369765 non-null  object 
 2   tpep_dropoff_datetime  1369765 non-null  object 
 3   passenger_count        1271413 non-null  float64
 4   trip_distance          1369765 non-null  float64
 5   RatecodeID             1271413 non-null  float64
 6   store_and_fwd_flag     1271413 non-null  object 
 7   PULocationID           1369765 non-null  int64  
 8   DOLocationID           1369765 non-null  int64  
 9   payment_type           1271413 non-null  float64
 10  fare_amount            1369765 non-null  float64
 11  extra                  1369765 non-null  float64
 12  mta_tax                1369765 non-null  float64
 13  tip_amount             1369765 non-null  float64
 14  tolls_amount      

In [5]:
# Provide a name for the table with 'yellow_taxi_data'
print(pd.io.sql.get_schema(df, name='yellow_taxi_data'))

CREATE TABLE "yellow_taxi_data" (
"VendorID" REAL,
  "tpep_pickup_datetime" TEXT,
  "tpep_dropoff_datetime" TEXT,
  "passenger_count" REAL,
  "trip_distance" REAL,
  "RatecodeID" REAL,
  "store_and_fwd_flag" TEXT,
  "PULocationID" INTEGER,
  "DOLocationID" INTEGER,
  "payment_type" REAL,
  "fare_amount" REAL,
  "extra" REAL,
  "mta_tax" REAL,
  "tip_amount" REAL,
  "tolls_amount" REAL,
  "improvement_surcharge" REAL,
  "total_amount" REAL,
  "congestion_surcharge" REAL
)


In [6]:
df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)
df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)

In [7]:
# SQL transformation 
from sqlalchemy import create_engine
!pip install psycopg2-binary

engine = create_engine('postgresql://root:root@localhost:5432/ny_taxi')
engine.connect() # Run this only if Postgres is running

print(pd.io.sql.get_schema(df, name="yellow_taxi_data", con=engine))


CREATE TABLE yellow_taxi_data (
	"VendorID" FLOAT(53), 
	tpep_pickup_datetime TIMESTAMP WITHOUT TIME ZONE, 
	tpep_dropoff_datetime TIMESTAMP WITHOUT TIME ZONE, 
	passenger_count FLOAT(53), 
	trip_distance FLOAT(53), 
	"RatecodeID" FLOAT(53), 
	store_and_fwd_flag TEXT, 
	"PULocationID" BIGINT, 
	"DOLocationID" BIGINT, 
	payment_type FLOAT(53), 
	fare_amount FLOAT(53), 
	extra FLOAT(53), 
	mta_tax FLOAT(53), 
	tip_amount FLOAT(53), 
	tolls_amount FLOAT(53), 
	improvement_surcharge FLOAT(53), 
	total_amount FLOAT(53), 
	congestion_surcharge FLOAT(53)
)




In [8]:
# Using iterator to read in chunks then send to database
df_iter = pd.read_csv('yellow_tripdata_2021-01.csv', iterator=True, chunksize=100000)
df_iter

<pandas.io.parsers.readers.TextFileReader at 0x1b8ca582870>

In [9]:
# Using next() to get data chunks
df = next(df_iter)
df.head(5)

Unnamed: 0.1,Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge
0,0,1.0,2021-01-01 00:30:10,2021-01-01 00:36:12,1.0,2.1,1.0,N,142,43,2.0,8.0,3.0,0.5,0.0,0.0,0.3,11.8,2.5
1,1,1.0,2021-01-01 00:51:20,2021-01-01 00:52:19,1.0,0.2,1.0,N,238,151,2.0,3.0,0.5,0.5,0.0,0.0,0.3,4.3,0.0
2,2,1.0,2021-01-01 00:43:30,2021-01-01 01:11:06,1.0,14.7,1.0,N,132,165,1.0,42.0,0.5,0.5,8.65,0.0,0.3,51.95,0.0
3,3,1.0,2021-01-01 00:15:48,2021-01-01 00:31:01,0.0,10.6,1.0,N,138,132,1.0,29.0,0.5,0.5,6.05,0.0,0.3,36.35,0.0
4,4,2.0,2021-01-01 00:31:49,2021-01-01 00:48:21,1.0,4.94,1.0,N,68,33,1.0,16.5,0.5,0.5,4.06,0.0,0.3,24.36,2.5


In [10]:
# Convert the time columns to timestamp format
df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)
df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)
df

Unnamed: 0.1,Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge
0,0,1.0,2021-01-01 00:30:10,2021-01-01 00:36:12,1.0,2.10,1.0,N,142,43,2.0,8.0,3.0,0.5,0.00,0.0,0.3,11.80,2.5
1,1,1.0,2021-01-01 00:51:20,2021-01-01 00:52:19,1.0,0.20,1.0,N,238,151,2.0,3.0,0.5,0.5,0.00,0.0,0.3,4.30,0.0
2,2,1.0,2021-01-01 00:43:30,2021-01-01 01:11:06,1.0,14.70,1.0,N,132,165,1.0,42.0,0.5,0.5,8.65,0.0,0.3,51.95,0.0
3,3,1.0,2021-01-01 00:15:48,2021-01-01 00:31:01,0.0,10.60,1.0,N,138,132,1.0,29.0,0.5,0.5,6.05,0.0,0.3,36.35,0.0
4,4,2.0,2021-01-01 00:31:49,2021-01-01 00:48:21,1.0,4.94,1.0,N,68,33,1.0,16.5,0.5,0.5,4.06,0.0,0.3,24.36,2.5
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
99995,99995,1.0,2021-01-04 14:04:31,2021-01-04 14:08:52,3.0,0.70,1.0,N,234,224,2.0,5.0,2.5,0.5,0.00,0.0,0.3,8.30,2.5
99996,99996,1.0,2021-01-04 14:18:46,2021-01-04 14:35:45,2.0,3.30,1.0,N,234,236,1.0,14.5,2.5,0.5,3.55,0.0,0.3,21.35,2.5
99997,99997,1.0,2021-01-04 14:42:41,2021-01-04 14:59:22,2.0,4.70,1.0,N,236,79,1.0,17.0,2.5,0.5,4.05,0.0,0.3,24.35,2.5
99998,99998,2.0,2021-01-04 14:39:02,2021-01-04 15:09:37,2.0,17.95,2.0,N,132,148,1.0,52.0,0.0,0.5,5.00,0.0,0.3,60.30,2.5


In [11]:
# Provide the table name, the connection and what to do if the table already exists
df.head(n=0).to_sql(name='yellow_taxi_data', con=engine, if_exists='replace')

0

In [12]:
# Transfer to SQL  
df.to_sql(name='yellow_taxi_data', con=engine, if_exists='append')

1000

In [13]:
df.head(n=0)

Unnamed: 0.1,Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge


In [14]:
# Loop function for database 

from time import time

while True: 
    try:
        t_start = time()
        df = next(df_iter)

        df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)
        df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)
        
        df.to_sql(name='yellow_taxi_data', con=engine, if_exists='append')

        t_end = time()

        print('inserted another chunk, took %.3f second' % (t_end - t_start))
    except StopIteration:
        print('completed')
        break

inserted another chunk, took 14.252 second
inserted another chunk, took 15.766 second
inserted another chunk, took 16.726 second
inserted another chunk, took 16.093 second
inserted another chunk, took 14.738 second
inserted another chunk, took 15.006 second
inserted another chunk, took 12.986 second
inserted another chunk, took 12.754 second
inserted another chunk, took 12.281 second
inserted another chunk, took 13.307 second
inserted another chunk, took 13.607 second


  df = next(df_iter)


inserted another chunk, took 13.281 second
inserted another chunk, took 9.463 second
completed
