## Downloading csv files from S3

In [1]:
import boto3
from botocore import UNSIGNED
from botocore.config import Config
import os

# Step 1: Set up the unsigned S3 client
s3 = boto3.client('s3', config=Config(signature_version=UNSIGNED))

bucket_name = 'wcd-de-midterm-nl-public'

# Step 2: List all objects in the bucket
paginator = s3.get_paginator('list_objects_v2')
pages = paginator.paginate(Bucket=bucket_name)

downloaded_files = []

for page in pages:
    if 'Contents' not in page:
        print(f"No files found in bucket '{bucket_name}' with prefix '{prefix}'")
        break

    for obj in page['Contents']:
        key = obj['Key']
        filename = os.path.basename(key)

        # Skip empty keys (e.g., "folder/")
        if not filename:
            continue

        # Download the file to the current directory
        s3.download_file(bucket_name, key, filename)
        print(f"✅ Downloaded: {filename}")
        downloaded_files.append(filename)

print(f"\n🎉 Downloaded {len(downloaded_files)} files:")
print(downloaded_files)


✅ Downloaded: calendar_mid.csv
✅ Downloaded: inventory_mid.csv
✅ Downloaded: product_mid.csv
✅ Downloaded: sales_mid.csv
✅ Downloaded: store_mid.csv

🎉 Downloaded 5 files:
['calendar_mid.csv', 'inventory_mid.csv', 'product_mid.csv', 'sales_mid.csv', 'store_mid.csv']


## Creating Spark Session

In [2]:
import pyspark
from pyspark.sql.types import StringType,BooleanType,DateType, IntegerType
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import os

spark = SparkSession.builder.getOrCreate()
spark

25/07/02 10:49:04 WARN Utils: Your hostname, DESKTOP-MVSJR25 resolves to a loopback address: 127.0.1.1; using 192.168.151.161 instead (on interface eth0)
25/07/02 10:49:04 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/07/02 10:49:06 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Load the Data

**My analysis and insights are done based on the data as of July 2, 2025**

In [3]:
calendar_df = spark.read.format("csv").option("header", "true").option("inferSchema", True).load("calendar_mid.csv")
product_df = spark.read.format("csv").option("header", "true").option("inferSchema", True).load("product_mid.csv")
inventory_df = spark.read.format("csv").option("header", "true").option("inferSchema", True).load("inventory_mid.csv")
store_df = spark.read.format("csv").option("header", "true").option("inferSchema", True).load("store_mid.csv")
sales_df = spark.read.format("csv").option("header", "true").option("inferSchema", True).load("sales_mid.csv")

calendar_df.show(2)
product_df.show(2)
inventory_df.show(2)
store_df.show(2)
sales_df.show(2)

                                                                                

+----------+-------------+-------------+--------------+------+------+---------+--------+-----------+-------+----------+
|    CAL_DT|CAL_TYPE_DESC|DAY_OF_WK_NUM|DAY_OF_WK_DESC|YR_NUM|WK_NUM|YR_WK_NUM|MNTH_NUM|YR_MNTH_NUM|QTR_NUM|YR_QTR_NUM|
+----------+-------------+-------------+--------------+------+------+---------+--------+-----------+-------+----------+
|1998-06-04|       Fiscal|            4|      Thursday|  1998|    23|   199823|       6|      19986|      2|     19982|
|1998-04-27|       Fiscal|            1|        Monday|  1998|    18|   199818|       5|      19985|      2|     19982|
+----------+-------------+-------------+--------------+------+------+---------+--------+-----------+-------+----------+
only showing top 2 rows

+--------+--------------+-----+----+----------+-----------+----------------+------------+-------------+---------------+----------------+
|PROD_KEY|     PROD_NAME|  VOL| WGT|BRAND_NAME|STATUS_CODE|STATUS_CODE_NAME|CATEGORY_KEY|CATEGORY_NAME|SUBCATEGORY_KEY

## Exploring the Calendar Table

In [4]:
calendar_df.select('DAY_OF_WK_NUM').distinct().orderBy('DAY_OF_WK_NUM').show()

+-------------+
|DAY_OF_WK_NUM|
+-------------+
|            0|
|            1|
|            2|
|            3|
|            4|
|            5|
|            6|
+-------------+



In [5]:
# mapping out the day of week number to whether it is Monday, Tuesday, etc. As every calendar may have a slightly different interpretation of this mapping assignment.

# We see that 0 corresponds to Sunday and 6 to Saturday. Hence Sunday is the first day of the week while Saturday is the last day of the week
days_of_week=range(7)
for day in days_of_week:
    print(day)
    calendar_df.filter(col('DAY_OF_WK_NUM')==day).select('DAY_OF_WK_DESC').distinct().show()

