# **Ingestion Walkthrough**

In [1]:
import os
import time
from urllib.parse import quote

import pandas as pd
from sqlalchemy import create_engine, text
from sqlalchemy import Integer, String, DateTime, Numeric

Setting up the PostgreSQL connection.

In [2]:
# Connection settings for the target PostgreSQL database.
# Each value can be overridden through an environment variable of the same name,
# so real credentials never have to be committed with the notebook. The fallbacks
# are the original local development values.
DATABASE_HOST = os.environ.get("DATABASE_HOST", "postgresdb")
DATABASE_PORT = int(os.environ.get("DATABASE_PORT", "5432"))  # keep the port an int, as before
DATABASE_NAME = os.environ.get("DATABASE_NAME", "taxi")
DATABASE_USERNAME = os.environ.get("DATABASE_USERNAME", "supermaker")
DATABASE_PASSWORD = os.environ.get("DATABASE_PASSWORD", "superpasswd")

In [3]:
# Build the SQLAlchemy URL for the psycopg2 driver.
# The credentials are percent-encoded so that characters such as "@", ":" or "/"
# inside a username or password are not mistaken for URL delimiters.
# For purely alphanumeric values the resulting string is unchanged.
_quoted_username = quote(DATABASE_USERNAME, safe="")
_quoted_password = quote(DATABASE_PASSWORD, safe="")
connection_string = f"postgresql+psycopg2://{_quoted_username}:{_quoted_password}@{DATABASE_HOST}:{DATABASE_PORT}/{DATABASE_NAME}"

In [4]:
# Create the SQLAlchemy engine from the connection string; the cells below use it
# for DataFrame.to_sql and for the raw SQL queries.
engine = create_engine(connection_string)

Experimenting with the data.

In [17]:
# Load only the first 100 rows of the compressed CSV as a quick sample,
# then display the top of the frame to inspect the columns.
yellow_taxi_df = pd.read_csv(
    "data/yellow_tripdata_2021-01.csv.gz",
    nrows=100,
)
yellow_taxi_df.head()

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge
0,1,2021-01-01 00:30:10,2021-01-01 00:36:12,1,2.1,1,N,142,43,2,8.0,3.0,0.5,0.0,0.0,0.3,11.8,2.5
1,1,2021-01-01 00:51:20,2021-01-01 00:52:19,1,0.2,1,N,238,151,2,3.0,0.5,0.5,0.0,0.0,0.3,4.3,0.0
2,1,2021-01-01 00:43:30,2021-01-01 01:11:06,1,14.7,1,N,132,165,1,42.0,0.5,0.5,8.65,0.0,0.3,51.95,0.0
3,1,2021-01-01 00:15:48,2021-01-01 00:31:01,0,10.6,1,N,138,132,1,29.0,0.5,0.5,6.05,0.0,0.3,36.35,0.0
4,2,2021-01-01 00:31:49,2021-01-01 00:48:21,1,4.94,1,N,68,33,1,16.5,0.5,0.5,4.06,0.0,0.3,24.36,2.5


In [18]:
# Print column names, non-null counts and dtypes of the sample frame.
yellow_taxi_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100 entries, 0 to 99
Data columns (total 18 columns):
 #   Column                 Non-Null Count  Dtype  
---  ------                 --------------  -----  
 0   VendorID               100 non-null    int64  
 1   tpep_pickup_datetime   100 non-null    object 
 2   tpep_dropoff_datetime  100 non-null    object 
 3   passenger_count        100 non-null    int64  
 4   trip_distance          100 non-null    float64
 5   RatecodeID             100 non-null    int64  
 6   store_and_fwd_flag     100 non-null    object 
 7   PULocationID           100 non-null    int64  
 8   DOLocationID           100 non-null    int64  
 9   payment_type           100 non-null    int64  
 10  fare_amount            100 non-null    float64
 11  extra                  100 non-null    float64
 12  mta_tax                100 non-null    float64
 13  tip_amount             100 non-null    float64
 14  tolls_amount           100 non-null    float64
 15  improve

Creating a schema with SQLAlchemy dtypes.

