# Setup

In [19]:
POSTGRESQL_START_FROM_SCRATCH = True
DOCKER_INTERNAL_HOST = "host.docker.internal"

POSTGRESQL_VPN_DNS = "10.15.20.1"
POSTGRESQL_VPN_SELF_HOSTNAME = "mavasbel.vpn.itam.mx"
POSTGRESQL_VPN_SELF_IP = "10.15.20.2"

POSTGRESQL_BASE_PORT = 5423
POSTGRESQL_WORKDIR = "/var/lib/postgresql"

POSTGRESQL_INIT_USER = "postgres"
POSTGRESQL_INIT_PASSWORD = "password"

In [20]:
from enum import Enum


class DatabasesList(str, Enum):
    BANK = "bank_db"
    ECOMMERCE = "ecommerce_db"
    HEALTHCARE = "healthcare_db"
    SOCIAL_MEDIA = "social_media_db"
    STREAMING = "streaming_service_db"

# Schema: Bank

In [23]:
import random
from typing import Optional, cast
from datetime import datetime, timezone, timedelta
from faker import Faker
from mimesis import Generic
from sqlmodel import Field, SQLModel, Session, create_engine, text
from sqlalchemy.orm import registry


mim_generic = Generic(locale="en")
fake = Faker()

db_url = f"postgresql+psycopg2://{POSTGRESQL_INIT_USER}:{POSTGRESQL_INIT_PASSWORD}@postgres.{POSTGRESQL_VPN_SELF_HOSTNAME}:{POSTGRESQL_BASE_PORT}/{DatabasesList.BANK.value}"
db_engine = create_engine(db_url)

with Session(db_engine) as session:
    session.exec(text("DROP SCHEMA public CASCADE;"))
    session.exec(text("CREATE SCHEMA public;"))
    # session.exec(text("GRANT ALL ON SCHEMA public TO public;"))
    session.exec(text(f"GRANT ALL ON SCHEMA public TO {POSTGRESQL_INIT_USER};"))
    session.commit()
print("üóëÔ∏è PostgreSQL Database wiped clean.")

SQLModel.metadata.clear()
if hasattr(SQLModel, "registry"):
    SQLModel.registry.dispose()
registry().dispose()

# --- 1. References ---


