# **NYC Taxi Data Engineering Pipeline (Yellow Taxi Trip Records)**

This project outlines the steps needed to build a comprehensive data engineering pipeline using NYC taxi data from the years 2022 and 2023. This pipeline will involve extracting, transforming, and loading (ETL) data into a Snowflake database followed by creating a dashboard for visualization. The goal is to consolidate, clean, transform, and store large volumes of taxi trip data in a Snowflake database and to create a dashboard for visualizing insights from the data. The dataset comprises monthly PARQUET files containing detailed records of yellow taxi trips in New York City. The pipeline will automate the process of merging these files, cleaning and transforming the data, and loading it into a database for easy querying and analysis.

## Problem Statement

The primary challenge is to efficiently handle and process large volumes of taxi trip data stored in monthly PARQUET files. The project must address the following issues:

- **Data Consolidation:** Merging monthly files into a single comprehensive dataset for each year.
- **Data Cleaning:** Ensuring the data is free from errors, missing values and inconsistencies.
- **Data Transformation:** Structuring the data to be suitable for analysis and visualisation.
- **Database Storage:** Efficiently storing the cleaned and transformed data in a Snowflake database.
- **Testing with Another Year (2023):** Once the pipeline is established for 2022, it can be tested with the 2023 data for validation.
- **Visualization:** Creating a dashboard to provide insights and facilitate decision-making based on the taxi trip data.

## Key Steps in the Project

### Step 1: Tools and Environment Setup

1. **Install Necessary Python Libraries:**
   - Install `pandas`, `pyarrow`, `snowflake-connector-python` and `snowflake.sqlalchemy`
   - Set up Snowflake for data storage and database management.
   - Configure the working environment using VS Code.

3. **Configure Working Environment:**
   - Use VS Code for writing and running scripts.
   - Set up a virtual environment for dependency management.

### Step 2: Extract and Transform data from Parquet files into a Pandas DataFrame

1. **Define Parquet File Paths:**
   - Defined the paths to the Parquet files for each month of 2022

2. **Read and Merge Parquet Files:**
   - Parquet file using pyarrow and appending the resulting DataFrames to a list (data_frames)
   - Concatenate these DataFrames into a single DataFrame (all_data)

3. **Transform Data:**
   - Dropping rows with missing data (dropna())
   - Renaming columns (rename())
   - Converting date/time columns to datetime objects (pd.to_datetime())
   - Calculating trip duration in minutes (dt.total_seconds() / 60)

### Step 3: Database Storage

1. **Database Configuration:**
   - Connecting Snowflake with python connector 
   - Create tables in Snowflake using query executor

2. **Load Data into Snowflake Database:**
   - Use Snowflake SQLAlchemy's ` and pandas dataframe while making sure of the data intergrity 

### Step 4: Testing with Another Year (2023)

1. **Repeat Data Processing:**
   - Apply the same data extraction, cleaning and transformation steps for the 2023 dataset.
   - Load the 2023 data into Snowflake to validate the pipeline.

### Step 5: Visualisation

1. **Create Dashboards:**
   - Use tools like Power BI to connect to the Snowflake database.
   - Develop visualisations to display key metrics such as total rides per day, revenue and popular pickup and drop-off locations.

By following these steps, you will build a robust and efficient data engineering pipeline for NYC Taxi data, enabling comprehensive analysis and visualisation of taxi trip records.


In [1]:
# Importing libraries
import pandas as pd
import os
import snowflake.connector
import pyarrow.parquet as pq
from snowflake.connector.pandas_tools import write_pandas

print("All libraries installed and imported successfully!")

All libraries installed and imported successfully!


