In [None]:
import datetime as dt
import json
import os
import random
import re

import pandas as pd
import psycopg2
from faker import Faker
from psycopg2.extras import execute_values

# Open the PostgreSQL connection used by every later cell.
# Settings are read from the standard libpq environment variables
# (PGDATABASE, PGUSER, PGPASSWORD, PGHOST, PGPORT); the fallbacks are the
# original local-development values.
# NOTE(review): the "123" password fallback is kept only for backward
# compatibility -- prefer exporting PGPASSWORD and removing the default.
try:
    conn = psycopg2.connect(
        dbname=os.environ.get("PGDATABASE", "project"),
        user=os.environ.get("PGUSER", "postgres"),
        password=os.environ.get("PGPASSWORD", "123"),
        host=os.environ.get("PGHOST", "localhost"),
        port=os.environ.get("PGPORT", "5432"),
    )
    print("Connection successful!")
except Exception as e:
    print(f"Connection failed: {e}")
    # Re-raise: without a connection the next line (and every later cell)
    # would otherwise fail with a misleading NameError on `conn`.
    raise

cur = conn.cursor()

In [None]:
# Reproducibility: seed the stdlib RNG and Faker's class-level seed so that
# every synthetic-data draw below is repeatable across kernel restarts.
random.seed(0)
fake = Faker()
Faker.seed(0)

In [None]:
def ensure_support_tables():
    """Create the auxiliary ``rejects`` and ``etl_run_log`` tables if missing.

    Runs both ``CREATE TABLE IF NOT EXISTS`` statements on the module-level
    cursor ``cur`` and commits on ``conn`` immediately, so the tables exist
    independently of the later ETL transaction.
    """
    rejects_ddl = """
    CREATE TABLE IF NOT EXISTS rejects(
        id SERIAL PRIMARY KEY,
        table_name TEXT,
        raw_payload JSONB,
        reason TEXT,
        created_at TIMESTAMP DEFAULT now()
    );
    """
    run_log_ddl = """
    CREATE TABLE IF NOT EXISTS etl_run_log(
        id SERIAL PRIMARY KEY,
        run_started TIMESTAMP,
        run_ended   TIMESTAMP,
        status      TEXT,
        message     TEXT
    );
    """
    for ddl in (rejects_ddl, run_log_ddl):
        cur.execute(ddl)
    conn.commit()


In [None]:
def exec_values_upsert(cur, table, cols, rows, conflict, update_set_sql):
    """Bulk upsert ``rows`` into ``table`` with ``execute_values``.

    Builds ``INSERT ... VALUES %s ON CONFLICT <conflict> DO UPDATE SET
    <update_set_sql>`` and sends it in pages of 5000 rows.

    Args:
        cur: psycopg2 cursor.
        table: target table name (interpolated verbatim -- trusted input only).
        cols: column names, in the same order as each row tuple.
        rows: sequence of row tuples.
        conflict: conflict target, e.g. ``"(email)"``.
        update_set_sql: SET clause body, e.g. ``"x=EXCLUDED.x"``.

    Returns:
        Number of rows submitted; 0 (and no SQL executed) when ``rows`` is empty.
    """
    if not rows:
        return 0
    placeholders = ','.join('%s' for _ in cols)
    statement = f"""
      INSERT INTO {table} ({','.join(cols)}) VALUES %s
      ON CONFLICT {conflict} DO UPDATE SET {update_set_sql}
    """
    execute_values(cur, statement, rows, template=f"({placeholders})", page_size=5000)
    return len(rows)

In [None]:
def exec_values_insert(cur, table, cols, rows, on_conflict_do_nothing=False, conflict_cols=None):
    """Bulk INSERT ``rows`` into ``table`` with ``execute_values``.

    When ``on_conflict_do_nothing`` is true and ``conflict_cols`` is given,
    ``ON CONFLICT (<conflict_cols>) DO NOTHING`` is appended.

    Returns:
        Number of rows submitted; 0 (and no SQL executed) when ``rows`` is empty.
    """
    if not rows:
        return 0
    row_template = '(' + ','.join('%s' for _ in cols) + ')'
    statement_parts = [f"INSERT INTO {table} ({','.join(cols)}) VALUES %s"]
    if on_conflict_do_nothing and conflict_cols:
        statement_parts.append(f" ON CONFLICT ({','.join(conflict_cols)}) DO NOTHING")
    execute_values(cur, ''.join(statement_parts), rows, template=row_template, page_size=5000)
    return len(rows)

In [None]:
def push_rejects(cur, table_name, df_bad, reason):
    """Record rejected rows in the ``rejects`` table.

    Each row of ``df_bad`` is serialised to JSON and stored in the JSONB
    ``raw_payload`` column together with the destination table name, the
    rejection reason and a naive-UTC timestamp shared by the whole batch.

    Args:
        cur: open psycopg2 cursor; the caller owns the transaction/commit.
        table_name: name of the table the rows were destined for.
        df_bad: DataFrame of rejected rows; ``None`` or empty is a no-op.
        reason: human-readable rejection reason.
    """
    if df_bad is None or len(df_bad) == 0:
        return
    # psycopg2 cannot adapt a plain dict, and JSONB rejects NaN, so missing
    # values become JSON null and each record is sent as a JSON string.
    # default=str covers dates/Timestamps that json cannot encode natively.
    clean = df_bad.astype(object).where(pd.notna(df_bad), None)
    # Naive UTC to match the TIMESTAMP column; datetime.utcnow() is deprecated.
    created_at = dt.datetime.now(dt.timezone.utc).replace(tzinfo=None)
    rows = [
        (table_name, json.dumps(record, default=str), reason, created_at)
        for record in clean.to_dict(orient="records")
    ]
    execute_values(
        cur,
        "INSERT INTO rejects (table_name, raw_payload, reason, created_at) VALUES %s",
        rows,
        template="(%s, %s::jsonb, %s, %s)",
        page_size=1000,
    )

