In [0]:
%sql
--# Separação em tabelas Dimensão + Fato (camada Silver.)
--# Verificações DO SCHEMA silver, criado no arquivo '00.config'
--# Dados Padronizados
--# SDC Tipo 2

SHOW DATABASES





In [0]:
# Criação do dataframe temporário 'staging' de preparação para a tabela delta 'dim.estabelecimento_SCD2:'
# Dados Padronizados

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Fonte Bronze
src = spark.table("ampev.bronze.estabelecimentos")
# -------------
# Com uma janela lógica 'window' podemos criar e separar em grupos a coluna EstabelecimentoID, ordenado por data de ingestão.
# Padroniza e pega a última versão por EstabelecimentoID (pela ingestão Bronze)
w = Window.partitionBy("EstabelecimentoID").orderBy(F.col("_bronze_ingest_ts").desc())
# Isso não faz shuffle físico automático como um groupBy. A função 'window' cria uma partição lógica para cálculo da função analítica dentro do dataframe. Dessa forma, o 'partitioning' é feito na memória, sem necessidade de shuffle físico. Criando assim 2 janelas de grupos, um para os inserts e outro para as atualizações. Ordenada da mais recente para a mais antiga.
# Isso é essencial para a camada SILVER por que na camada BRONZE podem vir dados duplicados, mudanças de cadastro e atualizações de atributos.

# 'stg' ou 'staging dataframe' é uma variável de execução temporária intermediária para ficar pronta pra ser escrita na camada SILVER. 'src' é a variável que recebe a tabela pronta da camada BRONZE:
stg = (src
  .select(
      F.col("EstabelecimentoID").cast("long").alias("EstabelecimentoID"),
      F.col("Local").cast("string").alias("Local"),
      F.col("Email").cast("string").alias("Email"),
      F.col("Telefone").cast("string").alias("Telefone"),
      F.col("_ingest_ts").alias("_bronze_ingest_ts"), # Renomear a '_ingest_ts' para 'bronze_ingest_ts'
      F.col("_source_file").alias("_bronze_source_file"), # Renomear a '_source_file' para 'bronze_source_file'
  )
  # Aqui criamos uma coluna no DF para auxiliar na geração de números com 'row_number' aplicada na janela 'w'
  # Ou seja, o registro mais recente recebe o número 1, o segundo o número 2.
  .withColumn("_rn", F.row_number().over(w))
  .filter(F.col("_rn") == 1) # Com o 'filter' mantemos o mais recente, baseada na última ingestão.
  .drop("_rn") # Bom retirar a coluna para não poluir a tabela.
)

# Cria hash de atributos de negócio (para detectar mudança)
#    (inclui coalesce pra evitar null quebrar comparação)
# Dentro do 'staging dataframe' criemos uma coluna '_attr_hash' que será usado para comparar se houve mudança nos dados
stg = (stg
  .withColumn(
      "_attr_hash",  # 'attr' é abreviação de 'attributes' ou atributos.
      F.sha2(                   # A função criptográfica SHA-2 é oriunda de funções SQL como 'F'
          F.concat_ws("||",     # A função 'concat' oriunda do SQL também como 'F'
# A função 'concat' irá concatenar os campos 'local||email||telefone' e irá transformá-los em um único atributo hash. Quando realizar o MERGE da camada SILVER haverá comparação com o 'attributes_hash' da dimensão, se houver diferença, significa que houve mudança nos dados.                      
              F.coalesce(F.col("Local"), F.lit("")), # 'coalesce' se o valor fica NULL, vira STRING vazia com valor fixo 'lit'
              F.coalesce(F.col("Email"), F.lit("")), # Sem 'coalesce' pode gerar inconsistência com o hash sem valores fixos
              F.coalesce(F.col("Telefone"), F.lit(""))
          ),256)))



In [0]:
# Resultado final do 'staging':

display(stg)

In [0]:
# Ordenação por estabelecimento:

stg.orderBy("EstabelecimentoID").display()


In [0]:
# Criação da tabela dim_estabelecimentos_scd2:
# Dimensão histórica de estabelecimentos com controle de versões (SCD Type 2).
# A tabela Delta guarda a versão atual e versões antigas, incluindo quando cada versão começou e terminou.

