In [0]:
%sql
-- Make sure the bronze schema exists in the onlineretail catalog.
use catalog onlineretail;
create schema if not exists bronze;

In [0]:
%sql
-- Make sure the log schema (home of the control/audit table) exists.
use catalog onlineretail;
create schema if not exists log;

In [0]:
%sql
-- Control (audit) log: one row per load step with process, layer, target
-- table, outcome and timestamps; external Delta table in the `log` container.
-- There is no FILE_NAME column, so inserts must not list one.
use catalog onlineretail;

create table if not exists log.control (
  PROCESS_NAME           string,
  LAYER                  string,
  TABLE_NAME             string,
  STATUS                 string,
  INGESTED_TIMESTAMP     timestamp,
  LAST_UPDATED_TIMESTAMP timestamp
)
using delta
location 'abfss://log@adlsonlineretailshrey.dfs.core.windows.net/control'

Customer File

In [0]:


# Read the raw customer parquet files landed in the bronze container.
# Parquet files carry their own schema, so CSV-only options such as
# `header` / `inferSchema` have no effect here and are not passed.
# NOTE(review): Spark lists this folder recursively; files moved into
# `customer/archive/` by the archive cell may be picked up again (or trigger a
# "conflicting directory structures" error) — confirm the archive layout.
df_customer = (
    spark.read
    .format("parquet")
    .load("abfss://bronze@adlsonlineretailshrey.dfs.core.windows.net/customer")
)

In [0]:
# Drop rows that are exact duplicates across every column
# (distinct() is equivalent to dropDuplicates() without a subset).
df_customer = df_customer.distinct()

In [0]:
# Register the de-duplicated customers as a session temp view for the %sql cells.
df_customer.createOrReplaceTempView(name="df_customer_stage1")

In [0]:
%sql
-- Bronze customer dimension: staged customer columns, derived first/last name
-- and load-audit timestamps, stored as an external Delta table.
create table if not exists onlineretail.bronze.dim_customers (
  customer_id          string,
  customer_name        string,
  firstname            string,
  lastname             string,
  gender               string,
  city                 string,
  loyalty_status       string,
  effective_start_date string,
  effective_end_date   string,
  is_active            string,
  ingested_date        timestamp,
  last_updated_date    timestamp
)
using delta
location 'abfss://bronze@adlsonlineretailshrey.dfs.core.windows.net/delta/customer'
    

In [0]:
%sql
-- Append the staged customer batch to bronze, splitting customer_name into
-- first/last name and stamping load-audit timestamps.
INSERT INTO onlineretail.bronze.dim_customers (
    customer_id,
    customer_name,
    firstname,
    lastname,
    gender,
    city,
    loyalty_status,
    effective_start_date,
    effective_end_date,
    is_active,
    ingested_date,
    last_updated_date
)
SELECT
    customer_id,
    customer_name,
    -- Text before the first space, or the whole name when there is no space.
    -- (substring(name, 1, instr(name, ' ')) would keep the trailing space and
    -- return '' for single-word names.)
    substring_index(customer_name, ' ', 1) AS firstname,
    -- Text after the first space; NULL for single-word names instead of
    -- repeating the full name.
    CASE
        WHEN instr(customer_name, ' ') > 0
            THEN substring(customer_name, instr(customer_name, ' ') + 1)
    END AS lastname,
    gender,
    city,
    loyalty_status,
    effective_start_date,
    effective_end_date,
    is_active,
    current_timestamp() AS ingested_date,
    current_timestamp() AS last_updated_date
FROM df_customer_stage1;


In [0]:
# Archive each landed customer file and write one success row per archived
# file to the control log.
CUSTOMER_LANDING_PATH = "abfss://bronze@adlsonlineretailshrey.dfs.core.windows.net/customer"

for file_info in dbutils.fs.ls(CUSTOMER_LANDING_PATH):
    # Skip sub-directories -- notably the `archive/` folder this loop creates,
    # which would otherwise be moved into itself on the next run.
    if file_info.isDir():
        continue
    dbutils.fs.mv(file_info.path, f"{CUSTOMER_LANDING_PATH}/archive/{file_info.name}")
    # Log only after the move succeeded so a failed move is not reported as success.
    spark.sql("""
  INSERT INTO onlineretail.log.control(PROCESS_NAME,LAYER,TABLE_NAME,STATUS,INGESTED_TIMESTAMP,LAST_UPDATED_TIMESTAMP) values('Ingestion-Customer','bronze','dim_customer','success',CURRENT_TIMESTAMP(),CURRENT_TIMESTAMP())""")