In [None]:
def generate_stores(num=10):
    """Return a DataFrame of ``num`` synthetic stores.

    Columns: store_id (unique Faker number), name (company name), address and
    opening_date (a date in the current decade). Draws from the module-level
    ``fake`` instance.
    """
    return pd.DataFrame([
        {
            'store_id': fake.unique.random_number(),
            'name': fake.company(),
            'address': fake.address(),
            'opening_date': fake.date_this_decade(),
        }
        for _ in range(num)
    ])

# Rebind the module-level `fake` to an en_US instance that also carries the
# commerce provider (generate_products calls fake.unique.ecommerce_name()).
# NOTE(review): this replaces the instance created in the seeding cell;
# presumably it still draws from Faker's class-level seeded RNG -- confirm.
import faker_commerce

fake = Faker(locale='en_US')
fake.add_provider(faker_commerce.Provider)

def generate_products(num=50):
    """Return a DataFrame of ``num`` synthetic products.

    Columns: product_id and name (both unique via ``fake.unique``), category
    (one of five fixed values), unit_price (5-500, 2 decimals) and stock_unit.
    """
    categories = ['Electronics', 'Furniture', 'Clothing', 'Food', 'Toys']
    units = ['Piece', 'Box', 'Kg', 'Bottle']
    rows = [
        {
            'product_id': fake.unique.random_number(),
            'name': fake.unique.ecommerce_name(),
            'category': random.choice(categories),
            'unit_price': round(random.uniform(5, 500), 2),
            'stock_unit': random.choice(units),
        }
        for _ in range(num)
    ]
    return pd.DataFrame(rows)

def generate_vendors(num=10):
    """Return a DataFrame of ``num`` synthetic vendors.

    Columns: vendor_id (unique Faker number), name, contact_info (a phone
    number) and rating (0-5, one decimal).
    """
    def _vendor():
        return {
            'vendor_id': fake.unique.random_number(),
            'name': fake.company(),
            'contact_info': fake.phone_number(),
            'rating': round(random.uniform(0, 5), 1),
        }

    return pd.DataFrame([_vendor() for _ in range(num)])

def gen_employees(n=40, store_ids=None):
    """Return a DataFrame of ``n`` synthetic employees.

    Hire dates fall between 4 years and 30 days ago. About 30% of employees
    get a leave_date at least 30 days after hiring; the rest have ``None``.

    Args:
        n: number of employees.
        store_ids: non-empty sequence of store ids to assign from (required;
            ``None`` makes ``random.choice`` raise).
    """
    role_choices = ['Cashier', 'Salesperson', 'Manager', 'Supervisor']

    def _make_employee():
        employee_id = fake.unique.random_number()
        hired_on = fake.date_between(start_date="-4y", end_date="-30d")
        left_on = None
        if random.random() < 0.3:
            left_on = fake.date_between(
                start_date=hired_on + dt.timedelta(days=30),
                end_date="today",
            )
        return {
            "employee_id": employee_id,
            "first_name": fake.first_name(),
            "last_name": fake.last_name(),
            "role": random.choice(role_choices),
            "hire_date": hired_on,
            "leave_date": left_on,
            "store_id": random.choice(store_ids),
        }

    return pd.DataFrame([_make_employee() for _ in range(n)])

def generate_customers(num=100):
    """Return a DataFrame of ``num`` synthetic customers with unique emails.

    Columns: customer_id, first_name, last_name, email, phone_number and
    membership_level (Standard/Silver/Gold/Platinum).
    """
    levels = ['Standard', 'Silver', 'Gold', 'Platinum']
    records = []
    for _ in range(num):
        # The email is drawn before the id; keep this order so the seeded
        # output stays the same.
        unique_email = fake.unique.email()
        records.append({
            'customer_id': fake.unique.random_number(),
            'first_name': fake.first_name(),
            'last_name': fake.last_name(),
            'email': unique_email,
            'phone_number': fake.phone_number(),
            'membership_level': random.choice(levels),
        })
    return pd.DataFrame(records)

def generate_sales_transactions(num=100, store_ids=None, employee_ids=None, customer_ids=None):
    """Return a DataFrame of ``num`` synthetic sales transactions.

    Each foreign key is drawn from the supplied id list; when a list is falsy
    a Faker integer in 1..10 (store), 1..20 (employee) or 1..100 (customer)
    is used instead. total_amount is a placeholder in 50-500.
    """
    def _pick(ids, fallback_max):
        if ids:
            return random.choice(ids)
        return fake.random_int(min=1, max=fallback_max)

    records = []
    for _ in range(num):
        records.append({
            'transaction_id': fake.unique.random_number(),
            'store_id': _pick(store_ids, 10),
            'employee_id': _pick(employee_ids, 20),
            'customer_id': _pick(customer_ids, 100),
            'transaction_date': fake.date_time_this_year(),
            'total_amount': round(random.uniform(50, 500), 2),
        })
    return pd.DataFrame(records)

