In [1]:
import os
import uuid
import random
from typing import List, Dict, Tuple

import numpy as np
import pandas as pd


# ================================================================
# Configuration
# ================================================================

N_TRACES = 10_000
RANDOM_SEED = 42
DATA_DIR = "./data"


# ================================================================
# Helper: deterministic RNG
# ================================================================

rng = np.random.default_rng(RANDOM_SEED)
random.seed(RANDOM_SEED)


# ================================================================
# 1. Transition graph definition
# ================================================================

def build_transition_graph() -> pd.DataFrame:
    """
    Build the transition_graph dataframe describing allowed
    edges between event nodes.
    """
    transitions = {
        "CustomerCreated": ["OrderPlaced"],
        "OrderPlaced": ["VerificationStarted"],
        "VerificationStarted": [
            "VerificationAutoApproved",
            "VerificationNeedsManualReview",
            "VerificationFailed",
        ],
        "VerificationAutoApproved": ["OrderConfirmed"],
        "VerificationNeedsManualReview": ["ManualReviewCompleted"],
        # Note: at runtime we choose either OrderConfirmed or OrderRejected,
        # but both are listed here as possible successors.
        "ManualReviewCompleted": ["OrderConfirmed", "OrderRejected"],
        "VerificationFailed": ["OrderRejected"],
        "OrderRejected": [],

        "OrderConfirmed": ["KitchenStarted"],
        "KitchenStarted": ["KitchenCompleted"],
        "KitchenCompleted": ["DriverAssigned"],
        "DriverAssigned": ["DriverAtRestaurant"],
        "DriverAtRestaurant": ["PickupComplete"],
        "PickupComplete": ["OnTheWay"],
        "OnTheWay": ["Delivered"],

        # Branch: with or without support issue
        "Delivered": ["PaymentAuthorized", "SupportTicketOpened"],
        "SupportTicketOpened": ["VoucherIssued"],
        "VoucherIssued": ["PaymentAuthorized"],
        "PaymentAuthorized": ["PaymentCaptured"],
        "PaymentCaptured": [],
    }

    records = []
    for src, to_list in transitions.items():
        records.append({
            "From": src,
            "To_List": str(to_list)  # stringified Python list
        })

    return pd.DataFrame(records, columns=["From", "To_List"])


# ================================================================
# 2. Attribute-driven verification logic
# ================================================================

def determine_verification_outcome(
    category: str,
    region: str,
    wallet_balance: float,
    outstanding_amount: float,
    credit_rating: int
) -> str:
    """
    Deterministically decide verification outcome based on
    product category, customer region, wallet balance, outstanding
    amount, and credit rating.

    Returns one of:
    - 'FAILED'
    - 'MANUAL_REVIEW'
    - 'AUTO_APPROVED'
    """

    # ---- Fail rules (single / multi-attribute) ----
    # 1) Product type triggers failure
    if category == "APPAREL":
        return "FAILED"

    # 2) Customer region triggers failure
    if region == "NORTH":
        return "FAILED"

    # 3) Wallet balance too small
    if wallet_balance < 10:
        return "FAILED"

    # 4) Two-way combination: Apparel + NORTH
    if category == "APPAREL" and region == "NORTH":
        return "FAILED"

    # 5) Two-way combination: Apparel + low wallet
    if category == "APPAREL" and wallet_balance < 20:
        return "FAILED"

    # 6) Three-way combination: Electronics + high debt + poor credit
    if category == "ELECTRONICS" and outstanding_amount > 100 and credit_rating <= 2:
        return "FAILED"

    # ---- Manual review rules ----
    # 7) Medium-risk credit rating
    if credit_rating == 3:
        return "MANUAL_REVIEW"

    # 8) Moderate outstanding debt
    if 10 <= outstanding_amount < 100:
        return "MANUAL_REVIEW"

    # 9) Medium wallet range
    if 10 <= wallet_balance < 20:
        return "MANUAL_REVIEW"

    # ---- Default: auto-approve ----
    return "AUTO_APPROVED"


# ================================================================
# 3. Reference data generators (customers, restaurants, etc.)
# ================================================================