In [0]:
%sql
-- Silver layer: ensure the schema exists, then declare the silver customer
-- dimension with an identity surrogate key (customer_sk).
use catalog onlineretail;
create schema if not exists silver;

create table if not exists onlineretail.silver.dim_customers (
  customer_sk          bigint generated always as identity,
  customer_id          string,
  customer_name        string,
  firstname            string,
  lastname             string,
  gender               string,
  city                 string,
  loyalty_status       string,
  effective_start_date string,
  effective_end_date   string,
  is_active            string,
  ingested_date        timestamp,
  last_updated_date    timestamp
)
using delta
location 'abfss://silver@adlsonlineretailshrey.dfs.core.windows.net/delta/customer'


SCD Type 1

In [0]:
%sql
-- SCD Type 1: overwrite the silver customer row in place with the latest
-- bronze values; customers not yet in silver are inserted.
-- Bronze is append-only (each run inserts a new batch), so the source is
-- narrowed to the most recently ingested row per customer_id; otherwise MERGE
-- fails when several source rows match the same target row.
-- NOTE(review): the SCD Type 2 cell below targets the same silver table —
-- confirm which strategy is intended to run.
MERGE INTO onlineretail.silver.dim_customers AS target
USING (
  SELECT *
  FROM (
    SELECT
      *,
      row_number() OVER (PARTITION BY customer_id ORDER BY ingested_date DESC) AS rn
    FROM onlineretail.bronze.dim_customers
  ) AS ranked
  WHERE rn = 1
) AS source
ON target.customer_id = source.customer_id
WHEN MATCHED THEN UPDATE SET
  target.customer_name = source.customer_name,
  target.firstname = source.firstname,
  target.lastname = source.lastname,
  target.gender = source.gender,
  target.city = source.city,
  target.loyalty_status = source.loyalty_status,
  target.effective_start_date = source.effective_start_date,
  target.effective_end_date = source.effective_end_date,
  target.is_active = source.is_active,
  -- ingested_date keeps the time the row first reached silver.
  target.last_updated_date = current_timestamp()
WHEN NOT MATCHED BY TARGET THEN INSERT (
  customer_id,
  customer_name,
  firstname,
  lastname,
  gender,
  city,
  loyalty_status,
  effective_start_date,
  effective_end_date,
  is_active,
  ingested_date,
  last_updated_date
) VALUES (
  source.customer_id,
  source.customer_name,
  source.firstname,
  source.lastname,
  source.gender,
  source.city,
  source.loyalty_status,
  source.effective_start_date,
  source.effective_end_date,
  source.is_active,
  current_timestamp(),
  current_timestamp()
)

SCD Type 2

In [0]:
%sql
-- SCD Type 2 for customers, in two passes over the latest bronze row per customer:
--   pass 1 expires the active silver row when a tracked attribute changed and
--          inserts customers not yet in silver;
--   pass 2 inserts the new active version for customers expired in pass 1
--          (they no longer have an active row, so they fall into NOT MATCHED).
-- NOTE(review): new versions take is_active from the source; if a source row
-- carries is_active <> 'Y' it will be re-inserted on every run — confirm.

-- Bronze is append-only: keep only the most recently ingested row per customer
-- so MERGE never sees several source rows for one target row.
CREATE OR REPLACE TEMP VIEW customer_latest_bronze AS
SELECT *
FROM (
  SELECT
    *,
    row_number() OVER (PARTITION BY customer_id ORDER BY ingested_date DESC) AS rn
  FROM onlineretail.bronze.dim_customers
) AS ranked
WHERE rn = 1;

MERGE INTO onlineretail.silver.dim_customers AS target
USING customer_latest_bronze AS source
ON target.customer_id = source.customer_id AND target.is_active = 'Y'
-- case 1: customer exists and a tracked attribute changed -> expire the current
-- version. IS DISTINCT FROM is NULL-safe. firstname/lastname are derived from
-- customer_name, so comparing customer_name covers them.
WHEN MATCHED AND (
     target.customer_name  IS DISTINCT FROM source.customer_name
  OR target.gender         IS DISTINCT FROM source.gender
  OR target.city           IS DISTINCT FROM source.city
  OR target.loyalty_status IS DISTINCT FROM source.loyalty_status
) THEN UPDATE SET
  target.is_active = 'N',
  target.effective_end_date = current_timestamp(),
  target.last_updated_date = current_timestamp()
