In [0]:
# Import necessary libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import col,to_date, date_format, when
from pyspark.sql.functions import count, mean, stddev, min, max, skewness, kurtosis
from pyspark.sql import functions as F
import pandas as pd
from ydata_profiling import ProfileReport




# Initialize Spark session
spark = SparkSession.builder \
    .appName("Load Data from Default Schema") \
    .getOrCreate()


**Data Ingestion**

Standard practice is to load raw data into staging tables, perform cleaning and transformation and then load into Silver storage
from Silver storage all sources are reconciled and finally compiled at one place in Gold Layer. Due to lack of time I am performing transformation and cleaning in memory using spark Dataframes. 

In [0]:

# Dimensions data
# Import data from raw files in to the Dataframe.

# Loading the Dimension Tables here

df_product = spark.read.csv("/Volumes/azure_databricks/default/assignmentdata/product.csv", header=True, inferSchema=True)
df_location = spark.read.csv("/Volumes/azure_databricks/default/assignmentdata/location.csv", header=True, inferSchema=True)

**Data Profiling**

 Since its the first time loading files so creating a data profile report helps getting an indept insights into the data



In [0]:
result_pdf = df_product.select("*").toPandas()
profile = ProfileReport(result_pdf, title="Pandas Profiling Report")

# Saving report to file
#profile.to_file("/Volumes/azure_databricks/default/assignmentdata/Data_profiling_products.html")

#Display the report 
profile.to_notebook_iframe()

Summarize dataset:   0%|          | 0/5 [00:00<?, ?it/s]

Generate report structure:   0%|          | 0/1 [00:00<?, ?it/s]

Render HTML:   0%|          | 0/1 [00:00<?, ?it/s]

In [0]:
result_pdf = df_location.select("*").toPandas()
profile = ProfileReport(result_pdf, title="Pandas Profiling Report")
profile.to_notebook_iframe()

Summarize dataset:   0%|          | 0/5 [00:00<?, ?it/s]

Generate report structure:   0%|          | 0/1 [00:00<?, ?it/s]

Render HTML:   0%|          | 0/1 [00:00<?, ?it/s]

**Loading Transaction Fact Table**

Could have loaded these files in one go but few of the files had different headers and the column sequence was different that's why loading them one by one after fixing basic format issues that I caught

In [0]:
# Loading transactional Fact Data 

#Could have loaded these files in one go but few of the files had different headers and the column sequence was different that's why loading them one by one after fixing basic format issues that I caught


files = dbutils.fs.ls('/Volumes/azure_databricks/default/assignmentdata/')
dfs = []

for fi in files: 
  if "trans_fact" in fi.path :
        df = spark.read.csv(fi.path, header=True)
    # Append the DataFrame to the list
        try:
          if "trans_id" in df.columns :
              df = df.withColumnRenamed("trans_id", "trans_key")
          
          # Fix - Date format later on
          #df = df.withColumn("trans_dt", to_date("trans_dt"))
          df = df.select('store_location_key','product_key','collector_key','trans_dt','sales','units','trans_key')
          dfs.append(df)
        except :
            #print file name that has any issues while loading. We can setup an alert for such files.
            print(fi.path)
    


# Union all DataFrames in the list to create a single DataFrame
df_trans_fct = dfs[0]  # Initialize with the first DataFrame
for df in dfs[1:]:
    df_trans_fct = df_trans_fct.union(df)
df_trans_fct = df_trans_fct.na.fill(0, subset=['sales', 'units'])
# Show the combined DataFrame
#df_trans_fct.show()

# adding the column for marking loyalty status
df_trans_fct = df_trans_fct.withColumn('loyalty_status', when(col("collector_key")!= '-1', 1).otherwise(0))


**Counts Check**

In [0]:
total_count = df_trans_fct.count()
display(total_count)

200000

**Data Sampling**
Transactions data is a huge data set and its hard to create a detailed report using all of it.
 I am just picking a random sample of 1% of the data set and running the profiling report for just the sample

In [0]:
# Transactions data is a huge data set and its hard to create a detailed report using all of it. I am just picking a sample of 1% of the data set and running the profiling report for just the sample
# Calculate the fraction for the sample
sample_fraction = 0.01

# Take a random sample of 1% rows
sample_df = df_trans_fct.sample(withReplacement=False, fraction=sample_fraction, seed=42)

result_pdf = sample_df.select("*").toPandas()
profile = ProfileReport(result_pdf, title="Pandas Profiling Report")
profile.to_file("/Volumes/azure_databricks/default/assignmentdata/trans_fact_profile.html") 

**Join all data sets**

In [0]:
# Joining all three data sources

joined_df = df_trans_fct.join(df_product, 'product_key',how='left').join(df_location, 'store_location_key',how='left')


200000

**Basic join check to ensure no cross joins happened**

In [0]:
counts = joined_df.count()


display(counts)

# Counts for this joined dataframe is the same as df_trans_fct that means no cross joins happened causing duplicates. 


