In [0]:
import pyspark.pandas as ps

In [0]:
sales = ps.read_csv("/FileStore/tables/sales.csv")

In [0]:
print(sales)

   customer_id order_date  product_id
0            A 2021-01-01           2
1            A 2021-01-07           2
2            A 2021-01-10           3
3            A 2021-01-11           3
4            C 2021-01-01           3
5            C 2021-01-07           3
6            B 2021-01-02           2
7            B 2021-01-04           1
8            B 2021-01-11           1
9            B 2021-01-16           3
10           B 2021-02-01           3
11           C 2021-01-01           3
12           A 2021-01-11           3
13           B 2021-01-01           2
14           A 2021-01-01           1


In [0]:
from pyspark.sql.functions import *

In [0]:
from pyspark.sql import *

In [0]:
sales = spark.read.csv("/FileStore/tables/sales.csv",header=True)
menu = spark.read.csv("/FileStore/tables/menu.csv",header=True)
members = spark.read.csv("/FileStore/tables/members.csv",header=True)

In [0]:
display(sales)
display(menu)
display(members)

customer_id,order_date,product_id
A,2021-01-01,2
A,2021-01-07,2
A,2021-01-10,3
A,2021-01-11,3
C,2021-01-01,3
C,2021-01-07,3
B,2021-01-02,2
B,2021-01-04,1
B,2021-01-11,1
B,2021-01-16,3


product_id,product_name,price
1,sushi,10
3,ramen,12
2,curry,15


customer_id,join_date
A,2021-01-07
B,2021-01-09


In [0]:
totalamtspend = sales.join(menu,"product_id").groupBy('customer_id').agg({'price':'sum'}).withColumnRenamed("sum(price)","totalamount").orderBy("customer_id")
display(totalamtspend)

customer_id,totalamount
A,76.0
B,74.0
C,36.0


In [0]:
visitcount = sales.groupBy('customer_id').agg(countDistinct("order_date").alias("visitor_count"))
display(visitcount)

customer_id,visitor_count
B,6
C,2
A,4


In [0]:
window_spec= Window.partitionBy("customer_id").orderBy("order_date")
first_item = sales.withColumn("rank",dense_rank().over(window_spec)).filter("rank==1").join(menu,"product_id").select("customer_id","product_name").distinct()

display(first_item)

customer_id,product_name
A,curry
A,sushi
B,curry
C,ramen


In [0]:
most_purchase = sales.join(menu,'product_id').groupBy('product_id','product_name').agg(count('product_id').alias('most_purchase')).orderBy('most_purchase',ascending=0).drop('product_id')
display(most_purchase)

product_name,most_purchase
ramen,8
curry,4
sushi,3


In [0]:
most_popular = sales.join(menu,'product_id').groupBy('customer_id','product_name').agg(count('product_id').alias('order_count')).withColumn('rank',dense_rank().over(Window.partitionBy('customer_id').orderBy(col('order_count').desc()))).filter("rank==1").select('customer_id','product_name','order_count')

display(most_popular)

customer_id,product_name,order_count
A,ramen,3
B,sushi,2
B,ramen,2
B,curry,2
C,ramen,3


In [0]:
item_after_member= sales.join(members,'customer_id').filter(sales.order_date >= members.join_date).withColumn('rank',dense_rank().over(Window.partitionBy('customer_id').orderBy('order_date'))).filter('rank==1').join(menu,'product_id').select('customer_id','order_date','product_name')

display(item_after_member)

customer_id,order_date,product_name
A,2021-01-07,curry
B,2021-01-11,sushi


In [0]:
item_before_member = sales.join(members,'customer_id').filter(sales.order_date < members.join_date).withColumn('rank',dense_rank().over(Window.partitionBy('customer_id').orderBy(col('order_date').desc()))).filter('rank==1').join(menu,'product_id').select('customer_id','order_date','product_name')

display(item_before_member)

customer_id,order_date,product_name
A,2021-01-01,curry
A,2021-01-01,sushi
B,2021-01-04,sushi


In [0]:
total_item_amt = sales.join(menu,'product_id').join(members,'customer_id').filter(sales.order_date < members.join_date).withColumn('price',col('price').cast('integer')).groupBy('customer_id').agg(countDistinct('product_id').alias('unique_menu_item'), sum('price').alias('total_amt'))

