# NY_TAXI Data Pipeline
  - Download the datasets from the source URLs
  - Load the datasets into a usable format (ie. Pandas Dataframe)
  - Clean up and transform the data to be ready for export into the database
  - Set up a connection with the database
  - Generate the appropriate schema, if required, for the new tables
  - Iteratively chunk and load the transformed data into the database
  - Close the database connection and perform clean up.

### Download the datasets


In [1]:
import subprocess

def download_csv(url, csv_name):
    try:
        subprocess.run(["wget", url, "-O", csv_name], check=True)
    except subprocess.CalledProcessError as e:
        print(f"Error downloading file: {e}")
        raise

In [2]:
green_taxi_csv = "green-taxi-tripdata.csv.gz"
taxi_zone_csv = "taxi_zone_lookup.csv"

In [4]:
download_csv(url="https://github.com/DataTalksClub/nyc-tlc-data/releases/download/green/green_tripdata_2019-10.csv.gz", csv_name=green_taxi_csv)

--2025-01-20 19:54:44--  https://github.com/DataTalksClub/nyc-tlc-data/releases/download/green/green_tripdata_2019-10.csv.gz
Resolving github.com (github.com)... 20.205.243.166
Connecting to github.com (github.com)|20.205.243.166|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/ea580e9e-555c-4bd0-ae73-43051d8e7c0b?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=releaseassetproduction%2F20250120%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20250120T135445Z&X-Amz-Expires=300&X-Amz-Signature=089215cdb1938473faee1b34560f7082be2a51cb976939edcee5b831948896c4&X-Amz-SignedHeaders=host&response-content-disposition=attachment%3B%20filename%3Dgreen_tripdata_2019-10.csv.gz&response-content-type=application%2Foctet-stream [following]
--2025-01-20 19:54:45--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/ea580e9e-555c-4bd0-ae73-43051d8e7c0b?X-A

In [5]:
download_csv(url="https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv", csv_name=taxi_zone_csv)

--2025-01-20 19:54:54--  https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv
Resolving github.com (github.com)... 20.205.243.166
Connecting to github.com (github.com)|20.205.243.166|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/5a2cc2f5-b4cd-4584-9c62-a6ea97ed0e6a?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=releaseassetproduction%2F20250120%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20250120T135454Z&X-Amz-Expires=300&X-Amz-Signature=41a84f47533bd5f6fc6281e1406740384ab2957d0d3897c5bd4b59891003ffd6&X-Amz-SignedHeaders=host&response-content-disposition=attachment%3B%20filename%3Dtaxi_zone_lookup.csv&response-content-type=application%2Foctet-stream [following]
--2025-01-20 19:54:55--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/5a2cc2f5-b4cd-4584-9c62-a6ea97ed0e6a?X-Amz-Algorithm=AWS4-H

## Load Datasets

In [7]:
import pandas as pd

In [8]:
pd.read_csv(green_taxi_csv, compression="gzip")

  pd.read_csv(green_taxi_csv, compression="gzip")


Unnamed: 0,VendorID,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,RatecodeID,PULocationID,DOLocationID,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type,congestion_surcharge
0,2.0,2019-10-01 00:26:02,2019-10-01 00:39:58,N,1.0,112,196,1.0,5.88,18.00,0.50,0.5,0.00,0.00,,0.3,19.30,2.0,1.0,0.0
1,1.0,2019-10-01 00:18:11,2019-10-01 00:22:38,N,1.0,43,263,1.0,0.80,5.00,3.25,0.5,0.00,0.00,,0.3,9.05,2.0,1.0,0.0
2,1.0,2019-10-01 00:09:31,2019-10-01 00:24:47,N,1.0,255,228,2.0,7.50,21.50,0.50,0.5,0.00,0.00,,0.3,22.80,2.0,1.0,0.0
3,1.0,2019-10-01 00:37:40,2019-10-01 00:41:49,N,1.0,181,181,1.0,0.90,5.50,0.50,0.5,0.00,0.00,,0.3,6.80,2.0,1.0,0.0
4,2.0,2019-10-01 00:08:13,2019-10-01 00:17:56,N,1.0,97,188,1.0,2.52,10.00,0.50,0.5,2.26,0.00,,0.3,13.56,1.0,1.0,0.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
476381,,2019-10-31 23:30:00,2019-11-01 00:00:00,,,65,102,,7.04,29.57,2.75,0.5,0.00,0.00,,0.0,32.82,,,
476382,,2019-10-31 23:03:00,2019-10-31 23:24:00,,,129,136,,0.00,39.83,2.75,0.5,0.00,6.12,,0.0,49.20,,,
476383,,2019-10-31 23:02:00,2019-10-31 23:23:00,,,61,222,,3.90,23.11,2.75,0.5,0.00,0.00,,0.0,26.36,,,
476384,,2019-10-31 23:42:00,2019-10-31 23:56:00,,,76,39,,3.08,15.23,2.75,0.5,0.00,0.00,,0.0,18.48,,,


