##Loading necessary libraries and Configuring Spark to ADLS Gen-2

In [0]:
import pandas as pd
import numpy as np
from pyspark.sql.types import *
import pyspark.sql.functions as F

spark.conf.set(
    "fs.azure.account.key.moviedatanalytics.dfs.core.windows.net","xs8eaOJ+uzl3/n8bz4au+ieUyllqksTZvuvLBa+ybkV3/DQjfPKN/75c/reepUbu/QMY2vM5HgUh+ASt7RUWaw==")

##Creating Paths

In [0]:
adls_source_path = 'abfss://amazonsalesanalyrics@moviedatanalytics.dfs.core.windows.net/'

master_data_path = adls_source_path + 'master_data/'

state_analysis_path = master_data_path + 'state_analysis/'
category_analysis_path = master_data_path + 'category_analysis/'
promotion_analysis_path = master_data_path + 'promotion_analysis/'
cancellation_analysis_path = master_data_path + 'cancellation_analysis/'
size_sales_amount_path = master_data_path + 'size_sales_amount/'
size_sales_quantity_path = master_data_path + 'size_sales_quantity_analysis/'
intl_only_path = master_data_path + 'intl_only_analysis/'

##Loading the processed Amazon India and Amazon Intl Sales Tables

In [0]:
retail_india_l0 = spark.sql("select * from amazon_india_sales_analysis.retail_india_data_l0")
retail_intl_l0 = spark.sql("select * from amazon_india_sales_analysis.retail_intl_data_l0")

##Analysis-1: Statewise Analysis

In [0]:
retail_india_l0.select('ship_state').distinct().display()

ship_state
DADRA AND NAGAR
SIKKIM
delhi
MEGHALAYA
Odisha
WEST BENGAL
Punjab
GOA
CHHATTISGARH
RAJASTHAN


In [0]:
retail_india_l0 = retail_india_l0.withColumn('ship_state',F.upper('ship_state'))

retail_india_l0.select('ship_state').distinct().display()

ship_state
DADRA AND NAGAR
SIKKIM
MEGHALAYA
WEST BENGAL
NEW DELHI
GOA
CHHATTISGARH
RAJASTHAN
APO
""


In [0]:
retail_india_l0 = retail_india_l0.withColumn("ship_state",F.when(F.col('ship_state') == "APO", F.lit('ANDHRA PRADESH')).otherwise(F.col('ship_state')))\
    .withColumn("ship_state",F.when(F.col('ship_state') == "AR", F.lit('ARUNACHAL PRADESH')).otherwise(F.col('ship_state')))\
    .withColumn("ship_state",F.when(F.col('ship_state') == "NL", F.lit('NAGALAND')).otherwise(F.col('ship_state')))\
    .withColumn("ship_state",F.when(F.col('ship_state') == "ORISSA", F.lit('ODISHA')).otherwise(F.col('ship_state')))\
    .withColumn("ship_state",F.when(F.col('ship_state') == "PB", F.lit('PUNJAB')).otherwise(F.col('ship_state')))\
    .withColumn("ship_state",F.when(F.col('ship_state') == "PUNJAB/MOHALI/ZIRAKPUR", F.lit('PUNJAB')).otherwise(F.col('ship_state')))\
    .withColumn("ship_state",F.when(F.col('ship_state') == "PONDICHERRY", F.lit('PUDUCHERRY')).otherwise(F.col('ship_state')))\
    .withColumn("ship_state",F.when(F.col('ship_state') == "RAJSHTHAN", F.lit('RAJASTHAN')).otherwise(F.col('ship_state')))\
    .withColumn("ship_state",F.when(F.col('ship_state') == "RAJSTHAN", F.lit('RAJASTHAN')).otherwise(F.col('ship_state')))\
    .withColumn("ship_state",F.when(F.col('ship_state') == "RJ", F.lit('RAJASTHAN')).otherwise(F.col('ship_state')))\
    .withColumnRenamed('ship_state','state')

In [0]:
order_quantity = retail_india_l0.groupBy('order_id').agg(F.sum('qty').alias('sum_order_quantity'),\
                                                         F.sum('amount').alias('sum_amount'))

state_wise_order_quantity = retail_india_l0.select('state','order_id').distinct().join(order_quantity, ['order_id'],'inner')

state_wise_order_quantity_analysis = state_wise_order_quantity.groupBy('state').agg(F.avg('sum_order_quantity').alias('Average_Order_Quantity'),F.avg('sum_amount').alias('Average_Sales_Amount'))

