In [None]:
import os

import numpy as np
import pandas as pd
import psycopg2
import psycopg2.extras as extras
from psycopg2 import sql

CONFIGURATION

In [None]:
# Location of the raw sales extract, relative to this notebook.
CSV_FILE = '../data/sales.csv'

# PostgreSQL connection settings.
DB_NAME = 'sales_db'
# Credentials are read from the standard libpq environment variables when they
# are set, so real secrets never have to be typed into (and committed with)
# the notebook. The placeholders are kept as fallbacks and must be replaced
# if the environment variables are not used.
DB_USER = os.environ.get('PGUSER', '<user_name>')
DB_PASSWORD = os.environ.get('PGPASSWORD', '<password>')
DB_HOST = 'localhost'
DB_PORT = '5432'

# Table that receives the transformed rows.
TARGET_TABLE = 'etl_transformed_sales'

DB CONNECTION

In [None]:
# Establish the connection to the PostgreSQL database.
# `conn` is bound before the attempt so that it always exists afterwards,
# even when connecting fails.
conn = None
try:
    conn = psycopg2.connect(
        dbname=DB_NAME,
        user=DB_USER,
        password=DB_PASSWORD,
        host=DB_HOST,
        port=DB_PORT
    )
    print(f"Successfully connected to PostgreSQL database: {DB_NAME}")

except psycopg2.Error as e:
    print(f"Database connection failed. Error: {e}")
    print("Please check your database service and credentials.")
    # Every later cell needs a live connection; re-raise so the run stops
    # here with the real cause instead of failing later on a missing `conn`.
    raise

CREATE TABLE

In [None]:
# Create the target table structure in PostgreSQL if it doesn't exist.
# The table name is composed as a quoted identifier through psycopg2.sql
# instead of being interpolated with an f-string, so the name is always
# escaped correctly. Note that quoted identifiers are case-sensitive.
create_table_query = sql.SQL("""
        CREATE TABLE IF NOT EXISTS {table}(
            order_id INTEGER UNIQUE,
            product VARCHAR(255),
            category VARCHAR(100),
            sales_amount NUMERIC(10, 2),
            order_date DATE,
            region VARCHAR(50),
            customer_name VARCHAR(255),
            unit_price NUMERIC(10, 2), -- Transformed column
            sales_tier VARCHAR(50),     -- Transformed column
            load_timestamp TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
        );
    """).format(table=sql.Identifier(TARGET_TABLE))

try:
    with conn.cursor() as cur:
        cur.execute(create_table_query)
        conn.commit()
    print(f"Table '{TARGET_TABLE}' already exists or has been successully created!")
except psycopg2.Error as e:
    print(f" Error creating table: {e}")
    # Leave the connection in a clean state, then surface the failure: the
    # load step cannot succeed without the target table.
    conn.rollback()
    raise

EXTRACT (E) STEP

In [None]:
# Extract Data (E)
print("---Step 1: Extraction (E) ---")

try:
    # Load the CSV into a DataFrame and show a short preview of it.
    df = pd.read_csv(CSV_FILE)
    print(f"Data extracted successfully from '{CSV_FILE}'.")
    print(f"Initial DataFrame shape: {df.shape}")
    print("\nInitial Data Head:")
    display(df.head())
except Exception as e:
    # Any failure leaves the pipeline without data; a missing file gets its
    # own message, everything else is reported with the error text.
    df = None
    if isinstance(e, FileNotFoundError):
        print(f" Error: The file '{CSV_FILE}' was not found. Make sure it's in the same directory.")
    else:
        print(f" Error during CSV read: {e}")

# Without extracted data there is nothing to transform or load.
if df is None:
    raise SystemExit("Exiting ETL process due to extraction failure.")

TRANSFORM (T) STEP

In [None]:
# --- 2. TRANSFORMATION (T) ---
print("\n--- Step 2: Transformation (T) ---")

# T1: Clean column names (snake_case: lowercase, spaces/special chars -> '_')
# The conversion is acronym-aware: a word boundary is placed between a
# lowercase letter/digit and a capital ("SalesAmount" -> "sales_amount") and
# before the last capital of an upper-case run that starts a new word
# ("HTTPStatus" -> "http_status"), so "OrderID" and "Order ID" both become
# "order_id" rather than "order_i_d" / "order__i_d".
df.columns = (
    df.columns
    .str.strip()
    # Collapse every run of non-word characters / underscores into one '_'.
    .str.replace(r'[\W_]+', '_', regex=True)
    .str.replace(r'([a-z0-9])([A-Z])', r'\1_\2', regex=True)
    .str.replace(r'([A-Z])([A-Z][a-z])', r'\1_\2', regex=True)
    .str.lower()
    .str.strip('_')
)
print("Column names cleaned.")
print(f"New Columns: {df.columns.tolist()}")


