In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Load the 2019-Nov.csv events file: the first row supplies the column names
# and Spark infers each column's type from the data.
events = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("/Volumes/workspace/ecommerce/ecommerce_data/2019-Nov.csv")
)

# Top 5 products by revenue: sum the price of every "purchase" event per
# product, then keep the five largest totals.
revenue = (
    events
    .where(F.col("event_type") == "purchase")
    .groupBy("product_id")
    .agg(F.sum("price").alias("revenue"))
    .sort(F.col("revenue").desc())
    .limit(5)
)

revenue.show()

# Running event count per user, ordered by event time.
# The window is ordered and no explicit frame is given, so Spark applies its
# default growing frame (unbounded preceding .. current row, range-based):
# events of one user that share the same event_time receive the same count.
window = Window.partitionBy("user_id").orderBy("event_time")

# withColumn returns a new DataFrame rather than modifying `events`; keep the
# result and display it, otherwise this step has no observable effect.
events_with_cumulative = events.withColumn(
    "cumulative_events", F.count("*").over(window)
)

events_with_cumulative.show()

# Conversion rate by category: earlier pivot-based attempt, kept commented out for reference.
# events.groupBy("category_code", "event_type").count() \
#     .pivot("event_type").sum("count") \
#     .withColumn("conversion_rate", F.col("purchase")/F.col("view")*100)

# Conversion rate by category (replace pivot with conditional aggregation).
# Count "purchase" and "view" events per category in one aggregation, then
# express purchases as a percentage of views.
conversion = events.groupBy("category_code").agg(
    F.sum(F.when(F.col("event_type") == "purchase", 1).otherwise(0)).alias("purchase"),
    F.sum(F.when(F.col("event_type") == "view", 1).otherwise(0)).alias("view"),
).withColumn(
    "conversion_rate",
    # Divide only when the category has views: with ANSI mode enabled a zero
    # divisor raises an error. Categories without views get NULL instead.
    F.when(F.col("view") > 0, F.col("purchase") / F.col("view") * 100),
)

conversion.show()