In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, FloatType

from delta import *

import logging

logging.getLogger("py4j").setLevel(logging.DEBUG)

In [2]:
spark = (
    SparkSession
    .builder
    .master("local[*]")
    .config("spark.jars.packages", "io.delta:delta-spark_2.12:3.2.0")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)

25/10/01 21:22:19 WARN Utils: Your hostname, DESKTOP-ES2AP8K resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
25/10/01 21:22:19 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/mnt/c/Users/Alexandre/.venv/lib/python3.13/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/alexandre/.ivy2/cache
The jars for the packages stored in: /home/alexandre/.ivy2/jars
io.delta#delta-spark_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-196a627c-64fa-4a14-bd1f-e8f3b3779975;1.0
	confs: [default]
	found io.delta#delta-spark_2.12;3.2.0 in central
	found io.delta#delta-storage;3.2.0 in central
	found org.antlr#antlr4-runtime;4.9.3 in central
:: resolution report :: resolve 379ms :: artifacts dl 14ms
	:: modules in use:
	io.delta#delta-spark_2.12;3.2.0 from central in [default]
	io.delta#delta-storage;3.2.0 from central in [default]
	org.antlr#antlr4-runtime;4.9.3 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   3  

In [4]:
data = [
    ("ID001", "CLIENTE_X","SP","ATIVO",   250000.00),
    ("ID002", "CLIENTE_Y","SC","INATIVO", 400000.00),
    ("ID003", "CLIENTE_Z","DF","ATIVO",   1000000.00)
]

schema = (
    StructType([
        StructField("ID_CLIENTE",     StringType(),True),
        StructField("NOME_CLIENTE",   StringType(),True),
        StructField("UF",             StringType(),True),
        StructField("STATUS",         StringType(),True),
        StructField("LIMITE_CREDITO", FloatType(), True)
    ])
)

df = spark.createDataFrame(data=data,schema=schema)

df.show(truncate=False)

                                                                                

+----------+------------+---+-------+--------------+
|ID_CLIENTE|NOME_CLIENTE|UF |STATUS |LIMITE_CREDITO|
+----------+------------+---+-------+--------------+
|ID001     |CLIENTE_X   |SP |ATIVO  |250000.0      |
|ID002     |CLIENTE_Y   |SC |INATIVO|400000.0      |
|ID003     |CLIENTE_Z   |DF |ATIVO  |1000000.0     |
+----------+------------+---+-------+--------------+



# GRAVANDO DELTA TABLE

In [5]:
( 
    df
    .write
    .format("delta")
    .mode('overwrite')
    .save("./RAW/CLIENTES")
)

                                                                                

# INSERINDO NOVOS DADOS

In [6]:
new_data = [
    ("ID001","CLIENTE_X","SP","INATIVO", 0.00),
    ("ID002","CLIENTE_Y","SC","ATIVO",   400000.00),
    ("ID004","CLIENTE_Z","DF","ATIVO",   5000000.00)
]

df_new = spark.createDataFrame(data=new_data, schema=schema)

df_new.show()

+----------+------------+---+-------+--------------+
|ID_CLIENTE|NOME_CLIENTE| UF| STATUS|LIMITE_CREDITO|
+----------+------------+---+-------+--------------+
|     ID001|   CLIENTE_X| SP|INATIVO|           0.0|
|     ID002|   CLIENTE_Y| SC|  ATIVO|      400000.0|
|     ID004|   CLIENTE_Z| DF|  ATIVO|     5000000.0|
+----------+------------+---+-------+--------------+



# OPERAÇÃO MERGE,INSERIR E ATUALIZAR

In [7]:
deltaTable = DeltaTable.forPath(spark, "./RAW/CLIENTES")

(
    deltaTable.alias("dados_atuais")
    .merge(
        df_new.alias("novos_dados"),
        "dados_atuais.ID_CLIENTE = novos_dados.ID_CLIENTE"
    )
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)

25/10/01 21:25:57 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

In [8]:
deltaTable.toDF().show()

+----------+------------+---+-------+--------------+
|ID_CLIENTE|NOME_CLIENTE| UF| STATUS|LIMITE_CREDITO|
+----------+------------+---+-------+--------------+
|     ID001|   CLIENTE_X| SP|INATIVO|           0.0|
|     ID002|   CLIENTE_Y| SC|  ATIVO|      400000.0|
|     ID004|   CLIENTE_Z| DF|  ATIVO|     5000000.0|
|     ID003|   CLIENTE_Z| DF|  ATIVO|     1000000.0|
+----------+------------+---+-------+--------------+



# OPERAÇÃO DELETE

In [9]:
deltaTable.delete("LIMITE_CREDITO < 500000.0")
deltaTable.toDF().show()

                                                                                

+----------+------------+---+------+--------------+
|ID_CLIENTE|NOME_CLIENTE| UF|STATUS|LIMITE_CREDITO|
+----------+------------+---+------+--------------+
|     ID003|   CLIENTE_Z| DF| ATIVO|     1000000.0|
|     ID004|   CLIENTE_Z| DF| ATIVO|     5000000.0|
+----------+------------+---+------+--------------+



# OPERAÇÃO UPDATE

In [10]:
eltaTable = DeltaTable.forPath(spark, "./RAW/CLIENTES")

# Atualizar LIMITE_CREDITO do cliente ID003
deltaTable.update(
    condition = "ID_CLIENTE = 'ID004'",
    set = { "LIMITE_CREDITO": "12000000.0" }
)
deltaTable.toDF().show()

+----------+------------+---+------+--------------+
|ID_CLIENTE|NOME_CLIENTE| UF|STATUS|LIMITE_CREDITO|
+----------+------------+---+------+--------------+
|     ID003|   CLIENTE_Z| DF| ATIVO|     1000000.0|
|     ID004|   CLIENTE_Z| DF| ATIVO|         1.2E7|
+----------+------------+---+------+--------------+

