## CRUD em Tabelas de Data Lake com Spark

Este notebook demonstra operações CRUD com uma tabela transacional em data lake.

- O foco aqui é no fluxo de CRUD e conceitos (ACID, snapshots, time travel) de forma neutra em relação ao formato.


### Inicialização da sessão Spark (com extensões de Catálogo)

Iniciamos a `SparkSession` com as extensões necessárias para usar um catálogo Iceberg local.

- Configuramos `spark.sql.extensions` e `spark.sql.catalog.local` para habilitar comandos SQL como `CREATE TABLE ... USING iceberg`.
- O warehouse local aponta para `docs/notebooks/spark-warehouse/iceberg` para persistência no repositório de demonstração.


In [1]:
from pyspark.sql import SparkSession

# Initialize Spark session with Iceberg configurations
spark = SparkSession.builder \
  .appName("IcebergLocalDevelopment") \
  .config('spark.jars.packages', 'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.6.1') \
  .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
  .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog") \
  .config("spark.sql.catalog.local.type", "hadoop") \
  .config("spark.sql.catalog.local.warehouse", "spark-warehouse/iceberg") \
  .getOrCreate()

25/10/01 13:53:38 WARN Utils: Your hostname, joaovfe resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
25/10/01 13:53:38 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/root/projects/engenharia_de_dados/sparke_iceberg/.venv/lib/python3.11/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.iceberg#iceberg-spark-runtime-3.5_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-c3ed55e9-664b-4b3a-b2e0-ffa06e3f3a75;1.0
	confs: [default]
	found org.apache.iceberg#iceberg-spark-runtime-3.5_2.12;1.6.1 in central
:: resolution report :: resolve 204ms :: artifacts dl 5ms
	:: modules in use:
	org.apache.iceberg#iceberg-spark-runtime-3.5_2.12;1.6.1 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   1   |   0   |   0   |   0   ||   1   |   0   |
	---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-subm

In [2]:
spark

### Leitura e preparação dos dados (CSV → DataFrame)

Lemos o arquivo CSV `data/listings.csv`, selecionamos e renomeamos colunas relevantes e convertimos `price` para `double`.

- Essa etapa prepara o dataset para escrita em uma tabela transacional.
- Mantemos nomes consistentes para facilitar consultas.


In [46]:
df = spark.read.option("header", True).csv("../data/listings.csv")

df = df.selectExpr(
    "id as listing_id",
    "host_id",
    "host_name",
    "neighbourhood as city",
    "room_type as property_type",
    "cast(price as double) as price"
)

df.show()


+----------+-------+--------------------+---------------+---------------+------+
|listing_id|host_id|           host_name|           city|  property_type| price|
+----------+-------+--------------------+---------------+---------------+------+
|     17878|  68997|            Matthias|     Copacabana|Entire home/apt| 254.0|
|     25026| 102840|             Viviane|     Copacabana|Entire home/apt| 252.0|
|     35764| 153691|Patricia Miranda ...|     Copacabana|Entire home/apt| 190.0|
|     48305|  70933|             Goitaca|        Ipanema|Entire home/apt|2239.0|
|     48901| 222884|              Marcio|     Copacabana|Entire home/apt| 743.0|
|     49179| 224192|               David|     Copacabana|Entire home/apt| 189.0|
|     50759| 233554|              Felipe|Barra da Tijuca|Entire home/apt|2800.0|
|     51703| 238091|               Dalia|     Copacabana|Entire home/apt| 201.0|
|     53533| 249439|      Sherri & Andre|            Joá|Entire home/apt|1467.0|
|     64795|  93005|        

### CREATE TABLE (criação da tabela transacional)

Criamos a tabela `local.db_listings` com colunas equivalentes ao dataset.

- A cláusula `USING iceberg` define o formato transacional.


In [49]:
spark.sql("""
CREATE TABLE local.db_listings (
    listing_id STRING,
    host_id STRING,
    host_name STRING,
    city STRING,
    property_type STRING,
    price DOUBLE
) USING iceberg
""")


DataFrame[]

In [50]:
df.createOrReplaceTempView("temp_listings")

spark.sql("""
INSERT INTO local.db_listings
SELECT * FROM temp_listings
""")

DataFrame[]

### INSERT (inserção de dados)

Inserimos os dados lidos do CSV na tabela transacional `local.db_listings`.

- Inserções são ACID e versionadas.
- O mecanismo de escrita (e.g., Iceberg/Delta) gerencia metadados e particionamento de forma transparente.


