In [1]:
import mysql.connector
import pandas as pd

In [2]:
try:
    conn = mysql.connector.connect(option_files="/etc/my.cnf",
                                  database="taxi_data")
    cursor = conn.cursor()

    cursor.execute("""
    CREATE TABLE IF NOT EXISTS dim_date (
        date_id          BIGINT       NOT NULL,
        full_date        DATETIME     NOT NULL,
        year             INT          NOT NULL,
        month            INT          NOT NULL,
        day              INT          NOT NULL,
        day_of_week      VARCHAR(50)  NOT NULL,
        hour_of_day      INT          NOT NULL,
        am_pm            VARCHAR(2)   NOT NULL,
        is_peak_hour     BOOLEAN      NOT NULL,
        minute           INT          NOT NULL,
        second           INT          NOT NULL,
        PRIMARY KEY(date_id)
    );
    """)

    cursor.execute("""
    CREATE TABLE IF NOT EXISTS dim_payment_type (
        payment_type_id           INT           NOT NULL,
        payment_type_description  VARCHAR(50)   NOT NULL,
        PRIMARY KEY(payment_type_id)
    );
    """)

    cursor.execute("""
    CREATE TABLE IF NOT EXISTS dim_ratecode (
        ratecode_id             INT           NOT NULL,
        rate_type_description   VARCHAR(50)   NOT NULL,
        PRIMARY KEY(ratecode_id)
    );
    """)

    cursor.execute("""
    CREATE TABLE IF NOT EXISTS fact_trips (
        trip_id INT AUTO_INCREMENT,
        lpep_pickup_date_id    BIGINT      NOT NULL,  -- FK to dim_date
        lpep_dropoff_date_id   BIGINT      NOT NULL,  -- FK to dim_date
        pulocationid           INT,
        dolocationid           INT,
        payment_type_id        INT         NOT NULL,  -- FK to dim_payment_type
        ratecode_id            INT,                  -- FK to dim_ratecode (optional)
        fare_amount            DECIMAL(10,2),
        total_amount           DECIMAL(10,2),
        trip_distance          DECIMAL(10,2),
        trip_duration          INT,
        tip_amount             DECIMAL(10,2),
        extra                  DECIMAL(10,2),
        fare_per_mile          FLOAT,
        PRIMARY KEY (trip_id),
        FOREIGN KEY (lpep_pickup_date_id)  REFERENCES dim_date (date_id),
        FOREIGN KEY (lpep_dropoff_date_id) REFERENCES dim_date (date_id),
        FOREIGN KEY (payment_type_id)      REFERENCES dim_payment_type (payment_type_id),
        FOREIGN KEY (ratecode_id)          REFERENCES dim_ratecode (ratecode_id)
    );
    """)

    try:
        cursor.execute("SET GLOBAL net_read_timeout=300;")
        cursor.execute("SET GLOBAL wait_timeout=300;")
    except:
        pass

    insert_dim_date = """
    INSERT INTO dim_date (
        date_id,
        full_date,
        year,
        month,
        day,
        day_of_week,
        hour_of_day,
        am_pm,
        is_peak_hour,
        minute,
        second
    )
    SELECT DISTINCT
        CAST(DATE_FORMAT(t_datetime, '%Y%m%d%H%i%s') AS UNSIGNED) AS date_id,
        t_datetime AS full_date,
        YEAR(t_datetime) AS year,
        MONTH(t_datetime) AS month,
        DAY(t_datetime) AS day,
        DAYNAME(t_datetime) AS day_of_week,
        HOUR(t_datetime) AS hour_of_day,
        CASE WHEN HOUR(t_datetime) < 12 THEN 'AM' ELSE 'PM' END AS AM_PM,
        CASE
            WHEN (HOUR(t_datetime) BETWEEN 6 AND 8)
                 OR (HOUR(t_datetime) BETWEEN 15 AND 17) THEN 1
            ELSE 0
        END AS is_peak_hour,
        MINUTE(t_datetime),
        SECOND(t_datetime)
    FROM (
        SELECT lpep_pickup_datetime AS t_datetime FROM temp_taxi_data
        UNION
        SELECT lpep_dropoff_datetime FROM temp_taxi_data
    ) AS all_datetimes
    WHERE CAST(DATE_FORMAT(t_datetime, '%Y%m%d%H%i%s') AS UNSIGNED) NOT IN (
        SELECT date_id FROM dim_date
    );
    """
    cursor.execute(insert_dim_date)
    conn.commit()

    cursor.execute("CREATE INDEX idx_full_date ON dim_date (full_date);")
    cursor.execute("CREATE INDEX idx_pickup_datetime ON temp_taxi_data (lpep_pickup_datetime);")
    cursor.execute("CREATE INDEX idx_dropoff_datetime ON temp_taxi_data (lpep_dropoff_datetime);")

    insert_dim_payment_type = """
    INSERT INTO dim_payment_type (payment_type_id, payment_type_description)
    SELECT
        ROW_NUMBER() OVER() AS payment_type_id,
        CASE
            WHEN payment_type = 1 THEN 'Credit Card'
            WHEN payment_type = 2 THEN 'Cash'
            WHEN payment_type = 3 THEN 'No Charge'
            WHEN payment_type = 4 THEN 'Dispute'
            WHEN payment_type = 5 THEN 'Unknown'
            WHEN payment_type = 6 THEN 'Voided trip'
            ELSE 'Other'
        END AS payment_type_description
    FROM (SELECT DISTINCT payment_type FROM temp_taxi_data) AS unique_payment_types;
    """
    cursor.execute(insert_dim_payment_type)
    conn.commit()

    insert_dim_ratecode = """
    INSERT INTO dim_ratecode (ratecode_id, rate_type_description)
    SELECT DISTINCT
        RatecodeID,
        CASE
            WHEN RatecodeID = 1 THEN 'Standard'
            WHEN RatecodeID = 2 THEN 'JFK'
            WHEN RatecodeID = 3 THEN 'Newark'
            WHEN RatecodeID = 4 THEN 'Nassau or Westchester'
            WHEN RatecodeID = 5 THEN 'Negotiated fare'
            WHEN RatecodeID = 6 THEN 'Group ride'
            ELSE 'Other'
        END AS rate_type_description
    FROM temp_taxi_data
    WHERE RatecodeID IS NOT NULL;
    """
    cursor.execute(insert_dim_ratecode)
    conn.commit()

    insert_fact_trips = """
    INSERT INTO fact_trips (
        lpep_pickup_date_id, lpep_dropoff_date_id, pulocationid, dolocationid,
        payment_type_id, ratecode_id, fare_amount, total_amount,
        trip_distance, trip_duration, tip_amount, extra, fare_per_mile
    )
    SELECT
        d1.date_id AS lpep_pickup_date_id,
        d2.date_id AS lpep_dropoff_date_id,
        t.PULocationID,
        t.DOLocationID,
        t.payment_type AS payment_type_id,
        t.RatecodeID AS ratecode_id,
        t.fare_amount,
        t.total_amount,
        t.trip_distance,
        (t.trip_duration / 1000000000) / 60 AS trip_duration,
        t.tip_amount,
        t.extra,
        CASE WHEN t.trip_distance > 0 THEN t.fare_amount / t.trip_distance ELSE NULL END AS fare_per_mile
    FROM temp_taxi_data t
    JOIN dim_date d1 ON d1.full_date = t.lpep_pickup_datetime
    JOIN dim_date d2 ON d2.full_date = t.lpep_dropoff_datetime;
    """
    cursor.execute(insert_fact_trips)
    conn.commit()

    print("ETL Process completed successfully!")

except mysql.connector.Error as err:
    print(f"Error: {err}")
finally:
    if conn.is_connected():
        cursor.close()
        conn.close()

ETL Process completed successfully!