0
+--------------+
|DAY_OF_WK_DESC|
+--------------+
|        Sunday|
+--------------+

1
+--------------+
|DAY_OF_WK_DESC|
+--------------+
|        Monday|
+--------------+

2
+--------------+
|DAY_OF_WK_DESC|
+--------------+
|       Tuesday|
+--------------+

3
+--------------+
|DAY_OF_WK_DESC|
+--------------+
|     Wednesday|
+--------------+

4
+--------------+
|DAY_OF_WK_DESC|
+--------------+
|      Thursday|
+--------------+

5
+--------------+
|DAY_OF_WK_DESC|
+--------------+
|        Friday|
+--------------+

6
+--------------+
|DAY_OF_WK_DESC|
+--------------+
|      Saturday|
+--------------+



In [6]:
calendar_df.withColumn('CAL_DT', calendar_df.CAL_DT.cast(StringType()))\
    .select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in calendar_df.columns]
   ).show()

+------+-------------+-------------+--------------+------+------+---------+--------+-----------+-------+----------+
|CAL_DT|CAL_TYPE_DESC|DAY_OF_WK_NUM|DAY_OF_WK_DESC|YR_NUM|WK_NUM|YR_WK_NUM|MNTH_NUM|YR_MNTH_NUM|QTR_NUM|YR_QTR_NUM|
+------+-------------+-------------+--------------+------+------+---------+--------+-----------+-------+----------+
|     0|            0|            0|             0|     0|     0|        0|       0|          0|      0|         0|
+------+-------------+-------------+--------------+------+------+---------+--------+-----------+-------+----------+



## Exploring the Store Table

In [7]:
# Finding out the different provinces and states for the stores. This is to inform us of how we can incorporate geography into our dashboards later
# We see that this database focuses primarily on US stores, as the abbreviations are all US states
store_df.select("PROV_STATE_CD").orderBy("PROV_STATE_CD").distinct().show(100)

+-------------+
|PROV_STATE_CD|
+-------------+
|           SC|
|           AZ|
|           LA|
|           MN|
|           NJ|
|           OR|
|           VA|
|         null|
|           RI|
|           KY|
|           NH|
|           MI|
|           NV|
|           WI|
|           ID|
|           CA|
|           CT|
|           NE|
|           MT|
|           NC|
|           MD|
|           MO|
|           IL|
|           ND|
|           WA|
|           MS|
|           AL|
|           IN|
|           OH|
|           TN|
|           NM|
|           IA|
|           PA|
|           SD|
|           NY|
|           TX|
|           WV|
|           GA|
|           MA|
|           KS|
|           FL|
|           CO|
|           AK|
|           AR|
|           OK|
|           UT|
+-------------+



In [8]:
# Seeing if there's nulls in the store_df table
# A few nulls exist for the geographical information, such as state and postal code, but since they are not part of the calculations for the fact table, we do not need to worry about them for the ETL process
store_df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in store_df.columns]
   ).show()

+---------+---------+----------+----+----+------+--------+--------+-------------+---------------+-------------+-------------+---------------+----------+----------+----------+-----------+-------------+--------------+--------+---------+
|STORE_KEY|STORE_NUM|STORE_DESC|ADDR|CITY|REGION|CNTRY_CD|CNTRY_NM|POSTAL_ZIP_CD|PROV_STATE_DESC|PROV_STATE_CD|STORE_TYPE_CD|STORE_TYPE_DESC|FRNCHS_FLG|STORE_SIZE|MARKET_KEY|MARKET_NAME|SUBMARKET_KEY|SUBMARKET_NAME|LATITUDE|LONGITUDE|
+---------+---------+----------+----+----+------+--------+--------+-------------+---------------+-------------+-------------+---------------+----------+----------+----------+-----------+-------------+--------------+--------+---------+
|        0|        0|         0|   0|   0|   151|       0|       0|          151|             20|           20|            0|              0|       151|       151|         0|          0|            0|             0|       0|        0|
+---------+---------+----------+----+----+------+--------+--

## Exploring the Product Table

In [9]:
# Seeing if there's nulls in the product_df table
# No nulls to worry about
product_df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in product_df.columns]
   ).show()

+--------+---------+---+---+----------+-----------+----------------+------------+-------------+---------------+----------------+
|PROD_KEY|PROD_NAME|VOL|WGT|BRAND_NAME|STATUS_CODE|STATUS_CODE_NAME|CATEGORY_KEY|CATEGORY_NAME|SUBCATEGORY_KEY|SUBCATEGORY_NAME|
+--------+---------+---+---+----------+-----------+----------------+------------+-------------+---------------+----------------+
|       0|        0|  0|  0|         0|          0|               0|           0|            0|              0|               0|
+--------+---------+---+---+----------+-----------+----------------+------------+-------------+---------------+----------------+



