In [17]:
# pip install pyarrow

In [29]:
# pip install jupyter_contrib_nbextensions

In [4]:
import pandas as pd
import pyarrow.parquet as pq
from time import time

In [5]:
pd.__version__

'1.4.2'

In [6]:
pq.read_metadata('yellow_tripdata_2024-01.parquet')

<pyarrow._parquet.FileMetaData object at 0x000002346BACC090>
  created_by: parquet-cpp-arrow version 14.0.2
  num_columns: 19
  num_rows: 2964624
  num_row_groups: 3
  format_version: 2.6
  serialized_size: 6357

In [7]:
# Read file, read the table from file and check schema
file = pq.ParquetFile('yellow_tripdata_2024-01.parquet')
table = file.read()
table.schema

VendorID: int32
tpep_pickup_datetime: timestamp[us]
tpep_dropoff_datetime: timestamp[us]
passenger_count: int64
trip_distance: double
RatecodeID: int64
store_and_fwd_flag: large_string
PULocationID: int32
DOLocationID: int32
payment_type: int64
fare_amount: double
extra: double
mta_tax: double
tip_amount: double
tolls_amount: double
improvement_surcharge: double
total_amount: double
congestion_surcharge: double
Airport_fee: double

In [8]:
# Convert to pandas and check data 
df = table.to_pandas()
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2964624 entries, 0 to 2964623
Data columns (total 19 columns):
 #   Column                 Dtype         
---  ------                 -----         
 0   VendorID               int32         
 1   tpep_pickup_datetime   datetime64[ns]
 2   tpep_dropoff_datetime  datetime64[ns]
 3   passenger_count        float64       
 4   trip_distance          float64       
 5   RatecodeID             float64       
 6   store_and_fwd_flag     object        
 7   PULocationID           int32         
 8   DOLocationID           int32         
 9   payment_type           int64         
 10  fare_amount            float64       
 11  extra                  float64       
 12  mta_tax                float64       
 13  tip_amount             float64       
 14  tolls_amount           float64       
 15  improvement_surcharge  float64       
 16  total_amount           float64       
 17  congestion_surcharge   float64       
 18  Airport_fee           

In [13]:
# pip install psycopg2
df.head()

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,Airport_fee
0,2,2024-01-01 00:57:55,2024-01-01 01:17:43,1.0,1.72,1.0,N,186,79,2,17.7,1.0,0.5,0.0,0.0,1.0,22.7,2.5,0.0
1,1,2024-01-01 00:03:00,2024-01-01 00:09:36,1.0,1.8,1.0,N,140,236,1,10.0,3.5,0.5,3.75,0.0,1.0,18.75,2.5,0.0
2,1,2024-01-01 00:17:06,2024-01-01 00:35:01,1.0,4.7,1.0,N,236,79,1,23.3,3.5,0.5,3.0,0.0,1.0,31.3,2.5,0.0
3,1,2024-01-01 00:36:38,2024-01-01 00:44:56,1.0,1.4,1.0,N,79,211,1,10.0,3.5,0.5,2.0,0.0,1.0,17.0,2.5,0.0
4,1,2024-01-01 00:46:51,2024-01-01 00:52:57,1.0,0.8,1.0,N,211,148,1,7.9,3.5,0.5,3.2,0.0,1.0,16.1,2.5,0.0


In [10]:
# Create an open SQL database connection object or a SQLAlchemy connectable
from sqlalchemy import create_engine

engine = create_engine('postgresql://postgres:root@localhost:5431/ny_taxi')
engine.connect()

<sqlalchemy.engine.base.Connection at 0x2346ba48370>

In [12]:
# pip install sqlalchemy
print(pd.io.sql.get_schema(df, name='yellow_taxi_data',con=engine))


