# In [None]:
# Not actually run here, but included because it's what I used to make some of the files for our base 
# mock data/initial denormalized tables (mainly the OLTP product catalog and its related tables)

# we generate our product catalog tables by extracting and transforming the top 500 results from a 
# public boardgame dataset which can be found at: https://www.kaggle.com/datasets/jvanelteren/boardgamegeek-reviews
# whose original source datasets can be found at https://github.com/beefsack/bgg-ranking-historicals

import polars as pl
import numpy as np
from datetime import datetime
from pathlib import Path
import uuid

# Deterministic UUIDv5 namespaces: the seed string is hashed under the DNS namespace to get
# one root namespace, and each entity type gets its own child namespace under that root.
NAMESPACE_SEED = 'com.gcp.dagster.analytics.sandbox.generation.v1'
BASE_NAMESPACE = uuid.uuid5(namespace=uuid.NAMESPACE_DNS, name=NAMESPACE_SEED)
# one child namespace per entity type
NAMESPACE_PRODUCTS = uuid.uuid5(namespace=BASE_NAMESPACE, name='boardgame_products')
NAMESPACE_AUTHORS = uuid.uuid5(namespace=BASE_NAMESPACE, name='boardgame_authors')
NAMESPACE_SUPPLIERS = uuid.uuid5(namespace=BASE_NAMESPACE, name='boardgame_suppliers')
NAMESPACE_PUBLISHERS = uuid.uuid5(namespace=BASE_NAMESPACE, name='boardgame_publishers')
NAMESPACE_PROMOTIONS = uuid.uuid5(namespace=BASE_NAMESPACE, name='boardgame_promotions')
NAMESPACE_CHANNELS = uuid.uuid5(namespace=BASE_NAMESPACE, name='boardgame_channels')
NAMESPACE_PAYMENT_TYPES = uuid.uuid5(namespace=BASE_NAMESPACE, name='boardgame_payment_types')

# assumes the csv has been downloaded into the current working directory
games_csv_path = Path.cwd() / "games_detailed_info2025.csv"
# only the source fields we can make use of
wanted_fields = [
    "id",
    "name",
    "description",
    "yearpublished",
    "thumbnail",
    "image",
    "boardgamecategory",
    "boardgamedesigner",
    "boardgamepublisher",
    "minage",
    "minplaytime",
    "maxplaytime",
    "minplayers",
    "maxplayers",
    "Board Game Rank",
    "usersrated",
    "average",
]
original_games = pl.read_csv(games_csv_path).select(wanted_fields)

# keep the 500 most popular boardgames, measured by number of user ratings
# NOTE(review): product_id is assigned further down from the row order of this frame -
# confirm that top_k returns rows in a stable order, otherwise sort explicitly first.
original_games = original_games.top_k(k=500, by="usersrated")

# map the source column names onto the names used by our product catalog
catalog_column_names = {
    "id": "boardgamegeek_id",
    "yearpublished": "year_published",
    "boardgamecategory": "category",
    "boardgamedesigner": "authors",
    "boardgamepublisher": "publishers",
    "Board Game Rank": "rating_rank",
}
custom_topk = original_games.rename(catalog_column_names)

# one fixed timestamp string, reused as the date_added/date_modified value of the generated rows
set_date = datetime(2025, 8, 1).strftime("%Y-%m-%d %H:%M:%S")

# Base msrp is drawn uniformly between $20-60. The generator is seeded so that re-running the
# script reproduces the same prices; an unseeded np.random.uniform gave different prices on
# every run, unlike the deterministic uuid5 values generated for the other tables.
MSRP_RNG_SEED = 20250801
msrp_rng = np.random.default_rng(MSRP_RNG_SEED)
random_msrp = np.round(msrp_rng.uniform(20.00, 60.00, size=len(custom_topk)), 4)

