In [44]:
import pandas as pd
from sqlalchemy import create_engine
from time import time

In [27]:
df = pd.read_parquet('yellow_tripdata_2021-01.parquet')

In [28]:
df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)
df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)

In [29]:
df.to_csv('yellow_tripdata_2021-01.csv')

In [30]:
engine = create_engine('postgresql://root:root@localhost:5432/ny_taxi')

In [31]:
engine.connect()

<sqlalchemy.engine.base.Connection at 0x7fb8842ec100>

In [32]:
print(pd.io.sql.get_schema(df, name='yellow_taxi_data', con=engine))


CREATE TABLE yellow_taxi_data (
	"VendorID" BIGINT, 
	tpep_pickup_datetime TIMESTAMP WITHOUT TIME ZONE, 
	tpep_dropoff_datetime TIMESTAMP WITHOUT TIME ZONE, 
	passenger_count FLOAT(53), 
	trip_distance FLOAT(53), 
	"RatecodeID" FLOAT(53), 
	store_and_fwd_flag TEXT, 
	"PULocationID" BIGINT, 
	"DOLocationID" BIGINT, 
	payment_type BIGINT, 
	fare_amount FLOAT(53), 
	extra FLOAT(53), 
	mta_tax FLOAT(53), 
	tip_amount FLOAT(53), 
	tolls_amount FLOAT(53), 
	improvement_surcharge FLOAT(53), 
	total_amount FLOAT(53), 
	congestion_surcharge FLOAT(53), 
	airport_fee FLOAT(53)
)




In [53]:
# we will use an iterator to manage resources. we don't want to upload all 1,000,000 rows at once
df_iter = pd.read_csv('yellow_tripdata_2021-01.csv', chunksize=100000)

In [54]:
df = next(df_iter)

In [55]:
len(df)

100000

In [56]:
df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)
df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)

In [57]:
df.head(n=0).to_sql(name='yellow_taxi_data', con=engine, if_exists='replace')

0

In [58]:
%time df.to_sql(name='yellow_taxi_data', con=engine, if_exists='append')

CPU times: user 3.2 s, sys: 77.2 ms, total: 3.28 s
Wall time: 5.41 s


1000

In [59]:
while True:
    t_start = time()
    df = next(df_iter)
    df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)
    df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)

    df.to_sql(name='yellow_taxi_data', con=engine, if_exists='append')

    t_end = time()

    print(f'inserted another {df.shape[0]} rows in %.1f{t_end - t_start:.0f} seconds')

inserted another 100000 rows in %.1f5.4320595264434814 seconds
inserted another 100000 rows in %.1f5.358575105667114 seconds
inserted another 100000 rows in %.1f5.43644642829895 seconds
inserted another 100000 rows in %.1f5.343691110610962 seconds
inserted another 100000 rows in %.1f5.282572507858276 seconds
inserted another 100000 rows in %.1f5.297905683517456 seconds


  df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)


inserted another 100000 rows in %.1f5.507069826126099 seconds
inserted another 100000 rows in %.1f5.598464727401733 seconds
inserted another 100000 rows in %.1f5.5059826374053955 seconds
inserted another 100000 rows in %.1f5.504063844680786 seconds
inserted another 100000 rows in %.1f5.340231657028198 seconds


  df = next(df_iter)


inserted another 100000 rows in %.1f5.265003442764282 seconds
inserted another 69769 rows in %.1f3.3467750549316406 seconds


StopIteration: 

In [64]:
df = pd.read_parquet('yellow_tripdata_2021-01.parquet')

In [65]:
df.to_chunks(3)

AttributeError: 'DataFrame' object has no attribute 'to_chunks'

In [61]:
df.head(n=0).to_sql(name='yellow_taxi_data', con=engine, if_exists='replace')

0

In [62]:
%time df.to_sql(name='yellow_taxi_data', con=engine, if_exists='append')

CPU times: user 43.5 s, sys: 6.02 s, total: 49.5 s
Wall time: 1min 16s


769

In [63]:
import pyarrow.parquet as pq

output_name = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2021-01.parquet"

parquet_file = pq.ParquetFile(output_name)
parquet_size = parquet_file.metadata.num_rows

engine = create_engine('postgresql://root:root@localhost:5432/ny_taxi')

table_name='yellow_taxi_data'

# Clear table if exists
pq.read_table(output_name).to_pandas().head(n=0).to_sql(name=table_name, con=engine, if_exists='replace')



ArrowInvalid: Unrecognized filesystem type in URI: https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2021-01.parquet

In [None]:
# default (and max) batch size
index = 65536

for i in parquet_file.iter_batches(use_threads=True):
	t_start = time()
	print(f'Ingesting {index} out of {parquet_size} rows ({index / parquet_size:.0%})')
	i.to_pandas().to_sql(name=table_name, con=engine, if_exists='append')
	index += 65536
	t_end = time()
	print(f'\t- it took %.1f seconds' % (t_end - t_start))