## Exploring the Fact (Inventory and Sales) Tables

In [10]:
# Seeing whether the out_of_stock_flg column contains values that aren't 0 nor 1
# Because it only contains either 0 or 1, as it should, then we don't have to worry about data quality when creating columns based on this one for the ETL step
inventory_df.select('OUT_OF_STOCK_FLG').distinct().show()



+----------------+
|OUT_OF_STOCK_FLG|
+----------------+
|               1|
|               0|
+----------------+



                                                                                

In [11]:
# Seeing if there's nulls in the inventory_df table
# No nulls in the inventory_df table
inventory_df.withColumn('CAL_DT', inventory_df.CAL_DT.cast(StringType()))\
    .withColumn('PROMOTION_FLG', inventory_df.PROMOTION_FLG.cast(IntegerType()))\
    .withColumn('NEXT_DELIVERY_DT', inventory_df.NEXT_DELIVERY_DT.cast(StringType()))\
    .select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in inventory_df.columns]
   ).show()



+------+---------+--------+---------------------+----------------------+----------------+---------+-------------+----------------+
|CAL_DT|STORE_KEY|PROD_KEY|INVENTORY_ON_HAND_QTY|INVENTORY_ON_ORDER_QTY|OUT_OF_STOCK_FLG|WASTE_QTY|PROMOTION_FLG|NEXT_DELIVERY_DT|
+------+---------+--------+---------------------+----------------------+----------------+---------+-------------+----------------+
|     0|        0|       0|                    0|                     0|               0|        0|            0|               0|
+------+---------+--------+---------------------+----------------------+----------------+---------+-------------+----------------+



                                                                                

In [12]:
# Seeing if there's nulls in the sales_df table
# No nulls in the sales_df table
sales_df.withColumn('TRANS_DT', sales_df.TRANS_DT.cast(StringType()))\
    .select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in sales_df.columns]
   ).show()



+--------+--------+--------+---------+----------+---------+-----------+---------+--------+----------+----------+---------+-------+
|TRANS_DT|TRANS_ID|PROD_KEY|STORE_KEY|TRANS_TIME|SALES_QTY|SALES_PRICE|SALES_AMT|DISCOUNT|SALES_COST|SALES_MGRN|SHIP_COST|NEXTVAL|
+--------+--------+--------+---------+----------+---------+-----------+---------+--------+----------+----------+---------+-------+
|       0|       0|       0|        0|         0|        0|          0|        0|       0|         0|         0|        0|      0|
+--------+--------+--------+---------+----------+---------+-----------+---------+--------+----------+----------+---------+-------+



                                                                                

In [13]:
# Finding out the date range of the inventory and sales tables
# We see that the earliest date for the "fact" tables are on the Jan 1, 2022 and the latest day is December 30, 2025. This is a static table rather than a traditional ETL dataset for the sake of this project.
inventory_df.select(min('CAL_DT')).show()
inventory_df.select(max('CAL_DT')).show()
sales_df.select(min('TRANS_DT')).show()
sales_df.select(max('TRANS_DT')).show()

                                                                                

+-----------+
|min(CAL_DT)|
+-----------+
| 2022-01-01|
+-----------+



                                                                                

+-----------+
|max(CAL_DT)|
+-----------+
| 2025-12-30|
+-----------+



                                                                                

+-------------+
|min(TRANS_DT)|
+-------------+
|   2022-01-01|
+-------------+





+-------------+
|max(TRANS_DT)|
+-------------+
|   2025-12-30|
+-------------+



                                                                                

## Mapping out Schema

This is to check whether all the columns that will be joined have the same data type as each other. If not, then we need to do data type conversions so that the joins will actually work

In [14]:
# We see that for all the columns of the tables that we want to join, the data types are the same between them. Hence there is no need to convert data types
print('calendar_df schema')
calendar_df.printSchema()
print('product_df schema')
product_df.printSchema()
print('inventory_df schema')
inventory_df.printSchema()
print('store_df schema')
store_df.printSchema()
print('sales_df schema')
sales_df.printSchema()

calendar_df schema
root
 |-- CAL_DT: date (nullable = true)
 |-- CAL_TYPE_DESC: string (nullable = true)
 |-- DAY_OF_WK_NUM: integer (nullable = true)
 |-- DAY_OF_WK_DESC: string (nullable = true)
 |-- YR_NUM: integer (nullable = true)
 |-- WK_NUM: integer (nullable = true)
 |-- YR_WK_NUM: integer (nullable = true)
 |-- MNTH_NUM: integer (nullable = true)
 |-- YR_MNTH_NUM: integer (nullable = true)
 |-- QTR_NUM: integer (nullable = true)
 |-- YR_QTR_NUM: integer (nullable = true)