spark.sql("""
CREATE TABLE IF NOT EXISTS ampev.silver.dim_estabelecimentos_scd2 (
  surrogate_key BIGINT GENERATED ALWAYS AS IDENTITY,  
  -- Chave substituta (incremental). A chave de negócio se repete em SCD2.
  -- A surrogate_key NÃO se repete: cada versão do mesmo EstabelecimentoID ganha uma surrogate_key nova.
  EstabelecimentoID BIGINT,  -- Chave de negócio (business key)
  -- Atributos versionados: se algum mudar, uma nova linha será criada e a versão anterior será encerrada.
  Local STRING,
  Email STRING,
  Telefone STRING,

  -- Controle SCD2:
  start_date DATE,        -- Início de validade da versão.
  end_date DATE,          -- Fim de validade da versão (ex.: data de expiração).
  is_current BOOLEAN,     -- TRUE = versão ativa; FALSE = versão histórica (facilita consultas sem depender só de data).

  -- Colunas técnicas / auditoria:
  _attr_hash STRING,                -- Hash dos atributos (Local, Email, Telefone) para detectar mudanças.
  _bronze_ingest_ts TIMESTAMP,      -- Timestamp de ingestão na Bronze (rastreabilidade).
  _bronze_source_file STRING,       -- Arquivo de origem na Bronze (rastreabilidade).
  _silver_ts TIMESTAMP              -- Timestamp de processamento na Silver.
) USING DELTA
""")

print("Tabela dim_estabelecimentos_scd2 validada/criada.")

# Observação:
# Em SCD2, o EstabelecimentoID (chave de negócio) pode aparecer em várias linhas ao longo do tempo.
# A surrogate_key identifica unicamente cada versão.

In [0]:
print("dim total:", spark.table("ampev.silver.dim_estabelecimentos_scd2").count())

A tabela ```'dim_estabelecimentos_scd2'``` quando criada, começa vazia.

Dessa forma a dimensão-dataframe ```'dim_cur'``` fica vazio por todos os registros serem TRUE na coluna ```'is_current'``` ainda. 

Assim no left join, todo mundo do stg não encontra match, então ```'_is_new'``` = ```true``` para todos também.
```'_is_changed'``` = false (porque não existe “antes” para comparar)
```'chg'``` vira o “lote inicial” (initial load)

> Comportamento correto do SCD2 na primeira execução.

In [0]:
# Criação e carregamento da dimensão current:

# dim_cur ("dimensão current"): carrega somente as versões atuais da dimensão (is_current = TRUE),
# trazendo apenas as colunas necessárias para comparação (EstabelecimentoID e _attr_hash).
# Isso melhora performance e reduz leitura de histórico desnecessário.
# O retorno é uma dimensão vazia ainda para novos upserts

dim_cur = (spark.table("ampev.silver.dim_estabelecimentos_scd2")
  .filter(F.col("is_current") == True)
  .select("EstabelecimentoID", "_attr_hash")
)

display(dim_cur)

In [0]:
# Detectar novos registros e registros alterados:
# chg ("changes"): detecta INSERTS (novos IDs) e CHANGES (IDs existentes com hash diferente).

chg = (stg.alias("s")
  .join(dim_cur.alias("d"), on="EstabelecimentoID", how="left") 
  # Left join para manter todos do staging e identificar novos IDs
  .withColumn("_is_new", F.col("d.EstabelecimentoID").isNull())       # Se não achou match na dimensão current => novo ID
  .withColumn(
      "_is_changed",
      (~F.col("_is_new")) & (F.col("s._attr_hash") != F.col("d._attr_hash"))
      # Se não é novo e hash mudou => houve mudança em atributos
  )
  .filter(F.col("_is_new") | F.col("_is_changed"))                # Mantém apenas novos ou alterados (evita custo no MERGE)
  .select("s.*", "_is_new", "_is_changed")                        # Mantém colunas do staging + flags
)

display(chg)
# Resumo:
# '_is_new' = TRUE -> INSERT (primeira versão do ID, is_current=TRUE)
# '_is_changed' = TRUE -> "fecha" versão atual (end_date / is_current=FALSE) e INSERT de nova versão (start_date hoje / is_current=TRUE)


In [0]:
# Criar view temporária (somente se houver mudanças)
# Se não houver nada para mudar, encerra limpo.

if not chg.take(1):
    print("SCD2: nenhuma mudança detectada.")
else:
    chg.createOrReplaceTempView("v_dim_estab_changes")
    print("SCD2: view temporária v_dim_estab_changes criada com changes (novos e alterados).")


In [0]:
display(spark.table("v_dim_estab_changes").orderBy("EstabelecimentoID"))


In [0]:
# MERGE (UPDATE) na dimensão Delta Lake SCD2:
# Fecha (expira) as versões atuais (is_current = true) somente para os IDs que tiveram mudança (_is_changed = true).

