In [1]:
import pandas as pd
from sqlalchemy import create_engine
import time

# Параметры
csv_file = 'green_tripdata_2019-10.csv'
engine = create_engine("postgresql://root:root@localhost:5433/ny_taxi")
table_name = 'green_taxi_data'

# Создаём итератор по CSV-файлу, читаем по 100000 строк за раз.
df_iter = pd.read_csv(csv_file, iterator=True, chunksize=100000)

# Считываем первый чанк
df = next(df_iter)
# Приводим столбцы к датам/времени
df['lpep_pickup_datetime'] = pd.to_datetime(df['lpep_pickup_datetime'])
df['lpep_dropoff_datetime'] = pd.to_datetime(df['lpep_dropoff_datetime'])

# Печатаем схему (не обязательно, но полезно посмотреть)
print(pd.io.sql.get_schema(df, name=table_name, con=engine))

# Сначала создаём структуру таблицы (если надо пересоздать):
df.head(0).to_sql(name=table_name, con=engine, if_exists='replace')

# Загружаем первый чанк
df.to_sql(name=table_name, con=engine, if_exists='append')
print(f"Inserted first chunk with {len(df)} rows.")

# Обрабатываем остальные чанки в цикле
while True:
    try:
        t_start = time.time()
        
        df = next(df_iter)
        df['lpep_pickup_datetime'] = pd.to_datetime(df['lpep_pickup_datetime'])
        df['lpep_dropoff_datetime'] = pd.to_datetime(df['lpep_dropoff_datetime'])
        
        df.to_sql(name=table_name, con=engine, if_exists='append')
        
        t_end = time.time()
        print(f"Inserted another chunk with {len(df)} rows, "
              f"took {t_end - t_start:.3f} seconds.")

    except StopIteration:
        print("All chunks inserted, no more data left.")
        break



CREATE TABLE green_taxi_data (
	"VendorID" BIGINT, 
	lpep_pickup_datetime TIMESTAMP WITHOUT TIME ZONE, 
	lpep_dropoff_datetime TIMESTAMP WITHOUT TIME ZONE, 
	store_and_fwd_flag TEXT, 
	"RatecodeID" BIGINT, 
	"PULocationID" BIGINT, 
	"DOLocationID" BIGINT, 
	passenger_count BIGINT, 
	trip_distance FLOAT(53), 
	fare_amount FLOAT(53), 
	extra FLOAT(53), 
	mta_tax FLOAT(53), 
	tip_amount FLOAT(53), 
	tolls_amount FLOAT(53), 
	ehail_fee FLOAT(53), 
	improvement_surcharge FLOAT(53), 
	total_amount FLOAT(53), 
	payment_type BIGINT, 
	trip_type FLOAT(53), 
	congestion_surcharge FLOAT(53)
)


Inserted first chunk with 100000 rows.
Inserted another chunk with 100000 rows, took 12.526 seconds.
Inserted another chunk with 100000 rows, took 12.592 seconds.


  df = next(df_iter)


Inserted another chunk with 100000 rows, took 12.806 seconds.
Inserted another chunk with 76386 rows, took 8.299 seconds.
All chunks inserted, no more data left.


In [1]:
import pandas as pd
from sqlalchemy import create_engine
import sys
print(sys.executable)
from time import time

engine = create_engine("postgresql://root:root@localhost:5433/ny_taxi")
engine.connect()

/usr/local/opt/python@3.9/bin/python3.9


<sqlalchemy.engine.base.Connection at 0x1139a9550>

In [4]:
!ls

