## **Import Library**

In [1]:
# import Library used
import pandas as pd
from sqlalchemy import create_engine
import configparser
from time import time
import warnings
warnings.filterwarnings('ignore')

## **Load dataset**

In [2]:
# load dataset with 100 rows
df = pd.read_csv('yellow_tripdata_2022-01.csv', nrows=100)

In [3]:
# show head of data
df.head()

Unnamed: 0.1,Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee
0,0,1,2022-01-01 00:35:40,2022-01-01 00:53:29,2.0,3.8,1.0,N,142,236,1,14.5,3.0,0.5,3.65,0.0,0.3,21.95,2.5,0.0
1,1,1,2022-01-01 00:33:43,2022-01-01 00:42:07,1.0,2.1,1.0,N,236,42,1,8.0,0.5,0.5,4.0,0.0,0.3,13.3,0.0,0.0
2,2,2,2022-01-01 00:53:21,2022-01-01 01:02:19,1.0,0.97,1.0,N,166,166,1,7.5,0.5,0.5,1.76,0.0,0.3,10.56,0.0,0.0
3,3,2,2022-01-01 00:25:21,2022-01-01 00:35:23,1.0,1.09,1.0,N,114,68,2,8.0,0.5,0.5,0.0,0.0,0.3,11.8,2.5,0.0
4,4,2,2022-01-01 00:36:48,2022-01-01 01:14:20,1.0,4.3,1.0,N,68,163,1,23.5,0.5,0.5,3.0,0.0,0.3,30.3,2.5,0.0


In [4]:
# show information of data
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100 entries, 0 to 99
Data columns (total 20 columns):
 #   Column                 Non-Null Count  Dtype  
---  ------                 --------------  -----  
 0   Unnamed: 0             100 non-null    int64  
 1   VendorID               100 non-null    int64  
 2   tpep_pickup_datetime   100 non-null    object 
 3   tpep_dropoff_datetime  100 non-null    object 
 4   passenger_count        100 non-null    float64
 5   trip_distance          100 non-null    float64
 6   RatecodeID             100 non-null    float64
 7   store_and_fwd_flag     100 non-null    object 
 8   PULocationID           100 non-null    int64  
 9   DOLocationID           100 non-null    int64  
 10  payment_type           100 non-null    int64  
 11  fare_amount            100 non-null    float64
 12  extra                  100 non-null    float64
 13  mta_tax                100 non-null    float64
 14  tip_amount             100 non-null    float64
 15  tolls_a

cleaning data with dropped columns unnamed, and change dtype of columns tpep_pickup_datetime and tpep_dropoff_datetime

In [5]:
# drop columns unnamed
df = df.drop(columns='Unnamed: 0')

In [6]:
# convert to date time
df['tpep_pickup_datetime'] = pd.to_datetime(df['tpep_pickup_datetime'])

In [7]:
# convert to date time
df['tpep_dropoff_datetime'] = pd.to_datetime(df['tpep_dropoff_datetime'])

In [8]:
#show dtypes of data
df.dtypes

VendorID                          int64
tpep_pickup_datetime     datetime64[ns]
tpep_dropoff_datetime    datetime64[ns]
passenger_count                 float64
trip_distance                   float64
RatecodeID                      float64
store_and_fwd_flag               object
PULocationID                      int64
DOLocationID                      int64
payment_type                      int64
fare_amount                     float64
extra                           float64
mta_tax                         float64
tip_amount                      float64
tolls_amount                    float64
improvement_surcharge           float64
total_amount                    float64
congestion_surcharge            float64
airport_fee                     float64
dtype: object

## **Create Connections to postgres**

In [9]:
# get config file
config = configparser.ConfigParser()
config.read('config.cfg')

host= config.get('SQL', 'HOST')
user= config.get('SQL', 'USER')
pwd= config.get('SQL', 'PASSWORD')
port= config.get('SQL', 'PORT')
database= config.get('SQL', 'DB_NAME')

In [10]:
# create connections
engine = create_engine(f"postgresql://{user}:{pwd}@{host}:{port}/{database}")

In [11]:
# test connections
engine.connect()

<sqlalchemy.engine.base.Connection at 0x1faf87a1a60>

In [12]:
# get schema data
print (pd.io.sql.get_schema(df, name='yellow_taxi_data', con=engine))


CREATE TABLE yellow_taxi_data (
	"VendorID" BIGINT, 
	tpep_pickup_datetime TIMESTAMP WITHOUT TIME ZONE, 
	tpep_dropoff_datetime TIMESTAMP WITHOUT TIME ZONE, 
	passenger_count FLOAT(53), 
	trip_distance FLOAT(53), 
	"RatecodeID" FLOAT(53), 
	store_and_fwd_flag TEXT, 
	"PULocationID" BIGINT, 
	"DOLocationID" BIGINT, 
	payment_type BIGINT, 
	fare_amount FLOAT(53), 
	extra FLOAT(53), 
	mta_tax FLOAT(53), 
	tip_amount FLOAT(53), 
	tolls_amount FLOAT(53), 
	improvement_surcharge FLOAT(53), 
	total_amount FLOAT(53), 
	congestion_surcharge FLOAT(53), 
	airport_fee FLOAT(53)
)




