In [None]:
# Ingestion notebook: each cell below builds one dataset (Products, Customers,
# Carts, Orders), stamps it with an ingestion_time column and saves it as a Delta table.


In [7]:
import requests, random, pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp

spark = SparkSession.builder.getOrCreate()

# Tunable parameters for this cell.
PRODUCTS_URL = "https://dummyjson.com/products?limit=100"
N_PRODUCTS = 1000
HTTP_TIMEOUT_S = 30

# 1) Fetch base products (up to 100 requested from DummyJSON).
# Without a timeout, requests can block indefinitely on a stalled connection;
# raise_for_status() turns an HTTP error into an exception instead of letting
# an error payload flow into the table.
products_http = requests.get(PRODUCTS_URL, timeout=HTTP_TIMEOUT_S)
products_http.raise_for_status()
resp = products_http.json()
base_products = resp.get("products", [])
if not base_products:
    # random.choice([]) would otherwise fail with an opaque IndexError.
    raise RuntimeError(f"No products returned by {PRODUCTS_URL}; nothing to simulate from")

# 2) Simulate N_PRODUCTS products by sampling base records and re-indexing them
simulated_products = []
for product_id in range(1, N_PRODUCTS + 1):
    p = dict(random.choice(base_products))  # shallow copy: the base record stays untouched
    p.pop("id", None)  # drop the source id; Product_ID replaces it
    p = {"Product_ID": product_id, **p}  # Product_ID becomes the first key
    # Suffix the new id so every title is unique; tolerate a record without a title.
    p["title"] = f"{p.get('title', 'Untitled')} #{product_id}"
    # NOTE(review): price, rating and stock are deliberately kept as strings, as in
    # the original table schema - confirm downstream consumers cast them.
    p["price"] = str(round(float(p.get("price", 0)) * random.uniform(0.8, 1.2), 2))
    p["rating"] = str(round(random.uniform(3.0, 5.0), 1))
    p["stock"] = str(random.randint(10, 500))
    simulated_products.append(p)

# 3) Convert to Spark DataFrame
pdf_products = pd.DataFrame(simulated_products)

# Reorder columns to ensure Product_ID is first (in case Pandas reorders)
cols = ["Product_ID"] + [c for c in pdf_products.columns if c != "Product_ID"]
pdf_products = pdf_products[cols]

df_products = spark.createDataFrame(pdf_products)
df_products = df_products.withColumn("ingestion_time", current_timestamp())

# mode("overwrite") replaces the whole Products table on every run.
df_products.write.format("delta").mode("overwrite").saveAsTable("Products")

StatementMeta(, e1547ded-3af8-4e69-b2bc-853dc1a7a1d7, 9, Finished, Available, Finished)

In [5]:
import requests, pandas as pd, random
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp

spark = SparkSession.builder.getOrCreate()

# Tunable parameters for this cell.
USERS_URL = "https://dummyjson.com/users?limit=100"
N_CUSTOMERS = 1000
HTTP_TIMEOUT_S = 30

# 1) Fetch the base users (up to 100 requested from DummyJSON).
# The timeout keeps the cell from hanging on a stalled connection, and
# raise_for_status() surfaces HTTP errors instead of parsing an error payload.
users_http = requests.get(USERS_URL, timeout=HTTP_TIMEOUT_S)
users_http.raise_for_status()
resp = users_http.json()
users = resp.get("users", [])
if not users:
    # random.choice([]) would otherwise fail with an opaque IndexError.
    raise RuntimeError(f"No users returned by {USERS_URL}; nothing to simulate from")

# 2) Simulate N_CUSTOMERS users by sampling base records and assigning new ids
simulated_users = []
for customer_id in range(1, N_CUSTOMERS + 1):
    u = dict(random.choice(users))  # shallow copy: the base record stays untouched
    u.pop("id", None)  # drop the source id; Customer_ID replaces it
    u = {"Customer_ID": customer_id, **u}  # Customer_ID becomes the first key
    # Only the email is made unique; the other fields repeat across sampled users.
    u["email"] = f"user{customer_id}@example.com"
    simulated_users.append(u)

# 3) Convert to Pandas, then to a Spark DataFrame
pdf_users = pd.DataFrame(simulated_users)

# Reorder columns so Customer_ID appears first
cols = ["Customer_ID"] + [c for c in pdf_users.columns if c != "Customer_ID"]
pdf_users = pdf_users[cols]

df_customers = spark.createDataFrame(pdf_users)
df_customers = df_customers.withColumn("ingestion_time", current_timestamp())

# 4) Save as Delta table; mode("overwrite") replaces Customers on every run.
df_customers.write.format("delta").mode("overwrite").saveAsTable("Customers")


StatementMeta(, 590fb8bf-3ec6-41b3-8048-08a547d42a11, 7, Finished, Available, Finished)

In [10]:
import requests, random, pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp

spark = SparkSession.builder.getOrCreate()

# Tunable parameters for this cell.
CARTS_URL = "https://dummyjson.com/carts?limit=100"
N_CARTS = 1000
MAX_CUSTOMER_ID = 1000  # Customer_ID is drawn uniformly from 1..MAX_CUSTOMER_ID
HTTP_TIMEOUT_S = 30