display(total_item_amt)

customer_id,unique_menu_item,total_amt
B,2,40
A,2,25


In [0]:
points= sales.join(menu,'product_id').withColumn('points',when((col('product_id')==1),col('price')*20).otherwise(col('price')*10)).groupBy('customer_id').agg(sum('points').alias('total_points'))

display(points)

customer_id,total_points
B,940.0
C,360.0
A,860.0


In [0]:
members = members.withColumn('valid_date',date_add(col('join_date'),6)).withColumn('last_date',to_date(lit('2021-01-31')))

points2 = sales.join(members,'customer_id').join(menu,"product_id").filter('order_date < last_date').withColumn('points',when(col('product_name')=='sushi',2*10*col('price')).when((col('order_date') >= col('join_date')) & (col('order_date')<= col('last_date')),2*10*col("price")).otherwise(10 * col('price'))).groupBy('customer_id').agg(sum('points').alias('total_points'))


display(points2)

customer_id,total_points
B,940.0
A,1370.0


In [0]:
all = sales.join(menu,'product_id',"left").join(members,'customer_id',"left").withColumn('member',when(col('join_date')>col('order_date'),'N').when(col('join_date') <= col('order_date'),'Y').otherwise('N')).select('customer_id','order_date','product_name','price','member')

display(all)

customer_id,order_date,product_name,price,member
A,2021-01-01,curry,15,N
A,2021-01-07,curry,15,Y
A,2021-01-10,ramen,12,Y
A,2021-01-11,ramen,12,Y
C,2021-01-01,ramen,12,N
C,2021-01-07,ramen,12,N
B,2021-01-02,curry,15,N
B,2021-01-04,sushi,10,N
B,2021-01-11,sushi,10,Y
B,2021-01-16,ramen,12,Y


In [0]:
rankAll= sales.join(menu,'product_id',"left").join(members,'customer_id',"left").withColumn('member',when(col('join_date')>col('order_date'),'N').when(col('join_date') <= col('order_date'),'Y').otherwise('N')).withColumn('ranking',when(col('member')=='N',None).when(col('member') == 'Y',rank().over(Window.partitionBy('customer_id','member').orderBy('order_date'))).otherwise(0))

display(rankAll)

customer_id,product_id,order_date,product_name,price,join_date,valid_date,last_date,member,ranking
A,2,2021-01-01,curry,15,2021-01-07,2021-01-13,2021-01-31,N,
A,1,2021-01-01,sushi,10,2021-01-07,2021-01-13,2021-01-31,N,
A,2,2021-01-07,curry,15,2021-01-07,2021-01-13,2021-01-31,Y,1.0
A,3,2021-01-10,ramen,12,2021-01-07,2021-01-13,2021-01-31,Y,2.0
A,3,2021-01-11,ramen,12,2021-01-07,2021-01-13,2021-01-31,Y,3.0
A,3,2021-01-11,ramen,12,2021-01-07,2021-01-13,2021-01-31,Y,3.0
B,2,2021-01-01,curry,15,2021-01-09,2021-01-15,2021-01-31,N,
B,2,2021-01-02,curry,15,2021-01-09,2021-01-15,2021-01-31,N,
B,1,2021-01-04,sushi,10,2021-01-09,2021-01-15,2021-01-31,N,
B,1,2021-01-11,sushi,10,2021-01-09,2021-01-15,2021-01-31,Y,1.0


In [0]:
# target_path ='/mnt/rankAll'

# rankAll.write.format('parquet').save(target_path)

In [0]:
display(spark.read.parquet(target_path))

[0;31m---------------------------------------------------------------------------[0m
[0;31mNameError[0m                                 Traceback (most recent call last)
[0;32m<command-3855980594243669>[0m in [0;36m<module>[0;34m[0m
[0;32m----> 1[0;31m [0mdisplay[0m[0;34m([0m[0mspark[0m[0;34m.[0m[0mread[0m[0;34m.[0m[0mparquet[0m[0;34m([0m[0mtarget_path[0m[0;34m)[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m
[0;31mNameError[0m: name 'target_path' is not defined