spark.sql("""
MERGE INTO ampev.silver.dim_estabelecimentos_scd2 AS t
USING (
  SELECT EstabelecimentoID
  FROM v_dim_estab_changes
  WHERE _is_changed = true
) AS s
ON t.EstabelecimentoID = s.EstabelecimentoID
AND t.is_current = true
WHEN MATCHED THEN UPDATE SET
  t.end_date   = date_sub(current_date(), 1),
  t.is_current = false,
  t._silver_ts = current_timestamp()
""")

# ------------------------------------------------------------------------------
# INSERT (APPEND): insere novas versões (novos IDs + IDs alterados).
# Para changes: após fechar a versão antiga no MERGE, inserimos a nova versão como current.
# Para novos: como não existe linha atual, o MERGE não atualiza nada; aqui entra a 1ª versão como current.

new_rows = (
    chg
    .drop("_is_new", "_is_changed")                 # Remove flags auxiliares
    .withColumn("start_date", F.current_date())     # Início de validade da nova versão
    .withColumn("end_date", F.lit(None).cast("date"))  # Sem fim (versão vigente)
    .withColumn("is_current", F.lit(True))             # Marca como versão atual
    .withColumn("_silver_ts", F.current_timestamp())# Auditoria de processamento na Silver
    .select(
        "EstabelecimentoID","Local","Email","Telefone",
        "start_date","end_date","is_current",
        "_attr_hash","_bronze_ingest_ts","_bronze_source_file","_silver_ts"
    )
)

(new_rows.write
  .mode("append")
  .format("delta")
  .saveAsTable("ampev.silver.dim_estabelecimentos_scd2")
)

print("SCD2: dimensão atualizada (novos + mudanças).")

# ------------------------------------------------------------------------------
# Observações:
# - ID alterado:
#     1) MERGE fecha a linha atual (is_current: true -> false; end_date = ontem).
#     2) APPEND insere a nova versão (is_current = true; start_date = hoje; end_date = null).
# - ID novo:
#     MERGE não faz nada (não há linha atual); APPEND insere a primeira versão como current.
#
# Atenção:
# - Esse padrão assume carga diária.
# - Se rodar múltiplas vezes no mesmo dia, end_date pode ficar < start_date.
#   Para cargas intradiárias, prefira start_ts/end_ts TIMESTAMP ou uma regra de end_date "exclusiva".


In [0]:
%sql
-- # Validação do SCD2:
-- # Ver a linha atual por estabelecimento:

SELECT *
FROM ampev.silver.dim_estabelecimentos_scd2
WHERE is_current = true
ORDER BY EstabelecimentoID;




In [0]:
%sql
-- Ver histórico de um estabelecimento específico

SELECT EstabelecimentoID, Local, Email, Telefone, start_date, end_date, is_current
FROM ampev.silver.dim_estabelecimentos_scd2
WHERE EstabelecimentoID = 1
ORDER BY start_date;


In [0]:
%sql
-- Backfill: trocar NULL por 9999-12-31 (apenas versões atuais)

UPDATE ampev.silver.dim_estabelecimentos_scd2
SET end_date = DATE('9999-12-31')
WHERE is_current = true AND end_date IS NULL;


In [0]:
# Observações importantes:
# Este SCD2 usa corte por dia (start_date = current_date(), end_date = current_date()-1).
# É possível fazer com granularidade por timestamp (start_ts/end_ts).

In [0]:
# Tabela Fato Pedidos: fato_pedidos 
# Tabela limpa, enriquecida e tipagem correta para a Camada Silver:
# PedidoID & EstabelecimentoID deverá ser uma chave composta. 'data_venda' convertida para 'DATE'
# Aplicação SCD Tipo 02:
# ampev.silver.fato_pedidos_scd2 

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Fonte Bronze
src = spark.table("ampev.bronze.pedidos")

# Padroniza tipos e pega a última versão por (PedidoID, EstabelecimentoID)
w = Window.partitionBy("PedidoID", "EstabelecimentoID").orderBy(F.col("_bronze_ingest_ts").desc())

stg = (src
  .select(
      F.col("PedidoID").cast("long").alias("PedidoID"),
      F.col("EstabelecimentoID").cast("long").alias("EstabelecimentoID"),
      F.col("Produto").cast("string").alias("Produto"),
      F.col("quantidade_vendida").cast("long").alias("quantidade_vendida"),
      F.col("Preco_Unitario").cast("double").alias("Preco_Unitario"),
      F.to_date(F.col("data_venda"), "yyyy-MM-dd").alias("data_venda"),
      F.col("_ingest_ts").alias("_bronze_ingest_ts"),
      F.col("_source_file").alias("_bronze_source_file"),
  )
  .withColumn("_rn", F.row_number().over(w))
  .filter(F.col("_rn") == 1)
  .drop("_rn")
)

