In [0]:
from pyspark.sql.functions import *

import os

# Storage layout: a single ADLS Gen2 account with one container per lake zone.
container = ["raw", "curated", "presentation"]
storage   = "adlsdemodevde"

# Authenticate the ABFS driver with the storage-account access key.
# RuntimeConfig.set takes two arguments (key, value) -- keep the comma between
# them: adjacent string literals would otherwise be concatenated into a single
# argument and the call would fail. The key names the storage account.
# The access key is read from the environment rather than pasted into the
# notebook (on Databricks, a secret scope read via dbutils.secrets.get is the
# usual alternative).
storage_account_key = os.environ.get("AZURE_STORAGE_ACCOUNT_KEY")
if not storage_account_key:
    raise RuntimeError(
        "Set the AZURE_STORAGE_ACCOUNT_KEY environment variable to the access key "
        f"of storage account {storage!r}."
    )
spark.conf.set(
    f"fs.azure.account.key.{storage}.dfs.core.windows.net",
    storage_account_key,
)

# Base paths for each zone of the lake.
raw_base = f"abfss://{container[0]}@{storage}.dfs.core.windows.net/raw/retail"
curated_base = f"abfss://{container[1]}@{storage}.dfs.core.windows.net"
presentation_base = f"abfss://{container[2]}@{storage}.dfs.core.windows.net/presentation/retail"
bronze_base = f"{curated_base}/bronze"




In [0]:
# Synthetic fact table: 1,000,000 orders with random customer/product keys.
# Each rand() gets its own fixed seed. Distinct seeds keep the columns
# uncorrelated, and seeding makes the data repeatable for a given partitioning
# of spark.range.
ORDERS_SEED = 42

orders_df = (
    spark.range(1, 1_000_001)
    .withColumnRenamed("id", "order_id")
    .withColumn("order_ts", current_timestamp())
    .withColumn("customer_id", (rand(ORDERS_SEED) * 50_000).cast("int") + 1)
    .withColumn("product_id", (rand(ORDERS_SEED + 1) * 500).cast("int") + 1)
    # Draw ONE uniform value per row and compare it against cumulative
    # thresholds: ~70% India / ~20% USA / ~10% UK. Calling rand() separately in
    # each branch makes the draws independent and skews this to ~70% / ~27% / ~3%.
    .withColumn("_country_draw", rand(ORDERS_SEED + 2))
    .withColumn(
        "country",
        when(col("_country_draw") < 0.7, "India")
        .when(col("_country_draw") < 0.9, "USA")
        .otherwise("UK")
    )
    .drop("_country_draw")
    # Integer quantity in [1, 5].
    .withColumn("quantity", (rand(ORDERS_SEED + 3) * 5 + 1).cast("int"))
    # Unit price in [100, 2100).
    .withColumn("unit_price", rand(ORDERS_SEED + 4) * 2000 + 100)
)

(
    orders_df.write
    .mode("overwrite")
    .option("header", True)
    .csv(f"{raw_base}/orders")
)
# Synthetic customer dimension: 50,000 customers, ids matching the range used
# for orders.customer_id. Distinct fixed seeds keep the random columns
# uncorrelated and repeatable for a given partitioning of spark.range.
CUSTOMERS_SEED = 142

customers_df = (
    spark.range(1, 50_001)
    .withColumnRenamed("id", "customer_id")
    .withColumn("customer_name", concat(lit("Customer-"), col("customer_id")))
    .withColumn("email", concat(col("customer_name"), lit("@mail.com")))
    # One uniform draw per row against cumulative thresholds:
    # ~65% India / ~20% USA / ~15% UK. Separate rand() calls per branch would
    # skew this to ~65% / ~30% / ~5%.
    .withColumn("_country_draw", rand(CUSTOMERS_SEED))
    .withColumn(
        "country",
        when(col("_country_draw") < 0.65, "India")
        .when(col("_country_draw") < 0.85, "USA")
        .otherwise("UK")
    )
    .drop("_country_draw")
    # Sign-up date somewhere in the last ~1000 days.
    .withColumn(
        "signup_date",
        date_sub(current_date(), (rand(CUSTOMERS_SEED + 1) * 1000).cast("int"))
    )
    # ~90% of customers are active.
    .withColumn("is_active", rand(CUSTOMERS_SEED + 2) > 0.1)
)

(
    customers_df.write
    .mode("overwrite")
    .option("header", True)
    .csv(f"{raw_base}/customers")
)
# Synthetic product dimension: 500 products assigned round-robin to 4 categories.
# The brand draw is seeded so it is repeatable for a given partitioning of
# spark.range.
PRODUCTS_SEED = 242

products_df = (
    spark.range(1, 501)
    .withColumnRenamed("id", "product_id")
    .withColumn("product_name", concat(lit("Product-"), col("product_id")))
    .withColumn(
        "category",
        when(col("product_id") % 4 == 0, "Electronics")
        .when(col("product_id") % 4 == 1, "Clothing")
        .when(col("product_id") % 4 == 2, "Grocery")
        .otherwise("Home")
    )
    # Only Electronics and Clothing get a specific sub-category; everything
    # else falls back to "General".
    .withColumn(
        "sub_category",
        when(col("category") == "Electronics", "Mobile")
        .when(col("category") == "Clothing", "Men")
        .otherwise("General")
    )
    # Roughly even split between the two brands.
    .withColumn(
        "brand",
        when(rand(PRODUCTS_SEED) < 0.5, "BrandA").otherwise("BrandB")
    )
)