state_wise_analytics = retail_india_l0.groupBy('state').agg(F.countDistinct('order_id').alias('Number_of_Orders'), \
                                                              F.sum('qty').alias('Sales_Quantity'), \
                                                              F.sum('amount').alias('Sales_Amount'))


state_wise_order_analysis = state_wise_analytics.join(state_wise_order_quantity_analysis, ['state'], "inner")

In [0]:
state_wise_order_analysis.display()

state,Number_of_Orders,Sales_Quantity,Sales_Amount,Average_Order_Quantity,Average_Sales_Amount
DADRA AND NAGAR,53,58,39276.0,1.0943396226415094,770.1176470588235
SIKKIM,186,182,134847.0,0.978494623655914,774.9827586206897
MEGHALAYA,179,184,111234.0,1.0279329608938548,650.4912280701755
WEST BENGAL,5336,5318,3357170.0,0.9966266866566716,668.7589641434263
NEW DELHI,72,72,45609.0,1.0,670.7205882352941
GOA,1038,1055,622220.0,1.0163776493256262,623.4669338677355
CHHATTISGARH,821,822,541811.0,1.0012180267965896,688.4510800508259
RAJASTHAN,2369,2432,1683325.0,1.0265934993668215,751.149040606872
TRIPURA,132,134,86799.0,1.0151515151515151,683.4566929133858
DELHI,6214,6326,4185475.0,1.018023817186997,706.0517881241566


##Analysis-2: Category Analysis

In [0]:
order_quantity_category = retail_india_l0.groupBy('order_id','category').agg(F.sum('qty').alias('sum_order_quantity'),\
                                                         F.sum('amount').alias('sum_amount'))

category_wise_order_quantity_analysis = order_quantity_category.groupBy('category').agg(F.avg('sum_order_quantity').alias('Average_Order_Quantity'),F.avg('sum_amount').alias('Average_Sales_Amount'))

category_wise_analytics = retail_india_l0.groupBy('category').agg(F.countDistinct('order_id').alias('Number_of_Orders'), \
                                                              F.sum('qty').alias('Sales_Quantity'), \
                                                              F.sum('amount').alias('Sales_Amount'))


category_wise_order_analysis = category_wise_analytics.join(category_wise_order_quantity_analysis, ['category'], "inner")

In [0]:
category_wise_order_analysis.display()

category,Number_of_Orders,Sales_Quantity,Sales_Amount,Average_Order_Quantity,Average_Sales_Amount
KURTA,44136,45045,20452141.0,1.0205954323001631,489.6253620933184
SET,45358,45289,37662424.0,0.9984787689051544,877.8710549624726
ETHNIC DRESS,1091,1053,760711.0,0.9651695692025664,730.7502401536983
WESTERN DRESS,13907,13943,10629096.0,1.0025886244337383,791.6800238343512
SAREE,136,152,118509.0,1.1176470588235294,925.8515625
BLOUSE,848,863,434751.0,1.017688679245283,536.7296296296296
DUPATTA,2,3,915.0,1.5,457.5
BOTTOM,381,398,140226.0,1.0446194225721783,380.0162601626016
TOP,9779,9903,5203733.0,1.012680233152674,552.1202122015915


##Analysis-3: Promotion Analysis

In [0]:
promotion_sales = retail_india_l0.filter('promotion_ids == "Promotion"')
promotion_sales = promotion_sales.groupBy('category').agg(F.countDistinct('order_id').alias('Promotion_Order_Counts'),\
                                                          F.sum('qty').alias('Promotion_Quantity_Sold'),\
                                                          F.sum('amount').alias('Promotion_Sales'))

no_promotion_sales = retail_india_l0.filter('promotion_ids == "No Promotion"')
no_promotion_sales = no_promotion_sales.groupBy('category').agg(F.countDistinct('order_id').alias('No_Promotion_Order_Counts'),\
                                                                F.sum('qty').alias('No_Promotion_Quantity_Sold'),\
                                                                F.sum('amount').alias('No_Promotion_Sales'))

promotion_analysis = promotion_sales.join(no_promotion_sales, ['category'],"inner")

promotion_analysis = promotion_analysis.withColumn('Percent_Promotion_Sales_Impact',\
    ((F.col('Promotion_Sales')-F.col('No_Promotion_Sales'))/F.col('No_Promotion_Sales'))*100)\
    .withColumn('Promotion_Quantity_Impact', (F.col('Promotion_Quantity_Sold')-F.col('No_Promotion_Quantity_Sold')))