-- case 2: customer does not exist in silver yet -> insert it.
WHEN NOT MATCHED BY TARGET THEN INSERT (
  customer_id,
  customer_name,
  firstname,
  lastname,
  gender,
  city,
  loyalty_status,
  effective_start_date,
  effective_end_date,
  is_active,
  ingested_date,
  last_updated_date
) VALUES (
  source.customer_id,
  source.customer_name,
  source.firstname,
  source.lastname,
  source.gender,
  source.city,
  source.loyalty_status,
  source.effective_start_date,
  source.effective_end_date,
  source.is_active,
  current_timestamp(),
  current_timestamp()
);

-- case 3: insert the new active version for customers expired above.
MERGE INTO onlineretail.silver.dim_customers AS target
USING customer_latest_bronze AS source
ON target.customer_id = source.customer_id AND target.is_active = 'Y'
WHEN NOT MATCHED BY TARGET THEN INSERT (
  customer_id,
  customer_name,
  firstname,
  lastname,
  gender,
  city,
  loyalty_status,
  effective_start_date,
  effective_end_date,
  is_active,
  ingested_date,
  last_updated_date
) VALUES (
  source.customer_id,
  source.customer_name,
  source.firstname,
  source.lastname,
  source.gender,
  source.city,
  source.loyalty_status,
  source.effective_start_date,
  source.effective_end_date,
  source.is_active,
  current_timestamp(),
  current_timestamp()
);


In [0]:
%sql
-- Record a successful silver-layer customer load in the control log.
-- log.control has no FILE_NAME column, so none is listed (listing it gave
-- 7 target columns for 6 values).
INSERT INTO onlineretail.log.control (PROCESS_NAME, LAYER, TABLE_NAME, STATUS, INGESTED_TIMESTAMP, LAST_UPDATED_TIMESTAMP)
VALUES ('Ingestion-Customer', 'silver', 'dim_customer', 'success', CURRENT_TIMESTAMP(), CURRENT_TIMESTAMP())

In [0]:
%sql
-- Gold layer: ensure the schema exists, then declare the gold customer
-- dimension with an identity surrogate key (customer_sk).
use catalog onlineretail;
create schema if not exists gold;

create table if not exists onlineretail.gold.dim_customers (
  customer_sk          bigint generated always as identity,
  customer_id          string,
  customer_name        string,
  firstname            string,
  lastname             string,
  gender               string,
  city                 string,
  loyalty_status       string,
  effective_start_date string,
  effective_end_date   string,
  is_active            string,
  ingested_date        timestamp,
  last_updated_date    timestamp
)
using delta
location 'abfss://gold@adlsonlineretailshrey.dfs.core.windows.net/delta/customer'


In [0]:
%sql
-- Gold customer dimension: one current row per customer, refreshed from the
-- active silver version.
-- The is_active filter is applied to the source in USING rather than in ON: a
-- source-side predicate in ON turns every inactive silver row into a NOT
-- MATCHED row that is re-inserted into gold on every run.
-- The latest active row per customer is kept so MERGE cannot see several
-- source rows for one target row.
MERGE INTO onlineretail.gold.dim_customers AS target
USING (
  SELECT *
  FROM (
    SELECT
      *,
      row_number() OVER (
        PARTITION BY customer_id
        ORDER BY last_updated_date DESC, customer_sk DESC
      ) AS rn
    FROM onlineretail.silver.dim_customers
    WHERE is_active = 'Y'
  ) AS ranked
  WHERE rn = 1
) AS source
ON target.customer_id = source.customer_id
WHEN MATCHED THEN UPDATE SET
  target.customer_name = source.customer_name,
  target.firstname = source.firstname,
  target.lastname = source.lastname,
  target.gender = source.gender,
  target.city = source.city,
  target.loyalty_status = source.loyalty_status,
  target.effective_start_date = source.effective_start_date,
  target.effective_end_date = NULL,
  target.is_active = source.is_active,
  target.last_updated_date = current_timestamp()