In [2]:
# Extract data from Parquet files like shown in the code below 
parquet_files = [
    r"2022\yellow_tripdata_2022-01.parquet",
    r"2022\yellow_tripdata_2022-02.parquet",
    r"2022\yellow_tripdata_2022-03.parquet",
    r"2022\yellow_tripdata_2022-04.parquet",
    r"2022\yellow_tripdata_2022-05.parquet",
    r"2022\yellow_tripdata_2022-06.parquet",
    r"2022\yellow_tripdata_2022-07.parquet",
    r"2022\yellow_tripdata_2022-08.parquet",
    r"2022\yellow_tripdata_2022-09.parquet",
    r"2022\yellow_tripdata_2022-10.parquet",
    r"2022\yellow_tripdata_2022-11.parquet",
    r"2022\yellow_tripdata_2022-12.parquet"
]

data_frames = []

for file in parquet_files:
    df = pq.read_table(file).to_pandas()
    data_frames.append(df)

all_data = pd.concat(data_frames, ignore_index=True)

# Transform the data
all_data = all_data.dropna()  # Remove rows with missing data
all_data = all_data.rename(columns={'tpep_pickup_datetime': 'pickup_datetime', 'tpep_dropoff_datetime': 'dropoff_datetime'})
all_data['pickup_datetime'] = pd.to_datetime(all_data['pickup_datetime'])
all_data['dropoff_datetime'] = pd.to_datetime(all_data['dropoff_datetime'])
all_data['trip_duration'] = all_data['dropoff_datetime'] - all_data['pickup_datetime']
all_data['trip_duration_minutes'] = all_data['trip_duration'].dt.total_seconds() / 60

In [3]:
# Snowflake connection parameters
snowflake_account = 'pvb86519.us-east-1'
snowflake_user = 'Elfeenah413'
snowflake_password = '@Nafeesah413'
snowflake_warehouse = 'NYC_DATA_PIPELINE'
snowflake_database = 'NYC_TAXI_DB'  # Specify the desired database name
snowflake_schema = 'PUBLIC'  # Specify the desired Schema name

# Create a Snowflake connection
conn = snowflake.connector.connect(
    user=snowflake_user,
    password=snowflake_password,
    account=snowflake_account,
    warehouse=snowflake_warehouse,
    database=snowflake_database,
    schema=snowflake_schema
)

# Create a cursor
cur = conn.cursor()

In [4]:
# Table creation query (adjust columns and types as per your DataFrame)

# Create the database if it doesn't exist
cur.execute(f"CREATE DATABASE IF NOT EXISTS {snowflake_database}")

# Switch to the specified database
cur.execute(f"USE DATABASE {snowflake_database}")

# Create schema if it doesn't exist
cur.execute(f"CREATE SCHEMA IF NOT EXISTS {snowflake_schema}")

# Use the specified schema
cur.execute(f"USE SCHEMA {snowflake_schema}")

# Create the table if it doesn't exist
cur.execute("""
    CREATE TABLE IF NOT EXISTS taxi_trips (
        pickup_datetime TIMESTAMP,
        dropoff_datetime TIMESTAMP,
        passenger_count INTEGER,
        trip_distance FLOAT,
        store_and_fwd_flag STRING,
        payment_type INTEGER,
        fare_amount FLOAT,
        extra FLOAT,
        mta_tax FLOAT,
        tip_amount FLOAT,
        tolls_amount FLOAT,
        congestion_surcharge FLOAT,
        improvement_surcharge FLOAT,
        airport_fee FLOAT,
        total_amount FLOAT,
        VendorID FLOAT,
        RatecodeID FLOAT,
        PULocationID FLOAT,
        DOLocationID FLOAT,
        trip_duration FLOAT,
        trip_duration_minutes FLOAT
    )
""")


<snowflake.connector.cursor.SnowflakeCursor at 0x1f1d9825520>

In [5]:
# Clear existing data if needed
# cur.execute("TRUNCATE TABLE taxi_trips")

# Begin a transaction for data loading
cur.execute("BEGIN")

# Reset the index of the DataFrame
all_data.reset_index(drop=True, inplace=True)

