In [0]:
%sql
-- Idempotent setup: every CREATE uses IF NOT EXISTS, so re-running this cell
-- does not fail when the objects are already present.
CREATE CATALOG IF NOT EXISTS analytics_catalog;

CREATE SCHEMA IF NOT EXISTS analytics_catalog.retail_schema;

-- Volume that the notebook reads its input CSV files from.
CREATE VOLUME IF NOT EXISTS analytics_catalog.retail_schema.ecommerce_volume;

-- Volume that the notebook writes its Parquet results to.
CREATE VOLUME IF NOT EXISTS analytics_catalog.retail_schema.ecommerce_output;

-- Display the privileges currently granted on the output volume.
SHOW GRANT ON VOLUME analytics_catalog.retail_schema.ecommerce_output;

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr, year, month, dayofmonth, when, count, rank
from pyspark.sql.window import Window
import time

In [0]:
from pyspark.sql import SparkSession

# Obtain the session for this notebook; getOrCreate() returns an already
# running session when one exists instead of building a second one.
spark = (
    SparkSession.builder
    .appName("E-Commerce Data Pipeline")
    .getOrCreate()
)

In [0]:
def _read_csv_with_header(path):
    """Load a CSV file whose first row is a header, letting Spark infer column types."""
    reader = spark.read.option("header", True).option("inferSchema", True)
    return reader.csv(path)


# Raw transactions and the country -> region lookup, both read from the input volume.
ecom_df = _read_csv_with_header(
    "dbfs:/Volumes/analytics_catalog/retail_schema/ecommerce_volume/ecommerce_transactions.csv"
)
region_df = _read_csv_with_header(
    "dbfs:/Volumes/analytics_catalog/retail_schema/ecommerce_volume/country_region.csv"
)

In [0]:
# Cleaning rules: drop exact duplicate rows, then keep only rows that have a
# customer_id and a strictly positive quantity.
has_customer = col("customer_id").isNotNull()
positive_quantity = col("quantity") > 0

ecom_clean = ecom_df.dropDuplicates().filter(has_customer & positive_quantity)

In [0]:
# Derive the line value and split order_date into year / month / day columns.
order_date = col("order_date")

ecom_enriched = (
    ecom_clean
    .withColumn("order_value", col("quantity") * col("unit_price"))
    .withColumn("year", year(order_date))
    .withColumn("month", month(order_date))
    .withColumn("day", dayofmonth(order_date))
)
ecom_enriched.show()

In [0]:
# Total order_value per country, exposed as "total_revenue".
country_totals = ecom_enriched.groupBy("country").agg({"order_value": "sum"})
country_revenue = country_totals.withColumnRenamed("sum(order_value)", "total_revenue")
country_revenue.show()

In [0]:
# Rank every row within its country, highest order_value first.
# NOTE(review): the ranking is over individual rows' order_value, not over a
# per-customer total, and rank() keeps all tied rows — confirm this is the
# intended meaning of "top customer".
highest_value_first = col("order_value").desc()
window_spec = Window.partitionBy("country").orderBy(highest_value_first)

ranked_df = ecom_enriched.withColumn("rank", rank().over(window_spec))
ranked_df.show()

# Keep only the rank-1 row(s) per country and the three columns of interest.
is_top_ranked = col("rank") == 1
top_customer = (
    ranked_df
    .filter(is_top_ranked)
    .select("country", "customer_id", "order_value")
)
top_customer.show()

In [0]:
# Attach the region lookup; a left join keeps transactions whose country has
# no match in region_df (their region is null).
region_joined = ecom_enriched.join(region_df, "country", "left")

# Total order_value per region, exposed as "region_revenue".
region_totals = region_joined.groupBy("region").agg({"order_value": "sum"})
region_summary = region_totals.withColumnRenamed("sum(order_value)", "region_revenue")
region_summary.show()

In [0]:
# Revenue per category with one column per month value.
# pivot("month").sum("order_value") already aggregates every (category, month)
# cell, so no separate groupBy("category", "month") pre-aggregation is needed:
# with a single aggregate the pivoted columns are named after the month values,
# which is exactly what the two-step version produced.
# NOTE(review): "month" carries no year, so the same month from different years
# is summed into one column — confirm that is intended.
monthly_pivot = (
    ecom_enriched
    .groupBy("category")
    .pivot("month")
    .sum("order_value")
)
monthly_pivot.show()

In [0]:
# Bucket each row by order_value: below 50 -> "Low", below 200 -> "Medium",
# everything else -> "High".
# NOTE(review): a null order_value matches neither comparison and therefore
# lands in "High" via otherwise() — confirm that is acceptable.
order_value = col("order_value")
price_band = (
    when(order_value < 50, "Low")
    .when(order_value < 200, "Medium")
    .otherwise("High")
)

# Count how many rows fall into each band.
banded_orders = ecom_enriched.withColumn("price_band", price_band)
price_bands = banded_orders.groupBy("price_band").agg(count("*").alias("count"))
price_bands.show()

In [0]:
# The number of input files behind the DataFrame is printed as a rough
# stand-in for the partition count.
input_file_count = len(ecom_enriched.inputFiles())
print("Approx partition count (based on input files):", input_file_count)

# Request a redistribution of the data into 4 partitions.
ecom_repart = ecom_enriched.repartition(4)

# The resulting partition count is not inspected here (that would go through
# .rdd), so only report what was requested.
print("Requested repartition to 4 partitions. (Note: Cannot verify exact count on serverless)")

In [0]:
# Time the same per-customer aggregation twice: the first run is cold, the
# second may be faster, although nothing is explicitly cached in this notebook.
# time.perf_counter() is used rather than time.time() because it is a
# monotonic, high-resolution clock meant for measuring intervals; wall-clock
# time can jump (e.g. on clock adjustments) and distort the measurement.
for label in ("1st run (cold):", "2nd run (warm-ish):"):
    start = time.perf_counter()
    # collect() forces the aggregation to execute and brings the rows to the driver.
    ecom_enriched.groupBy("customer_id").sum("order_value").collect()
    print(label, time.perf_counter() - start)

In [0]:
# Persist each result DataFrame to its own folder in the output volume as
# snappy-compressed Parquet, replacing whatever a previous run left there.
for result_df, target_path in (
    (country_revenue,
     "dbfs:/Volumes/analytics_catalog/retail_schema/ecommerce_output/country_revenue"),
    (region_summary,
     "dbfs:/Volumes/analytics_catalog/retail_schema/ecommerce_output/region_summary"),
    (top_customer,
     "dbfs:/Volumes/analytics_catalog/retail_schema/ecommerce_output/top_customer"),
):
    result_df.write.mode("overwrite").parquet(target_path, compression="snappy")