<a href="https://colab.research.google.com/github/ryan-miles/stellationharness/blob/main/SynthSalesDataGenerator.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install faker



In [None]:
import json
import random
from faker import Faker
from datetime import datetime, timedelta

# Shared Faker instance; the generator functions below read it as a module-level global.
fake = Faker()

def random_items():
    """Build the line items for a single order.

    Returns:
        list[dict]: between 1 and 5 items, each with ``sku``, ``name``,
        ``qty`` (1-5) and ``price`` (5-150, rounded to 2 decimals).

    SKUs are drawn through ``fake.unique`` so they never repeat inside one
    returned list. The uniqueness registry is wiped before returning, which
    means SKUs may repeat between different calls (and any other values
    registered on ``fake.unique`` are forgotten as well).
    """
    line_count = random.randint(1, 5)
    items = [
        {
            "sku": fake.unique.bothify(text="SKU######"),
            "name": fake.word().capitalize(),
            "qty": random.randint(1, 5),
            "price": round(random.uniform(5, 150), 2),
        }
        for _ in range(line_count)
    ]
    # Reset the whole uniqueness registry so long runs cannot exhaust the
    # SKU pattern; per-call uniqueness is all this generator promises.
    fake.unique.clear()
    return items

def random_payment():
    """Pick a payment method at random and attach its method-specific detail.

    Returns:
        dict: always contains ``method``; additionally ``last4`` for
        ``credit_card``, ``email`` for ``paypal`` and ``account`` for
        ``bank_transfer``. ``apple_pay`` carries no extra field.
    """
    method = random.choice(["credit_card", "paypal", "apple_pay", "bank_transfer"])
    if method == "credit_card":
        # Only the trailing four digits of the generated card number are kept.
        return {"method": method, "last4": fake.credit_card_number()[-4:]}
    if method == "paypal":
        return {"method": method, "email": fake.email()}
    if method == "bank_transfer":
        return {"method": method, "account": fake.bban()}
    return {"method": method}

def random_order():
    """Build one synthetic order record.

    Returns:
        dict: keys ``order_id``, ``customer`` (``customer_id``, ``name``,
        ``email``), ``items`` (from ``random_items()``), ``order_date``,
        ``status``, ``shipping_address`` (``line1``, ``city``, ``state``,
        ``zip``) and ``payment`` (from ``random_payment()``). A
        ``gift_message`` key is added when the random gift flag is True.

    ``order_date`` is a naive-UTC ISO-8601 timestamp with a literal ``"Z"``
    appended, placed 0-365 whole days before the moment of the call.

    Note:
        ``fake.unique`` is cleared before returning, so ``order_id`` and
        ``customer_id`` are NOT guaranteed to be distinct across calls.
        Callers that need distinct IDs must de-duplicate themselves.
    """
    # Function-scope import so the block does not depend on editing the
    # cell's import lines.
    from datetime import timezone

    has_gift = random.choice([True, False])
    order = {
        "order_id": fake.unique.bothify(text="ORD########"),
        "customer": {
            "customer_id": fake.unique.bothify(text="CUST######"),
            "name": fake.name(),
            "email": fake.email()
        },
        "items": random_items(),
        # datetime.utcnow() is deprecated since Python 3.12. Taking an aware
        # "now" in UTC and dropping tzinfo yields the same naive value, so the
        # serialized format (no "+00:00" offset, trailing "Z") is unchanged.
        "order_date": (
            datetime.now(timezone.utc).replace(tzinfo=None)
            - timedelta(days=random.randint(0, 365))
        ).isoformat() + "Z",
        "status": random.choice(["processing", "shipped", "delivered", "canceled"]),
        "shipping_address": {
            "line1": fake.street_address(),
            "city": fake.city(),
            "state": fake.state_abbr(),
            "zip": fake.zipcode()
        },
        "payment": random_payment()
    }
    if has_gift:
        order["gift_message"] = fake.sentence()

    # Forget every value registered on fake.unique so long runs cannot exhaust
    # the ID patterns (at the cost of cross-call uniqueness, see Note above).
    fake.unique.clear()
    return order