def generate_transaction_items(num=300, transaction_ids=None, product_ids=None):
    """Return a DataFrame of ``num`` synthetic transaction line items.

    Foreign keys come from the supplied lists, or Faker integers in 1..100
    (transaction) / 1..50 (product) when a list is falsy.
    """
    def _pick(ids, fallback_max):
        return random.choice(ids) if ids else fake.random_int(min=1, max=fallback_max)

    rows = [
        {
            'id': fake.unique.random_number(),
            'transaction_id': _pick(transaction_ids, 100),
            'product_id': _pick(product_ids, 50),
            'quantity': random.randint(1, 10),
            'unit_price': round(random.uniform(5, 500), 2),
        }
        for _ in range(num)
    ]
    return pd.DataFrame(rows)

def generate_orders(num=20, vendor_ids=None):
    """Return a DataFrame of ``num`` synthetic purchase orders.

    vendor_id comes from ``vendor_ids`` (or a Faker integer in 1..10 when
    falsy); status is Pending, Completed or Cancelled.
    """
    statuses = ['Pending', 'Completed', 'Cancelled']
    rows = []
    for _ in range(num):
        order_id = fake.unique.random_number()
        if vendor_ids:
            vendor = random.choice(vendor_ids)
        else:
            vendor = fake.random_int(min=1, max=10)
        rows.append({
            'order_id': order_id,
            'vendor_id': vendor,
            'order_date': fake.date_this_year(),
            'status': random.choice(statuses),
        })
    return pd.DataFrame(rows)

def generate_inventory(num=50, store_ids=None, product_ids=None):
    """Generate up to ``num`` inventory rows with unique (store_id, product_id) pairs.

    A draw that repeats an existing pair or inventory_id counts as a miss;
    generation stops after ``num * 3`` consecutive misses (the counter resets
    on every success) and a warning is printed if fewer rows were produced.

    Args:
        num: target number of rows.
        store_ids: ids to draw from; falsy falls back to 1..10.
        product_ids: ids to draw from; falsy falls back to 1..50.

    Returns:
        DataFrame with inventory_id, store_id, product_id, quantity (0-100)
        and last_updated (a date this year).
    """
    stores = store_ids if store_ids else list(range(1, 11))
    products = product_ids if product_ids else list(range(1, 51))

    rows = []
    taken_pairs = set()
    taken_ids = set()
    consecutive_misses = 0
    miss_limit = num * 3

    while len(rows) < num and consecutive_misses < miss_limit:
        pair = (random.choice(stores), random.choice(products))
        if pair in taken_pairs:
            consecutive_misses += 1
            continue
        inventory_id = fake.random_number()
        if inventory_id in taken_ids:
            consecutive_misses += 1
            continue
        store_id, product_id = pair
        rows.append({
            'inventory_id': inventory_id,
            'store_id': store_id,
            'product_id': product_id,
            'quantity': random.randint(0, 100),
            'last_updated': fake.date_this_year(),
        })
        taken_pairs.add(pair)
        taken_ids.add(inventory_id)
        consecutive_misses = 0

    if len(rows) < num:
        print(f"Warning: Only generated {len(rows)} inventory rows due to uniqueness.")
    return pd.DataFrame(rows)

def generate_vendor_products(num=30, vendor_ids=None, product_ids=None):
    """Generate up to ``num`` unique (vendor_id, product_id) pairs.

    Repeated pairs count as misses; generation stops after ``num * 3``
    consecutive misses and prints a warning if fewer pairs were produced.
    Falsy id lists fall back to 1..10 (vendors) and 1..50 (products).
    """
    vendors = vendor_ids if vendor_ids else list(range(1, 11))
    products = product_ids if product_ids else list(range(1, 51))

    pairs = []
    seen = set()
    misses, miss_limit = 0, num * 3
    while len(pairs) < num and misses < miss_limit:
        candidate = (random.choice(vendors), random.choice(products))
        if candidate in seen:
            misses += 1
            continue
        seen.add(candidate)
        pairs.append(candidate)
        misses = 0

    if len(pairs) < num:
        print(f"Warning: Only generated {len(pairs)} vendor-product pairs.")
    return pd.DataFrame([{'vendor_id': v, 'product_id': p} for v, p in pairs])

def generate_order_items(num=50, order_ids=None, product_ids=None):
    """Return a DataFrame of ``num`` synthetic purchase-order line items.

    Foreign keys come from the supplied lists, or Faker integers in 1..20
    (order) / 1..50 (product) when a list is falsy. quantity is 1-100 and
    unit_price 1-200 (2 decimals).
    """
    def _pick(ids, fallback_max):
        return random.choice(ids) if ids else fake.random_int(min=1, max=fallback_max)

    items = []
    for _ in range(num):
        items.append({
            'order_id': _pick(order_ids, 20),
            'product_id': _pick(product_ids, 50),
            'quantity': random.randint(1, 100),
            'unit_price': round(random.uniform(1, 200), 2),
        })
    return pd.DataFrame(items)