product_df schema
root
 |-- PROD_KEY: integer (nullable = true)
 |-- PROD_NAME: string (nullable = true)
 |-- VOL: double (nullable = true)
 |-- WGT: double (nullable = true)
 |-- BRAND_NAME: string (nullable = true)
 |-- STATUS_CODE: integer (nullable = true)
 |-- STATUS_CODE_NAME: string (nullable = true)
 |-- CATEGORY_KEY: integer (nullable = true)
 |-- CATEGORY_NAME: string (nullable = true)
 |-- SUBCATEGORY_KEY: integer (nullable = true)
 |-- SUBCATEGORY_NAME: string (nullable = true)

inventory_df schem

## Combining the sales data into daily aggregations

In [15]:
daily_sales_df = sales_df.groupBy('TRANS_DT', 'PROD_KEY', 'STORE_KEY')\
    .agg(sum('SALES_QTY').alias('SALES_QTY'),\
    sum('SALES_AMT').alias('SALES_AMT'),\
    sum('SALES_COST').alias('SALES_COST')
    )
daily_sales_df.show()



+----------+--------+---------+---------+---------+----------+
|  TRANS_DT|PROD_KEY|STORE_KEY|SALES_QTY|SALES_AMT|SALES_COST|
+----------+--------+---------+---------+---------+----------+
|2022-01-01| 1064589|     3310|        6|      611|      1207|
|2022-01-02|  149455|     2365|       22|     4902|      3523|
|2022-01-02|    5889|     4811|       43|      492|       399|
|2022-01-02|  470743|     1106|        4|      991|      1360|
|2022-01-02|  704553|     4927|       14|      110|       119|
|2022-01-02|  149455|     5104|       26|     4412|      3523|
|2022-01-03|  738097|     2170|        4|      807|      1306|
|2022-01-03|  245758|     4170|       11|      111|        89|
|2022-01-03|  731566|     4220|       23|      155|       149|
|2022-01-03|  731566|     4928|       20|      155|       149|
|2022-01-03|  609559|     3170|        8|      755|      1100|
|2022-01-03|  450797|     1054|       12|      184|       398|
|2022-01-03|  731566|     8102|       29|      121|    

                                                                                

## Creating a table that contains all permutations of week, products, and stores

In [16]:
aggregated_df = calendar_df.select('YR_WK_NUM').distinct()\
    .join(product_df.select('PROD_KEY').distinct())\
        .join(store_df.select('STORE_KEY').distinct())
aggregated_df.show()

+---------+--------+---------+
|YR_WK_NUM|PROD_KEY|STORE_KEY|
+---------+--------+---------+
|   202641|  275481|     4818|
|   202641|  275481|     4190|
|   202641|  275481|     4929|
|   202641|  275481|     8105|
|   202641|  275481|     3220|
|   202641|  275481|     6010|
|   202641|  275481|     5030|
|   202641|  275481|     9018|
|   202641|  275481|     4923|
|   202641|  275481|     9006|
|   202641|  275481|     2180|
|   202641|  275481|     2355|
|   202641|  275481|     4155|
|   202641|  275481|     2240|
|   202641|  275481|     5040|
|   202641|  275481|     9016|
|   202641|  275481|     5105|
|   202641|  275481|     3180|
|   202641|  275481|     4120|
|   202641|  275481|     2148|
+---------+--------+---------+
only showing top 20 rows



## Exploring Joins and Null Values for the tables

The purpose is to find out how to inform of the most appropriate joins given the information on the nulls that get returned from a certain join type

In [17]:
# We see that only the sales table columns have nulls but not the inventory table. The question becomes whether we do an inner join or left join. This depends on the percentage of nulls that exist in the sales table compared to how many rows are there in total. If this percentage is large, then this could skew the fact table metrics and an inner join may be a better option. We will find out below.
sales_inv_combined_eda_df = daily_sales_df\
    .join(inventory_df, (daily_sales_df['TRANS_DT']==inventory_df['CAL_DT']) & (daily_sales_df['PROD_KEY']==inventory_df['PROD_KEY']) & (daily_sales_df['STORE_KEY']==inventory_df['STORE_KEY']), 'outer')\
        .select(daily_sales_df['TRANS_DT'], 
                daily_sales_df['PROD_KEY'],
                daily_sales_df['STORE_KEY'],
                inventory_df['CAL_DT'],
                inventory_df['PROD_KEY'].alias('Inventory_PROD_KEY'),
                inventory_df['STORE_KEY'].alias('Inventory_STORE_KEY'),
                'SALES_QTY',
                'SALES_AMT',
                'SALES_COST',
                'INVENTORY_ON_HAND_QTY',
                'INVENTORY_ON_ORDER_QTY',
                'OUT_OF_STOCK_FLG',
                'WASTE_QTY'
                )