def generate_orders(n=1000):
    """Generate ``n`` synthetic orders whose ``order_id`` values are distinct.

    Args:
        n: number of orders to produce; values <= 0 yield an empty list.

    Returns:
        list[dict]: ``n`` order records as produced by ``random_order()``.

    ``random_order()`` clears Faker's uniqueness registry on every call, so
    two independently generated orders can receive the same ``order_id``.
    Those IDs act as the join key for downstream records, so a colliding
    order is discarded and redrawn here instead of being emitted.

    Note:
        The loop only terminates if at least ``n`` distinct order IDs exist;
        the ``ORD########`` pattern allows 10**8 of them.
    """
    orders = []
    seen_order_ids = set()
    while len(orders) < n:
        candidate = random_order()
        order_id = candidate["order_id"]
        if order_id in seen_order_ids:
            # Duplicate ID: drop this draw and try again.
            continue
        seen_order_ids.add(order_id)
        orders.append(candidate)
    return orders

# Entry point for this cell: generate the dataset and persist it as JSON.
if __name__ == "__main__":
    # 20,000 orders are generated; the result is kept in the global `orders`.
    orders = generate_orders(20000)
    # Written to the current working directory; indent=2 pretty-prints the file.
    with open("orders.json", "w") as f:
        json.dump(orders, f, indent=2)

In [None]:
import json

# Copy the first 10 records of orders.json into a small sample file.
# Source file and destination file, both relative to the working directory.
input_file_path = "orders.json"
output_file_path = "10orders.json"

try:
    # Parse the whole input file into memory.
    with open(input_file_path, 'r') as infile:
        orders_data = json.load(infile)

    # "Top" means the first 10 records in file order (no sorting is applied).
    # Slicing yields fewer than 10 if the file holds fewer records.
    top_10_orders = orders_data[:10]

    # Opening with 'w' creates the output file or overwrites an existing one.
    with open(output_file_path, 'w') as outfile:
        # Pretty-print the sample with a 2-space indent.
        json.dump(top_10_orders, outfile, indent=2)

    print(f"Successfully extracted the top 10 orders and saved to '{output_file_path}'")

# Errors are reported with print() and not re-raised, so the cell itself never fails.
except FileNotFoundError:
    print(f"Error: The input file '{input_file_path}' was not found. Make sure 'orders.json' exists.")
except json.JSONDecodeError:
    print(f"Error: Could not decode JSON from '{input_file_path}'. Check if the file contains valid JSON.")
except Exception as e:
    print(f"An unexpected error occurred: {e}")

# If the success message was printed, '10orders.json' now holds the first 10 records.

Successfully extracted the top 10 orders and saved to '10orders.json'


In [None]:
import json

# Copy the first `num_orders_to_extract` records of orders.json into a new file.
# Source file, relative to the working directory.
input_file_path = "orders.json"
# Destination file; the name mirrors the number of orders requested below.
output_file_path = "10000orders.json"
# Maximum number of records to copy.
num_orders_to_extract = 10000

try:
    # Parse the whole input file into memory.
    with open(input_file_path, 'r') as infile:
        orders_data = json.load(infile)

    # Take the leading records in file order (no sorting is applied).
    # min() caps the slice bound at the number of records actually present.
    top_orders = orders_data[:min(num_orders_to_extract, len(orders_data))]

    # Opening with 'w' creates the output file or overwrites an existing one.
    with open(output_file_path, 'w') as outfile:
        # Pretty-print the selection with a 2-space indent.
        json.dump(top_orders, outfile, indent=2)

    # Reports the count actually written, which may be below the requested number.
    print(f"Successfully extracted the top {len(top_orders)} orders and saved to '{output_file_path}'")

# Errors are reported with print() and not re-raised, so the cell itself never fails.
except FileNotFoundError:
    print(f"Error: The input file '{input_file_path}' was not found. Make sure 'orders.json' exists.")
except json.JSONDecodeError:
    print(f"Error: Could not decode JSON from '{input_file_path}'. Check if the file contains valid JSON.")
except Exception as e:
    print(f"An unexpected error occurred: {e}")

