In [None]:
# üîπ Cria e configura a sess√£o do Spark ‚Äî ponto de entrada para usar PySpark
from pyspark.sql import SparkSession

# üîπ Configura o Spark para suportar Delta Lake usando o pacote Delta via pip (n√£o Spark JARs)
from delta import configure_spark_with_delta_pip

# üîπ Permite acessar e manipular tabelas Delta com opera√ß√µes como merge, update, delete, etc.
from delta.tables import DeltaTable

# üîπ Fun√ß√µes do PySpark para transforma√ß√£o de dados:
#    - col: acessa colunas dinamicamente
#    - to_json: transforma struct em string JSON
#    - lit: cria colunas com valores fixos
#    - collect_list: agrega valores em listas
#    - size: retorna o tamanho de arrays/listas
from pyspark.sql.functions import col, to_json, lit, collect_list, size

# üîπ Define esquemas expl√≠citos para DataFrames:
#    - StructType e StructField criam a estrutura
#    - IntegerType e StringType definem os tipos das colunas
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

In [None]:
# Configura√ß√£o do Spark com Delta Lake

# üîπappName -  Define o nome da aplica√ß√£o Spark (√∫til para logs e UI)
# üîπspark.sql.extensions -  Ativa a extens√£o do Delta Lake no Spark SQL ‚Äî necess√°ria para habilitar comandos Delta (ex: MERGE, VACUUM, etc.)
# üîπspark.sql.catalog.spark_catalog -  Substitui o cat√°logo padr√£o do Spark pelo DeltaCatalog ‚Äî faz com que tabelas gerenciadas sejam Delta por padr√£o

builder = SparkSession.builder \
    .appName("Lab6Exemplo2") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")


In [None]:
# Configura√ß√µes para a Session
#Cria uma nova SparkSession ou retorna a existente, se j√° estiver ativa.
spark = configure_spark_with_delta_pip(builder).getOrCreate()

In [8]:
# Define o n√≠vel de log
spark.sparkContext.setLogLevel("ERROR")

In [None]:
# Ela cria um DataFrame Spark em mem√≥ria.

df_initial = spark.createDataFrame([
    (1, "Bruno", "SP"),
    (2, "Maria", "RJ"),
    (3, "Victor", "MG"),
    (4, "Tiago", "RJ")
], ["id", "nome", "estado"])

In [None]:
#tipo do objeto
type(df_initial)

In [None]:
# Salva o arquivo no formato Delta
df_initial.write.format("delta").mode("overwrite").save("file:///util/delta_clientes")

In [None]:
# READ - Ler os dados
spark.read.format("delta").load("file:///util/delta_clientes").show()

In [None]:
# UPDATE - Atualizar o nome do Aluno
delta_table = DeltaTable.forPath(spark, "file:///util/delta_clientes")
delta_table.update(
    condition = "id = 1",
    set = {"nome": "'Bruno da Silva'"}
)
print("Ap√≥s atualiza√ß√£o:")
delta_table.toDF().show(truncate=False)

In [None]:
# DELETE - Remover registros
delta_table.delete(condition = "estado == 'RJ'")
print("Ap√≥s remo√ß√£o")
delta_table.toDF().show(truncate=False)

In [None]:
# Criando um DataFrame para altera√ß√£o
df_update = spark.createDataFrame([
    (2, "Maria Silva", "RJ"),
    (3, "Pedro", "MG"),
    (4, "Paulo", "MG"),
    (5, "Jos√©", "MG")
], ["id", "nome", "estado"])

In [None]:
# Buscando os dados e fazendo o Merge
delta_table = DeltaTable.forPath(spark, "file:///util/delta_clientes")
delta_table.alias("existingData").merge(
    df_update.alias("newData"),
    "existingData.id = newData.id").whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()

In [None]:
delta_table = DeltaTable.forPath(spark, "file:///util/delta_clientes")
print("Ap√≥s inser√ß√£o de novos registros:")
delta_table.toDF().show(truncate=False)

