In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window


In [0]:
df_nov = spark.read.csv("/Volumes/workspace/ecommerce/ecommerce_data/2019-Nov.csv", header=True, inferSchema=True)

In [0]:
df_nov.printSchema()

root
 |-- event_time: timestamp (nullable = true)
 |-- event_type: string (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- category_id: long (nullable = true)
 |-- category_code: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- price: double (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- user_session: string (nullable = true)



###Top 5 products by revenue


In [0]:
# Top 5 products by revenue
revenue = df_nov.filter(F.col("event_type") == "purchase")\
    .groupBy("product_id") \
    .agg(F.sum("price").alias("revenue")) \
    .orderBy(F.desc("revenue")).limit(5).show()

+----------+--------------------+
|product_id|             revenue|
+----------+--------------------+
|   1005115|2.0625574319999967E7|
|   1005105|1.1445354689999992E7|
|   1005135|  7086522.1299999915|
|   1004249|   6815294.620000016|
|   1002544|   5603193.590000012|
+----------+--------------------+



### Running total per user

In [0]:
# Running total per user
window = Window.partitionBy("user_id").orderBy("event_time")
df_nov.withColumn("cumulative_events", F.count("*").over(window)).display()

event_time,event_type,product_id,category_id,category_code,brand,price,user_id,user_session,cumulative_events
2019-11-29T14:47:32.000Z,view,1004740,2053013555631882655,electronics.smartphone,xiaomi,239.36,94584874,e6abb356-512a-4472-9d36-62e00a5fd01c,1
2019-11-26T05:31:47.000Z,view,4900378,2053013555220840837,appliances.kitchen.juicer,scarlett,112.99,122384079,c04d12ef-da1c-4e4a-b446-8f1bb5a81e35,1
2019-11-28T08:25:09.000Z,view,12702958,2053013553559896355,,cordiant,42.47,122384079,6cee7edb-68ae-4bea-86ac-ef0efd7d5c8f,2
2019-11-28T08:32:51.000Z,view,4900173,2053013555220840837,appliances.kitchen.juicer,moulinex,102.94,122384079,6cee7edb-68ae-4bea-86ac-ef0efd7d5c8f,3
2019-11-28T09:11:30.000Z,view,4900173,2053013555220840837,appliances.kitchen.juicer,moulinex,102.94,122384079,6cee7edb-68ae-4bea-86ac-ef0efd7d5c8f,4
2019-11-28T09:30:06.000Z,view,4900393,2053013555220840837,appliances.kitchen.juicer,scarlett,90.48,122384079,6cee7edb-68ae-4bea-86ac-ef0efd7d5c8f,5
2019-11-28T09:32:12.000Z,view,4900412,2053013555220840837,appliances.kitchen.juicer,kitfort,107.6,122384079,6cee7edb-68ae-4bea-86ac-ef0efd7d5c8f,6
2019-11-28T09:32:23.000Z,view,4900383,2053013555220840837,appliances.kitchen.juicer,scarlett,106.33,122384079,6cee7edb-68ae-4bea-86ac-ef0efd7d5c8f,7
2019-11-28T09:34:36.000Z,view,4900296,2053013555220840837,appliances.kitchen.juicer,redmond,123.3,122384079,6cee7edb-68ae-4bea-86ac-ef0efd7d5c8f,8
2019-11-28T09:35:18.000Z,view,4900379,2053013555220840837,appliances.kitchen.juicer,scarlett,94.98,122384079,6cee7edb-68ae-4bea-86ac-ef0efd7d5c8f,9


In [0]:
df_nov.select('event_type').distinct().show()

+----------+
|event_type|
+----------+
|  purchase|
|      cart|
|      view|
+----------+



### Conversion rate by category

In [0]:
# Conversion rate by category
df_nov.groupBy("category_code", "event_type").count() \
    .groupby("category_code")\
    .pivot("event_type").sum("count") \
    .withColumn("conversion_rate", F.col("purchase")/F.col("view")*100).show()

+--------------------+-----+--------+-------+-------------------+
|       category_code| cart|purchase|   view|    conversion_rate|
+--------------------+-----+--------+-------+-------------------+
|furniture.living_...| 6521|    1562| 417428| 0.3741962685780542|
|      apparel.jumper|  324|      82|  31269|0.26224055774089355|
| stationery.cartrige|  644|     191|  11943| 1.5992631667085322|
|       sport.bicycle| 2227|     536| 106037| 0.5054839348529288|
|        apparel.sock|  101|      19|   3455| 0.5499276410998553|
|appliances.enviro...|   81|      32|   3316| 0.9650180940892641|
|          kids.swing| 1556|     482|  57430| 0.8392826049103257|
|auto.accessories....|   96|      18|   3397| 0.5298793052693553|
|auto.accessories....| 2213|     544|  47145| 1.1538869445328244|
|electronics.audio...| 1615|     489|  44645| 1.0953074252435884|
|  electronics.clocks|69289|   23237|1994440| 1.1650889472734203|
|electronics.audio...| 2280|     696|  60363|  1.153024203568411|
|appliance

###Joins

In [0]:
salary = spark.createDataFrame([
    ("John", 50000),
    ("Jane", 60000),
    ("Bob", 75000),
    ("Alice", 80000)
],["name", "salary"])

employee = spark.createDataFrame([
    ("John", "Manager"),
    ("Jane", "Engineer"),
    ("Bob", "Manager"),
    ("dheeru","engineer")
    ],["name","job"])

In [0]:
employee.join(salary,"name","inner").display()

name,job,salary
John,Manager,50000
Jane,Engineer,60000
Bob,Manager,75000


In [0]:
employee.join(salary,"name","right").display()

name,job,salary
John,Manager,50000
Jane,Engineer,60000
Bob,Manager,75000
Alice,,80000


In [0]:
employee.join(salary,"name","left").display()


name,job,salary
John,Manager,50000.0
Jane,Engineer,60000.0
Bob,Manager,75000.0
dheeru,engineer,


In [0]:
employee.join(salary,"name","outer").display()

name,job,salary
John,Manager,50000.0
Jane,Engineer,60000.0
Bob,Manager,75000.0
dheeru,engineer,
Alice,,80000.0


###UDF(User Defined Function)

In [0]:
salary_df= spark.createDataFrame([
    ('101',"John", 50000),
    ('102',"Jane", 60000),
    ('103',"Bob", 45000),
    ('104',"Alice", 35000),
    ('105',"dheeru",80000),
],["employee_id","name", "salary"])

In [0]:
salary_df.show()

+-----------+------+------+
|employee_id|  name|salary|
+-----------+------+------+
|        101|  John| 50000|
|        102|  Jane| 60000|
|        103|   Bob| 45000|
|        104| Alice| 35000|
|        105|dheeru| 80000|
+-----------+------+------+



In [0]:
def salary_category(salary):
    if salary <= 50000:
        return "juniour"
    else:
        return "senior"
    

In [0]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
salary_category_udf = udf(salary_category,StringType())


In [0]:
salary_df.show()

+-----------+------+------+
|employee_id|  name|salary|
+-----------+------+------+
|        101|  John| 50000|
|        102|  Jane| 60000|
|        103|   Bob| 45000|
|        104| Alice| 35000|
|        105|dheeru| 80000|
+-----------+------+------+



In [0]:
salary_category = salary_df.withColumn(
    "salary_category",
    salary_category_udf(salary_df["salary"])
)

salary_category.show()

+-----------+------+------+---------------+
|employee_id|  name|salary|salary_category|
+-----------+------+------+---------------+
|        101|  John| 50000|        juniour|
|        102|  Jane| 60000|         senior|
|        103|   Bob| 45000|        juniour|
|        104| Alice| 35000|        juniour|
|        105|dheeru| 80000|         senior|
+-----------+------+------+---------------+