# If the success message was printed, '10000orders.json' now holds the first 10,000 (or fewer) records.

Successfully extracted the top 10000 orders and saved to '10000orders.json'


In [None]:
import json
import random
from faker import Faker
from datetime import datetime, timedelta

# Shared Faker instance; random_sale() below draws sale IDs from it.
fake = Faker()

# Load the order records that sales are generated from.
# The file is read from the working directory and must already exist;
# the parsed content is kept in the global `orders`.
with open("10000orders.json") as f:
    orders = json.load(f)

def random_fulfillment(order):
    """Split an order's items into one or two fulfillment batches.

    Args:
        order: mapping with an ``"items"`` list; every item is a dict that
            has at least a ``"qty"`` key. The order is NOT modified.

    Returns:
        list[list[dict]]: one batch per fulfillment round. Each batch entry
        is a copy of the item with ``"qty"`` set to the amount fulfilled in
        that round. An order without items yields an empty list.

    With probability 0.7 the order is fulfilled in a single batch carrying
    the full quantities. Otherwise up to two rounds run, each fulfilling a
    random 1..remaining quantity per item; whatever is still open after the
    second round is left unfulfilled.
    """
    splits = 1 if random.random() < 0.7 else 2
    # Copy every item dict: the remaining quantity is decremented in place
    # below, and a shallow list copy would leak that into the caller's order.
    remaining = [dict(item) for item in order["items"]]
    fulfilled_batches = []
    for _ in range(splits):
        if not remaining:
            break
        batch = []
        still_open = []
        for item in remaining:
            # A single split ships everything; otherwise ship a random part.
            qty = item["qty"] if splits == 1 else random.randint(1, item["qty"])
            batch.append({**item, "qty": qty})
            if qty < item["qty"]:
                # Partially fulfilled: carry the rest into the next round.
                item["qty"] -= qty
                still_open.append(item)
        remaining = still_open
        fulfilled_batches.append(batch)
    return fulfilled_batches

def random_sale(order, batch, idx):
    """Create the sale record for one fulfillment batch of an order.

    Args:
        order: order dict; ``order_id``, ``order_date`` and
            ``customer["customer_id"]`` are read from it.
        batch: list of item dicts with ``price`` and ``qty``.
        idx: position of the batch within the order (currently unused).

    Returns:
        dict: ``sale_id``, ``order_id``, ``customer_id``, ``sale_date``,
        ``fulfilled_items``, ``sale_amount`` and ``status``.
    """
    outcomes = ["completed", "pending", "canceled", "refunded"]
    (status,) = random.choices(outcomes, weights=[0.8, 0.1, 0.05, 0.05])

    # The sale lands 1-120 hours after the order; the trailing "Z" is
    # stripped before parsing and re-appended on output.
    placed_at = datetime.fromisoformat(order["order_date"].replace("Z", ""))
    sale_dt = placed_at + timedelta(hours=random.randint(1, 120))

    # Revenue of this batch: price times quantity, summed over its items.
    line_totals = (line["price"] * line["qty"] for line in batch)
    amount = round(sum(line_totals), 2)

    return {
        "sale_id": fake.unique.bothify(text="SALE#######"),
        "order_id": order["order_id"],
        "customer_id": order["customer"]["customer_id"],
        "sale_date": sale_dt.isoformat() + "Z",
        "fulfilled_items": batch,
        "sale_amount": amount,
        "status": status,
    }

def generate_sales(orders):
    """Generate sale records for every order, with run-wide unique sale IDs.

    Args:
        orders: iterable of order dicts accepted by ``random_fulfillment()``
            and ``random_sale()``.

    Returns:
        list[dict]: one sale per fulfillment batch, in order sequence.

    Raises:
        faker.exceptions.UniquenessException: if Faker cannot find an unused
            ``sale_id`` (only expected when the ID pattern is nearly used up).
    """
    # Reset the uniqueness registry once per run, not once per order:
    # clearing it inside the loop makes Faker forget earlier sale IDs, so
    # `sale_id` values could repeat across orders.
    fake.unique.clear()
    sales = []
    for order in orders:
        for idx, batch in enumerate(random_fulfillment(order)):
            sales.append(random_sale(order, batch, idx))
    return sales

