<a href="https://colab.research.google.com/github/luasampaio/data-engineering/blob/main/52_ntb_scd_pyspark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp, lit
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType

# Inicializa Spark
spark = SparkSession.builder.appName("SCD_Type2").getOrCreate()

# Dados de exemplo
data = [
    (1, "Fornecedor A", "12345678000195", "Ativo", "2023-01-01"),
    (2, "Fornecedor B", "98765432000196", "Ativo", "2023-01-01"),
    (3, "Fornecedor C", "12345678000195", "Inativo", None),

]

# Esquema com os campos obrigatórios de SCD Tipo 2
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("Fornecedor", StringType(), True),
    StructField("CNPJ", StringType(), True),
    StructField("Status", StringType(), True),
    StructField("Data_Inicio", StringType(), True),
])

# Criação do DataFrame inicial
initial_df = spark.createDataFrame(data, schema)

# Adiciona colunas de controle para SCD Tipo 2
initial_df = initial_df.withColumn("Data_Inicio", current_timestamp()) \
                       .withColumn("Data_Fim", lit(None).cast(TimestampType())) \
                       .withColumn("Ultimo_Registro", lit(True))

# Exibe o resultado
initial_df.show(truncate=False)


+---+------------+--------------+-------+--------------------------+--------+---------------+
|id |Fornecedor  |CNPJ          |Status |Data_Inicio               |Data_Fim|Ultimo_Registro|
+---+------------+--------------+-------+--------------------------+--------+---------------+
|1  |Fornecedor A|12345678000195|Ativo  |2025-05-04 12:49:09.808238|NULL    |true           |
|2  |Fornecedor B|98765432000196|Ativo  |2025-05-04 12:49:09.808238|NULL    |true           |
|3  |Fornecedor C|12345678000195|Inativo|2025-05-04 12:49:09.808238|NULL    |true           |
+---+------------+--------------+-------+--------------------------+--------+---------------+



In [None]:
display(initial_df)

DataFrame[id: int, Fornecedor: string, CNPJ: string, Status: string, Data_Inicio: timestamp, Data_Fim: timestamp, Ultimo_Registro: boolean]

In [None]:
initial_df.printSchema()


root
 |-- id: integer (nullable = true)
 |-- Fornecedor: string (nullable = true)
 |-- CNPJ: string (nullable = true)
 |-- Status: string (nullable = true)
 |-- Data_Inicio: timestamp (nullable = false)
 |-- Data_Fim: timestamp (nullable = true)
 |-- Ultimo_Registro: boolean (nullable = false)



In [None]:
initial_df.count()

3

In [None]:
initial_df.show()

+---+------------+--------------+-------+--------------------+--------+---------------+
| id|  Fornecedor|          CNPJ| Status|         Data_Inicio|Data_Fim|Ultimo_Registro|
+---+------------+--------------+-------+--------------------+--------+---------------+
|  1|Fornecedor A|12345678000195|  Ativo|2025-05-04 12:49:...|    NULL|           true|
|  2|Fornecedor B|98765432000196|  Ativo|2025-05-04 12:49:...|    NULL|           true|
|  3|Fornecedor C|12345678000195|Inativo|2025-05-04 12:49:...|    NULL|           true|
+---+------------+--------------+-------+--------------------+--------+---------------+



definir a minha regra de negocio dos dados que estão ativos e iniativos

- Primeiro passo atualizar os dados inativos com a data fim

In [None]:
resultado_df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- Fornecedor: string (nullable = true)
 |-- CNPJ: string (nullable = true)
 |-- Status: string (nullable = true)
 |-- Data_Inicio: string (nullable = true)
 |-- Data_Fim: string (nullable = true)
 |-- Ultimo_Registro: boolean (nullable = true)



In [None]:
resultado_df.show()

