
# Use the required catalog and schema

In [0]:
%sql
-- Select the online_retail catalog for the statements that follow.
USE CATALOG online_retail;

-- Create the gold schema on first run, then make it the current schema.
CREATE SCHEMA IF NOT EXISTS gold;
USE SCHEMA gold;


# Read from the source tables

In [0]:
# Load every row and column of silver.cleaned_data into a DataFrame.
cleaned_df = spark.sql("""SELECT * FROM silver.cleaned_data""")

In [0]:
# Render the DataFrame in the notebook output for a visual check.
display(cleaned_df)

In [0]:
# Register the DataFrame as a temp view so the %sql cells can query it by name.
cleaned_df.createOrReplaceTempView("uvw_src_cleaned_df")


# Merge data into the target tables

In [0]:
%sql
-- Gold-layer star schema: two dimensions and one fact table.
-- Each statement uses IF NOT EXISTS, so re-running this cell leaves existing tables untouched.

-- Product dimension (assumption: SCD Type 1).
CREATE TABLE IF NOT EXISTS dim_product (
  -- Identity-column alternative, currently disabled:
  -- dim_product_id BIGINT GENERATED BY DEFAULT AS IDENTITY (START WITH 1 INCREMENT BY 1) PRIMARY KEY
  dim_product_id          BIGINT PRIMARY KEY      -- surrogate key
  , product_code          STRING                  -- business key from the source data
  , product_description   STRING
  , average_unit_price    DOUBLE
)
TBLPROPERTIES (
  'spark.databricks.delta.vacuum.logging.enable' = 'true'
);

-- Customer dimension (assumption: SCD Type 1).
CREATE TABLE IF NOT EXISTS dim_customer (
  -- Identity-column alternative, currently disabled:
  -- dim_customer_id BIGINT GENERATED BY DEFAULT AS IDENTITY (START WITH 1 INCREMENT BY 1) PRIMARY KEY
  dim_customer_id   BIGINT PRIMARY KEY            -- surrogate key
  , customer_id     INTEGER                       -- business key from the source data
  , country         STRING
)
TBLPROPERTIES (
  'spark.databricks.delta.vacuum.logging.enable' = 'true'
);

-- Sales fact table; each row points at one product and one customer.
CREATE TABLE IF NOT EXISTS fact_sale (
  fct_sale_id         BIGINT PRIMARY KEY          -- surrogate key
  , invoice_number    STRING
  , invoice_date      TIMESTAMP
  , dim_product_id    BIGINT                      -- references dim_product
  , dim_customer_id   BIGINT                      -- references dim_customer
  , total_quantity    INTEGER
  , CONSTRAINT fk_product FOREIGN KEY (dim_product_id) REFERENCES dim_product
  , CONSTRAINT fk_customer FOREIGN KEY (dim_customer_id) REFERENCES dim_customer
)
TBLPROPERTIES (
  'spark.databricks.delta.vacuum.logging.enable' = 'true'
);

In [0]:
%sql

--VACUUM dim_product RETAIN 720 HOURS;            -- Commented out: it does not need to run every time
--VACUUM dim_customer RETAIN 720 HOURS;           -- Commented out: it does not need to run every time
--VACUUM fact_sale RETAIN 720 HOURS;              -- Commented out: it does not need to run every time

In [0]:
%sql

