# NYC Taxi Data Engineer Project <a name="overview"></a>

## Overview 
This project builds a dimensional model for the `NYC Taxi` dataset. The raw trip records are normalized into a fact table and several dimension tables, which are then ingested into SQL tables defined with `SQLAlchemy`.


- **Dataset**: [Yellow Tripdata 2024-Feb](https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page)
- **Data Dictionary**: [Yellow Tripdata](https://www.nyc.gov/assets/tlc/downloads/pdf/data_dictionary_trip_records_yellow.pdf)
- **Data Wrangling**: `Pandas` for transforming and shaping data.
- **Database Management**: `SQLAlchemy` for ORM-based interactions with SQLite (default) and, optionally, SQL Server.

## Contributors
- [![LinkedIn](https://img.shields.io/badge/-Ziyad_Alshawi-blue?style=flat&logo=LinkedIn)](https://www.linkedin.com/in/zalshawi)
- [![LinkedIn](https://img.shields.io/badge/-Abdullah_Alqahtani-blue?style=flat&logo=LinkedIn)](https://www.linkedin.com/in/Abdullah-alqahtani-987b69197) 

# Table of Contents

- [Overview](#overview)
- [Data Preparation](#data_preparation)
- [Data Normalization](#data_normalization)
- [Define SQLAlchemy Tables](#sqlalchemy)
- [Insert Data into SQL Database](#Insert_data)


## Data Preparation<a name="data_preparation"></a>

#### Load libraries


```python
import pandas as pd
# urllib.parse must be imported explicitly; it is used by the optional SQL Server connection
import urllib.parse
from sqlalchemy import create_engine, Column, Integer, String, Float, DateTime, ForeignKey
from sqlalchemy.orm import declarative_base, sessionmaker
```

#### Load Data


```python
# Relative path for dataset
file_path = 'data/yellow_tripdata_2024-02.parquet'

# Load file into a DataFrame
df = pd.read_parquet(file_path)

# Display the first few rows
df.head(3)
```

| | VendorID | tpep_pickup_datetime | tpep_dropoff_datetime | passenger_count | trip_distance | RatecodeID | store_and_fwd_flag | PULocationID | DOLocationID | payment_type | fare_amount | extra | mta_tax | tip_amount | tolls_amount | improvement_surcharge | total_amount | congestion_surcharge | Airport_fee |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2 | 2024-02-01 00:04:45 | 2024-02-01 00:19:58 | 1.0 | 4.39 | 1.0 | N | 68 | 236 | 1 | 20.5 | 1.0 | 0.5 | 1.28 | 0.0 | 1.0 | 26.78 | 2.5 | 0.0 |
| 1 | 2 | 2024-02-01 00:56:31 | 2024-02-01 01:10:53 | 1.0 | 7.71 | 1.0 | N | 48 | 243 | 1 | 31.0 | 1.0 | 0.5 | 9.0 | 0.0 | 1.0 | 45.0 | 2.5 | 0.0 |
| 2 | 2 | 2024-02-01 00:07:50 | 2024-02-01 00:43:12 | 2.0 | 28.69 | 2.0 | N | 132 | 261 | 2 | 70.0 | 0.0 | 0.5 | 0.0 | 6.94 | 1.0 | 82.69 | 2.5 | 1.75 |


#### Data Cleaning and Preprocessing


```python
# Remove duplicate rows, if any (drop_duplicates returns a new DataFrame, so assign it back)
df = df.drop_duplicates()

# Ensure the pickup and dropoff columns are datetime64
df['tpep_pickup_datetime'] = pd.to_datetime(df['tpep_pickup_datetime'])
df['tpep_dropoff_datetime'] = pd.to_datetime(df['tpep_dropoff_datetime'])

# Trip duration in seconds: dropoff minus pickup, converted with the vectorized .dt accessor
df['trip_duration'] = (df['tpep_dropoff_datetime'] - df['tpep_pickup_datetime']).dt.total_seconds()
```
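Because `drop_duplicates()` returns a new DataFrame (unless `inplace=True` is passed), calling it without assigning the result leaves the original untouched. The small sketch below uses the first two trips from the sample output plus one hypothetical duplicate row to show that behavior, along with the vectorized `.dt.total_seconds()` conversion for a timedelta column.

```python
import pandas as pd

# Two trips from the sample output plus a hypothetical exact duplicate of the first
trips = pd.DataFrame({
    'tpep_pickup_datetime': pd.to_datetime(
        ['2024-02-01 00:04:45', '2024-02-01 00:04:45', '2024-02-01 00:56:31']),
    'tpep_dropoff_datetime': pd.to_datetime(
        ['2024-02-01 00:19:58', '2024-02-01 00:19:58', '2024-02-01 01:10:53']),
})

# Without assignment the de-duplicated copy is discarded and trips keeps all 3 rows
trips.drop_duplicates()
print(len(trips))  # 3

# Assigning the result back actually removes the duplicate
trips = trips.drop_duplicates()
print(len(trips))  # 2

# Vectorized conversion of the timedelta column to seconds
trips['trip_duration'] = (
    trips['tpep_dropoff_datetime'] - trips['tpep_pickup_datetime']
).dt.total_seconds()
print(trips['trip_duration'].tolist())  # [913.0, 862.0]
```

The `.dt` accessor works on the whole column at once, so it avoids calling a Python lambda per row.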


## Data Normalization <a name="data_normalization"></a>

Split the DataFrame into one fact table and three dimension tables (datetime, payment type, rate code).

![data_model](data_model.png)

#### Define Dimension DataFrames

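```python
dim_datetime_columns = ['tpep_pickup_datetime', 'tpep_dropoff_datetime', 'trip_duration']

# One row per distinct (pickup, dropoff, duration) combination
dim_datetime = df[dim_datetime_columns].drop_duplicates()

# Add a surrogate primary key; the fact table references it as a foreign key
dim_datetime['datetime_id'] = range(1, len(dim_datetime) + 1)
```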
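The surrogate-key step can be checked on a toy example. In this sketch the rows are hypothetical, with an extra `fare_amount` column so that two distinct trips share the same timestamps: `drop_duplicates()` keeps one dimension row per distinct combination, `range` numbers them from 1, and a left merge maps each trip back to its `datetime_id`.

```python
import pandas as pd

# Hypothetical trips: rows 0 and 2 are different trips that share the same timestamps
trips = pd.DataFrame({
    'tpep_pickup_datetime': pd.to_datetime(
        ['2024-02-01 00:04:45', '2024-02-01 00:56:31', '2024-02-01 00:04:45']),
    'tpep_dropoff_datetime': pd.to_datetime(
        ['2024-02-01 00:19:58', '2024-02-01 01:10:53', '2024-02-01 00:19:58']),
    'fare_amount': [20.5, 31.0, 12.0],
})
trips['trip_duration'] = (
    trips['tpep_dropoff_datetime'] - trips['tpep_pickup_datetime']
).dt.total_seconds()

key_cols = ['tpep_pickup_datetime', 'tpep_dropoff_datetime', 'trip_duration']

# One dimension row per distinct combination, numbered 1..N as the surrogate key
dim = trips[key_cols].drop_duplicates().reset_index(drop=True)
dim['datetime_id'] = range(1, len(dim) + 1)

# Map every trip back to its key; identical combinations share one datetime_id
keyed = trips.merge(dim, how='left', on=key_cols)
print(keyed['datetime_id'].tolist())  # [1, 2, 1]
```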


```python
# The data dictionary provides descriptive labels for the coded columns

# Create Dim_Payment
payment = {
    'payment_type': [1, 2, 3, 4, 5, 6],
    'payment_type_name': ['Credit card', 'Cash', 'No charge', 'Dispute', 'Unknown', 'Voided trip']
}


# Create Dim_Rate

rate_code = {
    'RatecodeID': [1, 2, 3, 4, 5, 6],
    'rate_code_name': ["Standard rate", "JFK", "Newark", "Nassau or Westchester", "Negotiated fare", "Group ride"],
    'rate_code_description': [
        "The default fare charged for trips within the standard service area.",
        "Applies to trips to or from John F. Kennedy International Airport.",
        "Applies to trips to or from Newark Liberty International Airport.",
        "Applies to trips to areas outside the standard service area.",
        "Special rates agreed upon between the passenger and the service provider.",
        "Applies to trips where multiple passengers share the ride, typically resulting in a discounted fare per person."
    ]
}

dim_payment = pd.DataFrame(payment)
dim_rate = pd.DataFrame(rate_code)
```
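Since the fact table refers to these lookup tables through `payment_type` and `RatecodeID`, it can be worth checking, before loading, that every code in the trip data has a row in the corresponding dimension; a database that enforces foreign keys would reject orphan codes. A minimal sketch with hypothetical observed codes:

```python
import pandas as pd

# Lookup table built the same way as dim_payment above
dim_payment = pd.DataFrame({
    'payment_type': [1, 2, 3, 4, 5, 6],
    'payment_type_name': ['Credit card', 'Cash', 'No charge', 'Dispute', 'Unknown', 'Voided trip'],
})

# Hypothetical payment codes from a trip file; 0 is deliberately absent from the lookup table
observed = pd.Series([1, 2, 2, 0, 1], name='payment_type')

# Codes with no matching dimension row would break the fact table's foreign key
orphans = sorted(observed[~observed.isin(dim_payment['payment_type'])].unique().tolist())
print(orphans)  # [0]
```

The same check applies to `RatecodeID` against `dim_rate`. Missing values should be dropped first (for example with `dropna()`), because a NULL foreign key is not a constraint violation while `isin` would report NaN as unmatched.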

#### Define Fact DataFrame

```python
# Create Fact Table for trips
fact_table = df.merge(dim_datetime, how='left', on=['tpep_pickup_datetime','tpep_dropoff_datetime', 'trip_duration']) \
               .merge(dim_payment, how='left', on='payment_type') \
               .merge(dim_rate, how='left', on='RatecodeID') \
               [['VendorID', 'datetime_id', 'payment_type',  'RatecodeID',
                 'PULocationID', 'DOLocationID','passenger_count', 'trip_distance', 'store_and_fwd_flag', 
                  'fare_amount', 'extra','mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge',
                  'total_amount', 'congestion_surcharge', 'Airport_fee']]
```
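`DataFrame.merge` accepts a `validate` argument; with `validate='many_to_one'`, pandas raises `MergeError` if the right-hand keys are not unique. That is a cheap guard for the fact-table join, because a duplicated dimension key would silently multiply fact rows. The sketch below uses a hypothetical single key column in place of the three datetime columns.

```python
import pandas as pd

# Hypothetical simplified inputs: one key column instead of the three datetime columns
dim_datetime = pd.DataFrame({'pickup': ['00:04:45', '00:56:31'], 'datetime_id': [1, 2]})
trips = pd.DataFrame({'pickup': ['00:04:45', '00:56:31', '00:04:45'],
                      'fare_amount': [20.5, 31.0, 12.0]})

# validate='many_to_one' makes pandas check that the right-hand keys are unique
fact = trips.merge(dim_datetime, how='left', on='pickup', validate='many_to_one')
assert len(fact) == len(trips)  # no fan-out: still one row per trip
print(fact['datetime_id'].tolist())  # [1, 2, 1]

# A dimension with a repeated key would duplicate fact rows; validation raises instead
duplicated_dim = pd.concat([dim_datetime, dim_datetime], ignore_index=True)
try:
    trips.merge(duplicated_dim, how='left', on='pickup', validate='many_to_one')
    validation_failed = False
except pd.errors.MergeError:
    validation_failed = True
print(validation_failed)  # True
```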


## Define SQLAlchemy Tables <a name="sqlalchemy"></a>

```python
Base = declarative_base()

# Attribute names match the DataFrame column names so that
# bulk_insert_mappings can map each row dict onto the model directly
class FactTable(Base):
    __tablename__ = 'fact_table'
    # Surrogate key generated by the database
    Trip_id = Column(Integer, primary_key=True)
    VendorID = Column(Integer)
    datetime_id = Column(Integer, ForeignKey('dim_datetime.datetime_id'))
    payment_type = Column(Integer, ForeignKey('dim_payment.payment_type'))
    RatecodeID = Column(Integer, ForeignKey('dim_rate.RatecodeID'))
    PULocationID = Column(Integer)
    DOLocationID = Column(Integer)
    trip_distance = Column(Float)
    passenger_count = Column(Integer)
    store_and_fwd_flag = Column(String)
    fare_amount = Column(Float)
    extra = Column(Float)
    mta_tax = Column(Float)
    tip_amount = Column(Float)
    tolls_amount = Column(Float)
    improvement_surcharge = Column(Float)
    congestion_surcharge = Column(Float)
    Airport_fee = Column(Float)
    total_amount = Column(Float)

class DimDatetime(Base):
    __tablename__ = 'dim_datetime'
    # Same name as the datetime_id column created in dim_datetime
    datetime_id = Column(Integer, primary_key=True)
    tpep_pickup_datetime = Column(DateTime)
    tpep_dropoff_datetime = Column(DateTime)
    trip_duration = Column(Float)

class DimPayment(Base):
    __tablename__ = 'dim_payment'
    payment_type = Column(Integer, primary_key=True)
    payment_type_name = Column(String)

class DimRate(Base):
    __tablename__ = 'dim_rate'
    RatecodeID = Column(Integer, primary_key=True)
    rate_code_name = Column(String)
    rate_code_description = Column(String)
```
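With this schema, analytical queries typically join the fact table to a dimension on its key and aggregate. The sketch below illustrates that usage rather than anything in the project: it uses the standard-library `sqlite3` module, trimmed-down copies of `fact_table` and `dim_payment`, and the three sample trips shown earlier to total revenue per payment type.

```python
import sqlite3

conn = sqlite3.connect(':memory:')

# Trimmed-down versions of two of the tables above (only the columns this query needs)
conn.executescript("""
    CREATE TABLE dim_payment (payment_type INTEGER PRIMARY KEY, payment_type_name TEXT);
    CREATE TABLE fact_table (
        Trip_id INTEGER PRIMARY KEY,
        payment_type INTEGER REFERENCES dim_payment(payment_type),
        total_amount REAL
    );
    INSERT INTO dim_payment VALUES (1, 'Credit card'), (2, 'Cash');
    INSERT INTO fact_table (payment_type, total_amount) VALUES (1, 26.78), (1, 45.0), (2, 82.69);
""")

# Star-schema query: aggregate the fact table and label each group via the dimension
rows = conn.execute("""
    SELECT p.payment_type_name, COUNT(*) AS trips, ROUND(SUM(f.total_amount), 2) AS revenue
    FROM fact_table AS f
    JOIN dim_payment AS p ON p.payment_type = f.payment_type
    GROUP BY p.payment_type_name
    ORDER BY revenue DESC
""").fetchall()
print(rows)  # [('Cash', 1, 82.69), ('Credit card', 2, 71.78)]
conn.close()
```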


#### Prepare connection

Connect to SQL Server (Optional)

```python
# Define server, database, username and password before uncommenting this cell
# params = urllib.parse.quote_plus(f'DRIVER=ODBC Driver 17 for SQL Server;SERVER={server};DATABASE={database};UID={username};PWD={password}')

# # Create engine
# engine = create_engine(f'mssql+pyodbc:///?odbc_connect={params}', echo=True, future=True)

# # Create tables if they do not exist
# Base.metadata.create_all(engine)
```
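The ODBC string is passed through `urllib.parse.quote_plus` because characters such as `;`, `=` and spaces would otherwise be misread inside a URL. The sketch below shows the escaping with placeholder credentials (hypothetical values); wrapping the driver name in braces is common ODBC syntax and is an assumption here, not something the cell above requires.

```python
from urllib.parse import quote_plus, unquote_plus

# Placeholder connection details (hypothetical values, not from this project)
server, database, username, password = 'localhost', 'NYC_taxi', 'etl_user', 'secret'

# Raw ODBC connection string
odbc_str = (
    'DRIVER={ODBC Driver 17 for SQL Server};'
    f'SERVER={server};DATABASE={database};UID={username};PWD={password}'
)

# quote_plus escapes ';', '=', '{', '}' and spaces so the whole string is one URL query value
url = 'mssql+pyodbc:///?odbc_connect=' + quote_plus(odbc_str)
print(url)

# Decoding the query value gives back the original ODBC string
assert unquote_plus(url.split('odbc_connect=', 1)[1]) == odbc_str
# `url` could then be passed to sqlalchemy.create_engine (requires pyodbc and the ODBC driver)
```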

Connect to SQLite

```python
# Create an engine for a local SQLite database file
engine = create_engine('sqlite:///NYC_taxi.db', echo=True, future=True)

# Create tables if they do not exist
Base.metadata.create_all(engine)
```

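In standard builds SQLite does not enforce `FOREIGN KEY` constraints unless `PRAGMA foreign_keys = ON` is issued on each connection, so a load that inserts fact rows before their dimension rows can succeed here and still fail on a database that enforces the constraints, such as SQL Server. The standard-library `sqlite3` sketch below demonstrates the difference with a hypothetical orphan row; the SQLAlchemy SQLite dialect documentation describes a recipe that issues the same pragma from a `connect` event listener.

```python
import sqlite3

def try_orphan_insert(enforce_fks):
    """Insert a fact row whose payment_type has no dimension row (hypothetical demo)."""
    conn = sqlite3.connect(':memory:')
    if enforce_fks:
        # Foreign-key enforcement is a per-connection setting in SQLite
        conn.execute('PRAGMA foreign_keys = ON')
    conn.execute('CREATE TABLE dim_payment (payment_type INTEGER PRIMARY KEY)')
    conn.execute('CREATE TABLE fact_table (Trip_id INTEGER PRIMARY KEY, '
                 'payment_type INTEGER REFERENCES dim_payment(payment_type))')
    try:
        # dim_payment is still empty, so payment_type = 1 has no parent row
        conn.execute('INSERT INTO fact_table (payment_type) VALUES (1)')
        conn.commit()
        return 'inserted'
    except sqlite3.IntegrityError as err:
        return f'rejected: {err}'
    finally:
        conn.close()

print(try_orphan_insert(False))  # inserted
print(try_orphan_insert(True))   # rejected: FOREIGN KEY constraint failed
```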

## Insert Data into SQL Database <a name="Insert_data"></a>

#### Create a helper function to insert data

```python
# Insert the rows of a DataFrame into the table mapped by a SQLAlchemy model
def bulk_insert_from_dataframe(session, model, dataframe):
    """Insert data from a pandas DataFrame into a SQLAlchemy table using bulk_insert_mappings."""
    try:
        # Convert the DataFrame to a list of row dicts; keys must match the model's attribute names
        data_to_insert = dataframe.to_dict(orient='records')
        session.bulk_insert_mappings(model, data_to_insert)
        session.commit()
        print(f"Data successfully inserted into {model.__tablename__} table.")
    except Exception as e:
        session.rollback()
        print(f"An error occurred: {e}")
```

#### Ingest data into SQL tables

```python
# Create session
Session = sessionmaker(bind=engine, future=True)
session = Session()

# Insert the dimension tables first so that the fact table's foreign keys
# point at existing rows (required when the database enforces foreign keys)
bulk_insert_from_dataframe(session, DimDatetime, dim_datetime)
bulk_insert_from_dataframe(session, DimPayment, dim_payment)
bulk_insert_from_dataframe(session, DimRate, dim_rate)
bulk_insert_from_dataframe(session, FactTable, fact_table)


# Close the session after all operations
session.close()
```

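`dataframe.to_dict(orient='records')` materializes every row as a Python dict at once, which for a month of trip records can use a lot of memory. One option, sketched here with a hypothetical `iter_chunks` helper and an arbitrary chunk size, is to feed the insert helper fixed-size slices. Each call to `bulk_insert_from_dataframe` commits separately, so a failure partway through leaves the earlier chunks committed.

```python
import pandas as pd

def iter_chunks(dataframe, chunk_size):
    """Yield consecutive row slices of at most chunk_size rows (hypothetical helper)."""
    for start in range(0, len(dataframe), chunk_size):
        yield dataframe.iloc[start:start + chunk_size]

# Toy frame standing in for fact_table
df = pd.DataFrame({'trip': range(10)})
sizes = [len(chunk) for chunk in iter_chunks(df, 4)]
print(sizes)  # [4, 4, 2]

# Usage with the insert helper (not executed here):
# for chunk in iter_chunks(fact_table, 100_000):
#     bulk_insert_from_dataframe(session, FactTable, chunk)
```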

#### Ingest data into SQL Server (optional)

```python
# Base.metadata.create_all(engine)


# # Create session
# Session = sessionmaker(bind=engine, future=True)
# session = Session()

# # Insert dimension tables before the fact table (SQL Server enforces foreign keys)
# bulk_insert_from_dataframe(session, DimDatetime, dim_datetime)
# bulk_insert_from_dataframe(session, DimPayment, dim_payment)
# bulk_insert_from_dataframe(session, DimRate, dim_rate)
# bulk_insert_from_dataframe(session, FactTable, fact_table)


# # Close the session after all operations
# session.close()
```


## Future Work
- Implement data quality checks
- Implement log table to monitor ingestion process
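As a possible starting point for the data quality checks listed above, the sketch below counts rows that break a few simple rules. The rules are illustrative assumptions, not project requirements, and `basic_quality_report` is a hypothetical name.

```python
import pandas as pd

def basic_quality_report(df):
    """Count rows failing a few simple sanity rules (rules are illustrative assumptions)."""
    checks = {
        'non_positive_duration': df['trip_duration'] <= 0,
        'negative_total_amount': df['total_amount'] < 0,
        'missing_passenger_count': df['passenger_count'].isna(),
    }
    # Boolean masks sum to the number of failing rows
    return {name: int(mask.sum()) for name, mask in checks.items()}

# Hypothetical rows chosen to trip each rule
sample = pd.DataFrame({
    'trip_duration': [913.0, -5.0, 0.0],
    'total_amount': [26.78, 45.0, -3.0],
    'passenger_count': [1.0, None, 2.0],
})
print(basic_quality_report(sample))
# {'non_positive_duration': 2, 'negative_total_amount': 1, 'missing_passenger_count': 1}
```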