sales_inv_combined_eda_df.withColumn('TRANS_DT', sales_inv_combined_eda_df.TRANS_DT.cast(StringType()))\
    .withColumn('CAL_DT', sales_inv_combined_eda_df.CAL_DT.cast(StringType()))\
    .select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in sales_inv_combined_eda_df.columns]
   ).show()

[Stage 88:>                                                         (0 + 8) / 8]

+--------+--------+---------+------+------------------+-------------------+---------+---------+----------+---------------------+----------------------+----------------+---------+
|TRANS_DT|PROD_KEY|STORE_KEY|CAL_DT|Inventory_PROD_KEY|Inventory_STORE_KEY|SALES_QTY|SALES_AMT|SALES_COST|INVENTORY_ON_HAND_QTY|INVENTORY_ON_ORDER_QTY|OUT_OF_STOCK_FLG|WASTE_QTY|
+--------+--------+---------+------+------------------+-------------------+---------+---------+----------+---------------------+----------------------+----------------+---------+
|  140508|  140508|   140508|     0|                 0|                  0|   140508|   140508|    140508|                    0|                     0|               0|        0|
+--------+--------+---------+------+------------------+-------------------+---------+---------+----------+---------------------+----------------------+----------------+---------+



                                                                                

In [18]:
# Find the percentage of nulls in the sales table as a percentage of the inventory table
# We see that ~11.78% of the combined sales and inventory data has sales data that is missing.
# Because this percentage is not very big, it wouldn't skew the fact table metrics by much if we replace the null values with 0 in the sales table columns after joining the tables together. In this case we can extrapolate that the nulls just mean the product of a store for a particular day didn't sell

# Hence due to the low percentage of nulls, we can do a left join in the ETL pipeline between the inventory and sales columns and use the inventory table primary keys. 
sales_inv_combined_eda_df.select(count(when(isnan('SALES_QTY') | col('SALES_QTY').isNull(), 'SALES_QTY'))).collect()[0][0]/inventory_df.count()

                                                                                

0.1178465750115743

In [19]:
# Redoing the join between sales and inventory table based on above finding
sales_inv_calendar_df = inventory_df\
    .join(daily_sales_df, (inventory_df['CAL_DT']==daily_sales_df['TRANS_DT']) & (inventory_df['PROD_KEY']==daily_sales_df['PROD_KEY']) & (inventory_df['STORE_KEY']==daily_sales_df['STORE_KEY']), 'left')\
    .join(calendar_df, inventory_df['CAL_DT']==calendar_df['CAL_DT'], 'left')\
        .select(inventory_df['CAL_DT'], 
                inventory_df['PROD_KEY'],
                inventory_df['STORE_KEY'],
                'SALES_QTY',
                'SALES_AMT',
                'SALES_COST',
                'INVENTORY_ON_HAND_QTY',
                'INVENTORY_ON_ORDER_QTY',
                'OUT_OF_STOCK_FLG',
                'WASTE_QTY',
                'DAY_OF_WK_NUM',
                'YR_WK_NUM'
                )
sales_inv_calendar_df.show()



+----------+--------+---------+---------+---------+----------+---------------------+----------------------+----------------+---------+-------------+---------+
|    CAL_DT|PROD_KEY|STORE_KEY|SALES_QTY|SALES_AMT|SALES_COST|INVENTORY_ON_HAND_QTY|INVENTORY_ON_ORDER_QTY|OUT_OF_STOCK_FLG|WASTE_QTY|DAY_OF_WK_NUM|YR_WK_NUM|
+----------+--------+---------+---------+---------+----------+---------------------+----------------------+----------------+---------+-------------+---------+
|2022-01-01|  539839|     2370|       26|      162|       303|                30.72|                 10.24|               0|      0.0|            6|   202152|
|2022-01-01| 1064589|     2365|        9|      785|      1207|                 13.5|                  14.4|               1|      1.0|            6|   202152|
|2022-07-06|  207098|     3040|       22|     1128|       871|                 8.68|                 15.19|               0|      0.0|            3|   202227|
|2022-07-06|  523991|     3040|       32|     

                                                                                

In [20]:
# None of the calendar columns have null values. Hence we only need to replace the sales columns with 0 for the null values
sales_inv_calendar_df.withColumn('CAL_DT', sales_inv_calendar_df.CAL_DT.cast(StringType()))\
    .select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in sales_inv_calendar_df.columns]
   ).show()

[Stage 116:>                                                        (0 + 8) / 8]

