### Data Ingestion Script

In [1]:
# importing libraries
import pandas as pd
import sqlalchemy
from sqlalchemy import *
import datetime
from time import time

In [2]:
# Connecting to postgresql
engine = create_engine('postgresql://postgres:postgres@localhost:5433/ny_taxi')
engine.connect()

<sqlalchemy.engine.base.Connection at 0x20da6de56d0>

In [3]:
sqlcon=engine.connect()

In [4]:
# Reading the first 1000 row for green_taxi_trips 
green_taxi_df = pd.read_csv("green_taxi_trips.csv.gz",compression='gzip', nrows=100000)

In [5]:
green_taxi_df.head()

Unnamed: 0,VendorID,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,RatecodeID,PULocationID,DOLocationID,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type,congestion_surcharge
0,2,2019-10-01 00:26:02,2019-10-01 00:39:58,N,1,112,196,1,5.88,18.0,0.5,0.5,0.0,0.0,,0.3,19.3,2,1.0,0.0
1,1,2019-10-01 00:18:11,2019-10-01 00:22:38,N,1,43,263,1,0.8,5.0,3.25,0.5,0.0,0.0,,0.3,9.05,2,1.0,0.0
2,1,2019-10-01 00:09:31,2019-10-01 00:24:47,N,1,255,228,2,7.5,21.5,0.5,0.5,0.0,0.0,,0.3,22.8,2,1.0,0.0
3,1,2019-10-01 00:37:40,2019-10-01 00:41:49,N,1,181,181,1,0.9,5.5,0.5,0.5,0.0,0.0,,0.3,6.8,2,1.0,0.0
4,2,2019-10-01 00:08:13,2019-10-01 00:17:56,N,1,97,188,1,2.52,10.0,0.5,0.5,2.26,0.0,,0.3,13.56,1,1.0,0.0


In [6]:
print(pd.io.sql.get_schema(green_taxi_df, "green_taxi_trips"))

CREATE TABLE "green_taxi_trips" (
"VendorID" INTEGER,
  "lpep_pickup_datetime" TEXT,
  "lpep_dropoff_datetime" TEXT,
  "store_and_fwd_flag" TEXT,
  "RatecodeID" INTEGER,
  "PULocationID" INTEGER,
  "DOLocationID" INTEGER,
  "passenger_count" INTEGER,
  "trip_distance" REAL,
  "fare_amount" REAL,
  "extra" REAL,
  "mta_tax" REAL,
  "tip_amount" REAL,
  "tolls_amount" REAL,
  "ehail_fee" REAL,
  "improvement_surcharge" REAL,
  "total_amount" REAL,
  "payment_type" INTEGER,
  "trip_type" REAL,
  "congestion_surcharge" REAL
)


In [7]:
green_taxi_df.lpep_pickup_datetime = pd.to_datetime(green_taxi_df.lpep_pickup_datetime)
green_taxi_df.lpep_dropoff_datetime = pd.to_datetime(green_taxi_df.lpep_dropoff_datetime)

In [8]:
print(pd.io.sql.get_schema(green_taxi_df, "green_taxi_trips"))

CREATE TABLE "green_taxi_trips" (
"VendorID" INTEGER,
  "lpep_pickup_datetime" TIMESTAMP,
  "lpep_dropoff_datetime" TIMESTAMP,
  "store_and_fwd_flag" TEXT,
  "RatecodeID" INTEGER,
  "PULocationID" INTEGER,
  "DOLocationID" INTEGER,
  "passenger_count" INTEGER,
  "trip_distance" REAL,
  "fare_amount" REAL,
  "extra" REAL,
  "mta_tax" REAL,
  "tip_amount" REAL,
  "tolls_amount" REAL,
  "ehail_fee" REAL,
  "improvement_surcharge" REAL,
  "total_amount" REAL,
  "payment_type" INTEGER,
  "trip_type" REAL,
  "congestion_surcharge" REAL
)


In [9]:
# Start to insert data
green_taxi_df.head(n=0).to_sql("green_taxi_trips", con=sqlcon, if_exists='replace', index=False)

0

In [10]:
# Inserting using itterator
green_taxi_df_itter = pd.read_csv("green_taxi_trips.csv.gz",compression='gzip', iterator=True, chunksize=100000)

In [11]:
green_taxi_df = next(green_taxi_df_itter)

In [12]:
len(green_taxi_df)

100000

In [13]:
green_taxi_df.lpep_pickup_datetime = pd.to_datetime(green_taxi_df.lpep_pickup_datetime)
green_taxi_df.lpep_dropoff_datetime = pd.to_datetime(green_taxi_df.lpep_dropoff_datetime)

In [14]:
%time green_taxi_df.to_sql("green_taxi_trips", con=sqlcon, if_exists='replace', index=False)

CPU times: total: 6.08 s
Wall time: 13.9 s


1000

In [15]:
n = 1
while True:
    s_time = time()
    green_taxi_df = next(green_taxi_df_itter)
    green_taxi_df.lpep_pickup_datetime = pd.to_datetime(green_taxi_df.lpep_pickup_datetime)
    green_taxi_df.lpep_dropoff_datetime = pd.to_datetime(green_taxi_df.lpep_dropoff_datetime)
    try:
        green_taxi_df.to_sql("green_taxi_trips", con=sqlcon, if_exists='append', index=False)
        e_time = time()
        t_time = e_time - s_time
        print(f"Inserted chunk{n}, took: {t_time}")
    except StopIteration:
        # End of the iterator, all chunks processed
        print("Inserted all chunks")
        break
    except Exception as e:
        # Catch any exceptions and print the error message
        print(f"Error inserting chunk {n}: {e}")
        break
    n += 1
print("Inserted all chunks")        

Inserted chunk1, took: 12.710885763168335
Inserted chunk2, took: 12.053579807281494


  green_taxi_df = next(green_taxi_df_itter)


Inserted chunk3, took: 12.760829210281372
Inserted chunk4, took: 9.926883697509766


StopIteration: 

In [17]:
# Inserting taxi_zone lookup csv
tax_zone_lookup_df = pd.read_csv("taxi_zone_lookup.csv")

In [18]:
tax_zone_lookup_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 265 entries, 0 to 264
Data columns (total 4 columns):
 #   Column        Non-Null Count  Dtype 
---  ------        --------------  ----- 
 0   LocationID    265 non-null    int64 
 1   Borough       265 non-null    object
 2   Zone          264 non-null    object
 3   service_zone  263 non-null    object
dtypes: int64(1), object(3)
memory usage: 8.4+ KB


In [19]:
print(pd.io.sql.get_schema(tax_zone_lookup_df, "tax_zone_lookup"))

CREATE TABLE "tax_zone_lookup" (
"LocationID" INTEGER,
  "Borough" TEXT,
  "Zone" TEXT,
  "service_zone" TEXT
)


In [20]:
tax_zone_lookup_df.head(n=0).to_sql("taxi_zone_lookup", con=sqlcon, if_exists='replace', index=False)

0

In [21]:
tax_zone_lookup_df.to_sql("taxi_zone_lookup", con=sqlcon, if_exists='append', index=False)

265