#Insights

In [0]:
# Province performance 
province_overall_performance = joined_df.groupby('province').agg(F.mean(joined_df.sales).alias('avg_sales')).orderBy(F.desc('avg_sales'))
display(province_overall_performance)


# Stores performance 
Stores_overall_performance = joined_df.groupby('store_num').agg(F.mean(joined_df.sales).alias('avg_sales')).orderBy(F.desc('avg_sales'))
display(Stores_overall_performance)

# Group by province and store, calculate sales performance
province_store_performance = joined_df.groupby('province', 'store_num').agg(F.mean(joined_df.sales).alias('avg_sales')).orderBy(F.desc('avg_sales'))
display(province_store_performance)


province,avg_sales
ALBERTA,41.50833914212801
MANITOBA,25.64660321627838
SASKATCHEWAN,21.616420387198737
BRITISH COLUMBIA,21.152805926608316
ONTARIO,15.792577054113352


Databricks visualization. Run in Databricks to view.

store_num,avg_sales
9807,157.11371912168363
9802,110.22534516765286
7125,63.80756264236902
7262,29.981304347826093
8185,26.45837209302325
7167,25.765927601809988
4823,25.703245883644424
8187,25.54258426966293
7317,22.664568081991213
8161,21.38518151815181


province,store_num,avg_sales
ALBERTA,9807,157.11371912168363
ALBERTA,9802,110.22534516765286
BRITISH COLUMBIA,7125,63.80756264236902
ALBERTA,7262,29.981304347826093
ONTARIO,8185,26.45837209302325
BRITISH COLUMBIA,7167,25.765927601809988
MANITOBA,4823,25.703245883644424
ONTARIO,8187,25.54258426966293
SASKATCHEWAN,7317,22.664568081991213
ONTARIO,8161,21.38518151815181


Databricks visualization. Run in Databricks to view.

In [0]:


#Calculate average store performance by province
avg_province_performance = province_store_performance.groupby('province').agg(F.median('avg_sales').alias('median_province_sales'))

# Join to get top stores compared to average province sales
top_stores_by_province = province_store_performance.join(avg_province_performance, 'province').withColumn('performance_vs_median', F.col('avg_sales') / F.col('median_province_sales')).orderBy( F.desc('avg_sales'))

display(top_stores_by_province)


province,store_num,avg_sales,median_province_sales,performance_vs_median
ALBERTA,9807,157.11371912168363,16.105377358490568,9.755357830150754
ALBERTA,9802,110.22534516765286,16.105377358490568,6.844008849599748
BRITISH COLUMBIA,7125,63.80756264236902,20.106973525872444,3.173404618067721
ALBERTA,7262,29.981304347826093,16.105377358490568,1.861571056701773
ONTARIO,8185,26.45837209302325,14.936387255785007,1.7714037296921092
BRITISH COLUMBIA,7167,25.765927601809988,20.106973525872444,1.2814423597194249
MANITOBA,4823,25.703245883644424,9.544074074074071,2.6931104771562717
ONTARIO,8187,25.54258426966293,14.936387255785007,1.7100911908781715
SASKATCHEWAN,7317,22.664568081991213,19.094680851063817,1.1869571562244
ONTARIO,8161,21.38518151815181,14.936387255785007,1.43175060688582


Databricks visualization. Run in Databricks to view.

Databricks visualization. Run in Databricks to view.

In [0]:
loyalty_vs_non_loyalty_sales = joined_df.groupby('loyalty_status').agg(F.mean('sales').alias('total_sales')).orderBy(F.desc('total_sales'))
display(loyalty_vs_non_loyalty_sales)

loyalty_status,total_sales
0,21.95492726393004
1,19.0483941660248


In [0]:
from pyspark.sql.functions import col, month
joined_df = joined_df.withColumn("trans_month", month(col("trans_dt")))

loyalty_monthly_sales = joined_df.groupby("trans_month",'loyalty_status').agg(F.mean('sales').alias('avg_sales')).orderBy(F.asc('trans_month'))
display(loyalty_monthly_sales)


trans_month,loyalty_status,avg_sales
,1,17.40146827432872
,0,18.13645607045517
1.0,0,14.076178010471196
1.0,1,23.331671845867
3.0,1,15.882
3.0,0,28.461129807692306
4.0,0,49.53128800490671
4.0,1,21.59740460327074
5.0,0,24.719330912292257
5.0,1,21.31650893796004


Databricks visualization. Run in Databricks to view.

In [0]:


# Group by loyalty status and category, calculate total sales
loyalty_category_sales = joined_df.groupby('loyalty_status', 'category').agg(F.sum('sales').alias('total_sales')).orderBy(F.desc('total_sales'))

# Determine contribution of each category to total sales
total_sales = joined_df.select(F.sum('sales')).collect()[0][0]
loyalty_category_sales = loyalty_category_sales.withColumn('sales_contribution', F.col('total_sales') / total_sales)
loyalty_category_sales = loyalty_category_sales.dropna()
display(loyalty_category_sales)