+------+--------+---------+---------+---------+----------+---------------------+----------------------+----------------+---------+-------------+---------+
|CAL_DT|PROD_KEY|STORE_KEY|SALES_QTY|SALES_AMT|SALES_COST|INVENTORY_ON_HAND_QTY|INVENTORY_ON_ORDER_QTY|OUT_OF_STOCK_FLG|WASTE_QTY|DAY_OF_WK_NUM|YR_WK_NUM|
+------+--------+---------+---------+---------+----------+---------------------+----------------------+----------------+---------+-------------+---------+
|     0|       0|        0|   140508|   140508|    140508|                    0|                     0|               0|        0|            0|        0|
+------+--------+---------+---------+---------+----------+---------------------+----------------------+----------------+---------+-------------+---------+



                                                                                

In [21]:
# No null values remain after transforming the nulls with 0s
sales_inv_calendar_df = sales_inv_calendar_df.na.fill(value=0, subset=['SALES_QTY', 
                                                     'SALES_AMT', 
                                                     'SALES_COST'])
sales_inv_calendar_df.withColumn('CAL_DT', sales_inv_calendar_df.CAL_DT.cast(StringType()))\
    .select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in sales_inv_calendar_df.columns]
   ).show()



+------+--------+---------+---------+---------+----------+---------------------+----------------------+----------------+---------+-------------+---------+
|CAL_DT|PROD_KEY|STORE_KEY|SALES_QTY|SALES_AMT|SALES_COST|INVENTORY_ON_HAND_QTY|INVENTORY_ON_ORDER_QTY|OUT_OF_STOCK_FLG|WASTE_QTY|DAY_OF_WK_NUM|YR_WK_NUM|
+------+--------+---------+---------+---------+----------+---------------------+----------------------+----------------+---------+-------------+---------+
|     0|       0|        0|        0|        0|         0|                    0|                     0|               0|        0|            0|        0|
+------+--------+---------+---------+---------+----------+---------------------+----------------------+----------------+---------+-------------+---------+



                                                                                

In [22]:
sales_inv_calendar_df = sales_inv_calendar_df.withColumn('EOW_Stock_Level', 
                        when(sales_inv_calendar_df.DAY_OF_WK_NUM==6, sales_inv_calendar_df.INVENTORY_ON_HAND_QTY))\
            .withColumn('EOW_Stock_on_Order',
                        when(sales_inv_calendar_df.DAY_OF_WK_NUM==6, sales_inv_calendar_df.INVENTORY_ON_ORDER_QTY))\
            .withColumn('Low_Stock_Flg', (sales_inv_calendar_df.INVENTORY_ON_HAND_QTY<sales_inv_calendar_df.SALES_QTY).cast('integer'))
sales_inv_calendar_df.show()



+----------+--------+---------+---------+---------+----------+---------------------+----------------------+----------------+---------+-------------+---------+---------------+------------------+-------------+
|    CAL_DT|PROD_KEY|STORE_KEY|SALES_QTY|SALES_AMT|SALES_COST|INVENTORY_ON_HAND_QTY|INVENTORY_ON_ORDER_QTY|OUT_OF_STOCK_FLG|WASTE_QTY|DAY_OF_WK_NUM|YR_WK_NUM|EOW_Stock_Level|EOW_Stock_on_Order|Low_Stock_Flg|
+----------+--------+---------+---------+---------+----------+---------------------+----------------------+----------------+---------+-------------+---------+---------------+------------------+-------------+
|2022-01-01|  539839|      248|       26|      180|       303|                33.28|                 28.16|               1|      0.0|            6|   202152|          33.28|             28.16|            0|
|2022-01-01|  539839|     1103|       29|      162|       303|                 8.64|                 48.96|               1|      1.0|            6|   202152|          

                                                                                

In [23]:
sales_inv_calendar_df = sales_inv_calendar_df.withColumn('low_stock_impact', col('Low_Stock_Flg')+col('OUT_OF_STOCK_FLG'))\
    .withColumn('potential_low_stock_impact', when(col('Low_Stock_Flg')==1, col('SALES_QTY')-col('INVENTORY_ON_HAND_QTY')).otherwise(0))\
    .withColumn('no_stock_impact', when(col('OUT_OF_STOCK_FLG')==1, col('SALES_AMT')).otherwise(0))
sales_inv_calendar_df.show()



+----------+--------+---------+---------+---------+----------+---------------------+----------------------+----------------+---------+-------------+---------+---------------+------------------+-------------+----------------+--------------------------+---------------+
|    CAL_DT|PROD_KEY|STORE_KEY|SALES_QTY|SALES_AMT|SALES_COST|INVENTORY_ON_HAND_QTY|INVENTORY_ON_ORDER_QTY|OUT_OF_STOCK_FLG|WASTE_QTY|DAY_OF_WK_NUM|YR_WK_NUM|EOW_Stock_Level|EOW_Stock_on_Order|Low_Stock_Flg|low_stock_impact|potential_low_stock_impact|no_stock_impact|
+----------+--------+---------+---------+---------+----------+---------------------+----------------------+----------------+---------+-------------+---------+---------------+------------------+-------------+----------------+--------------------------+---------------+
|2022-01-01|  539839|      248|       26|      180|       303|                33.28|                 28.16|               1|      0.0|            6|   202152|          33.28|             28.16|   

                                                                                