WHEN NOT MATCHED BY TARGET THEN INSERT (
  customer_id,
  customer_name,
  firstname,
  lastname,
  gender,
  city,
  loyalty_status,
  effective_start_date,
  effective_end_date,
  is_active,
  ingested_date,
  last_updated_date
) VALUES (
  source.customer_id,
  source.customer_name,
  source.firstname,
  source.lastname,
  source.gender,
  source.city,
  source.loyalty_status,
  source.effective_start_date,
  source.effective_end_date,
  source.is_active,
  current_timestamp(),
  current_timestamp()
)

In [0]:
%sql
-- Record a successful gold-layer customer load in the control log.
-- log.control has no FILE_NAME column, so none is listed.
INSERT INTO onlineretail.log.control (PROCESS_NAME, LAYER, TABLE_NAME, STATUS, INGESTED_TIMESTAMP, LAST_UPDATED_TIMESTAMP)
VALUES ('Ingestion-Customer', 'gold', 'dim_customer', 'success', CURRENT_TIMESTAMP(), CURRENT_TIMESTAMP())

Product

In [0]:
# Read the raw product parquet files landed in the bronze container.
# Parquet is self-describing; the CSV-only `header` option (and the misspelled
# `inferchema`) had no effect and are not passed.
df_product = (
    spark.read
    .format("parquet")
    .load("abfss://bronze@adlsonlineretailshrey.dfs.core.windows.net/product")
)

In [0]:
# Remove rows duplicated across every column.
df_product = df_product.distinct()

In [0]:
# Register the de-duplicated products as a session temp view for the %sql cells.
df_product.createOrReplaceTempView(name="df_product_stage1")

In [0]:
%sql
-- Bronze product dimension, stored as an external Delta table.
USE CATALOG onlineretail;

CREATE TABLE IF NOT EXISTS onlineretail.bronze.dim_product (
  product_id           STRING,
  product_name         STRING,
  category             STRING,
  brand                STRING,
  unit_price           DOUBLE,
  effective_start_date TIMESTAMP,
  effective_end_date   TIMESTAMP,
  is_active            STRING,
  ingested_date        TIMESTAMP,
  last_updated_date    TIMESTAMP
)
USING DELTA
LOCATION 'abfss://bronze@adlsonlineretailshrey.dfs.core.windows.net/delta/product'

In [0]:
%sql
-- Append the staged product batch to bronze with load-audit timestamps.
-- df_product_stage1 is a session temp view; temp views are not qualified by
-- catalog/schema, so referencing onlineretail.bronze.df_product_stage1 fails.
insert into onlineretail.bronze.dim_product (
  product_id,
  product_name,
  category,
  brand,
  unit_price,
  effective_start_date,
  effective_end_date,
  is_active,
  ingested_date,
  last_updated_date
)
select
  product_id,
  product_name,
  category,
  brand,
  unit_price,
  effective_start_date,
  effective_end_date,
  is_active,
  current_timestamp(),
  current_timestamp()
from df_product_stage1

In [0]:
# Archive each landed product file and write one success row per archived
# file to the control log.
PRODUCT_LANDING_PATH = "abfss://bronze@adlsonlineretailshrey.dfs.core.windows.net/product"

for file_info in dbutils.fs.ls(PRODUCT_LANDING_PATH):
    # Skip sub-directories -- notably the `archive/` folder this loop creates,
    # which would otherwise be moved into itself on the next run.
    if file_info.isDir():
        continue
    dbutils.fs.mv(file_info.path, f"{PRODUCT_LANDING_PATH}/archive/{file_info.name}")
    # Log only after the move succeeded so a failed move is not reported as success.
    spark.sql("""
  INSERT INTO onlineretail.log.control(PROCESS_NAME,LAYER,TABLE_NAME,STATUS,INGESTED_TIMESTAMP,LAST_UPDATED_TIMESTAMP) values('Ingestion-Product','bronze','dim_product','success',CURRENT_TIMESTAMP(),CURRENT_TIMESTAMP())""")


