# ETL Process to Build a Data Pipeline

##### Step 1: Extract data from the XLSX file and load it into a Pandas DataFrame
##### Step 2: Transform the data (handle duplicates, missing values, calculations)
##### Step 3: Load the transformed data into the PostgreSQL database and apply SQL queries for sales analysis

In [1]:
# Import libraries
import pandas as pd  # Data manipulation
from dotenv import load_dotenv  # Load environment variables
import os  # Interaction with the operating system
from sqlalchemy import create_engine  # SQL database connection
import time  # Time measurement
import psutil  # System resource monitoring (CPU and memory)

##### Step 1: Extract data from the xlsx file and load it into a Pandas DataFrame

In [2]:
# Load the Excel file
df = pd.read_excel('restaurant_sales_2.xlsx')

# Define the chunk size if working with datasets containing hundreds of thousands or millions of rows
# chunk_size = 100000

In [3]:
# Display the first 6 rows of the DataFrame to quickly inspect the data
df.head(6)  

Unnamed: 0,sale_id,order_id,item_id,quantity,unit_price,food_cost,total_amount,total_amount_discounted,item_name,item_size,...,delivery_datetime,delivery_hour,prep_time,customer_satisfaction,total_service_time,promo_id,discount_percent,discount_name,staff_id,staff_name
0,1,1,SCC,4,48.0,110.25,1241.0,1116.9,Stone Crab Claws,Regular,...,2023-01-01 12:00:36,12:00:36,19,5,32,1,0.1,Promotion Q1,3,John Smith
1,2,1,SCC,4,48.0,110.25,1241.0,1116.9,Stone Crab Claws,Regular,...,2023-01-01 12:15:40,12:15:40,15,5,28,1,0.1,Promotion Q1,4,Mary Johnson
2,3,1,SCC,4,48.0,110.25,1241.0,1116.9,Stone Crab Claws,Regular,...,2023-01-01 12:26:40,12:26:40,26,4,39,1,0.1,Promotion Q1,3,John Smith
3,4,2,KLP,3,18.5,110.25,930.75,837.68,Key Lime Pie,Slice,...,2023-01-01 12:23:40,12:23:40,23,5,36,1,0.1,Promotion Q1,1,Anna Williams
4,5,2,KLP,3,18.5,110.25,930.75,837.68,Key Lime Pie,Slice,...,2023-01-01 12:16:40,12:16:40,16,5,29,1,0.1,Promotion Q1,2,Louis Brown
5,6,2,KLP,3,18.5,110.25,930.75,837.68,Key Lime Pie,Slice,...,2023-01-01 12:16:40,12:16:40,16,5,29,1,0.1,Promotion Q1,1,Anna Williams


##### Step 2: Transform the data (duplicates, missing values, calculations)

In [4]:
def apply_transformations(df):
    # 1. Convert 'order_datetime' and 'delivery_datetime' to datetime format
    df['order_datetime'] = pd.to_datetime(df['order_datetime'], errors='coerce')
    df['delivery_datetime'] = pd.to_datetime(df['delivery_datetime'], errors='coerce')
    
    # 2. Calculate 'prep_time' based on the difference between order and delivery
    df['prep_time'] = (df['delivery_datetime'] - df['order_datetime']).dt.total_seconds() / 60.0
    df['prep_time'] = df['prep_time'] - 3  # Adjust by 3 minutes if needed

    # 3. Convert numeric columns to float and round
    df[['unit_price', 'food_cost', 'total_amount', 'total_amount_discounted']] = \
        df[['unit_price', 'food_cost', 'total_amount', 'total_amount_discounted']].astype(float).round(2)
    
    # 4. Convert 'quantity' to int
    df['quantity'] = df['quantity'].astype(int)
    
    # 5. Recalculate 'total_amount' if necessary
    df['total_amount'] = (df['unit_price'] * df['quantity']).round(2)
    
    # 6. Ensure 'food_cost' is calculated safely
    df['food_cost'] = (df['unit_price'] - 10).round(2)  # Example calculation
    
    # 7. Calculate total_amount_discounted
    df['total_amount_discounted'] = (df['total_amount'] * (1 - df['discount_percent'])).round(2)
    
    # 8. Split DataFrame into fact and dimension tables
    
    # Fact table: fact_sales
    fact_sales = df[['sale_id', 'order_id', 'order_datetime', 'estimated_delivery_time',
                     'delivery_datetime', 'total_service_time',
                     'restaurant_id', 'staff_id', 'payment_id', 'promo_id',
                     'item_id', 'quantity', 'unit_price', 'food_cost', 
                     'total_amount', 'total_amount_discounted']]
    
    # Dimension tables
    
    # Dim_Items
    dim_items = df[['item_id', 'item_name', 'item_size', 'category', 'ingredients']].drop_duplicates()
    
    # Dim_Restaurants
    dim_restaurants = df[['restaurant_id', 'restaurant_name', 'restaurant_location']].drop_duplicates()
    
    # Dim_Staff
    dim_staff = df[['staff_id', 'staff_name', 'restaurant_id']].drop_duplicates()
    
    # Dim_Payments
    dim_payments = df[['payment_id', 'payment_method']].drop_duplicates()
    
    # Dim_Promotions
    dim_promotions = df[['promo_id', 'discount_percent', 'discount_name']].drop_duplicates()
    
    # Dim_Customer_Satisfaction
    dim_customer_satisfaction = df[['sale_id', 'prep_time', 'customer_satisfaction']].drop_duplicates()
    
    # Return all tables
    return fact_sales, dim_items, dim_restaurants, dim_staff, dim_payments, dim_promotions, dim_customer_satisfaction