In [9]:
df_iter = pd.read_csv(green_taxi_csv, compression="gzip", chunksize=50000, iterator=True)

In [10]:
df_iter

<pandas.io.parsers.readers.TextFileReader at 0x11f88d6a0>

In [11]:
df = next(df_iter)

In [12]:
print(pd.io.sql.get_schema(df, name="green_taxi"))

CREATE TABLE "green_taxi" (
"VendorID" INTEGER,
  "lpep_pickup_datetime" TEXT,
  "lpep_dropoff_datetime" TEXT,
  "store_and_fwd_flag" TEXT,
  "RatecodeID" INTEGER,
  "PULocationID" INTEGER,
  "DOLocationID" INTEGER,
  "passenger_count" INTEGER,
  "trip_distance" REAL,
  "fare_amount" REAL,
  "extra" REAL,
  "mta_tax" REAL,
  "tip_amount" REAL,
  "tolls_amount" REAL,
  "ehail_fee" REAL,
  "improvement_surcharge" REAL,
  "total_amount" REAL,
  "payment_type" INTEGER,
  "trip_type" INTEGER,
  "congestion_surcharge" REAL
)


### Connect Database

In [14]:

from sqlalchemy.engine.url import URL
from sqlalchemy import create_engine

def get_db_engine(user, password, host, port, db):
    db_url = URL.create(
        drivername = "postgresql",
        username=user,
        password=password,
        host=host,
        port=port,
        database=db
    )
    return create_engine(db_url)

In [15]:
engine = get_db_engine(user="postgres", password="postgres", host="localhost", port="5433", db="ny_taxi")

In [16]:
engine.connect()

<sqlalchemy.engine.base.Connection at 0x16a538830>

In [17]:
from toolz import curry, pipe

In [18]:
df.head()

Unnamed: 0,VendorID,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,RatecodeID,PULocationID,DOLocationID,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type,congestion_surcharge
0,2,2019-10-01 00:26:02,2019-10-01 00:39:58,N,1,112,196,1,5.88,18.0,0.5,0.5,0.0,0.0,,0.3,19.3,2,1,0.0
1,1,2019-10-01 00:18:11,2019-10-01 00:22:38,N,1,43,263,1,0.8,5.0,3.25,0.5,0.0,0.0,,0.3,9.05,2,1,0.0
2,1,2019-10-01 00:09:31,2019-10-01 00:24:47,N,1,255,228,2,7.5,21.5,0.5,0.5,0.0,0.0,,0.3,22.8,2,1,0.0
3,1,2019-10-01 00:37:40,2019-10-01 00:41:49,N,1,181,181,1,0.9,5.5,0.5,0.5,0.0,0.0,,0.3,6.8,2,1,0.0
4,2,2019-10-01 00:08:13,2019-10-01 00:17:56,N,1,97,188,1,2.52,10.0,0.5,0.5,2.26,0.0,,0.3,13.56,1,1,0.0


In [None]:
@curry
def transform(df):
    df = df.assign(
        lpep_pickup_datetime = pd.to_datetime(df['lpep_pickup_datetime']),
        lpep_dropoff_datetime = pd.to_datetime(df["lpep_dropoff_datetime"])
    )