def gen_deliveries(n=50, order_ids=None, store_ids=None):
    """Return a DataFrame of ``n`` synthetic deliveries.

    order_id and store_id are drawn from the supplied (non-empty) lists;
    delivered_at is a datetime within the last year and status one of
    Pending/Shipped/Delivered/Cancelled.
    """
    status_options = ["Pending", "Shipped", "Delivered", "Cancelled"]
    return pd.DataFrame([
        {
            "order_id": random.choice(order_ids),
            "store_id": random.choice(store_ids),
            "delivered_at": fake.date_time_between(start_date="-1y", end_date="now"),
            "status": random.choice(status_options),
        }
        for _ in range(n)
    ])

def gen_delivery_items_df(cur, max_items_per_delivery=2):
    """Build delivery line items from deliveries and order items already in the DB.

    For every delivery whose order has order items, between 1 and
    ``max_items_per_delivery`` of those items are sampled (without
    replacement), each with a delivered quantity in 1..max(1, ordered qty).
    Deliveries whose order has no items are skipped.

    Args:
        cur: psycopg2 cursor used to read Deliveries and Order_Items.
        max_items_per_delivery: upper bound on items sampled per delivery.

    Returns:
        DataFrame with columns delivery_id, order_item_id, quantity_delivered.
    """
    cur.execute("SELECT delivery_id, order_id FROM Deliveries;")
    delivery_rows = cur.fetchall()

    cur.execute("SELECT id, order_id, quantity FROM Order_Items;")
    item_rows = cur.fetchall()

    items_for_order = {}
    for item_id, order_id, ordered_qty in item_rows:
        if order_id not in items_for_order:
            items_for_order[order_id] = []
        items_for_order[order_id].append((item_id, ordered_qty))

    records = []
    for delivery_id, order_id in delivery_rows:
        pool = items_for_order.get(order_id, [])
        if not pool:
            continue
        n_pick = random.randint(1, min(max_items_per_delivery, len(pool)))
        for item_id, ordered_qty in random.sample(pool, n_pick):
            delivered = random.randint(1, max(1, int(ordered_qty)))
            records.append((delivery_id, item_id, delivered))

    return pd.DataFrame(records, columns=['delivery_id', 'order_item_id', 'quantity_delivered'])

def generate_shifts(num=50, employee_ids=None, store_ids=None):
    """Generate ``num`` synthetic 8-hour shifts.

    Each shift starts on the hour between 06:00 and 12:00 on a date between
    90 days ago and 10 days ahead. ``shift_date`` is that same calendar date,
    so it always agrees with ``start_time``.

    Args:
        num: number of shifts to generate (0 yields an empty frame with the
            expected columns).
        employee_ids: non-empty sequence of employee ids to draw from.
        store_ids: non-empty sequence of store ids to draw from.

    Returns:
        DataFrame with employee_id, store_id, shift_date, start_time and
        end_time; start/end are pandas datetimes.
    """
    columns = ["employee_id", "store_id", "shift_date", "start_time", "end_time"]
    rows = []
    for _ in range(num):
        shift_day = fake.date_between(start_date="-90d", end_date="+10d")
        start_hour = random.randint(6, 12)
        start_dt = dt.datetime.combine(shift_day, dt.time(hour=start_hour, minute=0, second=0))
        rows.append({
            "employee_id": random.choice(employee_ids),
            "store_id": random.choice(store_ids),
            # Previously an unrelated fake.date_this_year(), which made
            # shift_date disagree with the date part of start_time.
            "shift_date": shift_day,
            "start_time": start_dt,
            "end_time": start_dt + dt.timedelta(hours=8),
        })
    # Explicit columns keep the schema (and the conversions below) valid
    # even when num == 0.
    df = pd.DataFrame(rows, columns=columns)
    df["start_time"] = pd.to_datetime(df["start_time"])
    df["end_time"] = pd.to_datetime(df["end_time"])
    return df

def generate_payments(num=80, transaction_ids=None):
    """Return a DataFrame of ``num`` synthetic payments.

    transaction_id comes from ``transaction_ids`` (or a Faker integer in
    1..100 when falsy); amount is 10-500 (2 decimals) and payment_date a
    datetime this year.
    """
    methods = ['Credit Card', 'Debit Card', 'Cash', 'Mobile Payment', 'Gift Card']
    records = []
    for _ in range(num):
        if transaction_ids:
            txn = random.choice(transaction_ids)
        else:
            txn = fake.random_int(min=1, max=100)
        records.append({
            'transaction_id': txn,
            'payment_method': random.choice(methods),
            'amount': round(random.uniform(10, 500), 2),
            'payment_date': fake.date_time_this_year(),
        })
    return pd.DataFrame(records)