Dockerfile                 taxi_zone_lookup.csv
green_tripdata_2019-10.csv upload-data.ipynb
[34mny_taxi_postgres_data[m[m


In [7]:
taxi_zone =  pd.read_csv('taxi_zone_lookup.csv')
print(taxi_zone.shape)

(265, 4)


In [9]:
taxi_zone.head()

Unnamed: 0,LocationID,Borough,Zone,service_zone
0,1,EWR,Newark Airport,EWR
1,2,Queens,Jamaica Bay,Boro Zone
2,3,Bronx,Allerton/Pelham Gardens,Boro Zone
3,4,Manhattan,Alphabet City,Yellow Zone
4,5,Staten Island,Arden Heights,Boro Zone


In [10]:
print(pd.io.sql.get_schema(taxi_zone, name = 'taxi_zone'))

CREATE TABLE "taxi_zone" (
"LocationID" INTEGER,
  "Borough" TEXT,
  "Zone" TEXT,
  "service_zone" TEXT
)


In [11]:
taxi_zone.to_sql(name = 'taxi_zone', con = engine, if_exists = 'replace')

265

In [16]:
df_green = pd.read_csv('green_tripdata_2019-10.csv')
print(df_green.shape)
df_green.lpep_pickup_datetime = pd.to_datetime(df_green['lpep_pickup_datetime'])
df_green.lpep_dropoff_datetime = pd.to_datetime(df_green['lpep_dropoff_datetime'])
df_green.head()

  df_green = pd.read_csv('green_tripdata_2019-10.csv')


(476386, 20)


Unnamed: 0,VendorID,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,RatecodeID,PULocationID,DOLocationID,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type,congestion_surcharge
0,2.0,2019-10-01 00:26:02,2019-10-01 00:39:58,N,1.0,112,196,1.0,5.88,18.0,0.5,0.5,0.0,0.0,,0.3,19.3,2.0,1.0,0.0
1,1.0,2019-10-01 00:18:11,2019-10-01 00:22:38,N,1.0,43,263,1.0,0.8,5.0,3.25,0.5,0.0,0.0,,0.3,9.05,2.0,1.0,0.0
2,1.0,2019-10-01 00:09:31,2019-10-01 00:24:47,N,1.0,255,228,2.0,7.5,21.5,0.5,0.5,0.0,0.0,,0.3,22.8,2.0,1.0,0.0
3,1.0,2019-10-01 00:37:40,2019-10-01 00:41:49,N,1.0,181,181,1.0,0.9,5.5,0.5,0.5,0.0,0.0,,0.3,6.8,2.0,1.0,0.0
4,2.0,2019-10-01 00:08:13,2019-10-01 00:17:56,N,1.0,97,188,1.0,2.52,10.0,0.5,0.5,2.26,0.0,,0.3,13.56,1.0,1.0,0.0


In [17]:
print(pd.io.sql.get_schema(df_green, name = 'green-taxi-data'))

CREATE TABLE "green-taxi-data" (
"VendorID" REAL,
  "lpep_pickup_datetime" TIMESTAMP,
  "lpep_dropoff_datetime" TIMESTAMP,
  "store_and_fwd_flag" TEXT,
  "RatecodeID" REAL,
  "PULocationID" INTEGER,
  "DOLocationID" INTEGER,
  "passenger_count" REAL,
  "trip_distance" REAL,
  "fare_amount" REAL,
  "extra" REAL,
  "mta_tax" REAL,
  "tip_amount" REAL,
  "tolls_amount" REAL,
  "ehail_fee" REAL,
  "improvement_surcharge" REAL,
  "total_amount" REAL,
  "payment_type" REAL,
  "trip_type" REAL,
  "congestion_surcharge" REAL
)


In [18]:
df_iter = pd.read_csv('green_tripdata_2019-10.csv', iterator = True, chunksize = 100000)

df_iter = pd.read_csv(
    'green_tripdata_2019-10.csv',
    iterator=True,
    chunksize=100000,
    parse_dates=['lpep_pickup_datetime', 'lpep_dropoff_datetime'],
    low_memory=False  
)


df = next(df_iter)
df.tpep_pickup_datetime = pd.to_datetime(df['lpep_pickup_datetime'])
df.tpep_dropoff_datetime = pd.to_datetime(df['lpep_dropoff_datetime'])

len(df)

  df.tpep_pickup_datetime = pd.to_datetime(df['lpep_pickup_datetime'])
  df.tpep_dropoff_datetime = pd.to_datetime(df['lpep_dropoff_datetime'])


100000

In [19]:
df.to_sql(name = 'green_taxi_data', con = engine, if_exists = 'append')

1000

In [20]:
from time import time

In [21]:
while True:
    
    t_start = time()
    
    df = next(df_iter)
    df.tpep_pickup_datetime = pd.to_datetime(df['lpep_pickup_datetime'])
    df.tpep_dropoff_datetime = pd.to_datetime(df['lpep_dropoff_datetime'])
    df.to_sql(name = 'green_taxi_data', con = engine, if_exists = 'append')
    
    t_end = time()
    
    print ('inserted another chunk... , took %.3f seconds' % (t_end - t_start))


  df.tpep_pickup_datetime = pd.to_datetime(df['lpep_pickup_datetime'])
  df.tpep_dropoff_datetime = pd.to_datetime(df['lpep_dropoff_datetime'])


inserted another chunk... , took 12.730 seconds


  df.tpep_pickup_datetime = pd.to_datetime(df['lpep_pickup_datetime'])
  df.tpep_dropoff_datetime = pd.to_datetime(df['lpep_dropoff_datetime'])


inserted another chunk... , took 12.464 seconds


  df.tpep_pickup_datetime = pd.to_datetime(df['lpep_pickup_datetime'])
  df.tpep_dropoff_datetime = pd.to_datetime(df['lpep_dropoff_datetime'])


inserted another chunk... , took 12.539 seconds


  df.tpep_pickup_datetime = pd.to_datetime(df['lpep_pickup_datetime'])
  df.tpep_dropoff_datetime = pd.to_datetime(df['lpep_dropoff_datetime'])


inserted another chunk... , took 8.296 seconds


StopIteration: 