# Load data from DataFrame to Snowflake table using write_pandas
success, nchunks, nrows, _ = write_pandas(conn, all_data, 'TAXI_TRIPS', quote_identifiers=False)

if success:
    # Commit the transaction if the loading was successful
    cur.execute("COMMIT")
    print(f"Successfully loaded {nrows} rows into Snowflake.")
else:
    # Rollback the transaction if the loading failed
    cur.execute("ROLLBACK")
    print("Loading into Snowflake failed.")

# Close the cursor and connection
cur.close()
conn.close()


Successfully loaded 38287795 rows into Snowflake.


### **To process the data for the year 2023 and load it into Snowflake** ###

In [14]:
import pandas as pd
import pyarrow.parquet as pq

# List of local paths to 2023 Parquet files
parquet_files_2023 = [
    r"2023\yellow_tripdata_2023-01.parquet",
    r"2023\yellow_tripdata_2023-02.parquet",
    r"2023\yellow_tripdata_2023-03.parquet",
    r"2023\yellow_tripdata_2023-04.parquet",
    r"2023\yellow_tripdata_2023-05.parquet",
    r"2023\yellow_tripdata_2023-06.parquet",
    r"2023\yellow_tripdata_2023-07.parquet",
    r"2023\yellow_tripdata_2023-08.parquet",
    r"2023\yellow_tripdata_2023-09.parquet",
    r"2023\yellow_tripdata_2023-10.parquet",
    r"2023\yellow_tripdata_2023-11.parquet",
    r"2023\yellow_tripdata_2023-12.parquet"
]

data_frames = []

for file in parquet_files:
    df = pq.read_table(file).to_pandas()
    data_frames.append(df)

all_data = pd.concat(data_frames, ignore_index=True)

# Transform the data
all_data = all_data.dropna()  # Remove rows with missing data
all_data = all_data.rename(columns={'tpep_pickup_datetime': 'pickup_datetime', 'tpep_dropoff_datetime': 'dropoff_datetime'})
all_data['pickup_datetime'] = pd.to_datetime(all_data['pickup_datetime'])
all_data['dropoff_datetime'] = pd.to_datetime(all_data['dropoff_datetime'])
all_data['trip_duration'] = all_data['dropoff_datetime'] - all_data['pickup_datetime']
all_data['trip_duration_minutes'] = all_data['trip_duration'].dt.total_seconds() / 60

In [15]:
import snowflake.connector
from snowflake.connector.pandas_tools import write_pandas
import pandas as pd
import pyarrow.parquet as pq

# Snowflake connection parameters
snowflake_account = 'pvb86519.us-east-1'
snowflake_user = 'Elfeenah413'
snowflake_password = '@Nafeesah413'
snowflake_warehouse = 'NYC_DATA_PIPELINE'
snowflake_database = 'NYC_TAXI_DB'
snowflake_schema = 'PUBLIC'

# Create a Snowflake connection
conn = snowflake.connector.connect(
    user=snowflake_user,
    password=snowflake_password,
    account=snowflake_account,
    warehouse=snowflake_warehouse,
    database=snowflake_database,
    schema=snowflake_schema
)

# Create a cursor
cur = conn.cursor()

In [16]:
# Table creation query (adjust columns and types as per your DataFrame)
# Create the database if it doesn't exist
cur.execute(f"CREATE DATABASE IF NOT EXISTS {snowflake_database}")

# Switch to the specified database
cur.execute(f"USE DATABASE {snowflake_database}")

# Create schema if it doesn't exist
cur.execute(f"CREATE SCHEMA IF NOT EXISTS {snowflake_schema}")

# Use the specified schema
cur.execute(f"USE SCHEMA {snowflake_schema}")

