##### Use case: Get sales data by products
###### Solutions are given for following problem statements
###### Problem statement-1:Get sales data for products per day
###### Problem statement-2:Get sales data for products per month
###### Problem statement-3:Get sales data for products per year
###### Problem statement-4:Get daily sales data from most sold products
###### Problem statement-5:Get third and final quarters of 2013 sales data from Top5 revenue generating products for each month
###### Problem statement-6:Get sales data for each product category
###### Problem statement-7:Get monthly sales data from Top5 revenue generating categories
###### Problem statement-8:Get highest product price for each category
###### Problem statement-9:Get the sales data of best-selling and the second best-selling products in every category
###### Problem statement-10:Get TOP5 highest selling product for each year

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum,round,avg,rank, col
from pyspark.sql.window import Window
from pyspark.sql.functions import *
from pyspark.sql.functions import unix_timestamp, lit
# Build (or reuse, via getOrCreate) the Spark session used by every cell below:
# local master, Hive support enabled.
spark = (
    SparkSession.builder
    .master('local')
    .appName('GetSalesDatabyproducts')
    .enableHiveSupport()
    .getOrCreate()
)

In [3]:
# Load the comma-delimited orders file with an explicit schema
# (order_date is kept as a string), then preview the first rows untruncated.
ordersdf = spark.read.csv(
    "/FileStore/tables/retaildbtext/Orders.txt",
    schema='order_id int,order_date string,order_customer_id int, order_status string',
    sep=',',
)
ordersdf.show(n=5, truncate=False)

In [4]:
# Load the comma-delimited order-items file with an explicit schema,
# then preview the first rows untruncated.
orderItemsdf = spark.read.csv(
    "/FileStore/tables/retaildbtext/Order_Items.txt",
    schema='order_item_id int, order_item_order_id int, order_item_product_id  int, order_item_quantity int,order_item_subtotal float,order_item_product_price float',
    sep=',',
)
orderItemsdf.show(n=5, truncate=False)

In [5]:
# Load the customers file (semicolon-delimited, unlike orders/order items)
# with an explicit schema, then preview the first rows untruncated.
customerItemsdf = spark.read.csv(
    "/FileStore/tables/retaildbtext/Customers.txt",
    schema='customer_id int, customer_fname string, customer_lname string, customer_email string,customer_password string,customer_street string,customer_city string, customer_state string, customer_zipcode string',
    sep=';',
)
customerItemsdf.show(n=5, truncate=False)

In [6]:
# Load the semicolon-delimited products file with an explicit schema,
# then preview the first rows untruncated.
productItemsdf = spark.read.csv(
    "/FileStore/tables/retaildbtext/Products.txt",
    schema='product_id int, product_category_id int, product_name string, product_desc string, product_price float, product_image string ',
    sep=';',
)
productItemsdf.show(n=5, truncate=False)

In [7]:
# Load the comma-delimited categories file with an explicit schema,
# then preview the first rows untruncated.
categoriesItemsdf = spark.read.csv(
    "/FileStore/tables/retaildbtext/Categories.txt",
    schema='category_id int, category_department_id int, category_name string',
    sep=',',
)
categoriesItemsdf.show(n=5, truncate=False)

In [8]:
# Keep only the order columns used by the sales joins below
# (order_customer_id is dropped).
ordersMap = ordersdf.select(['order_id', 'order_date', 'order_status'])
ordersMap.show(n=5, truncate=False)

In [9]:
# Keep only the order-item columns used downstream: the join key to orders,
# the product key, and the subtotal/quantity measures.
orderItemsMap = orderItemsdf.select(
    [
        'order_item_order_id',
        'order_item_product_id',
        'order_item_subtotal',
        'order_item_quantity',
    ]
)
orderItemsMap.show(n=5, truncate=False)


In [10]:
# Inner-join orders to their line items on the order id.
ordersdfjoin = ordersMap.join(
    orderItemsMap,
    on=ordersMap['order_id'] == orderItemsMap['order_item_order_id'],
    how='inner',
)
ordersdfjoin.show(n=5, truncate=False)

In [11]:
# Project the joined orders/items down to the sales columns; the duplicate
# join key order_item_order_id is dropped here.
ordersdfSalesmap = ordersdfjoin.select(
    [
        'order_id',
        'order_date',
        'order_item_product_id',
        'order_status',
        'order_item_subtotal',
        'order_item_quantity',
    ]
)
ordersdfSalesmap.show(n=10, truncate=False)

