In [1]:
!kaggle datasets download lovishbansal123/sales-of-a-supermarket

Dataset URL: https://www.kaggle.com/datasets/lovishbansal123/sales-of-a-supermarket
License(s): apache-2.0
Downloading sales-of-a-supermarket.zip to /home/resurrexi/projects/kaggle_supermarket
  0%|                                               | 0.00/35.9k [00:00<?, ?B/s]
100%|███████████████████████████████████████| 35.9k/35.9k [00:00<00:00, 188MB/s]


In [2]:
import zipfile

with zipfile.ZipFile("sales-of-a-supermarket.zip", "r") as f:
    f.extractall("sales-of-a-supermarket")

In [3]:
import pandas as pd
from pathlib import Path

current_dir = Path.cwd()
csv_path = current_dir / "sales-of-a-supermarket" / "supermarket_sales.csv"
df = pd.read_csv(csv_path)
df.head()

Unnamed: 0,Invoice ID,Branch,City,Customer type,Gender,Product line,Unit price,Quantity,Tax 5%,Total,Date,Time,Payment,cogs,gross margin percentage,gross income,Rating
0,750-67-8428,A,Yangon,Member,Female,Health and beauty,74.69,7,26.1415,548.9715,1/5/2019,13:08,Ewallet,522.83,4.761905,26.1415,9.1
1,226-31-3081,C,Naypyitaw,Normal,Female,Electronic accessories,15.28,5,3.82,80.22,3/8/2019,10:29,Cash,76.4,4.761905,3.82,9.6
2,631-41-3108,A,Yangon,Normal,Male,Home and lifestyle,46.33,7,16.2155,340.5255,3/3/2019,13:23,Credit card,324.31,4.761905,16.2155,7.4
3,123-19-1176,A,Yangon,Member,Male,Health and beauty,58.22,8,23.288,489.048,1/27/2019,20:33,Ewallet,465.76,4.761905,23.288,8.4
4,373-73-7910,A,Yangon,Normal,Male,Sports and travel,86.31,7,30.2085,634.3785,2/8/2019,10:37,Ewallet,604.17,4.761905,30.2085,5.3


In [4]:
df.describe()

Unnamed: 0,Unit price,Quantity,Tax 5%,Total,cogs,gross margin percentage,gross income,Rating
count,1000.0,1000.0,1000.0,1000.0,1000.0,1000.0,1000.0,1000.0
mean,55.67213,5.51,15.379369,322.966749,307.58738,4.761905,15.379369,6.9727
std,26.494628,2.923431,11.708825,245.885335,234.17651,0.0,11.708825,1.71858
min,10.08,1.0,0.5085,10.6785,10.17,4.761905,0.5085,4.0
25%,32.875,3.0,5.924875,124.422375,118.4975,4.761905,5.924875,5.5
50%,55.23,5.0,12.088,253.848,241.76,4.761905,12.088,7.0
75%,77.935,8.0,22.44525,471.35025,448.905,4.761905,22.44525,8.5
max,99.96,10.0,49.65,1042.65,993.0,4.761905,49.65,10.0


## Report criteria

Because the task is open-ended, I work backwards by defining a report that might plausibly be requested from this dataset. Suppose the owner of the supermarket wants to see how their locations perform over time. Such a report would answer questions like:

* How many sales transactions does each branch have month-over-month?
* What is the YTD total gross income of all the locations combined?
* What is the rolling average of ratings for the locations over a 7-day window?

Note: For the last bullet, the median could also serve as the measure of central tendency. Since the `Rating` values are roughly uniformly distributed, and therefore symmetric, the mean and median should be close, so I chose the average.
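
The heuristic can be checked rather than assumed. The following is a minimal sketch using only the standard library and a small hypothetical sample (not the real `Rating` column); `mean_median_gap` is an illustrative helper name. For a symmetric distribution the mean and median coincide, so either one works.

```python
import statistics


def mean_median_gap(values):
    """Return (mean, median, absolute gap) for a sequence of numbers."""
    mean = statistics.fmean(values)
    median = statistics.median(values)
    return mean, median, abs(mean - median)


# Hypothetical, evenly spread ratings between 4 and 10 (not the real data).
sample = [4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0]
mean, median, gap = mean_median_gap(sample)
print(f"mean={mean:.2f} median={median:.2f} gap={gap:.2f}")
```

In the `df.describe()` output above, `Rating` has a mean of 6.9727 and a median (50%) of 7.0, which is consistent with this reasoning.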

## Dimension and fact tables

Given the report criteria above with the sample questions to address, I can formalize the dimension and fact tables. Data types defined below are SQLite types since SQLite is the DB used for this example.

### dim_location

Field | Type | Constraints
--- | --- | ---
id | TEXT | PK
name | TEXT |
branch | TEXT | UNIQUE with city
city | TEXT | UNIQUE with branch

### dim_date

Field | Type | Constraints
--- | --- | ---
id | TEXT | PK
date | TEXT | UNIQUE
year | INTEGER |
month | INTEGER |
day | INTEGER |

### fact_sales

Field | Type | Constraints
--- | --- | ---
invoice_id | TEXT | PK
location_id | TEXT | FK
date_id | TEXT | FK
gross_income | REAL |
rating | REAL |

Other quantitative measures of the transaction, such as `Quantity`, `Tax 5%`, and `Total`, could also be included, but for brevity I include only `gross income` and `Rating`, which are enough to answer the sample questions above.

With the table schemas defined, let's now create the tables in SQLite.

In [5]:
import sqlite3

conn = sqlite3.connect("supermarket.db")
cursor = conn.cursor()

# load and execute DDL script
script_path = current_dir / "ddl.sql"
ddl_query = script_path.read_text()

cursor.executescript(ddl_query)
conn.commit()
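
The contents of `ddl.sql` are not shown in this notebook. As a rough sketch only, a DDL script consistent with the schema tables above might look like the following. Everything in it is an assumption: the composite `UNIQUE (branch, city)` constraint, the `IF NOT EXISTS` guards, and `date_id` being declared `TEXT` because it stores a hash string. It runs against an in-memory database under a separate name (`demo_conn`) so it does not touch `supermarket.db`.

```python
import sqlite3

# Hypothetical DDL mirroring the schema tables; the real ddl.sql may differ.
DDL = """
CREATE TABLE IF NOT EXISTS dim_location (
    id TEXT PRIMARY KEY,
    name TEXT,
    branch TEXT,
    city TEXT,
    UNIQUE (branch, city)
);
CREATE TABLE IF NOT EXISTS dim_date (
    id TEXT PRIMARY KEY,
    date TEXT UNIQUE,
    year INTEGER,
    month INTEGER,
    day INTEGER
);
CREATE TABLE IF NOT EXISTS fact_sales (
    invoice_id TEXT PRIMARY KEY,
    location_id TEXT REFERENCES dim_location (id),
    date_id TEXT REFERENCES dim_date (id),
    gross_income REAL,
    rating REAL
);
"""

demo_conn = sqlite3.connect(":memory:")  # in-memory DB, leaves no file behind
demo_conn.executescript(DDL)

# List the tables that were created.
tables = sorted(
    row[0]
    for row in demo_conn.execute("SELECT name FROM sqlite_master WHERE type = 'table'")
)
print(tables)
```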

Now, transform and load the data.

I start with `dim_location`. The PK is a text field holding the SHA-256 hash of `name`, which is itself the concatenation of `city` and `branch`. Deriving the key from the data keeps the load idempotent, and it also means the foreign keys for the fact table can be computed directly from each row instead of being looked up in the DB.
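
To illustrate why a hashed key helps with idempotency, here is a minimal sketch against an in-memory database. The helper name `surrogate_key` and the cut-down table definition are illustrative assumptions, not part of the actual load script.

```python
import hashlib
import sqlite3


def surrogate_key(natural_key: str) -> str:
    """Derive a deterministic ID from a natural key using SHA-256."""
    return hashlib.sha256(natural_key.encode("utf-8")).hexdigest()


demo_conn = sqlite3.connect(":memory:")
demo_conn.execute(
    "CREATE TABLE dim_location (id TEXT PRIMARY KEY, name TEXT, branch TEXT, city TEXT)"
)

name = "Yangon" + "-" + "A"
for _ in range(2):  # simulate running the same load twice
    demo_conn.execute(
        "INSERT OR IGNORE INTO dim_location (id, name, branch, city) VALUES (?, ?, ?, ?)",
        (surrogate_key(name), name, "A", "Yangon"),
    )

# The second insert is ignored because the same input always hashes to the same ID.
row_count = demo_conn.execute("SELECT COUNT(*) FROM dim_location").fetchone()[0]
print(row_count)
```

With an auto-incrementing key, the second run would need a lookup to avoid creating a duplicate row; with a hash, the primary key constraint handles it.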

With `dim_date`, I can prefill the table with a range of dates that encompasses the data, which in this case runs from 2019-01-01 to 2019-03-31. The table will have to be revisited and extended when transactions with later dates are added. I prefill the whole range because the existing data may have gaps in its dates (due to holidays or other non-business days). The PK in this table is also a hash value, for the same reasons as above.
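
One way to make the prefill easier to extend later is to drive it from explicit start and end dates instead of a fixed day count. This is a sketch under that assumption; `date_rows` is a hypothetical helper, and the tuple layout matches the `dim_date` columns (id, date, year, month, day).

```python
import hashlib
from datetime import date, timedelta


def date_rows(start: date, end: date):
    """Yield (id, date, year, month, day) for every day from start to end inclusive."""
    for offset in range((end - start).days + 1):
        current = start + timedelta(days=offset)
        iso = current.isoformat()  # YYYY-MM-DD
        date_id = hashlib.sha256(iso.encode("utf-8")).hexdigest()
        yield (date_id, iso, current.year, current.month, current.day)


rows = list(date_rows(date(2019, 1, 1), date(2019, 3, 31)))
print(len(rows), rows[0][1], rows[-1][1])  # 90 2019-01-01 2019-03-31
```

The resulting tuples could be passed to `executemany` with an `INSERT OR IGNORE` statement, so that extending the end date later only adds the new days.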

Lastly, for `fact_sales`, use the `id`s generated from the dim tables to populate the foreign key values, and fill in the `gross_income` and `rating` columns with the respective columns from the data.

To keep the load efficient, the CSV is read in a single pass, and each row populates both `dim_location` and `fact_sales`.

In [6]:
import csv
import hashlib
from datetime import date, datetime, timedelta

date_query = """
INSERT OR IGNORE INTO dim_date (id, date, year, month, day)
VALUES (?, ?, ?, ?, ?)
"""
location_query = """
INSERT OR IGNORE INTO dim_location (id, name, branch, city)
VALUES (?, ?, ?, ?)
"""
sales_query = """
INSERT OR IGNORE INTO fact_sales (invoice_id, location_id, date_id, gross_income, rating)
VALUES (?, ?, ?, ?, ?)
"""

# prefill dim_date
for d in range(90):
    curr_date = date(2019, 1, 1) + timedelta(days=d)
    date_str = curr_date.strftime("%Y-%m-%d")
    date_id = hashlib.sha256(date_str.encode("utf-8")).hexdigest()
    cursor.execute(date_query, (date_id, date_str, curr_date.year, curr_date.month, curr_date.day))
conn.commit()

csv_path = current_dir / "sales-of-a-supermarket" / "supermarket_sales.csv"
with open(csv_path, "r", newline="") as f_in:  # newline="" is what the csv module recommends
    reader = csv.DictReader(f_in)
    for row in reader:
        # location attributes
        loc_branch = row["Branch"]
        loc_city = row["City"]
        loc_name = row["City"] + "-" + row["Branch"]
        loc_id = hashlib.sha256(loc_name.encode("utf-8")).hexdigest()

        # transaction-level attributes
        invoice_id = row["Invoice ID"]
        date_ymd = datetime.strptime(row["Date"], "%m/%d/%Y").strftime("%Y-%m-%d")
        date_id = hashlib.sha256(date_ymd.encode("utf-8")).hexdigest()
        gross_income = row["gross income"]
        rating = row["Rating"]

        # insert to dim_location
        cursor.execute(location_query, (loc_id, loc_name, loc_branch, loc_city))

        # insert to fact_sales
        cursor.execute(sales_query, (invoice_id, loc_id, date_id, gross_income, rating))
    conn.commit()

## Data validation

Data validation should be employed to ensure the data is correctly loaded. Some worthwhile checks are:

* Checking that every foreign key in the fact table exists in the corresponding dim table
* Checking that the fact measures fall within the range of values observed in the dataset
    * `gross_income` should be between 0.50 and 49.65
    * `rating` should be between 4 and 10

In [7]:
# each query result should have 0 rows
location_ids_query = """
SELECT a.location_id
FROM fact_sales AS a
LEFT JOIN dim_location AS b
ON a.location_id = b.id
WHERE b.id IS NULL
"""
res = cursor.execute(location_ids_query).fetchall()
assert len(res) == 0

date_ids_query = """
SELECT a.date_id
FROM fact_sales AS a
LEFT JOIN dim_date AS b
ON a.date_id = b.id
WHERE b.id IS NULL
"""
res = cursor.execute(date_ids_query).fetchall()
assert len(res) == 0

income_query = """
SELECT *
FROM fact_sales
WHERE NOT (gross_income BETWEEN 0.50 AND 49.65)
"""
res = cursor.execute(income_query).fetchall()
assert len(res) == 0

rating_query = """
SELECT *
FROM fact_sales
WHERE NOT (rating BETWEEN 4 AND 10)
"""
res = cursor.execute(rating_query).fetchall()
assert len(res) == 0
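
As a possible complement to the hand-written join checks, SQLite has a built-in `PRAGMA foreign_key_check` that reports rows violating declared foreign keys. The sketch below assumes the fact table declares its foreign keys with `REFERENCES` (the actual `ddl.sql` is not shown, so this may not hold) and uses cut-down, hypothetical tables in an in-memory database.

```python
import sqlite3

demo_conn = sqlite3.connect(":memory:")
demo_conn.executescript("""
-- Enforcement is switched off for this demo so the orphan row can be inserted.
PRAGMA foreign_keys = OFF;
CREATE TABLE dim_location (id TEXT PRIMARY KEY);
CREATE TABLE fact_sales (
    invoice_id TEXT PRIMARY KEY,
    location_id TEXT REFERENCES dim_location (id)
);
INSERT INTO dim_location (id) VALUES ('loc-1');
INSERT INTO fact_sales VALUES ('inv-1', 'loc-1');
INSERT INTO fact_sales VALUES ('inv-2', 'missing');
""")

# Each returned row describes one violation; the first column is the child
# table and the third column is the table it references.
violations = demo_conn.execute("PRAGMA foreign_key_check").fetchall()
print(violations)
```

An empty result means every declared foreign key resolves, which is the same condition the two `LEFT JOIN` queries above assert.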

## Report

With the data loaded, let's build out some queries to answer the questions posed above.

In [8]:
# How many sales transactions does each branch have every month?
query = """
SELECT b.year || PRINTF('%02d', b.month) AS yrmo,
    c.branch,
    COUNT(a.invoice_id) AS txn_count
FROM dim_date AS b
LEFT JOIN fact_sales AS a
ON a.date_id = b.id
INNER JOIN dim_location AS c
ON a.location_id = c.id
GROUP BY 1, 2
"""

df = pd.read_sql(query, conn)
df

Unnamed: 0,yrmo,branch,txn_count
0,201901,A,119
1,201901,B,111
2,201901,C,122
3,201902,A,94
4,201902,B,109
5,201902,C,100
6,201903,A,127
7,201903,B,112
8,201903,C,106
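
The counts above show the monthly totals. To see the month-over-month change itself, one option is the `LAG` window function. The sketch below is illustrative only: it loads the nine counts from the output above into a hypothetical `monthly` table in an in-memory database, rather than querying `fact_sales` directly.

```python
import sqlite3

demo_conn = sqlite3.connect(":memory:")
demo_conn.execute("CREATE TABLE monthly (yrmo TEXT, branch TEXT, txn_count INTEGER)")
demo_conn.executemany(
    "INSERT INTO monthly VALUES (?, ?, ?)",
    [
        ("201901", "A", 119), ("201902", "A", 94), ("201903", "A", 127),
        ("201901", "B", 111), ("201902", "B", 109), ("201903", "B", 112),
        ("201901", "C", 122), ("201902", "C", 100), ("201903", "C", 106),
    ],
)

# LAG looks one row back within each branch, ordered by month.
mom_query = """
SELECT yrmo,
    branch,
    txn_count,
    txn_count - LAG(txn_count) OVER (PARTITION BY branch ORDER BY yrmo) AS mom_change
FROM monthly
ORDER BY branch, yrmo
"""
mom_rows = demo_conn.execute(mom_query).fetchall()
for row in mom_rows:
    print(row)
```

The first month of each branch has no previous row, so its `mom_change` is `NULL` (`None` in Python).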


In [9]:
# What is the YTD gross income of all the locations combined? (assume 2019 is the current year)
query = """
SELECT SUM(a.gross_income) AS total_gross_income
FROM fact_sales AS a
INNER JOIN dim_date AS b
ON a.date_id = b.id
WHERE b.year = 2019
"""

df = pd.read_sql(query, conn)
df

Unnamed: 0,total_gross_income
0,15379.369


In [10]:
# What is the rolling average of ratings for the locations over a 7 day window?
query = """
WITH cte AS (
    SELECT c.name
        ,b.date
        ,AVG(a.rating) AS daily_rating
    FROM dim_date AS b
    LEFT JOIN fact_sales AS a
    ON a.date_id = b.id
    INNER JOIN dim_location AS c
    ON a.location_id = c.id
    GROUP BY 1, 2
)
SELECT name
    ,date
    ,AVG(daily_rating) OVER (
        PARTITION BY name
        ORDER BY date
        ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
    ) AS rolling_7day_rating
FROM cte
"""

df = pd.read_sql(query, conn)
df

Unnamed: 0,name,date,rolling_7day_rating
0,Mandalay-B,2019-01-01,7.333333
1,Mandalay-B,2019-01-02,6.766667
2,Mandalay-B,2019-01-03,7.355556
3,Mandalay-B,2019-01-04,6.733333
4,Mandalay-B,2019-01-05,6.973333
...,...,...,...
258,Yangon-A,2019-03-26,7.351190
259,Yangon-A,2019-03-27,7.243571
260,Yangon-A,2019-03-28,7.275714
261,Yangon-A,2019-03-29,7.197143
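
One caveat: the `ROWS` frame above counts rows, not calendar days. Because the inner join to `dim_location` drops days on which a location has no sales, a gap in a location's dates makes the window stretch past seven days. The result has 262 rows, fewer than the 270 that three locations over 90 days would produce, which suggests such gaps exist. A possible alternative is a `RANGE` frame ordered by `julianday(date)`. This is a sketch on hypothetical data (table `daily`, location `Demo-X`), and it assumes SQLite 3.28 or newer, where `RANGE` frames with numeric offsets are supported.

```python
import sqlite3

demo_conn = sqlite3.connect(":memory:")
demo_conn.execute("CREATE TABLE daily (name TEXT, date TEXT, daily_rating REAL)")
demo_conn.executemany(
    "INSERT INTO daily VALUES (?, ?, ?)",
    [
        ("Demo-X", "2019-01-01", 4.0),
        ("Demo-X", "2019-01-02", 6.0),
        ("Demo-X", "2019-01-20", 10.0),  # 18-day gap before this row
    ],
)

# by_rows: previous 6 rows plus the current row, regardless of dates.
# by_days: rows whose date falls within the previous 6 days plus the current day.
window_query = """
SELECT date,
    AVG(daily_rating) OVER (
        PARTITION BY name ORDER BY date
        ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
    ) AS by_rows,
    AVG(daily_rating) OVER (
        PARTITION BY name ORDER BY julianday(date)
        RANGE BETWEEN 6 PRECEDING AND CURRENT ROW
    ) AS by_days
FROM daily
ORDER BY date
"""
window_rows = demo_conn.execute(window_query).fetchall()
for row in window_rows:
    print(row)
```

On the last row, the `ROWS` frame still averages in the two ratings from early January, while the `RANGE` frame only sees the rating from 2019-01-20.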


## Cloud deployment

Cloud components needed:

* Workflow orchestrator (Composer)
* Data lake (GCS + BigQuery)
* Dashboard and reporting (Looker)
* Security and access (IAM)

### Composer

This uses Google's managed Airflow service to orchestrate the data pipeline. For this dataset, that means a one-time DAG with tasks for extracting, transforming, and loading. If new data were continually added to the source, the DAG would instead run on a schedule and retrieve the data in batches.

### GCS

GCS is used for the extraction step, since a CSV file has to be downloaded. In a medallion architecture, GCS is usually the entry point where external data is first persisted. If the DAG were scheduled, Hive-style partitioning would be used when persisting the data to storage.
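
As a rough illustration of Hive-style partitioning, each scheduled run would write its file under `key=value` folders derived from the run date. The helper name `hive_partition_path`, the `raw/supermarket_sales` prefix, and the choice of year/month/day keys are all assumptions made for this sketch.

```python
from datetime import date


def hive_partition_path(prefix: str, run_date: date, filename: str) -> str:
    """Build an object path with Hive-style key=value partition folders."""
    return (
        f"{prefix}/year={run_date.year}/month={run_date.month:02d}/"
        f"day={run_date.day:02d}/{filename}"
    )


# "raw/supermarket_sales" is a hypothetical prefix, not a real bucket layout.
path = hive_partition_path("raw/supermarket_sales", date(2019, 1, 5), "supermarket_sales.csv")
print(path)  # raw/supermarket_sales/year=2019/month=01/day=05/supermarket_sales.csv
```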

### BigQuery

BigQuery is the data warehouse for persisting silver- and gold-layer assets. In this example, the dim and fact tables would be the silver-layer assets, created in BQ. The queries used to answer the questions would be materialized as views and treated as gold-layer assets, which are then exposed to the dashboard/reporting service. Since the task originally asked for SQLite as the DB, the queries would have to be converted to the BQ SQL dialect when moving to BQ. BQ also enforces constraints only loosely (primary and foreign keys, for instance, are not enforced), so additional data validation checks would help.

### Looker

Looker would be the dashboard/reporting service for presenting the aggregated metrics. Its value shows most when new data is continually added to the source, since the graphs and visuals update as the data refreshes.

### IAM

IAM would manage permissions and access to the cloud services. Service accounts would have to be created to allow for read/write to GCS/BQ.

All cloud resources can be created and managed via IaC, e.g. Terraform.

![gcp.svg](gcp.svg)

## Closing thoughts

* In most real-world scenarios today, the dim and fact tables usually reside in a transactional DB serving an application. BQ would then be used for OLAP, where the data is usually denormalized for faster querying. There is no single right solution; it depends on the business use case and the available tech stack.
* An alternative to batch ingest is streaming ingest using Pub/Sub, or using Datastream for CDC, which can stream changes directly from the source DB to BQ.
* Table schemas sometimes change due to new requirements. Migrations can be run using a DAG that executes versioned SQL scripts, or with migration tools such as SQLAlchemy + Alembic.
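
The versioned-script idea in the last bullet can be sketched in a few lines. This is a simplified illustration for SQLite only, not how Alembic or an Airflow DAG would do it: it tracks the applied version in SQLite's built-in `PRAGMA user_version`, and the `MIGRATIONS` list and `migrate` function are hypothetical names.

```python
import sqlite3

# Hypothetical migrations, ordered by version number.
MIGRATIONS = [
    (1, "CREATE TABLE fact_sales (invoice_id TEXT PRIMARY KEY, gross_income REAL)"),
    (2, "ALTER TABLE fact_sales ADD COLUMN rating REAL"),
]


def migrate(conn: sqlite3.Connection) -> int:
    """Apply migrations newer than the stored schema version; return the new version."""
    current = conn.execute("PRAGMA user_version").fetchone()[0]
    for version, statement in MIGRATIONS:
        if version > current:
            conn.execute(statement)
            # PRAGMA values cannot be bound as parameters; version is a trusted int.
            conn.execute(f"PRAGMA user_version = {version}")
            conn.commit()
            current = version
    return current


demo_conn = sqlite3.connect(":memory:")
print(migrate(demo_conn))  # first run applies both migrations
print(migrate(demo_conn))  # second run finds nothing left to apply
```

Running the function twice leaves the schema unchanged the second time, which is the property a scheduled migration task would rely on.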