In [None]:
# T2: Data Type Conversion
# Parse the order dates into pandas datetimes.
df['order_date'] = df['order_date'].pipe(pd.to_datetime)
# Make sure the sales amounts are numeric (expected already, kept as a safeguard).
df['sales_amount'] = df['sales_amount'].pipe(pd.to_numeric)
print("Data types adjusted (OrderDate to datetime, SalesAmount to numeric).")

In [None]:
# T3: Derived Column Creation (Unit Price)
# This simple example treats every sale as exactly one unit, so the unit
# price is the sale amount itself.
df['unit_price'] = df.loc[:, 'sales_amount']
print("Derived column 'unit_price' created (assuming quantity=1).")

In [None]:
# T4: Categorical Transformation (Creating a Sales Tier based on amount)

# Boolean masks, checked in order: the first mask that is True for a row wins,
# so the 500 threshold has to come before the 100 threshold.
conditions = [
    df['sales_amount'].ge(500),
    df['sales_amount'].ge(100),
]

# Tier label paired position-by-position with the masks above.
choices = ['High Value', 'Medium Value']

# Rows matching neither mask (i.e., less than 100) receive the default label.
df['sales_tier'] = np.select(
    condlist=conditions,
    choicelist=choices,
    default='Low Value',
)

print("Categorical transformation 'sales_tier' created using np.select.")

In [None]:
# T5: Filtering (Basic Data Quality Check)
# Keep only the rows whose sales amount is strictly positive and report how
# many rows were dropped.
initial_rows = df.shape[0]
df = df.loc[df['sales_amount'].gt(0)]
rows_removed = initial_rows - df.shape[0]
print(f" Data filtered: Removed {rows_removed} rows with non-positive sales amounts.")

LOAD (L) STEP

In [None]:
# Select the columns to load, in the order used by the INSERT statement.
df_load = df.loc[:, [
    'order_id',
    'product',
    'category',
    'sales_amount',
    'order_date',
    'region',
    'customer_name',
    'unit_price',
    'sales_tier',
]]

In [None]:
# Convert the DataFrame to a list of tuples for psycopg2's execute_batch.
# Missing values (NaN / NaT) are mapped to None so they are loaded as SQL
# NULL; psycopg2 cannot adapt NaT, and a NaN would otherwise be written as a
# value rather than as NULL.
data_tuples = [
    tuple(None if pd.isna(value) else value for value in row)
    for row in df_load.values
]

In [None]:
# Define the INSERT query structure:
# a comma-separated column list and one '%s' parameter marker per column.
cols = ', '.join(df_load.columns.tolist())
placeholders = ', '.join('%s' for _column in df_load.columns)

In [None]:
# Columns refreshed on conflict: every loaded column except the unique key 'order_id'.
update_cols = [name for name in df_load.columns if name != 'order_id']

In [None]:
# "col = EXCLUDED.col" assignments for the ON CONFLICT ... DO UPDATE clause.
set_clause = ', '.join(
    f"{name} = EXCLUDED.{name}"
    for name in update_cols
)

In [None]:
# Upsert statement: insert each row, and when its order_id already exists,
# overwrite every other loaded column with the incoming values.
# The statement is composed with psycopg2.sql (quoted identifiers plus
# parameter placeholders) instead of f-string interpolation, so table and
# column names are always escaped correctly. Quoted identifiers are
# case-sensitive.
insert_query = sql.SQL(
    """
    INSERT INTO {table} ({columns})
    VALUES ({values})
    ON CONFLICT (order_id)
    DO UPDATE SET {assignments}
    """
).format(
    table=sql.Identifier(TARGET_TABLE),
    columns=sql.SQL(', ').join(sql.Identifier(name) for name in df_load.columns),
    # One %s placeholder per loaded column.
    values=sql.SQL(', ').join(sql.Placeholder() * len(df_load.columns)),
    assignments=sql.SQL(', ').join(
        sql.SQL("{column} = EXCLUDED.{column}").format(column=sql.Identifier(name))
        for name in df_load.columns
        if name != 'order_id'
    ),
)

In [None]:
# Load (L): upsert all prepared rows in a single transaction, then close the
# connection whether or not the load succeeded.
try:
    with conn.cursor() as cur:
        # Use execute_batch for efficient bulk insertion
        extras.execute_batch(cur, insert_query, data_tuples)
        conn.commit()
        print(f"Successfully attempted to load {len(data_tuples)} records into '{TARGET_TABLE}'. Conflicting records were updated.")

except psycopg2.Error as e:
    print(f"Error during data loading: {e}")
    conn.rollback()
    # Re-raise so a failed load is not followed by the completion message
    # below; the `finally` clause still closes the connection first.
    raise

finally:
    if conn:
        conn.close()
        print(" Database connection closed.")

# Only reached when the load above did not raise.
print("\n--- ETL Process Complete ---")