```bash
SELECT 
   *
FROM 
   pg_catalog.pg_tables
where 
    schemaname != 'pg_catalog' AND 
    schemaname != 'information_schema';
    
```

In [13]:
# Query shema and table in database
query = '''
SELECT 
   *
FROM 
   pg_catalog.pg_tables
where 
    schemaname != 'pg_catalog' AND 
    schemaname != 'information_schema';
'''

pd.read_sql(query, con=engine)

Unnamed: 0,schemaname,tablename,tableowner,tablespace,hasindexes,hasrules,hastriggers,rowsecurity


## **Ingest Data into database**

In [14]:
# load all dataset
data_iter = pd.read_csv('yellow_tripdata_2022-01.csv', iterator=True, chunksize=100000)

In [15]:
# data is next iteroows
data = next(data_iter)

In [16]:
# show head of data
data.head()

Unnamed: 0.1,Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee
0,0,1,2022-01-01 00:35:40,2022-01-01 00:53:29,2.0,3.8,1.0,N,142,236,1,14.5,3.0,0.5,3.65,0.0,0.3,21.95,2.5,0.0
1,1,1,2022-01-01 00:33:43,2022-01-01 00:42:07,1.0,2.1,1.0,N,236,42,1,8.0,0.5,0.5,4.0,0.0,0.3,13.3,0.0,0.0
2,2,2,2022-01-01 00:53:21,2022-01-01 01:02:19,1.0,0.97,1.0,N,166,166,1,7.5,0.5,0.5,1.76,0.0,0.3,10.56,0.0,0.0
3,3,2,2022-01-01 00:25:21,2022-01-01 00:35:23,1.0,1.09,1.0,N,114,68,2,8.0,0.5,0.5,0.0,0.0,0.3,11.8,2.5,0.0
4,4,2,2022-01-01 00:36:48,2022-01-01 01:14:20,1.0,4.3,1.0,N,68,163,1,23.5,0.5,0.5,3.0,0.0,0.3,30.3,2.5,0.0


In [17]:
# show shape of data
data.shape

(100000, 20)

In [18]:
# drop columns unnamed
data.drop(columns='Unnamed: 0', inplace=True)

In [19]:
# Change dtypes
data['tpep_pickup_datetime'] = pd.to_datetime(data['tpep_pickup_datetime'])
data['tpep_dropoff_datetime'] = pd.to_datetime(data['tpep_dropoff_datetime'])

In [20]:
# show columns name
df.head(0)

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee


In [21]:
# insert col to database
df.head(0).to_sql(name='yellow_taxi_data', con=engine, if_exists='replace')

0

In [22]:
%time data.to_sql(name='yellow_taxi_data', con=engine, if_exists='append')

CPU times: total: 4.67 s
Wall time: 15.4 s


1000

In [23]:
while True:
    try:
        t_start = time()
        
        data = next(data_iter)
        data.drop(columns='Unnamed: 0', inplace=True)
        data['tpep_pickup_datetime'] = pd.to_datetime(data['tpep_pickup_datetime'])
        data['tpep_dropoff_datetime'] = pd.to_datetime(data['tpep_dropoff_datetime'])
        
        data.to_sql(name='yellow_taxi_data', con=engine, if_exists='append')
        
        t_end = time()
        
        print('inserted another chunk, took %.3f second' % (t_end - t_start))
    
    Exception StopIteration:
        print('all data complete ingest to potgres')

inserted another chunk, took 22.068 second
inserted another chunk, took 18.563 second
inserted another chunk, took 26.361 second
inserted another chunk, took 25.208 second
inserted another chunk, took 29.293 second
inserted another chunk, took 22.221 second
inserted another chunk, took 22.190 second
inserted another chunk, took 16.314 second
inserted another chunk, took 16.731 second
inserted another chunk, took 17.227 second
inserted another chunk, took 19.473 second
inserted another chunk, took 14.909 second
inserted another chunk, took 16.158 second
inserted another chunk, took 14.897 second
inserted another chunk, took 19.041 second
inserted another chunk, took 15.254 second
inserted another chunk, took 16.954 second
inserted another chunk, took 16.794 second
inserted another chunk, took 27.340 second
inserted another chunk, took 32.381 second
inserted another chunk, took 30.767 second
inserted another chunk, took 30.795 second
inserted another chunk, took 28.773 second
inserted an

StopIteration: 

## **Query data inserted from database**

In [24]:
query = '''
select count(*) from yellow_taxi_data ;
'''

pd.read_sql(query, con=engine)

Unnamed: 0,count
0,2463931