def generate_customers(n_customers: int = 1000) -> pd.DataFrame:
    customer_ids = np.arange(1, n_customers + 1)

    regions = ["NORTH", "SOUTH", "EAST", "WEST"]
    credit_ratings = [1, 2, 3, 4, 5]

    # Skew so we get enough failures and manual reviews
    region_choices = rng.choice(regions, size=n_customers, p=[0.25, 0.25, 0.25, 0.25])
    credit_choices = rng.choice(credit_ratings, size=n_customers, p=[0.1, 0.2, 0.3, 0.25, 0.15])
    outstanding = rng.uniform(0, 200, size=n_customers).round(2)

    df = pd.DataFrame({
        "customer_id": customer_ids,
        "region": region_choices,
        "outstanding_amount": outstanding,
        "credit_rating": credit_choices,
        "created_ts": pd.Timestamp("2024-01-01") +
                      pd.to_timedelta(rng.integers(0, 60, size=n_customers), unit="D")
    })
    return df


def generate_customer_addresses(customers: pd.DataFrame) -> pd.DataFrame:
    records = []
    address_id = 1

    for _, row in customers.iterrows():
        # 1–3 addresses per customer
        n_addr = rng.integers(1, 4)
        for _ in range(n_addr):
            records.append({
                "address_id": address_id,
                "cust_ref": row["customer_id"],
                "address_line": f"{address_id} Example Street",
                "zone": row["region"],  # align zone with region for simplicity
            })
            address_id += 1

    return pd.DataFrame(records)


def generate_customer_payment_methods(customers: pd.DataFrame) -> pd.DataFrame:
    methods = []
    pay_id = 1
    types = ["CARD", "WALLET", "BANK"]

    for _, row in customers.iterrows():
        # Each customer gets 1–2 methods
        n_methods = rng.integers(1, 3)
        for _ in range(n_methods):
            method_type = rng.choice(types)
            # Wallet balance only relevant if WALLET
            wallet_balance = float(
                rng.uniform(0, 200) if method_type == "WALLET" else rng.uniform(20, 200)
            )
            methods.append({
                "pay_method_id": pay_id,
                "cust_ref": row["customer_id"],
                "method_type": method_type,
                "wallet_balance": round(wallet_balance, 2),
                "risk_category": rng.choice(["LOW", "MEDIUM", "HIGH"], p=[0.6, 0.3, 0.1]),
            })
            pay_id += 1

    return pd.DataFrame(methods)


def generate_restaurants(n_restaurants: int = 50) -> Tuple[pd.DataFrame, pd.DataFrame]:
    rest_ids = np.arange(1, n_restaurants + 1)
    restaurants = pd.DataFrame({
        "restaurant_id": rest_ids,
        "name": [f"Restaurant_{i}" for i in rest_ids],
        "cuisine": rng.choice(["ITALIAN", "INDIAN", "CHINESE", "BURGER", "DESSERT"],
                              size=n_restaurants),
        "rating": rng.uniform(3.0, 5.0, size=n_restaurants).round(1),
    })

    # 1–3 branches per restaurant
    branches = []
    branch_id = 1
    for rest_id in rest_ids:
        n_branches = rng.integers(1, 4)
        for _ in range(n_branches):
            branches.append({
                "branch_id": branch_id,
                "rest_ref": rest_id,
                "address_line": f"Branch_{branch_id} Street",
                "opening_hours": "09:00-22:00",
            })
            branch_id += 1

    branches_df = pd.DataFrame(branches)
    return restaurants, branches_df


def generate_menu(branches: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame]:
    categories = ["FOOD", "DRINK", "DESSERT", "APPAREL", "ELECTRONICS"]
    items = []
    options = []
    item_id = 1
    option_id = 1

    for _, br in branches.iterrows():
        # Each branch has ~10 menu items
        n_items = rng.integers(6, 12)
        for _ in range(n_items):
            cat = rng.choice(categories, p=[0.5, 0.2, 0.1, 0.1, 0.1])
            price = float(rng.uniform(5, 50))
            items.append({
                "item_id": item_id,
                "branch_ref": br["branch_id"],
                "name": f"Item_{item_id}",
                "category": cat,
                "base_price": round(price, 2),
            })

            # Add 0–3 options
            n_opts = rng.integers(0, 4)
            for _ in range(n_opts):
                options.append({
                    "option_id": option_id,
                    "menu_ref": item_id,
                    "option_name": rng.choice(["LARGE", "EXTRA_CHEESE", "SPICY", "ADD_ON"]),
                    "extra_price": round(float(rng.uniform(0.5, 5)), 2),
                })
                option_id += 1

            item_id += 1

    return pd.DataFrame(items), pd.DataFrame(options)


