## Project Description
**Objective**:  
- Clean and prepare sales and store data
- Add derived columns for advanced insights
- Store performance and product trends

**Datasets**  
>  **Sales data**(CSV file)  
> - sale_id: unique id for each sale
> - store_id: store where the sale occured
> - product_id: product sold
> - sale_date: date of the sale
> - quantity: numbe rof items sold
> - total_amount: total amount of the sales(USD)

> **Store data**(CSV file)   
> - store_id: unique id of the store
> - store_region: region where the store is located
> - store_size: size of the store in square feet
> - open_date: date when the store was opened

**Project Requirements**  
- Handle null values in both of the datsets
- Remove duplicates and invalid rows

**Data Tranformations:**  
- Join the sales and store datasets  
- Create new columns
- per square foot : calculate total sales divided by the store size
- year: extract the year from the date col
- Aggregate sales and quantity by store and region

**Analysis**:
- Identify the top five stores based on total sales
- Find the top five products by total quantity sold

**Save results:**
- Save the transformed and aggregated results as parquet files


In [0]:
display(dbutils.fs.ls("/FileStore/tables/"))

path,name,size,modificationTime
dbfs:/FileStore/tables/Child_notebook.ipynb,Child_notebook.ipynb,921,1742536467000
dbfs:/FileStore/tables/Customer_Analysis.ipynb,Customer_Analysis.ipynb,2802130,1742536469000
dbfs:/FileStore/tables/Delta_lake.ipynb,Delta_lake.ipynb,3951955,1742536470000
dbfs:/FileStore/tables/FirstExample.ipynb,FirstExample.ipynb,5099,1742536470000
dbfs:/FileStore/tables/Joins.ipynb,Joins.ipynb,1319362,1742536471000
dbfs:/FileStore/tables/New_Dir_by_utils/,New_Dir_by_utils/,0,0
dbfs:/FileStore/tables/New_Dir_using_magic_command/,New_Dir_using_magic_command/,0,0
dbfs:/FileStore/tables/Parent_notebook.ipynb,Parent_notebook.ipynb,1811,1742536471000
dbfs:/FileStore/tables/Product_analysis.ipynb,Product_analysis.ipynb,366544,1742536471000
dbfs:/FileStore/tables/RetailProject_DataCleaning.ipynb,RetailProject_DataCleaning.ipynb,815530,1742536472000


In [0]:
df_sales = spark.read.csv("/FileStore/tables/sales_data.csv",inferSchema = True,header = True)
df_store = spark.read.csv("/FileStore/tables/store_data.csv",inferSchema = True,header = True)

In [0]:
display(df_sales.limit(10))

sale_id,store_id,product_id,sale_date,quantity,total_amount
sale_1,19.0,46,2022-08-19,96,3024.78
sale_2,9.0,43,2021-03-15,100,4549.74
sale_3,3.0,20,2020-05-25,88,1477.8
sale_4,2.0,26,2022-06-25,30,231.85
sale_5,1.0,15,2021-06-15,83,2568.68
sale_6,14.0,3,2022-08-20,83,522.16
sale_7,8.0,14,2021-06-12,31,1301.42
sale_8,18.0,4,2021-12-01,95,680.5
sale_9,,37,2022-01-18,78,1698.47
sale_10,15.0,45,2022-02-07,34,385.49


In [0]:
from pyspark.sql.functions import col
filtered_df = df_sales.filter(col("sale_date").isNull())

display(filtered_df.limit(10))
# sale_id,store_id,sale_date has null values

sale_id,store_id,product_id,sale_date,quantity,total_amount
sale_12,1.0,35,,56,2017.73
sale_23,2.0,48,,84,1076.87
sale_26,14.0,1,,9,148.27
sale_36,4.0,17,,73,2985.87
sale_38,9.0,42,,56,1668.19
sale_40,2.0,11,,94,4293.04
sale_48,2.0,4,,16,88.11
sale_56,1.0,15,,-2,0.0
sale_57,19.0,2,,15,78.65
,9.0,3,,65,3002.72


In [0]:
from pyspark.sql.functions import col

df_sales = df_sales.filter(col("sale_id").isNotNull())
df_sales = df_sales.filter(col("store_id").isNotNull())
df_sales = df_sales.filter(col("sale_date").isNotNull())
df_sales = df_sales.filter((col("quantity")>0) & (col("total_amount")>0))
#  or
# df_sales = df_sales.na.drop()

display(df_sales.limit(10))

