In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

In [0]:
%fs
ls /Volumes/workspace/ecommerce/ecommerce_data

In [0]:
# Location of the October event data (Parquet) inside the workspace volume.
oct_parquet_path = "/Volumes/workspace/ecommerce/ecommerce_data/parquet/oct/"
df_oct = spark.read.parquet(oct_parquet_path)

In [0]:
# Print the column names and types of the DataFrame loaded from Parquet.
df_oct.printSchema()

In [0]:
# Count the rows once (this triggers a Spark job), then report the number.
total_records = df_oct.count()
print("Total records:", total_records)

In [0]:
# Render a five-row sample of the data as a notebook table.
df_oct.limit(5).display()

### Top 5 products by revenue

In [0]:
# Keep purchase events only, total the price per (product_id, brand),
# and retain the five highest-revenue combinations.
purchases = df_oct.filter(F.col("event_type") == "purchase")
revenue = (
    purchases
    .groupBy("product_id", "brand")
    .agg(F.sum("price").alias("revenue"))
    .orderBy(F.col("revenue").desc())
    .limit(5)
)

In [0]:
# Render the top-revenue table in the notebook.
display(revenue)

### Running event count per user

In [0]:
from pyspark.sql import Window
from pyspark.sql import functions as F

# Per-user window ordered by event time.
# The frame is spelled out as ROWS ... CURRENT ROW so that every row advances
# the count by exactly one. Without it, an ordered window defaults to a RANGE
# frame, which gives all rows sharing the same event_time the same (jumped)
# count instead of a true running count.
window = (
    Window.partitionBy("user_id")
    .orderBy("event_time")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

# cumulative_events = number of events the user has produced up to and
# including the current row.
df_with_running_total = df_oct.withColumn(
    "cumulative_events",
    F.count("*").over(window)
)

# Preview five rows; the row limit is applied explicitly with limit() rather
# than being passed as a positional argument to display().
df_with_running_total.limit(5).display()

### Pivot may be unavailable on some Databricks interactive clusters
#### This happens in some serverless / interactive cluster configurations and certain Unity Catalog setups.
#### It is an environment limitation, not a logic error. Use conditional aggregation instead.
##### Below is an example

In [0]:
# Conversion rate by category, computed with conditional aggregation.
#
# Note: pivot() is a method of the grouped object returned by groupBy(), not of
# a DataFrame, so it cannot be chained after groupBy(...).count(). Counting
# each event type with a conditional sum avoids pivot altogether.
#
# F.when(...) without otherwise() yields NULL for non-matching rows, so a
# category with no events of a given type gets NULL (as a pivot would) rather
# than 0, and the division below never divides by zero.
(
    df_oct.groupBy("category_code")
    .agg(
        F.sum(F.when(F.col("event_type") == "view", 1)).alias("view"),
        F.sum(F.when(F.col("event_type") == "purchase", 1)).alias("purchase"),
    )
    # Purchases per 100 views.
    .withColumn("conversion_rate", F.col("purchase") / F.col("view") * 100)
)

In [0]:
# Pivot event_type into "view" and "purchase" count columns per category,
# then express purchases as a percentage of views and print the result.
event_counts_by_category = (
    df_oct.groupBy("category_code")
    .pivot("event_type", ["view", "purchase"])
    .count()
)
purchases_per_100_views = F.col("purchase") / F.col("view") * 100
event_counts_by_category.withColumn("conversion_rate", purchases_per_100_views).show()

### Implemented funnel analysis in Spark using conditional aggregation and window functions to track user journeys from view to cart to purchase, ensuring correct event sequencing and conversion measurement

In [0]:
from pyspark.sql import Window

# Each user's events in chronological order.
w = Window.partitionBy("user_id").orderBy("event_time")

# For every event, attach the type of the same user's next event
# (NULL when the event is the user's last one).
next_event_type = F.lead("event_type").over(w)
df_seq = df_oct.withColumn("next_event", next_event_type)

In [0]:
# List the distinct event types present in the data.
df_seq.select("event_type").distinct().show()

In [0]:
# List the distinct values taken by the follow-up event column.
df_seq.select("next_event").distinct().show()

In [0]:
# Users who have at least one "view" immediately followed by a "cart" event.
is_view_then_cart = (F.col("event_type") == "view") & (F.col("next_event") == "cart")
view_to_cart = df_seq.where(is_view_then_cart).select("user_id").distinct()

In [0]:
# Users who have at least one "cart" immediately followed by a "purchase" event.
is_cart_then_purchase = (F.col("event_type") == "cart") & (F.col("next_event") == "purchase")
cart_to_purchase = df_seq.where(is_cart_then_purchase).select("user_id").distinct()

In [0]:
# Funnel summary: distinct users overall, per event type, and per transition.
total_users = df_oct.select("user_id").distinct().count()

# One distinct-user count per event type, evaluated in the order
# view, cart, purchase.
users_per_event = {
    event: (
        df_oct.filter(F.col("event_type") == event)
        .select("user_id")
        .distinct()
        .count()
    )
    for event in ("view", "cart", "purchase")
}
view_users = users_per_event["view"]
cart_users = users_per_event["cart"]
purchase_users = users_per_event["purchase"]

# Distinct users that made each direct step-to-step transition.
view_to_cart_users = view_to_cart.count()
cart_to_purchase_users = cart_to_purchase.count()

funnel_report = [
    ("Total users:", total_users),
    ("Users with view:", view_users),
    ("Users with cart:", cart_users),
    ("Users with purchase:", purchase_users),
    ("Users with view → cart:", view_to_cart_users),
    ("Users with cart → purchase:", cart_to_purchase_users),
]
for label, value in funnel_report:
    print(label, value)

## USER DEFINED FUNCTION
### Python UDFs are slow because they are opaque to Spark's optimizer and require row-by-row serialization between the JVM and Python
### Pandas UDF – Vectorized, high-performance UDF

In [0]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def to_upper(x):
    """Return ``x`` upper-cased, or ``None`` when ``x`` is ``None``.

    Spark hands SQL NULL values to a Python UDF as ``None``. Calling
    ``.upper()`` on ``None`` raises ``AttributeError`` and fails the whole
    job, so nulls are passed through unchanged.

    Args:
        x: The string to convert, or ``None``.

    Returns:
        The upper-cased string, or ``None`` if ``x`` is ``None``.
    """
    if x is None:
        return None
    return x.upper()

# Wrap the plain Python function as a Spark UDF that returns strings.
upper_udf = udf(to_upper, returnType=StringType())

# Add an upper-cased copy of the brand column; as the cell's last expression,
# the resulting DataFrame is what the notebook echoes.
brand_column = df_oct["brand"]
df_oct.withColumn("name_upper", upper_udf(brand_column))