In [19]:
# Column -> SQLAlchemy type overrides passed to DataFrame.to_sql via `dtype`.
# Only the two timestamp columns are listed; both map to DateTime.
df_schema = dict.fromkeys(
    ('tpep_pickup_datetime', 'tpep_dropoff_datetime'),
    DateTime,
)

In [25]:
# Open the compressed CSV as a chunked reader: iterating over it yields
# DataFrames of up to 100000 rows each instead of loading the file at once.
yellow_taxi_df_iter = pd.read_csv(
    "data/yellow_tripdata_2021-02.csv.gz",
    iterator=True,
    chunksize=100000,
)

In [27]:
# Append every chunk from the reader to the "yellow" table and report how long
# each insert took.
# - The reader is single-pass: once this loop has consumed it, re-create
#   `yellow_taxi_df_iter` before running this cell again, otherwise nothing is inserted.
# - to_sql is called without `index=False`, so the DataFrame index is written as
#   an extra column alongside the CSV columns.
# - time.perf_counter() is a monotonic clock meant for measuring durations;
#   time.time() can jump if the system clock is adjusted.
for index, chunk in enumerate(yellow_taxi_df_iter):
    started = time.perf_counter()
    chunk.to_sql(name="yellow", con=engine, if_exists='append', dtype=df_schema)
    elapsed = time.perf_counter() - started
    print(f"chunk {index + 1} took {elapsed} seconds")

In [28]:
# Count the rows currently stored in the "yellow" table and print the result row.
with engine.connect() as conn:
    for row in conn.execute(text("SELECT count(1) FROM yellow;")):
        print(row)

(4566625,)


In [16]:
# Fetch ten rows from the "yellow" table and print each one as a sanity check.
with engine.connect() as conn:
    preview = conn.execute(text("SELECT * FROM yellow LIMIT 10;"))
    for record in preview:
        print(record)

(0, 2, datetime.datetime(2021, 3, 1, 0, 22, 2), datetime.datetime(2021, 3, 1, 0, 23, 22), 1, 0.0, 1, 'N', 264, 264, 2, 3.0, 0.5, 0.5, 0.0, 0.0, 0.3, 4.3, 0.0)
(1, 2, datetime.datetime(2021, 3, 1, 0, 24, 48), datetime.datetime(2021, 3, 1, 0, 24, 56), 1, 0.0, 1, 'N', 152, 152, 2, 2.5, 0.5, 0.5, 0.0, 0.0, 0.3, 3.8, 0.0)
(2, 2, datetime.datetime(2021, 3, 1, 0, 25, 17), datetime.datetime(2021, 3, 1, 0, 31, 1), 1, 0.0, 1, 'N', 152, 152, 2, 3.5, 0.5, 0.5, 0.0, 0.0, 0.3, 4.8, 0.0)
(3, 1, datetime.datetime(2021, 3, 1, 0, 7, 40), datetime.datetime(2021, 3, 1, 0, 31, 23), 0, 16.5, 4, 'N', 138, 265, 1, 51.0, 0.5, 0.5, 11.65, 6.12, 0.3, 70.07, 0.0)
(4, 2, datetime.datetime(2021, 3, 1, 0, 2, 13), datetime.datetime(2021, 3, 1, 0, 6, 1), 1, 1.13, 1, 'N', 68, 264, 1, 5.5, 0.5, 0.5, 1.86, 0.0, 0.3, 11.16, 2.5)
(5, 2, datetime.datetime(2021, 3, 1, 0, 40, 16), datetime.datetime(2021, 3, 1, 0, 50, 23), 1, 2.68, 1, 'N', 239, 262, 1, 10.5, 0.5, 0.5, 4.29, 0.0, 0.3, 18.59, 2.5)
(6, 1, datetime.datetime(2021, 

In [13]:
# Optional cleanup: run this cell only if you want to drop the ingested table
# and start over (for example after changing the schema).
# NOTE: on "Run All" this cell executes as well and removes the data loaded above.
# IF EXISTS keeps the cell from raising when the table has already been dropped.
with engine.connect() as conn:
    conn.execute(text("DROP TABLE IF EXISTS yellow"))
    conn.commit()  # make the DROP permanent before the connection is closed