CREATE TABLE yellow_taxi_data (
	"VendorID" INTEGER, 
	tpep_pickup_datetime TIMESTAMP WITHOUT TIME ZONE, 
	tpep_dropoff_datetime TIMESTAMP WITHOUT TIME ZONE, 
	passenger_count FLOAT(53), 
	trip_distance FLOAT(53), 
	"RatecodeID" FLOAT(53), 
	store_and_fwd_flag TEXT, 
	"PULocationID" INTEGER, 
	"DOLocationID" INTEGER, 
	payment_type BIGINT, 
	fare_amount FLOAT(53), 
	extra FLOAT(53), 
	mta_tax FLOAT(53), 
	tip_amount FLOAT(53), 
	tolls_amount FLOAT(53), 
	improvement_surcharge FLOAT(53), 
	total_amount FLOAT(53), 
	congestion_surcharge FLOAT(53), 
	"Airport_fee" FLOAT(53)
)




In [19]:
batch_iter = file.iter_batches(batch_size=100000)
batch_iter

<_cython_3_0_10.generator at 0x23474882ae0>

In [21]:
df= next(batch_iter).to_pandas()
df.head()

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,Airport_fee
0,2,2024-01-02 13:54:12,2024-01-02 13:57:25,1,0.64,1,N,236,263,2,5.8,0.0,0.5,0.0,0.0,1.0,9.8,2.5,0.0
1,1,2024-01-02 13:16:23,2024-01-02 13:38:40,1,2.2,99,N,167,168,1,20.5,0.0,0.5,0.0,0.0,1.0,22.0,0.0,0.0
2,1,2024-01-02 13:02:50,2024-01-02 13:16:37,1,2.7,1,N,237,137,3,14.2,2.5,0.5,0.0,0.0,1.0,18.2,2.5,0.0
3,2,2024-01-02 13:22:07,2024-01-02 13:24:57,2,0.5,1,N,262,262,2,5.1,0.0,0.5,0.0,0.0,1.0,9.1,2.5,0.0
4,2,2024-01-02 13:56:20,2024-01-02 14:10:15,2,2.21,1,N,43,186,1,14.2,0.0,0.5,3.64,0.0,1.0,21.84,2.5,0.0


In [23]:
# Creating just the table in postgres
df.head(0).to_sql(name='yellow_taxi_data',con=engine, if_exists='replace')

0

In [24]:
t_start = time()
count = 0
for batch in file.iter_batches(batch_size=100000):
    count+=1
    batch_df = batch.to_pandas()
    print(f'inserting batch {count}...')
    b_start = time()
    
    batch_df.to_sql(name='yellow_taxi_data',con=engine, if_exists='append')
    b_end = time()
    print(f'inserted! time taken {b_end-b_start:10.3f} seconds.\n')
    
t_end = time()   
print(f'Completed! Total time taken was {t_end-t_start:10.3f} seconds for {count} batches.')    

inserting batch 1...
inserted! time taken     13.691 seconds.

inserting batch 2...
inserted! time taken     12.379 seconds.

inserting batch 3...
inserted! time taken     13.486 seconds.

inserting batch 4...
inserted! time taken     12.942 seconds.

inserting batch 5...
inserted! time taken     12.233 seconds.

inserting batch 6...
inserted! time taken     19.681 seconds.

inserting batch 7...
inserted! time taken     18.122 seconds.

inserting batch 8...
inserted! time taken     14.359 seconds.

inserting batch 9...
inserted! time taken     12.059 seconds.

inserting batch 10...
inserted! time taken     12.407 seconds.

inserting batch 11...
inserted! time taken     12.882 seconds.

inserting batch 12...
inserted! time taken     13.503 seconds.

inserting batch 13...
inserted! time taken     11.272 seconds.

inserting batch 14...
inserted! time taken     12.126 seconds.

inserting batch 15...
inserted! time taken     12.514 seconds.

inserting batch 16...
inserted! time taken     11

In [37]:
# engine = create_engine("postgresql+psycopg://postgres:root@localhost:5431/ny_taxi")
# # engine = create_engine(conn_string)
# engine.connect()

<sqlalchemy.engine.base.Connection at 0x234779516a0>

In [38]:
query = """
SELECT COUNT(1) FROM yellow_taxi_data;
"""
pd.read_sql(query, con=engine)

AttributeError: 'OptionEngine' object has no attribute 'execute'