### Importing Necessary Files


In [31]:
import os
import random
import time
import argparse
from datetime import timezone
import logging

import yaml
import psycopg2
from psycopg2 import sql
from psycopg2.extras import execute_values
from faker import Faker

In [32]:
# ---------- Logging setup ----------
logging.basicConfig(
    level=logging.INFO,  # INFO level shows success messages
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.FileHandler("db_connection.log"),  # logs to file
        logging.FileHandler("data_masking.log"), 
        logging.FileHandler("data_population.log"), # logs to file
        logging.StreamHandler()                    # also prints to console
    ]
)

In [29]:
# ---------- Configuration loading ----------
def load_config(yaml_path="dbconfig.yaml"):
    """
    Load database configuration from a YAML file only.
    The YAML must define: host, port, user, password, dbname.
    Example:
        host: localhost
        port: 5432
        user: myuser
        password: mypassword
        dbname: rushmore_db
    """
    if not os.path.exists(yaml_path):
        logging.error(f"Configuration file '{yaml_path}' not found.")
        raise FileNotFoundError(f"Configuration file '{yaml_path}' not found.")

    with open(yaml_path, "r", encoding="utf-8") as f:
        cfg = yaml.safe_load(f) or {}

    required_keys = ["host", "port", "user", "password", "dbname"]
    missing = [k for k in required_keys if k not in cfg or cfg[k] in (None, "")]
    if missing:
        logging.error(f"Missing required keys in {yaml_path}: {missing}")   
        raise ValueError(f"Missing required keys in {yaml_path}: {missing}")
    
    logging.info(f"Configuration loaded successfully from '{yaml_path}'.")
    return cfg


# ---------- Database connection ----------
def get_conn(cfg):
    """
    Establish a PostgreSQL connection using psycopg2 and the provided config dictionary.
    """
    try:
        conn = psycopg2.connect(
            host=cfg["host"],
            port=cfg["port"],
            user=cfg["user"],
            password=cfg["password"],
            dbname=cfg["dbname"]
        )
        logging.info(
            f"Database connection established successfully to '{cfg['dbname']}' at host '{cfg['host']}'."
        )
        return conn
    except psycopg2.Error as e:
        logging.error(f"Database connection failed: {e}")
        raise

# ---------- Connection Test ----------
if __name__ == "__main__":
    try:
        cfg = load_config("dbconfig.yaml")
        conn = get_conn(cfg)
        conn.close()
        logging.info("Connection test completed successfully — database reachable.")
    except Exception as e:
        logging.exception(f"Error during connection test: {e}")

2025-11-10 21:50:45,856 [INFO] Configuration loaded successfully from 'dbconfig.yaml'.
2025-11-10 21:50:46,232 [INFO] Database connection established successfully to 'rushmore-db' at host 'rushmore-db.postgres.database.azure.com'.
2025-11-10 21:50:46,237 [INFO] Connection test completed successfully — database reachable.


In [33]:
# ---------- Helpers for masking ----------
def mask_email(email: str) -> str:
    """
    Simple masking: keep first char of local part and domain intact,
    replace rest of local part with asterisks.
    Example: john.smith@example.com -> j*******@example.com
    """
    if not email or '@' not in email:
        logging.warning(f"Invalid or missing email provided for masking: {email}")
        return email
    
    local, domain = email.split('@', 1)
    if len(local) <= 2:
        masked_local = local[0] + "*"*(len(local)-1)
    else:
        masked_local = local[0] + "*"*(len(local)-1)

    masked_email = f"{masked_local}@{domain}"
    logging.info(f"Masked email: {email} -> {masked_email}")
    return masked_email

def mask_phone(phone: str) -> str:
    """
    Keep last 4 digits, replace other characters with * (simple).
    Non-digits preserved for readability but masked in digits.
    """
    if not phone:
        logging.warning("Empty or No phone number provided for masking.")
        return phone
    
    digits = [c for c in phone if c.isdigit()]
    if not digits:
        logging.warning(f"No digits found in phone number: {phone}")
        return phone
    
    if len(digits) <= 4:
        masked_digits = "*" * len(digits)
    else:
        last4 = ''.join(digits[-4:])
        masked_digits = "*" * (len(digits) - 4) + last4

    logging.info(f"Masked phone: {phone} -> {masked_digits}")
    return masked_digits


# ---------- Optional Test ----------
if __name__ == "__main__":
    test_email = "john.smith@example.com"
    test_phone = "+44 7123 456789"

    logging.info("Starting masking test...")
    mask_email(test_email)
    mask_phone(test_phone)
    logging.info("Masking test completed successfully.")