In [24]:
# Again, it appears the calculations for the transformations are being done correctly, given the only columns that have nulls (EOW_Stock_level and EOW_Stock_on_Order) are the ones where we are only interested in a subset of it, this case being the end of week. 
# The percentage of nulls for the EOW columns, in this case 86%, is similar with the percentage of days of the week that aren't Saturday, which is 6/7. Hence we are doing the calculations properly
num_rows = sales_inv_calendar_df.count()
sales_inv_calendar_df.withColumn('CAL_DT', sales_inv_calendar_df.CAL_DT.cast(StringType()))\
    .select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in sales_inv_calendar_df.columns]
   ).toPandas().div(num_rows)

                                                                                

Unnamed: 0,CAL_DT,PROD_KEY,STORE_KEY,SALES_QTY,SALES_AMT,SALES_COST,INVENTORY_ON_HAND_QTY,INVENTORY_ON_ORDER_QTY,OUT_OF_STOCK_FLG,WASTE_QTY,DAY_OF_WK_NUM,YR_WK_NUM,EOW_Stock_Level,EOW_Stock_on_Order,Low_Stock_Flg,low_stock_impact,potential_low_stock_impact,no_stock_impact
0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.860689,0.860689,0.0,0.0,0.0,0.0


In [25]:
fact_df = sales_inv_calendar_df.groupBy('YR_WK_NUM', 'PROD_KEY', 'STORE_KEY')\
    .agg(
    sum('SALES_QTY').alias('total_sales_qty'),\
    sum('SALES_AMT').alias('total_sales_amt'),\
    (sum('SALES_AMT')/sum('SALES_QTY')).alias('avg_sales_price'),\
    avg('EOW_Stock_Level').alias('EOW_Stock_Level'),\
    avg('EOW_Stock_on_Order').alias('EOW_Stock_on_Order'),\
    sum('SALES_COST').alias('total_sales_cost'),\
    (sum('OUT_OF_STOCK_FLG')/7).alias('percentage_store_out_of_stock'),\
    sum('low_stock_impact').alias('total_low_stock_impact'),\
    sum('potential_low_stock_impact').alias('potential_low_stock_impact'),\
    sum('no_stock_impact').alias('no_stock_impact'),\
    sum('Low_Stock_Flg').alias('low_stock_instances'),\
    sum('OUT_OF_STOCK_FLG').alias('no_stock_instances'),\
    (avg('EOW_Stock_Level')/sum('SALES_QTY')).alias('weeks_on_hand_stock_can_supply')
        )
fact_df.show()

25/07/02 10:54:53 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/07/02 10:54:53 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/07/02 10:54:53 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/07/02 10:54:53 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/07/02 10:54:53 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/07/02 10:54:53 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/07/02 10:54:53 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/07/02 10:54:53 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.

+---------+--------+---------+---------------+---------------+------------------+---------------+------------------+----------------+-----------------------------+----------------------+--------------------------+---------------+-------------------+------------------+------------------------------+
|YR_WK_NUM|PROD_KEY|STORE_KEY|total_sales_qty|total_sales_amt|   avg_sales_price|EOW_Stock_Level|EOW_Stock_on_Order|total_sales_cost|percentage_store_out_of_stock|total_low_stock_impact|potential_low_stock_impact|no_stock_impact|low_stock_instances|no_stock_instances|weeks_on_hand_stock_can_supply|
+---------+--------+---------+---------------+---------------+------------------+---------------+------------------+----------------+-----------------------------+----------------------+--------------------------+---------------+-------------------+------------------+------------------------------+
|   202152|  539839|     4165|             32|            162|            5.0625|           35.2|   

                                                                                

In [26]:
# Given that there are still null values for the metrics regarding EOW stock and stock on order level, this likely means there are certain weeks where there is no inventory data for the Saturday of a particular product in a given store.

# Furthermore we see that avg sales price has 11.784% null values, which is roughly the percentage of nulls for the sales data as a percentage of total inventory data. This strongly suggests that the null values are caused by a divide by 0 between the sales amount by sales quantity, as we have replaced the null sales amount and quantity with zero earlier. We need to replace the nulls with 0 for the sales price because if the total sales amount is 0, then the average sales price has to be 0 too.
num_rows_agg = fact_df.count()
fact_df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in fact_df.columns]
   ).toPandas().div(num_rows_agg)

25/07/02 10:55:31 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/07/02 10:55:31 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/07/02 10:55:31 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/07/02 10:55:31 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/07/02 10:55:31 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/07/02 10:55:31 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/07/02 10:55:31 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/07/02 10:55:31 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
                                                                

