In [0]:
# Day 3
# Reloading October data:
from pyspark.sql.types import StructType, StructField, TimestampType, StringType, LongType, DoubleType

# Explicit schema for the event CSV, declared as (column name, Spark type) pairs.
# Every field is nullable.
schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in [
        ("event_time", TimestampType()),
        ("event_type", StringType()),
        ("product_id", LongType()),
        ("category_id", LongType()),
        ("category_code", StringType()),
        ("brand", StringType()),
        ("price", DoubleType()),
        ("user_id", LongType()),
        ("user_session", StringType()),
    ]
])

# Read the file with the schema above; the first line of the CSV is a header row.
df = spark.read.csv(
    path="/Volumes/workspace/ecommerce/ecommerce_data/2019-Oct.csv",
    schema=schema,
    header=True,
)

In [0]:
# Verify the load: report the total row count, then show the first rows untruncated.
events = df
total_events = events.count()
print("Total events:", total_events)
events.show(n=5, truncate=False)

In [0]:
# Convert a bounded sample to pandas for inspection.
pdf = (
    events
    .limit(1000)   # cap the row count before converting to pandas
    .toPandas()
)
pdf.head(5)

In [0]:
# Category dimension: the distinct (category_id, category_code) pairs that have
# no nulls, with the code column renamed so it can be told apart from the
# events column of the same name after a join.
# NOTE(review): uniqueness of category_id is not enforced here; if one id maps
# to several codes, joins on category_id will duplicate event rows. TODO confirm.
category_dim = (
    events
    .select(["category_id", "category_code"])
    .dropna()
    .dropDuplicates()
    .withColumnRenamed("category_code", "dim_category_code")
)
category_dim.show(n=5)

In [0]:
# Inner join on category_id: only events whose category_id is present in
# category_dim are kept. The cell displays the resulting row count.
inner_joined = events.join(category_dim, ["category_id"], "inner")
inner_joined.count()

In [0]:
# Left join on category_id: every event row is kept; events with no match in
# category_dim get a null dim_category_code.
left_joined = events.join(category_dim, ["category_id"], "left")

# Show the event-side code next to the dimension-side code for comparison.
left_joined.select(
    ["category_id", "category_code", "dim_category_code"]
).show(n=10)

In [0]:
# Calculating running total of events for each user
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Per-user window ordered by event time.
# The frame is set explicitly to ROWS UNBOUNDED PRECEDING..CURRENT ROW. Without
# it, an ordered window defaults to a RANGE frame, which includes every row
# sharing the current row's event_time; tied events would then all receive the
# same (inflated) count instead of a count that grows by one per event.
# Note: the relative order of events with identical event_time is still
# arbitrary, since there is no tie-breaking column in the ordering.
user_window = (
    Window.partitionBy("user_id")
    .orderBy("event_time")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

# cumulative_events = number of this user's events up to and including this row.
events_with_running = events.withColumn(
    "cumulative_events",
    F.count("*").over(user_window)
)

events_with_running.select(
    "user_id", "event_time", "event_type", "cumulative_events"
).show(10)

In [0]:
# Flag purchases: is_purchase is 1 when event_type == "purchase", otherwise 0
# (a null event_type also yields 0).
events_feat = events.withColumn(
    "is_purchase",
    F.when(F.col("event_type") == "purchase", 1).otherwise(0)
)

# Price buckets: Low (< 50), Medium (< 200), High (>= 200).
# The High branch is an explicit comparison rather than `.otherwise("High")`:
# a null price fails every comparison, so with a catch-all it would be labelled
# "High". Without `.otherwise`, unmatched rows (null price) get a null bucket.
price = F.col("price")
events_feat = events_feat.withColumn(
    "price_bucket",
    F.when(price < 50, "Low")
     .when(price < 200, "Medium")
     .when(price >= 200, "High")
)

events_feat.select("price", "price_bucket", "is_purchase").show(10)

In [0]:
# Conversion rate per category_code: purchases as a percentage of views.
# Steps: count events per (category_code, event_type), pivot event_type into
# one column per type, then derive conversion_rate from the pivoted counts.
conversion = (
    events.groupBy("category_code", "event_type")
          .count()
          .groupBy("category_code")
          .pivot("event_type")
          .sum("count")
          .withColumn(
              "conversion_rate",
              # The pivot leaves null where a category has no events of a given
              # type. Treat a missing purchase count as 0 so categories with
              # views but no purchases report 0% instead of null. A missing
              # view count still yields null (rate undefined without views).
              (F.coalesce(F.col("purchase"), F.lit(0)) / F.col("view")) * 100
          )
)

conversion.show(10)