+---+------------+--------------+-------+--------------------+--------------------+---------------+
| id|  Fornecedor|          CNPJ| Status|         Data_Inicio|            Data_Fim|Ultimo_Registro|
+---+------------+--------------+-------+--------------------+--------------------+---------------+
|  1|Fornecedor A|12345678000195|  Ativo|          2023-01-01|2025-05-04 12:49:...|          false|
|  1|Fornecedor A|12345678000195|Inativo|2025-05-04 12:49:...|                NULL|           true|
|  3|Fornecedor C|12345678007808|  Ativo|          2023-01-01|                NULL|           true|
|  4|Fornecedor D|11111111000111|  Ativo|2025-05-04 12:49:...|                NULL|           true|
+---+------------+--------------+-------+--------------------+--------------------+---------------+



In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, current_timestamp, lit
from pyspark.sql.types import *

spark = SparkSession.builder.appName("SCD_Type2_Merge").getOrCreate()

# Histórico atual
historico_data = [
    (1, "Fornecedor A", "12345678000195", "Ativo", "2023-01-01", None, True),
    (2, "Fornecedor B", "98765432000196", "Ativo", "2023-01-01", None, True),
    (3, "Fornecedor C", "12345678007808", "Ativo", "2023-01-01", None, True)
]
historico_schema = StructType([
    StructField("id", IntegerType()),
    StructField("Fornecedor", StringType()),
    StructField("CNPJ", StringType()),
    StructField("Status", StringType()),
    StructField("Data_Inicio", StringType()),
    StructField("Data_Fim", StringType()),
    StructField("Ultimo_Registro", BooleanType())
])
historico_df = spark.createDataFrame(historico_data, schema=historico_schema)

# Novos dados
novos_data = [
    (1, "Fornecedor A", "12345678000195", "Inativo"),
    (2, "Fornecedor B", "98765432000196", "Ativo"),
    (4, "Fornecedor D", "11111111000111", "Ativo")
]
novos_schema = StructType([
    StructField("id", IntegerType()),
    StructField("Fornecedor", StringType()),
    StructField("CNPJ", StringType()),
    StructField("Status", StringType())
])
novos_df = spark.createDataFrame(novos_data, schema=novos_schema)

# Condição de junção para detectar alterações de status para "Inativo"
join_cond = (historico_df["CNPJ"] == novos_df["CNPJ"]) & \
            (historico_df["Fornecedor"] == novos_df["Fornecedor"]) & \
            (novos_df["Status"] == "Inativo") & \
            (historico_df["Status"] != novos_df["Status"])

# Registros alterados (Status atualizado para "Inativo")
alterados_df = historico_df.join(novos_df, join_cond) \
    .filter(historico_df.Ultimo_Registro == True) \
    .select(
        historico_df["*"],
        novos_df["Status"].alias("Novo_Status")
    )

# Fecha os registros antigos
historico_atualizado_df = alterados_df.withColumn("Ultimo_Registro", lit(False)) \
    .withColumn("Data_Fim", current_timestamp()) \
    .drop("Novo_Status")


# Insere o novo registro com status atualizado
novos_registros_df = alterados_df.select(
    col("id"),
    col("Fornecedor"),
    col("CNPJ"),
    col("Novo_Status").alias("Status"),
    current_timestamp().alias("Data_Inicio"),
    lit(None).cast(StringType()).alias("Data_Fim"),
    lit(True).alias("Ultimo_Registro")
)

# Registros inalterados
inalterados_df = historico_df.join(novos_df, historico_df["CNPJ"] == novos_df["CNPJ"], "left_anti")

# Registros novos (que ainda não existem no histórico)
novos_inseridos_df = novos_df.join(historico_df, novos_df["CNPJ"] == historico_df["CNPJ"], "left_anti") \
    .withColumn("Data_Inicio", current_timestamp()) \
    .withColumn("Data_Fim", lit(None).cast(StringType())) \
    .withColumn("Ultimo_Registro", lit(True))

# Resultado final
resultado_df = historico_atualizado_df.union(novos_registros_df) \
    .union(inalterados_df) \
    .union(novos_inseridos_df)

resultado_df.orderBy("CNPJ").show(truncate=False)


