In [0]:
from delta import *
from pyspark.sql.functions import *
import os
from pyspark.sql.types import StructType,StructField, StringType, IntegerType, FloatType, DateType, BooleanType

In [0]:
data = [('123 Main St', True, '2022-01-01', None, 1),
        ('456 Maple Ave', False, '2022-02-01', None, 2),
        ('789 Oak Blvd', True, '2022-03-01', None, 3)]

schema_clientes = StructType([
        StructField("endereco", StringType(), False),
        StructField("ativo", BooleanType(), False),
        StructField("dt_inicio", StringType(), False),
        StructField("dt_final", StringType(), True),
        StructField("id_cliente", IntegerType(), False)
    ])

df = spark.createDataFrame(data, schema=schema_clientes).select("id_cliente", "endereco", "ativo", "dt_inicio", "dt_final")

df.write.format("delta").mode("overwrite").saveAsTable("default.clientes")

In [0]:
new_data = [('111 Main St', '2023-03-23', 1),
        ('456 Maple Ave', '2022-02-01',  2),
        ('789 Oak Blvd', '2022-03-01',  3)]

schema_clientes = StructType([
        StructField("endereco", StringType(), False),
        StructField("dt_inicio", StringType(), False),
        StructField("id_cliente", IntegerType(), False)
    ])

new_df = spark.createDataFrame(new_data, schema=schema_clientes).select("id_cliente", "endereco", "dt_inicio")

In [0]:
# Ler a tabela com os dados atuais
tb_clientes = DeltaTable.forName(spark, "default.clientes")

In [0]:
# Dataframe com clientes existentes que possuem um novo endereço.
tb_insert = new_df \
  .alias("updates") \
  .join(tb_clientes.toDF().alias("clientes"), "id_cliente") \
  .where("clientes.ativo = true AND updates.endereco <> clientes.endereco")

# União das linhas que serão inseridas e/ou alteradas
tb_update = (
  tb_insert
  .selectExpr("NULL as mergeKey", "updates.*")   # Linhas que serão inseridas na condição whenNotMatched 
  .union(new_df.selectExpr("id_cliente as mergeKey", "*"))
)

In [0]:
# As linhas que já estão na tabela atual, mas possuem um novo endereço estão com a coluna MergeKey = nulo, pois na etapa do merge serão inseridas como uma nova versão da linha
display(tb_update)

mergeKey,id_cliente,endereco,dt_inicio
,1,111 Main St,2023-03-23
1.0,1,111 Main St,2023-03-23
2.0,2,456 Maple Ave,2022-02-01
3.0,3,789 Oak Blvd,2022-03-01


In [0]:
# Merge SCD tipo 2
tb_clientes.alias("old").merge(
    tb_update.alias("new"),
    "old.id_cliente = mergeKey") \
.whenMatchedUpdate(
    condition = "old.ativo = true AND old.endereco <> new.endereco",
    set = {
        "ativo": "false",
        "dt_final": "new.dt_inicio"
    }
).whenNotMatchedInsert(
    values = {
        "id_cliente": "new.id_cliente",
        "endereco": "new.endereco",
        "ativo": "true",
        "dt_inicio": "new.dt_inicio",
        "dt_final": "null"
    }
).execute()

In [0]:
display(spark.table("default.clientes"))

id_cliente,endereco,ativo,dt_inicio,dt_final
1,123 Main St,False,2022-01-01,2023-03-23
2,456 Maple Ave,False,2022-02-01,
3,789 Oak Blvd,True,2022-03-01,
1,111 Main St,True,2023-03-23,
