In [0]:
from pyspark.sql import *
from pyspark.sql.functions import *

##### Reading all files

In [0]:
sales_df = spark.read.option("header", True).csv("/FileStore/tables/sales.csv")
menu_df = spark.read.option("header", True).csv("/FileStore/tables/menu.csv")
members_df = spark.read.option("header", True).csv("/FileStore/tables/members.csv")

In [0]:
display(sales_df)

customer_id,order_date,product_id
A,2021-01-01,1
A,2021-01-01,2
A,2021-01-07,2
A,2021-01-10,3
A,2021-01-11,3
A,2021-01-11,3
B,2021-01-01,2
B,2021-01-02,2
B,2021-01-04,1
B,2021-01-11,1


##### Question 1: What is the total amount each customer spent at the restaurant?

In [0]:
total_spent_df = (sales_df
                  .join(menu_df, "product_id")
                  .groupBy("customer_id").agg({'price':'sum'})
                  .withColumnRenamed("sum(price)", "totalspent")
                  .orderBy("customer_id")
                 )

display(total_spent_df)

customer_id,totalspent
A,76.0
B,74.0
C,36.0


##### Question 2: How many days has each customer visited the restaurant?

In [0]:
visitcount_df = (sales_df
                .groupBy("customer_id")
                .agg(countDistinct("order_date").alias("visit_count"))
                )

display(visitcount_df)

customer_id,visit_count
B,6
C,2
A,4


##### Question 3: What was the first item from the menu purchased by each customer?

In [0]:
windowSpec = Window.partitionBy("customer_id").orderBy("order_date")

first_item_df = (sales_df
               .withColumn("rank", dense_rank().over(windowSpec))
               .filter("rank == 1")
               .join(menu_df, "product_id")
               .select("customer_id", "product_name", "order_date")
               .distinct())

display(first_item_df)

customer_id,product_name,order_date
A,sushi,2021-01-01
A,curry,2021-01-01
B,curry,2021-01-01
C,ramen,2021-01-01


##### Question 4: What is the most purchased item on the menu and how many times was it purchased by all customers?

In [0]:
max_ordered_df = (sales_df
                 .join(menu_df, "product_id")
                 .groupBy("product_id", "product_name")
                 .agg(count("product_id").alias("orders"))
                 .orderBy("orders", ascending = 0)
                 .select("product_name", "orders")
                 .limit(1))

display(max_ordered_df)

product_name,orders
ramen,8


##### Question 5: Which item was the most popular for each customer?

In [0]:
display(sales_df)

customer_id,order_date,product_id
A,2021-01-01,1
A,2021-01-01,2
A,2021-01-07,2
A,2021-01-10,3
A,2021-01-11,3
A,2021-01-11,3
B,2021-01-01,2
B,2021-01-02,2
B,2021-01-04,1
B,2021-01-11,1


In [0]:
import pyspark.sql.functions as f
from pyspark.sql.functions import *
from pyspark.sql.window import Window

In [0]:
popular_data = sales_df.join(menu_df, "product_id")
display(popular_data)

product_id,customer_id,order_date,product_name,price
1,A,2021-01-01,sushi,10
2,A,2021-01-01,curry,15
2,A,2021-01-07,curry,15
3,A,2021-01-10,ramen,12
3,A,2021-01-11,ramen,12
3,A,2021-01-11,ramen,12
2,B,2021-01-01,curry,15
2,B,2021-01-02,curry,15
1,B,2021-01-04,sushi,10
1,B,2021-01-11,sushi,10


In [0]:
famous = popular_data.groupBy("customer_id", "product_name").agg(count("product_name").alias("no_of_times_ordered")).orderBy("customer_id")

display(famous)

customer_id,product_name,no_of_times_ordered
A,ramen,3
A,sushi,1
A,curry,2
B,ramen,2
B,sushi,2
B,curry,2
C,ramen,3


In [0]:
w = Window.partitionBy("customer_id").orderBy(desc("no_of_times_ordered"))

famous = famous.withColumn("rank", dense_rank().over(w)).filter("rank == 1").drop("rank")

display(famous)

customer_id,product_name,no_of_times_ordered
A,ramen,3
B,sushi,2
B,ramen,2
B,curry,2
C,ramen,3


#####Question 6:  Which item was purchased first by the customer after they became a member?

In [0]:
first_purchased = members_df.join(sales_df, "customer_id").join(menu_df, "product_id").filter("order_date >= join_date")
# menu_df.show()

w = Window.partitionBy("customer_id").orderBy("order_date")

first_purchased.withColumn("rank", dense_rank().over(w)).filter("rank == 1").drop("rank").select("customer_id", "product_name", "join_date", "order_date").show()

+-----------+------------+----------+----------+
|customer_id|product_name| join_date|order_date|
+-----------+------------+----------+----------+
|          A|       curry|2021-01-07|2021-01-07|
|          B|       sushi|2021-01-09|2021-01-11|
+-----------+------------+----------+----------+



#####Question 7:  Which item was purchased just before the customer became a member?

In [0]:
all_data = members_df.join(sales_df, "customer_id").join(menu_df, "product_id")
just_before = all_data.filter("order_date < join_date")

w = Window.partitionBy("customer_id").orderBy(desc("order_date"))

just_before_purchase = just_before.withColumn("rank", dense_rank().over(w)).filter("rank == 1").drop("rank").select("customer_id", "order_date", "product_name")
display(just_before_purchase)

customer_id,order_date,product_name
A,2021-01-01,sushi
A,2021-01-01,curry
B,2021-01-04,sushi


#####Question 8:  What is the total items and amount spent for each member before they became a member?

In [0]:
just_before.groupBy("customer_id").agg(countDistinct("product_id"), sum("price")).orderBy("customer_id").show()

+-----------+-----------------+----------+
|customer_id|count(product_id)|sum(price)|
+-----------+-----------------+----------+
|          A|                2|      25.0|
|          B|                2|      40.0|
+-----------+-----------------+----------+



#####Question 9: If each $1 spent equates to 10 points and sushi has a 2x points multiplier — how many points would each customer have?

In [0]:
sales_menu = sales_df.join(menu_df, "product_id")
sales_menu \
.withColumn("points", when(col('product_id') == 1,col('price')*20).otherwise(col('price')*10)) \
.groupBy("customer_id").agg(sum("points").alias("total_points")) \
.orderBy("customer_id") \
.show()

+-----------+------------+
|customer_id|total_points|
+-----------+------------+
|          A|       860.0|
|          B|       940.0|
|          C|       360.0|
+-----------+------------+



#####Question 10: In the first week after a customer joins the program (including their join date) they earn 2x points on all items, not just sushi — how many points do customer A and B have at the end of January?