In [51]:
spark.sql("SELECT * FROM local.db_listings").show()

+----------+-------+--------------------+---------------+---------------+------+
|listing_id|host_id|           host_name|           city|  property_type| price|
+----------+-------+--------------------+---------------+---------------+------+
|     17878|  68997|            Matthias|     Copacabana|Entire home/apt| 254.0|
|     25026| 102840|             Viviane|     Copacabana|Entire home/apt| 252.0|
|     35764| 153691|Patricia Miranda ...|     Copacabana|Entire home/apt| 190.0|
|     48305|  70933|             Goitaca|        Ipanema|Entire home/apt|2239.0|
|     48901| 222884|              Marcio|     Copacabana|Entire home/apt| 743.0|
|     49179| 224192|               David|     Copacabana|Entire home/apt| 189.0|
|     50759| 233554|              Felipe|Barra da Tijuca|Entire home/apt|2800.0|
|     51703| 238091|               Dalia|     Copacabana|Entire home/apt| 201.0|
|     53533| 249439|      Sherri & Andre|            Joá|Entire home/apt|1467.0|
|     64795|  93005|        

### SELECT (consulta e validação)

Executamos consultas `SELECT` para inspecionar dados específicos antes e depois de `DELETE` e `UPDATE`.

- Essas leituras permitem validar o efeito das operações.
- Em formatos como Iceberg e Delta Lake, também é possível ler snapshots anteriores para auditoria.


In [52]:
spark.sql("SELECT * FROM local.db_listings WHERE listing_id = '35764'").show()

+----------+-------+--------------------+----------+---------------+-----+
|listing_id|host_id|           host_name|      city|  property_type|price|
+----------+-------+--------------------+----------+---------------+-----+
|     35764| 153691|Patricia Miranda ...|Copacabana|Entire home/apt|190.0|
+----------+-------+--------------------+----------+---------------+-----+



In [53]:
spark.sql("""
DELETE FROM local.db_listings
WHERE listing_id = '35764'
""")

DataFrame[]

In [54]:
spark.sql("SELECT * FROM local.db_listings WHERE listing_id = '35764'").show()

+----------+-------+---------+----+-------------+-----+
|listing_id|host_id|host_name|city|property_type|price|
+----------+-------+---------+----+-------------+-----+
+----------+-------+---------+----+-------------+-----+



### DELETE (exclusão de registros)

Excluímos o anúncio com `listing_id = '35764'` e validamos com uma consulta após a exclusão.

- Deleções são ACID e criam um novo snapshot.
- Em tabelas de data lakes, a exclusão pode ser física (data files removidos) ou lógica (position deletes), dependendo do engine e do formato.


In [55]:
spark.sql("SELECT * FROM local.db_listings WHERE listing_id = '17878'").show()

+----------+-------+---------+----------+---------------+-----+
|listing_id|host_id|host_name|      city|  property_type|price|
+----------+-------+---------+----------+---------------+-----+
|     17878|  68997| Matthias|Copacabana|Entire home/apt|254.0|
+----------+-------+---------+----------+---------------+-----+



In [56]:
spark.sql("""
UPDATE local.db_listings
SET price = 500
WHERE listing_id = '17878'
""")

DataFrame[]

### UPDATE (atualização de registros)

Atualizamos o preço do anúncio `listing_id = '17878'` para `500`.

- Operações `UPDATE` são ACID e versionadas.
- Cada atualização cria um novo snapshot que pode ser consultado via time-travel (em formatos com suporte, como Iceberg/Delta).


In [57]:
spark.sql("SELECT * FROM local.db_listings WHERE listing_id = '17878'").show()

+----------+-------+---------+----------+---------------+-----+
|listing_id|host_id|host_name|      city|  property_type|price|
+----------+-------+---------+----------+---------------+-----+
|     17878|  68997| Matthias|Copacabana|Entire home/apt|500.0|
+----------+-------+---------+----------+---------------+-----+



### Limpeza de recursos (DROP TABLE)

Removemos a tabela `local.db_listings` para deixar o ambiente limpo após a demonstração.

- Em formatos de tabela transacionais (como Apache Iceberg e Delta Lake), `DROP TABLE` remove metadados do catálogo e pode remover dados conforme políticas de retenção/GC.
- Em ambientes de produção, avalie retenções e catálogos antes de executar limpezas.


In [None]:
spark.sql("DROP TABLE local.db_listings")