# import library

In [1]:
import pandas as pd
import sqlalchemy as sql
from time import time

In [2]:
df = pd.read_csv("../csv/yellow_tripdata_2021-01.csv",nrows=100)

In [3]:
pd.__version__

'2.2.2'

# Check schema first

In [6]:
print(pd.io.sql.get_schema(df,"show schema"))

CREATE TABLE "show schema" (
"VendorID" INTEGER,
  "tpep_pickup_datetime" TEXT,
  "tpep_dropoff_datetime" TEXT,
  "passenger_count" INTEGER,
  "trip_distance" REAL,
  "RatecodeID" INTEGER,
  "store_and_fwd_flag" TEXT,
  "PULocationID" INTEGER,
  "DOLocationID" INTEGER,
  "payment_type" INTEGER,
  "fare_amount" REAL,
  "extra" REAL,
  "mta_tax" REAL,
  "tip_amount" REAL,
  "tolls_amount" REAL,
  "improvement_surcharge" REAL,
  "total_amount" REAL,
  "congestion_surcharge" REAL
)


# Transform Date to timestamp because type is object python. we transform it as datetime64

In [22]:
df.tpep_pickup_datetime

0     2021-01-01 00:30:10
1     2021-01-01 00:51:20
2     2021-01-01 00:43:30
3     2021-01-01 00:15:48
4     2021-01-01 00:31:49
             ...         
95    2021-01-01 00:12:41
96    2021-01-01 00:23:29
97    2021-01-01 00:46:17
98    2021-01-01 00:28:16
99    2021-01-01 00:42:35
Name: tpep_pickup_datetime, Length: 100, dtype: object

In [23]:
df.tpep_dropoff_datetime

0     2021-01-01 00:36:12
1     2021-01-01 00:52:19
2     2021-01-01 01:11:06
3     2021-01-01 00:31:01
4     2021-01-01 00:48:21
             ...         
95    2021-01-01 00:26:47
96    2021-01-01 00:35:03
97    2021-01-01 00:54:25
98    2021-01-01 00:51:44
99    2021-01-01 00:54:41
Name: tpep_dropoff_datetime, Length: 100, dtype: object

In [28]:
pd.to_datetime(df.tpep_dropoff_datetime)

0    2021-01-01 00:36:12
1    2021-01-01 00:52:19
2    2021-01-01 01:11:06
3    2021-01-01 00:31:01
4    2021-01-01 00:48:21
             ...        
95   2021-01-01 00:26:47
96   2021-01-01 00:35:03
97   2021-01-01 00:54:25
98   2021-01-01 00:51:44
99   2021-01-01 00:54:41
Name: tpep_dropoff_datetime, Length: 100, dtype: datetime64[ns]

In [30]:
pd.to_datetime(df.tpep_pickup_datetime)

0    2021-01-01 00:30:10
1    2021-01-01 00:51:20
2    2021-01-01 00:43:30
3    2021-01-01 00:15:48
4    2021-01-01 00:31:49
             ...        
95   2021-01-01 00:12:41
96   2021-01-01 00:23:29
97   2021-01-01 00:46:17
98   2021-01-01 00:28:16
99   2021-01-01 00:42:35
Name: tpep_pickup_datetime, Length: 100, dtype: datetime64[ns]

In [39]:
df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)
df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)

# Get schema and create with name yellow_taxi_data

In [115]:
print(pd.io.sql.get_schema(df,"yellow_taxi_data"))

CREATE TABLE "yellow_taxi_data" (
"VendorID" INTEGER,
  "tpep_pickup_datetime" TEXT,
  "tpep_dropoff_datetime" TEXT,
  "passenger_count" INTEGER,
  "trip_distance" REAL,
  "RatecodeID" INTEGER,
  "store_and_fwd_flag" TEXT,
  "PULocationID" INTEGER,
  "DOLocationID" INTEGER,
  "payment_type" INTEGER,
  "fare_amount" REAL,
  "extra" REAL,
  "mta_tax" REAL,
  "tip_amount" REAL,
  "tolls_amount" REAL,
  "improvement_surcharge" REAL,
  "total_amount" REAL,
  "congestion_surcharge" REAL
)


# Create engine to database

In [6]:
engine = sql.create_engine("postgresql://saputra:indonesia11@0.0.0.0:5430/taxi")

In [7]:
engine.connect()

<sqlalchemy.engine.base.Connection at 0x70267fdd9ac0>

# Debug Query Create table

In [8]:
print(pd.io.sql.get_schema(df,"yellow_taxi_data",con=engine))