+---+------------+--------------+-------+--------------------------+--------------------------+---------------+
|id |Fornecedor  |CNPJ          |Status |Data_Inicio               |Data_Fim                  |Ultimo_Registro|
+---+------------+--------------+-------+--------------------------+--------------------------+---------------+
|4  |Fornecedor D|11111111000111|Ativo  |2025-05-04 12:49:35.205627|NULL                      |true           |
|1  |Fornecedor A|12345678000195|Inativo|2025-05-04 12:49:35.205627|NULL                      |true           |
|1  |Fornecedor A|12345678000195|Ativo  |2023-01-01                |2025-05-04 12:49:35.205627|false          |
|3  |Fornecedor C|12345678007808|Ativo  |2023-01-01                |NULL                      |true           |
+---+------------+--------------+-------+--------------------------+--------------------------+---------------+



In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, current_timestamp, lit, row_number
from pyspark.sql.types import *
from pyspark.sql.window import Window

# Inicia sessão Spark
spark = SparkSession.builder.appName("SCD_Type2_Merge").getOrCreate()

# Histórico atual
historico_data = [
    (1, "Fornecedor A", "12345678000195", "Ativo", "2023-01-01", None, True),
    (2, "Fornecedor B", "98765432000196", "Ativo", "2023-01-01", None, True),
    (3, "Fornecedor C", "12345678007808", "Ativo", "2023-01-01", None, True)
]
historico_schema = StructType([
    StructField("id", IntegerType()),
    StructField("Fornecedor", StringType()),
    StructField("CNPJ", StringType()),
    StructField("Status", StringType()),
    StructField("Data_Inicio", StringType()),
    StructField("Data_Fim", StringType()),
    StructField("Ultimo_Registro", BooleanType())
])
historico_df = spark.createDataFrame(historico_data, schema=historico_schema)

# Novos dados
novos_data = [
    (1, "Fornecedor A", "12345678000195", "Inativo"),
    (2, "Fornecedor B", "98765432000196", "Ativo"),
    (4, "Fornecedor D", "11111111000111", "Ativo")
]
novos_schema = StructType([
    StructField("id", IntegerType()),
    StructField("Fornecedor", StringType()),
    StructField("CNPJ", StringType()),
    StructField("Status", StringType())
])
novos_df = spark.createDataFrame(novos_data, schema=novos_schema)

# Condição para identificar mudanças
join_cond = (historico_df["CNPJ"] == novos_df["CNPJ"]) & \
            (historico_df["id"] == novos_df["id"]) & \
            (historico_df["Fornecedor"] == novos_df["Fornecedor"]) & \
            (novos_df["Status"] == "Inativo")


# Registros alterados
alterados_df = historico_df.join(novos_df, join_cond) \
    .filter(historico_df.Ultimo_Registro == True) \
    .select(historico_df["*"], novos_df["Status"].alias("Novo_Status"))

# Atualiza o antigo como inativo com Data_Fim
historico_atualizado_df = alterados_df.withColumn("Ultimo_Registro", lit(False)) \
    .withColumn("Data_Fim", current_timestamp()) \
    .drop("Novo_Status")

# Novo registro com status atualizado
novos_registros_df = alterados_df.select(
    col("id"),
    col("Fornecedor"),
    col("CNPJ"),
    col("Novo_Status").alias("Status"),
    current_timestamp().alias("Data_Inicio"),
    lit(None).cast(StringType()).alias("Data_Fim"),
    lit(True).alias("Ultimo_Registro")
)

# Inalterados
inalterados_df = historico_df.join(novos_df, historico_df["CNPJ"] == novos_df["CNPJ"], "left_anti")

# Novos que não existiam
novos_inseridos_df = novos_df.join(historico_df, novos_df["CNPJ"] == historico_df["CNPJ"], "left_anti") \
    .withColumn("Data_Inicio", current_timestamp()) \
    .withColumn("Data_Fim", lit(None).cast(StringType())) \
    .withColumn("Ultimo_Registro", lit(True))

# União final
resultado_df = historico_atualizado_df.union(novos_registros_df) \
    .union(inalterados_df) \
    .union(novos_inseridos_df)

# Ordena por versão usando row_number
janela = Window.partitionBy("CNPJ").orderBy("Data_Inicio")
resultado_final_df = resultado_df.withColumn("Ordem_Registro", row_number().over(janela))