# Create the table if it doesn't exist
cur.execute("""
    CREATE TABLE IF NOT EXISTS taxi_trips (
        pickup_datetime TIMESTAMP,
        dropoff_datetime TIMESTAMP,
        passenger_count INTEGER,
        trip_distance FLOAT,
        store_and_fwd_flag STRING,
        payment_type INTEGER,
        fare_amount FLOAT,
        extra FLOAT,
        mta_tax FLOAT,
        tip_amount FLOAT,
        tolls_amount FLOAT,
        congestion_surcharge FLOAT,
        improvement_surcharge FLOAT,
        airport_fee FLOAT,
        total_amount FLOAT,
        VendorID FLOAT,
        RatecodeID FLOAT,
        PULocationID FLOAT,
        DOLocationID FLOAT,
        trip_duration FLOAT,
        trip_duration_minutes FLOAT
    )
""")

<snowflake.connector.cursor.SnowflakeCursor at 0x1f20806e870>

In [17]:
# Load data into Snowflake table using write_pandas
try:
    success, nchunks, nrows, _ = write_pandas(conn, all_data_2023, 'TAXI_TRIPS_2023', quote_identifiers=False)

    if success:
        # Commit the transaction if the loading was successful
        cur.execute("COMMIT")
        print(f"Data loaded successfully {nrows} rows into TAXI_TRIPS_2023.")
    else:
        # Rollback the transaction if the loading failed
        cur.execute("ROLLBACK")
        print("Loading data into TAXI_TRIPS_2023 failed.")

except snowflake.connector.errors.ProgrammingError as e:
    print(f"Snowflake ProgrammingError: {e}")

finally:
    # Close the cursor and connection in the finally block
    cur.close()
    conn.close()

Snowflake ProgrammingError: 002025 (42S21): SQL compilation error:
duplicate column name 'AIRPORT_FEE'


In [None]:
# Clear existing data if needed
# cur.execute("TRUNCATE TABLE taxi_trips")

# Begin a transaction for data loading
cur.execute("BEGIN")

# Reset the index of the DataFrame
all_data.reset_index(drop=True, inplace=True)

# Load data from DataFrame to Snowflake table using write_pandas
success, nchunks, nrows, _ = write_pandas(conn, all_data, 'TAXI_TRIPS', quote_identifiers=False)

if success:
    # Commit the transaction if the loading was successful
    cur.execute("COMMIT")
    print(f"Successfully loaded {nrows} rows into Snowflake.")
else:
    # Rollback the transaction if the loading failed
    cur.execute("ROLLBACK")
    print("Loading into Snowflake failed.")

# Close the cursor and connection
cur.close()
conn.close()

In [11]:
# Execute your SQL query
query = "SELECT * FROM TAXI_TRIPS LIMIT 500"  # Example: Fetch 100 rows
cur.execute(query)

# Fetch data into a Pandas DataFrame
df = cur.fetch_pandas_all()

# Close the connection
cur.close()
conn.close()

print(df.head()) 

               PICKUP_DATETIME             DROPOFF_DATETIME  PASSENGER_COUNT  \
0 -22205-03-19 10:44:09.906176 -22171-02-02 03:10:49.906176                2   
1 -22209-07-04 05:44:09.906176 -22193-06-24 13:44:09.906176                1   
2 -22172-11-01 12:57:29.906176 -22155-11-19 09:24:09.906176                1   
3 -22225-08-07 01:17:29.906176 -22206-09-03 15:30:49.906176                1   
4 -22203-05-14 11:37:29.906176 -22132-09-24 06:10:49.906176                1   

   TRIP_DISTANCE STORE_AND_FWD_FLAG  PAYMENT_TYPE  FARE_AMOUNT  EXTRA  \
0           3.80                  N             1         14.5    3.0   
1           2.10                  N             1          8.0    0.5   
2           0.97                  N             1          7.5    0.5   
3           1.09                  N             2          8.0    0.5   
4           4.30                  N             1         23.5    0.5   

   MTA_TAX  TIP_AMOUNT  ...  CONGESTION_SURCHARGE  IMPROVEMENT_SURCHARGE  \
0   