CREATE TABLE yellow_taxi_data (
	"VendorID" BIGINT, 
	tpep_pickup_datetime TEXT, 
	tpep_dropoff_datetime TEXT, 
	passenger_count BIGINT, 
	trip_distance FLOAT(53), 
	"RatecodeID" BIGINT, 
	store_and_fwd_flag TEXT, 
	"PULocationID" BIGINT, 
	"DOLocationID" BIGINT, 
	payment_type BIGINT, 
	fare_amount FLOAT(53), 
	extra FLOAT(53), 
	mta_tax FLOAT(53), 
	tip_amount FLOAT(53), 
	tolls_amount FLOAT(53), 
	improvement_surcharge FLOAT(53), 
	total_amount FLOAT(53), 
	congestion_surcharge FLOAT(53)
)




# Create chunk dataframe

In [112]:
df_iter = pd.read_csv("yellow_tripdata_2021-01.csv",iterator=True,chunksize=100000,low_memory=False)

# Get chunk data

In [113]:
df = next(df_iter)

In [108]:
len(df)

100000

In [95]:
df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)
df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)

# Get header from dataframe

In [114]:
df.head(0)

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge


# Create Table

In [106]:
df.head(0).to_sql(name="yellow_taxi_data",con=engine,if_exists='replace')

0

# Insert data

In [75]:
%time df.to_sql(name="yellow_taxi_data",con=engine,if_exists='append')

CPU times: user 4.52 s, sys: 95.8 ms, total: 4.62 s
Wall time: 6.36 s


1000

# Make pipeline with approach chunk data

In [116]:
while True:
    start_time = time()
    
    df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)
    df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)
    
    df.to_sql(name="yellow_taxi_data",con=engine,if_exists='append')

    end_time = time()

    df = next(df_iter)
    print("inserted antoher chunk, took %.3f second" % (end_time - start_time))
    

inserted antoher chunk, took 4.813 second
inserted antoher chunk, took 5.326 second
inserted antoher chunk, took 5.409 second
inserted antoher chunk, took 5.377 second
inserted antoher chunk, took 5.217 second
inserted antoher chunk, took 5.277 second
inserted antoher chunk, took 5.465 second
inserted antoher chunk, took 4.999 second
inserted antoher chunk, took 5.700 second
inserted antoher chunk, took 5.496 second
inserted antoher chunk, took 5.196 second
inserted antoher chunk, took 5.533 second
inserted antoher chunk, took 5.347 second


StopIteration: 

# Read data with sql

In [9]:
query = """ 
SELECT *
FROM pg_catalog.pg_tables
WHERE schemaname != 'pg_catalog' AND
schemaname != 'information_schema';
"""

pd.read_sql(query,con=engine)

Unnamed: 0,schemaname,tablename,tableowner,tablespace,hasindexes,hasrules,hastriggers,rowsecurity
0,public,yellow_taxi_data,saputra,,True,False,False,False


In [10]:
query = """ 
SELECT *
FROM yellow_taxi_data
limit 10;
"""

pd.read_sql(query,con=engine)

Unnamed: 0,index,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge
0,100000,2,2021-01-04 14:42:51,2021-01-04 14:51:18,1,1.43,1,N,170,161,2,7.5,0.0,0.5,0.0,0.0,0.3,10.8,2.5
1,100001,2,2021-01-04 14:04:39,2021-01-04 14:18:41,1,2.82,1,N,170,143,2,12.0,0.0,0.5,0.0,0.0,0.3,15.3,2.5
2,100002,1,2021-01-04 14:12:49,2021-01-04 14:31:21,0,2.7,1,N,68,239,1,13.5,2.5,0.5,3.35,0.0,0.3,20.15,2.5
3,100003,1,2021-01-04 14:43:55,2021-01-04 14:48:45,1,0.7,1,N,246,68,2,5.5,2.5,0.5,0.0,0.0,0.3,8.8,2.5
4,100004,1,2021-01-04 14:59:16,2021-01-04 15:07:08,1,1.6,1,N,161,234,1,8.0,2.5,0.5,2.25,0.0,0.3,13.55,2.5
5,100005,2,2021-01-04 14:19:25,2021-01-04 14:27:25,1,2.04,1,N,48,234,1,8.5,0.0,0.5,2.36,0.0,0.3,14.16,2.5
6,100006,1,2021-01-04 14:05:13,2021-01-04 14:09:53,1,0.9,1,N,161,229,1,5.5,2.5,0.5,1.75,0.0,0.3,10.55,2.5
7,100007,1,2021-01-04 14:13:43,2021-01-04 14:25:17,1,2.4,1,N,233,90,1,10.5,2.5,0.5,2.5,0.0,0.3,16.3,2.5
8,100008,1,2021-01-04 14:39:19,2021-01-04 14:44:26,1,0.8,1,N,113,79,2,5.0,2.5,0.5,0.0,0.0,0.3,8.3,2.5
9,100009,1,2021-01-04 14:49:23,2021-01-04 15:08:45,1,4.3,1,N,79,236,1,16.5,2.5,0.5,2.0,0.0,0.3,21.8,2.5


In [11]:
query = """ 
SELECT COUNT(1)
FROM yellow_taxi_data;
"""

pd.read_sql(query,con=engine)

Unnamed: 0,count
0,1469765