def generate_drivers(n_drivers: int = 300) -> Tuple[pd.DataFrame, pd.DataFrame]:
    driver_ids = np.arange(1, n_drivers + 1)
    drivers = pd.DataFrame({
        "driver_id": driver_ids,
        "name": [f"Driver_{i}" for i in driver_ids],
        "rating": rng.uniform(3.0, 5.0, size=n_drivers).round(1),
        "region": rng.choice(["NORTH", "SOUTH", "EAST", "WEST"], size=n_drivers),
    })

    vehicles = []
    vehicle_id = 1
    for d_id in driver_ids:
        vehicles.append({
            "vehicle_id": vehicle_id,
            "owner_driver_ref": d_id,
            "vehicle_type": rng.choice(["BIKE", "CAR", "SCOOTER"]),
            "plate": f"PLATE_{vehicle_id}",
        })
        vehicle_id += 1

    return drivers, pd.DataFrame(vehicles)


# ================================================================
# 4. Main order + event generation
# ================================================================

def generate_orders_and_events(
    n_traces: int,
    customers: pd.DataFrame,
    addresses: pd.DataFrame,
    pay_methods: pd.DataFrame,
    restaurants: pd.DataFrame,
    branches: pd.DataFrame,
    menu_items: pd.DataFrame,
    menu_options: pd.DataFrame,
    drivers: pd.DataFrame
) -> Tuple[Dict[str, pd.DataFrame], pd.DataFrame]:
    """
    Generate orders and all fact tables, plus event_traces DataFrame.
    """
    # Initialise collection containers for each table
    orders = []
    order_items = []
    order_discounts = []
    order_status_events = []

    kitchen_tickets = []
    delivery_requests = []
    delivery_assignments = []
    delivery_status_events = []
    payment_auths = []
    payment_caps = []
    support_tickets = []
    verification_requests = []
    verification_results = []
    manual_reviews = []

    event_trace_records = []

    # Convenient lookups
    cust_idx = customers.set_index("customer_id")
    addr_by_cust = addresses.groupby("cust_ref")["address_id"].apply(list).to_dict()
    pay_by_cust = pay_methods.groupby("cust_ref")["pay_method_id"].apply(list).to_dict()
    branch_ids = branches["branch_id"].to_list()
    drivers_ids = drivers["driver_id"].to_list()

    menu_items_idx = menu_items.set_index("item_id")

    # ID counters
    order_id = 1
    status_event_id = 1
    ticket_id = 1
    del_req_id = 1
    del_assign_id = 1
    del_evt_id = 1
    auth_id = 1
    cap_id = 1
    supp_ticket_id = 1
    veri_req_id = 1
    veri_res_id = 1
    manual_review_id = 1
    order_item_id = 1
    discount_row_id = 1

    for _ in range(n_traces):
        # -------------------------
        # Basic foreign key choices
        # -------------------------
        cust_id = int(rng.choice(customers["customer_id"].to_numpy()))
        cust_row = cust_idx.loc[cust_id]

        addr_choices = addr_by_cust[cust_id]
        addr_id = int(rng.choice(addr_choices))

        pay_choices = pay_by_cust[cust_id]
        pm_id = int(rng.choice(pay_choices))
        pm_row = pay_methods.loc[pay_methods["pay_method_id"] == pm_id].iloc[0]

        branch_id = int(rng.choice(branch_ids))

        # Pick one menu item from that branch
        branch_menu = menu_items[menu_items["branch_ref"] == branch_id]
        if branch_menu.empty:
            # Fallback: any menu item
            item_row = menu_items.sample(1, random_state=int(rng.integers(0, 10_000))).iloc[0]
        else:
            item_row = branch_menu.sample(1, random_state=int(rng.integers(0, 10_000))).iloc[0]
        item_id_val = int(item_row["item_id"])

        # quantity & optional discount
        quantity = int(rng.integers(1, 5))
        has_discount = rng.random() < 0.2
        discount_amount = float(rng.uniform(1, 5)) if has_discount else 0.0

        # -------------------------
        # Verification outcome
        # -------------------------
        category = str(item_row["category"])
        region = str(cust_row["region"])
        wallet_balance = float(pm_row["wallet_balance"])
        outstanding_amount = float(cust_row["outstanding_amount"])
        credit_rating = int(cust_row["credit_rating"])

        veri_outcome = determine_verification_outcome(
            category=category,
            region=region,
            wallet_balance=wallet_balance,
            outstanding_amount=outstanding_amount,
            credit_rating=credit_rating,
        )

        # -------------------------
        # Build base events & join path
        # -------------------------
        events: List[str] = []
        join_path: List[str] = []

        key_uuid = str(uuid.uuid4())

        # CustomerCreated
        events.append("CustomerCreated")
        join_path.append("customers.customer_id")

        # OrderPlaced
        events.append("OrderPlaced")
        join_path.append("orders.order_id")

        # Create order row
        base_ts = pd.Timestamp("2024-02-01") + pd.to_timedelta(
            int(rng.integers(0, 60)), unit="D"
        )
        orders.append({
            "order_id": order_id,
            "cust_ref": cust_id,
            "branch_ref": branch_id,
            "primary_address_ref": addr_id,
            "created_ts": base_ts,
        })

        # Add order_items row
        order_items.append({
            "order_item_id": order_item_id,
            "order_fk": order_id,
            "menu_ref": item_id_val,
            "quantity": quantity,
        })
        order_item_id += 1

        # Discount row if applicable
        if has_discount:
            order_discounts.append({
                "discount_row_id": discount_row_id,
                "ord_ref": order_id,
                "discount_amount": round(discount_amount, 2),
            })
            discount_row_id += 1

        # VerificationStarted
        events.append("VerificationStarted")
        join_path.append("verification_requests.veri_req_id")
        verification_requests.append({
            "veri_req_id": veri_req_id,
            "order_ref": order_id,
            "created_ts": base_ts + pd.Timedelta(minutes=1),
        })

        # verification_results
        result_status = None
        if veri_outcome == "FAILED":
            result_status = "FAILED"
            events.append("VerificationFailed")
        elif veri_outcome == "MANUAL_REVIEW":
            result_status = "MANUAL_REVIEW"
            events.append("VerificationNeedsManualReview")
        else:
            result_status = "AUTO_APPROVED"
            events.append("VerificationAutoApproved")

        join_path.append("verification_results.veri_result_id")
        verification_results.append({
            "veri_result_id": veri_res_id,
            "req_ref": veri_req_id,
            "result_status": result_status,
            "result_ts": base_ts + pd.Timedelta(minutes=2),
        })

        veri_req_id += 1
        veri_res_id += 1

        # Track if the order will continue to fulfillment
        continue_to_fulfillment = True

        # Manual review branch if needed
        if veri_outcome == "MANUAL_REVIEW":
            events.append("ManualReviewCompleted")
            join_path.append("manual_reviews.review_id")

            # 10% rejected, 90% approved
            is_rejected = (rng.random() < 0.1)
            review_outcome = "REJECTED" if is_rejected else "APPROVED"

            manual_reviews.append({
                "review_id": manual_review_id,
                "result_ref": veri_res_id - 1,
                "review_outcome": review_outcome,
                "review_ts": base_ts + pd.Timedelta(minutes=5),
            })
            manual_review_id += 1

            if is_rejected:
                # Manual review rejected
                events.append("OrderRejected")
                join_path.append("order_status_events.status_event_id")
                order_status_events.append({
                    "status_event_id": status_event_id,
                    "order_ref": order_id,
                    "status": "REJECTED",
                    "event_ts": base_ts + pd.Timedelta(minutes=6),
                })
                status_event_id += 1
                continue_to_fulfillment = False

        elif veri_outcome == "FAILED":
            # Immediate rejection
            events.append("OrderRejected")
            join_path.append("order_status_events.status_event_id")
            order_status_events.append({
                "status_event_id": status_event_id,
                "order_ref": order_id,
                "status": "REJECTED",
                "event_ts": base_ts + pd.Timedelta(minutes=3),
            })
            status_event_id += 1
            continue_to_fulfillment = False

        # If rejected at verification or manual review, we stop here
        if not continue_to_fulfillment:
            event_trace_records.append({
                "Key_Selector": "Order_ID",
                "Key_ID": key_uuid,
                "Event_Trace": str(events),
                "Join_Path": str(join_path),
            })
            order_id += 1
            continue

        # ------------------------------------------------
        # Fulfillment path: OrderConfirmed all the way to Payment
        # ------------------------------------------------
        events.append("OrderConfirmed")
        join_path.append("order_status_events.status_event_id")
        order_status_events.append({
            "status_event_id": status_event_id,
            "order_ref": order_id,
            "status": "CONFIRMED",
            "event_ts": base_ts + pd.Timedelta(minutes=4),
        })
        status_event_id += 1

        # Kitchen
        events.append("KitchenStarted")
        join_path.append("kitchen_tickets.ticket_id")
        kitchen_tickets.append({
            "ticket_id": ticket_id,
            "ticket_order_id": order_id,
            "created_ts": base_ts + pd.Timedelta(minutes=5),
        })

        events.append("KitchenCompleted")
        join_path.append("kitchen_tickets.ticket_id")
        kitchen_tickets[-1]["completed_ts"] = base_ts + pd.Timedelta(minutes=20)
        ticket_id += 1

        # Delivery / driver assignment
        events.append("DriverAssigned")
        join_path.append("delivery_assignments.assignment_id")
        delivery_requests.append({
            "request_id": del_req_id,
            "src_order_id": order_id,
            "pickup_branch_ref": branch_id,
            "dropoff_addr_ref": addr_id,
            "created_ts": base_ts + pd.Timedelta(minutes=21),
        })

        driver_id = int(rng.choice(drivers_ids))
        delivery_assignments.append({
            "assignment_id": del_assign_id,
            "req_ref": del_req_id,
            "courier_ref": driver_id,
            "assigned_ts": base_ts + pd.Timedelta(minutes=22),
        })

        # Delivery status events
        events.extend(["DriverAtRestaurant", "PickupComplete", "OnTheWay", "Delivered"])
        for status in ["AT_RESTAURANT", "PICKED_UP", "ON_THE_WAY", "DELIVERED"]:
            delivery_status_events.append({
                "delivery_evt_id": del_evt_id,
                "route_assignment_id": del_assign_id,
                "status": status,
                "event_ts": base_ts + pd.Timedelta(minutes=23 + del_evt_id - del_evt_id),
            })
            del_evt_id += 1
        join_path.append("delivery_status_events.delivery_evt_id")

        del_req_id += 1
        del_assign_id += 1

        # Branch at Delivered: maybe support issue
        support_issue = (rng.random() < 0.2)

        if support_issue:
            events.append("SupportTicketOpened")
            join_path.append("support_tickets.ticket_id")
            support_tickets.append({
                "ticket_id": supp_ticket_id,
                "issue_order_ref": order_id,
                "cust_ref": cust_id,
                "opened_ts": base_ts + pd.Timedelta(minutes=40),
            })

            events.append("VoucherIssued")
            join_path.append("support_tickets.ticket_id")
            support_tickets[-1]["closed_ts"] = base_ts + pd.Timedelta(minutes=50)
            supp_ticket_id += 1

        # Payment authorization & capture
        events.append("PaymentAuthorized")
        join_path.append("payment_authorizations.auth_id")
        payment_auths.append({
            "auth_id": auth_id,
            "auth_order_ref": order_id,
            "method_ref": pm_id,
            "auth_amount": float(item_row["base_price"]) * quantity - discount_amount,
            "status": "AUTHORIZED",
            "auth_ts": base_ts + pd.Timedelta(minutes=55),
        })

        events.append("PaymentCaptured")
        join_path.append("payment_captures.capture_id")
        payment_caps.append({
            "capture_id": cap_id,
            "initial_auth_ref": auth_id,
            "fee_order_ref": order_id,
            "captured_amount": float(item_row["base_price"]) * quantity - discount_amount,
            "captured_ts": base_ts + pd.Timedelta(minutes=60),
        })

        auth_id += 1
        cap_id += 1

        # Record the full trace
        event_trace_records.append({
            "Key_Selector": "Order_ID",
            "Key_ID": key_uuid,
            "Event_Trace": str(events),
            "Join_Path": str(join_path),
        })

        # Increment order id
        order_id += 1

    # ------------------------------------------------
    # Convert everything into DataFrames
    # ------------------------------------------------
    tables = {
        "orders": pd.DataFrame(orders),
        "order_items": pd.DataFrame(order_items),
        "order_discounts": pd.DataFrame(order_discounts),
        "order_status_events": pd.DataFrame(order_status_events),
        "kitchen_tickets": pd.DataFrame(kitchen_tickets),
        "delivery_requests": pd.DataFrame(delivery_requests),
        "delivery_assignments": pd.DataFrame(delivery_assignments),
        "delivery_status_events": pd.DataFrame(delivery_status_events),
        "payment_authorizations": pd.DataFrame(payment_auths),
        "payment_captures": pd.DataFrame(payment_caps),
        "support_tickets": pd.DataFrame(support_tickets),
        "verification_requests": pd.DataFrame(verification_requests),
        "verification_results": pd.DataFrame(verification_results),
        "manual_reviews": pd.DataFrame(manual_reviews),
    }

    event_traces_df = pd.DataFrame(
        event_trace_records,
        columns=["Key_Selector", "Key_ID", "Event_Trace", "Join_Path"]
    )

    return tables, event_traces_df


