In [0]:
import json
import os
import random
import time
import uuid
from datetime import datetime, timezone

# Base directory; each dataset is written to its own sub-directory beneath it.
# Every path keeps its trailing slash because file names are appended to it
# by plain string concatenation further down.
base_path = "/Volumes/products/sales_raw/stream_input/"
customers_path = f"{base_path}customers/"
products_path = f"{base_path}products/"
orders_path = f"{base_path}orders/"
order_items_path = f"{base_path}order_items/"

# Make sure all four output directories exist; exist_ok makes reruns harmless.
for _output_dir in (customers_path, products_path, orders_path, order_items_path):
    os.makedirs(_output_dir, exist_ok=True)

def _load_or_create_dimension(file_path, generated_rows):
    """Return the rows of a dimension file, writing the file on first use.

    Args:
        file_path: Location of the newline-delimited JSON dimension file.
        generated_rows: Rows to persist (and return) when the file is absent.

    Returns:
        The rows read from ``file_path`` when it already exists and holds at
        least one row; otherwise ``generated_rows``.

    Raises:
        json.JSONDecodeError: If an existing file contains a malformed line.
    """
    if os.path.exists(file_path):
        with open(file_path) as f:
            stored_rows = [json.loads(line) for line in f if line.strip()]
        # An existing file is never rewritten. If it holds no rows, fall back
        # to the generated ones in memory so random.choice() still has data.
        return stored_rows or generated_rows

    with open(file_path, "w") as f:
        f.writelines(json.dumps(row) + "\n" for row in generated_rows)
    return generated_rows


# Dimensions are written only once, so on later runs the rows already on disk
# are reused instead of the freshly generated ones.
customers = _load_or_create_dimension(
    customers_path + "customers.json",
    [{"customer_id": f"CUST{str(i).zfill(3)}", "name": f"Customer {i}"} for i in range(10)],
)

# Product prices are random: reusing the persisted rows keeps the prices put
# on streamed order items consistent with products.json across restarts.
products = _load_or_create_dimension(
    products_path + "products.json",
    [
        {
            "product_id": f"PROD{str(i).zfill(3)}",
            "name": f"Product {i}",
            "price": round(random.uniform(10, 100), 2),
        }
        for i in range(20)
    ],
)

# Probability that any single nullable field is written as null.
NULL_PROBABILITY = 0.1
# Pause between two consecutive orders, in seconds.
EMIT_INTERVAL_SECONDS = 10


def _utc_now_iso():
    """Return the current UTC time as a naive ISO-8601 string.

    ``datetime.utcnow()`` is deprecated since Python 3.12. Taking an aware
    UTC datetime and dropping its tzinfo yields exactly the same text
    (no ``+00:00`` suffix), so the emitted records keep their format.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()


# Streaming data injection with random nulls: one order file and one
# order-items file are written per iteration, forever.
while True:
    order_id = str(uuid.uuid4())

    # Randomly decide whether customer_id is null (about 10% of orders).
    customer = random.choice(customers)
    customer_id = customer["customer_id"] if random.random() > NULL_PROBABILITY else None

    order_items = []
    for _ in range(random.randint(1, 5)):
        product = random.choice(products)

        # Each field is nulled independently of the others (about 10% each).
        product_id = product["product_id"] if random.random() > NULL_PROBABILITY else None
        quantity = random.randint(1, 3) if random.random() > NULL_PROBABILITY else None
        price = product["price"] if random.random() > NULL_PROBABILITY else None

        order_items.append({
            "order_id": order_id,
            "product_id": product_id,
            "quantity": quantity,
            "price": price,
            "timestamp": _utc_now_iso(),
        })

    order_record = {
        "order_id": order_id,
        "customer_id": customer_id,
        "timestamp": _utc_now_iso(),
    }

    # The same millisecond epoch names both files, pairing an order file with
    # its order-items file.
    ts = int(time.time() * 1000)
    with open(orders_path + f"orders_{ts}.json", "w") as f:
        f.write(json.dumps(order_record) + "\n")

    with open(order_items_path + f"order_items_{ts}.json", "w") as f:
        f.writelines(json.dumps(item) + "\n" for item in order_items)

    time.sleep(EMIT_INTERVAL_SECONDS)