Unnamed: 0,YR_WK_NUM,PROD_KEY,STORE_KEY,total_sales_qty,total_sales_amt,avg_sales_price,EOW_Stock_Level,EOW_Stock_on_Order,total_sales_cost,percentage_store_out_of_stock,total_low_stock_impact,potential_low_stock_impact,no_stock_impact,low_stock_instances,no_stock_instances,weeks_on_hand_stock_can_supply
0,0.0,0.0,0.0,0.0,0.0,0.117842,0.858594,0.858594,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.875265


In [27]:
# After replacing the nulls with zero for the avg sales price, only the EOW related data still has nulls. We will explore shortly whether these nulls are valid or whether I have made a mistake in my calculations
fact_df = fact_df.na.fill(value=0, subset='avg_sales_price')
fact_df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in fact_df.columns]
   ).toPandas().div(num_rows_agg)

25/07/02 10:56:28 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/07/02 10:56:28 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/07/02 10:56:28 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/07/02 10:56:28 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/07/02 10:56:28 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/07/02 10:56:28 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/07/02 10:56:29 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/07/02 10:56:29 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
                                                                

Unnamed: 0,YR_WK_NUM,PROD_KEY,STORE_KEY,total_sales_qty,total_sales_amt,avg_sales_price,EOW_Stock_Level,EOW_Stock_on_Order,total_sales_cost,percentage_store_out_of_stock,total_low_stock_impact,potential_low_stock_impact,no_stock_impact,low_stock_instances,no_stock_instances,weeks_on_hand_stock_can_supply
0,0.0,0.0,0.0,0.0,0.0,0.0,0.858594,0.858594,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.875265


## Finding Percentage of weeks where there's no EOW inventory data

This is to assess why there are nulls in the EOW columns in the aggregated calculations table

In [28]:
# Find out if the number of entries are unique

# As we see that there's no instances where there is a count above 1, this means that all the entries are unique
sales_inv_calendar_df.createOrReplaceTempView('sales_inv_cal_table')
spark.sql('''
          select CAL_DT, PROD_KEY, STORE_KEY, count(*) as unique_entries
          from sales_inv_cal_table
          group by 1, 2, 3
          HAVING count(*)>1
          ''').show()

                                                                                

+------+--------+---------+--------------+
|CAL_DT|PROD_KEY|STORE_KEY|unique_entries|
+------+--------+---------+--------------+
+------+--------+---------+--------------+



In [29]:
# As the entries for the sales_inv_calendar table are unique, we can divide the total number of Saturdays in the sales_inv_calendar table by the total number of entries in the aggregated fact table to count the percentage of instances where there's no Saturday records for a particular week in a particular store for a certain product. In short, we are dividing the total number of Saturdays that exist in the daily aggregations by the total number of weeks that exist. If each product of a particular store in a given week has a Saturday value, the result would hypothetically be 1. Anything less than 1 means that there exists weeks which there are no Saturday rows being recorded for the particular product of a store

# We see percentage of weeks for a particular product of a store where there's no Saturday rows is 85.86%, which is the same as the percentage of nulls for the EOW columns that we've explored in the fact table. Hence our calculations are correct. Additionally, for the weeks_on_hand_stock_can_supply metric, the percentage of nulls for it are slightly higher at 87.5042%. This is likely because the additional 1.67% comes from the cases where there is a divide by 0 error, caused by dividing a sales_qty of 0. Given that we can't quantify the number of weeks on hand the stock can supply if nothing is sold for that week, we will leave it as null for this situation.
1-(sales_inv_calendar_df.filter(col('DAY_OF_WK_NUM')==6).count()/fact_df.count())

                                                                                

0.8585936495693534

## Finding out null data after joining the aggregated table with the table containing all weeks for all products in every store

The purpose of doing this is to see whether it is worth pushing a table that contains all permutations of week, store, and product to s3 in the ETL process

In [30]:
# First find out the percentage where there's actual sales and inventory data compared to the total hypothetical combination of weeks, store, and products

# We see that this is ~0.42%, hence at least 99.58% of data for each column will be null in this combined table. This means that we don't have information for 99.58% of the weeks for a given product of a particular store. In this case there is no sales nor inventory data available for such entries, hence there is not much we can do with this data. We can't even replace these nulls with 0 because a lack of inventory data for a particular week in a particular store for a given product does not mean there is 0 inventory. It can't be extrapolated. Furthermore, given there's so many nulls, it's probably not worth joining these two tables since it'll likely skew the averages significantly. Instead, given this analysis, we will only focus on pushing the weeks for the products of the stores where we have data to the final s3 bucket
fact_df.count()/aggregated_df.count()

                                                                                

0.0042393109852596885