# ================================================================
# 5. High-level generator
# ================================================================

def generate_food_delivery_dataset(
    n_traces: int = N_TRACES,
    data_dir: str = DATA_DIR
) -> None:
    """
    Generate the full synthetic food delivery dataset:
    - 23 relational tables under ./data
    - event_traces.csv
    - transition_graph.csv
    - README.md
    """
    os.makedirs(data_dir, exist_ok=True)

    # Reference tables
    customers = generate_customers()
    addresses = generate_customer_addresses(customers)
    pay_methods = generate_customer_payment_methods(customers)
    restaurants, branches = generate_restaurants()
    menu_items, menu_options = generate_menu(branches)
    drivers, vehicles = generate_drivers()

    # Core fact tables & event traces
    fact_tables, event_traces = generate_orders_and_events(
        n_traces=n_traces,
        customers=customers,
        addresses=addresses,
        pay_methods=pay_methods,
        restaurants=restaurants,
        branches=branches,
        menu_items=menu_items,
        menu_options=menu_options,
        drivers=drivers,
    )

    # Transition graph
    transition_graph = build_transition_graph()

    # Save core CSVs (root)
    event_traces.to_csv("event_traces.csv", index=False)
    transition_graph.to_csv("transition_graph.csv", index=False)

    # Save reference tables
    customers.to_csv(os.path.join(data_dir, "customers.csv"), index=False)
    addresses.to_csv(os.path.join(data_dir, "customer_addresses.csv"), index=False)
    pay_methods.to_csv(os.path.join(data_dir, "customer_payment_methods.csv"), index=False)
    restaurants.to_csv(os.path.join(data_dir, "restaurants.csv"), index=False)
    branches.to_csv(os.path.join(data_dir, "restaurant_locations.csv"), index=False)
    menu_items.to_csv(os.path.join(data_dir, "menu_items.csv"), index=False)
    menu_options.to_csv(os.path.join(data_dir, "menu_item_options.csv"), index=False)
    drivers.to_csv(os.path.join(data_dir, "drivers.csv"), index=False)
    vehicles.to_csv(os.path.join(data_dir, "driver_vehicles.csv"), index=False)

    # Save fact tables
    for name, df in fact_tables.items():
        df.to_csv(os.path.join(data_dir, f"{name}.csv"), index=False)


    print(f"Generated {len(event_traces)} traces and saved tables to '{data_dir}'")