custom_topk = custom_topk.with_columns(
    pl.Series("msrp", random_msrp)
).with_columns([
    (pl.col("msrp") * 0.5).alias("cost_price"),  # cost price is half of the msrp
    pl.col("msrp").alias("unit_price"),  # unit price starts out equal to the msrp
    pl.lit("Active").alias("status"),
    pl.lit(set_date).alias("date_added"),
    pl.lit(None).alias("date_removed"),
    pl.lit(set_date).alias("date_modified"),
    pl.lit(None).alias("date_deleted"),
]).with_row_index("product_id", 1)  # product_id is the row number, starting at 1

# The category/authors/publishers columns hold list-like strings; a regular expression pulls out
# every single-quoted item so each column becomes a list of strings. json_decode() would be the
# better tool if conditions permit (not possible here because some fields have multiple leading
# quotation marks).
_QUOTED_ITEM_PATTERN = r"'([^']*)'"
custom_topk = custom_topk.with_columns(
    [
        pl.col(list_column)
        .str.extract_all(_QUOTED_ITEM_PATTERN)
        # strip_chars() only removes leading and trailing chars, so inner text is untouched
        .list.eval(pl.element().str.strip_chars("'"))
        .alias(list_column)
        for list_column in ("category", "authors", "publishers")
    ]
)

# I used ML classification to separate all the products into 4 suppliers based on product descriptions and
# categories, and the classification essentially output a bridge table. There is currently no
# many-to-many relationship between suppliers and products in our example logic so, to avoid a
# premature/unnecessary bridge table, the classification table is joined back into the products.

classification_filepath = Path.cwd().joinpath("product_suppliers_rationale.json")
join_supplier = pl.read_json(classification_filepath).select("product_id", "supplier_id")
custom_topk = custom_topk.join(join_supplier, on="product_id")

# but we also could have just randomly split the products into different suppliers instead, e.g.:
# custom_topk = custom_topk.with_columns(pl.int_uniform(1, 4).alias("supplier_id"))
# by uncommenting the above line and commenting out the earlier load classification & join lines

# This is the first write into ./mock_data, so make sure the directory exists;
# write_parquet does not create missing parent directories.
mock_data_dir = Path.cwd() / "mock_data"
mock_data_dir.mkdir(parents=True, exist_ok=True)

# optional: split off the supplier_id and product_id as a bridge table
custom_topk.select([
    "supplier_id",
    "product_id",
    "date_added",
    "date_removed",
    "date_modified",
    "date_deleted",
]).write_parquet(mock_data_dir / "suppliers_products.parquet")

# Reverse-engineer a normalized publishers table plus a product<->publisher bridge table
# from the denormalized product table.

# audit columns stamped on both publisher tables
publisher_audit_columns = [
    pl.lit(set_date).alias("date_added"),
    pl.lit(None).alias("date_removed"),
    pl.lit(set_date).alias("date_modified"),
    pl.lit(None).alias("date_deleted"),
]

publisher_col = pl.col("publishers")
# cleanup rules for malformed inputs, based on looking through the data
tidy_publisher = (
    publisher_col.str.strip_chars(' ,"')
    .str.replace_all(r'", "', " ")
    .str.replace_all(r"^s ", "")
    .alias("publishers")
)
junk_publisher = (
    publisher_col.str.starts_with("t ")
    | (publisher_col == "s")
    | (publisher_col == "")
)

# one row per (product_id, publisher name): explode the nested list, clean it, drop the junk rows
explode_publishers = (
    custom_topk.select(["product_id", "publishers"])
    .explode("publishers")
    .with_columns(tidy_publisher)
    .filter(~junk_publisher)
)

# distinct publisher names, sorted alphabetically and numbered from 1 as publisher_id
unique_publishers = (
    explode_publishers.select(publisher_col.unique())
    .sort("publishers")
    .with_row_index("publisher_id", 1)
    .rename({"publishers": "publisher_name"})
)

# swap the publisher name for its publisher_id to obtain the normalized bridge table
explode_publishers = (
    explode_publishers.rename({"publishers": "publisher_name"})
    .join(unique_publishers, on="publisher_name")
    .sort("publisher_id")
    .select(["product_id", "publisher_id"])
    .with_columns(*publisher_audit_columns)
)