sale_id,store_id,product_id,sale_date,quantity,total_amount
sale_1,19.0,46,2022-08-19,96,3024.78
sale_2,9.0,43,2021-03-15,100,4549.74
sale_3,3.0,20,2020-05-25,88,1477.8
sale_4,2.0,26,2022-06-25,30,231.85
sale_5,1.0,15,2021-06-15,83,2568.68
sale_6,14.0,3,2022-08-20,83,522.16
sale_7,8.0,14,2021-06-12,31,1301.42
sale_8,18.0,4,2021-12-01,95,680.5
sale_10,15.0,45,2022-02-07,34,385.49
sale_11,10.0,11,2021-01-06,100,4283.9


In [0]:
filtered_df = df_sales.filter(col("sale_id").isNull())
display(filtered_df.limit(10))

sale_id,store_id,product_id,sale_date,quantity,total_amount


In [0]:
display(df_sales.filter(col("quantity") <= 0).count())
# df_sales=df_sales.filter(col("quantity") >0)

0

In [0]:
# checking duplicate values

# row_count=df_sales.count()
# print(row_count)
# distinct_row_count=df_sales.distinct().count()
# print(distinct_row_count)
# sale_id_count=df_sales.select("sale_id").count()
# print(sale_id_count)
# distinct_sale_id_count=df_sales.select("sale_id").distinct().count()
# print(distinct_sale_id_count)
#  or
df_sales=df_sales.dropDuplicates()

In [0]:
display(df_store)

store_id,store_region,store_size,open_date
1.0,West,2263.0,
2.0,West,1510.0,2009-07-07
3.0,West,4989.0,2000-08-23
4.0,West,4942.0,2016-08-12
5.0,North,,2002-06-09
,East,3107.0,2007-03-28
7.0,North,,2005-07-19
8.0,West,691.0,
9.0,North,3653.0,2018-12-24
10.0,East,,2006-01-18


In [0]:
from pyspark.sql.functions import avg

df_store = df_store.filter(col("store_id").isNotNull())
avg_store_size = df_store.select(avg("store_size")).collect()[0][0]
df_store = df_store.na.fill({"store_size":avg_store_size})
df_store = df_store.filter(col("open_date").isNotNull())
display(df_store)

store_id,store_region,store_size,open_date
2.0,West,1510.0,2009-07-07
3.0,West,4989.0,2000-08-23
4.0,West,4942.0,2016-08-12
5.0,North,2606.2,2002-06-09
7.0,North,2606.2,2005-07-19
9.0,North,3653.0,2018-12-24
10.0,East,2606.2,2006-01-18
11.0,East,2606.2,2018-04-22
12.0,West,840.0,2000-03-22
13.0,West,2606.2,2009-10-03


In [0]:
#checking duplicate values

# row_count=df_store.count()
# print(row_count)
# distinct_row_count=df_store.distinct().count()
# print(distinct_row_count)
# df_store_count=df_store.select("store_id").count()
# print(df_store_count)
# distinct_df_store_count=df_store.select("store_id").distinct().count()
# print(distinct_df_store_count)

# or
df_store = df_store.dropDuplicates()
df_store.count()


Out[12]: 16

In [0]:
main_df=df_sales.join(df_store, on="store_id",how='inner')
display(main_df.limit(10))

store_id,sale_id,product_id,sale_date,quantity,total_amount,store_region,store_size,open_date
5.0,sale_420,6,2022-07-16,37,295.1,North,2606.2,2002-06-09
4.0,sale_755,49,2021-05-04,12,505.45,West,4942.0,2016-08-12
20.0,sale_576,33,2021-02-07,2,89.61,North,4867.0,2019-11-25
19.0,sale_845,24,2020-06-16,97,2289.33,North,2606.2,2018-10-12
18.0,sale_999,11,2020-03-08,63,810.25,West,1191.0,2016-01-23
5.0,sale_364,5,2021-10-02,63,2923.55,North,2606.2,2002-06-09
11.0,sale_228,37,2021-11-27,37,394.83,East,2606.2,2018-04-22
5.0,sale_717,37,2020-02-05,66,2603.03,North,2606.2,2002-06-09
9.0,sale_2,43,2021-03-15,100,4549.74,North,3653.0,2018-12-24
13.0,sale_33,50,2022-05-02,67,1882.4,West,2606.2,2009-10-03


In [0]:
from pyspark.sql.functions import round,year
new_df = main_df.withColumn("SalePerSqrFoot",round(main_df.total_amount/main_df.store_size,2))
new_df = new_df.withColumn("sale_year",year("sale_date"))

