![image_1769310132881.png](./image_1769310132881.png "image_1769310132881.png")

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

In [0]:
# Load every CSV file under the ecommerce_data volume into one DataFrame,
# with the header and inferSchema reader options switched on.
events = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("/Volumes/workspace/ecommerce/ecommerce_data/*.csv")
)

In [0]:
# Quick sanity check: print a five-row sample, then the DataFrame schema.
sample_rows = events.limit(5)
sample_rows.show()
events.printSchema()

In [0]:
# Drop rows whose price is NULL; later cells aggregate and compare on price.
has_price = F.col("price").isNotNull()
events = events.where(has_price)

In [0]:
# Replace event_time with its timestamp-typed equivalent (same column name).
event_ts = F.to_timestamp(F.col("event_time"))
events = events.withColumn("event_time", event_ts)

In [0]:
# Lookup table of the distinct, non-null category codes. category_level holds
# the text before the first "." of the code (element 0 of the split).
top_level = F.split(F.col("category_code"), "\\.").getItem(0)
categories = (
    events.select("category_code")
    .dropna()
    .distinct()
    .withColumn("category_level", top_level)
)

**Inner Join**

In [0]:
# Inner join on category_code: events without a matching row in `categories`
# are dropped from the result.
events_joined = events.join(categories, "category_code", "inner")

joined_preview = events_joined.limit(5)
joined_preview.show()

**Left Join**

In [0]:
# Left join on category_code: every row of `events` is kept, whether or not
# it has a match in `categories`.
events_left = events.join(categories, "category_code", "left")


**Top 5 Products by Revenue**

In [0]:
from pyspark.sql import functions as F

# Sum the price of purchase events per (product_id, brand) and keep the five
# combinations with the highest total.
purchase_events = events.where(F.col("event_type") == "purchase")
product_revenue = purchase_events.groupBy("product_id", "brand").agg(
    F.sum("price").alias("revenue")
)
revenue = product_revenue.orderBy(F.col("revenue").desc()).limit(5)

revenue.show()


**WINDOW FUNCTIONS**

_üîπ Running total per user_

In [0]:
# Running count of events per user, in event_time order.
#
# The frame is set explicitly to ROWS UNBOUNDED PRECEDING .. CURRENT ROW.
# With only orderBy(), Spark falls back to a RANGE frame, under which all
# events of a user that share the same event_time receive the same count
# instead of an incrementing one.
window_spec = (
    Window.partitionBy("user_id")
    .orderBy("event_time")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

events_with_running = events.withColumn(
    "cumulative_events",
    F.count("*").over(window_spec)
)

# Show a small sample of the result.
events_with_running.select(
    "user_id", "event_time", "cumulative_events"
).limit(10).show()

_üîπ Ranking purchases per user_

In [0]:
# Rank each user's purchases by price, most expensive first.
rank_window = Window.partitionBy("user_id").orderBy(F.desc("price"))

# Only "purchase" events are ranked; without this filter every event type
# (e.g. views) would take part in the ranking, which is not a ranking of
# purchases.
events.filter(F.col("event_type") == "purchase").withColumn(
    "price_rank",
    F.rank().over(rank_window)
).limit(10).show()


**Conversion Rate**

In [0]:
# Count events per category with one column per event_type, then express
# purchases as a percentage of views.
event_counts = events.groupBy("category_code").pivot("event_type").count()
purchase_per_view_pct = (F.col("purchase") / F.col("view")) * 100
conversion = event_counts.withColumn("conversion_rate", purchase_per_view_pct)

conversion.orderBy(F.col("conversion_rate").desc()).show(10)

**Derived Features**

_1️⃣ Price bucket_

In [0]:
# Label each event by price: below 100 -> LOW, below 500 -> MEDIUM,
# anything else -> HIGH.
price_col = F.col("price")
bucket = (
    F.when(price_col < 100, "LOW")
    .when(price_col < 500, "MEDIUM")
    .otherwise("HIGH")
)
events = events.withColumn("price_bucket", bucket)


_2️⃣ Is purchase flag_

In [0]:
# Integer flag: 1 when the event is a purchase, 0 for everything else.
purchase_flag = F.when(F.col("event_type") == "purchase", 1).otherwise(0)
events = events.withColumn("is_purchase", purchase_flag)


**UDF**

In [0]:
from pyspark.sql.types import StringType

def price_label(price):
    """Return a coarse text label for a price.

    Args:
        price: Numeric price, or None for a missing value.

    Returns:
        "cheap" for prices below 100, "moderate" for prices below 500,
        "expensive" otherwise, and None when ``price`` is None.
    """
    # Comparing None with a number raises TypeError in Python 3, so a
    # missing price is passed through as a missing label instead.
    if price is None:
        return None
    if price < 100:
        return "cheap"
    if price < 500:
        return "moderate"
    return "expensive"

# Wrap price_label as a Spark UDF that returns a string column.
price_udf = F.udf(price_label, returnType=StringType())

# Apply the UDF and show ten (price, price_label) pairs.
labelled = events.withColumn("price_label", price_udf(F.col("price")))
labelled.select("price", "price_label").limit(10).show()


**🔸 1. Top 5 users by purchases**

In [0]:
# Count purchase events per user and show the five users with the most.
purchases_per_user = (
    events.where(F.col("event_type") == "purchase")
    .groupBy("user_id")
    .count()
)
purchases_per_user.orderBy(F.col("count").desc()).limit(5).show()

**🔸 2. Average price per category**

In [0]:
# Mean price per category_code, highest average first (top 10 shown).
avg_by_category = events.groupBy("category_code").agg(
    F.avg("price").alias("avg_price")
)
avg_by_category.orderBy(F.col("avg_price").desc()).show(10)

**🔸 3. Daily events trend**

In [0]:
# Number of events per calendar date, in chronological order.
events_by_day = (
    events.withColumn("date", F.to_date(F.col("event_time")))
    .groupBy("date")
    .count()
)
events_by_day.orderBy("date").show()

**Save Results**

In [0]:
# Persist the top-revenue table as CSV, replacing any previous output.
# header=True writes the column names as the first row; without it the
# saved files carry no column names. The CSV load in this notebook also
# uses header=True.
revenue.write.mode("overwrite").csv(
    "/Volumes/workspace/ecommerce/ecommerce_data/day03_top_revenue",
    header=True
)