-- Upsert dim_product (assumption: SCD Type 1) from the cleaned source view.
--
-- Source shape: one row per product_code. A product can appear with several
-- descriptions and unit prices, so MAX() picks a single description and the
-- unit price is averaged and rounded to 2 decimals.
--
-- Surrogate keys: ROW_NUMBER() restarts at 1 on every run, so it is offset by
-- the highest key already stored. Keys handed to newly inserted products can
-- therefore never collide with existing ones. Matched products keep their
-- stored key because the UPDATE clause never touches dim_product_id.
WITH CTE_product_key_offset AS (
  -- Highest surrogate key already in the dimension; 0 while the table is empty.
  SELECT COALESCE(MAX(dim_product_id), 0) AS max_dim_product_id
  FROM dim_product
)
, CTE_product_source AS (
  -- GROUP BY already yields one row per product_code, so no DISTINCT is needed.
  SELECT
    product_code
    , MAX(product_description)      AS product_description
    , ROUND(AVG(unit_price), 2)     AS average_unit_price
  FROM uvw_src_cleaned_df
  GROUP BY product_code
)
, CTE_dim_product AS (
  SELECT
    K.max_dim_product_id + ROW_NUMBER() OVER (ORDER BY S.product_code) AS dim_product_id
    , S.product_code
    , S.product_description
    , S.average_unit_price
  FROM CTE_product_source AS S
  CROSS JOIN CTE_product_key_offset AS K
)

MERGE INTO dim_product AS TGT
USING CTE_dim_product AS SRC
ON TGT.product_code <=> SRC.product_code                        -- NULL-safe match on the business key

-- Matched on product_code: overwrite when ANY tracked attribute differs.
-- IS DISTINCT FROM also detects a change from or to NULL, which <> would miss.
WHEN MATCHED
AND (
  TGT.product_description IS DISTINCT FROM SRC.product_description
  OR TGT.average_unit_price IS DISTINCT FROM SRC.average_unit_price
)
THEN UPDATE SET
  TGT.product_description = SRC.product_description
  , TGT.average_unit_price = SRC.average_unit_price

-- Not matched on product_code: insert the new product with an explicit column list.
WHEN NOT MATCHED
THEN INSERT (dim_product_id, product_code, product_description, average_unit_price)
VALUES (SRC.dim_product_id, SRC.product_code, SRC.product_description, SRC.average_unit_price);

In [0]:
%sql
--Creating a temp view that has only the latest country for each customer
-- Temp view with one row per customer_id, carrying the customer's most recent
-- country (upper-cased) and a candidate surrogate key for dim_customer.
--
-- Surrogate keys: ROW_NUMBER() restarts at 1 every time the view is evaluated,
-- so it is offset by the highest dim_customer_id already stored. A key handed
-- to a newly inserted customer can then never collide with an existing one.
CREATE OR REPLACE TEMP VIEW customers_country AS

WITH ranked_customers_country AS (
  SELECT
    customer_id
    , UPPER(country) AS country
    -- rn = 1 marks the row with the latest invoice_date per customer. The country
    -- tie-breaker keeps the pick deterministic when several rows share that date.
    , ROW_NUMBER() OVER (
        PARTITION BY customer_id
        ORDER BY invoice_date DESC, UPPER(country)
      ) AS rn
  FROM uvw_src_cleaned_df
)
, customer_key_offset AS (
  -- Highest surrogate key already in the dimension; 0 while the table is empty.
  SELECT COALESCE(MAX(dim_customer_id), 0) AS max_dim_customer_id
  FROM dim_customer
)

-- rn = 1 already guarantees one row per customer_id, so no DISTINCT is needed.
SELECT
  K.max_dim_customer_id + ROW_NUMBER() OVER (ORDER BY R.customer_id) AS dim_customer_id
  , R.customer_id
  , R.country
FROM ranked_customers_country AS R
CROSS JOIN customer_key_offset AS K
WHERE R.rn = 1;

In [0]:
%sql

-- Upsert dim_customer (assumption: SCD Type 1) from the customers_country view.
MERGE INTO dim_customer AS TGT
USING customers_country AS SRC
-- NULL-safe match on the business key: with plain "=", a NULL customer_id would
-- never match and would be inserted again on every run.
ON TGT.customer_id <=> SRC.customer_id

-- Matched on customer_id: overwrite the country when it changed.
-- IS DISTINCT FROM also detects a change from or to NULL, which <> would miss.
WHEN MATCHED
AND TGT.country IS DISTINCT FROM SRC.country
THEN UPDATE SET
  TGT.country = SRC.country