# Exibe resultado
resultado_final_df.orderBy("CNPJ", "Ordem_Registro").show(truncate=False)


+---+------------+--------------+-------+--------------------------+--------------------------+---------------+--------------+
|id |Fornecedor  |CNPJ          |Status |Data_Inicio               |Data_Fim                  |Ultimo_Registro|Ordem_Registro|
+---+------------+--------------+-------+--------------------------+--------------------------+---------------+--------------+
|4  |Fornecedor D|11111111000111|Ativo  |2025-05-04 12:49:45.958588|NULL                      |true           |1             |
|1  |Fornecedor A|12345678000195|Ativo  |2023-01-01                |2025-05-04 12:49:45.958588|false          |1             |
|1  |Fornecedor A|12345678000195|Inativo|2025-05-04 12:49:45.958588|NULL                      |true           |2             |
|3  |Fornecedor C|12345678007808|Ativo  |2023-01-01                |NULL                      |true           |1             |
+---+------------+--------------+-------+--------------------------+--------------------------+---------------+

In [None]:
ativos = resultado_df.filter(resultado_df["Status"] == "Ativo")
ativos.show()

+---+------------+--------------+------+--------------------+--------------------+---------------+
| id|  Fornecedor|          CNPJ|Status|         Data_Inicio|            Data_Fim|Ultimo_Registro|
+---+------------+--------------+------+--------------------+--------------------+---------------+
|  1|Fornecedor A|12345678000195| Ativo|          2023-01-01|2025-05-04 12:02:...|          false|
|  3|Fornecedor C|12345678007808| Ativo|          2023-01-01|                NULL|           true|
|  4|Fornecedor D|11111111000111| Ativo|2025-05-04 12:02:...|                NULL|           true|
+---+------------+--------------+------+--------------------+--------------------+---------------+



In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, current_timestamp, lit, row_number
from pyspark.sql.window import Window
from pyspark.sql.types import *

# Inicializa sessão Spark
spark = SparkSession.builder.appName("SCD_Type2_Complete").getOrCreate()

# Dados do histórico atual
historico_data = [
    (1, "Fornecedor A", "12345678000195", "Ativo", "2023-01-01", None, True, 1),
    (2, "Fornecedor B", "98765432000196", "Ativo", "2023-01-01", None, True, 1),
    (3, "Fornecedor C", "12345678007808", "Ativo", "2023-01-01", None, True, 1)
]

historico_schema = StructType([
    StructField("id", IntegerType()),
    StructField("Fornecedor", StringType()),
    StructField("CNPJ", StringType()),
    StructField("Status", StringType()),
    StructField("Data_Inicio", StringType()),
    StructField("Data_Fim", StringType()),
    StructField("Ultimo_Registro", BooleanType()),
    StructField("Ordem_Registro", IntegerType())
])

historico_df = spark.createDataFrame(historico_data, schema=historico_schema)

# Novos dados do sistema transacional
novos_data = [
    (1, "Fornecedor A", "12345678000195", "Inativo"),  # Mudou
    (2, "Fornecedor B", "98765432000196", "Ativo"),    # Igual
    (4, "Fornecedor D", "11111111000111", "Ativo")     # Novo
]

novos_schema = StructType([
    StructField("id", IntegerType()),
    StructField("Fornecedor", StringType()),
    StructField("CNPJ", StringType()),
    StructField("Status", StringType())
])

novos_df = spark.createDataFrame(novos_data, schema=novos_schema)

# Junta históricos com os novos dados, mantendo apenas registros atuais
join_keys = ["CNPJ", "Fornecedor","id"]
join_df = historico_df.filter(col("Ultimo_Registro") == True) \
    .join(novos_df, join_keys, "inner") \
    .select(
        historico_df["*"],
        novos_df["Status"].alias("Novo_Status")
    )

# Fecha os registros anteriores
historico_atualizado_df = join_df.withColumn("Ultimo_Registro", lit(False)) \
    .withColumn("Data_Fim", current_timestamp()) \
    .drop("Novo_Status")