# Hash dos atributos (tudo que caracteriza o "estado" do pedido)
stg = (stg
  .withColumn(
      "_attr_hash",
      F.sha2(
          F.concat_ws("||",
              F.coalesce(F.col("Produto"), F.lit("")),
              F.coalesce(F.col("quantidade_vendida").cast("string"), F.lit("")),
              F.coalesce(F.col("Preco_Unitario").cast("string"), F.lit("")),
              F.coalesce(F.col("data_venda").cast("string"), F.lit(""))
          ),
          256
      )
  )
)

# Cria tabela SCD2 se não existir:
spark.sql("""
CREATE TABLE IF NOT EXISTS ampev.silver.fato_pedidos_scd2 (
  surrogate_key BIGINT GENERATED ALWAYS AS IDENTITY,

  PedidoID BIGINT,
  EstabelecimentoID BIGINT,
  Produto STRING,
  quantidade_vendida BIGINT,
  Preco_Unitario DOUBLE,
  data_venda DATE,

  start_date DATE,
  end_date DATE,
  is_current BOOLEAN,

  _attr_hash STRING,
  _bronze_ingest_ts TIMESTAMP,
  _bronze_source_file STRING,
  _silver_ts TIMESTAMP
) USING DELTA
""")

# Carrega somente o "current" para comparar hash
cur = (spark.table("ampev.silver.fato_pedidos_scd2")
  .filter(F.col("is_current") == True)
  .select("PedidoID", "EstabelecimentoID", "_attr_hash")
)

# Detecta novos e alterados com base na chave composta
chg = (stg.alias("s")
  .join(cur.alias("c"), on=["PedidoID", "EstabelecimentoID"], how="left")
  .withColumn("_is_new", F.col("c._attr_hash").isNull())
  .withColumn("_is_changed", (~F.col("_is_new")) & (F.col("s._attr_hash") != F.col("c._attr_hash")))
  .filter(F.col("_is_new") | F.col("_is_changed"))
  .select("s.*", "_is_new", "_is_changed")
)

# Se não houver nada para mudar, encerra
if not chg.take(1):
    print("SCD2: nenhuma mudança detectada em ampev.silver.fato_pedidos_scd2.")
else:
    chg.createOrReplaceTempView("v_pedidos_changes")

    # Fecha a versão atual dos que mudaram (chave composta)
    spark.sql("""
    MERGE INTO ampev.silver.fato_pedidos_scd2 t
    USING (
      SELECT PedidoID, EstabelecimentoID
      FROM v_pedidos_changes
      WHERE _is_changed = true
    ) s
    ON t.PedidoID = s.PedidoID
   AND t.EstabelecimentoID = s.EstabelecimentoID
   AND t.is_current = true
    WHEN MATCHED THEN UPDATE SET
      t.end_date   = date_sub(current_date(), 1),
      t.is_current = false,
      t._silver_ts = current_timestamp()
    """)

    # Insere nova versão (novos + alterados)
    new_rows = (chg
      .drop("_is_new", "_is_changed")
      .withColumn("start_date", F.current_date())
      .withColumn("end_date", F.lit(None).cast("date"))
      .withColumn("is_current", F.lit(True))
      .withColumn("_silver_ts", F.current_timestamp())
      .select(
          "PedidoID","EstabelecimentoID","Produto","quantidade_vendida","Preco_Unitario","data_venda",
          "start_date","end_date","is_current",
          "_attr_hash","_bronze_ingest_ts","_bronze_source_file","_silver_ts"
      )
    )

    (new_rows.write
      .mode("append")
      .format("delta")
      .saveAsTable("ampev.silver.fato_pedidos_scd2")
    )

    print("SCD2: fato_pedidos_scd2 atualizado (novos + mudanças) com chave composta.")



In [0]:
%sql
-- Validações: Current (estado atual)

SELECT PedidoID, EstabelecimentoID, Produto, quantidade_vendida, Preco_Unitario, data_venda,
       start_date, end_date, is_current
FROM ampev.silver.fato_pedidos_scd2
WHERE is_current = true
ORDER BY PedidoID, EstabelecimentoID;


In [0]:
%sql
-- Validações: Histórico de um pedido (pela chave composta)

SELECT PedidoID, EstabelecimentoID, Produto, quantidade_vendida, Preco_Unitario, data_venda,
       start_date, end_date, is_current
FROM ampev.silver.fato_pedidos_scd2
WHERE PedidoID = 1 AND EstabelecimentoID = 50
ORDER BY start_date;