In [0]:
%sql
-- Silver product dimension with an identity surrogate key (product_sk),
-- stored as an external Delta table.
CREATE TABLE IF NOT EXISTS onlineretail.silver.dim_product (
  product_sk           BIGINT GENERATED ALWAYS AS IDENTITY,
  product_id           STRING,
  product_name         STRING,
  category             STRING,
  brand                STRING,
  unit_price           DOUBLE,
  effective_start_date TIMESTAMP,
  effective_end_date   TIMESTAMP,
  is_active            STRING,
  ingested_date        TIMESTAMP,
  last_updated_date    TIMESTAMP
)
USING DELTA
LOCATION 'abfss://silver@adlsonlineretailshrey.dfs.core.windows.net/delta/product'

SCD Type 1

In [0]:
%sql
-- SCD Type 1 for products: overwrite the active silver row in place with the
-- latest bronze values; products without an active silver row are inserted.
-- The in-place overwrite keeps the row active, so effective_end_date is not
-- stamped on match (that would leave an active row with an end date).
-- Bronze is append-only, so only the most recently ingested row per product_id
-- is used; otherwise MERGE fails on multiple source matches.
merge into onlineretail.silver.dim_product as target
using (
  select *
  from (
    select
      *,
      row_number() over (partition by product_id order by ingested_date desc) as rn
    from onlineretail.bronze.dim_product
  ) as ranked
  where rn = 1
) as source
on target.product_id = source.product_id and target.is_active = 'Y'
when matched then update set
  target.product_name = source.product_name,
  target.category = source.category,
  target.brand = source.brand,
  target.unit_price = source.unit_price,
  target.last_updated_date = current_timestamp()
when not matched by target then insert (
  product_id,
  product_name,
  category,
  brand,
  unit_price,
  is_active,
  effective_start_date,
  effective_end_date,
  ingested_date,
  last_updated_date
) values (
  source.product_id,
  source.product_name,
  source.category,
  source.brand,
  source.unit_price,
  source.is_active,
  source.effective_start_date,
  source.effective_end_date,
  current_timestamp(),
  current_timestamp()
);

SCD Type 2

In [0]:
%sql
-- SCD Type 2 for products, in two passes over the latest bronze row per product:
--   pass 1 expires the active silver row when a tracked attribute changed and
--          inserts products not yet in silver;
--   pass 2 inserts the new active version for products expired in pass 1.
-- NOTE(review): new versions take is_active from the source; if a source row
-- carries is_active <> 'Y' it will be re-inserted on every run — confirm.

-- Bronze is append-only: keep only the most recently ingested row per product.
CREATE OR REPLACE TEMP VIEW product_latest_bronze AS
SELECT *
FROM (
  SELECT
    *,
    row_number() OVER (PARTITION BY product_id ORDER BY ingested_date DESC) AS rn
  FROM onlineretail.bronze.dim_product
) AS ranked
WHERE rn = 1;

merge into onlineretail.silver.dim_product as target
using product_latest_bronze as source
on target.product_id = source.product_id and target.is_active = 'Y'
-- case 1: product exists and a tracked attribute changed -> expire the current
-- version (IS DISTINCT FROM is NULL-safe).
when matched and (
     target.product_name IS DISTINCT FROM source.product_name
  OR target.category     IS DISTINCT FROM source.category
  OR target.brand        IS DISTINCT FROM source.brand
  OR target.unit_price   IS DISTINCT FROM source.unit_price
) then update set
  target.is_active = 'N',
  target.effective_end_date = current_timestamp(),
  target.last_updated_date = current_timestamp()
-- case 2: product does not exist in silver yet -> insert it.
when not matched by target then insert (
  product_id,
  product_name,
  category,
  brand,
  unit_price,
  is_active,
  effective_start_date,
  effective_end_date,
  ingested_date,
  last_updated_date
) values (
  source.product_id,
  source.product_name,
  source.category,
  source.brand,
  source.unit_price,
  source.is_active,
  source.effective_start_date,
  source.effective_end_date,
  current_timestamp(),
  current_timestamp()
);

-- case 3: products expired above no longer have an active row, so they fall
-- into NOT MATCHED and their new version is inserted.
merge into onlineretail.silver.dim_product as target
using product_latest_bronze as source
on target.product_id = source.product_id and target.is_active = 'Y'
when not matched by target then insert (
  product_id,
  product_name,
  category,
  brand,
  unit_price,
  is_active,
  effective_start_date,
  effective_end_date,
  ingested_date,
  last_updated_date
) values (
  source.product_id,
  source.product_name,
  source.category,
  source.brand,
  source.unit_price,
  source.is_active,
  source.effective_start_date,
  source.effective_end_date,
  current_timestamp(),
  current_timestamp()
);