# publishers also get a uuid, derived from the publisher_id rendered as a string
publisher_uuid = (
    pl.col("publisher_id").cast(pl.String)
    .map_elements(
        lambda idx_str: str(uuid.uuid5(NAMESPACE_PUBLISHERS, idx_str)),
        return_dtype=str,
    )
    .alias("publisher_uuid")
)
unique_publishers = unique_publishers.with_columns(publisher_uuid, *publisher_audit_columns)

# write out both tables as parquet
publishers_out_dir = Path.cwd() / "mock_data"
unique_publishers.write_parquet(publishers_out_dir / "publishers.parquet")
explode_publishers.write_parquet(publishers_out_dir / "publishers_products.parquet")

# Reverse-engineer a normalized categories table plus a product<->category bridge table
# from the denormalized product table.

# audit columns stamped on both category tables
category_audit_columns = [
    pl.lit(set_date).alias("date_added"),
    pl.lit(None).alias("date_removed"),
    pl.lit(set_date).alias("date_modified"),
    pl.lit(None).alias("date_deleted"),
]

category_col = pl.col("category")
# rows considered malformed: empty names and fragments starting with "s "
junk_category = (category_col == "") | category_col.str.starts_with("s ")

# one row per (product_id, category name): explode the nested list, clean it, drop the junk rows
explode_categories = (
    custom_topk.select(["product_id", "category"])
    .explode("category")
    .with_columns(category_col.str.strip_chars(' ,"'))
    .filter(~junk_category)
)

# distinct category names, sorted alphabetically and numbered from 1 as category_id
unique_categories = (
    explode_categories.select(category_col.unique())
    .sort("category")
    .with_row_index("category_id", 1)
)

# join on the category name to swap it for its category_id -> normalized bridge table
explode_categories = (
    explode_categories.join(unique_categories, on="category")
    .sort("category_id")
    .select("category_id", "product_id")
    .with_columns(*category_audit_columns)
)

# the lookup table gets its final column name and audit columns only after the join above
unique_categories = (
    unique_categories.rename({"category": "category_name"})
    .with_columns(*category_audit_columns)
)

# write out both tables as parquet
categories_out_dir = Path.cwd() / "mock_data"
unique_categories.write_parquet(categories_out_dir / "categories.parquet")
explode_categories.write_parquet(categories_out_dir / "categories_products.parquet")

# Same approach again for authors: reverse-engineer a normalized authors table plus a
# product<->author bridge table from the denormalized product table.

# audit columns stamped on both author tables
author_audit_columns = [
    pl.lit(set_date).alias("date_added"),
    pl.lit(None).alias("date_removed"),
    pl.lit(set_date).alias("date_modified"),
    pl.lit(None).alias("date_deleted"),
]

author_col = pl.col("authors")

# one row per (product_id, author name): explode the nested list, clean it, drop empty names
explode_authors = (
    custom_topk.select(["product_id", "authors"])
    .explode("authors")
    .with_columns(author_col.str.strip_chars(' ,"'))
    .filter(~(author_col == ""))
)

# distinct author names, sorted alphabetically and numbered from 1 as author_id
unique_authors = (
    explode_authors.select(author_col.unique())
    .sort("authors")
    .with_row_index("author_id", 1)
)

# join on the author name to swap it for its author_id -> normalized bridge table
explode_authors = (
    explode_authors.join(unique_authors, on="authors")
    .sort("author_id")
    .select("author_id", "product_id")
    .with_columns(*author_audit_columns)
)

# authors also get a uuid, derived from the author_id rendered as a string
author_uuid = (
    pl.col("author_id").cast(pl.String)
    .map_elements(
        lambda idx_str: str(uuid.uuid5(NAMESPACE_AUTHORS, idx_str)),
        return_dtype=str,
    )
    .alias("author_uuid")
)
unique_authors = (
    unique_authors.rename({"authors": "author_name"})
    .with_columns(author_uuid, *author_audit_columns)
)