promotion_analysis = promotion_analysis.select('Category','Promotion_Order_Counts','No_Promotion_Order_Counts',\
                                                'Promotion_Quantity_Sold','No_Promotion_Quantity_Sold','Promotion_Quantity_Impact',\
                                                'Promotion_Sales','No_Promotion_Sales','Percent_Promotion_Sales_Impact')

In [0]:
promotion_analysis.display()

Category,Promotion_Order_Counts,No_Promotion_Order_Counts,Promotion_Quantity_Sold,No_Promotion_Quantity_Sold,Promotion_Quantity_Impact,Promotion_Sales,No_Promotion_Sales,Percent_Promotion_Sales_Impact
KURTA,25491,18647,28060,16985,11075,13090148.0,7361993.0,77.8071236959883
SET,31491,13869,33172,12117,21055,27944396.0,9718028.0,187.55212477263905
ETHNIC DRESS,641,450,649,404,245,484852.0,275859.0,75.76080533895939
WESTERN DRESS,10709,3202,11110,2833,8277,8438285.0,2190811.0,285.16718238131904
SAREE,97,39,120,32,88,93532.0,24977.0,274.47251471353644
BLOUSE,476,372,504,359,145,268011.0,166740.0,60.735876214465634
BOTTOM,269,112,294,104,190,107590.0,32636.0,229.6666258119868
TOP,5623,4157,5939,3964,1975,3160706.0,2043027.0,54.70701072477261


##Analysis-4:  Cancellation Impact Analysis

In [0]:
cancelled_orders = retail_india_l0.filter('status == "Cancelled"')
cancelled_orders = cancelled_orders.groupBy('category').agg(F.countDistinct('order_id').alias('Cancelled_Orders'),\
                                                            F.sum('qty').alias('Cancelled_Quantity'),\
                                                            F.sum('amount').alias('Cancelled_Amount'))

not_cancelled_orders = retail_india_l0.filter('status != "Cancelled"')
not_cancelled_orders = not_cancelled_orders.groupBy('category').agg(F.countDistinct('order_id').alias('Not_Cancelled_Orders'),\
                                                                    F.sum('qty').alias('Not_Cancelled_Quantity'),\
                                                                    F.sum('amount').alias('Not_Cancelled_Amount'))

cancellation_analysis = cancelled_orders.join(not_cancelled_orders, ['category'],"inner")

cancellation_analysis = cancellation_analysis.withColumn('Percent_Cancellation_Sale_Impact',\
                                        F.col('Cancelled_Amount')/(F.col('Not_Cancelled_Amount')+F.col('Cancelled_Amount'))*100)\
    .withColumn('Cancellation_Quantity_Impact', (F.col('Not_Cancelled_Quantity')-F.col('Cancelled_Quantity')))

cancellation_analysis = cancellation_analysis.select('Category','Cancelled_Orders','Not_Cancelled_Orders',\
                                                'Cancelled_Quantity','Not_Cancelled_Quantity','Cancellation_Quantity_Impact',\
                                                'Cancelled_Amount','Not_Cancelled_Amount','Percent_Cancellation_Sale_Impact')

In [0]:
cancellation_analysis.display()

Category,Cancelled_Orders,Not_Cancelled_Orders,Cancelled_Quantity,Not_Cancelled_Quantity,Cancellation_Quantity_Impact,Cancelled_Amount,Not_Cancelled_Amount,Percent_Cancellation_Sale_Impact
KURTA,4394,39742,2253,42792,40539,1026271.0,19425870.0,5.017914750343253
SET,4503,40855,2256,43033,40777,1930751.0,35731673.0,5.126465041124279
ETHNIC DRESS,87,1004,37,1016,979,27967.0,732744.0,3.676429024951658
WESTERN DRESS,983,12924,525,13418,12893,419506.0,10209590.0,3.946770261553758
SAREE,13,123,5,147,142,3815.0,114694.0,3.2191647891721304
BLOUSE,64,784,32,831,799,16362.0,418389.0,3.763533608893367
BOTTOM,25,356,14,384,370,4773.0,135453.0,3.4037910230627704
TOP,856,8923,535,9368,8833,299667.0,4904066.0,5.758692846077998


##Analysis-5:  Cancellation Impact Analysis

In [0]:
size_sales_agg = retail_india_l0.groupBy('Category', 'Size').agg(F.sum('qty').alias('Size_Quantity_Ordered'),\
                                                                 F.sum('amount').alias('Size_Sales_Amount'))

