# Retail Store Assignment

Plan:
1. Go through the datasets and create schemas for the staging tables
2. Create pipelines for incremental loads into the staging tables
3. Create an OBT (One Big Table) data model for the next step
4. Create the dashboard design
5. Dashboard: Page 1 - Glossary; Page 2 - High-level KPIs; Page 3 - Deep-dive friendly view

## Boilerplate Code

In [None]:
# Import PySpark
from pyspark.sql import SparkSession
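# Explicit imports avoid shadowing Python built-ins such as sum, min and max
from pyspark.sql.functions import col, count, desc, month, row_number, to_date, year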
from pyspark.sql.window import Window


In [None]:
# Create SparkSession
spark = SparkSession \
    .builder \
    .master('local[*]') \
    .appName("kcc_assignment") \
    .config("spark.driver.memory", "4g") \
    .enableHiveSupport() \
    .getOrCreate()

In [None]:
# Load the three CSV datasets (header row, comma-delimited, inferred schema)
def read_csv(path):
    return (spark.read
            .format("csv")
            .option("inferSchema", "true")
            .option("header", "true")
            .option("delimiter", ",")
            .load(path))

df_sales = read_csv("/content/Sales_Data.csv")
df_product = read_csv("/content/Product_Data.csv")
df_store = read_csv("/content/Store_Data.csv")

In [None]:
# Rename the columns for ease of use and to maintain naming standards

def rename_columns(df):
    old_names = df.schema.names
    new_names = [col_name.lower().replace(' ', '_') for col_name in old_names]
    for old_name, new_name in zip(old_names, new_names):
        df = df.withColumnRenamed(old_name, new_name)
    return df

df_sales = rename_columns(df_sales)
df_product = rename_columns(df_product)
df_store = rename_columns(df_store)

In [None]:
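# The raw dates look like 11/21/23 and 10/7/23, so the day can be a single digit.
# Under the default (non-legacy) parser a "dd" pattern may reject single-digit days,
# so use "d". Alternatively, set the legacy time parser policy:
#spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")


df_sales = df_sales.withColumn("transaction_date",to_date("transaction_date","M/d/yy"))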




In [None]:
df_sales = df_sales.drop("transaction_date_new")

For the sales dataset, do the EDA to figure out the primary key and check for duplicates and nulls.
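
As a rough framing of the primary-key check, here is a plain-Python sketch that runs outside Spark. A column set qualifies only if it has no nulls and no repeated combination. The helper name `is_candidate_key` and the sample rows are hypothetical, not part of the dataset.

```python
from collections import Counter

def is_candidate_key(rows, key_cols):
    """Return True if key_cols has no nulls and no repeated combination in rows."""
    keys = [tuple(row.get(c) for c in key_cols) for row in rows]
    if any(value is None for key in keys for value in key):
        return False
    return all(n == 1 for n in Counter(keys).values())

# Hypothetical sample rows, for illustration only
sample = [
    {"transaction_id": "T1", "store_id": "S1"},
    {"transaction_id": "T2", "store_id": "S1"},
    {"transaction_id": "T2", "store_id": "S1"},
]
print(is_candidate_key(sample, ["transaction_id"]))      # T2 repeats
print(is_candidate_key(sample[:2], ["transaction_id"]))  # unique, no nulls
```

The Spark cells that follow perform the same two checks (duplicate counts and null filter) on `transaction_id`.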


In [None]:
# Checking if there are any duplicates in txn id level data
df_sales.groupBy("transaction_id").agg(count("*").alias("count")).where(col("count") > 1).agg(count("*")).show()

+--------+
|count(1)|
+--------+
|     115|
+--------+



In [None]:
null_check_df = df_sales.filter(df_sales["transaction_id"].isNull())
null_check_df.show()
# no nulls at transaction level

+--------------+-----------+----------+-------------+--------------+----------------+--------------+--------------+--------+
|transaction_id|customer_id|product_id|quantity_sold|price_per_unit|transaction_date|salesperson_id|payment_method|store_id|
+--------------+-----------+----------+-------------+--------------+----------------+--------------+--------------+--------+
+--------------+-----------+----------+-------------+--------------+----------------+--------------+--------------+--------+



In [None]:
# Drop duplicate rows, keeping one row per transaction_id
df_sales_dedup = df_sales.dropDuplicates(["transaction_id"])

In [None]:
df_sales_dedup.agg(count("*").alias("count")).show()

+-----+
|count|
+-----+
|30000|
+-----+



In [None]:
df_sales_dedup.show(5)

+--------------+-----------+----------+-------------+--------------+----------------+--------------+--------------+--------+
|transaction_id|customer_id|product_id|quantity_sold|price_per_unit|transaction_date|salesperson_id|payment_method|store_id|
+--------------+-----------+----------+-------------+--------------+----------------+--------------+--------------+--------+
|       TXN0001|      C1175|  PROD2089|           12|          98.8|        11/21/23|         SP059| Bank Transfer|    SI16|
|       TXN0002|      C1213|  PROD1359|            2|        168.66|        11/15/23|         SP012|         Check|    SI06|
|       TXN0003|      C1212|  PROD1263|           14|        413.96|         9/18/23|         SP086| Bank Transfer|    SI18|
|       TXN0004|      C1266|  PROD0423|            7|        417.73|         10/7/23|         SP013|         Check|    SI11|
|       TXN0005|      C1185|  PROD0382|            7|        450.67|         10/4/23|         SP052| Bank Transfer|    SI09|


In [None]:
df_duplicates = df_sales.withColumn("rnk", row_number().\
                    over(Window.partitionBy("transaction_id").orderBy("transaction_date"))).\
                    where(col("rnk") > 1).\
                    select("transaction_id")


In [None]:
df_dup_rows = df_sales.join(df_duplicates, on="transaction_id", how="inner")

In [None]:
df_dup_rows.orderBy("transaction_id").show()

+--------------+-----------+----------+-------------+--------------+----------------+--------------+--------------+--------+
|transaction_id|customer_id|product_id|quantity_sold|price_per_unit|transaction_date|salesperson_id|payment_method|store_id|
+--------------+-----------+----------+-------------+--------------+----------------+--------------+--------------+--------+
|       TXN0948|      C1011|  PROD2320|           17|        161.68|        12/24/23|         SP038|         Check|    SI06|
|       TXN0948|      C1011|  PROD2320|           17|        161.68|        12/24/23|         SP038|         Check|    SI06|
|       TXN0949|      C1192|  PROD0696|            3|         86.81|         10/7/23|         SP031|          Cash|    SI19|
|       TXN0949|      C1192|  PROD0696|            3|         86.81|         10/7/23|         SP031|          Cash|    SI19|
|       TXN0950|      C1284|  PROD2188|           12|        413.28|         12/5/23|         SP083|          Cash|    SI14|


In [None]:
df_dup_rows.count()

230

In [None]:
df_dup_rows.dropDuplicates().count()

115

The rows above are row-level (exact) duplicates: the 230 rows collapse to 115 distinct rows.
A simple dropDuplicates on transaction_id would work.
However, going with row-number-based filtering instead:
row_number() partitioned by transaction_id, ordered by transaction_date descending, then filter on rnk = 1.
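
To make the row-number rule concrete, here is a minimal plain-Python sketch of "keep the latest row per key". The function name `keep_latest_per_key` and the sample rows are hypothetical. Ties are resolved by keeping the first row seen, which is an assumption of this sketch; Spark's `row_number()` gives no such guarantee for tied rows.

```python
from datetime import date

def keep_latest_per_key(rows, key, order_by):
    """Keep one row per key: the row with the greatest order_by value.

    Mirrors row_number() over (partition by key order by order_by desc) = 1,
    except that ties keep the first row seen.
    """
    latest = {}
    for row in rows:
        current = latest.get(row[key])
        if current is None or row[order_by] > current[order_by]:
            latest[row[key]] = row
    return list(latest.values())

# Hypothetical rows: T1 appears twice as an exact duplicate
rows = [
    {"transaction_id": "T1", "transaction_date": date(2023, 10, 7), "quantity_sold": 3},
    {"transaction_id": "T1", "transaction_date": date(2023, 10, 7), "quantity_sold": 3},
    {"transaction_id": "T2", "transaction_date": date(2023, 12, 5), "quantity_sold": 12},
]
deduped = keep_latest_per_key(rows, "transaction_id", "transaction_date")
print(len(deduped))
```

For exact duplicates the result is the same as dropping duplicates on the key; the ordering only matters once a later version of a transaction arrives.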


---



In [None]:
df_sales.show(5)

+----------------------+-------------------+------------------+---------------------+----------------------+------------------------+----------------------+----------------------+----------------+----+-----+
|staging_transaction_id|staging_customer_id|staging_product_id|staging_quantity_sold|staging_price_per_unit|staging_transaction_date|staging_salesperson_id|staging_payment_method|staging_store_id|year|month|
+----------------------+-------------------+------------------+---------------------+----------------------+------------------------+----------------------+----------------------+----------------+----+-----+
|               TXN0001|              C1175|          PROD2089|                   12|                  98.8|                11/21/23|                 SP059|         Bank Transfer|            SI16|NULL| NULL|
|               TXN0002|              C1213|          PROD1359|                    2|                168.66|                11/15/23|                 SP012|            

In [None]:
# Set the legacy time parser policy to handle the 'yy' format
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")


# "M/d/yy" accepts one- or two-digit months and days (e.g. 10/7/23)
df_sales = df_sales.withColumn("transaction_date",to_date("transaction_date","M/d/yy"))


In [None]:
df_sales.show(5)

+--------------+-----------+----------+-------------+--------------+----------------+--------------+--------------+--------+
|transaction_id|customer_id|product_id|quantity_sold|price_per_unit|transaction_date|salesperson_id|payment_method|store_id|
+--------------+-----------+----------+-------------+--------------+----------------+--------------+--------------+--------+
|       TXN0001|      C1175|  PROD2089|           12|          98.8|      2023-01-21|         SP059| Bank Transfer|    SI16|
|       TXN0002|      C1213|  PROD1359|            2|        168.66|      2023-01-15|         SP012|         Check|    SI06|
|       TXN0003|      C1212|  PROD1263|           14|        413.96|      2023-01-18|         SP086| Bank Transfer|    SI18|
|       TXN0004|      C1266|  PROD0423|            7|        417.73|      2023-01-07|         SP013|         Check|    SI11|
|       TXN0005|      C1185|  PROD0382|            7|        450.67|      2023-01-04|         SP052| Bank Transfer|    SI09|


In [None]:
df_sales = df_sales.withColumn("rnk", row_number().\
                    over(Window.partitionBy("transaction_id").orderBy(desc("transaction_date")))).\
                    where(col("rnk") == 1).\
                    drop("rnk")

In [None]:
df_sales.count()

30000

In [None]:
df_sales.columns

['transaction_id',
 'customer_id',
 'product_id',
 'quantity_sold',
 'price_per_unit',
 'transaction_date',
 'salesperson_id',
 'payment_method',
 'store_id']

In [None]:
spark.sql("DROP DATABASE IF EXISTS kcc_malls CASCADE;")

DataFrame[]

In [None]:
spark.sql("CREATE DATABASE kcc_malls;")

DataFrame[]

In [None]:
spark.sql("USE kcc_malls;")

DataFrame[]

In [None]:
spark.sql("""DROP TABLE IF EXISTS kcc_malls.sales_data;""");
spark.sql("""
CREATE TABLE kcc_malls.sales_data
(`transaction_id` STRING,
 `customer_id` STRING,
 `product_id` STRING,
 `quantity_sold` BIGINT,
 `price_per_unit` FLOAT,
 `transaction_date` DATE,
 `salesperson_id` STRING,
 `payment_method` STRING,
 `store_id` STRING)
 PARTITIONED BY (`year` int, `month` int)
 CLUSTERED BY (`transaction_id`) INTO 16 BUCKETS
 STORED AS PARQUET;
""")



DataFrame[]

In [None]:
spark.sql("DESC kcc_malls.sales_data").show()

+--------------------+---------+-------+
|            col_name|data_type|comment|
+--------------------+---------+-------+
|      transaction_id|   string|   NULL|
|         customer_id|   string|   NULL|
|          product_id|   string|   NULL|
|       quantity_sold|   bigint|   NULL|
|      price_per_unit|    float|   NULL|
|    transaction_date|     date|   NULL|
|      salesperson_id|   string|   NULL|
|      payment_method|   string|   NULL|
|            store_id|   string|   NULL|
|                year|      int|   NULL|
|               month|      int|   NULL|
|# Partition Infor...|         |       |
|          # col_name|data_type|comment|
|                year|      int|   NULL|
|               month|      int|   NULL|
+--------------------+---------+-------+



In [None]:
df_sales.createOrReplaceTempView("sales_data_staging");

In [None]:
df_sales.columns

['transaction_id',
 'customer_id',
 'product_id',
 'quantity_sold',
 'price_per_unit',
 'transaction_date',
 'salesperson_id',
 'payment_method',
 'store_id']

In [None]:
spark.sql("""
MERGE INTO kcc_malls.sales_data main
USING sales_data_staging staging
ON main.transaction_id = staging.transaction_id
AND main.year = year(staging.transaction_date)
AND main.month = month(staging.transaction_date)
WHEN MATCHED
THEN UPDATE SET
main.customer_id = staging.customer_id,
main.product_id = staging.product_id,
main.quantity_sold = staging.quantity_sold,
main.price_per_unit = staging.price_per_unit,
main.transaction_date = staging.transaction_date,
main.salesperson_id = staging.salesperson_id,
main.payment_method = staging.payment_method,
main.store_id = staging.store_id
WHEN NOT MATCHED
THEN INSERT
(
transaction_id,
customer_id,
product_id,
quantity_sold,
price_per_unit,
transaction_date,
salesperson_id,
payment_method,
store_id,
year,
month
)
 VALUES
(
staging.transaction_id,
staging.customer_id,
staging.product_id,
staging.quantity_sold,
staging.price_per_unit,
staging.transaction_date,
staging.salesperson_id,
staging.payment_method,
staging.store_id,
year(staging.transaction_date),
month(staging.transaction_date)
)
;

""")


UnsupportedOperationException: MERGE INTO TABLE is not supported temporarily.

In [None]:
from pyspark.sql.functions import year, month

# Rename columns in df_sales to avoid ambiguity during join
df_sales = df_sales.withColumnRenamed("transaction_id", "staging_transaction_id") \
                   .withColumnRenamed("customer_id", "staging_customer_id") \
                   .withColumnRenamed("product_id", "staging_product_id") \
                   .withColumnRenamed("quantity_sold", "staging_quantity_sold") \
                   .withColumnRenamed("price_per_unit", "staging_price_per_unit") \
                   .withColumnRenamed("transaction_date", "staging_transaction_date") \
                   .withColumnRenamed("salesperson_id", "staging_salesperson_id") \
                   .withColumnRenamed("payment_method", "staging_payment_method") \
                   .withColumnRenamed("store_id", "staging_store_id")

# Add year and month columns to df_sales
df_sales = df_sales.withColumn("year", year(df_sales["staging_transaction_date"])) \
                   .withColumn("month", month(df_sales["staging_transaction_date"]))

# Read existing data from the target table
df_main = spark.table("kcc_malls.sales_data")

# Perform a full outer join to identify matched, new, and unchanged records
joined_df = df_main.join(df_sales,
                         (df_main["transaction_id"] == df_sales["staging_transaction_id"]) &
                         (df_main["year"] == df_sales["year"]) &
                         (df_main["month"] == df_sales["month"]),
                         "outer")

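# Update matching records: rows present in both the target and staging (staging values win).
# Filtering on the staging key alone would also pick up new rows and duplicate them in the union.
updated_df = joined_df.filter(df_main["transaction_id"].isNotNull() &
                              df_sales["staging_transaction_id"].isNotNull()) \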
                      .select(df_sales["staging_transaction_id"].alias("transaction_id"),
                              df_sales["staging_customer_id"].alias("customer_id"),
                              df_sales["staging_product_id"].alias("product_id"),
                              df_sales["staging_quantity_sold"].alias("quantity_sold"),
                              df_sales["staging_price_per_unit"].alias("price_per_unit"),
                              df_sales["staging_transaction_date"].alias("transaction_date"),
                              df_sales["staging_salesperson_id"].alias("salesperson_id"),
                              df_sales["staging_payment_method"].alias("payment_method"),
                              df_sales["staging_store_id"].alias("store_id"),
                              df_sales["year"],
                              df_sales["month"])

# Insert new records
new_df = joined_df.filter(df_main["transaction_id"].isNull()) \
                  .select(df_sales["staging_transaction_id"].alias("transaction_id"),
                              df_sales["staging_customer_id"].alias("customer_id"),
                              df_sales["staging_product_id"].alias("product_id"),
                              df_sales["staging_quantity_sold"].alias("quantity_sold"),
                              df_sales["staging_price_per_unit"].alias("price_per_unit"),
                              df_sales["staging_transaction_date"].alias("transaction_date"),
                              df_sales["staging_salesperson_id"].alias("salesperson_id"),
                              df_sales["staging_payment_method"].alias("payment_method"),
                              df_sales["staging_store_id"].alias("store_id"),
                              df_sales["year"],
                              df_sales["month"])


# Old, unchanged records (present only in the target table)
old_df = joined_df.filter(df_sales["staging_transaction_id"].isNull()).\
                          select(df_main["transaction_id"],
                          df_main["customer_id"],
                          df_main["product_id"],
                          df_main["quantity_sold"],
                          df_main["price_per_unit"],
                          df_main["transaction_date"],
                          df_main["salesperson_id"],
                          df_main["payment_method"],
                          df_main["store_id"],
                          df_main["year"],
                          df_main["month"])
# Union updated and new records
final_df = updated_df.union(new_df).union(old_df)

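# Overwrite the original table with the final DataFrame.
# insertInto matches columns by position, so the select order above must follow
# the table definition (data columns first, then year and month).
final_df.write.mode("overwrite").insertInto("kcc_malls.sales_data")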
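
The intent of the join-based upsert above can be stated compactly: staging rows replace target rows with the same key, unmatched staging rows are inserted, and untouched target rows carry over, so every key appears exactly once. Below is a plain-Python sketch of that contract, handy as a mental check of the Spark result. The `upsert` helper and the sample rows are hypothetical.

```python
def upsert(target_rows, staging_rows, key_cols):
    """Merge staging into target: staging wins on key match, other rows carry over."""
    def key_of(row):
        return tuple(row[c] for c in key_cols)

    merged = {key_of(row): row for row in target_rows}          # existing rows
    merged.update({key_of(row): row for row in staging_rows})   # updates and inserts
    return list(merged.values())

# Hypothetical rows keyed on (transaction_id, year, month)
target = [
    {"transaction_id": "T1", "year": 2023, "month": 10, "quantity_sold": 7},
    {"transaction_id": "T2", "year": 2023, "month": 11, "quantity_sold": 2},
]
staging = [
    {"transaction_id": "T2", "year": 2023, "month": 11, "quantity_sold": 5},  # update
    {"transaction_id": "T3", "year": 2023, "month": 12, "quantity_sold": 1},  # insert
]
result = upsert(target, staging, ["transaction_id", "year", "month"])
print(len(result))
```

A comparable check on the Spark side would be to confirm that the row count of the final DataFrame equals its distinct key count.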

Made a mistake while reading the data.

The raw dates are in M/d/yy format (for example 11/21/23), and the parsed output above shows every month as 01.
Store them in yyyy-MM-dd format.
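
A quick plain-Python sanity check of the raw date layout, independent of Spark, might look like the sketch below. It uses Python's `strptime` directives (`%m/%d/%y`), which are not the same as Spark's pattern letters, and the helper name `parse_txn_date` is hypothetical. The sample strings are the values shown in the raw `transaction_date` column earlier.

```python
from datetime import date, datetime

def parse_txn_date(raw: str) -> date:
    """Parse a month/day/two-digit-year string such as '10/7/23' into a date."""
    return datetime.strptime(raw, "%m/%d/%y").date()

for raw in ["11/21/23", "10/7/23", "9/18/23"]:
    # isoformat() renders the date as yyyy-MM-dd
    print(raw, "->", parse_txn_date(raw).isoformat())
```

If the months printed here differ from what Spark produces for the same rows, the Spark pattern is the likely culprit.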

In [None]:
df_sales.select(["transaction_date"]).show(5)

+----------------+
|transaction_date|
+----------------+
|      2023-01-21|
|      2023-01-15|
|      2023-01-18|
|      2023-01-07|
|      2023-01-04|
+----------------+
only showing top 5 rows



In [None]:
df_dates.show(5)

TypeError: 'Column' object is not callable

In [None]:
spark.sql("SELECT * FROM kcc_malls.sales_data").show(5)

+--------------+-----------+----------+-------------+--------------+----------------+--------------+--------------+--------+----+-----+
|transaction_id|customer_id|product_id|quantity_sold|price_per_unit|transaction_date|salesperson_id|payment_method|store_id|year|month|
+--------------+-----------+----------+-------------+--------------+----------------+--------------+--------------+--------+----+-----+
|       TXN0004|      C1266|  PROD0423|            7|        417.73|      2023-10-07|         SP013|         Check|    SI11|2023|   10|
|       TXN0005|      C1185|  PROD0382|            7|        450.67|      2023-10-04|         SP052| Bank Transfer|    SI09|2023|   10|
|       TXN0012|      C1265|  PROD1153|           18|        138.57|      2023-10-17|         SP033|          Cash|    SI06|2023|   10|
|       TXN0014|      C1249|  PROD0939|            4|        244.38|      2023-10-22|         SP002|          Cash|    SI19|2023|   10|
|       TXN0015|      C1077|  PROD2421|         

# Product Dimension

In [None]:
df_product.count()

3010

In [None]:
df_product.show(5)

+----------+------------+----------------+-----------+----------+----------+
|product_id|product_name|product_category|supplier_id|orig_price|date_added|
+----------+------------+----------------+-----------+----------+----------+
|  PROD0001|        Lamp|       Furniture|    SUP2175|    408.78|   4/17/22|
|  PROD0002|    Wardrobe|        Clothing|    SUP2727|    881.32|   1/23/21|
|  PROD0003|      Camera|     Accessories|    SUP1802|    131.51|   7/29/20|
|  PROD0004|        NULL|     Kitchenware|    SUP0374|    647.34|   1/29/23|
|  PROD0005|    Wardrobe|     Kitchenware|    SUP1980|    925.23|   4/19/22|
+----------+------------+----------------+-----------+----------+----------+
only showing top 5 rows



In [None]:
#checking nulls
null_check_df = df_product.filter(df_product["product_id"].isNull())
null_check_df.show()

+----------+------------+----------------+-----------+----------+----------+
|product_id|product_name|product_category|supplier_id|orig_price|date_added|
+----------+------------+----------------+-----------+----------+----------+
+----------+------------+----------------+-----------+----------+----------+



In [None]:
df_product.groupBy("product_id").agg(count("*").alias("counts")).where(col("counts") > 1).show()

+----------+------+
|product_id|counts|
+----------+------+
|  PROD0018|     2|
|  PROD0170|     2|
|  PROD0169|     2|
|  PROD0171|     2|
|  PROD0845|     2|
|  PROD0167|     2|
|  PROD0843|     2|
|  PROD0842|     2|
|  PROD0168|     2|
|  PROD0844|     2|
+----------+------+



In [None]:
df_product.withColumn("counts",count("*").over(Window.partitionBy("product_id"))).filter(col("counts")>1).show()

+----------+---------------+----------------+-----------+----------+----------+------+
|product_id|   product_name|product_category|supplier_id|orig_price|date_added|counts|
+----------+---------------+----------------+-----------+----------+----------+------+
|  PROD0018|Washing Machine|        Footwear|    SUP3419|    399.57|  10/28/21|     2|
|  PROD0018|Washing Machine|        Footwear|    SUP3419|    399.57|  10/28/21|     2|
|  PROD0167|     Smartwatch|        Footwear|    SUP0301|    329.63|    9/8/21|     2|
|  PROD0167|     Smartwatch|        Footwear|    SUP0301|    329.63|    9/8/21|     2|
|  PROD0168|   Coffee Maker|      Appliances|    SUP3759|    552.53|   4/14/20|     2|
|  PROD0168|   Coffee Maker|      Appliances|    SUP3759|    552.53|   4/14/20|     2|
|  PROD0169|          Table|        Footwear|    SUP4823|    152.52|  11/18/22|     2|
|  PROD0169|          Table|        Footwear|    SUP4823|    152.52|  11/18/22|     2|
|  PROD0170|       Cookware|     Electronic

Need to create an is_active column and implement SCD Type 2 here:
if the product data gets updated, flip the is_active flag on the old record and insert the new version.

Here too, the date (date_added) needs to be parsed from the M/d/yy format.

Depending on whether the upstream data arrives as a full snapshot or as an incremental feed, the load needs to be configured accordingly.
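
A minimal plain-Python sketch of the SCD Type 2 idea follows, assuming an incremental feed of changed or new products. Only `is_active` comes from the notes above; the `valid_from` and `valid_to` columns, the `apply_scd2` helper, and the sample products are hypothetical additions for illustration.

```python
from datetime import date

def apply_scd2(dimension, incoming, key, tracked_cols, as_of):
    """Return a new dimension list with SCD Type 2 versioning applied.

    A changed record closes its active version (is_active=False, valid_to=as_of)
    and appends a new active version; unseen keys are appended as new rows.
    """
    result = [dict(row) for row in dimension]
    active = {row[key]: row for row in result if row["is_active"]}
    for new in incoming:
        current = active.get(new[key])
        if current is not None and all(current[c] == new[c] for c in tracked_cols):
            continue  # nothing changed, keep the current version
        if current is not None:
            current["is_active"] = False
            current["valid_to"] = as_of
        version = {**new, "is_active": True, "valid_from": as_of, "valid_to": None}
        result.append(version)
        active[new[key]] = version
    return result

# Hypothetical dimension with one active product
dim = [{"product_id": "P1", "product_name": "Lamp", "orig_price": 10.0,
        "is_active": True, "valid_from": date(2023, 1, 1), "valid_to": None}]
updates = [
    {"product_id": "P1", "product_name": "Lamp", "orig_price": 12.0},   # price change
    {"product_id": "P2", "product_name": "Table", "orig_price": 50.0},  # new product
]
dim = apply_scd2(dim, updates, "product_id", ["product_name", "orig_price"], date(2023, 6, 1))
for row in dim:
    print(row["product_id"], row["orig_price"], row["is_active"])
```

With a full snapshot instead of an incremental feed, keys missing from the snapshot would also have to be closed out, which this sketch does not handle.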

In [None]:
df_store.show(5)

+--------+-------------+---------+----------+---------------+
|store_id|   store_name|   region|manager_id|   manager_name|
+--------+-------------+---------+----------+---------------+
|    SI01|   CityGrocer|  Midwest|     MAN01|   Sophia Lopez|
|    SI02|    FreshMart|Northeast|     MAN02|    Linda White|
|    SI03|     ShopEasy|  Midwest|     MAN03|   Elijah Moore|
|    SI04|  CornerStore|     West|     MAN04|    Liam Wilson|
|    SI05|  MarketPlace|     West|     MAN05|   Lucas Martin|
|    SI06| MetroGrocery|    South|     MAN06| Isabella Clark|
|    SI07|    ValueMart|Northeast|     MAN07|  Alice Johnson|
|    SI08|  PrimeGrocer|  Midwest|     MAN08| David Thompson|
|    SI09|    SuperSave|    South|     MAN09|  Emma Martinez|
|    SI10|DiscountDepot|  Midwest|     MAN10|     John Smith|
|    SI11|   FoodBazaar|  Midwest|     MAN11|       Jane Doe|
|    SI12|    BudgetBuy|Northeast|     MAN12|Alexander Perez|
|    SIO1|  CityGrocer_|  Midwest|     MAN01|   Sophia Lopez|
|    SI1

GOLD OBT data schema

txn_id, txn_date, customer_id, product_id, product_name, product_category, original_price, selling_price, qty_sold, sales_person_id, store_id, store_name, store_region, store_manager_id, store_manager_name (partition cols = year, month, day)

It would be better to create a mapping checklist of the columns needed in the Gold OBT data.
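
One possible shape for that mapping checklist is a plain dictionary from each OBT column to its source table and column, plus two small checks. The source column lists are the ones reported by `df_sales.columns` (before the `staging_` rename), `df_product.columns` and `df_store.columns`. The pairings themselves are assumptions, in particular `selling_price` from `price_per_unit` and `original_price` from `orig_price`, and the helper names are hypothetical.

```python
# Source columns as listed in the notebook outputs
SOURCE_COLUMNS = {
    "sales": ["transaction_id", "customer_id", "product_id", "quantity_sold",
              "price_per_unit", "transaction_date", "salesperson_id",
              "payment_method", "store_id"],
    "product": ["product_id", "product_name", "product_category",
                "supplier_id", "orig_price", "date_added"],
    "store": ["store_id", "store_name", "region", "manager_id", "manager_name"],
}

# OBT column -> (source table, source column); these pairings are assumptions
OBT_MAPPING = {
    "txn_id": ("sales", "transaction_id"),
    "txn_date": ("sales", "transaction_date"),
    "customer_id": ("sales", "customer_id"),
    "product_id": ("sales", "product_id"),
    "product_name": ("product", "product_name"),
    "product_category": ("product", "product_category"),
    "original_price": ("product", "orig_price"),
    "selling_price": ("sales", "price_per_unit"),
    "qty_sold": ("sales", "quantity_sold"),
    "sales_person_id": ("sales", "salesperson_id"),
    "store_id": ("sales", "store_id"),
    "store_name": ("store", "store_name"),
    "store_region": ("store", "region"),
    "store_manager_id": ("store", "manager_id"),
    "store_manager_name": ("store", "manager_name"),
}

def unmapped_sources(mapping, sources):
    """List OBT columns whose source table or column does not exist."""
    return [obt for obt, (table, column) in mapping.items()
            if column not in sources.get(table, [])]

def unused_source_columns(mapping, sources):
    """List (table, column) pairs that no OBT column reads from."""
    used = set(mapping.values())
    return [(t, c) for t, cols in sources.items() for c in cols if (t, c) not in used]

print(unmapped_sources(OBT_MAPPING, SOURCE_COLUMNS))
print(unused_source_columns(OBT_MAPPING, SOURCE_COLUMNS))
```

The second check surfaces columns such as `payment_method` that the draft OBT schema leaves out, which makes the omission a deliberate decision rather than an oversight.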

In [None]:
df_sales.columns

['staging_transaction_id',
 'staging_customer_id',
 'staging_product_id',
 'staging_quantity_sold',
 'staging_price_per_unit',
 'staging_transaction_date',
 'staging_salesperson_id',
 'staging_payment_method',
 'staging_store_id',
 'year',
 'month']

In [None]:
df_product.columns

['product_id',
 'product_name',
 'product_category',
 'supplier_id',
 'orig_price',
 'date_added']

In [None]:
df_store.columns

['store_id', 'store_name', 'region', 'manager_id', 'manager_name']

SCD notes - Audit Columns

Created timestamp, updated timestamp, IS_ACTIVE flag, DML_flag (0 for delete, 1 for insert, 2 for update)
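
A small plain-Python sketch of how these audit columns could be stamped on a record is shown below. The flag values follow the note above. The column names `created_ts` and `updated_ts`, the `stamp_audit` helper, and the choice to treat a delete as a soft delete (row kept, `is_active` set to False) are assumptions.

```python
from datetime import datetime, timezone
from enum import IntEnum

class DmlFlag(IntEnum):
    DELETE = 0
    INSERT = 1
    UPDATE = 2

def stamp_audit(row, dml_flag, now=None, previous=None):
    """Return a copy of row with audit columns filled in.

    created_ts is carried over from the previous version when there is one;
    a deleted row is kept but marked inactive (soft delete).
    """
    now = now or datetime.now(timezone.utc)
    created = previous["created_ts"] if previous else now
    return {**row, "created_ts": created, "updated_ts": now,
            "is_active": dml_flag != DmlFlag.DELETE, "dml_flag": int(dml_flag)}

# Hypothetical store record, inserted and then updated a month later
t0 = datetime(2024, 1, 1, tzinfo=timezone.utc)
t1 = datetime(2024, 2, 1, tzinfo=timezone.utc)
inserted = stamp_audit({"store_id": "S1", "region": "West"}, DmlFlag.INSERT, now=t0)
updated = stamp_audit({"store_id": "S1", "region": "South"}, DmlFlag.UPDATE,
                      now=t1, previous=inserted)
print(updated["created_ts"], updated["updated_ts"], updated["dml_flag"])
```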