# Group by category, calculate total sales
category_sales = joined_df.groupby( 'category').agg(F.sum('sales').alias('total_sales')).orderBy(F.desc('total_sales'))

# Determine contribution of each category to total sales
total_sales = joined_df.select(F.sum('sales')).collect()[0][0]
category_sales = category_sales.withColumn('sales_contribution', F.col('total_sales') / total_sales)

display(category_sales.dropna())


# Get top 10 product categories by department
top_categories_by_dept = joined_df.groupby('department', 'category') \
    .agg(F.sum('sales').alias('total_sales')) \
    .orderBy('department', F.desc('total_sales')) \
    .groupBy('department').agg(F.collect_list('category').alias('top_categories')) \
    .limit(10)
display(top_categories_by_dept.dropna())
top_categories_by_dept.show(truncate=False)

loyalty_status,category,total_sales,sales_contribution
0,cef3760b,42635.60999999999,0.0100072269288179
0,fe148072,38665.170000000006,0.0090753041983291
0,ffcec4a7,29529.640000000007,0.0069310561900321
0,e49d14f1,27471.47999999999,0.0064479746960459
0,d5a0a65d,20324.52999999999,0.00477047669616
0,687ed9e3,20277.20999999998,0.0047593699715635
1,ffcec4a7,18956.31,0.0044493346266892
1,fe148072,18881.27,0.0044317215959682
0,5530c7b1,17457.270000000004,0.0040974871110708
0,11566ced,17369.479999999996,0.0040768814611908


Databricks visualization. Run in Databricks to view.

category,total_sales,sales_contribution
fe148072,57546.43999999992,0.0135070257942973
ffcec4a7,48485.94999999999,0.0113803908167214
cef3760b,42988.05000000002,0.01008994996383
e49d14f1,38208.15,0.0089680346447562
687ed9e3,30999.52999999991,0.0072760617567497
65d731c8,30583.02999999999,0.0071783028642218
d5a0a65d,30000.789999999924,0.0070416422697789
e0b38f5b,26709.38999999999,0.0062691005678187
11566ced,26635.67,0.0062517973611989
e934fcda,25895.36,0.006078035330641


Databricks visualization. Run in Databricks to view.

department,top_categories
a461091,List(687ed9e3)
34a2a7e0,List(1578f747)
5bffa719,"List(a4d52407, 6c504249, fa3a1bd8, a121fb78, 3ae24ac2, 999b3a55, 8f610228, 108c2838, 2dc2366a, 3f1695bd)"
7569cb40,List(cef3760b)
b947a4a9,"List(382cf3a, 8b4f9982, 3b380e17, 29ecadc0, 6761045, 6023eeb7, f2672c8c, 9db5a1ff)"
24d07cc8,List(50c418ce)
435ca98,"List(e0b38f5b, 21b19a94, 640d751d, 2836e915, e8eeb80f, a05bcbb)"
1a34cbb9,"List(fe148072, ffcec4a7, e49d14f1, 11566ced, e934fcda, 7703921f, a97ccc2c, d18e3df7, a8a688f9, 7b703ee1, fbe05f0d, d47ae288, f649b726, 18d11f6c, 58992f9a, 9c6fe52a, 98417e80, bdff86e, 86e671a9)"
89d0c9d1,List(511e5c1b)


+----------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|department|top_categories                                                                                                                                                                               |
+----------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|a461091   |[687ed9e3]                                                                                                                                                                                   |
|34a2a7e0  |[1578f747]                                                                                                                                                                      

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import desc, rank
# Calculate total sales per store per province
sales_per_store_per_province = joined_df.groupBy("store_num", "province").agg({"sales": "sum"})

# Rank stores within each province based on sales
window_spec = Window.partitionBy("province").orderBy(col("sum(sales)").desc())
ranked_stores = sales_per_store_per_province.withColumn("rank", rank().over(window_spec))

# Filter to get top 5 stores per province
top_stores_by_province = ranked_stores.filter(col("rank") <= 5)

display(top_stores_by_province)


store_num,province,sum(sales),rank
9807,ALBERTA,1030351.7700000012,1
7296,ALBERTA,385435.04999999976,2
9802,ALBERTA,55884.25,3
7247,ALBERTA,17828.780000000013,4
7226,ALBERTA,9994.969999999994,5
7167,BRITISH COLUMBIA,113885.40000000015,1
7104,BRITISH COLUMBIA,101377.43000000012,2
7125,BRITISH COLUMBIA,28011.52,3
7194,BRITISH COLUMBIA,17304.700000000004,4
7175,BRITISH COLUMBIA,14096.529999999995,5


In [0]:
# Write joined_df to a CSV file in Azure Volume
joined_df.write.format('csv') \
    .option('header', 'true') \
    .mode('overwrite') \
    .save('/Volumes/azure_databricks/default/assignmentdata/')