# write out both tables as parquet
authors_out_dir = Path.cwd() / "mock_data"
unique_authors.write_parquet(authors_out_dir / "authors.parquet")
explode_authors.write_parquet(authors_out_dir / "authors_products.parquet")

# products get a uuid as well, derived from the product_id rendered as a string
product_uuid = (
    pl.col("product_id").cast(pl.String)
    .map_elements(
        lambda idx_str: str(uuid.uuid5(NAMESPACE_PRODUCTS, idx_str)),
        return_dtype=str,
    )
    .alias("product_uuid")
)
custom_topk = custom_topk.with_columns(product_uuid)

# final column order of the products table; the list-valued category/authors/publishers
# columns are not carried over
product_column_order = [
    "product_id",
    "product_uuid",
    "name",
    "description",
    "year_published",
    # "supplier_id", # splitting it out as a bridge table
    "boardgamegeek_id",
    "minage",
    "minplaytime",
    "maxplaytime",
    "minplayers",
    "maxplayers",
    "rating_rank",
    "usersrated",
    "average",
    "thumbnail",
    "image",
    "cost_price",
    "msrp",
    "unit_price",
    "status",
    "date_added",
    "date_removed",
    "date_modified",
    "date_deleted",
]
custom_topk = custom_topk.select(product_column_order)

# extension-less base path, so the alternative output formats below can reuse it
output_filepath = Path.cwd() / "mock_data" / "products"
custom_topk.write_parquet(f"{output_filepath}.parquet")
# custom_topk.write_json(f"{output_filepath}.json")
# custom_topk.write_csv(f"{output_filepath}.csv")

# A few fictional suppliers. I wrote them out manually to ensure they're cohesive, but you
# could use the Faker library to generate most of the fields. If you go that route I'd recommend
# generating the whole address at once (don't forget to set different locales/countries for your
# Faker instances) and then parsing the individual fields (province, city, street address, postal
# code) out of the full address, as opposed to generating each of those individually: Faker doesn't
# remember the province when generating the city, the city for the street address, etc. - so the
# result would not be cohesive.

supplier_name = [
    "Euro-Stratego Distribution",
    "American Adventures Inc.",
    "Party Game Emporium",
    "Classic & Abstract Co.",
]
# description was used for product->supplier classification, but is not relevant if you just
# randomly assigned products to suppliers
description = [
    "Specializes in deep, strategic Eurogames focusing on economics, city-building, and resource management.",
    "Focuses on highly thematic games with strong player interaction, miniatures, and adventure elements.",
    "Supplies light, social, and family-friendly games perfect for larger groups and casual players.",
    "Distributes timeless abstract strategy games, puzzle games, and updated classics for all ages.",
]
country = ["Germany", "USA", "Canada", "UK"]
cities = ["Hamburg", "Long Beach", "Vancouver", "London"]
provinces = ["Hamburg", "California", "British Columbia", "London"]
addresses = [
    "Lagerhaus-Straße 17",
    "789 Ocean Gateway Blvd",
    "456 Waterfront St",
    "Unit 12, Thames Gateway Park, River Road, Barking",
]
postal_code = ["21129", "90802", "V6C 3T4", "IG11 0JG"]
phone_number = ["+49 40 12345678", "+1 (562) 555-0187", "+1 (604) 555-0123", "+44 20 7946 0831"]
names = ["Klaus Müller", "Jennifer Chen", "David Singh", "Eleanor Davies"]
email = [
    "k.muller@hamburg-logistik.de",
    "j.chen@pacificgatewaylogistics.com",
    "david.singh@vancouverportalservices.ca",
    "eleanor.davies@thamesfreight.co.uk",
]

supplier_count = len(supplier_name)

