In [0]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

In [0]:
%sql
--Creating sales table:

CREATE OR REPLACE TABLE sales(
  customer_id VARCHAR(1),
  order_date DATE,
  product_id INTEGER
);

INSERT INTO sales
  (customer_id, order_date, product_id)
VALUES
  ('A', '2021-01-01', '1'),
  ('A', '2021-01-01', '2'),
  ('A', '2021-01-07', '2'),
  ('A', '2021-01-10', '3'),
  ('A', '2021-01-11', '3'),
  ('A', '2021-01-11', '3'),
  ('B', '2021-01-01', '2'),
  ('B', '2021-01-02', '2'),
  ('B', '2021-01-04', '1'),
  ('B', '2021-01-11', '1'),
  ('B', '2021-01-16', '3'),
  ('B', '2021-02-01', '3'),
  ('C', '2021-01-01', '3'),
  ('C', '2021-01-01', '3'),
  ('C', '2021-01-07', '3');

num_affected_rows,num_inserted_rows
15,15


In [0]:
sales_df = spark.read.table("sales")
sales_df.show()

+-----------+----------+----------+
|customer_id|order_date|product_id|
+-----------+----------+----------+
|          A|2021-01-01|         1|
|          A|2021-01-01|         2|
|          A|2021-01-07|         2|
|          A|2021-01-10|         3|
|          A|2021-01-11|         3|
|          A|2021-01-11|         3|
|          B|2021-01-01|         2|
|          B|2021-01-02|         2|
|          B|2021-01-04|         1|
|          B|2021-01-11|         1|
|          B|2021-01-16|         3|
|          B|2021-02-01|         3|
|          C|2021-01-01|         3|
|          C|2021-01-01|         3|
|          C|2021-01-07|         3|
+-----------+----------+----------+



In [0]:
%sql
--Creating Menu table:

CREATE OR REPLACE TABLE menu (
  product_id INTEGER,
  product_name VARCHAR(5),
  price INTEGER
);

INSERT INTO menu
  (product_id, product_name, price)
VALUES
  ('1', 'sushi', '10'),
  ('2', 'curry', '15'),
  ('3', 'ramen', '12');


num_affected_rows,num_inserted_rows
3,3


In [0]:
menu_df = spark.read.table("menu")
menu_df.show()

+----------+------------+-----+
|product_id|product_name|price|
+----------+------------+-----+
|         1|       sushi|   10|
|         2|       curry|   15|
|         3|       ramen|   12|
+----------+------------+-----+



In [0]:
%sql
--Creating members table:

CREATE OR REPLACE TABLE members (
  customer_id VARCHAR(1),
  join_date DATE
);

INSERT INTO members
  (customer_id, join_date)
VALUES
  ('A', '2021-01-07'),
  ('B', '2021-01-09');

num_affected_rows,num_inserted_rows
2,2


In [0]:
members_df = spark.read.table("members")
members_df .show()

+-----------+----------+
|customer_id| join_date|
+-----------+----------+
|          A|2021-01-07|
|          B|2021-01-09|
+-----------+----------+



In [0]:
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window
from pyspark.sql.functions import *
from pyspark.sql.window import *
from pyspark.sql import Window 
from pyspark.sql.functions import dense_rank

In [0]:
sales_df.display()
menu_df.display()
members_df.display()

customer_id,order_date,product_id
A,2021-01-01,1
A,2021-01-01,2
A,2021-01-07,2
A,2021-01-10,3
A,2021-01-11,3
A,2021-01-11,3
B,2021-01-01,2
B,2021-01-02,2
B,2021-01-04,1
B,2021-01-11,1


product_id,product_name,price
1,sushi,10
2,curry,15
3,ramen,12


customer_id,join_date
A,2021-01-07
B,2021-01-09


In [0]:
#select sum(m.price), s.customer_id from menu as m join sales as s on m.product_id = s.product_id group by s.customer_id;

from pyspark.sql.window import Window
from pyspark.sql.functions import sum,count,col,row_number,desc,max,lead

joined_df=sales_df.join(menu_df,on="product_id")
window_spec=Window.partitionBy(sales_df.customer_id)

total_amount_df=joined_df.select("customer_id",sum("price").over(window_spec).alias("TotalSpentByCustomer")).distinct().show()