-- Not matched on customer_id: insert the new customer with an explicit column list.
WHEN NOT MATCHED
THEN INSERT (dim_customer_id, customer_id, country)
VALUES (SRC.dim_customer_id, SRC.customer_id, SRC.country);

In [0]:
%sql
-- Read-only preview of the fact rows at invoice x product x customer grain.
-- Source rows are resolved to dimension keys through LEFT JOINs, so a source row
-- without a matching dimension row shows up with a NULL key instead of vanishing.
SELECT
  ROW_NUMBER() OVER (
    ORDER BY
      SRCV.invoice_number
      , P.dim_product_id
      , C.dim_customer_id
  )                       AS fct_sale_id
  , SRCV.invoice_number
  , P.dim_product_id
  , C.dim_customer_id
  , SUM(SRCV.quantity)    AS total_quantity
FROM uvw_src_cleaned_df AS SRCV
LEFT JOIN dim_product AS P
  ON P.product_code = SRCV.product_code
LEFT JOIN dim_customer AS C
  ON C.customer_id = SRCV.customer_id
GROUP BY
  SRCV.invoice_number
  , P.dim_product_id
  , C.dim_customer_id

In [0]:
%sql

-- Upsert fact_sale at invoice x product x customer grain.
--
-- Surrogate keys: ROW_NUMBER() restarts at 1 on every run, so it is offset by
-- the highest fct_sale_id already stored. Keys handed to newly inserted rows can
-- therefore never collide with existing ones. Matched rows keep their stored
-- key because the UPDATE clause never touches fct_sale_id.
WITH CTE_sale_key_offset AS (
  -- Highest surrogate key already in the fact table; 0 while the table is empty.
  SELECT COALESCE(MAX(fct_sale_id), 0) AS max_fct_sale_id
  FROM fact_sale
)
, CTE_sale_source AS (
  -- One row per invoice, product and customer. GROUP BY already removes
  -- duplicates, so no DISTINCT is needed.
  SELECT
    SRCV.invoice_number
    , P.dim_product_id
    , C.dim_customer_id
    , MAX(SRCV.invoice_date)  AS invoice_date       -- latest invoice date within the group
    , SUM(SRCV.quantity)      AS total_quantity
  FROM uvw_src_cleaned_df AS SRCV
  LEFT JOIN dim_product AS P
    ON SRCV.product_code = P.product_code
  LEFT JOIN dim_customer AS C
    ON SRCV.customer_id = C.customer_id
  GROUP BY
    SRCV.invoice_number
    , P.dim_product_id
    , C.dim_customer_id
)
, CTE_fact_sale AS (
  SELECT
    K.max_fct_sale_id
      + ROW_NUMBER() OVER (ORDER BY S.invoice_number, S.dim_product_id, S.dim_customer_id) AS fct_sale_id
    , S.invoice_number
    , S.dim_product_id
    , S.dim_customer_id
    , S.invoice_date
    , S.total_quantity
  FROM CTE_sale_source AS S
  CROSS JOIN CTE_sale_key_offset AS K
)

-- Match on the full grain. The comparisons are NULL-safe because the LEFT JOINs
-- above can leave a dimension key NULL; with plain "=", such rows would never
-- match and would be inserted again on every run.
MERGE INTO fact_sale AS TGT
USING CTE_fact_sale AS SRC
ON TGT.invoice_number <=> SRC.invoice_number
AND TGT.dim_product_id <=> SRC.dim_product_id
AND TGT.dim_customer_id <=> SRC.dim_customer_id

-- Matched on the grain: overwrite when ANY measure or attribute differs.
-- IS DISTINCT FROM also detects a change from or to NULL, which <> would miss.
WHEN MATCHED
AND (
  TGT.invoice_date IS DISTINCT FROM SRC.invoice_date
  OR TGT.total_quantity IS DISTINCT FROM SRC.total_quantity
)
THEN UPDATE SET
  TGT.invoice_date = SRC.invoice_date
  , TGT.total_quantity = SRC.total_quantity