In [None]:
# Novos DataFrame
df_update = spark.createDataFrame([
    (2, "Maria Silva - Alterada", "RJ"),
    (6, "Paulo", "MG")
], ["id", "nome", "estado"])

In [None]:
# Merge com campos especificos

delta_table = DeltaTable.forPath(spark, "file:///util/delta_clientes")
delta_table.alias("oldData").merge(
    df_update.alias("upsertData"),
    "oldData.id = upsertData.id").whenMatchedUpdate(set={
    "oldData.nome": "upsertData.nome",
    "oldData.estado": "upsertData.estado"
}).whenNotMatchedInsert(values={
    "id": "upsertData.id",
    "nome": "upsertData.nome",
    "estado": "upsertData.estado"
}).execute()

In [None]:
delta_table = DeltaTable.forPath(spark, "file:///util/delta_clientes")
print("Ap√≥s inser√ß√£o de novos registros:")
delta_table.toDF().show(truncate=False)

## Filtros

In [None]:
# Where simples
delta_table.toDF().filter("id = 1").show(truncate=False)

In [None]:
# Agrupar por estado
delta_table.toDF().groupBy("estado").count().show(truncate=False)

In [None]:
# Agrupar por estado e filtrando seu total
delta_table.toDF() \
    .groupBy("estado") \
    .count() \
    .filter(col("count") == 4) \
    .show(truncate=False)

## üïí Time Travel - Visualizar vers√µes antigas

Permite acessar vers√µes anteriores de uma tabela Delta via `versionAsOf` ou `timestampAsOf`. √ötil para auditoria, rollback e compara√ß√µes.


In [60]:
delta_table = DeltaTable.forPath(spark, "file:///util/delta_clientes")

In [61]:
# Obter o hist√≥rico completo
history_df = delta_table.history()

In [None]:
# Contar o n√∫mero de vers√µes
num_versions = history_df.count()
print(f"A tabela tem {num_versions} vers√µes.")

In [None]:
# Vers√µes dispon√≠veis
delta_table.history().show()

In [None]:
# Acessar a vers√£o mais antiga da tabela (vers√£o 0)
print("Vers√£o inicial da tabela:")
version_0 = spark.read.format("delta").option("versionAsOf", 0).load("file:///util/delta_clientes")
version_0.show(truncate=False)

In [None]:
print("Vers√£o 1 da tabela:")
version_1 = spark.read.format("delta").option("versionAsOf", 1).load("file:///util/delta_clientes")
version_1.show(truncate=False)

In [69]:
# Adicionar uma coluna que identifica a vers√£o
version_0 = version_0.withColumn("versao", lit(0))
version_1 = version_1.withColumn("versao", lit(1))

# Unir as duas vers√µes
changes = version_0.union(version_1)

In [None]:
changes.show(truncate=False)

In [None]:
# Pega a diferen√ßa
changes.groupBy("id", "nome") \
       .agg(collect_list("versao").alias("versoes")) \
       .filter(size("versoes") == 1) \
       .show(truncate=False)

In [None]:
# Carrega a tabela delta
version_1 = spark.read.format("delta").option("versionAsOf", 1).load("file:///util/delta_clientes")

# Carregar o hist√≥rico de altera√ß√µes da tabela Delta
history = delta_table.history()

# Selecionar apenas as colunas relevantes
formatted_history = history.select(
    col("version").alias("Vers√£o"),
    col("operation").alias("Opera√ß√£o"),
    col("operationMetrics").alias("M√©tricas"),
    col("userMetadata").alias("Metadados do Usu√°rio")
)

# Mostrar as altera√ß√µes 
formatted_history.show(truncate=False)

In [None]:
# Consultar uma vers√£o antiga (vers√£o 2)
spark.read.format("delta").option("versionAsOf", 2).load("file:///util/delta_clientes").show(truncate=False)

In [None]:
# Atualizar com uma vers√£o antiga
old_version = spark.read.format("delta").option("versionAsOf", 2).load("file:///util/delta_clientes")