class Country(SQLModel, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    name: str = Field(
        default_factory=lambda: cast(Faker, fake.unique).country(), unique=True
    )
    code: str = Field(default_factory=lambda: cast(Faker, fake.unique).country_code())


class City(SQLModel, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    name: str = Field(default_factory=mim_generic.address.city)
    country_id: int = Field(foreign_key="country.id")


class Currency(SQLModel, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    name: str = Field(default_factory=lambda: cast(Faker, fake.unique).currency_name())
    code: str = Field(
        default_factory=lambda: cast(Faker, fake.unique).currency_code(), unique=True
    )
    symbol: str = Field(default_factory=lambda: fake.currency_symbol())


# --- 2. Organization ---


class Branch(SQLModel, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    name: str = Field(default_factory=lambda: f"{cast(Faker, fake.unique).company()}")
    address: str = Field(default_factory=mim_generic.address.address)
    city_id: int = Field(foreign_key="city.id")


# --- 3. Customers ---


class User(SQLModel, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    username: str = Field(
        default_factory=lambda: f"{cast(Faker, fake.unique).user_name()}",
        unique=True,
    )
    email: str = Field(default_factory=lambda: f"{cast(Faker, fake.unique).email()}")
    password: str = Field(
        default_factory=lambda: mim_generic.person.password(
            length=random.randint(12, 32), hashed=True
        )
    )
    first_name: str = Field(default_factory=mim_generic.person.first_name)
    last_name: str = Field(default_factory=mim_generic.person.last_name)
    ssn: str = Field(
        default_factory=lambda: cast(Faker, fake.unique).ssn(), unique=True
    )


class Account(SQLModel, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    account_number: str = Field(
        default_factory=lambda: cast(Faker, fake.unique).iban(), unique=True
    )
    balance: float = Field(
        default_factory=lambda: round(random.uniform(1000.0, 10000.0), 2)
    )
    account_type: str = Field(
        default_factory=lambda: fake.random_element(["Savings", "Checking"])
    )
    user_id: int = Field(foreign_key="user.id")
    currency_id: int = Field(foreign_key="currency.id")
    branch_id: int = Field(foreign_key="branch.id")


# --- 4. Operations ---


class Card(SQLModel, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    card_number: str = Field(
        default_factory=lambda: cast(Faker, fake.unique).credit_card_number(),
        unique=True,
    )
    card_holder_name: str = Field(
        default="N/A"
    )  # Populated during seeding from User table
    card_type: str = Field(
        default_factory=lambda: fake.random_element(["Debit", "Credit"])
    )
    cvv: str = Field(default_factory=fake.credit_card_security_code)
    expiry_date: str = Field(default_factory=fake.credit_card_expire)
    account_id: int = Field(foreign_key="account.id")


class Deposit(SQLModel, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    amount: float = Field(
        default_factory=lambda: fake.pyfloat(
            positive=True, min_value=1, max_value=10000
        )
    )
    timestamp: datetime = Field(
        default_factory=lambda: fake.date_time_between(
            end_date=datetime.now(tz=timezone.utc),
            start_date=datetime.now(tz=timezone.utc) - timedelta(days=365 * 2),
        )
    )
    destiny_account_id: int = Field(foreign_key="account.id")


class Withdraw(SQLModel, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    amount: float = Field(
        default_factory=lambda: fake.pyfloat(
            positive=True, min_value=1, max_value=10000
        )
    )
    timestamp: datetime = Field(
        default_factory=lambda: fake.date_time_between(
            end_date=datetime.now(tz=timezone.utc),
            start_date=datetime.now(tz=timezone.utc) - timedelta(days=365 * 2),
        )
    )
    source_account_id: int = Field(foreign_key="account.id")


class Transaction(SQLModel, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    amount: float = Field(
        default_factory=lambda: fake.pyfloat(
            positive=True, min_value=1, max_value=10000
        )
    )
    transaction_type: str = Field(default="Transfer")
    timestamp: datetime = Field(
        default_factory=lambda: fake.date_time_between(
            end_date=datetime.now(tz=timezone.utc),
            start_date=datetime.now(tz=timezone.utc) - timedelta(days=365 * 2),
        )
    )
    source_account_id: Optional[int] = Field(default=None, foreign_key="account.id")
    destiny_account_id: Optional[int] = Field(default=None, foreign_key="account.id")


class AuditLog(SQLModel, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    event_type: str = Field(
        default_factory=lambda: fake.random_element(
            [
                "LOGIN",
                "LOGOUT",
                "UPDATED_FIRST_NAME",
                "UPDATED_LAST_NAME",
                "UPDATED_EMAIL",
                "UPDATED_ADDRESS",
            ]
        )
    )
    ip_address: str = Field(default_factory=fake.ipv4_public)
    user_agent: str = Field(default_factory=fake.user_agent)
    timestamp: datetime = Field(
        default_factory=lambda: fake.date_time_between(
            end_date=datetime.now(tz=timezone.utc),
            start_date=datetime.now(tz=timezone.utc) - timedelta(days=365 * 2),
        )
    )
    user_id: int = Field(foreign_key="user.id")


SQLModel.metadata.create_all(db_engine)
print("‚úÖ Schema built. Ready for seeding.")

üóëÔ∏è PostgreSQL Database wiped clean.
‚úÖ Schema built. Ready for seeding.


# Data: Bank

In [24]:
from sqlmodel import Session, select
from tqdm.notebook import tqdm


def seed_complete_system(engine):
    # --- DYNAMIC CONFIGURATION ---
    N_USERS = random.randint(10000, 30000)
    N_ACCOUNTS = random.randint(10000, 25000)
    N_CARDS = random.randint(10000, 30000)

    # Granular Reference Counts
    N_COUNTRIES, N_CURRENCIES = random.randint(30, 80), random.randint(30, 80)
    N_CITIES, N_BRANCHES = random.randint(100, 200), random.randint(50, 100)

    # Granular Operation Counts
    N_DEPOSITS = random.randint(10000, 20000)
    N_WITHDRAWS = random.randint(10000, 20000)
    N_TRANSACTIONS = random.randint(10000, 30000)
    N_AUDITS = random.randint(10000, 40000)

    BATCH_SIZE = 1000

    with Session(engine) as session:

        def get_existing(model):
            return session.exec(select(model)).all()

        # --- 1. References (with merged logic) ---
        # Countries
        for _ in tqdm(range(N_COUNTRIES), desc=f"üåç New Countries ({N_COUNTRIES})"):
            c = Country()
            c_ = session.exec(select(Country).where(Country.name == c.name)).first()
            if c_:
                id = c_.id
                c_.sqlmodel_update(c)
                c_.id = id
                session.add(c_)
            else:
                session.add(c)
        session.commit()
        all_countries = get_existing(Country)

        # Currencies
        for _ in tqdm(range(N_CURRENCIES), desc=f"üí± New Currencies ({N_CURRENCIES})"):
            curr = Currency()
            curr_ = session.exec(
                select(Currency).where(Currency.code == curr.code)
            ).first()
            if curr_:
                id = curr_.id
                curr_.sqlmodel_update(curr)
                curr_.id = id
                session.add(curr_)
            else:
                session.add(curr)
        session.commit()
        all_currencies = get_existing(Currency)

        # Cities
        for _ in tqdm(range(N_CITIES), desc=f"üèôÔ∏è New Cities ({N_CITIES})"):
            city = City(country_id=random.choice(all_countries).id)
            session.merge(city)
            session.commit()
        all_cities = get_existing(City)

        # Branches
        for _ in tqdm(range(N_BRANCHES), desc=f"üè¢ New Branches ({N_BRANCHES})"):
            br = Branch(city_id=random.choice(all_cities).id)
            session.merge(br)
            session.commit()
        all_branch_ids = [b.id for b in get_existing(Branch)]

        # --- 2. Users ---
        existing_users = get_existing(User)
        user_pool = [(u.id, f"{u.first_name} {u.last_name}") for u in existing_users]
        for u in existing_users:
            fake.unique.add_value('username', u.username)
            fake.unique.add_value('email', u.email)

        pbar_u = tqdm(total=N_USERS, desc=f"üë§ New Users ({N_USERS})")
        created_u = 0
        while created_u < N_USERS:
            batch_size = min(BATCH_SIZE, N_USERS - created_u)
            batch = [User() for _ in range(batch_size)]
            session.add_all(batch)
            session.commit()
            user_pool.extend([(u.id, f"{u.first_name} {u.last_name}") for u in batch])
            created_u += batch_size
            pbar_u.update(batch_size)
        pbar_u.close()

        # --- 3. Accounts ---
        # Build map for Card Holder consistency
        account_name_map = {}
        for acc in get_existing(Account):
            u = session.get(User, acc.user_id)
            account_name_map[acc.id] = (
                f"{u.first_name} {u.last_name}" if u else "Unknown"
            )

        curr_ids = [c.id for c in all_currencies]
        pbar_acc = tqdm(total=N_ACCOUNTS, desc=f"üè¶ New Accounts ({N_ACCOUNTS})")
        for i in range(0, N_ACCOUNTS, BATCH_SIZE):
            batch_size = min(BATCH_SIZE, N_ACCOUNTS - i)
            batch_data = []
            for _ in range(batch_size):
                u_id, u_name = random.choice(user_pool)
                acc = Account(
                    user_id=u_id,
                    currency_id=random.choice(curr_ids),
                    branch_id=random.choice(all_branch_ids),
                )
                batch_data.append((acc, u_name))
            session.add_all([item[0] for item in batch_data])
            session.commit()
            for acc, u_name in batch_data:
                account_name_map[acc.id] = u_name
            pbar_acc.update(batch_size)
        pbar_acc.close()

        # --- 4. Cards ---
        all_acc_ids = list(account_name_map.keys())
        pbar_card = tqdm(total=N_CARDS, desc=f"üí≥ New Cards ({N_CARDS})")
        for i in range(0, N_CARDS, BATCH_SIZE):
            batch_size = min(BATCH_SIZE, N_CARDS - i)
            batch = [
                Card(
                    account_id=random.choice(all_acc_ids),
                    card_holder_name=account_name_map[
                        random.choice(all_acc_ids)
                    ].upper(),
                )
                for _ in range(batch_size)
            ]
            session.add_all(batch)
            session.commit()
            pbar_card.update(batch_size)
        pbar_card.close()

        # --- 5. Financial Operations (Split) ---
        # Deposits
        pbar_dep = tqdm(total=N_DEPOSITS, desc=f"üí∞ New Deposits ({N_DEPOSITS})")
        for i in range(0, N_DEPOSITS, BATCH_SIZE):
            batch_size = min(BATCH_SIZE, N_DEPOSITS - i)
            session.add_all(
                [
                    Deposit(destiny_account_id=random.choice(all_acc_ids))
                    for _ in range(batch_size)
                ]
            )
            session.commit()
            pbar_dep.update(batch_size)
        pbar_dep.close()

        # Withdrawals
        pbar_wit = tqdm(total=N_WITHDRAWS, desc=f"üí∏ New Withdrawals ({N_WITHDRAWS})")
        for i in range(0, N_WITHDRAWS, BATCH_SIZE):
            batch_size = min(BATCH_SIZE, N_WITHDRAWS - i)
            session.add_all(
                [
                    Withdraw(source_account_id=random.choice(all_acc_ids))
                    for _ in range(batch_size)
                ]
            )
            session.commit()
            pbar_wit.update(batch_size)
        pbar_wit.close()

        # Transfers
        pbar_tra = tqdm(
            total=N_TRANSACTIONS, desc=f"üîÑ New Transfers ({N_TRANSACTIONS})"
        )
        for i in range(0, N_TRANSACTIONS, BATCH_SIZE):
            batch_size = min(BATCH_SIZE, N_TRANSACTIONS - i)
            session.add_all(
                [
                    Transaction(
                        source_account_id=random.choice(all_acc_ids),
                        destiny_account_id=random.choice(all_acc_ids),
                    )
                    for _ in range(batch_size)
                ]
            )
            session.commit()
            pbar_tra.update(batch_size)
        pbar_tra.close()

        # --- 6. Audit Logs ---
        u_ids = [u[0] for u in user_pool]
        pbar_aud = tqdm(total=N_AUDITS, desc=f"üìã New Audit Logs ({N_AUDITS})")
        for i in range(0, N_AUDITS, BATCH_SIZE):
            batch_size = min(BATCH_SIZE, N_AUDITS - i)
            session.add_all(
                [AuditLog(user_id=random.choice(u_ids)) for _ in range(batch_size)]
            )
            session.commit()
            pbar_aud.update(batch_size)
        pbar_aud.close()

    print(f"\n‚úÖ SEEDING SUCCESSFUL: System now contains {len(all_acc_ids)} accounts.")


seed_complete_system(db_engine)

üåç New Countries (70):   0%|          | 0/70 [00:00<?, ?it/s]

üí± New Currencies (48):   0%|          | 0/48 [00:00<?, ?it/s]

üèôÔ∏è New Cities (100):   0%|          | 0/100 [00:00<?, ?it/s]

üè¢ New Branches (69):   0%|          | 0/69 [00:00<?, ?it/s]

üë§ New Users (25392):   0%|          | 0/25392 [00:00<?, ?it/s]

üè¶ New Accounts (22266):   0%|          | 0/22266 [00:00<?, ?it/s]

üí≥ New Cards (26005):   0%|          | 0/26005 [00:00<?, ?it/s]

üí∞ New Deposits (16974):   0%|          | 0/16974 [00:00<?, ?it/s]

üí∏ New Withdrawals (15936):   0%|          | 0/15936 [00:00<?, ?it/s]

üîÑ New Transfers (11953):   0%|          | 0/11953 [00:00<?, ?it/s]

üìã New Audit Logs (15228):   0%|          | 0/15228 [00:00<?, ?it/s]


‚úÖ SEEDING SUCCESSFUL: System now contains 22266 accounts.


# Consistency: Bank

In [25]:
from sqlmodel import select, func

def synchronize_all_balances(engine):
    with Session(engine) as session:
        # Get all accounts to process
        accounts = session.exec(select(Account)).all()
        
        # print("üîÑ Calculating consistent balances based on ledger history...")
        
        for acc in tqdm(accounts, desc="Aggregating"):
            # 1. Credits (Money In)
            sum_deposits = session.exec(
                select(func.sum(Deposit.amount)).where(Deposit.destiny_account_id == acc.id)
            ).one() or 0.0
            
            sum_transfers_in = session.exec(
                select(func.sum(Transaction.amount)).where(Transaction.destiny_account_id == acc.id)
            ).one() or 0.0

            # 2. Debits (Money Out)
            sum_withdraws = session.exec(
                select(func.sum(Withdraw.amount)).where(Withdraw.source_account_id == acc.id)
            ).one() or 0.0
            
            sum_transfers_out = session.exec(
                select(func.sum(Transaction.amount)).where(Transaction.source_account_id == acc.id)
            ).one() or 0.0

            # 3. Update the Account Balance
            # Formula: (Inbound) - (Outbound)
            acc.balance = (sum_deposits + sum_transfers_in) - (sum_withdraws + sum_transfers_out)
            
            session.add(acc)
        
        session.commit()
        print("\n‚úÖ Reconciliation complete. All balances match the transaction history.")

# Run the synchronization
synchronize_all_balances(db_engine)

Aggregating:   0%|          | 0/22266 [00:00<?, ?it/s]


‚úÖ Reconciliation complete. All balances match the transaction history.


# Validation: Bank

In [26]:
import random
from sqlmodel import select, func

def validate_random_account(engine):
    with Session(engine) as session:
        # 1. Pick a random account that actually has some activity
        # We'll look for accounts that have at least one deposit to make it interesting
        stmt = select(Account).where(Account.id.in_(select(Deposit.destiny_account_id)))
        active_accounts = session.exec(stmt).all()
        
        if not active_accounts:
            print("‚ùå No active accounts found. Please ensure the database is seeded.")
            return

        target_acc = random.choice(active_accounts)
        acc_id = target_acc.id

        # 2. Fetch all individual records
        deps = session.exec(select(Deposit).where(Deposit.destiny_account_id == acc_id)).all()
        wits = session.exec(select(Withdraw).where(Withdraw.source_account_id == acc_id)).all()
        tx_in = session.exec(select(Transaction).where(Transaction.destiny_account_id == acc_id)).all()
        tx_out = session.exec(select(Transaction).where(Transaction.source_account_id == acc_id)).all()

        # 3. Calculate Aggregates
        sum_deps = sum(d.amount for d in deps)
        sum_wits = sum(w.amount for w in wits)
        sum_tx_in = sum(t.amount for t in tx_in)
        sum_tx_out = sum(t.amount for t in tx_out)
        
        calculated_balance = (sum_deps + sum_tx_in) - (sum_wits + sum_tx_out)

        # 4. PRINT REPORT
        print("="*60)
        print(f"VALIDATION REPORT FOR ACCOUNT: {target_acc.account_number}")
        print(f"User ID: {target_acc.user_id}")
        print("="*60)

        print(f"{'TYPE':<20} | {'COUNT':<8} | {'TOTAL AMOUNT':>15}")
        print("-" * 60)
        print(f"{'Deposits (+)':<20} | {len(deps):<8} | {sum_deps:>15.2f}")
        print(f"{'Transfers In (+)':<20} | {len(tx_in):<8} | {sum_tx_in:>15.2f}")
        print(f"{'Withdrawals (-)':<20} | {len(wits):<8} | {sum_wits:>15.2f}")
        print(f"{'Transfers Out (-)':<20} | {len(tx_out):<8} | {sum_tx_out:>15.2f}")
        print("-" * 60)
        
        print(f"{'TOTAL CALCULATED':<31} | {calculated_balance:>15.2f}")
        print(f"{'DATABASE BALANCE':<31} | {target_acc.balance:>15.2f}")
        
        diff = abs(calculated_balance - target_acc.balance)
        status = f"{'      ‚úÖ MATCH'}" if diff < 0.01 else f"      ‚ùå DISCREPANCY ({diff:.2f})"
        print(f"{'VERIFICATION':<31} | {status}")
        print("="*60)

        # # 5. Optional: Print top 5 specific movements
        # if deps or tx_in or wits or tx_out:
        #     print("\nRecent Ledger Activity (First 5):")
        #     for d in deps[:2]: print(f"  [+] Deposit:    +{d.amount:>10.2f}")
        #     for t in tx_in[:2]: print(f"  [+] Transfer:   +{t.amount:>10.2f}")
        #     for w in wits[:2]: print(f"  [-] Withdraw:   -{w.amount:>10.2f}")
        #     for t in tx_out[:2]: print(f"  [-] Transfer:   -{t.amount:>10.2f}")

# Execute validation
validate_random_account(db_engine)

VALIDATION REPORT FOR ACCOUNT: GB50NSOT37451520037828
User ID: 17310
TYPE                 | COUNT    |    TOTAL AMOUNT
------------------------------------------------------------
Deposits (+)         | 1        |         6122.12
Transfers In (+)     | 1        |         5885.21
Withdrawals (-)      | 0        |            0.00
Transfers Out (-)    | 0        |            0.00
------------------------------------------------------------
TOTAL CALCULATED                |        12007.33
DATABASE BALANCE                |        12007.33
VERIFICATION                    |       ‚úÖ MATCH


# Schema: e-commerce

In [27]:
import random
import re
import faker_commerce
from typing import Optional, cast
from datetime import datetime, timezone, timedelta
from faker import Faker
from mimesis import Generic
from sqlmodel import Field, SQLModel, Session, create_engine, text
from sqlalchemy.orm import registry
from sqlalchemy.orm import declared_attr  # Import the missing attribute

mim_generic = Generic(locale="en")
fake = Faker()
fake.add_provider(faker_commerce.Provider)

db_url = f"postgresql+psycopg2://{POSTGRESQL_INIT_USER}:{POSTGRESQL_INIT_PASSWORD}@postgres.{POSTGRESQL_VPN_SELF_HOSTNAME}:{POSTGRESQL_BASE_PORT}/{DatabasesList.ECOMMERCE.value}"
db_engine = create_engine(db_url)

with Session(db_engine) as session:
    session.exec(text("DROP SCHEMA public CASCADE;"))
    session.exec(text("CREATE SCHEMA public;"))
    # session.exec(text("GRANT ALL ON SCHEMA public TO public;"))
    session.exec(text(f"GRANT ALL ON SCHEMA public TO {POSTGRESQL_INIT_USER};"))
    session.commit()
print("üóëÔ∏è PostgreSQL Database wiped clean.")

SQLModel.metadata.clear()
if hasattr(SQLModel, "registry"):
    SQLModel.registry.dispose()
registry().dispose()


# --- 1. User & Payment ---


# Regex to convert CamelCase to snake_case
def to_snake_case(name: str) -> str:
    return re.sub(r"(?<!^)(?=[A-Z])", "_", name).lower()


class SnakeCaseModel(SQLModel):
    @declared_attr
    def __tablename__(cls) -> str:
        return to_snake_case(cls.__name__)


# --- 1. User & Payment ---


class User(SnakeCaseModel, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    username: str = Field(
        default_factory=lambda: cast(Faker, fake.unique).user_name(), unique=True
    )
    email: str = Field(
        default_factory=lambda: cast(Faker, fake.unique).email(), unique=True
    )
    password: str = Field(
        default_factory=lambda: mim_generic.person.password(hashed=True)
    )
    first_name: str = Field(default_factory=mim_generic.person.first_name)
    last_name: str = Field(default_factory=mim_generic.person.last_name)
    birthdate: datetime = Field(
        default_factory=lambda: fake.date_of_birth(minimum_age=18, maximum_age=90)
    )
    created_at: datetime = Field(
        default_factory=lambda: fake.date_time_between(
            end_date=datetime.now(tz=timezone.utc),
            start_date=datetime.now(tz=timezone.utc) - timedelta(days=365 * 3),
        )
    )


class Card(SnakeCaseModel, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    card_number: str = Field(
        default_factory=lambda: cast(Faker, fake.unique).credit_card_number()
    )
    expiry: str = Field(default_factory=fake.credit_card_expire)
    cvv: str = Field(default_factory=fake.credit_card_security_code)
    user_id: int = Field(foreign_key="user.id")
    # Added creation_date
    created_at: datetime = Field(
        default_factory=lambda: fake.date_time_between(
            end_date=datetime.now(tz=timezone.utc),
            start_date=datetime.now(tz=timezone.utc) - timedelta(days=365 * 3),
        )
    )


# --- 2. Catalog ---


class Category(SnakeCaseModel, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    name: str = Field(
        default_factory=lambda: cast(Faker, fake.unique).bothify(
            f"{cast(faker_commerce.Provider, fake).ecommerce_category()} {cast(faker_commerce.Provider, fake).ecommerce_material()} %%%%"
        ),
        unique=True,
    )


class Product(SnakeCaseModel, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    name: str = Field(
        default_factory=cast(faker_commerce.Provider, fake).ecommerce_name
    )
    description: str = Field(default_factory=fake.sentence)
    sku: str = Field(
        default_factory=lambda: cast(Faker, fake.unique).ean8(), unique=True
    )
    price: float = Field(default_factory=lambda: round(random.uniform(1.0, 1000.0), 2))


class CategoryProduct(SnakeCaseModel, table=True):
    category_id: int = Field(foreign_key="category.id", primary_key=True)
    product_id: int = Field(foreign_key="product.id", primary_key=True)


# --- 3. Shopping Cart ---


class ShoppingCart(SnakeCaseModel, table=True):
    # id: Optional[int] = Field(default=None, primary_key=True)
    user_id: int = Field(foreign_key="user.id", primary_key=True)
    # status: str = Field(
    #     default_factory=lambda: random.choice(["ACTIVE", "ABANDONED"])
    # )
    created_at: datetime = Field(
        default_factory=lambda: fake.date_time_between(
            end_date=datetime.now(tz=timezone.utc),
            start_date=datetime.now(tz=timezone.utc) - timedelta(days=365 * 3),
        )
    )


class CartProduct(SnakeCaseModel, table=True):
    cart_id: int = Field(foreign_key="shopping_cart.user_id", primary_key=True)
    product_id: int = Field(foreign_key="product.id", primary_key=True)
    # Added quantity and creation date
    quantity: int = Field(default_factory=lambda: random.randint(1, 5))
    created_at: datetime = Field(
        default_factory=lambda: fake.date_time_between(
            end_date=datetime.now(tz=timezone.utc),
            start_date=datetime.now(tz=timezone.utc) - timedelta(days=365 * 3),
        )
    )


# --- 4. Orders & Fulfillment ---


class Order(SnakeCaseModel, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    status: str = Field(
        default_factory=lambda: random.choice(["PENDING", "COMPLETED", "CANCELLED"])
    )
    user_id: int = Field(foreign_key="user.id")
    # Added creation_date
    created_at: datetime = Field(
        default_factory=lambda: fake.date_time_between(
            end_date=datetime.now(tz=timezone.utc),
            start_date=datetime.now(tz=timezone.utc) - timedelta(days=365 * 3),
        )
    )


class OrderProduct(SnakeCaseModel, table=True):
    order_id: int = Field(foreign_key="order.id", primary_key=True)
    product_id: int = Field(foreign_key="product.id", primary_key=True)
    # Added quantity and creation date
    quantity: int = Field(default_factory=lambda: random.randint(1, 5))
    unit_price: float = Field(default=0.0)
    created_at: datetime = Field(
        default_factory=lambda: fake.date_time_between(
            end_date=datetime.now(tz=timezone.utc),
            start_date=datetime.now(tz=timezone.utc) - timedelta(days=365 * 3),
        )
    )


class Payment(SnakeCaseModel, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    amount: float = Field(default=0.0)
    transaction_id: str = Field(
        default_factory=lambda: cast(Faker, fake.unique).uuid4()
    )
    order_id: int = Field(foreign_key="order.id", unique=True)
    card_id: int = Field(foreign_key="card.id")
    created_at: datetime = Field(
        default_factory=lambda: fake.date_time_between(
            end_date=datetime.now(tz=timezone.utc),
            start_date=datetime.now(tz=timezone.utc) - timedelta(days=365 * 3),
        )
    )


class Shipment(SnakeCaseModel, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    tracking_number: str = Field(
        default_factory=lambda: cast(Faker, fake.unique)
        .bothify("??-#########-??")
        .upper()
    )
    carrier: str = Field(default_factory=lambda: random.choice(["DHL", "UPS", "FedEx"]))
    shipping_address: str = Field(default_factory=mim_generic.address.address)
    order_id: int = Field(foreign_key="order.id", unique=True)
    status: str = Field(
        default_factory=lambda: random.choice(
            ["PROCESSING", "SHIPPED", "IN TRANSIT", "DELIVERED"]
        )
    )
    send_date: Optional[datetime] = Field(
        default=None,
    )
    received_date: Optional[datetime] = Field(default=None)
    created_at: datetime = Field(
        default_factory=lambda: fake.date_time_between(
            end_date=datetime.now(tz=timezone.utc),
            start_date=datetime.now(tz=timezone.utc) - timedelta(days=365 * 3),
        )
    )


SQLModel.metadata.create_all(db_engine)
print("‚úÖ Schema built. Ready for seeding.")

üóëÔ∏è PostgreSQL Database wiped clean.
‚úÖ Schema built. Ready for seeding.


# Data: e-commerce

In [28]:
from ast import Or
import random
from tqdm.notebook import tqdm
from sqlmodel import Session, select


def seed_ecommerce_system(engine):
    # --- DYNAMIC CONFIGURATION ---
    # Adjust ranges as needed for your testing scale
    N_USERS = random.randint(5000, 10000)
    N_CATEGORIES = random.randint(100, 200)
    N_PRODUCTS = random.randint(500, 1000)
    MIN_CARDS_PER_USER = 0
    MAX_CARDS_PER_USER = 10
    MIN_PROD_PER_CART = 0
    MAX_PROD_PER_CART = 10
    MIN_PROD_PER_ORDER = 0
    MAX_PROD_PER_ORDER = 10
    MIN_PROD_QUANT_IN_ORDER = 1
    MAX_PROD_QUANT_IN_ORDER = 10
    N_ORDERS = random.randint(8000, 16000)

    with Session(engine) as session:

        for _ in tqdm(range(N_CATEGORIES), desc="üè∑Ô∏è Categories"):
            session.add(Category())
        session.commit()
        category_ids = session.exec(select(Category.id)).all()

        for i in tqdm(range(0, N_PRODUCTS), desc="üì¶ Products"):
            session.add(Product())
        session.commit()
        product_ids = session.exec(select(Product.id)).all()

        for p_id in tqdm(product_ids, desc="üîó Category Products Mapping"):
            chosen_cats = random.sample(category_ids, k=random.randint(1, 10))
            for c_id in chosen_cats:
                session.add(CategoryProduct(category_id=c_id, product_id=p_id))
        session.commit()

        for i in tqdm(range(0, N_USERS), desc="üë§ Users"):
            session.add(User())
        session.commit()
        user_ids = session.exec(select(User.id)).all()

        for u_id in tqdm(user_ids, desc="üí≥ Cards & Carts"):
            session.add(ShoppingCart(user_id=u_id))
            for _ in range(random.randint(MIN_CARDS_PER_USER, MAX_CARDS_PER_USER)):
                session.add(Card(user_id=u_id))
        session.commit()

        cart_ids = session.exec(select(ShoppingCart.user_id)).all()

        for c_id in tqdm(cart_ids, desc="üõí Cart Items"):
            cart_prod_ids = random.sample(
                product_ids, random.randint(MIN_PROD_PER_CART, MAX_PROD_PER_CART)
            )
            card_products = [
                CartProduct(cart_id=c_id, product_id=prod_id)
                for prod_id in cart_prod_ids
            ]
            session.add_all(card_products)
        session.commit()

        for _ in tqdm(range(0, N_ORDERS), desc="üìù Orders"):
            u_id = random.choice(user_ids)
            order = Order(user_id=u_id)
            session.add(order)
        session.commit()

        order_ids = session.exec(select(Order.id)).all()
        for order_id in tqdm(order_ids, desc="üõí Shipments & Payments"):
            prod_ids = random.sample(
                product_ids, random.randint(MIN_PROD_PER_ORDER, MAX_PROD_PER_ORDER)
            )
            for prod_id in prod_ids:
                session.add(
                    OrderProduct(
                        order_id=order_id,
                        product_id=prod_id,
                        quantity=random.randint(
                            MIN_PROD_QUANT_IN_ORDER, MAX_PROD_QUANT_IN_ORDER
                        ),
                    )
                )

            # Create Payment if order is not Pending/Cancelled (mostly)
            if order.status in ["COMPLETED", "CANCELLED"]:
                user_card_id = None
                user_cards = session.exec(
                    select(Card.id)
                    .join(User, User.id == Card.user_id)
                    .join(Order, Order.user_id == User.id)
                    .where(Order.id == order_id)
                ).all()
                if user_cards:
                    user_card_id = random.choice(user_cards)
                else:
                    card = Card(user_id=u_id)
                    session.add(card)
                    session.flush()
                    user_card_id = card.id

                # Create Payment
                payment = Payment(
                    order_id=order_id,
                    amount=0,
                    card_id=user_card_id,
                )
                session.add(payment)
                session.flush()

                # Create Shipment
                shipment_status = random.choice(
                    ["PROCESSING", "SHIPPED", "IN TRANSIT", "DELIVERED"]
                )
                received_date = None
                send_date = None
                creation_date = fake.date_time_between(
                    start_date=datetime.now(tz=timezone.utc) - timedelta(days=365 * 3),
                    end_date=datetime.now(tz=timezone.utc),
                )
                if shipment_status in ["SHIPPED", "IN TRANSIT", "DELIVERED"]:
                    fake.time_delta(end_datetime=creation_date)
                    send_date = creation_date + fake.time_delta(
                        end_datetime=creation_date
                    )
                if shipment_status in ["DELIVERED"]:
                    received_date = send_date + fake.time_delta(end_datetime=send_date)

                shipment = Shipment(
                    order_id=order_id,
                    status=shipment_status,
                    created_at=creation_date,
                    send_date=send_date,
                    received_date=received_date,
                )
                session.add(shipment)
                session.flush()
            session.flush()
        session.commit()

    print(
        f"\n‚úÖ DATABASE SEEDED: {N_USERS} Users, {N_PRODUCTS} Products, and {N_ORDERS} Orders created."
    )


seed_ecommerce_system(db_engine)

üè∑Ô∏è Categories:   0%|          | 0/140 [00:00<?, ?it/s]

üì¶ Products:   0%|          | 0/662 [00:00<?, ?it/s]

üîó Category Products Mapping:   0%|          | 0/662 [00:00<?, ?it/s]

üë§ Users:   0%|          | 0/5182 [00:00<?, ?it/s]

üí≥ Cards & Carts:   0%|          | 0/5182 [00:00<?, ?it/s]

üõí Cart Items:   0%|          | 0/5182 [00:00<?, ?it/s]

üìù Orders:   0%|          | 0/10736 [00:00<?, ?it/s]

üõí Shipments & Payments:   0%|          | 0/10736 [00:00<?, ?it/s]


‚úÖ DATABASE SEEDED: 5182 Users, 662 Products, and 10736 Orders created.


# Consistency: e-commerce

In [29]:
from sqlmodel import Session, select
from tqdm.notebook import tqdm

def synchronize_financial_data(engine):
    with Session(engine) as session:
        # 1. Map Products to Prices for quick lookup
        # print("Mapping product prices...")
        product_price_map = {p.id: p.price for p in session.exec(select(Product)).all()}
        
        # 2. Update OrderProduct unit prices to match Product catalog
        order_products = session.exec(select(OrderProduct)).all()
        # print(f"Syncing {len(order_products)} order line items...")
        for op in tqdm(order_products, desc="Updating unit prices"):
            if op.product_id in product_price_map:
                op.unit_price = product_price_map[op.product_id]
                session.add(op)
            session.flush()
        
        session.commit()
        # print("‚úÖ Unit prices synchronized with Product catalog.")

        # 3. Compute Order Totals and Update Payments
        # We fetch orders and their related payments and line items
        all_orders = session.exec(select(Order)).all()
        
        # print(f"Recomputing totals for {len(all_orders)} orders...")
        for order in tqdm(all_orders, desc="Calculating totals"):
            # Get all items for this specific order
            items = session.exec(
                select(OrderProduct).where(OrderProduct.order_id == order.id)
            ).all()
            
            # Calculate total: sum(quantity * unit_price)
            order_total = sum((item.quantity * item.unit_price) for item in items)
            order_total = round(order_total, 2)
            
            # Find the corresponding payment for this order
            payment = session.exec(
                select(Payment).where(Payment.order_id == order.id)
            ).first()
            
            if payment:
                payment.amount = order_total
                session.add(payment)
            
            # Optional: If you had a 'total_price' field in the Order table, 
            # you would update it here: order.total_price = order_total
            
        session.commit()
        print("\n‚úÖ Payment amounts synchronized with Order totals.")

# Execute the sync
synchronize_financial_data(db_engine)

Updating unit prices:   0%|          | 0/53677 [00:00<?, ?it/s]

Calculating totals:   0%|          | 0/10736 [00:00<?, ?it/s]


‚úÖ Payment amounts synchronized with Order totals.


# Validation: e-commerce

In [48]:
import random
from sqlmodel import Session, select

def validate_ecommerce_data(engine):
    with Session(engine) as session:
        # 1. Pick a random order that has items
        # Using a subquery to ensure we select an order that actually has OrderProducts
        order_ids_with_items = session.exec(select(OrderProduct.order_id).distinct()).all()
        
        if not order_ids_with_items:
            print("‚ùå No orders with products found. Please seed the database first.")
            return

        random_order_id = random.choice(order_ids_with_items)
        order = session.get(Order, random_order_id)
        
        # 2. Fetch related data
        order_items = session.exec(
            select(OrderProduct).where(OrderProduct.order_id == order.id)
        ).all()
        
        payment = session.exec(
            select(Payment).where(Payment.order_id == order.id)
        ).first()

        # 3. Validation Logic
        print("="*70)
        print(f"üîç VALIDATION REPORT FOR ORDER ID: {order.id}")
        print(f"Customer User ID: {order.user_id} | Status: {order.status}")
        print("="*70)
        print(f"{'Product ID':<12} | {'Qty':<5} | {'Unit Price':>12} | {'Catalog Price':>15} | {'Subtotal':>12}")
        print("-" * 70)

        calculated_order_total = 0.0
        price_mismatch = False

        for item in order_items:
            # Cross-reference with Product table
            product = session.get(Product, item.product_id)
            catalog_price = product.price if product else 0.0
            subtotal = item.quantity * item.unit_price
            calculated_order_total += subtotal
            
            # Check for price discrepancies
            match_icon = "‚úÖ" if abs(item.unit_price - catalog_price) < 0.01 else "‚ùå"
            if match_icon == "‚ùå": price_mismatch = True
            
            print(f"{item.product_id:<12} | {item.quantity:<5} | {item.unit_price:>12.2f} | {catalog_price:>15.2f} {match_icon} | {subtotal:>12.2f}")

        calculated_order_total = round(calculated_order_total, 2)
        print("-" * 70)
        
        # 4. Final Verification
        payment_amount = payment.amount if payment else 0.0
        payment_diff = abs(calculated_order_total - payment_amount)
        
        print(f"{'TOTAL CALCULATED FROM ITEMS':<48} | {calculated_order_total:>12.2f}")
        print(f"{'PAYMENT REGISTERED (DB)':<48} | {payment_amount:>12.2f}")
        
        # Result Summary
        print("="*70)
        print("FINAL CHECKS:")
        
        price_status = "‚úÖ PASS" if not price_mismatch else "‚ùå FAIL (Catalog mismatch)"
        total_status = "‚úÖ PASS" if payment_diff < 0.01 else f"‚ùå FAIL (Diff: {payment_diff:.2f})"
        
        print(f"1. Catalog Price Sync:  {price_status}")
        print(f"2. Payment Consistency: {total_status}")
        print("="*70)

# Execute the validation
validate_ecommerce_data(db_engine)

üîç VALIDATION REPORT FOR ORDER ID: 5137
Customer User ID: 2606 | Status: PENDING
Product ID   | Qty   |   Unit Price |   Catalog Price |     Subtotal
----------------------------------------------------------------------
34           | 6     |        49.51 |           49.51 ‚úÖ |       297.06
242          | 7     |       972.26 |          972.26 ‚úÖ |      6805.82
496          | 4     |       781.41 |          781.41 ‚úÖ |      3125.64
621          | 1     |       537.98 |          537.98 ‚úÖ |       537.98
----------------------------------------------------------------------
TOTAL CALCULATED FROM ITEMS                      |     10766.50
PAYMENT REGISTERED (DB)                          |         0.00
FINAL CHECKS:
1. Catalog Price Sync:  ‚úÖ PASS
2. Payment Consistency: ‚ùå FAIL (Diff: 10766.50)


# -----------------------------------------------------------------------------------------------------------
# -----------------------------------------------------------------------------------------------------------
# -----------------------------------------------------------------------------------------------------------
# -----------------------------------------------------------------------------------------------------------
# -----------------------------------------------------------------------------------------------------------

In [167]:
from enum import StrEnum


class SQLRequirement(StrEnum):
    AGGREGATION = "Must include aggregation functions (SUM, AVG, COUNT, etc.)."
    WINDOWING = "Must include partitioning and window functions (RANK, DENSE_RANK, ROW_NUMBER, etc.)."
    ORDER = "Must user ORDER BY to sort by specified criteria."
    LIMIT_OFFSET = (
        "Must use LIMIT and OFFSET to return a specific slice of the results."
    )
    SUBQUERY = "Must utilize a subquery (correlated or non-correlated) in a WHERE or HAVING clause."
    CTE = "The query must be structured using at least one Common Table Expression (WITH clause)."
    JOIN = "The query must include at least one JOIN clause (LEFT JOIN, RIGHT JOIN, INNER JOIN, JOIN, etc.)"
    UNION = (
        "Requires the use of UNION or UNION ALL to combine disparate operation tables."
    )
    INTERSECTION = (
        "Must use INTERSECT to find common entities across different event types."
    )
    TEMPORAL = "Must use SQL date functions (EXTRACT, DATE_PART, or INTERVAL)."
    CASE_LOGIC = "Must use a CASE statement to create conditional categories (e.g., risk levels)."
    HAVING = "Must use a HAVING clause to filter grouped results."
    NULL_HANDLING = "Must use COALESCE or IS NULL to manage optional or missing values."
    STRING_OP = "Must use string functions (LIKE, CONCAT, or SUBSTR) for formatting or filtering."
    EXCEPT_LOGIC = "Must use the EXCEPT operator to exclude specific subsets of data."


# A comprehensive list of paths using the Enum members
CHALLENGE_PATHS = [
    {
        "id": "PATH_GEOGRAPHIC_WEALTH",
        "title": "Regional Liquidity Analysis",
        "tables": ["Country", "City", "Branch", "Account"],
        "description": "Create a query to obtain the total number of account balances are distributed across specific jurisdictions.",
        "valid_requirements": [
            SQLRequirement.JOIN,
            SQLRequirement.AGGREGATION,
            SQLRequirement.WINDOWING,
            SQLRequirement.CTE,
            SQLRequirement.ORDER,
        ],
        "output_columns": ["country_name", "city_name", "total_balance", "wealth_rank"],
    },
    {
        "id": "PATH_GHOST_USERS",
        "title": "Silent User Audit",
        "tables": ["User", "Account", "AuditLog"],
        "description": "Find users with significant funds who show no security activity in the logs.",
        "valid_requirements": [
            SQLRequirement.EXCEPT_LOGIC,
            SQLRequirement.JOIN,
            SQLRequirement.TEMPORAL,
        ],
        "output_columns": ["user_id", "username"],
    },
    {
        "id": "PATH_SECURITY_LOGS",
        "title": "User Behavioral Forensics",
        "tables": ["User", "AuditLog"],
        "description": "Examine security event patterns and session frequency for users.",
        "valid_requirements": [
            SQLRequirement.JOIN,
            SQLRequirement.WINDOWING,
            SQLRequirement.SUBQUERY,
            SQLRequirement.LIMIT_OFFSET,
            SQLRequirement.ORDER,
        ],
        "output_columns": ["username", "ip_address", "event_type", "event_rank"],
    },
    {
        "id": "PATH_CASH_FLOW",
        "title": "Consolidated Ledger Audit",
        "tables": ["Account", "Deposit", "Withdraw"],
        "description": "Compare inflows (deposits) and outflows (withdrawals) for specific accounts.",
        "valid_requirements": [
            SQLRequirement.UNION,
            SQLRequirement.INTERSECTION,
            SQLRequirement.CTE,
            SQLRequirement.AGGREGATION,
            SQLRequirement.JOIN,
        ],
        "output_columns": [
            "account_number",
            "movement_type",
            "amount",
            "transaction_date",
        ],
    },
    {
        "id": "PATH_TRANSFER_VELOCITY",
        "title": "Currency Transfer Analysis",
        "tables": ["Account", "Transaction", "Currency"],
        "description": "Track the volume of 'Transfer' type transactions across different currency codes.",
        "valid_requirements": [
            SQLRequirement.JOIN,
            SQLRequirement.AGGREGATION,
            SQLRequirement.WINDOWING,
            SQLRequirement.CTE,
            SQLRequirement.SUBQUERY,
        ],
        "output_columns": [
            "source_account",
            "destiny_account",
            "amount",
            "currency_code",
        ],
    },
    {
        "id": "PATH_CARD_EXPOSURE",
        "title": "Card Holder Risk Assessment",
        "tables": ["User", "Account", "Card"],
        "description": "Evaluate the financial backing of Credit vs Debit cards based on user balances.",
        "valid_requirements": [
            SQLRequirement.JOIN,
            SQLRequirement.SUBQUERY,
            SQLRequirement.AGGREGATION,
            SQLRequirement.ORDER,
            SQLRequirement.LIMIT_OFFSET,
        ],
        "output_columns": ["full_name", "card_number", "card_type", "balance"],
    },
]

In [169]:
from typing import Any, Dict, List


class SQLExerciseGenerator:
    def __init__(self, paths: List[Dict[str, Any]]):
        self.paths = paths

    def generate(self, student_id: int) -> str:
        # 1. Deterministic Seeding
        random.seed(student_id)
        
        # 2. Select Path and Metadata
        path = random.choice(self.paths)
        
        # 3. Select 3 Requirements valid for this path
        selected_reqs = random.sample(
            path["valid_requirements"], 
            k=min(3, len(path["valid_requirements"]))
        )
        
        # 4. Parameterized Values
        # Relative date: Pick a year from the last 2 years relative to 'now'
        now = datetime.now()
        target_year = now.year - (student_id % 2) 
        target_month = (student_id % 12) + 1
        month_name = datetime(2000, target_month, 1).strftime('%B')
        
        # Unique thresholds and limits
        v_threshold = (student_id * 31) % 5000 + 1000
        v_limit = (student_id % 5) + 5
        
        # 5. Format the Mission Description
        # Injects anchor_metric and primary_entity into the template string
        formatted_desc = path["description"].format(
            anchor_metric=path.get("anchor_metric", "value"),
            primary_entity=path.get("primary_entity", "record")
        )
        
        # 6. Build the Technical Requirements List
        req_list_str = "\n".join([f"   - [ ] {req.value}" for req in selected_reqs])

        # 7. Final Prompt Assembly
        prompt = f"""
--- üìù UNIQUE SQL SPECIFICATION: STUDENT {student_id} ---

MISSION: {path['title']}
DESCRIPTION: {formatted_desc}

MISSION LOGIC:
1. DATA SCOPE: Filter records exclusively for the period: {month_name} {target_year}.
2. THRESHOLD: Only include records where ({path.get('anchor_metric', 'value')}) is > ${v_threshold}.
3. LOGIC GATES: Your query must satisfy the following:

TECHNICAL REQUIREMENTS:
{req_list_str}

STRICT OUTPUT SCHEMA (Order matters):
{", ".join(path['output_columns'])}

ORDERING:
- Order by the final column in your schema DESC.
- Return the top {v_limit} results.
---"""
        return prompt

# --- Example Execution ---
if __name__ == "__main__":    
    generator = SQLExerciseGenerator(CHALLENGE_PATHS)
    
    # Generate for a specific student
    print(generator.generate(student_id=101))


--- üìù UNIQUE SQL SPECIFICATION: STUDENT 101 ---

MISSION: Currency Transfer Analysis
DESCRIPTION: Track the volume of 'Transfer' type transactions across different currency codes.

MISSION LOGIC:
1. DATA SCOPE: Filter records exclusively for the period: June 2024.
2. THRESHOLD: Only include records where (value) is > $4131.
3. LOGIC GATES: Your query must satisfy the following:

TECHNICAL REQUIREMENTS:
   - [ ] Must include aggregation functions (SUM, AVG, COUNT, etc.).
   - [ ] Must include partitioning and window functions (RANK, DENSE_RANK, ROW_NUMBER, etc.).
   - [ ] Must utilize a subquery (correlated or non-correlated) in a WHERE or HAVING clause.

STRICT OUTPUT SCHEMA (Order matters):
source_account, destiny_account, amount, currency_code

ORDERING:
- Order by the final column in your schema DESC.
- Return the top 6 results.
---


In [161]:
import random
from datetime import datetime
from typing import List, Dict, Any

class SQLExerciseGenerator:
    def __init__(self):
        # Full library of technical requirements
        self.requirements: Dict[str, str] = {
            "aggregation": "Must include aggregate functions (SUM, AVG, COUNT, etc.) with GROUP BY.",
            "windowing": "Requires a Window Function (e.g., RANK, DENSE_RANK, ROW_NUMBER, etc.) to categorize data.",
            "order": "Must be explicitly ordered by a calculated metric or alias.",
            "limit_offset": "Must use LIMIT and OFFSET to return a specific slice of the results.",
            "subquery": "Must utilize a subquery (correlated or non-correlated) in a WHERE or HAVING clause.",
            "cte": "The query must be structured using at least one Common Table Expression (WITH clause).",
            "join": "Requires a multi-stage join across at least three distinct tables.",
            "union": "Requires the use of UNION to combine results from Deposits and Withdraws.",
            "intersection": "Must use INTERSECT to find overlap between AuditLog and Account types.",
            "temporal": "Must use SQL date functions (EXTRACT, DATE_PART, or TO_CHAR) for month/year filtering."
        }

        # Expanded paths covering all tables in your SQLModel schema
        self.paths: List[Dict[str, Any]] = [
            {
                "id": "GEO_WEALTH",
                "title": "Geographic Liquidity Map",
                "path": "Country ‚Üî City ‚Üî Branch ‚Üî Account",
                "valid_reqs": ["join", "aggregation", "windowing", "cte", "temporal"],
                "schema": ["country_name", "city_name", "total_balance", "wealth_rank"]
            },
            {
                "id": "USER_FORENSICS",
                "title": "Security & Activity Audit",
                "path": "User ‚Üî AuditLog ‚Üî Account",
                "valid_reqs": ["windowing", "subquery", "temporal", "order", "join"],
                "schema": ["username", "email", "event_type", "account_balance", "event_order"]
            },
            {
                "id": "CASH_FLOW",
                "title": "Global Movement Ledger",
                "path": "Account ‚Üî Deposit ‚Üî Withdraw",
                "valid_reqs": ["union", "intersection", "cte", "temporal", "aggregation"],
                "schema": ["account_number", "flow_type", "amount", "entry_date"]
            },
            {
                "id": "FX_TRANSFERS",
                "title": "Currency Transfer Velocity",
                "path": "Transaction ‚Üî Account ‚Üî Currency",
                "valid_reqs": ["join", "aggregation", "windowing", "cte", "temporal"],
                "schema": ["account_number", "currency_code", "total_transferred", "transfer_rank"]
            },
            {
                "id": "CARD_SECURITY",
                "title": "Cardholder Risk Assessment",
                "path": "User ‚Üî Account ‚Üî Card",
                "valid_reqs": ["join", "subquery", "aggregation", "order", "limit_offset"],
                "schema": ["full_name", "card_number", "card_type", "current_balance"]
            }
        ]

    def generate(self, student_id: int) -> str:
        random.seed(student_id)
        
        # --- 1. DYNAMIC RELATIVE DATE LOGIC ---
        now = datetime.now()
        # Randomly pick a year from the last 2 years (relative to today)
        target_year = now.year - (student_id % 2) 
        target_month = (student_id % 12) + 1
        month_name = datetime(2000, target_month, 1).strftime('%B')
        
        # --- 2. PATH & REQUIREMENT SELECTION ---
        path = random.choice(self.paths)
        req_keys = random.sample(path["valid_reqs"], k=min(3, len(path["valid_reqs"])))
        
        # Force temporal requirement if the path relies on the date scope
        if "temporal" in path["valid_reqs"] and "temporal" not in req_keys:
            req_keys[0] = "temporal"
            
        # --- 3. PARAMETERIZED VALUES ---
        v_threshold = (student_id * 31) % 5000 + 1000
        v_limit = (student_id % 5) + 5
        
        # --- 4. ASSEMBLY ---
        requirements_formatted = "\n".join([f"   - [ ] {self.requirements[rk]}" for rk in req_keys])

        prompt = f"""
--- üìù UNIQUE SQL SPECIFICATION: STUDENT {student_id} ---

MISSION: {path['title']}
STRUCTURE: {path['path']}

MISSION LOGIC:
1. DATA SCOPE: Filter records exclusively for the period: {month_name} {target_year}.
2. THRESHOLD: Only include records where the primary metric (balance/amount) is > ${v_threshold}.
3. LOGIC GATES: Your query must satisfy the following:

TECHNICAL REQUIREMENTS:
{requirements_formatted}

STRICT OUTPUT SCHEMA (Order matters):
{", ".join(path['schema'])}

ORDERING:
- Order by the final column in your schema DESC.
- Return the top {v_limit} results.
---"""
        return prompt

In [160]:
# --- TEST ---
generator = SQLExerciseGenerator()
print(generator.generate(student_id=101))


--- üìù UNIQUE SQL SPECIFICATION: STUDENT 101 ---

MISSION: Cardholder Risk Assessment
STRUCTURE: User ‚Üî Account ‚Üî Card

MISSION LOGIC:
1. DATA SCOPE: Filter records exclusively for the period: June 2024.
2. THRESHOLD: Only include records where the primary metric (balance/amount) is > $4131.
3. LOGIC GATES: Your query must satisfy the following:

TECHNICAL REQUIREMENTS:
   - [ ] Must utilize a subquery (correlated or non-correlated) in a WHERE or HAVING clause.
   - [ ] Must include aggregate functions (SUM, AVG, or COUNT) with GROUP BY.
   - [ ] Must use LIMIT and OFFSET to return a specific slice of the results.

STRICT OUTPUT SCHEMA (Order matters):
full_name, card_number, card_type, current_balance

ORDERING:
- Order by the final column in your schema DESC.
- Return the top 6 results.
---


In [153]:
def generate_hardened_challenge(student_id):
    random.seed(student_id)
    
    # MISSION: Calculate a 'Control Sum'
    # The student MUST find these specific values in their DB to get it right.
    parts = [
        "the count of characters in the user's city name",
        "the number of 'LOGIN' events they have",
        "the first digit of their oldest account number"
    ]
    selected_parts = random.sample(parts, 2)
    
    prompt = f"""
    --- üõ°Ô∏è HARDENED SQL CHALLENGE: STUDENT {student_id} ---
    
    GOAL: Generate a High-Value User Report.
    
    LOGIC: 
    1. Select users with a balance > $5000.
    2. Join with City and AuditLog tables.
    
    MANDATORY VERIFICATION COLUMN:
    You must include a column named 'VERIFICATION_KEY'. 
    It must be calculated as: ({selected_parts[0]}) + ({selected_parts[1]}).
    
    OUTPUT SCHEMA:
    username, total_balance, city_name, VERIFICATION_KEY
    
    ORDER BY: VERIFICATION_KEY DESC
    """
    return prompt

In [154]:
print(generate_hardened_challenge(100))


    --- üõ°Ô∏è HARDENED SQL CHALLENGE: STUDENT 100 ---

    GOAL: Generate a High-Value User Report.

    LOGIC: 
    1. Select users with a balance > $5000.
    2. Join with City and AuditLog tables.

    MANDATORY VERIFICATION COLUMN:
    You must include a column named 'VERIFICATION_KEY'. 
    It must be calculated as: (the count of characters in the user's city name) + (the number of 'LOGIN' events they have).

    OUTPUT SCHEMA:
    username, total_balance, city_name, VERIFICATION_KEY

    ORDER BY: VERIFICATION_KEY DESC
    


In [148]:
import random

def create_random_challenge(student_id):
    random.seed(student_id)
    
    # 1. Choose a "Focal Point" (The core entity we are investigating)
    focal_points = ["User", "Account", "Branch", "City", "Currency"]
    focus = random.choice(focal_points)
    
    # 2. Choose a "Relationship Depth" (How many joins)
    depth = random.randint(2, 4)
    
    # 3. Choose a "Technical Complexity" (The SQL trick)
    complexities = [
        {"name": "Set Difference", "logic": "NOT EXISTS", "hint": "Find X that have no Y"},
        {"name": "Window Ranking", "logic": "DENSE_RANK()", "hint": "Rank X by Y within Z"},
        {"name": "Temporal Gap", "logic": "INTERVAL", "hint": "Find X then Y within N minutes"},
        {"name": "Aggregated Comparison", "logic": "HAVING", "hint": "Find X where total Y > average Z"}
    ]
    comp = random.choice(complexities)
    
    # 4. Define the Output Schema (Strictly ordered)
    # We randomize the required columns to make the queries unique
    possible_cols = ["id", "name", "balance", "count", "rank", "timestamp"]
    selected_cols = random.sample(possible_cols, 3) + ["calculated_metric"]
    
    # 5. Build the Prompt
    prompt = f"""
    --- üèÜ CUSTOM CHALLENGE FOR STUDENT {student_id} ---
    FOCUS: {focus}-centric analysis.
    COMPLEXITY: {comp['name']} ({comp['hint']}).
    
    MISSION: Identify {focus} entities where... [Logic generated based on {comp['logic']}]
    
    REQUIRED OUTPUT COLUMNS (Exact order):
    {", ".join([f"'{c}'" for c in selected_cols])}
    
    ORDERING: {random.choice(selected_cols)} {'DESC' if random.random() > 0.5 else 'ASC'}
    """
    
    return {
        "prompt": prompt,
        "inject_type": comp['name'],
        "expected_columns": selected_cols,
        "seed": student_id
    }

In [150]:
import random

def generate_anti_copy_challenge(student_id):
    random.seed(student_id)
    
    # 1. TOPOLOGY: How the tables must connect
    topologies = [
        {"desc": "Star Join", "hint": "Join User as the center to Branch and AuditLog separately."},
        {"desc": "Linear Chain", "hint": "Connect Country ‚Üí City ‚Üí Branch ‚Üí Account in a single chain."},
        {"desc": "Self-Referential", "hint": "You must join the Transaction table to itself."}
    ]
    
    # 2. STRUCTURAL CONSTRAINT: The mandatory SQL feature
    constraints = [
        {"type": "CTE", "req": "Must define the logic inside a WITH clause."},
        {"type": "Subquery", "req": "The main filter must reside in a WHERE (SELECT...) subquery."},
        {"type": "Window", "req": "You must use a Window Function (OVER PARTITION BY)."},
        {"type": "Set Op", "req": "You must use EXCEPT or INTERSECT to find the result."}
    ]
    
    # 3. ALIASING RULES: (Prevents copy-pasting code directly)
    alias_styles = [
        "Use 'u_info' for User and 'acc_data' for Account.",
        "Prefix all column aliases with 'out_'.",
        "Use Spanish aliases (e.g., 'saldo_total', 'nombre_usuario').",
        "Use underscore-separated uppercase (e.g., USER_NAME_KEY)."
    ]

    topology = random.choice(topologies)
    constraint = random.choice(constraints)
    alias = random.choice(alias_styles)
    
    # Randomly select 4 columns from the available schema
    all_columns = ["u.username", "u.email", "a.balance", "a.account_number", "c.name as country", "cur.code"]
    output_columns = random.sample(all_columns, 4)
    
    prompt = f"""
    --- üìù UNIQUE SQL SPECIFICATION: STUDENT {student_id} ---
    
    STRUCTURE: {topology['desc']}
    MANDATORY TECHNIQUE: {constraint['type']} ({constraint['req']})
    NAMING CONVENTION: {alias}
    
    MISSION: 
    [Insert Random Logic Here: e.g., Find users with a balance > city average]
    
    STRICT OUTPUT SCHEMA (Order matters):
    {', '.join(output_columns)}
    
    ORDERING: 
    Order by the 3rd column in your SELECT list {random.choice(['ASC', 'DESC'])}.
    """
    return prompt

In [152]:
print(generate_anti_copy_challenge(100))


    --- üìù UNIQUE SQL SPECIFICATION: STUDENT 100 ---

    STRUCTURE: Star Join
    MANDATORY TECHNIQUE: Set Op (You must use EXCEPT or INTERSECT to find the result.)
    NAMING CONVENTION: Use underscore-separated uppercase (e.g., USER_NAME_KEY).

    MISSION: 
    [Insert Random Logic Here: e.g., Find users with a balance > city average]

    STRICT OUTPUT SCHEMA (Order matters):
    u.email, a.account_number, a.balance, cur.code

    ORDERING: 
    Order by the 3rd column in your SELECT list ASC.
    


In [149]:
def seed_student_data(session, challenge):
    """
    This function reads the 'inject_type' and 'seed' from the challenge
    and forces the DB to have 3-5 rows that match the criteria.
    """
    random.seed(challenge['seed']) # Use same seed to match the problem
    
    if challenge['inject_type'] == "Temporal Gap":
        # Force 5 users to have the 'rapid update' timestamps
        for _ in range(5):
            u = session.exec(select(User)).random_one()
            t1 = datetime.now()
            t2 = t1 + timedelta(minutes=random.randint(1, 4))
            # ... insert logs ...
            
    elif challenge['inject_type'] == "Set Difference":
        # Force 5 branches to have ZERO accounts despite being in active cities
        pass

In [103]:
import random

def generate_complex_exercise(student_id):
    random.seed(student_id)
    
    # 1. EXPANDED ARCHITECTURAL PATHS (The "From" clause)
    paths = [
        "User ‚Üî Account ‚Üî Transaction (Flow analysis)",
        "User ‚Üî AuditLog (Behavioral analysis)",
        "Branch ‚Üî City ‚Üî Country (Geographic analysis)",
        "Account ‚Üî Card (Product usage)",
        "User ‚Üî Account ‚Üî Currency ‚Üî Country (Cross-border analysis)",
        "Account ‚Üî Deposit ‚Üî Withdraw (Liquidity analysis)",
        "City ‚Üî Branch ‚Üî Account ‚Üî Transaction (Local economic activity)",
        "User ‚Üî AuditLog (Security/Forensics)",
        "Currency ‚Üî Account ‚Üî User (Foreign Exchange Exposure)",
        "Country ‚Üî City ‚Üî User ‚Üî Account (Demographic wealth)",
        "Card ‚Üî Account ‚Üî Deposit (Credit-limit/Prepaid behavior)",
        "User ‚Üî Transaction (Peer-to-peer / Direct flow)",
        "Branch ‚Üî Account (Operational load)",
        "AuditLog (Self-Join for Session analysis)",
        "Currency ‚Üî Country (National monetary metrics)"
    ]
    
    # 2. EXPANDED STRUCTURAL REQUIREMENTS (The "How")
    structures = [
        {"desc": "Relative Comparison", "sql_hint": "Requires a Correlated Subquery or a JOIN on a CTE/Subquery."},
        {"desc": "Threshold Frequency", "sql_hint": "Requires GROUP BY with a HAVING clause."},
        {"desc": "Non-Existence / Exclusion", "sql_hint": "Requires a LEFT JOIN with a NULL check or a NOT EXISTS/NOT IN clause."},
        {"desc": "Ranking/Positioning", "sql_hint": "Requires Window Functions (RANK(), DENSE_RANK(), or ROW_NUMBER())."},
        {"desc": "Temporal Gap / Sequence", "sql_hint": "Requires LEAD() or LAG() window functions, or a Self-Join on timestamps."},
        {"desc": "Set Theory Difference", "sql_hint": "Requires using EXCEPT or a combination of INTERSECT and subqueries."},
        {"desc": "Conditional Aggregation", "sql_hint": "Requires using CASE WHEN inside a SUM() or COUNT() function."},
        {"desc": "Running Totals / Cumulative", "sql_hint": "Requires a Window Function with an 'ORDER BY' in the OVER clause."}
    ]
    
    # 3. EXPANDED DYNAMIC LOGIC POOL
    logic_pool = [
        "Find users whose average transaction amount is higher than the average for their entire branch.",
        "Identify the city where the currency exchange diversity (number of unique currencies) is highest.",
        "Find users who have a 'LOGIN' audit log but have 0 associated accounts.",
        "List accounts that have more withdrawals than deposits in terms of volume (sum), but fewer in terms of count.",
        "Identify users who updated their 'EMAIL' and then added a 'CARD' within the same 24-hour period.",
        "Find the 'Whale' users: those whose balance is in the top 5% of their specific country.",
        "Find 'Dormant Accounts': users who haven't had a transaction in the last 6 months but have a balance > $val_b.",
        "Identify IPs in the AuditLog that have successfully logged into more than {val_a} different accounts.",
        "Find the branch that has the highest ratio of 'Debit' to 'Credit' cards issued.",
        "Identify users who have accounts in more than {val_a} different currencies.",
        "Calculate the month-over-month growth of total deposits for the year 2025.",
        "Find users who have updated their LAST_NAME and FIRST_NAME in the same session (within 5 minutes).",
        "Identify countries where the average account balance is below the global average.",
        "Find users who have a 'Withdraw' larger than {val_a}% of their current total balance.",
        "List the top 3 cities in each country by the number of 'Checking' accounts.",
        "Detect 'Round-Trip' transactions: deposits followed by a withdraw of the exact same amount within 1 hour."
    ]

    selected_path = random.choice(paths)
    selected_struct = random.choice(structures)
    selected_logic = random.choice(logic_pool)
    
    # Random parameters for uniqueness
    val_a = random.choice([2, 5, 10, 15, 25])
    val_b = random.choice([500, 1000, 5000, 10000, 25000])

    # Final logic formatting
    mission = selected_logic.replace('{val_a}', str(val_a)).replace('{val_b}', str(val_b))
    mission = mission.replace('5%', f'{val_a}%').replace('24-hour', f'{val_a}-hour')

    prompt = f"""
    --- üèÜ ADVANCED CHALLENGE FOR STUDENT {student_id} ---
    
    CONTEXTUAL PATH: {selected_path}
    TECHNICAL FOCUS: {selected_struct['desc']}
    
    MISSION:
    {mission}
    
    SPECIFIC CONSTRAINTS:
    - Use a minimum threshold of ${val_b} where applicable.
    - {selected_struct['sql_hint']}
    - Output must be scannable: use clear column aliases (e.g., 'total_volume', 'user_ranking').
    - Only consider data updated/created in the year 2025.
    - Result must be ordered by {random.choice(['the primary metric desc', 'username asc', 'timestamp desc'])}.
    """
    return prompt

In [108]:
print(generate_complex_exercise(101))


    --- üèÜ ADVANCED CHALLENGE FOR STUDENT 101 ---

    CONTEXTUAL PATH: Country ‚Üî City ‚Üî User ‚Üî Account (Demographic wealth)
    TECHNICAL FOCUS: Ranking/Positioning

    MISSION:
    Find users who have updated their LAST_NAME and FIRST_NAME in the same session (within 5 minutes).

    SPECIFIC CONSTRAINTS:
    - Use a minimum threshold of $500 where applicable.
    - Requires Window Functions (RANK(), DENSE_RANK(), or ROW_NUMBER()).
    - Output must be scannable: use clear column aliases (e.g., 'total_volume', 'user_ranking').
    - Only consider data updated/created in the year 2025.
    - Result must be ordered by timestamp desc.
    