def generate_promotions(num=15, product_ids=None):
    """Return a DataFrame of ``num`` synthetic product promotions.

    start_date is a date this year and end_date lies between start_date and
    30 days from today. discount_rate is 5-50 (2 decimals). product_id comes
    from ``product_ids`` or a Faker integer in 1..50 when falsy.
    """
    rows = []
    for _ in range(num):
        begins = fake.date_this_year()
        ends = fake.date_between(start_date=begins, end_date='+30d')
        if product_ids:
            product = random.choice(product_ids)
        else:
            product = fake.random_int(min=1, max=50)
        rows.append({
            'product_id': product,
            'discount_rate': round(random.uniform(5, 50), 2),
            'start_date': begins,
            'end_date': ends,
        })
    return pd.DataFrame(rows)

In [None]:
ensure_support_tables()
# Naive UTC timestamp, matching the TIMESTAMP columns of etl_run_log.
# datetime.utcnow() is deprecated since Python 3.12; this is equivalent.
run_started = dt.datetime.now(dt.timezone.utc).replace(tzinfo=None)

In [None]:
import psycopg2.extras as pexp

def exec_values_insert(cur, table, cols, rows, on_conflict_do_nothing=False, conflict_cols=None):
    """Bulk INSERT ``rows`` into ``table`` via ``psycopg2.extras.execute_values``.

    This definition shadows the earlier ``exec_values_insert`` in the notebook
    and now matches its contract: empty input is a no-op and the number of
    submitted rows is returned.

    Args:
        cur: psycopg2 cursor.
        table: target table (interpolated verbatim -- trusted names only).
        cols: column names in row-tuple order.
        rows: iterable of row tuples (materialised to a list).
        on_conflict_do_nothing: append ``ON CONFLICT (...) DO NOTHING`` when
            ``conflict_cols`` is also given.
        conflict_cols: columns of the conflict target.

    Returns:
        Number of rows submitted (rows skipped by DO NOTHING are included).
    """
    rows = list(rows) if rows is not None else []
    if not rows:
        return 0
    sql = f"INSERT INTO {table} ({', '.join(cols)}) VALUES %s"
    if on_conflict_do_nothing and conflict_cols:
        sql += f" ON CONFLICT ({', '.join(conflict_cols)}) DO NOTHING"
    pexp.execute_values(cur, sql, rows, page_size=200)
    return len(rows)

def exec_values_upsert(cur, table, cols, rows, conflict, update_set_sql):
    """Bulk upsert ``rows`` into ``table`` via ``psycopg2.extras.execute_values``.

    This definition shadows the earlier ``exec_values_upsert`` in the notebook
    and now matches its contract: empty input is a no-op and the number of
    submitted rows is returned.

    Args:
        cur: psycopg2 cursor.
        table: target table (interpolated verbatim -- trusted names only).
        cols: column names in row-tuple order.
        rows: iterable of row tuples (materialised to a list).
        conflict: conflict target, e.g. ``"(store_id, product_id)"``.
        update_set_sql: SET clause body, e.g. ``"quantity=EXCLUDED.quantity"``.

    Returns:
        Number of rows submitted.
    """
    rows = list(rows) if rows is not None else []
    if not rows:
        return 0
    sql = (
        f"INSERT INTO {table} ({', '.join(cols)}) VALUES %s "
        f"ON CONFLICT {conflict} DO UPDATE SET {update_set_sql}"
    )
    pexp.execute_values(cur, sql, rows, page_size=200)
    return len(rows)

def dedupe_by_keys(df: pd.DataFrame, keys: list):
    """Return a copy of ``df`` keeping only the last row per combination of ``keys``."""
    is_earlier_duplicate = df.duplicated(subset=keys, keep='last')
    return df[~is_earlier_duplicate].copy()

def filter_fk(df: pd.DataFrame, fk_map: dict):
    """Keep only rows whose foreign-key columns all reference known ids.

    Args:
        df: frame to filter (not modified).
        fk_map: ``{column_name: collection_of_valid_ids}``; every listed
            column must contain a valid id for a row to be kept.

    Returns:
        A new DataFrame containing the surviving rows (original index kept).
    """
    keep = pd.Series(True, index=df.index, dtype=bool)
    for column, allowed_ids in fk_map.items():
        keep &= df[column].isin(allowed_ids)
    return df[keep.to_numpy()].copy()

def ensure_positive_int(df: pd.DataFrame, col: str, min_val: int = 1, max_val: int | None = None):
    out = df.copy()
    out[col] = out[col].fillna(min_val).astype(int)
    out.loc[out[col] < min_val, col] = min_val
    if max_val is not None:
        out.loc[out[col] > max_val, col] = max_val
    return out

def ensure_nonneg_numeric(df: pd.DataFrame, col: str):
    """Return a copy of ``df`` where ``col`` has no missing or negative values (both become 0)."""
    result = df.copy()
    filled = result[col].fillna(0)
    result[col] = filled.where(filled >= 0, 0)
    return result

def clean_customers(df: pd.DataFrame):
    """Drop customers without an '@' in their email, then keep the last row per email."""
    has_valid_email = df['email'].str.contains('@', na=False)
    kept = df[has_valid_email]
    return kept.drop_duplicates(subset=['email'], keep='last')

def clean_promotions(df: pd.DataFrame):
    """Keep promotions with a discount_rate in [0, 90] and end_date on/after start_date."""
    rate_ok = df['discount_rate'].between(0, 90)
    dates_ok = df['end_date'] >= df['start_date']
    return df[rate_ok & dates_ok].copy()