# 1) Fetch base carts (up to 100 requested from DummyJSON).
# The timeout keeps the cell from hanging on a stalled connection, and
# raise_for_status() surfaces HTTP errors instead of parsing an error payload.
carts_http = requests.get(CARTS_URL, timeout=HTTP_TIMEOUT_S)
carts_http.raise_for_status()
base_resp = carts_http.json()
base_carts = base_resp.get("carts", [])
if not base_carts:
    # random.choice([]) would otherwise fail with an opaque IndexError.
    raise RuntimeError(f"No carts returned by {CARTS_URL}; nothing to simulate from")

# 2) Simulate N_CARTS carts, one output row per (cart, product) line item.
# Each line's price and quantity are drawn once and every derived figure is
# computed from them, so total == price * quantity on each row and the
# cart-level columns are identical on all rows that share a Cart_ID.
simulated_carts = []
for cart_id in range(1, N_CARTS + 1):
    base = random.choice(base_carts)
    customer_id = random.randint(1, MAX_CUSTOMER_ID)

    line_items = []
    for p in base["products"]:
        price = float(p["price"]) * random.uniform(0.8, 1.2)  # +/-20% price jitter
        quantity = random.randint(1, 5)
        discount_pct = float(p["discountPercentage"])
        total = price * quantity
        line_items.append({
            "Product_ID": p["id"],
            "product_title": p["title"],
            "price": price,
            "quantity": quantity,
            "discountPercentage": discount_pct,
            "total": total,
            "discountedTotal": total * (1 - discount_pct / 100),
        })

    # Cart-level aggregates are derived from the simulated lines rather than
    # copied from the base cart, so they agree with the line-level values.
    cart_summary = {
        "cart_total": sum(item["total"] for item in line_items),
        "cart_discounted_total": sum(item["discountedTotal"] for item in line_items),
        "cart_total_products": len(line_items),
        "cart_total_quantity": sum(item["quantity"] for item in line_items),
    }

    for item in line_items:
        simulated_carts.append({
            "Cart_ID": cart_id,
            "Customer_ID": customer_id,
            **item,
            **cart_summary,
        })

# 3) Convert to Spark DataFrame
pdf_carts = pd.DataFrame(simulated_carts)

# Reorder so Cart_ID, Customer_ID, Product_ID come first
key_cols = ["Cart_ID", "Customer_ID", "Product_ID"]
cols = key_cols + [c for c in pdf_carts.columns if c not in key_cols]
pdf_carts = pdf_carts[cols]

df_carts = spark.createDataFrame(pdf_carts)
df_carts = df_carts.withColumn("ingestion_time", current_timestamp())

# Save as a managed Delta table; mode("overwrite") replaces Carts on every run.
df_carts.write.format("delta").mode("overwrite").saveAsTable("Carts")


StatementMeta(, e1547ded-3af8-4e69-b2bc-853dc1a7a1d7, 12, Finished, Available, Finished)

In [13]:
import os

import pandas as pd
import requests
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp

# Initialize Spark session
spark = SparkSession.builder.getOrCreate()

# Mockaroo credentials
# The API key is read from the environment so it is never stored in the notebook.
# NOTE: the key that used to be hardcoded here should be treated as exposed and rotated.
api_key = os.environ.get("MOCKAROO_API_KEY")
if not api_key:
    raise RuntimeError(
        "MOCKAROO_API_KEY is not set; export it (or load it from your secret store) "
        "before running this cell"
    )
schema_name = "orders_schema"
ORDER_COUNT = 1000
HTTP_TIMEOUT_S = 60

# API endpoint (fetch ORDER_COUNT records). The key is passed via `params`
# rather than formatted into `url`, so the URL variable never contains the secret.
url = f"https://my.api.mockaroo.com/{schema_name}.json"

# Fetch and convert data
orders_http = requests.get(
    url,
    params={"count": ORDER_COUNT, "key": api_key},
    timeout=HTTP_TIMEOUT_S,  # avoid hanging forever on a stalled connection
)
orders_http.raise_for_status()
data = orders_http.json()
# A successful call is expected to yield a JSON array of records; anything else
# (for example an error object) would make pd.DataFrame fail with an unrelated message.
if not isinstance(data, list) or not data:
    raise RuntimeError(f"Unexpected Mockaroo response for schema {schema_name!r}: {data!r}")
pdf = pd.DataFrame(data)

# Rename columns for consistency (columns missing from the payload are ignored)
pdf = pdf.rename(
    columns={
        "order_id": "Order_ID",
        "customer_id": "Customer_ID",
        "product_id": "Product_ID",
        "order_date": "Order_Date",
        "quantity": "Quantity",
        "price": "Price",
        "status": "Status",
    }
)

# Convert to Spark DataFrame
df_orders = spark.createDataFrame(pdf)

# Add ingestion timestamp
df_orders = df_orders.withColumn("ingestion_time", current_timestamp())

# Save to Lakehouse (Bronze layer); mode("overwrite") replaces Orders on every run.
df_orders.write.format("delta").mode("overwrite").saveAsTable("Orders")


StatementMeta(, e1547ded-3af8-4e69-b2bc-853dc1a7a1d7, 15, Finished, Available, Finished)