In [0]:
from pyspark.sql import SparkSession, functions as F, types as T, Window
from delta.tables import DeltaTable
import os

In [0]:
# --- Location of the Delta table ---
catalog_path = "/Volumes/workspace/default/refined/d_servicos"

# --- Bootstrap: write an empty Delta table when none exists at the path ---
if not DeltaTable.isDeltaTable(spark, catalog_path):
    # All columns are declared nullable; pairs are listed in column order.
    schema = T.StructType([
        T.StructField(field_name, field_type, True)
        for field_name, field_type in (
            ("pk_servico", T.StringType()),
            ("sk_servico", T.LongType()),
            ("tipo_servico", T.StringType()),
            ("descricao", T.StringType()),
            ("start_date", T.DateType()),
            ("update_date", T.DateType()),
        )
    ])
    df_empty = spark.createDataFrame([], schema)

    # Make sure the target directory exists before the Delta write.
    os.makedirs(catalog_path, exist_ok=True)

    empty_writer = df_empty.write.format("delta").mode("overwrite")
    empty_writer.save(catalog_path)
    print("Tabela Delta criada com sucesso em:", catalog_path)

In [0]:
# --- Open the existing dimension ---
delta_table = DeltaTable.forPath(spark, catalog_path)
df_dim_existente = delta_table.toDF()

# --- Read the incoming rows: distinct (tipo_servico, descricao) pairs ---
# pk_servico is the SHA-256 of both attributes joined with "||".
# NOTE(review): concat_ws() skips NULL inputs, so ("x", NULL) and (NULL, "x")
# would hash to the same pk_servico - confirm neither column can be NULL.
business_key_hash = F.sha2(
    F.concat_ws("||", F.col("tipo_servico"), F.col("descricao")),
    256,
)
df_consumos = (
    spark.read.format("delta")
    .load("/Volumes/workspace/default/raw/tb_consumos")
    .select(F.col("tipo_servico"), F.col("descricao"))
    .dropDuplicates()
    .withColumn("pk_servico", business_key_hash)
)

# --- Highest surrogate key stored so far (0 when there is none) ---
max_sk_row = df_dim_existente.agg(F.max("sk_servico")).collect()[0]
last_id = max_sk_row[0]
if last_id is None:
    last_id = 0

# --- Attach a surrogate key and the date columns to every incoming row ---
# Every incoming row is numbered, including keys already in the dimension,
# so the values that end up inserted are unique but may leave gaps.
# NOTE(review): a window with no partitionBy is expected to sort all rows in
# a single partition - fine for a small dimension, confirm the volume.
window = Window.orderBy("tipo_servico", "descricao")
next_sk = F.row_number().over(window) + last_id
df_prepared = (
    df_consumos
    .withColumn("sk_servico", next_sk.cast(T.LongType()))
    .withColumn("start_date", F.current_date())                 # only used by the insert
    .withColumn("update_date", F.lit(None).cast(T.DateType()))  # starts out NULL
)

In [0]:
# --- Run the MERGE, matching target and source rows on pk_servico ---
merge_condition = "target.pk_servico = source.pk_servico"

# Column values written for source keys that are not in the target yet
# (start_date is only ever set here).
insert_values = {
    "pk_servico": "source.pk_servico",
    "sk_servico": "source.sk_servico",
    "tipo_servico": "source.tipo_servico",
    "descricao": "source.descricao",
    "start_date": "source.start_date",
    "update_date": "source.update_date"
}

merge_builder = delta_table.alias("target").merge(
    df_prepared.alias("source"),
    merge_condition
)

# Key already present with update_date still NULL -> stamp update_date only;
# start_date and every other column are left untouched.
# NOTE(review): pk_servico is derived from both attributes, so a match means
# the incoming row equals the stored one - confirm this is the intended
# meaning of update_date.
merge_builder = merge_builder.whenMatchedUpdate(
    condition="target.update_date IS NULL",
    set={
        "update_date": "current_date()"
    }
)

# Key not present -> insert it as a new row.
merge_builder = merge_builder.whenNotMatchedInsert(values=insert_values)

merge_builder.execute()

In [0]:
# --- Read the dimension back and show the result of the MERGE ---
# Reuse catalog_path (set in the configuration cell) instead of repeating the
# literal path, so this check always reads the table the notebook wrote to.
df = spark.read.format("delta").load(catalog_path)
display(df)