generate_food_delivery_dataset(10000, 'data')

Generated 10000 traces and saved tables to 'data'


In [2]:
import pandas as pd
import numpy as np
import random
import os
from typing import Dict, List


# ===============================================================
# CONFIGURATION
# ===============================================================

ATTRIBUTE_SWAP_RATIO = 0.30     # 30% schema drift
TIMESTAMP_MISSING_RATIO = 0.15  # 15% timestamp missingness

rng = np.random.default_rng(42)
random.seed(42)


# ===============================================================
# 1. ATTRIBUTE SWAP LOGIC
# ===============================================================

def get_safe_columns(df: pd.DataFrame) -> List[str]:
    """
    Identify safe-to-swap columns:
    - Excludes PK columns
    - Excludes FK columns
    - Excludes timestamp columns
    - Excludes numeric IDs & join keys
    """
    pk_like = {"id", "pk", "_id", "order_ref", "cust_ref", "branch_ref",
               "menu_ref", "req_ref", "result_ref", "assignment_id"}
    
    ts_like = {"ts", "time", "timestamp"}

    safe_cols = []

    for col in df.columns:
        col_lower = col.lower()
        
        if any(k in col_lower for k in pk_like):
            continue
        if any(t in col_lower for t in ts_like):
            continue
        if df[col].dtype == "datetime64[ns]":
            continue

        safe_cols.append(col)

    return safe_cols


