In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, FloatType

from delta import *

In [3]:
# Create SparkSession 
spark = ( 
    SparkSession
    .builder
    .master("local[*]")
    .config("spark.jars.packages", "io.delta:delta-core_2.12:2.4.0")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate() 
)

24/01/11 18:27:35 WARN Utils: Your hostname, sidnei-CBX resolves to a loopback address: 127.0.1.1; using 192.168.1.18 instead (on interface wlp1s0)
24/01/11 18:27:35 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/home/sidnei/.cache/pypoetry/virtualenvs/pyspark-begin-wiCe-RuI-py3.10/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/sidnei/.ivy2/cache
The jars for the packages stored in: /home/sidnei/.ivy2/jars
io.delta#delta-core_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-13683c3b-d4e0-4613-b465-b1a2c968d230;1.0
	confs: [default]
	found io.delta#delta-core_2.12;2.4.0 in central
	found io.delta#delta-storage;2.4.0 in central
	found org.antlr#antlr4-runtime;4.9.3 in central
downloading https://repo1.maven.org/maven2/io/delta/delta-core_2.12/2.4.0/delta-core_2.12-2.4.0.jar ...
	[SUCCESSFUL ] io.delta#delta-core_2.12;2.4.0!delta-core_2.12.jar (2044ms)
downloading https://repo1.maven.org/maven2/io/delta/delta-storage/2.4.0/delta-storage-2.4.0.jar ...
	[SUCCESSFUL ] io.delta#delta-storage;2.4.0!delta-storage.jar (481ms)
downloading https://repo1.maven.org/maven2/org/antlr/antlr4-runtime/4.9.3/antlr4-runtime-4.9.3.jar ...
	[SUCCESSFUL ] org.antlr#antlr4-runtime;4.9.3!antlr4-runtime.jar (469ms)
:: resolution report :: resolve 4987ms :: arti

In [4]:
data = [
    ("ID001", "CLIENTE_X","SP","ATIVO",   250000.00),
    ("ID002", "CLIENTE_Y","SC","INATIVO", 400000.00),
    ("ID003", "CLIENTE_Z","DF","ATIVO",   1000000.00)
]

schema = (
    StructType([
        StructField("ID_CLIENTE",     StringType(),True),
        StructField("NOME_CLIENTE",   StringType(),True),
        StructField("UF",             StringType(),True),
        StructField("STATUS",         StringType(),True),
        StructField("LIMITE_CREDITO", FloatType(), True)
    ])
)

df = spark.createDataFrame(data=data,schema=schema)

df.show(truncate=False)

                                                                                

+----------+------------+---+-------+--------------+
|ID_CLIENTE|NOME_CLIENTE|UF |STATUS |LIMITE_CREDITO|
+----------+------------+---+-------+--------------+
|ID001     |CLIENTE_X   |SP |ATIVO  |250000.0      |
|ID002     |CLIENTE_Y   |SC |INATIVO|400000.0      |
|ID003     |CLIENTE_Z   |DF |ATIVO  |1000000.0     |
+----------+------------+---+-------+--------------+



Gravar Delta Lake

In [5]:
( 
    df
    .write
    .format("delta")
    .mode('overwrite')
    .save("./dados/raw/clientes")
)

24/01/11 18:29:08 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

Novos dados

In [6]:
new_data = [
    ("ID001","CLIENTE_X","SP","INATIVO", 0.00),
    ("ID002","CLIENTE_Y","SC","ATIVO",   400000.00),
    ("ID004","CLIENTE_Z","DF","ATIVO",   5000000.00)
]

df_new = spark.createDataFrame(data=new_data, schema=schema)

df_new.show()

+----------+------------+---+-------+--------------+
|ID_CLIENTE|NOME_CLIENTE| UF| STATUS|LIMITE_CREDITO|
+----------+------------+---+-------+--------------+
|     ID001|   CLIENTE_X| SP|INATIVO|           0.0|
|     ID002|   CLIENTE_Y| SC|  ATIVO|      400000.0|
|     ID004|   CLIENTE_Z| DF|  ATIVO|     5000000.0|
+----------+------------+---+-------+--------------+