# column-oriented table definition; every list above holds one entry per supplier
suppliers_dict = {
    # the uuid is derived from the 1-based supplier number, matching the supplier_id added below
    "supplier_uuid": [
        str(uuid.uuid5(NAMESPACE_SUPPLIERS, str(supplier_number)))
        for supplier_number in range(1, supplier_count + 1)
    ],
    "supplier_name": supplier_name,
    "description": description,
    "country": country,
    "city": cities,
    "province": provinces,
    "address": addresses,
    "postal_code": postal_code,
    "contact_name": names,
    "contact_number": phone_number,
    "contact_email": email,
    "date_added": [set_date] * supplier_count,
    "date_removed": [None] * supplier_count,
    "date_modified": [set_date] * supplier_count,
    "date_deleted": [None] * supplier_count,
}

# build the table, number the rows from 1 as supplier_id and write it out as parquet
suppliers = pl.DataFrame(suppliers_dict).with_row_index("supplier_id", 1)
suppliers.write_parquet(Path.cwd() / "mock_data" / "suppliers.parquet")
# suppliers.write_csv(Path.cwd() / "mock_data" / "suppliers.csv")

# column names for the descriptive half of each promotion row
description_schema = ["promotion_uuid","promotion_name","promotion_code","description"]
# (promotion_name, promotion_code, description) - the uuid is prepended below
# NOTE(review): the "20% Off Top 10 products from 2024 " name carries a trailing space and
# "Minatures" looks like a misspelling of "Miniatures"; both are left untouched here in case
# other data already matches on these exact strings - confirm before changing.
promo_descriptions = [
    ("5 Off Item", "5OFFITEM", "Get $5.00 off a single item."),
    ("10 Off Item Over 50", "10OFF50ITEM", "Get $10.00 off a single item with a price of $50.00 or more."),
    ("10% Off Item", "10PERCENTITEM", "Get 10% off a single item."),
    ("15% Off Item", "15PERCENTITEM", "Get 15% off a single item."),
    ("20% Off Top 10 products from 2024 ", "20PCTTOP2024", "Get 20% off items from the bestsellers from 2024."),
    ("Buy One Get One 50% Off", "BOGO50PCT", "Buy one item, get the second item of equal or lesser value for 50% off."),
    ("Buy One Get One Free on Minatures", "BOGOFREEMINI", "Buy one item from the Minatures group, get a second item from the group of equal or lesser value for free."),
    ("Select Items for 20", "PARTY20", "Get Party Games products for a fixed price of $20.00."),
    ("15 Off Orders Over 100", "TOTAL15FLAT", "Get $15.00 off your total order of $100.00 or more."),
    ("15% Off Orders Over 100", "TOTAL15PCT", "Get 15% off your total order of $100.00 or more."),
]

# Turn each tuple into a list with its uuid in front. The uuid is keyed by the 1-based row
# number so that it lines up with the 1-based promotion_id assigned to the same row later on,
# the same way the product/publisher/author/supplier uuids are derived from their 1-based ids
# (the previous 0-based enumerate index produced uuids that were off by one from promotion_id).
promo_descriptions = [
    [str(uuid.uuid5(NAMESPACE_PROMOTIONS, str(promotion_number))), *fields]
    for promotion_number, fields in enumerate(promo_descriptions, start=1)
]


# column names for the rule half of each promotion row
promo_schema = ["discount_level","discount_pool","stackable", "priority", "discount_type",
                 "discount_value", "min_quantity", "min_subtotal", "promotion_group_id"]
