In [0]:
/***********************************************************************************
Note: catalog_name and schema_name values should be set in the DLT pipeline
settings under configurations.
***********************************************************************************/

### Create a streaming view based on source data

In [0]:

-- Customers Source View
CREATE OR REFRESH STREAMING TABLE customers_source_view
AS SELECT * FROM STREAM(${catalog_name}.${schema_name}.customers_source);

-- Orders Source View
CREATE OR REFRESH STREAMING TABLE orders_source_view
AS SELECT * FROM STREAM(${catalog_name}.${schema_name}.orders_source);

-- Products Source View
CREATE OR REFRESH STREAMING TABLE products_source_view
AS SELECT * FROM STREAM(${catalog_name}.${schema_name}.products_source);


### Create SCD Type 1 Streaming Tables

In [0]:
-- Customers Target Table
CREATE OR REFRESH STREAMING TABLE ${catalog_name}.${schema_name}.customers_type1;

APPLY CHANGES INTO ${catalog_name}.${schema_name}.customers_type1
  FROM stream(${catalog_name}.${schema_name}.customers_source_view)
  KEYS (customer_id)
  SEQUENCE BY timestamp
  COLUMNS * EXCEPT (timestamp)
  STORED AS SCD TYPE 1;

-- Orders Target Table
CREATE OR REFRESH STREAMING TABLE ${catalog_name}.${schema_name}.orders_type1;

APPLY CHANGES INTO ${catalog_name}.${schema_name}.orders_type1
  FROM stream(${catalog_name}.${schema_name}.orders_source_view)
  KEYS (order_id)
  SEQUENCE BY timestamp
  COLUMNS * EXCEPT (timestamp)
  STORED AS SCD TYPE 1;

-- Products Target Table
CREATE OR REFRESH STREAMING TABLE ${catalog_name}.${schema_name}.products_type1;

APPLY CHANGES INTO ${catalog_name}.${schema_name}.products_type1
  FROM stream(${catalog_name}.${schema_name}.products_source_view)
  KEYS (product_id)
  SEQUENCE BY timestamp
  COLUMNS * EXCEPT (timestamp)
  STORED AS SCD TYPE 1;


### Create SCD Type 2 Streaming Tables

In [0]:
-- Customers Target Table
CREATE OR REFRESH STREAMING TABLE ${catalog_name}.${schema_name}.customers_type2;

APPLY CHANGES INTO ${catalog_name}.${schema_name}.customers_type2
  FROM stream(${catalog_name}.${schema_name}.customers_source_view)
  KEYS (customer_id)
  -- APPLY AS DELETE WHEN id < 5 : Using DELETE Operation on a target table will break the downstream streaming table.
  SEQUENCE BY timestamp
  COLUMNS * EXCEPT (timestamp)
  STORED AS SCD TYPE 2
  TRACK HISTORY ON * 
  EXCEPT (timestamp);

-- Orders Target Table
CREATE OR REFRESH STREAMING TABLE ${catalog_name}.${schema_name}.orders_type2;

APPLY CHANGES INTO ${catalog_name}.${schema_name}.orders_type2
  FROM stream(${catalog_name}.${schema_name}.orders_source_view)
  KEYS (order_id)
  SEQUENCE BY timestamp
  COLUMNS * EXCEPT (timestamp)
  STORED AS SCD TYPE 2
  TRACK HISTORY ON * 
  EXCEPT (timestamp);

-- Products Target Table
CREATE OR REFRESH STREAMING TABLE ${catalog_name}.${schema_name}.products_type2;

APPLY CHANGES INTO ${catalog_name}.${schema_name}.products_type2
  FROM stream(${catalog_name}.${schema_name}.products_source_view)
  KEYS (product_id)
  SEQUENCE BY timestamp
  COLUMNS * EXCEPT (timestamp)
  STORED AS SCD TYPE 2
  TRACK HISTORY ON * 
  EXCEPT (timestamp);


### Create a Materialized View

In [0]:
CREATE OR REFRESH MATERIALIZED VIEW gold_materialized_view AS
SELECT 
    o.customer_id, 
    c.customer_name, 
    o.order_id, 
    o.product_id, 
    o.quantity, 
    o.price, 
    p.brand
FROM 
    LIVE.orders_type1 o
INNER JOIN 
    LIVE.customers_type1 c ON o.customer_id = c.customer_id
INNER JOIN 
    LIVE.products_type1 p ON o.product_id = p.product_id
    WHERE o.order_status != 'Pending'