(
    products_df.write
    .mode("overwrite")
    .option("header", True)
    .csv(f"{raw_base}/products")
)


In [0]:
# Bronze layer: land the raw CSV extracts as Delta tables.
#
# CSV carries no type information. Without a schema (and with inferSchema off
# by default) every column would be read back as STRING, so each dataset gets
# an explicit DDL schema. With a user-supplied schema and the default
# enforceSchema=true, Spark applies the columns by position and ignores the
# header names. Each schema therefore lists columns in the same order the
# generation cell wrote them.
raw_schemas = {
    "orders": (
        "order_id BIGINT, order_ts TIMESTAMP, customer_id BIGINT, "
        "product_id BIGINT, country STRING, quantity INT, unit_price DOUBLE"
    ),
    "customers": (
        "customer_id BIGINT, customer_name STRING, email STRING, "
        "country STRING, signup_date DATE, is_active BOOLEAN"
    ),
    "products": (
        "product_id BIGINT, product_name STRING, category STRING, "
        "sub_category STRING, brand STRING"
    ),
}


def read_raw_csv(dataset):
    """Read one raw CSV dataset from ``raw_base`` using its declared schema."""
    return (
        spark.read
        .option("header", True)
        .schema(raw_schemas[dataset])
        .csv(f"{raw_base}/{dataset}")
    )


def write_bronze(df, dataset):
    """Overwrite the Delta table for ``dataset`` under ``bronze_base``.

    ``overwriteSchema`` lets the table's schema be replaced on re-runs (e.g.
    when column types change); a plain overwrite would fail instead.
    """
    (
        df.write
        .mode("overwrite")
        .format("delta")
        .option("overwriteSchema", "true")
        .save(f"{bronze_base}/{dataset}")
    )


def read_bronze(dataset):
    """Load the bronze Delta table for ``dataset``."""
    return spark.read.format("delta").load(f"{bronze_base}/{dataset}")


raw_orders = read_raw_csv("orders")
raw_customers = read_raw_csv("customers")
raw_products = read_raw_csv("products")

write_bronze(raw_orders, "orders")
write_bronze(raw_customers, "customers")
write_bronze(raw_products, "products")

bronze_orders = read_bronze("orders")
bronze_customers = read_bronze("customers")
bronze_products = read_bronze("products")


In [0]:
# Drop rows missing any join key. Also give each "country" column a
# source-specific name: orders and customers both carry one, and they would
# otherwise collide after the joins.
orders_clean = (
    bronze_orders
    .filter("order_id IS NOT NULL AND customer_id IS NOT NULL AND product_id IS NOT NULL")
    .withColumnRenamed("country", "order_country")
)

customers_clean = (
    bronze_customers
    .filter("customer_id IS NOT NULL")
    .withColumnRenamed("country", "customer_country")
)

# Products have no country column, so there is nothing to rename here.
# withColumnRenamed on a missing column is a silent no-op.
products_clean = bronze_products.filter("product_id IS NOT NULL")


In [0]:
from pyspark.sql.functions import broadcast

# Silver layer: enrich each cleaned order with product and customer attributes.
# Left joins keep every order even when a key has no matching dimension row.
# The product dimension is small (500 generated rows), so it is broadcast to
# avoid shuffling the orders.
orders_with_products = orders_clean.join(
    broadcast(products_clean), on="product_id", how="left"
)

silver_enriched = orders_with_products.join(
    customers_clean, on="customer_id", how="left"
)

silver_base = f"abfss://{container[1]}@{storage}.dfs.core.windows.net/silver"

(
    silver_enriched.write
    .mode("overwrite")
    .format("delta")
    # Allow the table schema to be replaced on re-runs. A plain overwrite
    # fails if upstream column types have changed since the last run.
    .option("overwriteSchema", "true")
    .partitionBy("order_country")
    .save(f"{silver_base}/orders_enriched")
)


In [0]:
# Sanity check: row count for each order_country value in the silver table.
(
    spark.read
    .format("delta")
    .load(f"{silver_base}/orders_enriched")
    .groupBy("order_country")
    .count()
    .show()
)


In [0]:
# Gold layer: per-country sales KPIs computed from the silver enriched orders.
# Note: count/sum/col here come from the star import of
# pyspark.sql.functions, which shadows the Python builtins of the same name.
silver_df = spark.read.format("delta").load(f"{silver_base}/orders_enriched")

gold_country_sales = (
    silver_df
    .groupBy("order_country")
    .agg(
        count("*").alias("total_orders"),
        # Explicit cast so the result is an integer total regardless of how
        # the upstream column is typed (an untyped CSV read yields STRING).
        sum(col("quantity").cast("bigint")).alias("total_quantity"),
        # DOUBLE, not FLOAT: FLOAT keeps only ~7 significant digits, which
        # rounds each unit price before it is multiplied and summed.
        sum(
            col("quantity").cast("double") * col("unit_price").cast("double")
        ).alias("total_revenue"),
    )
)

(
    gold_country_sales.write
    .mode("overwrite")
    .format("delta")
    # Column types differ from earlier runs' output, so allow the schema to be
    # replaced instead of failing the overwrite.
    .option("overwriteSchema", "true")
    .save(f"{presentation_base}/country_sales")
)

In [0]:
# Display the gold country-sales table written above.
country_sales_path = f"{presentation_base}/country_sales"
(
    spark.read
    .format("delta")
    .load(country_sales_path)
    .show()
)