# Cria novos registros (mesmo com status igual)
novos_registros_df = join_df.select(
    col("id"),
    col("Fornecedor"),
    col("CNPJ"),
    col("Novo_Status").alias("Status"),
    current_timestamp().alias("Data_Inicio"),
    lit(None).cast(StringType()).alias("Data_Fim"),
    lit(True).alias("Ultimo_Registro")
)

# Identifica a ordem do novo registro (versão +1 por CNPJ)
window_spec = Window.partitionBy("CNPJ").orderBy("Data_Inicio")
novos_registros_df = novos_registros_df.withColumn("Ordem_Registro", row_number().over(window_spec) + 1)

# Atualiza o histórico (sem sobrescrever os registros antigos)
historico_sem_atuais_df = historico_df.filter(col("Ultimo_Registro") == False)

# Registros novos (CNPJ que ainda não existem no histórico)
novos_inseridos_df = novos_df.join(historico_df, join_keys, "left_anti") \
    .withColumn("Data_Inicio", current_timestamp()) \
    .withColumn("Data_Fim", lit(None).cast(StringType())) \
    .withColumn("Ultimo_Registro", lit(True)) \
    .withColumn("Ordem_Registro", lit(1))

# Resultado final com histórico completo
resultado_final_df = historico_sem_atuais_df.unionByName(historico_atualizado_df) \
    .unionByName(novos_registros_df) \
    .unionByName(novos_inseridos_df)

# Exibe o resultado ordenado
resultado_final_df.orderBy("CNPJ", "Ordem_Registro").show(truncate=False)


+---+------------+--------------+-------+-------------------------+-------------------------+---------------+--------------+
|id |Fornecedor  |CNPJ          |Status |Data_Inicio              |Data_Fim                 |Ultimo_Registro|Ordem_Registro|
+---+------------+--------------+-------+-------------------------+-------------------------+---------------+--------------+
|4  |Fornecedor D|11111111000111|Ativo  |2025-05-04 12:51:29.55273|NULL                     |true           |1             |
|1  |Fornecedor A|12345678000195|Ativo  |2023-01-01               |2025-05-04 12:51:29.55273|false          |1             |
|1  |Fornecedor A|12345678000195|Inativo|2025-05-04 12:51:29.55273|NULL                     |true           |2             |
|2  |Fornecedor B|98765432000196|Ativo  |2023-01-01               |2025-05-04 12:51:29.55273|false          |1             |
|2  |Fornecedor B|98765432000196|Ativo  |2025-05-04 12:51:29.55273|NULL                     |true           |2             |


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, current_timestamp, lit
from pyspark.sql.types import *
from datetime import datetime

spark = SparkSession.builder.appName("SCD_Type2_OnlyInativos").getOrCreate()

# Histórico atual com tipo de data correto
historico_data = [
    (1, "Fornecedor A", "12345678000195", "Ativo", datetime(2023, 1, 1), None, True, 1),
    (2, "Fornecedor B", "98765432000196", "Ativo", datetime(2023, 1, 1), None, True, 1),
    (3, "Fornecedor C", "12345678007808", "Ativo", datetime(2023, 1, 1), None, True, 1)
]
historico_schema = StructType([
    StructField("id", IntegerType()),
    StructField("Fornecedor", StringType()),
    StructField("CNPJ", StringType()),
    StructField("Status", StringType()),
    StructField("Data_Inicio", TimestampType()),
    StructField("Data_Fim", TimestampType()),
    StructField("Ultimo_Registro", BooleanType()),
    StructField("Ordem_Registro", IntegerType())
])
historico_df = spark.createDataFrame(historico_data, historico_schema)

# Novos dados transacionais
novos_data = [
    (1, "Fornecedor A", "12345678000195", "Inativo"),  # Mudou
    (2, "Fornecedor B", "98765432000196", "Ativo"),    # Igual
    (4, "Fornecedor D", "11111111000111", "Ativo")     # Novo
]
novos_schema = StructType([
    StructField("id", IntegerType()),
    StructField("Fornecedor", StringType()),
    StructField("CNPJ", StringType()),
    StructField("Status", StringType())
])
novos_df = spark.createDataFrame(novos_data, novos_schema)

