# Bibliotecas

In [1]:
from pyspark.sql import SparkSession
from delta import *
import os
from delta.tables import DeltaTable

# Sessão SPARK

In [2]:
spark = SparkSession.builder \
            .master("spark://spark-master:7077") \
            .config("spark.jars.packages", 
                    "org.apache.hadoop:hadoop-aws:3.2.2,"
                    "io.delta:delta-spark_2.12:3.2.0,"
                    "io.delta:delta-storage:3.2.0,"
                    "com.amazonaws:aws-java-sdk-bundle:1.12.180") \
            .config("spark.executor.memory", "4g") \
            .config("spark.executor.cores", "2") \
            .config("spark.driver.memory", "4g") \
            .config("spark.driver.cores", "2") \
            .config("spark.dynamicAllocation.enabled", "true") \
            .config("spark.dynamicAllocation.minExecutors", "1") \
            .config("spark.dynamicAllocation.maxExecutors", "2") \
            .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
            .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
	        .config("spark.hadoop.fs.s3a.endpoint", os.getenv("MINIO_HOST")) \
            .config("spark.hadoop.fs.s3a.access.key", os.getenv("MINIO_ACCESS_KEY")) \
            .config("spark.hadoop.fs.s3a.secret.key", os.getenv("MINIO_SECRET_KEY")) \
            .config("spark.hadoop.fs.s3a.path.style.access", "true") \
            .config("spark.hadoop.com.amazonaws.services.s3.enableV4", "true") \
            .config("spark.hadoop.fs.s3a.aws.credentials.provider","org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider") \
            .config("spark.hadoop.fs.AbstractFileSystem.s3a.impl","org.apache.hadoop.fs.s3a.S3A") \
            .getOrCreate()    

# Tabela Source

In [3]:
#lendo tabela
emp_path = "s3a://bronze/employe"
spark.read.format("delta").load(emp_path).createOrReplaceTempView("employe")

In [13]:
spark.sql("""
select * from employe
""").show()

+------+--------+---------+-------+
|emp_id|emp_name|dept_code| salary|
+------+--------+---------+-------+
|  1002|     Bob|     D102|60000.0|
|  1003| Charlie|     D103|70000.0|
|  1007|   Grace|     D107|75000.0|
|  1006|   Frank|     D106|70000.0|
|  1002|     Bob|     D102|62000.0|
+------+--------+---------+-------+



In [14]:
spark.sql("""
INSERT INTO employe (emp_id, emp_name, dept_code, salary)
VALUES
    (1001, 'Alice', 'D101', 50000),
    (1002, 'Bob', 'D102', 60000),
    (1003, 'Charlie', 'D103', 70000)
""")

DataFrame[]

In [12]:
spark.sql("""
DELETE FROM employe WHERE emp_id = '1001'
""")

DataFrame[num_affected_rows: bigint]

# Tabela Target

In [4]:
emp_path = "s3a://silver/employe_scd2"

spark.sql(f"""
CREATE TABLE IF NOT EXISTS employe_scd2 (
    emp_id INT,
    emp_name STRING,
    dept_code STRING,
    salary DOUBLE,
    start_date DATE,
    end_date DATE,
    is_current BOOLEAN
) USING DELTA LOCATION '{emp_path}'
""")

spark.sql("""
INSERT INTO employe_scd2 (emp_id, emp_name, dept_code, salary, start_date, end_date, is_current)
VALUES
    (1001, 'Alice', 'D101', 50000, '2020-01-01', NULL, TRUE),
    (1002, 'Bob', 'D102', 60000, '2020-01-01', NULL, TRUE),
    (1003, 'Charlie', 'D103', 70000, '2020-01-01', NULL, TRUE)
""")

DataFrame[]

In [5]:
spark.sql("SELECT * FROM employe_scd2").show()

+------+--------+---------+-------+----------+--------+----------+
|emp_id|emp_name|dept_code| salary|start_date|end_date|is_current|
+------+--------+---------+-------+----------+--------+----------+
|  1002|     Bob|     D102|60000.0|2020-01-01|    NULL|      true|
|  1003| Charlie|     D103|70000.0|2020-01-01|    NULL|      true|
|  1001|   Alice|     D101|50000.0|2020-01-01|    NULL|      true|
+------+--------+---------+-------+----------+--------+----------+



# Merge SCD 2

In [6]:
employeTable  = DeltaTable.forPath(spark, emp_path)  # Tabela Delta que contém os registros de empregados históricos
updatesDF  = spark.table("employe")  # DataFrame com os novos dados