In [12]:
# Keep only the product columns needed for the sales reports
# (description and image are dropped).
productsMap = productItemsdf.select(
    ['product_id', 'product_category_id', 'product_name', 'product_price']
)
productsMap.show(n=5, truncate=False)

In [13]:
# Inner-join each sold line item to its product record.
prodordersdfjoin = ordersdfSalesmap.join(
    productsMap,
    on=ordersdfSalesmap['order_item_product_id'] == productsMap['product_id'],
    how='inner',
)
prodordersdfjoin.show(n=5, truncate=False)

In [14]:
# Project the order/product join down to the reporting columns; the
# duplicate product key order_item_product_id is dropped in favour of product_id.
prodordersdfsalesmap = prodordersdfjoin.select(
    [
        'order_id',
        'order_date',
        'order_status',
        'order_item_subtotal',
        'order_item_quantity',
        'product_id',
        'product_category_id',
        'product_name',
        'product_price',
    ]
)
prodordersdfsalesmap.show(n=5, truncate=False)


In [15]:
# Print the column names and types of the order/product sales DataFrame
# as a sanity check before joining in categories.
prodordersdfsalesmap.printSchema()

In [16]:
# Inner-join the sales rows to the category table on the category id.
prodcatdfsalesjoin = prodordersdfsalesmap.join(
    categoriesItemsdf,
    on=prodordersdfsalesmap['product_category_id'] == categoriesItemsdf['category_id'],
    how='inner',
)
prodcatdfsalesjoin.show(n=5, truncate=False)

In [17]:
# Rebind prodordersdfsalesmap to the category-enriched projection: the same
# sales columns as before plus category_name. Every problem-statement cell
# below reads from this DataFrame.
prodordersdfsalesmap = prodcatdfsalesjoin.select(
    [
        'order_id',
        'order_date',
        'order_status',
        'order_item_subtotal',
        'order_item_quantity',
        'product_id',
        'product_category_id',
        'product_name',
        'product_price',
        'category_name',
    ]
)
prodordersdfsalesmap.show(n=5, truncate=False)

###### Problem statement-1:Get sales data for products per day

In [19]:
# Problem 1: per (order_date, product_name) sales summary -- average, min, max
# and total subtotal, total quantity and number of line items -- ordered by
# date then product.
# NOTE: round/sum/min/max/count here are the pyspark.sql.functions column
# functions brought in by the file's star import, not the Python builtins.
productsSalesDataPerDay = (
    prodordersdfsalesmap
    .groupBy('order_date', 'product_name')
    .agg(
        round(avg('order_item_subtotal'), 2).alias("avgsales"),
        min('order_item_subtotal').alias("min_sales"),
        max('order_item_subtotal').alias("max_sales"),
        round(sum('order_item_subtotal'), 2).alias("Totalsales"),
        sum('order_item_quantity').alias("Totalquantity"),
        count('order_item_subtotal').alias("Numberofsales"),
    )
    .orderBy('order_date', 'product_name')
)
# Reduce to a single partition before writing the uncompressed CSV with a header.
productsSalesDataPerDay.coalesce(1).write.csv(
    "/FileStore/tables/productsSalesDataPerDay",
    header='true',
    compression="none",
)
productsSalesDataPerDay.show(n=20, truncate=False)

###### Problem statement-2: Get sales data for products per month

In [21]:
# Problem 2: per (month, product_name) sales summary. The month key is the
# 7-character prefix of the order_date string (presumably "yyyy-MM" -- depends
# on the date format in the source file).
# NOTE: round/sum/min/max/count are the pyspark.sql.functions versions
# (star import), not the Python builtins.
productsSalesDataPerMonth = (
    prodordersdfsalesmap
    .groupBy(substring('order_date', 0, 7).alias('ordermonth'), 'product_name')
    .agg(
        round(avg('order_item_subtotal'), 2).alias("avgsales"),
        min('order_item_subtotal').alias("min_sales"),
        max('order_item_subtotal').alias("max_sales"),
        round(sum('order_item_subtotal'), 2).alias("Totalsales"),
        sum('order_item_quantity').alias("Totalquantity"),
        count('order_item_subtotal').alias("Numberofsales"),
    )
    .orderBy('ordermonth', 'product_name')
)
# Reduce to a single partition before writing the uncompressed CSV with a header.
productsSalesDataPerMonth.coalesce(1).write.csv(
    "/FileStore/tables/productsSalesDataPerMonth",
    header='true',
    compression="none",
)
productsSalesDataPerMonth.show(n=20, truncate=False)