In [75]:
# Sobrescrever a tabela principal com a vers√£o 2
# Isso vai gerar uma c√≥pia da vers√£o 2 que ser√° agora a vers√£o principal. 
old_version.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save("file:///util/delta_clientes")

In [None]:
spark.read.format("delta").load("file:///util/delta_clientes").show(truncate=False)

In [None]:
# Caminho para a tabela Delta
delta_table = DeltaTable.forPath(spark, "file:///util/delta_clientes")

# Obter o hist√≥rico completo
history_df = delta_table.history()

# Contar o n√∫mero de vers√µes
num_versions = history_df.count()
print(f"A tabela tem {num_versions} vers√µes.")

## üßπ Vacuum - Remo√ß√£o de arquivos obsoletos

Por padr√£o, o Delta Lake define um per√≠odo m√≠nimo de reten√ß√£o de 7 dias. Essa regra existe para garantir a integridade de opera√ß√µes como time travel e evitar que dados importantes para transa√ß√µes sejam removidos acidentalmente. Se voc√™ quiser diminuir esse tempo, ser√° necess√°rio ajustar a configura√ß√£o de reten√ß√£o.

Ap√≥s esse per√≠odo, vers√µes anteriores dos dados n√£o poder√£o mais ser acessadas.

In [None]:
spark.sql("""
ALTER TABLE delta.`file:///util/delta_clientes`
SET TBLPROPERTIES ('delta.deletedFileRetentionDuration' = '1 day')
""")

In [None]:
delta_table = DeltaTable.forPath(spark, "file:///util/delta_clientes")
delta_table.detail().select("location", "properties").show(truncate=False)

## üß¨ Evolu√ß√£o de Schema

In [105]:
df_novo = spark.createDataFrame([
    (4, "Ana", "BA", 29)
], ["id", "nome", "estado", "idade"])

df_novo.write.format("delta").mode("append") \
    .option("mergeSchema", "true") \
    .save("file:///util/delta_clientes")

In [None]:
delta_table = DeltaTable.forPath(spark, "file:///util/delta_clientes")
print("Ap√≥s inser√ß√£o de novos registros:")
delta_table.toDF().show(truncate=False)

## üóÇÔ∏è Particionamento

In [None]:
## Criando um particionamento por estado
df_partition = spark.createDataFrame([
    (10, "Lucas", "SP"),
    (11, "Carla", "SP"),
    (12, "Rafa", "BA")
], ["id", "nome", "estado"])

df_partition.write.format("delta").mode("overwrite").partitionBy("estado").save("file:///util/particionados")

In [None]:
df_partition = DeltaTable.forPath(spark, "file:///util/particionados")
print("Ap√≥s inser√ß√£o de novos registros:")
df_partition.toDF().show(truncate=False)

In [None]:
# Definir o esquema 
esquema = StructType([
    StructField("id", IntegerType(), True),
    StructField("nome", StringType(), True),
    StructField("estado", StringType(), True),
    StructField("idade", IntegerType(), True)
])

# Ler os dados e ajustar o esquema, se necess√°rio
delta_table = spark.read.format("delta").load("file:///util/delta_clientes")
delta_table = spark.createDataFrame(delta_table.rdd, esquema)
delta_table.show()

### Trabalhando com Stream

In [None]:
## Criando o schema 

schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("nome", StringType(), True),
    StructField("estado", StringType(), True)
])

In [None]:
## Preparando a leitura em json como stream
df_stream = spark.readStream.json("file:///home/user/readstream/", schema=schema)

In [None]:
## Iniciando a stream
query = (df_stream.writeStream
         .format("console")
         .outputMode("append")
         .option("truncate", "false")
         .option("checkpointLocation", "file:///home/user/readstream/_chk")
         .start())
#query.awaitTermination()

In [None]:
print(query.isActive)
df_stream.isStreaming

In [None]:
# Finaliza a sess√£o Spark
spark.stop()
query.stop()