In [0]:
# Databricks notebook source
"""
# 02_process_silver_scd2 (SCD Type 2)
Autor: Leonardo Lima

DECISÃO DE ARQUITETURA:
Optamos pela implementação de SCD Tipo 2 (Slowly Changing Dimension) ao invés do Tipo 1.
Embora o Tipo 1 (sobrescrita) seja mais performático e simples, o Tipo 2 foi escolhido para garantir:
1. Auditoria Histórica: Capacidade de reconstruir o estado dos dados em qualquer ponto do tempo (Time Travel).
2. Rastreabilidade: Monitoramento de mudanças de endereço/perfil do cliente ao longo do tempo.
3. Compliance: Atendimento a requisitos legais de retenção de histórico.

Lógica de Implementação:
Utilizamos a estratégia de "Merge com Union" para realizar UPDATES (fechar registros antigos) e INSERTS (abrir novos) em uma única transação atômica.
"""

from delta.tables import *
from pyspark.sql.functions import *
from pyspark.sql.window import Window

In [0]:
# 1. Configuração 
# A tabela será Gerenciada pelo Unity Catalog.
volume_root = "/Volumes/main/default/lakehouse" 
source_path = f"{volume_root}/bronze_customers"       # Leitura continua do arquivo (Bronze)
target_table_name = "main.default.silver_customers"   # Nome da Tabela

print(f"Lendo de: {source_path}")
print(f"Gerenciando Tabela: {target_table_name}")

In [0]:
# 2. Leitura da Bronze (Apenas o que é novo/delta)
# Em um cenário real de streaming, usaríamos readStream com foreachBatch. 
# Aqui, para simplificar a didática do SCD2, vamos ler o snapshot atual da Bronze.

df_bronze = spark.read.format("delta").load(source_path)

window_spec = Window.partitionBy("customer_id").orderBy(col("ingestion_ts").desc())

df_updates = (df_bronze
    .withColumn("rank", row_number().over(window_spec))
    .filter("rank = 1")
    .drop("rank", "source_file", "_rescued_data")
)

In [0]:
# 3. Verificação Inicial (Bootstrapping)
# Se a tabela Silver não existe, criamos ela com a carga inicial e paramos por aqui.
if not spark.catalog.tableExists(target_table_name):
    print(f"⚠️ Tabela {target_table_name} não existe. Criando carga inicial...")
    
    (df_updates
        .withColumn("is_current", lit(True))
        .withColumn("start_date", current_timestamp())
        .withColumn("end_date", lit(None).cast("timestamp"))
        .write
        .format("delta")
        .mode("overwrite")
        # REMOVIDO: .option("path", ...) -> Agora é 100% gerenciada
        .saveAsTable(target_table_name)
    )
    print("✅ Carga inicial concluída! Tabela criada.")
    dbutils.notebook.exit("Inicialização completa")

In [0]:
# 4. A Mágica do SCD Tipo 2 (Merge com Union) - CORRIGIDO

target_table = DeltaTable.forName(spark, target_table_name)
df_target = target_table.toDF()

# Join para trazer o estado atual
df_join = df_updates.alias("updates").join(
    df_target.filter("is_current = true").alias("target"),
    on="customer_id",
    how="left"
)

# Condição de Mudança
change_condition = (
    (col("updates.city") != col("target.city")) | 
    (col("updates.phone") != col("target.phone")) |
    (col("updates.email") != col("target.email"))
)

# A: Registros para INSERIR (Novos ou Alterados)
rows_to_insert = df_join.filter(
    col("target.customer_id").isNull() | change_condition
).select(
    col("updates.customer_id"),
    col("updates.name"),
    col("updates.email"),
    col("updates.city"),
    col("updates.phone"),
    col("updates.update_ts"),
    lit(True).alias("is_current"),
    current_timestamp().alias("start_date"),
    lit(None).cast("timestamp").alias("end_date"),
    lit(None).alias("mergeKey") 
)

# B: Registros para ATUALIZAR (Fechar antigo)
rows_to_update = df_join.filter(
    col("target.customer_id").isNotNull() & change_condition
).select(
    col("updates.customer_id"),
    col("updates.name"), 
    col("updates.email"),
    col("updates.city"),
    col("updates.phone"),
    col("updates.update_ts"),
    lit(False).alias("is_current"),
    col("target.start_date").alias("start_date"), 
    current_timestamp().alias("end_date"),
    col("updates.customer_id").alias("mergeKey")
)

# Unir
df_stage = rows_to_insert.union(rows_to_update)

print(f"Processando {df_stage.count()} alterações...")

In [0]:
# 5. Execução do MERGE Atômico

(target_table.alias("target")
    .merge(
        df_stage.alias("stage"),
        "target.customer_id = stage.mergeKey AND target.is_current = true"
    )
    .whenMatchedUpdate(
        set = {
            "is_current": "stage.is_current",
            "end_date": "stage.end_date"
        }
    )
    .whenNotMatchedInsert(
        values = {
            "customer_id": "stage.customer_id",
            "name": "stage.name",
            "email": "stage.email",
            "city": "stage.city",
            "phone": "stage.phone",
            "update_ts": "stage.update_ts",
            "is_current": "stage.is_current",
            "start_date": "stage.start_date",
            "end_date": "stage.end_date"
        }
    )
    .execute()
)

print("✅ Processamento Silver finalizado.")

### Validação

In [0]:
# %sql
# SELECT customer_id, count(*) as qtd_ativos
# FROM delta.`/Volumes/main/default/lakehouse/silver_customers` -- Ajuste o caminho se necessário
# WHERE is_current = true
# GROUP BY customer_id
# HAVING count(*) > 1

In [0]:
# %sql
# -- Busca um cliente que tenha histórico (count > 1)
# SELECT * FROM delta.`/Volumes/main/default/lakehouse/silver_customers`
# WHERE customer_id = (
#     SELECT customer_id 
#     FROM delta.`/Volumes/main/default/lakehouse/silver_customers` 
#     GROUP BY customer_id 
#     HAVING count(*) > 1 
#     LIMIT 1
# )
# ORDER BY start_date DESC

In [0]:
# %sql
# -- Mostra a versão 0 da tabela (logo após você ter limpado e rodado a primeira vez)
# SELECT * FROM delta.`/Volumes/main/default/lakehouse/silver_customers` VERSION AS OF 0
# WHERE customer_id = (
#     SELECT customer_id 
#     FROM delta.`/Volumes/main/default/lakehouse/silver_customers` 
#     GROUP BY customer_id 
#     HAVING count(*) > 1 
#     LIMIT 1
# )