In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Load the raw e-commerce event log (2019-Oct.csv) into a Spark DataFrame.
# `header` takes column names from the first line of the file; `inferSchema`
# asks Spark to derive column types from the data instead of reading
# every column as a string.
events_csv_path = "/Volumes/workspace/ecommerce/ecommerce_data/2019-Oct.csv"
events = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv(events_csv_path)
)

In [0]:
# Top 5 products by revenue.
# Revenue is the sum of `price` over rows whose event_type is "purchase".
# The DataFrame is kept in `revenue` and displayed as a separate step:
# DataFrame.show() returns None, so chaining it into the assignment would
# leave `revenue` bound to None instead of the result.
revenue = (
    events.filter(F.col("event_type") == "purchase")
    .groupBy("product_id")
    .agg(F.sum("price").alias("revenue"))
    .orderBy(F.desc("revenue"))
    .limit(5)
)
revenue.show()

+----------+--------------------+
|product_id|             revenue|
+----------+--------------------+
|   1005115|1.2406807350000003E7|
|   1005105|1.0239248679999996E7|
|   1004249|   6730112.920000011|
|   1005135|   5567806.640000007|
|   1004767|   5430723.430000007|
+----------+--------------------+



In [0]:
# Running event count per user, in event_time order.
# The frame is spelled out as ROWS from the start of the partition to the
# current row. An ordered window with no explicit frame uses a RANGE frame,
# under which all rows sharing the same event_time for a user are peers and
# receive the same count, so the counter would jump instead of increasing
# by one per event.
window = (
    Window.partitionBy("user_id")
    .orderBy("event_time")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)
events.withColumn("cumulative_events", F.count("*").over(window)).show()

+-------------------+----------+----------+-------------------+--------------------+--------+------+---------+--------------------+-----------------+
|         event_time|event_type|product_id|        category_id|       category_code|   brand| price|  user_id|        user_session|cumulative_events|
+-------------------+----------+----------+-------------------+--------------------+--------+------+---------+--------------------+-----------------+
|2019-10-09 10:30:19|      view|  17301541|2053013553853497655|                NULL|    NULL|162.17|205053188|e1eadbc6-aef5-4cf...|                1|
|2019-10-09 10:30:44|      view|  17301541|2053013553853497655|                NULL|    NULL|162.17|205053188|e1eadbc6-aef5-4cf...|                2|
|2019-10-07 06:23:01|      view|  16200119|2053013556344914381|   kids.fmcg.diapers|   moony| 18.47|222907508|cb653adc-46a2-4d9...|                1|
|2019-10-07 06:26:23|      view|  16200162|2053013556344914381|   kids.fmcg.diapers|   moony| 18.47|

In [0]:
# Conversion rate by category: purchases as a percentage of views.
# One groupBy + pivot + count produces the per-category event counts directly.
# Passing the pivot values explicitly avoids the extra job Spark otherwise
# runs to discover the distinct event types, and fixes the column order.
# NOTE(review): assumes event_type only takes these three values (the ones
# visible in the existing output) — any other value would be dropped.
event_counts = (
    events.groupBy("category_code")
    .pivot("event_type", ["cart", "purchase", "view"])
    .count()
)

# A category with no purchase rows has a NULL `purchase` count; treat it as 0
# so the rate is 0 rather than NULL. A NULL `view` count still yields a NULL
# rate, since the rate is undefined without views.
purchases_or_zero = F.coalesce(F.col("purchase"), F.lit(0))
event_counts.withColumn(
    "conversion_rate", purchases_or_zero / F.col("view") * 100
).show()

+--------------------+-----+--------+-------+-------------------+
|       category_code| cart|purchase|   view|    conversion_rate|
+--------------------+-----+--------+-------+-------------------+
|auto.accessories....| NULL|      46|  12305| 0.3738317757009346|
|furniture.living_...| NULL|    1084| 215471| 0.5030839416905292|
| stationery.cartrige|  106|     134|   7380| 1.8157181571815717|
|       sport.bicycle|  693|     838| 128759| 0.6508282916145668|
|        apparel.sock|    7|      21|   2621| 0.8012209080503624|
|appliances.enviro...|   16|      27|   2172| 1.2430939226519337|
|          kids.swing|  147|     330|  31596|  1.044436004557539|
|electronics.audio...|  196|     430|  28394| 1.5144044516447137|
|auto.accessories....|  716|     494|  42350|  1.166469893742621|
|  electronics.clocks|20344|   17906|1272783| 1.4068384005757462|
|electronics.audio...|  410|     423|  35409| 1.1946115394391257|
|appliances.kitche...| 1080|     936| 105149| 0.8901653843593377|
|appliance