###### Problem statement-3: Get sales data  for products per year

In [23]:
# Problem 3: per (year, product_name) sales summary. The year key is the
# 4-character prefix of the order_date string (presumably "yyyy" -- depends
# on the date format in the source file).
# NOTE: round/sum/min/max/count are the pyspark.sql.functions versions
# (star import), not the Python builtins.
productsSalesDataPerYear = (
    prodordersdfsalesmap
    .groupBy(substring('order_date', 0, 4).alias('orderyear'), 'product_name')
    .agg(
        round(avg('order_item_subtotal'), 2).alias("avgsales"),
        min('order_item_subtotal').alias("min_sales"),
        max('order_item_subtotal').alias("max_sales"),
        round(sum('order_item_subtotal'), 2).alias("Totalsales"),
        sum('order_item_quantity').alias("Totalquantity"),
        count('order_item_subtotal').alias("Numberofsales"),
    )
    .orderBy('orderyear', 'product_name')
)
# Reduce to a single partition before writing the uncompressed CSV with a header.
productsSalesDataPerYear.coalesce(1).write.csv(
    "/FileStore/tables/productsSalesDataPerYear",
    header='true',
    compression="none",
)
productsSalesDataPerYear.show(n=20, truncate=False)

###### Problem statement-4:Get daily sales data from most sold products

In [25]:
# Problem 4: units sold per product per day, listed with the most-sold
# products first within each day.
# NOTE: sum here is pyspark.sql.functions.sum (star import), not the builtin.
soldproductsMap = (
    prodordersdfsalesmap
    .groupBy('order_date', 'product_id', 'product_name')
    .agg(sum('order_item_quantity').alias("soldproductsperday"))
)
# One direction flag per sort column: order_date ascending, quantity descending.
# (The previous list carried a third flag with no matching column; PySpark
# silently ignored it, so the ordering itself is unchanged.)
soldproductsMapsorted = soldproductsMap.sort(
    'order_date', 'soldproductsperday', ascending=[True, False]
)
# Reduce to a single partition before writing the uncompressed CSV with a header.
soldproductsMapsorted.coalesce(1).write.csv(
    "/FileStore/tables/MostSoldProductseachday",
    compression="none",
    header='true',
)
soldproductsMapsorted.show(20, False)

###### Problem statement-5:Get third and final quarters of 2013 sales data from Top5 revenue generating products for each month

In [27]:
# Problem 5: top-5 revenue-generating products for each month of the third
# and final quarters of 2013.
#
# The date window spans both quarters, 1 Jul 2013 through 31 Dec 2013
# inclusive. (It previously started at 2013-10-01, which dropped the whole
# third quarter.) Both bounds are converted with to_date so the comparison is
# date-to-date rather than relying on implicit string casting.
# Assumes order_date strings parse with to_date's default format -- rows that
# do not parse become null and are filtered out.
SalesDataPerMonth = prodordersdfsalesmap.where(
    to_date(prodordersdfsalesmap.order_date).between(
        to_date(lit("2013-07-01")),
        to_date(lit("2013-12-31")),
    )
)

# Monthly per-product summary over the filtered window.
# NOTE: this rebinds productsSalesDataPerMonth (also assigned by the Problem 2
# cell) to the Q3/Q4-2013-only summary; the name is kept for compatibility.
# round/sum/count are the pyspark.sql.functions versions (star import).
productsSalesDataPerMonth = (
    SalesDataPerMonth
    .groupBy(substring('order_date', 0, 7).alias('ordermonth'), 'product_name')
    .agg(
        round(avg('order_item_subtotal'), 2).alias("avgsales"),
        round(sum('order_item_subtotal'), 2).alias("Totalsales"),
        sum('order_item_quantity').alias("Totalquantity"),
        count('order_item_subtotal').alias("Numberofsales"),
    )
    .sort('ordermonth', 'product_name')
)