# Entry point for this cell: derive sales from the loaded orders and persist them.
if __name__ == "__main__":
    # The result is kept in the global `sales`.
    sales = generate_sales(orders)
    # Written to the current working directory; indent=2 pretty-prints the file.
    with open("sales.json", "w") as f:
        json.dump(sales, f, indent=2)

In [None]:
import json
import random
from faker import Faker
from datetime import datetime, timedelta

# Shared Faker instance; random_shipment() below draws IDs and tracking numbers from it.
fake = Faker()

# Load orders and sales from the working directory; both files must already exist.
# NOTE(review): `orders` is not referenced by any function visible in this cell,
# and this is the full orders.json rather than the 10000orders.json subset --
# confirm whether this load is still needed.
with open("orders.json") as f:
    orders = json.load(f)
with open("sales.json") as f:
    sales = json.load(f)

def pick_carrier():
    """Return one carrier name chosen uniformly at random."""
    carriers = ("UPS", "FedEx", "USPS", "DHL", "Amazon Logistics")
    return random.choice(carriers)

def random_shipment(sale):
    """Create one shipment record for a sale.

    Args:
        sale: sale dict; ``fulfilled_items``, ``sale_date``, ``order_id``
            and ``sale_id`` are read from it.

    Returns:
        dict: ``shipment_id``, ``order_id``, ``sale_id``, ``shipment_date``,
        ``carrier``, ``tracking_number``, ``status`` and ``shipped_items``.
    """
    # Every fulfilled item ships with a random quantity between 1 and the
    # quantity recorded on the sale.
    shipped_items = [
        {**line, "qty": random.randint(1, line["qty"])}
        for line in sale["fulfilled_items"]
    ]

    status_weights = {
        "shipped": 0.4,
        "in_transit": 0.3,
        "delivered": 0.2,
        "delayed": 0.05,
        "lost": 0.025,
        "returned": 0.025,
    }
    (status,) = random.choices(
        list(status_weights), weights=list(status_weights.values())
    )

    # The shipment leaves 2-72 hours after the sale; the trailing "Z" is
    # stripped before parsing and re-appended on output.
    sold_at = datetime.fromisoformat(sale["sale_date"].replace("Z", ""))
    shipment_date = sold_at + timedelta(hours=random.randint(2, 72))

    return {
        "shipment_id": fake.unique.bothify(text="SHIP#######"),
        "order_id": sale["order_id"],
        "sale_id": sale["sale_id"],
        "shipment_date": shipment_date.isoformat() + "Z",
        "carrier": pick_carrier(),
        "tracking_number": fake.bothify(text="1Z#######US"),
        "status": status,
        "shipped_items": shipped_items,
    }

def generate_shipments(sales):
    """Generate shipment records for every sale, with run-wide unique IDs.

    Args:
        sales: iterable of sale dicts accepted by ``random_shipment()``.

    Returns:
        list[dict]: one shipment per sale with probability 0.85, otherwise
        two (simulating a split shipment), in sale sequence.

    Raises:
        faker.exceptions.UniquenessException: if Faker cannot find an unused
            ``shipment_id`` (only expected when the ID pattern is nearly
            used up).
    """
    # Reset the uniqueness registry once per run, not once per sale:
    # clearing it inside the loop makes Faker forget earlier shipment IDs, so
    # `shipment_id` values could repeat across sales.
    fake.unique.clear()
    shipments = []
    for sale in sales:
        # Some sales result in more than one shipment (split shipments).
        num_shipments = 1 if random.random() < 0.85 else 2
        for _ in range(num_shipments):
            shipments.append(random_shipment(sale))
    return shipments

# Entry point for this cell: derive shipments from the loaded sales and persist them.
if __name__ == "__main__":
    # The result is kept in the global `shipments`.
    shipments = generate_shipments(sales)
    # Written to the current working directory; indent=2 pretty-prints the file.
    with open("shipments.json", "w") as f:
        json.dump(shipments, f, indent=2)
