In [9]:
import pandas as pd
import pyarrow.parquet as pq
from sqlalchemy import create_engine
import sys
import os

try:
    # 0. Print current working directory (for verification)
    print("Current working directory:", os.getcwd())  # Double-check this!

    # 1. Load Parquet Data
    parquet_file = 'yellow_tripdata_2021-01.parquet'  # Replace with your Parquet file name
    df = pq.read_table(parquet_file).to_pandas()  # Load Parquet file
    print(f"Parquet file {parquet_file} loaded successfully.")

    # 2. Data Transformation (Convert to datetime)
    df['tpep_pickup_datetime'] = pd.to_datetime(df['tpep_pickup_datetime'])
    df['tpep_dropoff_datetime'] = pd.to_datetime(df['tpep_dropoff_datetime'])

    # 3. Load Location ID data (Borough, Zone, Service Zone)
    zones = pd.read_csv('taxi_zone_lookup.csv')  # Assuming taxi_zone_lookup.csv is in the same directory
    print("Taxi zone lookup data loaded successfully.")

    # 4. Merge Location IDs (Pickup)
    df = df.merge(zones, left_on='PULocationID', right_on='LocationID', how='left')
    df.rename(columns={'Borough': 'PULocationBorough', 'Zone': 'PULocationZone'}, inplace=True)
    df.drop(columns=['LocationID'], inplace=True)

    # 5. Merge Location IDs (Dropoff)
    df = df.merge(zones, left_on='DOLocationID', right_on='LocationID', how='left')
    df.rename(columns={'Borough': 'DOLocationBorough', 'Zone': 'DOLocationZone'}, inplace=True)
    df.drop(columns=['LocationID'], inplace=True)

    # 6. Connect to PostgreSQL
    engine = create_engine('postgresql://root:bmaleh@localhost:5432/ny_taxi')  # Correct credentials

    # 7. Load Data to PostgreSQL (Create table if it does not exist)
    table_name = 'yellow_tripdata_trip'  # Your table name
    df.to_sql(name=table_name, con=engine, index=False, if_exists='replace')  # Load data
    print(f"Data loaded to PostgreSQL table {table_name} successfully.")

    print("Data upload finished successfully.")

except FileNotFoundError as e:
    print(f"A file error occurred: {e}")
    sys.exit(1)
except Exception as e:
    print(f"A general error occurred: {e}")
    sys.exit(1)




Current working directory: C:\Users\ADMIN\code\data-engineering-zoomcamp-course\myjupyterproject
Parquet file yellow_tripdata_2021-01.parquet loaded successfully.
Taxi zone lookup data loaded successfully.
Data loaded to PostgreSQL table yellow_tripdata_trip successfully.
Data upload finished successfully.