display(new_df.limit(10))

store_id,sale_id,product_id,sale_date,quantity,total_amount,store_region,store_size,open_date,SalePerSqrFoot,sale_year
5.0,sale_420,6,2022-07-16,37,295.1,North,2606.2,2002-06-09,0.11,2022
4.0,sale_755,49,2021-05-04,12,505.45,West,4942.0,2016-08-12,0.1,2021
20.0,sale_576,33,2021-02-07,2,89.61,North,4867.0,2019-11-25,0.02,2021
19.0,sale_845,24,2020-06-16,97,2289.33,North,2606.2,2018-10-12,0.88,2020
18.0,sale_999,11,2020-03-08,63,810.25,West,1191.0,2016-01-23,0.68,2020
5.0,sale_364,5,2021-10-02,63,2923.55,North,2606.2,2002-06-09,1.12,2021
11.0,sale_228,37,2021-11-27,37,394.83,East,2606.2,2018-04-22,0.15,2021
5.0,sale_717,37,2020-02-05,66,2603.03,North,2606.2,2002-06-09,1.0,2020
9.0,sale_2,43,2021-03-15,100,4549.74,North,3653.0,2018-12-24,1.25,2021
13.0,sale_33,50,2022-05-02,67,1882.4,West,2606.2,2009-10-03,0.72,2022


In [0]:
new_df

Out[24]: DataFrame[store_id: double, sale_id: string, product_id: int, sale_date: date, quantity: int, total_amount: double, store_region: string, store_size: double, open_date: date, SalePerSqrFoot: double, sale_year: int]

In [0]:
# top 5 products by total quantity sold
from pyspark.sql.functions import col, sum
top5products = new_df.groupBy("product_id") \
          .agg(sum(col("quantity")).alias("total_quantity")) \
          .orderBy(col("total_quantity").desc()) \
          .limit(5)

top5products.show()

+----------+--------------+
|product_id|total_quantity|
+----------+--------------+
|        14|          1028|
|        30|          1000|
|        15|           911|
|        46|           881|
|        18|           876|
+----------+--------------+



In [0]:
# top 5 stores based on sales

top5stores = new_df.groupBy(col("store_id"))\
                .agg(sum(col("total_amount")).alias("total_sales"))\
                .orderBy(col("total_sales").desc()).limit(5)

top5stores.show() 

+--------+------------------+
|store_id|       total_sales|
+--------+------------------+
|    10.0| 72579.89000000001|
|    11.0|          66396.06|
|    13.0| 65943.80000000002|
|    17.0| 54421.82000000001|
|    20.0|53057.229999999996|
+--------+------------------+



In [0]:
# aggregate sale and quantity by store and region
new_df.createOrReplaceTempView("combined")

aggregated = spark.sql(
    """
    SELECT store_id,store_region,ROUND(SUM(total_amount),2) as total_sales,SUM(quantity) as total_quantity
    FROM combined
    GROUP BY store_id,store_region
    """
)

display(aggregated)

store_id,store_region,total_sales,total_quantity
2.0,West,47392.2,1648
4.0,West,38284.52,1240
11.0,East,66396.06,2178
14.0,North,41333.42,1778
13.0,West,65943.8,2351
5.0,North,49333.47,1818
20.0,North,53057.23,2138
16.0,East,40136.36,1554
7.0,North,52866.74,1948
10.0,East,72579.89,2453


In [0]:
#top 5 product based on quantity sold
top5products = spark.sql(
    """
    SELECT product_id,SUM(quantity) AS total_quantity
    FROM combined 
    GROUP BY product_id
    ORDER BY total_quantity DESC
    LIMIT 5
    """
)
display(top5products)

product_id,total_quantity
14,1028
30,1000
15,911
46,881
18,876


In [0]:
# top 5 stores based on sales
top5_store = spark.sql(
    """
    SELECT store_id,ROUND(SUM(total_amount),2) AS total_sales
    FROM combined 
    GROUP BY store_id
    ORDER BY total_sales DESC 
    LIMIT 5
    """
    )
display(top5_store)

store_id,total_sales
10.0,72579.89
11.0,66396.06
13.0,65943.8
17.0,54421.82
20.0,53057.23


In [0]:
top5products.write.mode("overwrite").parquet("/FileStore/tables/top_products")
top5_store.write.mode("overwrite").parquet("/FileStore/tables/top_stores")