UPSERT/MERGE

In [7]:
deltaTable = DeltaTable.forPath(spark, "./dados/raw/clientes")
deltaTable.toDF().show()

+----------+------------+---+-------+--------------+
|ID_CLIENTE|NOME_CLIENTE| UF| STATUS|LIMITE_CREDITO|
+----------+------------+---+-------+--------------+
|     ID002|   CLIENTE_Y| SC|INATIVO|      400000.0|
|     ID001|   CLIENTE_X| SP|  ATIVO|      250000.0|
|     ID003|   CLIENTE_Z| DF|  ATIVO|     1000000.0|
+----------+------------+---+-------+--------------+



In [8]:
(
    deltaTable.alias("dados_atuais")
    .merge(
        df_new.alias("novos_dados"),
        "dados_atuais.ID_CLIENTE = novos_dados.ID_CLIENTE"
    )
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)


                                                                                

In [9]:
deltaTable = DeltaTable.forPath(spark, "./dados/raw/clientes")
deltaTable.toDF().show()

+----------+------------+---+-------+--------------+
|ID_CLIENTE|NOME_CLIENTE| UF| STATUS|LIMITE_CREDITO|
+----------+------------+---+-------+--------------+
|     ID001|   CLIENTE_X| SP|INATIVO|           0.0|
|     ID002|   CLIENTE_Y| SC|  ATIVO|      400000.0|
|     ID004|   CLIENTE_Z| DF|  ATIVO|     5000000.0|
|     ID003|   CLIENTE_Z| DF|  ATIVO|     1000000.0|
+----------+------------+---+-------+--------------+



DELETE

In [10]:
deltaTable.delete("LIMITE_CREDITO < 400000.0")

                                                                                

In [11]:
deltaTable = DeltaTable.forPath(spark, "./dados/raw/clientes")
deltaTable.toDF().show()

+----------+------------+---+------+--------------+
|ID_CLIENTE|NOME_CLIENTE| UF|STATUS|LIMITE_CREDITO|
+----------+------------+---+------+--------------+
|     ID002|   CLIENTE_Y| SC| ATIVO|      400000.0|
|     ID004|   CLIENTE_Z| DF| ATIVO|     5000000.0|
|     ID003|   CLIENTE_Z| DF| ATIVO|     1000000.0|
+----------+------------+---+------+--------------+



History

In [12]:
(
    deltaTable
    .history()
    .select("version", "timestamp", "operation", "operationMetrics")
    .show()
)

+-------+--------------------+---------+--------------------+
|version|           timestamp|operation|    operationMetrics|
+-------+--------------------+---------+--------------------+
|      2|2024-01-11 18:40:...|   DELETE|{numRemovedFiles ...|
|      1|2024-01-11 18:36:...|    MERGE|{numTargetRowsCop...|
|      0|2024-01-11 18:29:...|    WRITE|{numFiles -> 4, n...|
+-------+--------------------+---------+--------------------+



Time Travel

In [14]:
(
    spark
    .read
    .format('delta')
    .option("versionAsOf", 1)
    .load('./dados/raw/clientes')
    .show()
)

                                                                                

+----------+------------+---+-------+--------------+
|ID_CLIENTE|NOME_CLIENTE| UF| STATUS|LIMITE_CREDITO|
+----------+------------+---+-------+--------------+
|     ID001|   CLIENTE_X| SP|INATIVO|           0.0|
|     ID002|   CLIENTE_Y| SC|  ATIVO|      400000.0|
|     ID004|   CLIENTE_Z| DF|  ATIVO|     5000000.0|
|     ID003|   CLIENTE_Z| DF|  ATIVO|     1000000.0|
+----------+------------+---+-------+--------------+



Restore Version

In [15]:
deltaTable.restoreToVersion(1)

                                                                                

DataFrame[table_size_after_restore: bigint, num_of_files_after_restore: bigint, num_removed_files: bigint, num_restored_files: bigint, removed_files_size: bigint, restored_files_size: bigint]