def clean_shifts(df: pd.DataFrame):
    """Keep only shifts whose end_time is strictly after start_time."""
    ends_after_start = df['end_time'].gt(df['start_time'])
    return df[ends_after_start].copy()


# Main ETL: generate synthetic data, clean it, load every table and derive
# order / sales totals -- all inside one transaction that is committed on
# success and rolled back on any error.
try:
    # psycopg2 (autocommit off) opens a transaction implicitly on the first
    # execute(); the former explicit cur.execute("BEGIN;") only produced a
    # "there is already a transaction in progress" server warning.

    # --- Generate synthetic data ---------------------------------------------
    stores_df    = generate_stores(10)
    products_df  = generate_products(50)
    vendors_df   = generate_vendors(10)

    employees_df = gen_employees(40, stores_df['store_id'].tolist())
    customers_df = generate_customers(100)

    orders_df    = generate_orders(30, vendors_df['vendor_id'].tolist())
    vendor_products_df = generate_vendor_products(60, vendors_df['vendor_id'].tolist(), products_df['product_id'].tolist())
    inventory_df = generate_inventory(120, stores_df['store_id'].tolist(), products_df['product_id'].tolist())

    sales_df      = generate_sales_transactions(
        150,
        stores_df['store_id'].tolist(),
        employees_df['employee_id'].tolist(),
        customers_df['customer_id'].tolist()
    )
    order_items_df = generate_order_items(120, orders_df['order_id'].tolist(), products_df['product_id'].tolist())
    payments_df    = generate_payments(120, sales_df['transaction_id'].tolist())
    promotions_df  = generate_promotions(20, products_df['product_id'].tolist())
    shifts_df      = generate_shifts(80, employees_df['employee_id'].tolist(), stores_df['store_id'].tolist())
    deliveries_df  = gen_deliveries(40, orders_df['order_id'].tolist(), stores_df['store_id'].tolist())

    # --- Basic cleaning --------------------------------------------------------
    order_items_df = ensure_positive_int(order_items_df, 'quantity', 1, 50)
    order_items_df = ensure_nonneg_numeric(order_items_df, 'unit_price')

    payments_df    = ensure_nonneg_numeric(payments_df, 'amount')

    customers_clean  = clean_customers(customers_df)
    promotions_clean = clean_promotions(promotions_df)
    shifts_clean     = clean_shifts(shifts_df)

    # --- Dimension tables ------------------------------------------------------
    exec_values_insert(cur, 'Stores',
        ['store_id','name','address','opening_date'],
        list(stores_df[['store_id','name','address','opening_date']].itertuples(index=False, name=None)),
        on_conflict_do_nothing=True, conflict_cols=['store_id']
    )
    store_ids = set(stores_df['store_id'])

    exec_values_insert(cur, 'Products',
        ['product_id','name','category','unit_price','stock_unit'],
        list(products_df[['product_id','name','category','unit_price','stock_unit']].itertuples(index=False, name=None)),
        on_conflict_do_nothing=True, conflict_cols=['product_id']
    )
    product_ids = set(products_df['product_id'])

    exec_values_insert(cur, 'Vendors',
        ['vendor_id','name','contact_info','rating'],
        list(vendors_df[['vendor_id','name','contact_info','rating']].itertuples(index=False, name=None)),
        on_conflict_do_nothing=True, conflict_cols=['vendor_id']
    )
    vendor_ids = set(vendors_df['vendor_id'])

    exec_values_upsert(cur, 'Customers',
        ['customer_id','first_name','last_name','email','phone_number','membership_level'],
        list(customers_clean[['customer_id','first_name','last_name','email','phone_number','membership_level']]
             .itertuples(index=False, name=None)),
        conflict='(email)',
        update_set_sql=("first_name=EXCLUDED.first_name, "
                        "last_name=EXCLUDED.last_name, "
                        "phone_number=EXCLUDED.phone_number, "
                        "membership_level=EXCLUDED.membership_level")
    )
    customer_ids = set(customers_clean['customer_id'])

    exec_values_insert(cur, 'Employees',
        ['employee_id','first_name','last_name','role','hire_date','leave_date','store_id'],
        list(employees_df[['employee_id','first_name','last_name','role','hire_date','leave_date','store_id']]
             .itertuples(index=False, name=None)),
        on_conflict_do_nothing=True, conflict_cols=['employee_id']
    )
    employee_ids = set(employees_df['employee_id'])

    # --- Inventory & vendor catalogue -------------------------------------------
    inv_df = filter_fk(inventory_df, {'store_id': store_ids, 'product_id': product_ids})
    inv_df = dedupe_by_keys(inv_df, ['store_id','product_id'])
    exec_values_upsert(cur, 'Inventory',
        ['store_id','product_id','quantity','last_updated'],
        list(inv_df[['store_id','product_id','quantity','last_updated']].itertuples(index=False, name=None)),
        conflict='(store_id, product_id)',
        update_set_sql='quantity=EXCLUDED.quantity, last_updated=EXCLUDED.last_updated'
    )

    vp_df = filter_fk(vendor_products_df, {'vendor_id': vendor_ids, 'product_id': product_ids})
    vp_df = dedupe_by_keys(vp_df, ['vendor_id','product_id'])
    exec_values_insert(cur, 'Vendor_Products',
        ['vendor_id','product_id'],
        list(vp_df[['vendor_id','product_id']].itertuples(index=False, name=None)),
        on_conflict_do_nothing=True, conflict_cols=['vendor_id','product_id']
    )

    # --- Purchase orders, items and deliveries ----------------------------------
    orders_df = filter_fk(orders_df, {'vendor_id': vendor_ids})
    exec_values_insert(cur, 'Orders',
        ['order_id','vendor_id','order_date','status'],
        list(orders_df[['order_id','vendor_id','order_date','status']].itertuples(index=False, name=None)),
        on_conflict_do_nothing=True, conflict_cols=['order_id']
    )
    order_ids = set(orders_df['order_id'])

    oi_df = filter_fk(order_items_df, {'order_id': order_ids, 'product_id': product_ids})
    oi_df = dedupe_by_keys(oi_df, ['order_id','product_id'])
    oi_df = ensure_positive_int(oi_df, 'quantity', 1, 50)
    oi_df = ensure_nonneg_numeric(oi_df, 'unit_price')

    exec_values_upsert(cur, 'Order_Items',
        ['order_id','product_id','quantity','unit_price'],
        list(oi_df[['order_id','product_id','quantity','unit_price']].itertuples(index=False, name=None)),
        conflict='(order_id, product_id)',
        update_set_sql='quantity=EXCLUDED.quantity, unit_price=EXCLUDED.unit_price'
    )

    deliveries_df = filter_fk(deliveries_df, {'order_id': order_ids, 'store_id': store_ids})
    exec_values_insert(cur, 'Deliveries',
        ['order_id','store_id','delivered_at','status'],
        list(deliveries_df[['order_id','store_id','delivered_at','status']].itertuples(index=False, name=None)),
        on_conflict_do_nothing=False
    )
    # Delivery items are derived from what is now in the DB (including the
    # deliveries and order items inserted above in this transaction).
    di_df = gen_delivery_items_df(cur, max_items_per_delivery=3)

    di_df = di_df.drop_duplicates(subset=['delivery_id','order_item_id'])

    exec_values_insert(
        cur, 'Delivery_Items',
        ['delivery_id','order_item_id','quantity_delivered'],
        list(di_df[['delivery_id','order_item_id','quantity_delivered']].itertuples(index=False, name=None)),
        on_conflict_do_nothing=True,
        conflict_cols=['delivery_id','order_item_id']
    )

    # --- Sales transactions and their items ---------------------------------------
    st_df = filter_fk(sales_df, {'store_id': store_ids, 'employee_id': employee_ids, 'customer_id': customer_ids})
    exec_values_insert(cur, 'Sales_Transactions',
        ['transaction_id','store_id','employee_id','customer_id','transaction_date','total_amount'],
        list(st_df[['transaction_id','store_id','employee_id','customer_id','transaction_date','total_amount']]
             .itertuples(index=False, name=None)),
        on_conflict_do_nothing=True, conflict_cols=['transaction_id']
    )
    txn_ids = set(st_df['transaction_id'])

    price_map = {int(r.product_id): float(r.unit_price) for r in
                 products_df[['product_id','unit_price']].itertuples(index=False)}
    # Local stock copy: items are only sold from (store, product) pairs that
    # still have stock, and the sold quantity is subtracted here. These
    # decrements are not written back to the Inventory table.
    inv_map = {(int(r.store_id), int(r.product_id)): int(r.quantity)
               for r in inv_df[['store_id','product_id','quantity']].itertuples(index=False)}

    ti_rows = []
    rng = random.Random(42)

    for tx in st_df[['transaction_id','store_id']].itertuples(index=False):
        store = int(tx.store_id)
        for _ in range(rng.randint(1, 3)):
            candidates = [pid for (sid, pid), qty in inv_map.items()
                          if sid == store and qty > 0]
            if not candidates:
                break
            pid = rng.choice(candidates)
            avail = inv_map[(store, pid)]
            qty = rng.randint(1, min(5, avail))
            inv_map[(store, pid)] = avail - qty
            # The fallback argument is evaluated (and consumes rng) on every
            # iteration; kept as-is so the generated data is unchanged.
            price = price_map.get(pid, round(rng.uniform(5, 500), 2))
            ti_rows.append((int(tx.transaction_id), pid, qty, price))

    ti_df = pd.DataFrame(ti_rows, columns=['transaction_id', 'product_id', 'quantity', 'unit_price'])

    ti_df = ti_df.dropna(subset=['transaction_id', 'product_id', 'quantity', 'unit_price'])
    ti_df['transaction_id'] = ti_df['transaction_id'].astype(int)
    ti_df['product_id']     = ti_df['product_id'].astype(int)
    ti_df['quantity']       = ti_df['quantity'].astype(int)
    ti_df['unit_price']     = ti_df['unit_price'].astype(float)

    # Merge repeated picks of the same product within a transaction.
    ti_df = (
        ti_df
        .groupby(['transaction_id','product_id'], as_index=False)
        .agg(quantity=('quantity','sum'),
            unit_price=('unit_price','last'))
    )

    ti_df = filter_fk(ti_df, {'transaction_id': txn_ids, 'product_id': product_ids})

    # Defensive: make sure every (store, product) that was sold has an
    # Inventory row. ON CONFLICT DO NOTHING keeps this safe if such a row
    # already exists in the database.
    need_inv = (
        st_df[['store_id','transaction_id']]
        .merge(ti_df[['transaction_id','product_id']], on='transaction_id', how='inner')
        [['store_id','product_id']]
        .drop_duplicates()
    )

    if not need_inv.empty:
        inv_missing = need_inv.merge(
            inv_df[['store_id','product_id']],
            on=['store_id','product_id'],
            how='left',
            indicator=True
        )
        inv_missing = inv_missing[inv_missing['_merge'] == 'left_only'][['store_id','product_id']]
        if not inv_missing.empty:
            exec_values_insert(
                cur, 'Inventory',
                ['store_id','product_id','quantity','last_updated'],
                list(inv_missing.assign(quantity=0, last_updated=pd.Timestamp('today').normalize())
                    .itertuples(index=False, name=None)),
                on_conflict_do_nothing=True,
                conflict_cols=['store_id','product_id']
            )

    exec_values_upsert(
        cur, 'Transaction_Items',
        ['transaction_id', 'product_id', 'quantity', 'unit_price'],
        list(ti_df[['transaction_id','product_id','quantity','unit_price']].itertuples(index=False, name=None)),
        conflict='(transaction_id, product_id)',
        update_set_sql='quantity=EXCLUDED.quantity, unit_price=EXCLUDED.unit_price'
    )

    # --- Payments, promotions, shifts ----------------------------------------------
    pay_df = filter_fk(payments_df, {'transaction_id': txn_ids})
    pay_df = ensure_nonneg_numeric(pay_df, 'amount')
    exec_values_insert(cur, 'Payments',
        ['transaction_id','payment_method','amount','payment_date'],
        list(pay_df[['transaction_id','payment_method','amount','payment_date']].itertuples(index=False, name=None)),
        on_conflict_do_nothing=False
    )

    promotions_clean = filter_fk(promotions_clean, {'product_id': product_ids})
    exec_values_insert(cur, 'Promotions',
        ['product_id','discount_rate','start_date','end_date'],
        list(promotions_clean[['product_id','discount_rate','start_date','end_date']].itertuples(index=False, name=None)),
        on_conflict_do_nothing=False
    )

    shifts_clean = filter_fk(shifts_clean, {'employee_id': employee_ids, 'store_id': store_ids})
    exec_values_insert(cur, 'Shifts',
        ['employee_id','store_id','shift_date','start_time','end_time'],
        list(shifts_clean[['employee_id','store_id','shift_date','start_time','end_time']].itertuples(index=False, name=None)),
        on_conflict_do_nothing=False
    )

    # --- Derived totals --------------------------------------------------------------
    cur.execute("""
    ALTER TABLE Orders
        ADD COLUMN IF NOT EXISTS total_amount NUMERIC(12,2) DEFAULT 0;
    ALTER TABLE Sales_Transactions
        ADD COLUMN IF NOT EXISTS total_amount NUMERIC(12,2) DEFAULT 0;
    """)

    # Correlated subqueries so that headers WITHOUT any line items get 0.
    # The former UPDATE ... FROM (aggregate) form only touched rows that had
    # items, so its COALESCE never applied and item-less sales kept the
    # random placeholder total from the generator.
    cur.execute("""
    UPDATE Orders o
    SET total_amount = COALESCE((
        SELECT SUM(oi.quantity * oi.unit_price)
        FROM Order_Items oi
        WHERE oi.order_id = o.order_id
    ), 0)::NUMERIC(12,2);
    """)

    cur.execute("""
    UPDATE Sales_Transactions st
    SET total_amount = COALESCE((
        SELECT SUM(ti.quantity * ti.unit_price)
        FROM Transaction_Items ti
        WHERE ti.transaction_id = st.transaction_id
    ), 0)::NUMERIC(12,2);
    """)

    # Record the successful run atomically with the loaded data.
    cur.execute(
        "INSERT INTO etl_run_log (run_started, run_ended, status, message) "
        "VALUES (%s, %s, %s, %s)",
        (run_started, dt.datetime.now(dt.timezone.utc).replace(tzinfo=None),
         'success', 'ETL completed successfully.'),
    )

    conn.commit()
    print("ETL completed successfully.")

except Exception as e:
    conn.rollback()
    print("ETL failed:", e)
    # Best-effort: record the failure in etl_run_log in its own transaction.
    # A logging problem must never hide the original ETL error, so it is only
    # reported here and the original exception is re-raised below.
    try:
        cur.execute(
            "INSERT INTO etl_run_log (run_started, run_ended, status, message) "
            "VALUES (%s, %s, %s, %s)",
            (run_started, dt.datetime.now(dt.timezone.utc).replace(tzinfo=None),
             'failed', str(e)),
        )
        conn.commit()
    except Exception as log_err:
        print("Could not write etl_run_log entry:", log_err)
        try:
            conn.rollback()
        except Exception:
            pass  # connection is unusable; the original error is raised next
    raise