# Filtra registros que mudaram para Inativo
join_keys = ["CNPJ", "Fornecedor"]
inativos_df = historico_df.filter(col("Ultimo_Registro") == True) \
    .join(novos_df.filter(col("Status") == "Inativo"), join_keys, "inner") \
    .filter(historico_df.Status != "Inativo") \
    .select(
        historico_df["*"],
        novos_df["Status"].alias("Novo_Status")
    )

# Atualiza registros antigos (fecha validade)
fechados_df = inativos_df.withColumn("Ultimo_Registro", lit(False)) \
    .withColumn("Data_Fim", current_timestamp()) \
    .drop("Novo_Status")

# Cria novo registro com status Inativo
novos_inativos_df = inativos_df.select(
    col("id"),
    col("Fornecedor"),
    col("CNPJ"),
    col("Novo_Status").alias("Status"),
    current_timestamp().alias("Data_Inicio"),
    lit(None).cast(TimestampType()).alias("Data_Fim"),
    lit(True).alias("Ultimo_Registro"),
    (col("Ordem_Registro") + 1).alias("Ordem_Registro")
)

# Registros que não mudaram
nao_alterados_df = historico_df.join(fechados_df, ["id", "CNPJ", "Fornecedor"], "left_anti")

# Resultado final
resultado_df = nao_alterados_df.unionByName(fechados_df).unionByName(novos_inativos_df)

# Exibe resultado
resultado_df.orderBy("CNPJ", "Ordem_Registro").show(truncate=False)


+---+--------------+------------+-------+--------------------------+--------------------------+---------------+--------------+
|id |CNPJ          |Fornecedor  |Status |Data_Inicio               |Data_Fim                  |Ultimo_Registro|Ordem_Registro|
+---+--------------+------------+-------+--------------------------+--------------------------+---------------+--------------+
|1  |12345678000195|Fornecedor A|Ativo  |2023-01-01 00:00:00       |2025-05-04 13:00:02.419764|false          |1             |
|1  |12345678000195|Fornecedor A|Inativo|2025-05-04 13:00:02.419764|NULL                      |true           |2             |
|3  |12345678007808|Fornecedor C|Ativo  |2023-01-01 00:00:00       |NULL                      |true           |1             |
|2  |98765432000196|Fornecedor B|Ativo  |2023-01-01 00:00:00       |NULL                      |true           |1             |
+---+--------------+------------+-------+--------------------------+--------------------------+---------------+

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, current_timestamp, lit, when
from pyspark.sql.types import *
from datetime import datetime

spark = SparkSession.builder.appName("SCD_Type2_OnlyInativos").getOrCreate()

# Histórico atual
historico_data = [
    (1, "Fornecedor A", "12345678000195", "Ativo", datetime(2023, 1, 1), None, True, 1),
    (2, "Fornecedor B", "98765432000196", "Ativo", datetime(2023, 1, 1), None, True, 1),
    (3, "Fornecedor C", "12345678007808", "Ativo", datetime(2023, 1, 1), None, True, 1)
]
historico_schema = StructType([
    StructField("id", IntegerType()),
    StructField("Fornecedor", StringType()),
    StructField("CNPJ", StringType()),
    StructField("Status", StringType()),
    StructField("Data_Inicio", TimestampType()),
    StructField("Data_Fim", TimestampType()),
    StructField("Ultimo_Registro", BooleanType()),
    StructField("Ordem_Registro", IntegerType())
])
historico_df = spark.createDataFrame(historico_data, historico_schema)

# Novos dados transacionais
novos_data = [
    (1, "Fornecedor A", "12345678000195", "Inativo"),  # Mudou
    (2, "Fornecedor B", "98765432000196", "Ativo"),    # Igual
    (4, "Fornecedor D", "11111111000111", "Ativo")     # Novo
]
novos_schema = StructType([
    StructField("id", IntegerType()),
    StructField("Fornecedor", StringType()),
    StructField("CNPJ", StringType()),
    StructField("Status", StringType())
])
novos_df = spark.createDataFrame(novos_data, novos_schema)