In [7]:
#Gera Dataframe somente com os registros atualizados
newRecordsToInsert = updatesDF.alias("updates") \
    .join(employeTable.toDF().alias("employe"), "emp_id") \
    .where(
        # Filtra registros atuais e verifica se houve alteração em algum campo
        "(employe.is_current = TRUE AND (updates.salary <> employe.salary OR updates.emp_name <> employe.emp_name OR updates.dept_code <> employe.dept_code))"
    )

In [8]:
newRecordsToInsert.show()

+------+--------+---------+-------+--------+---------+-------+----------+--------+----------+
|emp_id|emp_name|dept_code| salary|emp_name|dept_code| salary|start_date|end_date|is_current|
+------+--------+---------+-------+--------+---------+-------+----------+--------+----------+
|  1001|   Alice|     D101|58000.0|   Alice|     D101|50000.0|2020-01-01|    NULL|      true|
|  1002|     Bob|     D102|62000.0|     Bob|     D102|60000.0|2020-01-01|    NULL|      true|
+------+--------+---------+-------+--------+---------+-------+----------+--------+----------+



In [9]:
# Criando o DataFrame com os registros a serem inseridos e atualizados
stagedUpdates = (
    newRecordsToInsert
    .selectExpr("NULL as mergeKey", "updates.*")  # Cria um mergeKey fictício (NULL) para identificar registros a serem inseridos
    .union(updatesDF.selectExpr("emp_id as mergeKey", "*"))  # Prepara os registros de updates com mergeKey para serem usados na atualização ou inserção
)

In [10]:
stagedUpdates.show()

+--------+------+--------+---------+-------+
|mergeKey|emp_id|emp_name|dept_code| salary|
+--------+------+--------+---------+-------+
|    NULL|  1001|   Alice|     D101|58000.0|
|    NULL|  1002|     Bob|     D102|62000.0|
|    1007|  1007|   Grace|     D107|75000.0|
|    1006|  1006|   Frank|     D106|70000.0|
|    1001|  1001|   Alice|     D101|58000.0|
|    1002|  1002|     Bob|     D102|62000.0|
+--------+------+--------+---------+-------+



In [16]:
# Aplicando a operação SCD Tipo 2 com o comando MERGE
employeTable.alias("employe").merge(
    stagedUpdates.alias("staged_updates"),
    "employe.emp_id = staged_updates.mergeKey"  # A chave de mesclagem é emp_id, comparando os registros da tabela 'employe' e 'staged_updates'
) \
.whenMatchedUpdate(
    condition = "employe.is_current = true AND (employe.salary <> staged_updates.salary OR employe.emp_name <> staged_updates.emp_name OR employe.dept_code <> staged_updates.dept_code)", 
    set = {  
        "is_current": "false",  # Atualiza 'is_current' para false para registros alterados
        "end_date": "current_date()"  # Define a data de término (end_date) para o registro alterado
    }
) \
.whenNotMatchedInsert(
    values = {
        "emp_id": "staged_updates.emp_id",  # Insere os novos dados na tabela
        "emp_name": "staged_updates.emp_name",
        "dept_code": "staged_updates.dept_code",
        "salary": "staged_updates.salary",
        "start_date": "current_date()",  # Define a data de início (start_date) para o novo registro
        "end_date": "NULL",  # O campo 'end_date' será NULL para novos registros
        "is_current": "true"  # Marca o novo registro como 'atual'
    }
) \
.execute()

In [17]:
spark.sql("SELECT * FROM employe_scd2 ORDER BY emp_name").show()

+------+--------+---------+-------+----------+----------+----------+
|emp_id|emp_name|dept_code| salary|start_date|  end_date|is_current|
+------+--------+---------+-------+----------+----------+----------+
|  1001|   Alice|     D101|58000.0|2025-03-30|      NULL|      true|
|  1001|   Alice|     D101|50000.0|2020-01-01|2025-03-30|     false|
|  1002|     Bob|     D102|62000.0|2025-03-30|      NULL|      true|
|  1002|     Bob|     D102|60000.0|2020-01-01|2025-03-30|     false|
|  1003| Charlie|     D103|70000.0|2020-01-01|      NULL|      true|
|  1006|   Frank|     D106|70000.0|2025-03-30|      NULL|      true|
|  1007|   Grace|     D107|75000.0|2025-03-30|      NULL|      true|
+------+--------+---------+-------+----------+----------+----------+

