In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
%sql
-- Make atliq_project the current catalog for this session.
USE CATALOG atliq_project;

Read market data from bronze layer

In [0]:
# Load the market dimension from its parquet file in the bronze container.
market_df = (
    spark.read
    .format("parquet")
    .load("abfss://bronze@adlsdbspractice.dfs.core.windows.net/atliqsale/dim_market.parquet")
)

Cleaning sub_zone, region columns

In [0]:
# Normalise 'sub_zone' by converting every value to upper case.
market_df = market_df.withColumn(
    "sub_zone",
    upper(col("sub_zone")),
)

In [0]:
# Normalise 'region' by converting every value to upper case.
market_df = market_df.withColumn(
    "region",
    upper(col("region")),
)

Deduplicate

In [0]:
# Keep a single row for each distinct (market, sub_zone, region) combination.
market_df = market_df.drop_duplicates(
    [
        "market",
        "sub_zone",
        "region",
    ]
)

Data quality check - records that fail the check are sent to the quarantine table

In [0]:
%sql
-- Make sure both target schemas exist before any table is created in them.
-- silver: holds the SCD type 2 market table.
CREATE SCHEMA IF NOT EXISTS silver;
-- staging: holds the clean and the quarantined market tables.
CREATE SCHEMA IF NOT EXISTS staging;

In [0]:
%sql
-- Staging table that receives market rows passing the data quality check.
CREATE TABLE IF NOT EXISTS staging.market (
  market   STRING,
  sub_zone STRING,
  region   STRING
) USING DELTA;

-- Staging table that receives market rows failing the data quality check.
CREATE TABLE IF NOT EXISTS staging.quarantined_market (
  market   STRING,
  sub_zone STRING,
  region   STRING
) USING DELTA;

In [0]:
# Flag violations: a row with a null in market, sub_zone or region gets
# quarantine_check = "1"; every other row gets "0" (the flag is a string).
has_missing_value = (
    col("market").isNull()
    | col("sub_zone").isNull()
    | col("region").isNull()
)
flag_market_df = market_df.withColumn(
    "quarantine_check",
    when(has_missing_value, lit("1")).otherwise(lit("0")),
)

In [0]:
# Register the flagged DataFrame as a temp view named "flag_market_df" so it can be queried from SQL.
flag_market_df.createOrReplaceTempView("flag_market_df")

In [0]:
%sql
-- Route the flagged rows: quarantine_check = 1 goes to the quarantine table,
-- quarantine_check = 0 goes to the clean staging table.

-- Bad records are appended, so the quarantine table keeps the rejects of
-- every run.
INSERT INTO staging.quarantined_market
 SELECT market, sub_zone, region
 FROM flag_market_df
 WHERE quarantine_check = 1;

-- Good records REPLACE the previous contents of staging.market.
-- staging.market is the source of the SCD type 2 merge into silver.markets,
-- so it must contain only the current batch: appending would leave rows from
-- earlier runs in place, giving the merge several (possibly contradictory)
-- source rows per market and duplicating rows when the notebook is re-run.
INSERT OVERWRITE TABLE staging.market
 SELECT market, sub_zone, region
 FROM flag_market_df
 WHERE quarantine_check = 0;

num_affected_rows,num_inserted_rows
27,27


SCD type 2 for market

In [0]:
%sql
-- Silver-layer market table with SCD type 2 tracking columns.
CREATE TABLE IF NOT EXISTS silver.markets (
  market        STRING,
  sub_zone      STRING,
  region        STRING,
  inserted_date TIMESTAMP,  -- when this version of the row was inserted
  modified_date TIMESTAMP,  -- '9999-12-31' on insert; set to the current timestamp when the row is retired
  is_current    BOOLEAN     -- true for the active version of a market
) USING DELTA;

In [0]:
%sql
-- Step 1 of the SCD type 2 load, keyed on market:
--   * a current silver row whose sub_zone or region differs from staging is
--     retired (is_current = false, modified_date = current timestamp);
--   * a staging market with no silver row at all is inserted as a new
--     current row with an open-ended modified_date.
MERGE INTO silver.markets AS tgt
USING staging.market AS src
  ON tgt.market = src.market
WHEN MATCHED
  AND tgt.is_current = true
  AND (tgt.region <> src.region OR tgt.sub_zone <> src.sub_zone)
THEN UPDATE SET
  tgt.modified_date = current_timestamp(),
  tgt.is_current = false
WHEN NOT MATCHED THEN INSERT (
  market, sub_zone, region,
  inserted_date, modified_date, is_current
)
VALUES (
  src.market, src.sub_zone, src.region,
  current_timestamp(),  -- inserted_date: load time
  '9999-12-31',         -- modified_date: far-future placeholder
  true                  -- is_current: this is the active version
);

num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
27,0,0,27


In [0]:
%sql
-- Step 2: insert the new version (is_current = true) of every market whose
-- previous version was retired in step 1.
--
-- A staging market needs a new row exactly when silver.markets holds NO
-- current row for it. The anti join below selects those markets. Joining on
-- target.is_current = false instead would match every historical row ever
-- written, so a market that changed once would be re-inserted on each later
-- run, and a market with N historical rows would be inserted N times.
INSERT INTO silver.markets
(
  market,
  sub_zone,
  region,
  inserted_date,
  modified_date,
  is_current
)
SELECT
  source.market,
  source.sub_zone,
  source.region,
-- inserted_date: current timestamp
  current_timestamp(),
-- modified_date: far-future placeholder for an open (current) row
  '9999-12-31',
-- is_current: the new row is the active version
  true
FROM staging.market AS source
LEFT ANTI JOIN silver.markets AS target
  ON target.market = source.market
 AND target.is_current = true;

num_affected_rows,num_inserted_rows
0,0