# Chaves de junção
join_keys = ["CNPJ", "Fornecedor"]

# 1. Identifica registros que mudaram para Inativo
inativos_df = historico_df.filter(col("Ultimo_Registro") == True) \
    .join(novos_df.filter(col("Status") == "Inativo"), join_keys, "inner") \
    .filter(historico_df.Status != "Inativo") \
    .select(
        historico_df["*"],
        novos_df["Status"].alias("Novo_Status")
    )

# 2. Atualiza registros antigos (marca como não último e define data fim)
fechados_df = inativos_df.withColumn("Ultimo_Registro", lit(False)) \
    .withColumn("Data_Fim", current_timestamp()) \
    .drop("Novo_Status")

# 3. Cria novo registro com status Inativo (com Data_Fim preenchida)
novos_inativos_df = inativos_df.select(
    col("id"),
    col("Fornecedor"),
    col("CNPJ"),
    col("Novo_Status").alias("Status"),
    current_timestamp().alias("Data_Inicio"),
    current_timestamp().alias("Data_Fim"),
    lit(True).alias("Ultimo_Registro"),
    (col("Ordem_Registro") + 1).alias("Ordem_Registro")
)

# 4. Identifica novos registros (não existem no histórico)
novos_registros_df = novos_df.join(historico_df, join_keys, "left_anti") \
    .select(
        col("id"),
        col("Fornecedor"),
        col("CNPJ"),
        col("Status"),
        current_timestamp().alias("Data_Inicio"),
        when(col("Status") == "Inativo", current_timestamp())
            .otherwise(lit(None).cast(TimestampType())).alias("Data_Fim"),
        lit(True).alias("Ultimo_Registro"),
        lit(1).alias("Ordem_Registro")
    )

# 5. Registros que não foram alterados
nao_alterados_df = historico_df.join(fechados_df, ["id", "CNPJ", "Fornecedor"], "left_anti")

# 6. Resultado final
resultado_df = nao_alterados_df.unionByName(fechados_df) \
    .unionByName(novos_inativos_df) \
    .unionByName(novos_registros_df)

# 7. Exibe resultado
resultado_df.orderBy("CNPJ", "Ordem_Registro").show(truncate=False)


+---+--------------+------------+-------+--------------------------+--------------------------+---------------+--------------+
|id |CNPJ          |Fornecedor  |Status |Data_Inicio               |Data_Fim                  |Ultimo_Registro|Ordem_Registro|
+---+--------------+------------+-------+--------------------------+--------------------------+---------------+--------------+
|4  |11111111000111|Fornecedor D|Ativo  |2025-05-04 13:08:26.266851|NULL                      |true           |1             |
|1  |12345678000195|Fornecedor A|Ativo  |2023-01-01 00:00:00       |2025-05-04 13:08:26.266851|false          |1             |
|1  |12345678000195|Fornecedor A|Inativo|2025-05-04 13:08:26.266851|2025-05-04 13:08:26.266851|true           |2             |
|3  |12345678007808|Fornecedor C|Ativo  |2023-01-01 00:00:00       |NULL                      |true           |1             |
|2  |98765432000196|Fornecedor B|Ativo  |2023-01-01 00:00:00       |NULL                      |true           |

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, current_timestamp, lit, when
from pyspark.sql.types import *
from datetime import datetime

spark = SparkSession.builder.appName("SCD_Type2_OnlyInativos").getOrCreate()

# Histórico atual
historico_data = [
    (1, "Fornecedor A", "12345678000195", "Ativo", datetime(2023, 1, 1), None, True, 1),
    (2, "Fornecedor B", "98765432000196", "Ativo", datetime(2023, 1, 1), None, True, 1),
    (3, "Fornecedor C", "12345678007808", "Ativo", datetime(2023, 1, 1), None, True, 1)
]
historico_schema = StructType([
    StructField("id", IntegerType()),
    StructField("Fornecedor", StringType()),
    StructField("CNPJ", StringType()),
    StructField("Status", StringType()),
    StructField("Data_Inicio", TimestampType()),
    StructField("Data_Fim", TimestampType()),
    StructField("Ultimo_Registro", BooleanType()),
    StructField("Ordem_Registro", IntegerType())
])
historico_df = spark.createDataFrame(historico_data, historico_schema)

