## Description

Objectives:
1. Connect to the database
2. Read CSV file
3. Create the DDL statement for creating the table in postgres
4. Create the ingestion script to ingest the data which is csv file to the postgres in chunks

In [1]:
import pandas as pd

In [2]:
!url="https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2019-08.csv.gz" && wget -P ./data/ $url
!ls data

--2023-04-09 03:43:38--  https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2019-08.csv.gz
Resolving github.com (github.com)... ::ffff:140.82.112.3, 140.82.112.3
Connecting to github.com (github.com)|::ffff:140.82.112.3|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/cabf77d2-e711-4bf4-a64e-aea7824bdc4e?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAIWNJYAX4CSVEH53A%2F20230409%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20230409T034337Z&X-Amz-Expires=300&X-Amz-Signature=4c7436b5a2e98b44f9e49268b3a70c9a4c54c8ad2f6a659521a0ebfdf40f76ee&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=513814948&response-content-disposition=attachment%3B%20filename%3Dyellow_tripdata_2019-08.csv.gz&response-content-type=application%2Foctet-stream [following]
--2023-04-09 03:43:38--  https://objects.githubusercontent.com/github-production-release-

### Loading the data to pandas dataframe

After we have downloaded the file

In [3]:
import os
os.listdir("data")
test_file = os.listdir("data")[0]

test_file

'yellow_tripdata_2019-08.csv.gz'

In [4]:
pd.read_csv(f"data/{test_file}", nrows=100).head()

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge
0,1,2019-08-01 00:09:46,2019-08-01 00:10:36,1,0.0,1,N,145,145,2,2.5,0.5,0.5,0.0,0.0,0.3,3.8,0.0
1,1,2019-08-01 00:30:47,2019-08-01 00:31:39,1,0.0,1,N,145,145,2,2.5,0.5,0.5,0.0,0.0,0.3,3.8,0.0
2,1,2019-08-01 00:44:50,2019-08-01 00:46:32,1,7.1,1,N,145,145,2,3.0,0.5,0.5,0.0,0.0,0.3,4.3,0.0
3,1,2019-08-01 00:57:16,2019-08-01 00:58:20,1,0.0,1,N,145,145,2,3.0,0.5,0.5,0.0,0.0,0.3,4.3,0.0
4,1,2019-08-01 00:59:18,2019-08-01 01:04:13,1,0.0,1,N,145,145,2,4.5,0.5,0.5,0.0,0.0,0.3,5.8,0.0


## SQLALCHEMY CONNECTION

In [17]:
from sqlalchemy import create_engine
from sqlalchemy.sql import text as t

In [14]:
connection_string = "postgresql://root:root@postgres:5432/ny_taxi"
engine = create_engine(connection_string)
engine

Engine(postgresql://root:***@postgres:5432/ny_taxi)

In [20]:
# create a connection and test
connection = engine.connect()
connection.execute(t("Select 1")).fetchall()
connection.close()

## Create DDL statement for the table

In [38]:
# get the columns
cols = pd.read_csv(f"data/{test_file}", nrows=0).columns
date_cols = cols[cols.str.contains("date")].tolist()

# get the first 100 rows to infer the datatypes
data_sample = pd.read_csv(f"data/{test_file}", nrows=100, parse_dates=date_cols).head()
data_sample.dtypes

VendorID                          int64
tpep_pickup_datetime     datetime64[ns]
tpep_dropoff_datetime    datetime64[ns]
passenger_count                   int64
trip_distance                   float64
RatecodeID                        int64
store_and_fwd_flag               object
PULocationID                      int64
DOLocationID                      int64
payment_type                      int64
fare_amount                     float64
extra                           float64
mta_tax                         float64
tip_amount                      float64
tolls_amount                    float64
improvement_surcharge           float64
total_amount                    float64
congestion_surcharge            float64
dtype: object

In [70]:
with open("create_tables.sql", "w") as file:
    _ = pd.io.sql.get_schema(data_sample, "ny_taxi_data", con=engine).replace("CREATE TABLE", "CREATE TABLE IF NOT EXISTS")
    file.write(_)

In [71]:
print(_)


CREATE TABLE IF NOT EXISTS ny_taxi_data (
	"VendorID" BIGINT, 
	tpep_pickup_datetime TIMESTAMP WITHOUT TIME ZONE, 
	tpep_dropoff_datetime TIMESTAMP WITHOUT TIME ZONE, 
	passenger_count BIGINT, 
	trip_distance FLOAT(53), 
	"RatecodeID" BIGINT, 
	store_and_fwd_flag TEXT, 
	"PULocationID" BIGINT, 
	"DOLocationID" BIGINT, 
	payment_type BIGINT, 
	fare_amount FLOAT(53), 
	extra FLOAT(53), 
	mta_tax FLOAT(53), 
	tip_amount FLOAT(53), 
	tolls_amount FLOAT(53), 
	improvement_surcharge FLOAT(53), 
	total_amount FLOAT(53), 
	congestion_surcharge FLOAT(53)
)




In [46]:
!ls

Dockerfile  create_tables.sql  data  test_notebook.ipynb  test_pipeline.py


### Logic to load the data to the table

In [91]:
with engine.connect() as conn:
    conn.execute(t("DROP TABLE IF EXISTS ny_taxi_data"))
    conn.commit()

with engine.connect() as c:
    c.execute(t(_))
    c.commit()

In [None]:
from tqdm import tqdm

filename = f"data/{test_file}"
CHUNK_SIZE = 10000
MAX_ROWS = int(subprocess.check_output(f"wc -l data/{test_file}", shell=True).split()[0]) - 1

with tqdm(total=MAX_ROWS, desc="Chunks read: ") as bar:
    data_iterator = pd.read_csv(f"data/{test_file}", chunksize=CHUNK_SIZE, iterator=True)
    while True:
        try:
            with engine.connect() as conn:
                data = next(data_iterator)
                data.to_sql("ny_taxi_data", if_exists="append", index=False, con=conn)
                conn.commit()
            bar.update(CHUNK_SIZE)
        except IndexError:
            pass
    

Chunks read: : 3300000it [08:49, 6256.37it/s]                                                                                                                                                      