In [0]:
# dataset 
# sale (csv file)
df_sale = spark.read.csv("/FileStore/tables/sales_data.csv", header=True, inferSchema=True)
# store (csv file)
df_store = spark.read.csv("/FileStore/tables/store_data.csv", header=True, inferSchema=True)


In [0]:
display(df_sale.filter(" sale_id = 'sale_id' "))


In [0]:
from pyspark.sql.functions import * 

# cleaning data : 
# handle null values in both sales_df and store_df_sale = df_sale.fillna()
# # remove duplicates and invalid rows
# # standardize date formats for proper analysis




In [0]:
# cleaning sale_df
df_sale = df_sale.fillna({"quantity": 0, "total_amount": 0})
df_sale = df_sale.dropDuplicates()
df_sale = df_sale.filter(col("sale_id").isNotNull())
df_sale = df_sale.filter(col("store_id").isNotNull())
df_sale = df_sale.filter(col("sale_date").isNotNull())
df_sale = df_sale.filter((col("quantity") > 0) & (col("total_amount") > 0))

display(df_sale)

In [0]:
from pyspark.sql.functions import *
# cleaning store_df
store_df = df_store.filter(col("store_id").isNotNull())
avg_store_size = store_df.select(avg("store_size")).first()[0]
print(avg_store_size)

store_df = store_df.fillna({"store_size":avg_store_size})
store_df = store_df.filter(col("open_date").isNotNull())
display(store_df)

2606.2


store_id,store_region,store_size,open_date
2.0,West,1510.0,2009-07-07
3.0,West,4989.0,2000-08-23
4.0,West,4942.0,2016-08-12
5.0,North,2606.2,2002-06-09
7.0,North,2606.2,2005-07-19
9.0,North,3653.0,2018-12-24
10.0,East,2606.2,2006-01-18
11.0,East,2606.2,2018-04-22
12.0,West,840.0,2000-03-22
13.0,West,2606.2,2009-10-03


In [0]:
# transforming 
# join the sales and store datasets 
combined_df = df_sale.join(store_df, on="store_id", how="inner")
display(combined_df)




store_id,sale_id,product_id,sale_date,quantity,total_amount,store_region,store_size,open_date
4.0,sale_289,50,2020-09-05,63,2908.22,West,4942.0,2016-08-12
17.0,sale_577,5,2020-06-22,86,890.78,East,2606.2,2010-09-23
5.0,sale_771,36,2020-11-28,68,2818.64,North,2606.2,2002-06-09
13.0,sale_514,38,2022-08-21,27,1210.16,West,2606.2,2009-10-03
20.0,sale_626,14,2021-06-05,21,914.44,North,4867.0,2019-11-25
2.0,sale_905,48,2022-06-11,45,1332.05,West,1510.0,2009-07-07
12.0,sale_119,27,2020-05-02,44,597.59,West,840.0,2000-03-22
9.0,sale_965,23,2021-10-02,94,2644.09,North,3653.0,2018-12-24
18.0,sale_614,47,2021-02-25,3,28.52,West,1191.0,2016-01-23
19.0,sale_730,6,2022-02-28,19,820.12,North,2606.2,2018-10-12


In [0]:
from pyspark.sql.functions import year, col , round 
# create new columns 
# sales per square foot: calculate total sales divided by store size 
combined_df = combined_df.withColumn("year", year(col("sale_date")))
combined_df = combined_df.withColumn("sales_per_sqft", round(col("total_amount")/col("store_size"),2))
display(combined_df)

# sale year: extract the year from the sale date 
# aggregate sales and quantity by store and region 
combined_df.createOrReplaceTempView("combined")
store_sale_sql = spark.sql("""
    select store_id , store_region, sum(total_amount) as total_sales, sum(quantity) as total_quantity
    from combined 
    group by store_id , store_region """)
display(store_sale_sql)
                           

store_id,sale_id,product_id,sale_date,quantity,total_amount,store_region,store_size,open_date,year,sales_per_sqft
4.0,sale_289,50,2020-09-05,63,2908.22,West,4942.0,2016-08-12,2020,0.59
17.0,sale_577,5,2020-06-22,86,890.78,East,2606.2,2010-09-23,2020,0.34
5.0,sale_771,36,2020-11-28,68,2818.64,North,2606.2,2002-06-09,2020,1.08
13.0,sale_514,38,2022-08-21,27,1210.16,West,2606.2,2009-10-03,2022,0.46
20.0,sale_626,14,2021-06-05,21,914.44,North,4867.0,2019-11-25,2021,0.19
2.0,sale_905,48,2022-06-11,45,1332.05,West,1510.0,2009-07-07,2022,0.88
12.0,sale_119,27,2020-05-02,44,597.59,West,840.0,2000-03-22,2020,0.71
9.0,sale_965,23,2021-10-02,94,2644.09,North,3653.0,2018-12-24,2021,0.72
18.0,sale_614,47,2021-02-25,3,28.52,West,1191.0,2016-01-23,2021,0.02
19.0,sale_730,6,2022-02-28,19,820.12,North,2606.2,2018-10-12,2022,0.31


store_id,store_region,total_sales,total_quantity
2.0,West,47392.19999999999,1648
4.0,West,38284.52,1240
11.0,East,66396.06000000001,2178
14.0,North,41333.41999999999,1778
13.0,West,65943.8,2351
5.0,North,49333.47,1818
20.0,North,53057.23000000001,2138
16.0,East,40136.36,1554
7.0,North,52866.74,1948
10.0,East,72579.89000000001,2453


In [0]:
# analysis 
# identify the top five stores based on total sales 
top_stores = spark.sql("""
    select store_id , sum(total_amount) as total_sale
        from combined 
        group by store_id 
        order by sum(total_amount) desc
        limit 5""")
# find the top five products by total quantity sold 
top_products = spark.sql("""
    select product_id , sum(total_amount) as total_sale
        from combined 
        group by product_id
        order by sum(total_amount) desc
        limit 5""")
display(top_stores)
display(top_products)


store_id,total_sale
10.0,72579.89000000001
11.0,66396.06000000001
13.0,65943.8
17.0,54421.82
20.0,53057.23000000001


product_id,total_sale
46,30442.28
14,29157.57
23,25776.19
11,25723.21
43,24186.57


In [0]:
top_stores.write.parquet("/FileStore/tables/top_stores")
top_products.write.parquet("/FileStore/tables/top_products")