# Rank products inside each month by total revenue, highest first. rank()
# gives tied products the same rank, so a month can yield more than 5 rows.
window = Window.partitionBy(productsSalesDataPerMonth['ordermonth']).orderBy(
    productsSalesDataPerMonth['Totalsales'].desc()
)
top5productspurchasedeachmonth = (
    productsSalesDataPerMonth
    .select('ordermonth', 'product_name', 'Totalsales')
    .withColumn('rank', rank().over(window))
)
top5rankedproductseachmonth = (
    top5productspurchasedeachmonth
    .filter(col('rank') <= 5)
    .sort('ordermonth', 'Totalsales', ascending=[True, False])
)
# Reduce to a single partition before writing the uncompressed CSV with a header.
top5rankedproductseachmonth.coalesce(1).write.csv(
    "/FileStore/tables/top5rankedproductseachmonth",
    compression="none",
    header='true',
)
top5rankedproductseachmonth.show(20, False)

###### Problem statement-6:Get sales data for each product category

In [29]:
# Problem 6: per-category sales summary -- average and total subtotal, total
# quantity and number of line items -- ordered by category id then name.
# NOTE: round/sum/count are the pyspark.sql.functions versions (star import),
# not the Python builtins.
productsSalesDataPercategory = (
    prodordersdfsalesmap
    .groupBy('product_category_id', 'category_name')
    .agg(
        round(avg('order_item_subtotal'), 2).alias("avgsales"),
        round(sum('order_item_subtotal'), 2).alias("Totalsales"),
        sum('order_item_quantity').alias("Totalquantity"),
        count('order_item_subtotal').alias("Numberofsales"),
    )
    .orderBy('product_category_id', 'category_name')
)
# Reduce to a single partition before writing the uncompressed CSV with a header.
productsSalesDataPercategory.coalesce(1).write.csv(
    "/FileStore/tables/productsSalesDataPercategory",
    header='true',
    compression="none",
)
productsSalesDataPercategory.show(n=60, truncate=False)

###### Problem statement-7:Get monthly sales data from Top5 revenue generating categories

In [31]:
# Problem 7: top-5 revenue-generating categories for each month.
# Step 1 -- monthly per-category summary (month key = 7-character prefix of
# the order_date string).
# NOTE: round/sum/count are the pyspark.sql.functions versions (star import).
prodSalesDatatop5catg = (
    prodordersdfsalesmap
    .groupBy(
        substring('order_date', 0, 7).alias("ordermonth"),
        'product_category_id',
        'category_name',
    )
    .agg(
        round(avg('order_item_subtotal'), 2).alias("avgsales"),
        round(sum('order_item_subtotal'), 2).alias("Totalsales"),
        sum('order_item_quantity').alias("Totalquantity"),
        count('order_item_subtotal').alias("Numberofsales"),
    )
    .orderBy('ordermonth', 'product_category_id', 'category_name')
)
# Step 2 -- rank categories inside each month by total revenue, highest first.
# rank() gives ties the same rank, so a month can yield more than 5 rows.
window = Window.partitionBy(prodSalesDatatop5catg['ordermonth']).orderBy(
    prodSalesDatatop5catg['Totalsales'].desc()
)
top5productscategorieseachmonth = (
    prodSalesDatatop5catg
    .select('ordermonth', 'product_category_id', 'category_name', 'Totalsales')
    .withColumn('rank', rank().over(window).alias('rank'))
)
# Step 3 -- keep ranks 1..5, ordered by month then descending revenue.
top5rankedproductscategorieseachmonth = (
    top5productscategorieseachmonth
    .where(col('rank') <= 5)
    .orderBy('ordermonth', 'Totalsales', ascending=[1, 0])
)
# Reduce to a single partition before writing the uncompressed CSV with a header.
top5rankedproductscategorieseachmonth.coalesce(1).write.csv(
    "/FileStore/tables/top5rankedproductscategorieseachmonth",
    header='true',
    compression="none",
)
top5rankedproductscategorieseachmonth.show(n=60, truncate=False)

###### Problem statement-8:Get highest product price for each category

In [33]:
# Problem 8: highest product_price per category, ordered by category id/name.
# The maximum is taken over the joined sales rows, so only products that
# appear in at least one order item contribute.
# NOTE: max is pyspark.sql.functions.max (star import), not the builtin.
productsSalesDatahighprice = (
    prodordersdfsalesmap
    .groupBy('product_category_id', 'category_name')
    .agg(max('product_price').alias("highestproductprice"))
    .orderBy('product_category_id', 'category_name')
)
# Reduce to a single partition before writing the uncompressed CSV with a header.
productsSalesDatahighprice.coalesce(1).write.csv(
    "/FileStore/tables/productsSalesDatahighprice",
    header='true',
    compression="none",
)
productsSalesDatahighprice.show(n=60, truncate=False)