# level, pool, stackable, priority, type, value, min_quantity, min_subtotal, promo_group
# rows are positional: row i here belongs to row i of promo_descriptions
promotions_core = [
    ### line items level
    # basic flat discount 
    ("LINE_ITEM", "FLAT", True, 10, "FLAT", 5.00, 1, 0.00, None),
    ("LINE_ITEM", "FLAT", True, 10, "FLAT", 10.00, 1, 50.00, None),
    # basic percentage discounts
    ("LINE_ITEM","PERCENT", True, 20, "PERCENT", 10.00, 1, 0.00, None),
    ("LINE_ITEM","PERCENT", True, 20, "PERCENT", 15.00, 1, 0.00, None),
    # percentage based on product group
    ("LINE_ITEM","PERCENT", True, 20, "PERCENT", 20.00, 1, 0.00, 1),
    # basic BOGO 50% off
    ("LINE_ITEM", 'BOGO', True, 10, "PERCENT", 50.00, 2, 0.00, None),
    # BOGO free based on product group
    ("LINE_ITEM", 'BOGO', True, 10, "PERCENT", 100.00, 2, 0.00, 2),
    # fixed price discount, based on product group
    ("LINE_ITEM", "FIXED", True, 10, "FIXED", 20.00, 1, 0.00, 3),
    ### transaction/order total level
    # basic flat, req. min $100 subtotal
    ("ORDER", "FLAT", True, 30, "FLAT", 15.00, 1, 100.00, None),
    # basic percentage, req. min $100 subtotal
    ("ORDER","PERCENT", True, 30, "PERCENT", 15.00, 1, 100.00, None),
]
# made the initial list a list of tuples, before realizing I'd probably mutate the fields
promotions_core = [list(item) for item in promotions_core]

# Build the combined schema as a NEW list. Binding the name to description_schema and calling
# .extend() on it would also grow description_schema itself, because both names would refer to
# the same list object.
combined_promos_schema = description_schema + promo_schema

# The two halves are paired by position, so a length mismatch would silently misalign or drop
# promotions - fail loudly instead.
if len(promo_descriptions) != len(promotions_core):
    raise ValueError("promo_descriptions and promotions_core must have the same number of rows")
combined_promos = [
    descriptive_fields + rule_fields
    for descriptive_fields, rule_fields in zip(promo_descriptions, promotions_core)
]

# assemble the promotions table: number rows from 1 as promotion_id, then add status and audit columns
promos_core = pl.DataFrame(combined_promos, schema=combined_promos_schema,orient="row").with_row_index("promotion_id", 1).with_columns(
    pl.lit("Active").alias("status"),
    pl.lit(set_date).alias("start_date"),
    pl.lit(None).alias("end_date"),
    pl.lit(set_date).alias("date_added"),
    pl.lit(None).alias("date_removed"),
    pl.lit(set_date).alias("date_modified"),
    pl.lit(None).alias("date_deleted"),
)
# promos_core.cast({"promotion_group_id": pl.UInt32})
promos_core.write_parquet(Path.cwd() / "mock_data" / "promotions.parquet")

# promotion groups: name, type, description, category_id
# NOTE(review): category_id 37 and 47 are hard-coded; presumably they are the ids that the
# sorted category table assigned to these two categories - verify against categories.parquet.
promo_groups_schema = ["group_name","group_type","description","category_id"]
promo_group_rows = [
    ("Top 10 boardgames of the past year (2024)", "Custom", "Contains a curated list of the top 10 selling products of the previous year (2024)", None),
    ("Minatures","Category","Products that are classified under the Minatures category",37),
    ("Party Games", "Category", "Products that are classified under the Party Games category",47),
]
promo_group_audit_columns = [
    pl.lit(set_date).alias("date_added"),
    pl.lit(None).alias("date_removed"),
    pl.lit(set_date).alias("date_modified"),
    pl.lit(None).alias("date_deleted"),
]
# number the groups from 1 as promotion_group_id and stamp the audit columns
promo_groups = (
    pl.DataFrame(promo_group_rows, schema=promo_groups_schema, orient="row")
    .with_row_index("promotion_group_id", 1)
    .with_columns(*promo_group_audit_columns)
)
promo_groups.write_parquet(Path.cwd() / "mock_data" / "promotion_groups.parquet")

# bridge table for the custom group: promotion group 1 contains product_id 1 through 10
custom_group_size = 10
promotion_groups_products = {
    "promotion_group_id": [1] * custom_group_size,
    "product_id": list(range(1, custom_group_size + 1)),
    "date_added": [set_date] * custom_group_size,
    "date_removed": [None] * custom_group_size,
    "date_modified": [set_date] * custom_group_size,
    "date_deleted": [None] * custom_group_size,
}