+-----------+--------------------+
|customer_id|TotalSpentByCustomer|
+-----------+--------------------+
|          A|                  76|
|          B|                  74|
|          C|                  36|
+-----------+--------------------+



In [0]:
#select count(order_date), customer_id from sales group by order_date;

#joined_df.select("customer_id","price").groupBy("customer_id").agg(sum("price")).show()
visitlog_df = sales_df.select("customer_id","order_date").groupBy("customer_id").agg(count("order_date")).show()

+-----------+-----------------+
|customer_id|count(order_date)|
+-----------+-----------------+
|          B|                6|
|          C|                3|
|          A|                6|
+-----------+-----------------+



In [0]:
#with cte as (select * from sales join menu on sales.product_id = menu.product_id),
#cte1 as ( select customer_id, order_date, product_name, row_number() over(partition by(customer_id) order by (order_date) asc)as row_num from cte)
#select * from cte1 where row_num=1;

join_df = sales_df.join(menu_df, sales_df.product_id == menu_df.product_id)
window_spec = Window.partitionBy("customer_id").orderBy("order_date")
cte_df = join_df.withColumn("row_num", row_number().over(window_spec)).filter("row_num = 1")

result_df = cte_df.select("customer_id", "order_date", "product_name")
result_df.show()

+-----------+----------+------------+
|customer_id|order_date|product_name|
+-----------+----------+------------+
|          A|2021-01-01|       sushi|
|          B|2021-01-01|       curry|
|          C|2021-01-01|       ramen|
+-----------+----------+------------+



In [0]:
#select count(sales.product_id), any_value(product_name) from sales join menu on sales.product_id = menu.product_id group by sales.product_id, product_name ORDER BY count(sales.product_id) DESC LIMIT 1;

joined = sales_df.join(menu_df, 'product_id')
grouped = joined.groupBy("product_id", "product_name").agg(count("product_id").alias("num_purchase"))
ordered = grouped.orderBy("num_purchase", ascending = 0).drop('product_id')
result = ordered.limit(1)

result.show()

+------------+------------+
|product_name|num_purchase|
+------------+------------+
|       ramen|           8|
+------------+------------+



In [0]:
"""
SELECT s.customer_id, m.product_name, COUNT(s.product_id) as count_orders
FROM sales_df s
JOIN menu_df m ON s.product_id = m.product_id
GROUP BY s.customer_id, m.product_name
HAVING RANK() OVER (PARTITION BY s.customer_id ORDER BY COUNT(s.product_id) DESC) = 1;
"""

joined = sales_df.join(menu_df, 'product_id')
grouped = joined.groupBy('customer_id', 'product_name').agg(count('product_id').alias('count_orders'))
selected = grouped.withColumn('denserank', dense_rank().over(Window.partitionBy('customer_id').orderBy(col("count_orders").desc())))
filtered = selected.filter('denserank = 1')

result = filtered.drop('denserank')
result.show()


+-----------+------------+------------+
|customer_id|product_name|count_orders|
+-----------+------------+------------+
|          A|       ramen|           3|
|          B|       sushi|           2|
|          B|       ramen|           2|
|          B|       curry|           2|
|          C|       ramen|           3|
+-----------+------------+------------+



In [0]:
windowSpec = Window.partitionBy("customer_id").orderBy(col("order_date").desc())

result_df = sales_df.join(members_df, 'customer_id').filter(sales_df.order_date < members_df.join_date).withColumn('dense_rank', dense_rank().over(windowSpec)).filter('dense_rank = 1').join(menu_df, 'product_id').select('customer_id', 'order_date', 'product_name')

result_df.display()

customer_id,order_date,product_name
A,2021-01-01,curry
A,2021-01-01,sushi
B,2021-01-04,sushi


In [0]:
windowSpec = Window.partitionBy("customer_id").orderBy(col("order_date").desc())

result_df = sales_df.join(members_df, 'customer_id').filter(sales_df.order_date > members_df.join_date).withColumn('dense_rank', dense_rank().over(windowSpec)).filter('dense_rank = 1').join(menu_df, 'product_id').select('customer_id', 'order_date', 'product_name')

result_df.display()

customer_id,order_date,product_name
A,2021-01-11,ramen
A,2021-01-11,ramen
B,2021-02-01,ramen
