# Silver layer

Silver = staging curado e incremental: padroniza tipos, normaliza valores (status, datas), deduplica e prepara SCD. Tabelas “técnicas” com sufixos _clean, _dedup e a SCD T2 operacional de cliente (ex.: `silver.dim_customer_scd`).

In [0]:
-- Select the working catalog, then make sure the Silver schema exists inside it.
USE CATALOG data_modelling;
CREATE SCHEMA IF NOT EXISTS silver;

## Abordagem incremental (Databricks / Delta)
Use `MERGE INTO` + watermark por coluna de tempo do evento/atualização (`order_date`, `updated_at`, `last_update_date` ou `_ingestion_timestamp`, se houver). A Silver inteira pode (e deve) ser incremental:

### 1) Silver Orders – limpeza incremental
Objetivo: tipar datas/numéricos, padronizar status, remover nulos críticos e eliminar duplicatas por `order_id`.
No primeiro build, crie a tabela; nas execuções seguintes, faça MERGE pela chave natural (NK) `order_id`.

In [0]:
%sql
--DROP TABLE silver.orders_clean;

In [0]:
-- ---------------------------------------------------------
-- SILVER: orders_clean
-- Target of the orders MERGE (keyed on order_id). row_hash stores the
-- SHA-256 the load compares so unchanged rows are not rewritten.
-- ---------------------------------------------------------
CREATE TABLE IF NOT EXISTS silver.orders_clean (
  order_id STRING,
  customer_id STRING,
  order_date DATE,
  order_status STRING,
  total_amount DECIMAL(18, 2),
  row_hash STRING
)
USING DELTA;

-- 1) Stage: read bronze.orders (rows with a NULL order_id are discarded),
--    parse order_date into a timestamp and normalise status / amount.
CREATE OR REPLACE TEMP VIEW stage_orders AS
WITH src AS (
  SELECT
    order_id,
    customer_id,
    order_date,
    order_status,
    total_amount
  FROM bronze.orders
  WHERE order_id IS NOT NULL
)
SELECT
  src.order_id,
  src.customer_id,
  -- Reference timestamp: the first pattern that parses wins; NULL when none does.
  COALESCE(
    try_to_timestamp(src.order_date, 'yyyy-MM-dd HH:mm:ss'),
    try_to_timestamp(src.order_date, 'yyyy/MM/dd HH:mm:ss'),
    try_to_timestamp(src.order_date, 'dd/MM/yyyy HH:mm:ss'),
    try_to_timestamp(src.order_date, 'dd-MM-yyyy HH:mm:ss'),
    try_to_timestamp(src.order_date, 'yyyy-MM-dd'),
    try_to_timestamp(src.order_date, 'yyyy/MM/dd'),
    try_to_timestamp(src.order_date, 'dd/MM/yyyy'),
    try_to_timestamp(src.order_date, 'dd-MM-yyyy')
  ) AS order_ts,
  UPPER(TRIM(src.order_status)) AS order_status_norm,
  -- Decimal comma -> decimal point before casting.
  CAST(regexp_replace(src.total_amount, ',', '.') AS DECIMAL(18,2)) AS total_amount_norm
FROM src;

-- 2) Incremental window: keep orders whose parsed timestamp is on or after
--    the date 60 days before today. Rows with a NULL order_ts do not pass
--    the comparison and are dropped here.
CREATE OR REPLACE TEMP VIEW stage_orders_win AS
SELECT
  o.order_id,
  o.customer_id,
  o.order_ts,
  o.order_status_norm,
  o.total_amount_norm
FROM stage_orders AS o
WHERE o.order_ts >= date_sub(current_timestamp(), 60);

-- 3) Dedup: keep exactly one row per order_id, the most recent by order_ts.
--    Ties on order_ts are broken on every remaining attribute, so the chosen
--    row is reproducible between runs: rows that still tie are identical in
--    all projected columns. Ordering by customer_id alone left status/amount
--    to chance when two rows shared order_ts and customer_id.
CREATE OR REPLACE TEMP VIEW stage_orders_dedup AS
WITH ranked AS (
  SELECT
    w.order_id,
    w.customer_id,
    w.order_ts,
    w.order_status_norm,
    w.total_amount_norm,
    ROW_NUMBER() OVER (
      PARTITION BY w.order_id
      ORDER BY w.order_ts DESC NULLS LAST,
               w.customer_id,
               w.order_status_norm,
               w.total_amount_norm
    ) AS rn
  FROM stage_orders_win AS w
  WHERE w.order_ts IS NOT NULL
)
SELECT
  order_id,
  customer_id,
  CAST(order_ts AS DATE) AS order_date,
  order_status_norm      AS order_status,
  total_amount_norm      AS total_amount,
  order_ts
FROM ranked
WHERE rn = 1;

-- 4) Row hash used by the MERGE to skip updates when nothing changed.
--    concat_ws() silently skips NULL arguments, so every field is coalesced
--    to '' to keep its position in the hashed string fixed.
--    total_amount is cast to STRING *before* the coalesce so that NULL and 0
--    produce different hashes (a NULL -> 0 change must trigger an update).
--    Non-NULL values render exactly as before, so only rows whose
--    total_amount is NULL get a new hash on the next run.
CREATE OR REPLACE TEMP VIEW stage_orders_final AS
SELECT
  order_id,
  customer_id,
  order_date,
  order_status,
  total_amount,
  sha2(
    concat_ws('||',
      coalesce(customer_id, ''),
      coalesce(date_format(order_date, 'yyyy-MM-dd'), ''),
      coalesce(order_status, ''),
      coalesce(cast(total_amount AS STRING), '')
    ),
    256
  ) AS row_hash
FROM stage_orders_dedup;

-- 5) Upsert into silver.orders_clean keyed on order_id.
--    A matched row is rewritten only when its stored hash is missing or
--    differs from the incoming one; unmatched rows are inserted.
MERGE INTO silver.orders_clean AS tgt
USING stage_orders_final AS src
  ON tgt.order_id = src.order_id
WHEN MATCHED AND (tgt.row_hash <> src.row_hash OR tgt.row_hash IS NULL) THEN
  UPDATE SET
    tgt.customer_id  = src.customer_id,
    tgt.order_date   = src.order_date,
    tgt.order_status = src.order_status,
    tgt.total_amount = src.total_amount,
    tgt.row_hash     = src.row_hash
WHEN NOT MATCHED THEN
  INSERT (
    order_id, customer_id, order_date, order_status, total_amount, row_hash
  )
  VALUES (
    src.order_id, src.customer_id, src.order_date, src.order_status, src.total_amount, src.row_hash
  );


### 2) Silver Order Items – deduplicação incremental

Grão: (`order_id`, `product_id`) mantendo o mais recente por `updated_at`.
Crie a tabela e faça MERGE pelo par (`order_id`, `product_id`), deduplicando antes com `ROW_NUMBER()`.

In [0]:
%sql
--DROP TABLE silver.order_items_clean;

In [0]:
-- ---------------------------------------------------------
-- SILVER: order_items_clean
-- Target of the order-items MERGE (keyed on order_id + product_id).
-- row_hash stores the SHA-256 compared by the load to skip no-op updates.
-- ---------------------------------------------------------
CREATE TABLE IF NOT EXISTS silver.order_items_clean (
  order_item_id STRING,
  order_id STRING,
  product_id STRING,
  quantity DECIMAL(18,2),
  unit_price DECIMAL(18,2),
  discount_amount DECIMAL(18,2),
  updated_at TIMESTAMP,
  row_hash STRING
)
USING DELTA;

-- 1) Stage: read bronze.order_items (rows missing order_id or product_id are
--    discarded), cast the numeric fields and parse updated_at.
CREATE OR REPLACE TEMP VIEW stage_order_items AS
WITH src AS (
  SELECT
    order_item_id,
    order_id,
    product_id,
    quantity,
    unit_price,
    discount_amount,
    updated_at
  FROM bronze.order_items
  WHERE order_id IS NOT NULL
    AND product_id IS NOT NULL
)
SELECT
  src.order_item_id,
  src.order_id,
  src.product_id,
  -- Decimal comma -> decimal point before casting.
  CAST(regexp_replace(src.quantity, ',', '.') AS DECIMAL(18,2))   AS quantity,
  CAST(regexp_replace(src.unit_price, ',', '.') AS DECIMAL(18,2)) AS unit_price,
  -- A missing discount is treated as '0' before the cast.
  CAST(regexp_replace(COALESCE(src.discount_amount,'0'), ',', '.') AS DECIMAL(18,2)) AS discount_amount,
  -- The first pattern that parses wins; NULL when none does.
  COALESCE(
    try_to_timestamp(src.updated_at, 'yyyy-MM-dd HH:mm:ss'),
    try_to_timestamp(src.updated_at, 'yyyy/MM/dd HH:mm:ss'),
    try_to_timestamp(src.updated_at, 'dd/MM/yyyy HH:mm:ss'),
    try_to_timestamp(src.updated_at, 'dd-MM-yyyy HH:mm:ss'),
    try_to_timestamp(src.updated_at, 'yyyy-MM-dd'),
    try_to_timestamp(src.updated_at, 'yyyy/MM/dd'),
    try_to_timestamp(src.updated_at, 'dd/MM/yyyy'),
    try_to_timestamp(src.updated_at, 'dd-MM-yyyy')
  ) AS parsed_updated_at
FROM src;

-- 2) Incremental window: keep items whose parsed updated_at is on or after
--    the date 60 days before today. Rows with a NULL parsed_updated_at do
--    not pass the comparison and are dropped here.
CREATE OR REPLACE TEMP VIEW stage_order_items_win AS
SELECT
  i.order_item_id,
  i.order_id,
  i.product_id,
  i.quantity,
  i.unit_price,
  i.discount_amount,
  i.parsed_updated_at
FROM stage_order_items AS i
WHERE i.parsed_updated_at >= date_sub(current_timestamp(), 60);

-- 3) Dedup: keep one row per (order_id, product_id) -- the most recent by
--    parsed_updated_at, with the highest order_item_id as tie-break.
CREATE OR REPLACE TEMP VIEW stage_order_items_dedup AS
WITH ranked AS (
  SELECT
    w.order_item_id,
    w.order_id,
    w.product_id,
    w.quantity,
    w.unit_price,
    w.discount_amount,
    w.parsed_updated_at,
    ROW_NUMBER() OVER (
      PARTITION BY w.order_id, w.product_id
      ORDER BY w.parsed_updated_at DESC NULLS LAST,
               w.order_item_id DESC
    ) AS rn
  FROM stage_order_items_win AS w
  WHERE w.parsed_updated_at IS NOT NULL
)
SELECT
  order_item_id,
  order_id,
  product_id,
  quantity,
  unit_price,
  discount_amount,
  parsed_updated_at AS updated_at
FROM ranked
WHERE rn = 1;

-- 4) Row hash used by the MERGE to update only when something changed.
--    Numeric fields are cast to STRING *before* the coalesce so that NULL and
--    0 hash differently (a NULL -> 0 change must trigger an update); '' keeps
--    each field's position fixed because concat_ws() skips NULL arguments.
--    Non-NULL values render exactly as before, so only rows holding a NULL
--    numeric get a new hash on the next run.
--    NOTE(review): order_item_id is rewritten by the MERGE but is not part of
--    the hash, so a change to it alone is not detected -- confirm whether
--    that is intended.
CREATE OR REPLACE TEMP VIEW stage_order_items_final AS
SELECT
  order_item_id,
  order_id,
  product_id,
  quantity,
  unit_price,
  discount_amount,
  updated_at,
  sha2(concat_ws('||',
    coalesce(cast(quantity        AS STRING), ''),
    coalesce(cast(unit_price      AS STRING), ''),
    coalesce(cast(discount_amount AS STRING), ''),
    coalesce(date_format(updated_at,'yyyy-MM-dd HH:mm:ss'),'')
  ), 256) AS row_hash
FROM stage_order_items_dedup;

-- 5) Upsert into silver.order_items_clean keyed on (order_id, product_id).
--    A matched row is rewritten only when its stored hash is missing or
--    differs from the incoming one; unmatched rows are inserted.
MERGE INTO silver.order_items_clean AS tgt
USING stage_order_items_final AS src
  ON  tgt.order_id   = src.order_id
  AND tgt.product_id = src.product_id
WHEN MATCHED AND (tgt.row_hash <> src.row_hash OR tgt.row_hash IS NULL) THEN
  UPDATE SET
    tgt.order_item_id   = src.order_item_id,
    tgt.quantity        = src.quantity,
    tgt.unit_price      = src.unit_price,
    tgt.discount_amount = src.discount_amount,
    tgt.updated_at      = src.updated_at,
    tgt.row_hash        = src.row_hash
WHEN NOT MATCHED THEN
  INSERT (
    order_item_id, order_id, product_id, quantity,
    unit_price, discount_amount, updated_at, row_hash
  )
  VALUES (
    src.order_item_id, src.order_id, src.product_id, src.quantity,
    src.unit_price, src.discount_amount, src.updated_at, src.row_hash
  );


### 3) Silver Customers – SCD Type 2 incremental

Mantenha a tabela técnica SCD na Silver (com `effective_start`, `effective_end`, `is_current`) e gere a dim de negócio na Gold. O material já dá um esqueleto de SCD2 com `MERGE` e current flag.

In [0]:
%sql
--DROP TABLE silver.dim_customer_scd;

In [0]:
-- ------------------------------------------------------------
-- SCD TYPE 2 - Customers (Silver), hash-based change detection
-- ------------------------------------------------------------
-- 1) Target table. customer_sk is an identity-generated surrogate key;
--    effective_start / effective_end / is_current describe each version;
--    row_hash stores the SHA-256 of the business attributes.
CREATE TABLE IF NOT EXISTS silver.dim_customer_scd (
  customer_sk BIGINT GENERATED ALWAYS AS IDENTITY,
  customer_id STRING,
  customer_name STRING,
  email STRING,
  city STRING,
  state STRING,
  effective_start TIMESTAMP,
  effective_end TIMESTAMP,
  is_current BOOLEAN,
  row_hash STRING
)
USING DELTA;

-- (Optional) Backfill row_hash for rows created before the column existed,
-- so the first MERGE/INSERT after adding the column does not treat every
-- customer as changed.
-- Restricted to rows where row_hash IS NULL: without the filter the UPDATE
-- matched every row on every run just to write back the value it already had.
-- The hash applies the same normalisation as the staging views
-- (lower-cased e-mail, trimmed upper-cased state).
UPDATE silver.dim_customer_scd
SET row_hash = sha2(concat_ws('||',
    coalesce(customer_name,''),
    coalesce(lower(email),''),
    coalesce(city,''),
    coalesce(upper(trim(state)),'')
  ), 256)
WHERE row_hash IS NULL;

-- 2) Raw stage: read bronze.customers (rows with a NULL customer_id are
--    discarded), normalise e-mail / state and parse last_update_date into src_ts.
CREATE OR REPLACE TEMP VIEW stage_customers_raw AS
WITH src AS (
  SELECT
    customer_id,
    customer_name,
    email,
    city,
    state,
    last_update_date
  FROM bronze.customers
  WHERE customer_id IS NOT NULL
)
SELECT
  src.customer_id,
  src.customer_name,
  lower(src.email)       AS email_norm,   -- e-mail compared in lower case
  src.city,
  upper(trim(src.state)) AS state_norm,   -- state code trimmed and upper-cased
  -- The first pattern that parses wins; NULL when none does.
  COALESCE(
    try_to_timestamp(src.last_update_date, 'yyyy-MM-dd HH:mm:ss'),
    try_to_timestamp(src.last_update_date, 'yyyy/MM/dd HH:mm:ss'),
    try_to_timestamp(src.last_update_date, 'dd/MM/yyyy HH:mm:ss'),
    try_to_timestamp(src.last_update_date, 'dd-MM-yyyy HH:mm:ss'),
    try_to_timestamp(src.last_update_date, 'yyyy-MM-dd'),
    try_to_timestamp(src.last_update_date, 'yyyy/MM/dd'),
    try_to_timestamp(src.last_update_date, 'dd/MM/yyyy'),
    try_to_timestamp(src.last_update_date, 'dd-MM-yyyy')
  ) AS src_ts
FROM src;

-- 3) Incremental window: keep rows updated on or after the date 90 days
--    before today. A NULL src_ts is treated as "now", so rows whose date
--    could not be parsed are kept rather than dropped.
CREATE OR REPLACE TEMP VIEW stage_customers_window AS
SELECT
  r.customer_id,
  r.customer_name,
  r.email_norm,
  r.city,
  r.state_norm,
  r.src_ts
FROM stage_customers_raw AS r
WHERE COALESCE(r.src_ts, current_timestamp()) >= date_sub(current_timestamp(), 90);

-- 4) Dedup: one row per customer_id -- the latest by src_ts (NULLs last),
--    with every attribute used as a descending tie-break.
CREATE OR REPLACE TEMP VIEW stage_customers_latest AS
WITH ranked AS (
  SELECT
    w.customer_id,
    w.customer_name,
    w.email_norm,
    w.city,
    w.state_norm,
    w.src_ts,
    ROW_NUMBER() OVER (
      PARTITION BY w.customer_id
      ORDER BY w.src_ts DESC NULLS LAST,
               w.customer_name DESC,
               w.email_norm DESC,
               w.city DESC,
               w.state_norm DESC
    ) AS rn
  FROM stage_customers_window AS w
)
SELECT
  customer_id,
  customer_name,
  email_norm AS email,
  city,
  state_norm AS state,
  src_ts
FROM ranked
WHERE rn = 1;

-- 5) Hash of the business attributes (name, e-mail, city, state).
--    A different hash is what the SCD steps treat as a real change.
CREATE OR REPLACE TEMP VIEW stage_customers_hash AS
SELECT
  l.customer_id,
  l.customer_name,
  l.email,
  l.city,
  l.state,
  l.src_ts,
  sha2(
    concat_ws('||',
      coalesce(l.customer_name,''),
      coalesce(l.email,''),
      coalesce(l.city,''),
      coalesce(l.state,'')
    ),
    256
  ) AS source_hash
FROM stage_customers_latest AS l;

-- 6) Expire current versions whose attributes changed: a current row whose
--    row_hash differs from the incoming source_hash is closed at the source
--    timestamp (or at "now" when src_ts is NULL) and flagged non-current.
MERGE INTO silver.dim_customer_scd AS cur
USING stage_customers_hash AS inc
  ON  cur.customer_id = inc.customer_id
  AND cur.is_current  = TRUE
WHEN MATCHED AND cur.row_hash <> inc.source_hash THEN
  UPDATE SET
    cur.effective_end = COALESCE(inc.src_ts, current_timestamp()),
    cur.is_current    = FALSE;

-- 7) Insert a version for every staged customer that has no current row:
--    either a brand-new customer, or one whose previous version was just
--    expired by the preceding MERGE because its hash changed.
--    The insert is gated only on "no current row exists". The former
--    `OR c.row_hash <> s.source_hash` branch could fire only when a current
--    row with a different hash was still present (expire step skipped, failed
--    or run out of order), in which case it added a second is_current = TRUE
--    row for the same customer. In the normal flow the result is unchanged.
--    NOTE(review): when src_ts is NULL, effective_start here and the
--    effective_end set by the expire step come from separate
--    current_timestamp() calls and may differ slightly.
INSERT INTO silver.dim_customer_scd (
  customer_id, customer_name, email, city, state,
  effective_start, effective_end, is_current, row_hash
)
SELECT
  s.customer_id,
  s.customer_name,
  s.email,
  s.city,
  s.state,
  COALESCE(s.src_ts, current_timestamp()) AS effective_start,
  TIMESTAMP('9999-12-31')                 AS effective_end,   -- open-ended sentinel
  TRUE                                    AS is_current,
  s.source_hash                           AS row_hash
FROM stage_customers_hash AS s
WHERE NOT EXISTS (
  SELECT 1
  FROM silver.dim_customer_scd AS c
  WHERE c.customer_id = s.customer_id
    AND c.is_current = TRUE
);


### 4) Silver Products – limpeza incremental

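Padronize tipos/campos e deixe a tabela pronta para publicar em `gold.dim_produto`. Diferente das cargas anteriores, esta não aplica janela de watermark: todo o `bronze.products` é reprocessado a cada execução, e o MERGE por hash evita updates desnecessários.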

In [0]:
%sql
--DROP TABLE silver.products_clean;

In [0]:
%sql
-- SILVER: products_clean
-- Target of the products MERGE (keyed on product_id). row_hash stores the
-- SHA-256 compared by the load to skip no-op updates.
CREATE TABLE IF NOT EXISTS silver.products_clean (
  product_id STRING,
  product_name STRING,
  category STRING,
  subcategory STRING,
  brand STRING,
  cost_price DECIMAL(18,2),
  list_price DECIMAL(18,2),
  is_active STRING,
  last_update TIMESTAMP,
  row_hash STRING
)
USING DELTA;

-- 1) Stage: read bronze.products (rows with a NULL product_id are discarded),
--    cast prices, upper-case the active flag and parse last_update.
CREATE OR REPLACE TEMP VIEW stage_products AS
WITH src AS (
  SELECT
    product_id,
    product_name,
    category,
    subcategory,
    brand,
    cost_price,
    list_price,
    is_active,
    last_update
  FROM bronze.products
  WHERE product_id IS NOT NULL
)
SELECT
  src.product_id,
  src.product_name,
  src.category,
  src.subcategory,
  src.brand,
  -- Decimal comma -> decimal point before casting.
  CAST(regexp_replace(src.cost_price, ',', '.') AS DECIMAL(18,2)) AS cost_price,
  CAST(regexp_replace(src.list_price, ',', '.') AS DECIMAL(18,2)) AS list_price,
  UPPER(CAST(src.is_active AS STRING)) AS is_active,
  -- The first pattern that parses wins; NULL when none does.
  COALESCE(
    try_to_timestamp(src.last_update, 'yyyy-MM-dd HH:mm:ss'),
    try_to_timestamp(src.last_update, 'yyyy/MM/dd HH:mm:ss'),
    try_to_timestamp(src.last_update, 'dd/MM/yyyy HH:mm:ss'),
    try_to_timestamp(src.last_update, 'dd-MM-yyyy HH:mm:ss'),
    try_to_timestamp(src.last_update, 'yyyy-MM-dd'),
    try_to_timestamp(src.last_update, 'yyyy/MM/dd'),
    try_to_timestamp(src.last_update, 'dd/MM/yyyy'),
    try_to_timestamp(src.last_update, 'dd-MM-yyyy')
  ) AS parsed_last_update
FROM src;

-- 2) Dedup: one row per product_id -- the latest by parsed_last_update
--    (NULLs last). Ties are broken on every remaining attribute, including
--    cost_price, list_price and is_active, so the chosen row is reproducible:
--    rows that still tie are identical in all projected columns. Previously
--    the price and active-flag columns were left out of the ordering, so two
--    rows differing only in those could be picked arbitrarily.
CREATE OR REPLACE TEMP VIEW stage_products_dedup AS
WITH ranked AS (
  SELECT
    p.product_id,
    p.product_name,
    p.category,
    p.subcategory,
    p.brand,
    p.cost_price,
    p.list_price,
    p.is_active,
    p.parsed_last_update,
    ROW_NUMBER() OVER (
      PARTITION BY p.product_id
      ORDER BY
        p.parsed_last_update DESC NULLS LAST,
        p.product_name DESC,
        p.category DESC,
        p.subcategory DESC,
        p.brand DESC,
        p.cost_price DESC,
        p.list_price DESC,
        p.is_active DESC
    ) AS rn
  FROM stage_products AS p
)
SELECT
  product_id,
  product_name,
  category,
  subcategory,
  brand,
  cost_price,
  list_price,
  is_active,
  parsed_last_update AS last_update
FROM ranked
WHERE rn = 1;

-- 3) Row hash used by the MERGE to update only when something changed.
--    Prices are cast to STRING *before* the coalesce so that NULL and 0 hash
--    differently (a NULL -> 0 change must trigger an update); '' keeps each
--    field's position fixed because concat_ws() skips NULL arguments.
--    Non-NULL values render exactly as before, so only rows holding a NULL
--    price get a new hash on the next run.
CREATE OR REPLACE TEMP VIEW stage_products_final AS
SELECT
  product_id,
  product_name,
  category,
  subcategory,
  brand,
  cost_price,
  list_price,
  is_active,
  last_update,
  sha2(concat_ws('||',
    coalesce(product_name,''),
    coalesce(category,''),
    coalesce(subcategory,''),
    coalesce(brand,''),
    coalesce(cast(cost_price AS STRING), ''),
    coalesce(cast(list_price AS STRING), ''),
    coalesce(is_active,''),
    coalesce(date_format(last_update,'yyyy-MM-dd HH:mm:ss'), '')
  ), 256) AS row_hash
FROM stage_products_dedup;

-- 4) Upsert into silver.products_clean keyed on product_id.
--    A matched row is rewritten only when its stored hash is missing or
--    differs from the incoming one; unmatched rows are inserted.
MERGE INTO silver.products_clean AS tgt
USING stage_products_final AS src
  ON tgt.product_id = src.product_id
WHEN MATCHED AND (tgt.row_hash <> src.row_hash OR tgt.row_hash IS NULL) THEN
  UPDATE SET
    tgt.product_name = src.product_name,
    tgt.category     = src.category,
    tgt.subcategory  = src.subcategory,
    tgt.brand        = src.brand,
    tgt.cost_price   = src.cost_price,
    tgt.list_price   = src.list_price,
    tgt.is_active    = src.is_active,
    tgt.last_update  = src.last_update,
    tgt.row_hash     = src.row_hash
WHEN NOT MATCHED THEN
  INSERT (
    product_id, product_name, category, subcategory, brand,
    cost_price, list_price, is_active, last_update, row_hash
  )
  VALUES (
    src.product_id, src.product_name, src.category, src.subcategory, src.brand,
    src.cost_price, src.list_price, src.is_active, src.last_update, src.row_hash
  );