def apply_attribute_swap_to_table(df: pd.DataFrame) -> pd.DataFrame:
    """
    Perform schema drift on safe columns:
    - Random renaming
    - Column value swapping
    - Type transformation
    """
    safe_cols = get_safe_columns(df)
    if len(safe_cols) == 0:
        return df

    num_to_modify = int(len(safe_cols) * ATTRIBUTE_SWAP_RATIO)
    chosen_cols = random.sample(safe_cols, num_to_modify)

    df = df.copy()

    # 1. Rename columns
    for col in chosen_cols:
        new_name = f"attr_{random.randint(100, 999)}"
        df.rename(columns={col: new_name}, inplace=True)

    # 2. Swap values between columns (if at least two exist)
    if len(chosen_cols) >= 2:
        col1, col2 = random.sample(chosen_cols, 2)
        df[col1], df[col2] = df[col2], df[col1]

    return df


def apply_attribute_swap(tables: Dict[str, pd.DataFrame]) -> Dict[str, pd.DataFrame]:
    """
    Apply attribute drift to all tables except those requiring strict schema:
    - Do NOT modify event_traces or transition_graph
    - Only modify relational tables
    """
    new_tables = {}

    for name, df in tables.items():
        drifted = apply_attribute_swap_to_table(df)
        new_tables[name] = drifted

    return new_tables