###### Problem statement-9: Get the sales data of best-selling and the second best-selling products in every category

In [35]:
# Problem 9: best-selling and second best-selling products in every category.
# Despite the "eachmonth" suffix in the variable and output names, there is
# no month dimension here: ranking is per category over all orders.
# Step 1 -- per (category, product) sales summary.
# NOTE: round/sum/count are the pyspark.sql.functions versions (star import).
productSalesDatabstsellcategories = (
    prodordersdfsalesmap
    .groupBy('product_category_id', 'category_name', 'product_name')
    .agg(
        round(avg('order_item_subtotal'), 2).alias("avgsales"),
        round(sum('order_item_subtotal'), 2).alias("Totalsales"),
        sum('order_item_quantity').alias("Totalquantity"),
        count('order_item_subtotal').alias("Numberofsales"),
    )
    .orderBy('product_category_id', 'category_name', 'product_name')
)
# Step 2 -- rank products inside each category by total revenue, highest first.
# rank() gives ties the same rank, so a category can yield more than 2 rows.
window = Window.partitionBy(
    productSalesDatabstsellcategories['product_category_id']
).orderBy(productSalesDatabstsellcategories['Totalsales'].desc())
top2productscategorieseachmonth = (
    productSalesDatabstsellcategories
    .select('product_category_id', 'category_name', 'product_name', 'Totalsales')
    .withColumn('rank', rank().over(window).alias('rank'))
)
# Step 3 -- keep ranks 1..2, ordered by category then descending revenue.
top2rankedproductscategorieseachmonth = (
    top2productscategorieseachmonth
    .where(col('rank') <= 2)
    .orderBy('product_category_id', 'Totalsales', ascending=[1, 0])
)
# Reduce to a single partition before writing the uncompressed CSV with a header.
top2rankedproductscategorieseachmonth.coalesce(1).write.csv(
    "/FileStore/tables/top2rankedproductscategorieseachmonth",
    header='true',
    compression="none",
)
top2rankedproductscategorieseachmonth.show(n=60, truncate=False)

###### Problem statement-10:Get TOP5 highest selling product for each year

In [37]:
# Problem 10: top-5 products by total revenue for each year.
# Step 1 -- per (year, product) sales summary (year key = 4-character prefix
# of the order_date string).
# NOTE: round/sum/count are the pyspark.sql.functions versions (star import).
productsSalesDatatop5peryear = (
    prodordersdfsalesmap
    .groupBy(substring('order_date', 0, 4).alias("orderyear"), 'product_name')
    .agg(
        round(avg('order_item_subtotal'), 2).alias("avgsales"),
        round(sum('order_item_subtotal'), 2).alias("Totalsales"),
        sum('order_item_quantity').alias("Totalquantity"),
        count('order_item_subtotal').alias("Numberofsales"),
    )
    .sort('orderyear', 'product_name', 'Totalsales')
)
# Step 2 -- rank products inside each year by total revenue, highest first.
# rank() gives ties the same rank, so a year can yield more than 5 rows.
window = Window.partitionBy(productsSalesDatatop5peryear['orderyear']).orderBy(
    productsSalesDatatop5peryear['Totalsales'].desc()
)
# NOTE: the two variable names below are reused from the Problem 7 cell and
# are kept unchanged for compatibility, even though the data here is per
# product per year rather than per category per month.
top5productscategorieseachmonth = (
    productsSalesDatatop5peryear
    .select('orderyear', 'product_name', 'Totalsales')
    .withColumn('rank', rank().over(window))
)
top5rankedproductscategorieseachmonth = (
    top5productscategorieseachmonth
    .filter(col('rank') <= 5)
    .sort('orderyear', 'Totalsales', ascending=[True, False])
)
# Write to a path of its own. This cell previously reused the Problem 7
# output path, so with the default save mode the write failed whenever the
# Problem 7 cell had already created that directory.
top5rankedproductscategorieseachmonth.coalesce(1).write.csv(
    "/FileStore/tables/top5rankedproductseachyear",
    compression="none",
    header='true',
)
top5rankedproductscategorieseachmonth.show(60, False)