In [0]:
%sql
-- Record a successful silver-layer product load in the control log.
-- log.control has no FILE_NAME column, so none is listed.
INSERT INTO onlineretail.log.control (PROCESS_NAME, LAYER, TABLE_NAME, STATUS, INGESTED_TIMESTAMP, LAST_UPDATED_TIMESTAMP)
VALUES ('Ingestion-Product', 'silver', 'dim_product', 'success', CURRENT_TIMESTAMP(), CURRENT_TIMESTAMP())

In [0]:
%sql
-- Gold-layer product success log entry.
-- log.control has no FILE_NAME column, so none is listed.
-- NOTE(review): this cell runs before gold.dim_product is created and merged
-- (next cells), and another gold dim_product log row is written after the
-- merge — this entry looks premature/duplicated; consider removing the cell.
INSERT INTO onlineretail.log.control (PROCESS_NAME, LAYER, TABLE_NAME, STATUS, INGESTED_TIMESTAMP, LAST_UPDATED_TIMESTAMP)
VALUES ('Ingestion-Product', 'gold', 'dim_product', 'success', CURRENT_TIMESTAMP(), CURRENT_TIMESTAMP())

In [0]:
%sql
-- Gold product dimension with an identity surrogate key (product_sk),
-- stored as an external Delta table.
USE CATALOG onlineretail;

CREATE TABLE IF NOT EXISTS onlineretail.gold.dim_product (
  product_sk           BIGINT GENERATED ALWAYS AS IDENTITY,
  product_id           STRING,
  product_name         STRING,
  category             STRING,
  brand                STRING,
  unit_price           DOUBLE,
  effective_start_date TIMESTAMP,
  effective_end_date   TIMESTAMP,
  is_active            STRING,
  ingested_date        TIMESTAMP,
  last_updated_date    TIMESTAMP
)
USING DELTA
LOCATION 'abfss://gold@adlsonlineretailshrey.dfs.core.windows.net/delta/product'

In [0]:
%sql
-- Gold product dimension: one current row per product, refreshed from the
-- active silver version.
-- Only active silver rows are used (latest per product_id): otherwise the
-- inactive history rows in silver also match the gold row and MERGE fails with
-- multiple source matches.
-- Matched rows take the source effective_start_date and keep their original
-- ingested_date instead of being reset to "now" on every run.
merge into onlineretail.gold.dim_product as target
using (
  select *
  from (
    select
      *,
      row_number() over (
        partition by product_id
        order by last_updated_date desc, product_sk desc
      ) as rn
    from onlineretail.silver.dim_product
    where is_active = 'Y'
  ) as ranked
  where rn = 1
) as source
on target.product_id = source.product_id
when matched then update set
  target.product_name = source.product_name,
  target.category = source.category,
  target.brand = source.brand,
  target.unit_price = source.unit_price,
  target.effective_start_date = source.effective_start_date,
  target.effective_end_date = NULL,
  target.is_active = source.is_active,
  target.last_updated_date = current_timestamp()
when not matched by target then insert (
  product_id,
  product_name,
  category,
  brand,
  unit_price,
  is_active,
  effective_start_date,
  effective_end_date,
  ingested_date,
  last_updated_date
) values (
  source.product_id,
  source.product_name,
  source.category,
  source.brand,
  source.unit_price,
  source.is_active,
  source.effective_start_date,
  NULL,
  current_timestamp(),
  current_timestamp()
);

In [0]:
%sql
-- Record a successful gold-layer product load in the control log.
-- PROCESS_NAME is the product process (was mislabelled 'Ingestion-Customer');
-- log.control has no FILE_NAME column, so none is listed.
INSERT INTO onlineretail.log.control (PROCESS_NAME, LAYER, TABLE_NAME, STATUS, INGESTED_TIMESTAMP, LAST_UPDATED_TIMESTAMP)
VALUES ('Ingestion-Product', 'gold', 'dim_product', 'success', CURRENT_TIMESTAMP(), CURRENT_TIMESTAMP())

Sales

In [0]:
# Read the raw sales parquet files landed in the bronze container.
# Parquet is self-describing; the CSV-only `header` option (and the misspelled
# `inferchema`) had no effect and are not passed.
df_sales = (
    spark.read
    .format("parquet")
    .load("abfss://bronze@adlsonlineretailshrey.dfs.core.windows.net/sales")
)

