In [6]:
!pip install --only-binary=:all: pyarrow


Collecting pyarrow
  Downloading pyarrow-17.0.0-cp39-cp39-macosx_10_15_x86_64.whl (29.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m29.0/29.0 MB[0m [31m3.5 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
Installing collected packages: pyarrow
Successfully installed pyarrow-17.0.0


In [7]:
import pandas as pd
import pyarrow.parquet as pq
from time import time

In [8]:
pq.read_metadata('yellow_tripdata_2021-01.parquet')

<pyarrow._parquet.FileMetaData object at 0x7fefc8076590>
  created_by: parquet-cpp-arrow version 7.0.0
  num_columns: 19
  num_rows: 1369769
  num_row_groups: 1
  format_version: 1.0
  serialized_size: 10382

In [9]:
#read file and check schema
file= pq.ParquetFile('yellow_tripdata_2021-01.parquet')
table = file.read()
table.schema

VendorID: int64
tpep_pickup_datetime: timestamp[us]
tpep_dropoff_datetime: timestamp[us]
passenger_count: double
trip_distance: double
RatecodeID: double
store_and_fwd_flag: string
PULocationID: int64
DOLocationID: int64
payment_type: int64
fare_amount: double
extra: double
mta_tax: double
tip_amount: double
tolls_amount: double
improvement_surcharge: double
total_amount: double
congestion_surcharge: double
airport_fee: double
-- schema metadata --
pandas: '{"index_columns": [], "column_indexes": [], "columns": [{"name":' + 2492

In [10]:
df= table.to_pandas()
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1369769 entries, 0 to 1369768
Data columns (total 19 columns):
 #   Column                 Non-Null Count    Dtype         
---  ------                 --------------    -----         
 0   VendorID               1369769 non-null  int64         
 1   tpep_pickup_datetime   1369769 non-null  datetime64[ns]
 2   tpep_dropoff_datetime  1369769 non-null  datetime64[ns]
 3   passenger_count        1271417 non-null  float64       
 4   trip_distance          1369769 non-null  float64       
 5   RatecodeID             1271417 non-null  float64       
 6   store_and_fwd_flag     1271417 non-null  object        
 7   PULocationID           1369769 non-null  int64         
 8   DOLocationID           1369769 non-null  int64         
 9   payment_type           1369769 non-null  int64         
 10  fare_amount            1369769 non-null  float64       
 11  extra                  1369769 non-null  float64       
 12  mta_tax                13697

In [11]:
from sqlalchemy import create_engine

In [13]:
!pip install  psycopg2

Collecting psycopg2
  Downloading psycopg2-2.9.10.tar.gz (385 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m385.7/385.7 kB[0m [31m2.0 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25h  Preparing metadata (setup.py) ... [?25ldone
[?25hBuilding wheels for collected packages: psycopg2
  Building wheel for psycopg2 (setup.py) ... [?25ldone
[?25h  Created wheel for psycopg2: filename=psycopg2-2.9.10-cp39-cp39-macosx_10_9_x86_64.whl size=133747 sha256=c9969813d5a029e4185782a12040d1464fd28e8c05d738705f37c1a6be0519f4
  Stored in directory: /Users/yasmeenel3sh/Library/Caches/pip/wheels/a3/f0/13/36dd45ba7a971c79ded4f3003e5f4652d262195d0e8ea8f249
Successfully built psycopg2
Installing collected packages: psycopg2
Successfully installed psycopg2-2.9.10


In [14]:
engine = create_engine('postgresql://root:root@localhost:5432/ny_taxi')

In [15]:
engine.connect()

<sqlalchemy.engine.base.Connection at 0x7fefc0640100>

In [17]:
#Generate CREATE SQL statement from schema
print(pd.io.sql.get_schema(df,name="yellow_taxi_data",con=engine))


CREATE TABLE yellow_taxi_data (
	"VendorID" BIGINT, 
	tpep_pickup_datetime TIMESTAMP WITHOUT TIME ZONE, 
	tpep_dropoff_datetime TIMESTAMP WITHOUT TIME ZONE, 
	passenger_count FLOAT(53), 
	trip_distance FLOAT(53), 
	"RatecodeID" FLOAT(53), 
	store_and_fwd_flag TEXT, 
	"PULocationID" BIGINT, 
	"DOLocationID" BIGINT, 
	payment_type BIGINT, 
	fare_amount FLOAT(53), 
	extra FLOAT(53), 
	mta_tax FLOAT(53), 
	tip_amount FLOAT(53), 
	tolls_amount FLOAT(53), 
	improvement_surcharge FLOAT(53), 
	total_amount FLOAT(53), 
	congestion_surcharge FLOAT(53), 
	airport_fee FLOAT(53)
)




In [20]:
#creating batches of 100,000 records converting them to pandas and loading them in postgres
batches_iter= file.iter_batches(batch_size=100000)

df= next(batches_iter).to_pandas()

df.head()


Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee
0,1,2021-01-01 00:30:10,2021-01-01 00:36:12,1.0,2.1,1.0,N,142,43,2,8.0,3.0,0.5,0.0,0.0,0.3,11.8,2.5,
1,1,2021-01-01 00:51:20,2021-01-01 00:52:19,1.0,0.2,1.0,N,238,151,2,3.0,0.5,0.5,0.0,0.0,0.3,4.3,0.0,
2,1,2021-01-01 00:43:30,2021-01-01 01:11:06,1.0,14.7,1.0,N,132,165,1,42.0,0.5,0.5,8.65,0.0,0.3,51.95,0.0,
3,1,2021-01-01 00:15:48,2021-01-01 00:31:01,0.0,10.6,1.0,N,138,132,1,29.0,0.5,0.5,6.05,0.0,0.3,36.35,0.0,
4,2,2021-01-01 00:31:49,2021-01-01 00:48:21,1.0,4.94,1.0,N,68,33,1,16.5,0.5,0.5,4.06,0.0,0.3,24.36,2.5,


In [23]:
t_start=time()
count=0
for batch in file.iter_batches(batch_size=100000):
    count+=1
    batch_df= batch.to_pandas()
    print(f'inserting batch {count}')
    b_start=time()
    batch_df.to_sql(name='ny_taxi_data',con=engine,if_exists='append')
    b_end= time()
    print(f'inserted and time taken is {b_end-b_start:10.3f} seconds.\n')
t_end=time()
print(f'completed in {t_end-t_start:10.3f} seconds for {count} batches.')

inserting batch 1
inserted and time taken is      5.727 seconds.

inserting batch 2
inserted and time taken is      5.463 seconds.

inserting batch 3
inserted and time taken is      5.376 seconds.

inserting batch 4
inserted and time taken is      5.432 seconds.

inserting batch 5
inserted and time taken is      5.446 seconds.

inserting batch 6
inserted and time taken is      5.294 seconds.

inserting batch 7
inserted and time taken is      5.366 seconds.

inserting batch 8
inserted and time taken is      5.420 seconds.

inserting batch 9
inserted and time taken is      5.450 seconds.

inserting batch 10
inserted and time taken is      5.322 seconds.

inserting batch 11
inserted and time taken is      5.459 seconds.

inserting batch 12
inserted and time taken is      5.542 seconds.

inserting batch 13
inserted and time taken is      5.413 seconds.

inserting batch 14
inserted and time taken is      3.513 seconds.

completed in     74.390 seconds for 14 batches.