size_sales_amount = size_sales_agg.groupBy('Category').pivot('Size').agg(F.first('Size_Sales_Amount')).fillna(0)
size_sales_quantity = size_sales_agg.groupBy('Category').pivot('Size').agg(F.first('Size_Quantity_Ordered')).fillna(0)

column_order = ['Category','Free','XS','S','M','L','XL','XXL','3XL','4XL','5XL','6XL']

size_sales_amount = size_sales_amount.select(*column_order)
size_sales_quantity = size_sales_quantity.select(*column_order)

In [0]:
size_sales_amount.display()

Category,Free,XS,S,M,L,XL,XXL,3XL,4XL,5XL,6XL
KURTA,0.0,1186362.0,2253248.0,3419517.0,3579090.0,3473041.0,3103599.0,2347787.0,268861.0,347120.0,473516.0
SET,0.0,4109002.0,5621797.0,6976297.0,6104693.0,5680664.0,4606076.0,4350547.0,56457.0,67689.0,89202.0
ETHNIC DRESS,0.0,52878.0,120479.0,128440.0,116475.0,130405.0,95013.0,117021.0,0.0,0.0,0.0
WESTERN DRESS,0.0,951805.0,1496487.0,1771200.0,1920586.0,1638053.0,1500328.0,1350637.0,0.0,0.0,0.0
SAREE,118509.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
BLOUSE,75644.0,9427.0,69564.0,75436.0,68799.0,69066.0,66815.0,0.0,0.0,0.0,0.0
DUPATTA,915.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
BOTTOM,0.0,8806.0,21534.0,17692.0,23295.0,23032.0,22922.0,22945.0,0.0,0.0,0.0
TOP,0.0,440743.0,603786.0,913963.0,871806.0,903552.0,863365.0,606518.0,0.0,0.0,0.0


In [0]:
size_sales_quantity.display()

Category,Free,XS,S,M,L,XL,XXL,3XL,4XL,5XL,6XL
KURTA,0,2783,5157,7713,8043,7838,6952,5135,352,456,616
SET,0,4882,6708,8357,7399,6873,5608,5287,46,57,72
ETHNIC DRESS,0,77,163,167,170,173,143,160,0,0,0
WESTERN DRESS,0,1280,1962,2326,2546,2125,1944,1760,0,0,0
SAREE,152,0,0,0,0,0,0,0,0,0,0
BLOUSE,211,25,128,138,128,116,117,0,0,0,0
DUPATTA,3,0,0,0,0,0,0,0,0,0,0
BOTTOM,0,26,67,50,67,64,64,60,0,0,0
TOP,0,870,1146,1696,1647,1734,1688,1122,0,0,0


##Analysis-6: International Order Only Analysis

In [0]:
intl_order_only_order_ids = retail_intl_l0.selectExpr('intl_order_id as order_id').join(retail_india_l0,['order_id'],"leftanti")
retail_intl_l0 = retail_intl_l0.select('intl_order_id','category','intl_amount')\
                                .join(intl_order_only_order_ids.selectExpr('order_id as intl_order_id'),['intl_order_id'],"inner")

intl_only_analysis = retail_intl_l0.groupBy('category').agg(F.countDistinct('intl_order_id').alias('Number_of_Orders'),\
                                                            F.sum('intl_amount').alias('Sales_Amount'))

In [0]:
intl_only_analysis.display()

category,Number_of_Orders,Sales_Amount
KURTA,2424,1042282.979999997
SET,2487,1855257.1200000064
ETHNIC DRESS,57,42388.540000000015
WESTERN DRESS,1087,677005.1599999988
SAREE,8,5424.76
BLOUSE,49,25512.410000000007
BOTTOM,29,19356.34
TOP,375,168213.5999999999


## Writing Output tables to Data Lake Gen-2 in Parquet Format 

In [0]:
state_wise_order_analysis.write.mode("overwrite").parquet(path=state_analysis_path)
category_wise_order_analysis.write.mode("overwrite").parquet(path=category_analysis_path)
promotion_analysis.write.mode("overwrite").parquet(path=promotion_analysis_path)
cancellation_analysis.write.mode("overwrite").parquet(path=cancellation_analysis_path)
size_sales_amount.write.mode("overwrite").parquet(path=size_sales_amount_path)
size_sales_quantity.write.mode("overwrite").parquet(path=size_sales_quantity_path)
intl_only_analysis.write.mode("overwrite").parquet(path=intl_only_path)