##### Step 3: Load the transformed data into the PostgreSQL database and execute SQL queries for sales analysis

In [5]:
!pip install psycopg2-binary



In [6]:
#pip install psycopg2-binary

# Load environment variables from the .env file to securely manage credentials and configurations
dotenv_path = '.env'  # Define the path to the .env file where sensitive variables are stored
load_dotenv(dotenv_path)  # Load environment variables into the current execution environment

# Retrieve database connection credentials from environment variables
PG_USER = os.getenv('PG_USER')      # PostgreSQL username
PG_PASSWORD = os.getenv('PG_PASSWORD')  # PostgreSQL password
PG_HOST = os.getenv('PG_HOST')      # Database host address
PG_PORT = os.getenv('PG_PORT')      # Database connection port
PG_DATABASE = os.getenv('PG_DATABASE')  # Database name

# Create a connection to the PostgreSQL database using SQLAlchemy
engine = create_engine(f'postgresql://{PG_USER}:{PG_PASSWORD}@{PG_HOST}:{PG_PORT}/{PG_DATABASE}')

In [7]:
# Measure memory usage before loading the dataset
print(f"Memory used before loading the dataset: {psutil.Process().memory_info().rss / (1024 ** 2):.2f} MB")

try:
    start_time = time.time()  # Start the total timer

    # Load the full dataset (without chunks)
    df = pd.read_excel('restaurant_sales_2.xlsx') 

    print(f"Memory used after loading the dataset: {psutil.Process().memory_info().rss / (1024 ** 2):.2f} MB")

    # Call the transformation function to split into tables
    fact_sales, dim_items, dim_restaurants, dim_staff, dim_payments, dim_promotions, dim_satisfaction = apply_transformations(df)

    # Save each DataFrame separately to the database

    # ⚠️ WARNING: Using 'append' can duplicate records if this ETL is executed multiple times.
    # This may inflate KPIs (revenue, profit, total orders). 
    # Consider using 'replace' during development or implement primary keys/upsert logic in production.

    fact_sales.to_sql('Fact_Sales', con=engine, if_exists='replace', index=False)
    dim_items.to_sql('Dim_Items', con=engine, if_exists='replace', index=False)
    dim_restaurants.to_sql('Dim_Restaurants', con=engine, if_exists='replace', index=False)
    dim_staff.to_sql('Dim_Staff', con=engine, if_exists='replace', index=False)
    dim_payments.to_sql('Dim_Payments', con=engine, if_exists='replace', index=False)
    dim_promotions.to_sql('Dim_Promotions', con=engine, if_exists='replace', index=False)
    dim_satisfaction.to_sql('Dim_Satisfaction', con=engine, if_exists='replace', index=False)

    # Measure memory usage after processing
    print(f"Memory used after processing: {psutil.Process().memory_info().rss / (1024 ** 2):.2f} MB")

    # Measure total processing time
    total_time = time.time() - start_time
    print(f"Total processing time: {total_time:.2f} seconds")
    print("Process completed successfully!")

except Exception as e:
    print(f"Error while processing the data: {e}")


Memory used before loading the dataset: 288.68 MB
Memory used after loading the dataset: 316.23 MB
Memory used after processing: 358.86 MB
Total processing time: 37.73 seconds
Process completed successfully!
