In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, FloatType

from delta import *

In [2]:
# Build a local SparkSession (all cores) with Delta Lake enabled.
# The two settings below register Delta's SQL extension and make the
# default catalog Delta-aware.
builder = (
    SparkSession.builder
    .master("local[*]")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)

# configure_spark_with_delta_pip (exported by the `delta` package) sets
# spark.jars.packages to the Delta artifact that matches the pip-installed
# delta-spark version. This replaces the previously hard-coded Maven
# coordinate, which could silently drift from the Python bindings after a
# dependency upgrade.
spark = configure_spark_with_delta_pip(builder).getOrCreate()

25/04/16 02:08:54 WARN Utils: Your hostname, bonetti-VirtualBox resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
25/04/16 02:08:54 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/home/bonetti/.cache/pypoetry/virtualenvs/atividade-engenharia-de-dados-pyspark-oL9R8c7J-py3.12/lib/python3.12/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/bonetti/.ivy2/cache
The jars for the packages stored in: /home/bonetti/.ivy2/jars
io.delta#delta-core_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-50543418-74ee-4a22-a24b-6370a50ba6ac;1.0
	confs: [default]
	found io.delta#delta-core_2.12;2.4.0 in central
	found io.delta#delta-storage;2.4.0 in central
	found org.antlr#antlr4-runtime;4.9.3 in central
:: resolution report :: resolve 279ms :: artifacts dl 5ms
	:: modules in use:
	io.delta#delta-core_2.12;2.4.0 from central in [default]
	io.delta#delta-storage;2.4.0 from central in [default]
	org.antlr#antlr4-runtime;4.9.3 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   3   |   0  

In [3]:
# Bare expression: the notebook displays the SparkSession as this cell's output.
spark

In [4]:
# Initial customer rows, in schema order:
# (ID_CLIENTE, NOME_CLIENTE, UF, STATUS, LIMITE_CREDITO).
data = [
    ("ID001", "CLIENTE_X", "SP", "ATIVO", 250000.00),
    ("ID002", "CLIENTE_Y", "SC", "INATIVO", 400000.00),
    ("ID003", "CLIENTE_Z", "DF", "ATIVO", 1000000.00),
]

# Four nullable string columns followed by one nullable float column.
# NOTE(review): FloatType is single-precision; if LIMITE_CREDITO is a
# monetary amount, a wider or decimal type may be preferable - confirm
# before changing, since it alters the stored table schema.
schema = StructType(
    [
        StructField(column_name, StringType(), True)
        for column_name in ("ID_CLIENTE", "NOME_CLIENTE", "UF", "STATUS")
    ]
    + [StructField("LIMITE_CREDITO", FloatType(), True)]
)

df = spark.createDataFrame(data, schema)

# truncate=False prints full column values instead of shortening long strings.
df.show(truncate=False)

                                                                                

+----------+------------+---+-------+--------------+
|ID_CLIENTE|NOME_CLIENTE|UF |STATUS |LIMITE_CREDITO|
+----------+------------+---+-------+--------------+
|ID001     |CLIENTE_X   |SP |ATIVO  |250000.0      |
|ID002     |CLIENTE_Y   |SC |INATIVO|400000.0      |
|ID003     |CLIENTE_Z   |DF |ATIVO  |1000000.0     |
+----------+------------+---+-------+--------------+



In [5]:
# Write the initial DataFrame as a Delta table at ./RAW/CLIENTES.
# mode="overwrite" replaces whatever data is already stored at that path.
df.write.save(path="./RAW/CLIENTES", format="delta", mode="overwrite")

25/04/16 02:09:46 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

In [6]:
# Incoming batch, same column order as `schema`. ID001 and ID002 also
# appear in the initial `data`; ID004 does not.
new_data = [
    ("ID001", "CLIENTE_X", "SP", "INATIVO", 0.00),
    ("ID002", "CLIENTE_Y", "SC", "ATIVO", 400000.00),
    ("ID004", "CLIENTE_Z", "DF", "ATIVO", 5000000.00),
]

# Reuse the schema defined for the initial load so both frames line up.
df_new = spark.createDataFrame(new_data, schema)

df_new.show()

+----------+------------+---+-------+--------------+
|ID_CLIENTE|NOME_CLIENTE| UF| STATUS|LIMITE_CREDITO|
+----------+------------+---+-------+--------------+
|     ID001|   CLIENTE_X| SP|INATIVO|           0.0|
|     ID002|   CLIENTE_Y| SC|  ATIVO|      400000.0|
|     ID004|   CLIENTE_Z| DF|  ATIVO|     5000000.0|
+----------+------------+---+-------+--------------+



In [7]:
# Handle to the Delta table stored at ./RAW/CLIENTES.
deltaTable = DeltaTable.forPath(spark, "./RAW/CLIENTES")

# Upsert keyed on ID_CLIENTE: matched rows take every column from the
# incoming batch; incoming rows with no match are inserted.
upsert = (
    deltaTable.alias("dados_atuais")
    .merge(
        source=df_new.alias("novos_dados"),
        condition="dados_atuais.ID_CLIENTE = novos_dados.ID_CLIENTE",
    )
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
)
upsert.execute()


                                                                                

In [8]:
# Remove every row whose credit limit is below 400000.0 from the Delta table.
deltaTable.delete(condition="LIMITE_CREDITO < 400000.0")

                                                                                

In [9]:
# Print the table's commit history, limited to the columns that identify
# each version and the operation that produced it.
deltaTable.history().select(
    "version", "timestamp", "operation", "operationMetrics"
).show()

+-------+--------------------+---------+--------------------+
|version|           timestamp|operation|    operationMetrics|
+-------+--------------------+---------+--------------------+
|      2|2025-04-16 02:10:...|   DELETE|{numRemovedFiles ...|
|      1|2025-04-16 02:10:...|    MERGE|{numTargetRowsCop...|
|      0|2025-04-16 02:09:...|    WRITE|{numFiles -> 3, n...|
+-------+--------------------+---------+--------------------+



In [10]:
# Time travel: read the table as of version 2 and print it.
# Extra keyword arguments to load() are passed through as reader options.
spark.read.load("./RAW/CLIENTES", format="delta", versionAsOf=2).show()

                                                                                

+----------+------------+---+------+--------------+
|ID_CLIENTE|NOME_CLIENTE| UF|STATUS|LIMITE_CREDITO|
+----------+------------+---+------+--------------+
|     ID002|   CLIENTE_Y| SC| ATIVO|      400000.0|
|     ID004|   CLIENTE_Z| DF| ATIVO|     5000000.0|
|     ID003|   CLIENTE_Z| DF| ATIVO|     1000000.0|
+----------+------------+---+------+--------------+

