### DB Setup

We'll store our data using SQLite in our local directory:

In [1]:
import sqlite3
conn = sqlite3.connect('sales_data.db')

### Mock Data

We want to create a simple simulation of sales data for an arbitrary number of retailers, each with a random number of stores (between a minimum and a maximum). The data spans an arbitrary number of abstract time periods (58 weeks by default) and records both sales in currency (USD) and sales in units for an arbitrary number of products (10 by default). Product prices are random up to a cap (15.99 by default), and units sold per store per week are capped as well (15 by default).
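As a quick check of the sales arithmetic, each `SALES_USD` value is units sold times the product's unit price, formatted to two decimals in the same way as the generator's `"{:.2f}".format(...)` call. The helper `sales_usd` below is a hypothetical standalone sketch, not part of the notebook. It is checked against PRODUCT_0's price of 3.61 from the data preview.

```python
def sales_usd(units_sold: int, unit_price: float) -> str:
    """Total sales in currency for one (date, store, retailer, product) cell."""
    return "{:.2f}".format(units_sold * unit_price)

# The price cap follows from MAX_UNIT_DOLLAR = 15 whole dollars plus at most 99 cents.
MAX_UNIT_DOLLAR = 15
price_cap = float(f"{MAX_UNIT_DOLLAR}.99")

# Unit counts taken from the first rows of the preview for PRODUCT_0 (price 3.61).
for units in (0, 3, 12, 11, 7):
    print(units, sales_usd(units, 3.61))
```

The printed pairs line up with the `SALES_UNITS`/`SALES_USD` rows shown in the preview (for example, 3 units give `10.83`).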

To add some realism, we also mock missing data: new stores have no data before some week, and closed stores have no data after some week.
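Trimming new and closed stores reduces to keeping only the weeks between each store's first and last valid week. The toy sketch below uses two hypothetical stores, `A` (new from `WEEK_02`) and `B` (closed after `WEEK_03`), and relies on the week labels being zero-padded so that plain string comparison follows week order.

```python
import pandas as pd

# Six weeks of history for two hypothetical stores.
weeks = [f"WEEK_{k:02d}" for k in range(6)]
df = pd.DataFrame({
    "store": ["A"] * 6 + ["B"] * 6,
    "date": weeks * 2,
})

# Per-store bounds: A opens at WEEK_02, B closes after WEEK_03.
meta = pd.DataFrame({
    "store": ["A", "B"],
    "new": ["WEEK_02", weeks[0]],
    "closed": [weeks[-1], "WEEK_03"],
})

df = df.merge(meta, on="store")
# Zero-padded labels sort lexicographically in week order, so comparing
# strings keeps exactly the weeks inside [new, closed] (inclusive).
mask = df["date"].between(df["new"], df["closed"])
kept = df.loc[mask].reset_index(drop=True)
print(kept.groupby("store")["date"].agg(["min", "max", "count"]))
```

Without the zero padding, `WEEK_10` would sort before `WEEK_2` and the string bounds would silently break.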

After generating the data, we duplicate it and double the `value` column in the copy. We upload the doubled copy first to simulate a bad upload, then upload the correct data to simulate a correction. The fact table is append-only: nothing is deleted, and the most recent upload (by `created_at`) is treated as the source of truth.
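The "latest upload wins" rule can be sketched in pandas, assuming each row carries the load timestamp that the fact table records in `created_at`. The frame below is illustrative: it holds one (date, store, product) cell from a doubled upload followed by its correction.

```python
import pandas as pd

# A bad (doubled) upload followed by a later correction for the same keys.
facts = pd.DataFrame({
    "date": ["WEEK_01"] * 4,
    "store": ["0000"] * 4,
    "product": ["PRODUCT_0"] * 4,
    "unit": ["SALES_UNITS", "SALES_USD", "SALES_UNITS", "SALES_USD"],
    "value": [6.0, 21.66, 3.0, 10.83],
    "created_at": ["2022-06-08 14:55:56"] * 2 + ["2022-06-08 14:58:50"] * 2,
})

key = ["date", "store", "product", "unit"]
# Put the newest row for each key last, then keep only that row.
latest = (
    facts.sort_values("created_at", kind="stable")
    .drop_duplicates(subset=key, keep="last")
    .reset_index(drop=True)
)
print(latest[key + ["value"]])
```

Nothing is removed from `facts`; the deduplication happens only at read time, which is what keeps the full upload history available.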

In summary:

1. Pick variables (number of abstract products, stores, retailers, dates, price, units sold)
2. Generate abstract objects (products, retailers, stores in each retailer, dates)
3. For each abstract product:
    1. determine product price
    2. determine max units sold in stores
        1. A future iteration should calculate this at the retailer level, since an abstract product could be produce, which may sell better at some retailers than at others
4. For each retailer:
    1. determine store count
    2. For each product, for each store (up to the retailer max), for each date:
        1. create a skeleton containing the \[date, store, retailer, product\]
        2. pick a random number of units sold for that set, and assign it to units sold
        3. using the unit price, multiply by units sold to calculate total sales in currency
        4. create two rows by appending the values for units sold and sales in currency to the skeleton
        5. add row to list of rows
5. convert total list of rows containing all retailer data into a pandas dataframe
6. Lazily cluster stores by retailer:
    1. Group by store, retailer, and unit
    2. calculate the mean `value`
    3. keep only `SALES_USD`, then sort by retailer and mean sales
    4. round-robin distribute Test (T), Control (C), and Not Selected (NS) to each store starting at the top
7. Append cluster assignments to data
8. Add noise:
    1. Shuffle the stores and flag 1 out of every 20 as new and another 1 out of every 20 as closed
    2. for each new store, pick a random date and delete all data before that date for that store
    3. for each closed store, pick a random date and delete all data after that date for that store
9. Duplicate the data and double the `value` column in the copy
10. Create the tables and the views used to populate them
11. Upload the data to the database (into a `raw` staging table)
12. Populate the tables from the views
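The round-robin assignment in step 6 can be illustrated on a toy frame. The six stores and their mean sales below are hypothetical. As in the notebook, the labels are dealt by row position after sorting by retailer and mean sales.

```python
import pandas as pd

# Hypothetical mean weekly USD sales for six stores of one retailer.
gbs = pd.DataFrame({
    "retailer": ["RETAILER_0"] * 6,
    "store": ["s1", "s2", "s3", "s4", "s5", "s6"],
    "value": [10.0, 40.0, 20.0, 60.0, 30.0, 50.0],
})

# Sort by retailer, then mean sales, and deal T/C/NS down the sorted list.
gbs = gbs.sort_values(["retailer", "value"]).reset_index(drop=True)
c_map = {0: "T", 1: "C", 2: "NS"}
gbs["cluster"] = [c_map[i % 3] for i in range(len(gbs))]

print(gbs)
print(gbs.groupby("cluster")["value"].mean())
```

Because each consecutive triple of similar-sized stores is split across T, C, and NS, the groups end up with comparable means. T always receives the smallest store in its triple, though, so with very few stores it sits slightly low (25 vs. 35 and 45 here). The notebook deals by the global index, so the starting label can shift from one retailer to the next. If you want every retailer to start at T, `gbs.groupby("retailer").cumcount() % 3` is one option.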

In [2]:
import random

# Mock Data Parameters
NUM_PRODUCTS = 10
NUM_STORES = 600
NUM_DATES = 58
NUM_RETAILERS = 3

MIN_RET_STORES = 250
MAX_UNIT_DOLLAR = 15
MAX_UNIT_SOLD = 15

# Planning
PRODUCTS = [f'PRODUCT_{k}' for k in range(NUM_PRODUCTS)]
RETAILERS = [f'RETAILER_{k}' for k in range(NUM_RETAILERS)]
STORES = [str(k).zfill(len(str(NUM_STORES))) for k in range(NUM_STORES)]
DATES = [f'WEEK_{str(k).zfill(2)}' for k in range(NUM_DATES)]

PRODUCT_PRICES = dict()
PRODUCT_SELL_MAX = dict()

# 1. For each product:
#     A. determine the product price
#     B. determine the max units sold per store per week
# 2. For each retailer, pick the number of stores (MIN_RET_STORES to NUM_STORES;
#    the last retailer gets all NUM_STORES stores)
# 3. For each retailer, for each product, for each store (up to the retailer max), for each week:
#     A. create the row skeleton: week, store, retailer, product
#     B. draw the number of units sold that week, between 0 and (1.B)
#     C. create two rows: skeleton + SALES_UNITS + (3.B), and skeleton + SALES_USD + (3.B) * (1.A)

rows = list()

for p in PRODUCTS:
    # Determine product price
    _r = random.randint(0, MAX_UNIT_DOLLAR)
    _d = _r if _r == 0 else random.randint(0, _r)
    _c = random.randint(0, 99)
    _p = float('.'.join([str(_d), str(_c).zfill(2)]))

    # Implement values
    PRODUCT_PRICES[p] = _p
    PRODUCT_SELL_MAX[p] = random.randint(1, MAX_UNIT_SOLD)

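for r in RETAILERS:
    # The last retailer carries every store; the others get a random store count.
    # A slice end of None keeps the full list (an end of -1 would drop the last store).
    _store_max = None if r == RETAILERS[-1] else random.randint(MIN_RET_STORES, NUM_STORES)
    for p in PRODUCTS:
        for s in STORES[:_store_max]: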
            for d in DATES:
                skel = [d, s, r, p]
                _s = random.randint(0, PRODUCT_SELL_MAX[p])
                r1 = skel + ['SALES_UNITS', str(_s)]
                r2 = skel + ['SALES_USD', "{:.2f}".format(float(_s) * PRODUCT_PRICES[p])]
                rows.append(r1)
                rows.append(r2)

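### Data Preview

Note: the outputs in this notebook come from a run with larger settings than the parameters in the cell above. They show 15 products, four retailers (`RETAILER_0` through `RETAILER_3`), and four-digit store IDs, so counts will differ if you rerun the notebook with the defaults.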

In [3]:
PRODUCT_PRICES

{'PRODUCT_0': 3.61,
 'PRODUCT_1': 0.92,
 'PRODUCT_2': 0.99,
 'PRODUCT_3': 14.84,
 'PRODUCT_4': 2.62,
 'PRODUCT_5': 1.3,
 'PRODUCT_6': 2.4,
 'PRODUCT_7': 7.78,
 'PRODUCT_8': 11.1,
 'PRODUCT_9': 1.62,
 'PRODUCT_10': 0.21,
 'PRODUCT_11': 0.92,
 'PRODUCT_12': 6.87,
 'PRODUCT_13': 0.12,
 'PRODUCT_14': 3.76}

In [4]:
PRODUCT_SELL_MAX

{'PRODUCT_0': 14,
 'PRODUCT_1': 4,
 'PRODUCT_2': 9,
 'PRODUCT_3': 3,
 'PRODUCT_4': 6,
 'PRODUCT_5': 12,
 'PRODUCT_6': 2,
 'PRODUCT_7': 12,
 'PRODUCT_8': 10,
 'PRODUCT_9': 15,
 'PRODUCT_10': 15,
 'PRODUCT_11': 4,
 'PRODUCT_12': 12,
 'PRODUCT_13': 4,
 'PRODUCT_14': 10}

In [5]:
rows[:10]

[['WEEK_00', '0000', 'RETAILER_0', 'PRODUCT_0', 'SALES_UNITS', '0'],
 ['WEEK_00', '0000', 'RETAILER_0', 'PRODUCT_0', 'SALES_USD', '0.00'],
 ['WEEK_01', '0000', 'RETAILER_0', 'PRODUCT_0', 'SALES_UNITS', '3'],
 ['WEEK_01', '0000', 'RETAILER_0', 'PRODUCT_0', 'SALES_USD', '10.83'],
 ['WEEK_02', '0000', 'RETAILER_0', 'PRODUCT_0', 'SALES_UNITS', '12'],
 ['WEEK_02', '0000', 'RETAILER_0', 'PRODUCT_0', 'SALES_USD', '43.32'],
 ['WEEK_03', '0000', 'RETAILER_0', 'PRODUCT_0', 'SALES_UNITS', '11'],
 ['WEEK_03', '0000', 'RETAILER_0', 'PRODUCT_0', 'SALES_USD', '39.71'],
 ['WEEK_04', '0000', 'RETAILER_0', 'PRODUCT_0', 'SALES_UNITS', '7'],
 ['WEEK_04', '0000', 'RETAILER_0', 'PRODUCT_0', 'SALES_USD', '25.27']]

In [6]:
len(rows)

17598360
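Every (retailer, store, product, week) combination produces exactly two rows, which makes the row count easy to sanity-check. `expected_rows` is a hypothetical helper. The second part works backwards from the 17,598,360 rows printed above, using the 15 products and 58 weeks of the run shown.

```python
from typing import Sequence

def expected_rows(n_products: int, n_dates: int, stores_per_retailer: Sequence[int]) -> int:
    """Two rows (SALES_UNITS and SALES_USD) per retailer/store/product/week."""
    return 2 * n_products * n_dates * sum(stores_per_retailer)

# Work backwards from len(rows) in the run shown: 15 products, 58 weeks.
observed_rows = 17598360
total_store_slots = observed_rows // (2 * 15 * 58)
print(total_store_slots)  # 10114
print(expected_rows(15, 58, [total_store_slots]) == observed_rows)  # True
```

So that run covered 10,114 retailer/store pairs in total across its four retailers.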

### Stuff into DataFrame

In [7]:
import pandas as pd

df = pd.DataFrame(rows, columns=['date', 'store', 'retailer', 'product', 'unit', 'value'])
df['value'] = df['value'].astype(float)

df.head()

|   | date | store | retailer | product | unit | value |
|---|------|-------|----------|---------|------|-------|
| 0 | WEEK_00 | 0 | RETAILER_0 | PRODUCT_0 | SALES_UNITS | 0.0 |
| 1 | WEEK_00 | 0 | RETAILER_0 | PRODUCT_0 | SALES_USD | 0.0 |
| 2 | WEEK_01 | 0 | RETAILER_0 | PRODUCT_0 | SALES_UNITS | 3.0 |
| 3 | WEEK_01 | 0 | RETAILER_0 | PRODUCT_0 | SALES_USD | 10.83 |
| 4 | WEEK_02 | 0 | RETAILER_0 | PRODUCT_0 | SALES_UNITS | 12.0 |


### Apply Lazy Cluster

In [8]:
gbs = df.groupby(['store', 'retailer', 'unit'])['value'].mean().reset_index()

In [9]:
gbs = gbs.loc[gbs['unit'].isin(['SALES_USD'])].sort_values(by=['retailer', 'value']).reset_index(drop=True)

In [10]:
c_map = {0: 'T', 1: 'C', 2: 'NS'}

gbs['cluster'] = pd.Series(gbs.index).apply(lambda x: x % 3).map(c_map)

In [11]:
gbs.groupby(['retailer', 'cluster'])['value'].mean()

retailer    cluster
RETAILER_0  C          16.656545
            NS         16.655788
            T          16.654773
RETAILER_1  C          16.700525
            NS         16.684343
            T          16.691976
RETAILER_2  C          16.673853
            NS         16.673210
            T          16.674128
RETAILER_3  C          16.672383
            NS         16.673314
            T          16.674290
Name: value, dtype: float64

### Merge Cluster Assignment

In [12]:
clusters = gbs[['retailer', 'store', 'cluster']].drop_duplicates().reset_index(drop=True)
clusters.head()

|   | retailer | store | cluster |
|---|----------|-------|---------|
| 0 | RETAILER_0 | 2080 | T |
| 1 | RETAILER_0 | 416 | C |
| 2 | RETAILER_0 | 1012 | NS |
| 3 | RETAILER_0 | 1356 | T |
| 4 | RETAILER_0 | 52 | C |


In [13]:
df.head()

|   | date | store | retailer | product | unit | value |
|---|------|-------|----------|---------|------|-------|
| 0 | WEEK_00 | 0 | RETAILER_0 | PRODUCT_0 | SALES_UNITS | 0.0 |
| 1 | WEEK_00 | 0 | RETAILER_0 | PRODUCT_0 | SALES_USD | 0.0 |
| 2 | WEEK_01 | 0 | RETAILER_0 | PRODUCT_0 | SALES_UNITS | 3.0 |
| 3 | WEEK_01 | 0 | RETAILER_0 | PRODUCT_0 | SALES_USD | 10.83 |
| 4 | WEEK_02 | 0 | RETAILER_0 | PRODUCT_0 | SALES_UNITS | 12.0 |


### Add some noise

In [14]:
# Shuffle all (retailer, store) pairs, then flag stores by position:
# index % 20 == 0 marks a new store and index % 20 == 1 marks a closed store
store_meta = df[['retailer', 'store']].drop_duplicates().sample(frac=1).reset_index(drop=True)

# New stores: pick a random week n; all history before WEEK_{n} is deleted later.
# Every other store opens at the first week.
store_meta['new'] = [random.choice(DATES) if i % 20 == 0 else DATES[0] for i in store_meta.index]

# Closed stores: pick a random week n; all history after WEEK_{n} is deleted later.
# Every other store stays open through the last week.
store_meta['closed'] = [random.choice(DATES) if i % 20 == 1 else DATES[-1] for i in store_meta.index]

In [15]:
store_meta.head()

|   | retailer | store | new | closed |
|---|----------|-------|-----|--------|
| 0 | RETAILER_3 | 2987 | WEEK_41 | WEEK_57 |
| 1 | RETAILER_3 | 1918 | WEEK_00 | WEEK_21 |
| 2 | RETAILER_0 | 191 | WEEK_00 | WEEK_57 |
| 3 | RETAILER_2 | 1771 | WEEK_00 | WEEK_57 |
| 4 | RETAILER_3 | 76 | WEEK_00 | WEEK_57 |


In [16]:
df = pd.merge(df, store_meta)

In [17]:
bix = df['date'].between(df['new'], df['closed'])

keep = df.loc[bix]
discard = df.loc[~bix]

print('discard:        ', discard.shape[0])
print('keep:           ', keep.shape[0])
print('original row ct:', len(rows))
print('keep+discard:   ', discard.shape[0] + keep.shape[0])

df = df.loc[bix].reset_index(drop=True)
del keep
del discard

discard:         894750
keep:            16703610
original row ct: 17598360
keep+discard:    17598360


### Duplicate & Double to Simulate a Bad Upload

In [18]:
# Attach each store's cluster assignment (joins on retailer and store)
merge = pd.merge(df, clusters)

# Independent copy with every value doubled, used as the "bad" upload
dupes = merge.copy()
dupes['value'] = dupes['value'] * 2

### Dimensionalize

In [19]:
import sqlite3
conn = sqlite3.connect('sales_data.db')

sql = """
DROP TABLE IF EXISTS fact_sale;
DROP VIEW IF EXISTS fact_salev;

DROP TABLE IF EXISTS dim_date;
CREATE TABLE dim_date (
    date_id INTEGER PRIMARY KEY AUTOINCREMENT
    ,date_value TEXT NOT NULL
    ,UNIQUE(date_value) ON CONFLICT IGNORE
);

DROP VIEW IF EXISTS dim_datev;
CREATE VIEW dim_datev AS
    SELECT DISTINCT
        date AS date_value
    FROM
        raw
    ORDER BY
        date ASC;


DROP TABLE IF EXISTS dim_retailer;
CREATE TABLE dim_retailer (
    retailer_id INTEGER PRIMARY KEY AUTOINCREMENT
    ,retailer_name TEXT NOT NULL
    ,UNIQUE(retailer_name) ON CONFLICT IGNORE
);

DROP VIEW IF EXISTS dim_retailerv;
CREATE VIEW dim_retailerv AS
    SELECT DISTINCT
        retailer AS retailer_name
    FROM
        raw
    ORDER BY
        retailer ASC;


DROP TABLE IF EXISTS dim_store;
CREATE TABLE dim_store (
    store_id INTEGER PRIMARY KEY AUTOINCREMENT
    ,store_name TEXT NOT NULL
    ,retailer_id BIGINT NOT NULL REFERENCES dim_retailer(retailer_id)
    ,UNIQUE(store_name, retailer_id) ON CONFLICT IGNORE
);

DROP VIEW IF EXISTS dim_storev;
CREATE VIEW dim_storev AS
    SELECT DISTINCT
        store AS store_name
        ,retailer_id
    FROM
        raw
    JOIN
        dim_retailer
        ON retailer = retailer_name
    ORDER BY
        retailer_id
        ,store_name;


DROP TABLE IF EXISTS dim_product;
CREATE TABLE dim_product (
    product_id INTEGER PRIMARY KEY AUTOINCREMENT
    ,product_name TEXT NOT NULL
    ,UNIQUE(product_name) ON CONFLICT IGNORE
);

DROP VIEW IF EXISTS dim_productv;
CREATE VIEW dim_productv AS
    SELECT DISTINCT
        product AS product_name
    FROM
        raw
    ORDER BY
        product_name ASC;


DROP TABLE IF EXISTS dim_unit;
CREATE TABLE dim_unit (
    unit_id INTEGER PRIMARY KEY AUTOINCREMENT
    ,unit_name TEXT NOT NULL
    ,UNIQUE(unit_name) ON CONFLICT IGNORE
);

DROP VIEW IF EXISTS dim_unitv;
CREATE VIEW dim_unitv AS
    SELECT DISTINCT
        unit AS unit_name
    FROM
        raw
    ORDER BY
        unit_name ASC;


DROP TABLE IF EXISTS fact_sale;
CREATE TABLE fact_sale (
     fact_id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT
    ,date_id INTEGER NOT NULL REFERENCES dim_date(date_id)
    ,store_id INTEGER NOT NULL REFERENCES dim_store(store_id)
    ,product_id INTEGER NOT NULL REFERENCES dim_product(product_id)
    ,unit_id INTEGER NOT NULL REFERENCES dim_unit(unit_id)
    ,value FLOAT
    ,created_at DATETIME DEFAULT CURRENT_TIMESTAMP
);

DROP VIEW IF EXISTS fact_salev;
CREATE VIEW fact_salev AS
    SELECT
        date_id
        ,store_id
        ,product_id
        ,unit_id
        ,value
    FROM
        raw
    JOIN
        dim_date
        ON date_value = date
    JOIN
        dim_retailer
        ON retailer_name = retailer
    JOIN
        dim_store
        ON store = store_name
        AND dim_store.retailer_id = dim_retailer.retailer_id
    JOIN
        dim_product
        ON product_name = product
    JOIN
        dim_unit
        ON unit_name = unit;


DROP TABLE IF EXISTS dim_segment;
CREATE TABLE dim_segment (
    segment_id INTEGER PRIMARY KEY AUTOINCREMENT
    ,segment_name TEXT NOT NULL
    ,UNIQUE(segment_name) ON CONFLICT IGNORE
);

DROP VIEW IF EXISTS dim_segmentv;
CREATE VIEW dim_segmentv AS
    SELECT
        DISTINCT cluster AS segment_name
    FROM
        raw;


DROP TABLE IF EXISTS fact_segmentation;
CREATE TABLE fact_segmentation (
    fact_id INTEGER PRIMARY KEY AUTOINCREMENT
    ,analysis_id INTEGER NOT NULL
    ,segment_id INTEGER NOT NULL REFERENCES dim_segment(segment_id)
    ,store_id INTEGER NOT NULL REFERENCES dim_store(store_id)
    ,UNIQUE(analysis_id, store_id) ON CONFLICT REPLACE
);

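DROP VIEW IF EXISTS fact_segmentationv;
CREATE VIEW fact_segmentationv AS
    SELECT DISTINCT
        dim_retailer.retailer_id AS analysis_id
        ,segment_id
        ,store_id
    FROM
        raw
    JOIN
        dim_segment
        ON cluster = segment_name
    JOIN
        dim_retailer
        ON retailer_name = retailer
    JOIN
        dim_store
        -- store names repeat across retailers, so match on retailer as well
        ON store_name = store
        AND dim_store.retailer_id = dim_retailer.retailer_id;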
"""

conn.executescript(sql)

<sqlite3.Cursor at 0x7f45c3198e30>
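Each dimension table is loaded from its `...v` view over `raw`, and the `UNIQUE(...) ON CONFLICT IGNORE` clause makes that `INSERT ... SELECT` safe to rerun after every upload: existing members are skipped and keep their ids, and only unseen members are appended. The sketch below reproduces the pattern for `dim_retailer` alone in a throwaway in-memory database. The staging rows are hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")  # throwaway database for the demo
conn.executescript("""
CREATE TABLE raw (retailer TEXT);
CREATE TABLE dim_retailer (
    retailer_id INTEGER PRIMARY KEY AUTOINCREMENT
    ,retailer_name TEXT NOT NULL
    ,UNIQUE(retailer_name) ON CONFLICT IGNORE
);
CREATE VIEW dim_retailerv AS
    SELECT DISTINCT retailer AS retailer_name FROM raw ORDER BY retailer ASC;
""")

load = """
INSERT INTO dim_retailer (retailer_name)
    SELECT retailer_name FROM dim_retailerv;
"""

# First upload: the staging table holds two distinct retailers.
conn.executemany("INSERT INTO raw VALUES (?)", [("RETAILER_0",), ("RETAILER_1",), ("RETAILER_0",)])
conn.executescript(load)
first_ids = dict(conn.execute("SELECT retailer_name, retailer_id FROM dim_retailer"))

# Second upload replaces the staging rows. RETAILER_0 already exists, so the
# conflict clause silently skips it; only RETAILER_2 is inserted.
conn.execute("DELETE FROM raw")
conn.executemany("INSERT INTO raw VALUES (?)", [("RETAILER_0",), ("RETAILER_2",)])
conn.executescript(load)
final_ids = dict(conn.execute("SELECT retailer_name, retailer_id FROM dim_retailer"))
print(final_ids)
```

The demo only relies on existing ids staying fixed and new ones being larger. It does not assume the new ids are contiguous.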

### Upload Bad Data

In [20]:
dupes.to_sql('raw', conn, if_exists='replace', index=False)

pop_sql = """
INSERT INTO dim_date (date_value)
    SELECT date_value FROM dim_datev;

INSERT INTO dim_retailer (retailer_name)
    SELECT retailer_name FROM dim_retailerv;

INSERT INTO dim_store (store_name, retailer_id)
    SELECT store_name, retailer_id FROM dim_storev;

INSERT INTO dim_product (product_name)
    SELECT product_name FROM dim_productv;

INSERT INTO dim_unit (unit_name)
    SELECT unit_name FROM dim_unitv;

INSERT INTO fact_sale (date_id, store_id, product_id, unit_id, value)
    SELECT date_id, store_id, product_id, unit_id, value FROM fact_salev;

INSERT INTO dim_segment (segment_name)
    SELECT segment_name FROM dim_segmentv;

INSERT INTO fact_segmentation (analysis_id, segment_id, store_id)
    SELECT analysis_id, segment_id, store_id FROM fact_segmentationv;
"""

conn.executescript(pop_sql)

<sqlite3.Cursor at 0x7f45c31dbc70>

### Upload Fix

In [21]:
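import time

# Wait so the corrected upload gets a later created_at than the bad one
# (CURRENT_TIMESTAMP has one-second resolution)
time.sleep(90)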

merge.to_sql('raw', conn, if_exists='replace', index=False)

# Reuse pop_sql from the previous cell: the same INSERT ... SELECT statements
# load the corrected contents of `raw` into the tables

conn.executescript(pop_sql)

<sqlite3.Cursor at 0x7f45c3198c70>

### Test Upload

In [22]:
dupe_ct = dupes.shape[0]
orig_ct = merge.shape[0]

print(dupe_ct)
print(orig_ct)
print(dupe_ct + orig_ct)

16703610
16703610
33407220


In [23]:
pd.read_sql("select created_at, count(*) from fact_sale group by 1", conn)

|   | created_at | count(*) |
|---|------------|----------|
| 0 | 2022-06-08 14:55:56 | 16703610 |
| 1 | 2022-06-08 14:58:50 | 16703610 |


### Query Data

In [24]:
import pandas as pd
import sqlite3

conn = sqlite3.connect('sales_data.db')

In [25]:
pd.read_sql('''
SELECT
  fs.*
  ,date_value
  ,store_name
  ,retailer_id
  ,product_name
  ,unit_name
  ,segment_name
  ,analysis_id
FROM
  fact_sale fs
JOIN
  dim_date dd
  ON dd.date_id = fs.date_id
JOIN
  dim_store ds
  ON ds.store_id = fs.store_id
JOIN
  dim_product dp
  ON dp.product_id = fs.product_id
JOIN
  dim_unit du
  ON du.unit_id = fs.unit_id
JOIN
  fact_segmentation fseg
  ON fseg.store_id = fs.store_id
JOIN
  dim_segment dseg
  ON dseg.segment_id = fseg.segment_id
LIMIT 5
''', conn)

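|   | fact_id | date_id | store_id | product_id | unit_id | value | created_at | date_value | store_name | retailer_id | product_name | unit_name | segment_name | analysis_id |
|---|---------|---------|----------|------------|---------|-------|------------|------------|------------|-------------|--------------|-----------|--------------|-------------|
| 0 | 1 | 1 | 1 | 1 | 1 | 0.0 | 2022-06-08 14:55:56 | WEEK_00 | 0 | 1 | PRODUCT_0 | SALES_UNITS | T | 1 |
| 1 | 2 | 1 | 1 | 1 | 2 | 0.0 | 2022-06-08 14:55:56 | WEEK_00 | 0 | 1 | PRODUCT_0 | SALES_USD | T | 1 |
| 2 | 3 | 2 | 1 | 1 | 1 | 6.0 | 2022-06-08 14:55:56 | WEEK_01 | 0 | 1 | PRODUCT_0 | SALES_UNITS | T | 1 |
| 3 | 4 | 2 | 1 | 1 | 2 | 21.66 | 2022-06-08 14:55:56 | WEEK_01 | 0 | 1 | PRODUCT_0 | SALES_USD | T | 1 |
| 4 | 5 | 3 | 1 | 1 | 1 | 24.0 | 2022-06-08 14:55:56 | WEEK_02 | 0 | 1 | PRODUCT_0 | SALES_UNITS | T | 1 |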
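The rows above come from the first (doubled) upload. For example, WEEK_01 shows 6.0 units and 21.66 USD where the generated data has 3 and 10.83. Because `fact_sale` keeps both uploads, reading the "latest source of truth" needs a filter on the newest `created_at` per grain (date, store, product, unit). The following is a minimal sketch on an in-memory copy of the table, holding just those two WEEK_01 facts. It uses a grouped subquery. A `ROW_NUMBER()` window function would also work on SQLite 3.25 or later.

```python
import sqlite3

conn = sqlite3.connect(":memory:")  # throwaway copy for the demo
conn.executescript("""
CREATE TABLE fact_sale (
    fact_id INTEGER PRIMARY KEY AUTOINCREMENT
    ,date_id INTEGER NOT NULL
    ,store_id INTEGER NOT NULL
    ,product_id INTEGER NOT NULL
    ,unit_id INTEGER NOT NULL
    ,value FLOAT
    ,created_at DATETIME
);
-- Bad (doubled) upload, then the correction for the same keys.
INSERT INTO fact_sale (date_id, store_id, product_id, unit_id, value, created_at) VALUES
    (2, 1, 1, 1, 6.0,   '2022-06-08 14:55:56'),
    (2, 1, 1, 2, 21.66, '2022-06-08 14:55:56'),
    (2, 1, 1, 1, 3.0,   '2022-06-08 14:58:50'),
    (2, 1, 1, 2, 10.83, '2022-06-08 14:58:50');
""")

latest_sql = """
SELECT fs.date_id, fs.store_id, fs.product_id, fs.unit_id, fs.value
FROM fact_sale fs
JOIN (
    -- newest load timestamp for each grain
    SELECT date_id, store_id, product_id, unit_id, MAX(created_at) AS max_created_at
    FROM fact_sale
    GROUP BY date_id, store_id, product_id, unit_id
) latest
  ON  latest.date_id = fs.date_id
  AND latest.store_id = fs.store_id
  AND latest.product_id = fs.product_id
  AND latest.unit_id = fs.unit_id
  AND latest.max_created_at = fs.created_at
ORDER BY fs.unit_id
"""
rows = conn.execute(latest_sql).fetchall()
print(rows)
```

If two uploads landed in the same second, both would match the maximum timestamp, which is presumably why the notebook waits before uploading the fix.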


In [26]:
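# Total row count across both uploads; without GROUP BY, SQLite reports created_at from an arbitrary row
pd.read_sql("select created_at, count(*) from fact_sale", conn)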

|   | created_at | count(*) |
|---|------------|----------|
| 0 | 2022-06-08 14:55:56 | 33407220 |