# Novos dados transacionais
novos_data = [
    (1, "Fornecedor A", "12345678000195", "Inativo"),  # Mudou
    (2, "Fornecedor B", "98765432000196", "Ativo"),    # Igual
    (4, "Fornecedor D", "11111111000111", "Ativo")     # Novo
]
novos_schema = StructType([
    StructField("id", IntegerType()),
    StructField("Fornecedor", StringType()),
    StructField("CNPJ", StringType()),
    StructField("Status", StringType())
])
novos_df = spark.createDataFrame(novos_data, novos_schema)

# Chaves para junção
join_keys = ["CNPJ", "Fornecedor"]

# 1. Identifica registros que mudaram para Inativo
inativos_df = historico_df.filter(col("Ultimo_Registro") == True) \
    .join(novos_df.filter(col("Status") == "Inativo"), join_keys, "inner") \
    .filter(historico_df.Status != "Inativo") \
    .select(
        historico_df["*"],
        novos_df["Status"].alias("Novo_Status")
    )

# 2. Fecha registros antigos com Data_Fim (somente se ficaram inativos)
fechados_df = inativos_df.withColumn("Ultimo_Registro", lit(False)) \
    .withColumn("Data_Fim", current_timestamp()) \
    .drop("Novo_Status")

# 3. Novo registro para o mesmo CNPJ com status Inativo (Data_Fim preenchida)
novos_inativos_df = inativos_df.select(
    col("id"),
    col("Fornecedor"),
    col("CNPJ"),
    col("Novo_Status").alias("Status"),
    current_timestamp().alias("Data_Inicio"),
    current_timestamp().alias("Data_Fim"),
    lit(True).alias("Ultimo_Registro"),
    (col("Ordem_Registro") + 1).alias("Ordem_Registro")
)

# 4. Novos registros (sem histórico anterior)
novos_registros_df = novos_df.join(historico_df, join_keys, "left_anti") \
    .select(
        col("id"),
        col("Fornecedor"),
        col("CNPJ"),
        col("Status"),
        current_timestamp().alias("Data_Inicio"),
        # ❗️ Aqui aplicamos sua regra: Data_Fim só se Status for Inativo
        when(col("Status") == "Inativo", current_timestamp())
            .otherwise(lit(None).cast(TimestampType())).alias("Data_Fim"),
        lit(True).alias("Ultimo_Registro"),
        lit(1).alias("Ordem_Registro")
    )

# 5. Registros que não foram alterados
nao_alterados_df = historico_df.join(fechados_df, ["id", "CNPJ", "Fornecedor"], "left_anti")

# 6. Resultado final
resultado_df = nao_alterados_df.unionByName(fechados_df) \
    .unionByName(novos_inativos_df) \
    .unionByName(novos_registros_df)

# 7. Exibe resultado
resultado_df.orderBy("CNPJ", "Ordem_Registro").show(truncate=False)


+---+--------------+------------+-------+--------------------------+--------------------------+---------------+--------------+
|id |CNPJ          |Fornecedor  |Status |Data_Inicio               |Data_Fim                  |Ultimo_Registro|Ordem_Registro|
+---+--------------+------------+-------+--------------------------+--------------------------+---------------+--------------+
|4  |11111111000111|Fornecedor D|Ativo  |2025-05-04 13:11:11.907948|NULL                      |true           |1             |
|1  |12345678000195|Fornecedor A|Ativo  |2023-01-01 00:00:00       |2025-05-04 13:11:11.907948|false          |1             |
|1  |12345678000195|Fornecedor A|Inativo|2025-05-04 13:11:11.907948|2025-05-04 13:11:11.907948|true           |2             |
|3  |12345678007808|Fornecedor C|Ativo  |2023-01-01 00:00:00       |NULL                      |true           |1             |
|2  |98765432000196|Fornecedor B|Ativo  |2023-01-01 00:00:00       |NULL                      |true           |