-- Not matched on the grain: insert the new row with an explicit column list.
WHEN NOT MATCHED
THEN INSERT (fct_sale_id, invoice_number, invoice_date, dim_product_id, dim_customer_id, total_quantity)
VALUES (SRC.fct_sale_id, SRC.invoice_number, SRC.invoice_date, SRC.dim_product_id, SRC.dim_customer_id, SRC.total_quantity);

In [0]:
%sql

-- Compact each gold table and Z-order it on the column the loads above use
-- most for joining and filtering.

-- product_code is the key the product merge and the fact load join on.
OPTIMIZE dim_product ZORDER BY (product_code);

-- customer_id is the key the customer merge and the fact load join on.
OPTIMIZE dim_customer ZORDER BY (customer_id);

-- invoice_number is the leading column of the fact merge condition.
OPTIMIZE fact_sale ZORDER BY (invoice_number);

In [0]:
# Data-quality gate: dim_product must hold at most one row per product_code.
# The query returns only the product codes that occur more than once.
product_quality_check = spark.sql(
    """SELECT product_code, COUNT(*) 
FROM dim_product
GROUP BY product_code
HAVING COUNT(*) > 1
"""
)

# An empty result means the dimension is clean; anything else aborts the run.
if product_quality_check.count() == 0:
    print("No duplicates found in dim_product")
else:
    raise Exception("Validation Error: Duplicate product code found in dim_product table.")

In [0]:
# Data-quality gate: dim_customer must hold at most one row per customer_id.
# The query returns only the customer ids that occur more than once.
customer_quality_check = spark.sql(
    """SELECT customer_id, COUNT(*) 
FROM dim_customer
GROUP BY customer_id
HAVING COUNT(*) > 1
"""
)

# An empty result means the dimension is clean; anything else aborts the run.
if customer_quality_check.count() == 0:
    print("No duplicates found in dim_customer")
else:
    raise Exception("Validation Error: Duplicate customer id found in dim_customer table.")

In [0]:
# Data-quality gate: fact_sale must hold at most one row per
# (invoice_number, dim_product_id, dim_customer_id) combination.
# The query returns only the combinations that occur more than once.
sale_quality_check = spark.sql(
    """SELECT invoice_number, dim_product_id, dim_customer_id, COUNT(*) 
FROM fact_sale
GROUP BY invoice_number, dim_product_id, dim_customer_id
HAVING COUNT(*) > 1"""
)

# An empty result means the fact table is clean; anything else aborts the run.
if sale_quality_check.count() == 0:
    print("No duplicates found in fact_sale")
else:
    raise Exception("Validation Error: Duplicate sale transactions found in fact_sale table.")

In [0]:
# Data-quality gate: every fact_sale row must carry a product key.
# The query returns the fact rows whose dim_product_id is NULL.
null_product_check = spark.sql(
    """SELECT * FROM fact_sale WHERE dim_product_id is NULL"""
)

# An empty result means every row has a product key; anything else aborts the run.
if null_product_check.count() == 0:
    print("No null values found in dim_product_id column of fact_sale table.")
else:
    raise Exception("Validation Error: NULL values found in dim_product_id column of fact_sale table.")

In [0]:
# Data-quality gate: every fact_sale row must carry a customer key.
# The query returns the fact rows whose dim_customer_id is NULL.
null_customer_check = spark.sql(
    """SELECT * FROM fact_sale WHERE dim_customer_id is NULL"""
)

# An empty result means every row has a customer key; anything else aborts the run.
if null_customer_check.count() == 0:
    print("No null values found in dim_customer_id column of fact_sale table.")
else:
    raise Exception("Validation Error: NULL values found in dim_customer_id column of fact_sale table.")