# ===============================================================
# 2. TIMESTAMP MISSINGNESS LOGIC
# ===============================================================

def get_timestamp_columns(df: pd.DataFrame) -> List[str]:
    return [
        col for col in df.columns
        if df[col].dtype == "datetime64[ns]" or "ts" in col.lower()
    ]


def apply_timestamp_missingness(df: pd.DataFrame) -> pd.DataFrame:
    """
    Remove timestamps from 15% of the rows for all timestamp-bearing columns.
    """
    df = df.copy()
    ts_cols = get_timestamp_columns(df)

    if len(ts_cols) == 0:
        return df

    for col in ts_cols:
        n = len(df)
        num_missing = int(n * TIMESTAMP_MISSING_RATIO)
        missing_indices = rng.choice(n, size=num_missing, replace=False)
        df.loc[missing_indices, col] = pd.NaT

    return df


def apply_timestamp_missingness_to_tables(tables: Dict[str, pd.DataFrame]) -> Dict[str, pd.DataFrame]:
    new_tables = {}

    for name, df in tables.items():
        new_tables[name] = apply_timestamp_missingness(df)

    return new_tables


# ===============================================================
# 3. COMBINED NOISE PIPELINE
# ===============================================================

def inject_noise(data_dir: str = "./data"):
    """
    Post-process all generated CSV tables in data_dir with:
    - 30% schema drift (attribute swap)
    - 15% timestamp missingness
    """
    print("Loading tables...")
    tables = {}

    for file in os.listdir(data_dir):
        if file.endswith(".csv"):
            df = pd.read_csv(os.path.join(data_dir, file), parse_dates=True)
            tables[file.replace(".csv", "")] = df

    print("Applying attribute drift...")
    tables = apply_attribute_swap(tables)

    print("Applying timestamp missingness...")
    tables = apply_timestamp_missingness_to_tables(tables)

    # Save modified tables
    print("Saving noisy tables...")
    for name, df in tables.items():
        df.to_csv(os.path.join(data_dir, f"{name}.csv"), index=False)

    print("Noise injection complete.")


# ===============================================================
# 4. ENTRY POINT (optional)
# ===============================================================

if __name__ == "__main__":
    inject_noise()


Loading tables...
Applying attribute drift...
Applying timestamp missingness...
Saving noisy tables...
Noise injection complete.


  df.loc[missing_indices, col] = pd.NaT
