In [0]:
# Se precisar
# python
# Copiar código
from pyspark.sql import SparkSession

# Inicializar Spark
spark = SparkSession.builder \
    .appName("DeltaLakeDetailedExample") \
    .config("spark.sql.extensions", "delta.sql.DeltaSparkSessionExtensions") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

In [0]:
########################################
### Importar bibliotecas necessárias ###
########################################

import os
import pyspark
from pyspark.sql import SparkSession

# Inicializar a SparkSession
spark = SparkSession.builder \
    .appName("DeltaLakeExample") \
    .getOrCreate()

# Criar um DataFrame de exemplo
data = [("Alice", 34), ("Bob", 45), ("Cathy", 29)]
columns = ["name", "age"]
df = spark.createDataFrame(data, columns)

# Salvar como uma tabela Delta
#df.write.format("delta").mode("overwrite").save("/mnt/delta_table")


In [0]:
df.display()

name,age
Alice,34
Bob,45
Cathy,29


In [0]:
dbutils.fs.ls("dbfs:/Folder")

Out[5]: []

In [0]:
# Make a Directory:
dbutils.fs.mkdirs("dbfs:/Folder")

Out[4]: True

In [0]:
####################################
### Salvar como uma tabela Delta ###
####################################

df.write.format("delta").mode("overwrite").save("dbfs:/Folder")

In [0]:
#################################
### Ler dados da tabela Delta ###
#################################

df_delta = spark.read.format("delta").load("dbfs:/Folder")
df_delta.show()

+-----+---+
| name|age|
+-----+---+
|Alice| 34|
|Cathy| 29|
|  Bob| 45|
+-----+---+



In [0]:
################################################
### Criar um novo DataFrame para atualização ###
################################################

new_data = [("Alice", 35), ("Bob", 45), ("David", 28)]
new_df = spark.createDataFrame(new_data, columns)

In [0]:
new_df.display()

name,age
Alice,35
Bob,45
David,28


In [0]:
################################################
### Criar um novo DataFrame para atualização ###
################################################

# new_data = [("Alice", 35), ("Bob", 45), ("David", 28)]
# new_df = spark.createDataFrame(new_data, columns)

# Atualizar os dados existentes
# new_df.write.format("delta").mode("overwrite").option("mergeSchema", "true").save("/mnt/delta_table")
new_df.write.format("delta").mode("overwrite").option("mergeSchema", "true").save("dbfs:/Folder")

In [0]:
spark.read.format("delta").load("dbfs:/Folder").display()

name,age
Alice,35
David,28
Bob,45


In [0]:
spark.read.format("delta").load("dbfs:/Folder").printSchema()

root
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)



In [0]:
###############################################
### Consultar uma versão anterior da tabela ###
###############################################

# version_df = spark.read.format("delta").option("versionAsOf", 0).load("/mnt/delta_table")
version_df = spark.read.format("delta").option("versionAsOf", 0).load("dbfs:/Folder")
version_df.show()

+-----+---+
| name|age|
+-----+---+
|Alice| 34|
|Cathy| 29|
|  Bob| 45|
+-----+---+



In [0]:
version_df = spark.read.format("delta").option("versionAsOf", 1).load("dbfs:/Folder")
version_df.show()

+-----+---+
| name|age|
+-----+---+
|Alice| 35|
|David| 28|
|  Bob| 45|
+-----+---+