In [0]:
# Remove rows duplicated across every column.
df_sales = df_sales.distinct()

In [0]:
# Register the de-duplicated sales as a session temp view for the %sql cells.
df_sales.createOrReplaceTempView(name="df_sales_stage1")

In [0]:
%sql
-- Bronze sales fact: raw transaction lines plus load-audit timestamps,
-- stored as an external Delta table.
USE CATALOG onlineretail;
CREATE TABLE IF NOT EXISTS onlineretail.bronze.fact_sales (
  transaction_id    STRING,
  transaction_date  STRING,
  customer_id       STRING,
  store_id          STRING,
  rep_id            STRING,
  product_id        STRING,
  quantity          INT,
  unit_price        DOUBLE,
  total_amount      DOUBLE,
  payment_mode      STRING,
  status            STRING,
  ingested_date     TIMESTAMP,
  last_updated_date TIMESTAMP
)
USING DELTA
LOCATION 'abfss://bronze@adlsonlineretailshrey.dfs.core.windows.net/delta/sales'

In [0]:
%sql
-- Append the staged sales batch to bronze.fact_sales with load-audit timestamps.
-- NOTE(review): no visible cell archives the sales landing files (unlike
-- customer/product), so a re-run appears to re-read and re-append them — confirm.
INSERT INTO onlineretail.bronze.fact_sales (
  transaction_id, transaction_date, customer_id, store_id, rep_id, product_id,
  quantity, unit_price, total_amount, payment_mode, status,
  ingested_date, last_updated_date
)
SELECT
  transaction_id, transaction_date, customer_id, store_id, rep_id, product_id,
  quantity, unit_price, total_amount, payment_mode, status,
  current_timestamp() AS ingested_date,
  current_timestamp() AS last_updated_date
FROM df_sales_stage1;

In [0]:
%sql
-- One row per (transaction_id, product_id) from the most recent bronze batch,
-- consumed by the silver fact_sales MERGE.
-- Only the latest batch is aggregated (every row of one insert shares the same
-- ingested_date): the MERGE subtracts source totals from existing lines, so
-- re-aggregating all of bronze would double-count on every run.
-- status / date / price are picked with aggregate functions; a window function
-- over ungrouped columns is not valid inside GROUP BY.
-- NOTE(review): transaction_date is a STRING; MIN / ordering assume a sortable
-- (e.g. ISO-8601) format — confirm.
create or replace temp view vw_sales_aggregated as
SELECT
    transaction_id,
    product_id,
    customer_id,
    store_id,
    rep_id,
    payment_mode,
    MIN(transaction_date) AS transaction_date,
    min_by(unit_price, transaction_date) AS unit_price,
    SUM(quantity) AS total_quantity,
    SUM(total_amount) AS total_amount,
    max_by(status, transaction_date) AS latest_status
FROM onlineretail.bronze.fact_sales
WHERE ingested_date = (SELECT MAX(ingested_date) FROM onlineretail.bronze.fact_sales)
GROUP BY
    transaction_id, product_id, customer_id, store_id, rep_id, payment_mode;

In [0]:
%sql
-- Silver sales fact with identity surrogate key, original vs. net
-- quantity/amount and current status.
-- NOTE(review): unlike the other tables this one has no USING/LOCATION clause,
-- so it is created with the workspace defaults (managed table) — confirm intent.
USE CATALOG onlineretail;

CREATE TABLE IF NOT EXISTS onlineretail.silver.fact_sales (
  transaction_sk    BIGINT GENERATED ALWAYS AS IDENTITY,
  transaction_id    STRING,
  transaction_date  STRING,
  customer_id       STRING,
  store_id          STRING,
  rep_id            STRING,
  product_id        STRING,
  original_quantity INT,
  unit_price        DOUBLE,
  original_amount   DOUBLE,
  net_quantity      INT,
  net_amount        DOUBLE,
  payment_mode      STRING,
  status            STRING,
  ingested_date     TIMESTAMP,
  last_updated_date TIMESTAMP
)

Incremental merge (upsert) into silver fact_sales