2025-11-10 22:15:19,069 [INFO] Starting masking test...
2025-11-10 22:15:19,072 [INFO] Masked email: john.smith@example.com -> j*********@example.com
2025-11-10 22:15:19,073 [INFO] Masked phone: +44 7123 456789 -> ********6789
2025-11-10 22:15:19,074 [INFO] Masking test completed successfully.


In [None]:
# ---------- Data creation functions ----------
def create_stores(cur, faker, num_stores=5):
    logging.info(f"Starting to create {num_stores} stores...")
    stores = []
    for i in range(num_stores):
        address = faker.address().replace('\n', ', ')
        city = f"{faker.city()} RushMore Pizzeria"
        phone_number = faker.phone_number()
        opened_at = faker.date_time_this_decade(tzinfo=timezone.utc)
        stores.append((address, city, phone_number, opened_at))

    insert = "INSERT INTO stores (address, city, phone_number, opened_at) VALUES %s RETURNING id"
    try:
        execute_values(cur, insert, stores)
        ids = [row[0] for row in cur.fetchall()]
        logging.info(f"Inserted {len(ids)} stores successfully.")
        return ids
    except Exception as e:
        logging.exception(f"Error inserting stores: {e}")
        raise

def create_customers(cur, faker, num_customers=1000):
    logging.info(f"Starting to create {num_customers} customers...")
    customers = []
    for _ in range(num_customers):
        first_name = faker.first_name()
        last_name = faker.last_name()
        email = faker.unique.email()
        phone_number = faker.phone_number()
        created_at = faker.date_time_this_year(tzinfo=timezone.utc)
        customers.append((first_name, last_name, email, phone_number, created_at))

    insert = ("INSERT INTO customers (first_name, last_name, email, phone_number, created_at) "
              "VALUES %s RETURNING id")

    ids = []
    batch_size = 1000
    try:
        for i in range(0, len(customers), batch_size):
            chunk = customers[i:i+batch_size]
            execute_values(cur, insert, chunk)
            ids.extend([row[0] for row in cur.fetchall()])
            logging.info(f"Inserted customers: {len(ids)}/{len(customers)}")
        logging.info(f"Inserted {len(ids)} customers successfully.")
        return ids
    except Exception as e:
        logging.exception(f"Error inserting customers: {e}")
        raise


def create_ingredients(cur, faker, num_ingredients=50):
    logging.info(f"Starting to create {num_ingredients} ingredients...")
    ing = []
    basic = ['Tomato', 'Cheese', 'Pepperoni', 'Mushroom', 'Basil',
             'Chicken', 'Onion', 'Peppers', 'Olive Oil', 'Garlic',
             'Dough', 'Sausage', 'Spinach', 'Feta', 'Pineapple',
             'Ham', 'Bacon', 'Jalapeno', 'Corn', 'BBQ Sauce']
    
    # Generate ingredient data
    for i in range(num_ingredients):
        name = random.choice(basic) + ("" if i < len(basic) else f" {faker.word()}")
        stock_quantity = random.randint(10, 500)
        unit = random.choice(['grams', 'ml', 'pieces'])
        ing.append((name, stock_quantity, unit))

    insert = "INSERT INTO ingredients (name, stock_quantity, unit) VALUES %s RETURNING ingredient_id"
    try:
        execute_values(cur, insert, ing)
        ids = [row[0] for row in cur.fetchall()]
        logging.info(f"Inserted {len(ids)} ingredients successfully.")
        return ids
    except Exception as e:
        logging.exception(f"Error inserting ingredients: {e}")
        raise

def create_menu_items(cur, faker, num_items=30):
    logging.info(f"Starting to create {num_items} menu items...")
    items = []
    categories = ['Classic', 'Vegetarian/Vegan', 'Gourmet/Special', 'Meat Lovers', 'Seafood', 'Deluxe']
    sizes = ['Small', 'Medium', 'Large', 'Family']

    for _ in range(num_items):
        name = f"{faker.word().capitalize()} {random.choice(['Margherita', 'Pepperoni Feast', 'Hawaiian', 'Four Cheese', 'Spinach & Feta', 'BBQ Chicken', 'Veggie Delight', 'Meat Supreme', 'Seafood Special'])}"
        category = random.choice(categories)
        size = random.choice(sizes)
        price = round(random.uniform(5.0, 25.0), 2)
        items.append((name, category, size, price))

    insert = "INSERT INTO menu_items (name, category, size, price) VALUES %s RETURNING id"
    try:
        execute_values(cur, insert, items)
        ids = [row[0] for row in cur.fetchall()]
        logging.info(f"Inserted {len(ids)} menu items successfully.")
        return ids
    except Exception as e:
        logging.exception(f"Error inserting menu items: {e}")
        raise