In [24]:
import pandas as pd
from sqlalchemy import create_engine
import psycopg2
from time import time

In [8]:
PGS_SCHEMA = 'pipeline'
HOST = "127.0.0.1"
PORT = "5432"
USER = "postgres"
PASSWORD = "postgres" 
DATABASE = "postgres"
# define schema
OPTIONS = "-c search_path=pipeline"

In [9]:
def get_engine(user=USER, password=PASSWORD, host=HOST, port=PORT, db=DATABASE):
    url = 'postgresql://{}:{}@{}:{}/{}'
    url = url.format(user, password, host, port, db)
    engine = create_engine(url)
    
    return engine

In [15]:
df = pd.read_csv('../csv-data/yellow_tripdata_2021-01.csv', nrows=100)
# convert to datetime type
df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)
df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)

In [13]:
engine = get_engine()

In [16]:
print(pd.io.sql.get_schema(df, name='yellow_taxi_data', con=engine))


CREATE TABLE yellow_taxi_data (
	"VendorID" BIGINT, 
	tpep_pickup_datetime TIMESTAMP WITHOUT TIME ZONE, 
	tpep_dropoff_datetime TIMESTAMP WITHOUT TIME ZONE, 
	passenger_count BIGINT, 
	trip_distance FLOAT(53), 
	"RatecodeID" BIGINT, 
	store_and_fwd_flag TEXT, 
	"PULocationID" BIGINT, 
	"DOLocationID" BIGINT, 
	payment_type BIGINT, 
	fare_amount FLOAT(53), 
	extra FLOAT(53), 
	mta_tax FLOAT(53), 
	tip_amount FLOAT(53), 
	tolls_amount FLOAT(53), 
	improvement_surcharge FLOAT(53), 
	total_amount FLOAT(53), 
	congestion_surcharge FLOAT(53)
)




In [18]:
"""日常数据分析工作中，难免碰到数据量特别大的情况，动不动就2、3千万行，如果直接读进 Python 内存中，且不说内存够不够，
读取的时间和后续的处理操作都很费劲。
Pandas 的 read_csv 函数提供2个参数：chunksize、iterator ，可实现按行多次读取文件，避免内存不足情况。"""

df_iter = pd.read_csv('../csv-data/yellow_tripdata_2021-01.csv', iterator=True, chunksize=100000)

In [19]:
df = next(df_iter)

In [20]:
df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)
df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)

In [21]:
# get header as column names
df.head(n=0)

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge


In [22]:
df.head(n=0).to_sql(name='yellow_taxi_data', schema='pipeline', con=engine, if_exists='replace')

0

In [23]:
%time df.to_sql(name='yellow_taxi_data', schema='pipeline', con=engine, if_exists='append')

Wall time: 8.27 s


1000

In [25]:
while True: 
    t_start = time()

    df = next(df_iter)

    df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)
    df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)
    
    df.to_sql(name='yellow_taxi_data', con=engine, if_exists='append')

    t_end = time()

    print('inserted another chunk, took %.3f second' % (t_end - t_start))

inserted another chunk, took 9.563 second
inserted another chunk, took 8.559 second
inserted another chunk, took 8.534 second
inserted another chunk, took 9.232 second
inserted another chunk, took 8.988 second
inserted another chunk, took 7.985 second
inserted another chunk, took 8.507 second
inserted another chunk, took 8.180 second
inserted another chunk, took 7.654 second
inserted another chunk, took 7.681 second
inserted another chunk, took 8.343 second


  df = next(df_iter)


inserted another chunk, took 8.470 second
inserted another chunk, took 6.984 second


StopIteration: 

In [28]:
df_zones = pd.read_csv('../csv-data/taxi+_zone_lookup.csv')

In [29]:
df_zones.head()

Unnamed: 0,LocationID,Borough,Zone,service_zone
0,1,EWR,Newark Airport,EWR
1,2,Queens,Jamaica Bay,Boro Zone
2,3,Bronx,Allerton/Pelham Gardens,Boro Zone
3,4,Manhattan,Alphabet City,Yellow Zone
4,5,Staten Island,Arden Heights,Boro Zone


In [30]:
df_zones.to_sql(name='zones', schema='pipeline', con=engine, if_exists='replace')

265