In [0]:
%sql
-- Upsert aggregated sales lines into silver.fact_sales, keyed by
-- (transaction_id, product_id).
-- Existing line: the incoming aggregate is subtracted from the running net
-- quantity/amount and the status refreshed (presumably incoming rows for an
-- existing line are returns/refunds — TODO confirm).
-- New line: inserted with original_* and net_* both set to the aggregate.
-- vw_sales_aggregated is a session temp view, so it is referenced unqualified
-- (onlineretail.bronze.vw_sales_aggregate does not exist).
merge into onlineretail.silver.fact_sales as target
using vw_sales_aggregated as source
on target.transaction_id = source.transaction_id and target.product_id = source.product_id
when matched then update set
  target.net_quantity = target.net_quantity - source.total_quantity,
  target.net_amount = target.net_amount - source.total_amount,
  target.status = source.latest_status,
  target.last_updated_date = current_timestamp()
when not matched by target then insert (
  transaction_id,
  transaction_date,
  customer_id,
  store_id,
  rep_id,
  product_id,
  original_quantity,
  unit_price,
  original_amount,
  net_quantity,
  net_amount,
  payment_mode,
  status,
  ingested_date,
  last_updated_date
) values (
  source.transaction_id,
  source.transaction_date,
  source.customer_id,
  source.store_id,
  source.rep_id,
  source.product_id,
  source.total_quantity,
  source.unit_price,
  source.total_amount,
  source.total_quantity,
  source.total_amount,
  source.payment_mode,
  source.latest_status,
  current_timestamp(),
  current_timestamp()
)

In [0]:
%sql
-- Record a successful silver-layer sales load in the control log.
-- log.control has no FILE_NAME column, so none is listed.
INSERT INTO onlineretail.log.control (PROCESS_NAME, LAYER, TABLE_NAME, STATUS, INGESTED_TIMESTAMP, LAST_UPDATED_TIMESTAMP)
VALUES ('Ingestion-Sales', 'silver', 'fact_sales', 'success', CURRENT_TIMESTAMP(), CURRENT_TIMESTAMP())

In [0]:
%sql
-- Gold daily per-store sales summary (external Delta table).
USE CATALOG onlineretail;
CREATE TABLE IF NOT EXISTS onlineretail.gold.sales_summary (
    sales_date        DATE,
    store_id          STRING,
    total_sales       DOUBLE,
    total_refunds     DOUBLE,
    net_sales         DOUBLE,
    total_quantity    INT,
    transaction_count BIGINT,
    avg_selling_price DOUBLE,
    refund_rate       DOUBLE
)
USING DELTA
LOCATION "abfss://gold@adlsonlineretailshrey.dfs.core.windows.net/delta/sales_summary"


In [0]:
%sql
-- Rebuild the daily per-store sales summary from silver.fact_sales.
-- The query aggregates ALL of silver each run, so the table is overwritten;
-- appending (INSERT INTO) duplicated every (sales_date, store_id) row per run.
-- refund_rate is a percentage; both ratios fall back to 0 when the
-- denominator is not positive.
INSERT OVERWRITE TABLE onlineretail.gold.sales_summary
SELECT
    DATE(transaction_date) AS sales_date,
    store_id,
    SUM(original_amount) AS total_sales,
    SUM(original_amount - net_amount) AS total_refunds,
    SUM(net_amount) AS net_sales,
    SUM(net_quantity) AS total_quantity,
    COUNT(DISTINCT transaction_id) AS transaction_count,
    CASE WHEN SUM(net_quantity) > 0 THEN SUM(net_amount) / SUM(net_quantity) ELSE 0 END AS avg_selling_price,
    CASE WHEN SUM(original_amount) > 0 THEN (SUM(original_amount - net_amount) / SUM(original_amount)) * 100 ELSE 0 END AS refund_rate
FROM onlineretail.silver.fact_sales
GROUP BY DATE(transaction_date), store_id;


In [0]:
%sql
-- Record a successful gold-layer sales summary load in the control log.
-- log.control has no FILE_NAME column, so none is listed.
INSERT INTO onlineretail.log.control (PROCESS_NAME, LAYER, TABLE_NAME, STATUS, INGESTED_TIMESTAMP, LAST_UPDATED_TIMESTAMP)
VALUES ('Ingestion-Sales', 'gold', 'sales_summary', 'success', CURRENT_TIMESTAMP(), CURRENT_TIMESTAMP())