In [0]:
version_df = spark.read.format("delta").option("versionAsOf", 2).load("dbfs:/Folder")
version_df.show()

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-3307471690888716>:1[0m
[0;32m----> 1[0m version_df [38;5;241m=[39m spark[38;5;241m.[39mread[38;5;241m.[39mformat([38;5;124m"[39m[38;5;124mdelta[39m[38;5;124m"[39m)[38;5;241m.[39moption([38;5;124m"[39m[38;5;124mversionAsOf[39m[38;5;124m"[39m, [38;5;241m2[39m)[38;5;241m.[39mload([38;5;124m"[39m[38;5;124mdbfs:/Folder[39m[38;5;124m"[39m)
[1;32m      2[0m version_df[38;5;241m.[39mshow()

File [0;32m/databricks/spark/python/pyspark/instrumentation_utils.py:48[0m, in [0;36m_wrap_function.<locals>.wrapper[0;34m(*args, **kwargs)[0m
[1;32m     46[0m start [38;5;241m=[39m time[38;5;241m.[39mperf_counter()
[1;32m     47[0m [38;5;28;01mtry[39;00m:
[0;32m---> 48[0m     res [38;5;241m=[39m [43mfunc[49m[43m([49m[38;5;241;43m*[39;49m[43margs[49m[4

In [0]:
#################################
### Atualizar Dados com Merge ###
#################################

# Suponha que queremos atualizar a idade de "Alice" e adicionar um novo registro:
  
# Novo DataFrame para atualização
updates = [("Alice", 40), ("David", 28)]
updates_df = spark.createDataFrame(updates, columns)

# Usar Merge para atualizar e inserir dados
from delta.tables import DeltaTable

# Criar uma tabela Delta a partir do caminho existente
# delta_table = DeltaTable.forPath(spark, "/mnt/delta_table")
delta_table = DeltaTable.forPath(spark, "dbfs:/Folder")


# Fazer o merge
delta_table.alias("old_data") \
    .merge(
        updates_df.alias("new_data"),
        "old_data.name = new_data.name"
    ) \
    .whenMatchedUpdate(set={"age": "new_data.age"}) \
    .whenNotMatchedInsert(values={"name": "new_data.name", "age": "new_data.age"}) \
    .execute()


In [0]:
delta_table.toDF().show()

+-----+---+
| name|age|
+-----+---+
|Alice| 40|
|David| 28|
|  Bob| 45|
+-----+---+



In [0]:
###########################
### Ler dados filtrados ###
###########################

filtered_df = spark.read.format("delta").load("dbfs:/Folder").filter("age > 30")
filtered_df.show()
# Exibir estatísticas da tabela Delta
delta_table.toDF().describe().show()

+-----+---+
| name|age|
+-----+---+
|Alice| 40|
|  Bob| 45|
+-----+---+

+-------+-----+------------------+
|summary| name|               age|
+-------+-----+------------------+
|  count|    3|                 3|
|   mean| null|37.666666666666664|
| stddev| null| 8.736894948054104|
|    min|Alice|                28|
|    max|David|                45|
+-------+-----+------------------+



In [0]:
#################################
### Otimização com Z-Ordering ###
################################

# Podemos melhorar o desempenho das consultas usando Z-Ordering:

# python
# Copiar código
# Otimizar a tabela com Z-Ordering
delta_table.optimize().where("age > 30").execute()

# Optimize the table with Z-Ordering
delta_table.optimize().zOrderBy("age").execute()

In [0]:
###################################
### Compactar arquivos pequenos ###
###################################

### spark.sql("OPTIMIZE delta.`/mnt/delta_table`")

# Remover arquivos antigos que não são mais necessários
### spark.sql("VACUUM delta.`/mnt/delta_table` RETAIN 168 HOURS")  # Retém arquivos por 7 dias
spark.sql("VACUUM delta.`dbfs:/Folder` RETAIN 168 HOURS")


Out[24]: DataFrame[path: string]

In [0]:
spark.sql("VACUUM delta.`dbfs:/Folder` RETAIN 168 HOURS").display()

path
dbfs:/Folder


In [0]:
########################
### Schema Evolution ###
########################

# O Delta Lake permite que você altere o esquema da tabela, adicionando novas colunas conforme necessário.
# python
# Copiar código
# Adicionar uma nova coluna

new_data = [("Alice", 35, "F"), ("Bob", 45, "M"), ("Cathy", 29, "F")]
new_columns = ["name", "age", "gender"]
new_df = spark.createDataFrame(new_data, new_columns)

# Salvar com evolução de esquema

# new_df.write.format("delta").mode("append").option("mergeSchema", "true").save("/mnt/delta_table")
new_df.write.format("delta").mode("append").option("mergeSchema", "true").save("dbfs:/Folder")

In [0]:
spark.read.format("delta").load("dbfs:/Folder").display()

name,age,gender
Alice,35,F
Cathy,29,F
Bob,45,M
Alice,40,
David,28,
Bob,45,


In [0]:
#######################################################
### Gerenciamento de Conflitos de Leitura e Escrita ###
#######################################################

# O Delta Lake permite que você trate conflitos de forma eficiente, usando a funcionalidade de transações ACID.
# python
# Copiar código

from delta.tables import DeltaTable

# Tentar atualizar a tabela enquanto lê
delta_table = DeltaTable.forPath(spark, "/mnt/delta_table")
try:
    # Exemplo de operação de leitura e escrita simultânea
    df_current = delta_table.toDF()
    df_current.show()

    # Atualizar a tabela
    delta_table.update("name = 'Alice'", {"age": "36"})
except Exception as e:
    print(f"Ocorreu um erro: {e}")

In [0]:
#######################
### Particionamento ###
#######################

# Particionar tabelas Delta pode melhorar o desempenho das consultas.
# python
# Copiar código
# Salvar como tabela Delta particionada

df.write.partitionBy("age").format("delta").mode("overwrite").save("/mnt/delta_table_partitioned")

In [0]:
#################################
### Monitoramento e Auditoria ###
#################################

# O Delta Lake permite que você monitore as operações e mantenha um histórico de alterações para auditoria.
# python
# Copiar código
# Consultar o histórico de operações

history_df = delta_table.history(10)  # últimos 10 comandos
history_df.show(truncate=False)

+-------+-------------------+----------------+--------------------------+------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----+------------------+--------------------+-----------+-----------------+-------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------+----

In [0]:
history_df.display()

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
7,2024-10-25T23:06:52.000+0000,3466497405212432,zhang.yuan@senaicni.com.br,WRITE,"Map(mode -> Append, partitionBy -> [])",,List(3307471690888693),0429-204748-rasps399,6.0,WriteSerializable,True,"Map(numFiles -> 3, numOutputRows -> 3, numOutputBytes -> 3064)",,Databricks-Runtime/12.2.x-scala2.12
6,2024-10-25T23:05:33.000+0000,3466497405212432,zhang.yuan@senaicni.com.br,VACUUM END,Map(status -> COMPLETED),,List(3307471690888693),0429-204748-rasps399,5.0,SnapshotIsolation,True,"Map(numDeletedFiles -> 0, numVacuumedDirectories -> 1)",,Databricks-Runtime/12.2.x-scala2.12
5,2024-10-25T23:05:31.000+0000,3466497405212432,zhang.yuan@senaicni.com.br,VACUUM START,"Map(retentionCheckEnabled -> true, defaultRetentionMillis -> 604800000, specifiedRetentionMillis -> 604800000)",,List(3307471690888693),0429-204748-rasps399,4.0,SnapshotIsolation,True,"Map(numFilesToDelete -> 0, sizeOfDataToDelete -> 0)",,Databricks-Runtime/12.2.x-scala2.12
4,2024-10-25T23:04:37.000+0000,3466497405212432,zhang.yuan@senaicni.com.br,VACUUM END,Map(status -> COMPLETED),,List(3307471690888693),0429-204748-rasps399,3.0,SnapshotIsolation,True,"Map(numDeletedFiles -> 0, numVacuumedDirectories -> 1)",,Databricks-Runtime/12.2.x-scala2.12
3,2024-10-25T23:04:35.000+0000,3466497405212432,zhang.yuan@senaicni.com.br,VACUUM START,"Map(retentionCheckEnabled -> true, defaultRetentionMillis -> 604800000, specifiedRetentionMillis -> 604800000)",,List(3307471690888693),0429-204748-rasps399,2.0,SnapshotIsolation,True,"Map(numFilesToDelete -> 0, sizeOfDataToDelete -> 0)",,Databricks-Runtime/12.2.x-scala2.12
2,2024-10-25T22:53:57.000+0000,3466497405212432,zhang.yuan@senaicni.com.br,MERGE,"Map(predicate -> [""(name#752627 = name#752605)""], matchedPredicates -> [{""actionType"":""update""}], notMatchedPredicates -> [{""actionType"":""insert""}], notMatchedBySourcePredicates -> [])",,List(3307471690888693),0429-204748-rasps399,1.0,WriteSerializable,False,"Map(numTargetRowsCopied -> 0, numTargetRowsDeleted -> 0, numTargetFilesAdded -> 2, numTargetBytesAdded -> 1616, numTargetBytesRemoved -> 1616, numTargetDeletionVectorsAdded -> 0, numTargetRowsMatchedUpdated -> 2, executionTimeMs -> 2804, materializeSourceTimeMs -> 182, numTargetRowsInserted -> 0, numTargetRowsMatchedDeleted -> 0, scanTimeMs -> 1699, numTargetRowsUpdated -> 2, numOutputRows -> 2, numTargetDeletionVectorsRemoved -> 0, numTargetRowsNotMatchedBySourceUpdated -> 0, numTargetChangeFilesAdded -> 0, numSourceRows -> 2, numTargetFilesRemoved -> 2, numTargetRowsNotMatchedBySourceDeleted -> 0, rewriteTimeMs -> 838)",,Databricks-Runtime/12.2.x-scala2.12
1,2024-10-25T22:47:11.000+0000,3466497405212432,zhang.yuan@senaicni.com.br,WRITE,"Map(mode -> Overwrite, partitionBy -> [])",,List(3307471690888693),0429-204748-rasps399,0.0,WriteSerializable,False,"Map(numFiles -> 3, numOutputRows -> 3, numOutputBytes -> 2410)",,Databricks-Runtime/12.2.x-scala2.12
0,2024-10-25T22:43:54.000+0000,3466497405212432,zhang.yuan@senaicni.com.br,WRITE,"Map(mode -> Overwrite, partitionBy -> [])",,List(3307471690888693),0429-204748-rasps399,,WriteSerializable,False,"Map(numFiles -> 3, numOutputRows -> 3, numOutputBytes -> 2410)",,Databricks-Runtime/12.2.x-scala2.12


# Streaming

In [0]:
# Se precisar
from pyspark.sql import SparkSession

# Inicializar a SparkSession com suporte a Delta
spark = SparkSession.builder \
    .appName("DeltaLakeStreamingExample") \
    .config("spark.sql.extensions", "delta.sql.DeltaSparkSessionExtensions") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

In [0]:
# Se precisar
from pyspark.sql import SparkSession

# Initialize a SparkSession with support for Delta Lake
spark = SparkSession.builder \
    .appName("DeltaLakeStreamingExample") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

In [0]:
#######################
### Streaming_input ###
#######################

# Create a list of tuples with the data
data = [("Alice", 30), ("Bob", 25), ("Cathy", 27)]

# Define the schema
columns = ["name", "age"]

# Create DataFrame
df_test = spark.createDataFrame(data, schema=columns)

In [0]:
df_test.display()

name,age
Alice,30
Bob,25
Cathy,27


In [0]:
# Make a Directory:
dbutils.fs.mkdirs("dbfs:/Streaming")

Out[11]: True

In [0]:
# Delete Files:
dbutils.fs.rm("dbfs:/Streaming", True)  # True to delete recursively

Out[9]: True

In [0]:
# Suponha que df_test seja seu DataFrame
df_test.coalesce(1).write.format("csv").mode("overwrite").option("header", "true").save("dbfs:/Streaming")

In [0]:
#############################################
### Criar uma fonte de dados de streaming ###
#############################################

# Para fins de exemplo, você pode usar um diretório onde os arquivos CSV serão colocados
# input_path = "/mnt/streaming_input"
input_path = "dbfs:/Streaming"

# Suponha que os arquivos CSV tenham as colunas "name" e "age"
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Define the schema of the CSV files
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
])

# Use the defined schema in the readStream operation
streaming_df = spark.readStream \
    .option("header", "true") \
    .schema(schema) \
    .csv(input_path)

In [0]:
streaming_df.display()

name,age
Alice,30
Bob,25
Cathy,27


In [0]:
########################################
### Processar e Gravar em Delta Lake ###
########################################

# Processar e Gravar em Delta Lake
# Agora, vamos processar os dados recebidos e gravá-los em uma tabela Delta.
# python
# Copiar código

# Especificar o caminho da tabela Delta
# delta_table_path = "/mnt/delta_table_streaming"
delta_table_path = "dbfs:/Delta_table_streaming"

# Escrever os dados em um formato Delta
query = streaming_df.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "dbfs:/Delta_table_streaming") \
    .start(delta_table_path)
# .option("checkpointLocation", "/mnt/checkpoints") \

query.awaitTermination()

In [0]:
##############################################
### Ler Dados da Tabela Delta em Streaming ###
##############################################

# Agora que temos os dados sendo gravados na tabela Delta, podemos criar uma consulta para ler esses dados em tempo real.
# python
# Copiar código
# Ler os dados da tabela Delta em streaming
delta_streaming_df = spark.readStream \
    .format("delta") \
    .load(delta_table_path)

# Escrever os dados lidos na saída do console
query_console = delta_streaming_df.writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

query_console.awaitTermination()

In [0]:
##################################
### Simulando Dados de Entrada ###
##################################

# Para testar este exemplo, você pode usar o netcat (ou nc) para enviar dados ao socket. Abra um terminal e execute:
# bash
# Copiar código
# nc -lk 9999