promotion_groups_products_path = Path.cwd() / "mock_data" / "promotion_groups_products.parquet"
pl.from_dict(promotion_groups_products).write_parquet(promotion_groups_products_path)

# sales channels: the uuid column is prepended to each row below
channel_schema = ["channel_uuid","channel_name","channel_type","description"]
channels = [
    ["Main Website", "Online", "The primary e-commerce website for all global sales and customer engagement."],
    ["Flagship Store - Downtown", "In-Store", "The main physical retail brick-and-mortar location in the city center."],
    ["Official Mobile App","Mobile App","The official shopping application for both iOS and Android platforms."],
    ["Amazon Marketplace US","Third-Party","Products sold through the Amazon US third-party marketplace platform"]
]
# Put a deterministic uuid in front of every row. It is keyed by the 1-based row number so that
# it lines up with the 1-based channel_id assigned below, the same way the
# product/publisher/author/supplier uuids are derived from their 1-based ids (the previous
# 0-based enumerate index produced uuids that were off by one from channel_id).
channels = [
    [str(uuid.uuid5(NAMESPACE_CHANNELS, str(channel_number))), *fields]
    for channel_number, fields in enumerate(channels, start=1)
]

# Keep the DataFrame in channels_df and write it in a separate statement: write_parquet
# returns None, so chaining it onto the assignment left channels_df bound to None.
channels_df = pl.DataFrame(channels, schema=channel_schema, orient="row").with_row_index("channel_id",1).with_columns(
    pl.lit(set_date).alias("date_added"),
    pl.lit(None).alias("date_removed"),
    pl.lit(set_date).alias("date_modified"),
    pl.lit(None).alias("date_deleted"),
)
channels_df.write_parquet(Path.cwd() / "mock_data"/ "channels.parquet")

# payment types: the uuid column is prepended to each row below
payment_types_schema = ["payment_type_uuid","type_name","description"]
payment_types = [
    ["Credit Card", "Payment made with a major credit card network (e.g., Visa, Mastercard, American Express)."],
    ["Debit Card", "Payment made with a bank-issued debit card, drawing funds directly from a checking account."],
    ["Gift Card", "Payment made using a prepaid gift card issued by the retailer. The transaction deducts from the card's balance."],
    ["PayPal", "Payment processed through the PayPal digital wallet service."],
    ["Digital Wallet", "Contactless payment made using a digital wallet service (Apple Pay, Google Pay, Samsung Wallet, etc.)."],
    ["Cash", "Payment made with physical currency (banknotes and coins), typically used for in-store transactions."],
    ["Klarna", "A 'Buy Now, Pay Later' service that allows customers to pay for their purchase in installments."],
    ["Store Credit", "Credit applied to a customer's account, typically from a product return, used as full or partial payment."],
]
# Put a deterministic uuid in front of every row. It is keyed by the 1-based row number so that
# it lines up with the 1-based payment_type_id assigned below, the same way the
# product/publisher/author/supplier uuids are derived from their 1-based ids (the previous
# 0-based enumerate index produced uuids that were off by one from payment_type_id).
payment_types = [
    [str(uuid.uuid5(NAMESPACE_PAYMENT_TYPES, str(payment_type_number))), *fields]
    for payment_type_number, fields in enumerate(payment_types, start=1)
]

# Keep the DataFrame in payment_types_df and write it in a separate statement: write_parquet
# returns None, so chaining it onto the assignment left payment_types_df bound to None.
payment_types_df = pl.DataFrame(payment_types,schema=payment_types_schema, orient="row").with_row_index("payment_type_id",1).with_columns(
    pl.lit(set_date).alias("date_added"),
    pl.lit(None).alias("date_removed"),
    pl.lit(set_date).alias("date_modified"),
    pl.lit(None).alias("date_deleted"),
)
payment_types_df.write_parquet(Path.cwd() / "mock_data" / "payment_types.parquet")


