# Operações com Apache Iceberg

Este notebook demonstra as operações básicas de INSERT, UPDATE e DELETE usando Apache Iceberg com Apache Spark.

## Configuração do Ambiente

Primeiro, vamos configurar o ambiente e criar uma sessão Spark com suporte ao Apache Iceberg.

In [None]:
# Importar as bibliotecas necessárias
import os
import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit

# Adicionar o diretório src ao PYTHONPATH
project_dir = os.path.abspath(os.path.join(os.getcwd(), '..'))
sys.path.append(os.path.join(project_dir, 'src'))

# Importar os módulos do projeto
from spark_delta_iceberg.spark_session import create_spark_session
from spark_delta_iceberg.iceberg_operations import IcebergOperations
from spark_delta_iceberg.sample_data import create_sample_dataframe, create_sample_update_dataframe

# Criar a sessão Spark
spark = create_spark_session("IcebergDemo")

# Criar uma instância da classe IcebergOperations
iceberg_ops = IcebergOperations(spark)

# Definir o nome da tabela Iceberg
database = "default"
table_name = "vendas_iceberg"

print("Ambiente configurado com sucesso!")
print(f"Versão do Spark: {spark.version}")
print(f"Tabela Iceberg: {iceberg_ops.catalog}.{database}.{table_name}")

## Criação de Dados de Exemplo

Vamos criar um DataFrame de exemplo com dados de vendas para usar em nossas operações.

In [None]:
# Criar um DataFrame de exemplo
df_vendas = create_sample_dataframe(spark)

# Exibir o esquema
print("Esquema do DataFrame:")
df_vendas.printSchema()

# Exibir os dados
print("\nDados do DataFrame:")
df_vendas.show()

## Operação INSERT (Criação da Tabela Iceberg)

Vamos criar uma tabela Iceberg com os dados de vendas.

In [None]:
# Registrar o DataFrame como uma view temporária para criar a tabela
df_vendas.createOrReplaceTempView("temp_view")

# Criar a tabela Iceberg com os dados de vendas
iceberg_ops.create_table(
    df=df_vendas,
    table_name=table_name,
    database=database,
    partition_by=["estado"]
)

# Ler a tabela Iceberg para verificar
df_iceberg = iceberg_ops.read_table(table_name, database)
print("\nDados da tabela Iceberg:")
df_iceberg.show()

## Operação INSERT (Adicionando Novos Dados)

Vamos adicionar novos dados à tabela Iceberg existente.

In [None]:
# Criar novos dados para inserção
novos_dados = [
    (11, "Produto K", "Eletrônicos", 1800.00, "2023-01-24", "RS"),
    (12, "Produto L", "Alimentos", 45.60, "2023-01-25", "PR")
]

# Criar DataFrame com os novos dados
schema = "id INT, nome STRING, categoria STRING, preco DOUBLE, data_venda STRING, estado STRING"
df_novos = spark.createDataFrame(novos_dados, schema)

# Inserir os novos dados na tabela Iceberg
iceberg_ops.insert_data(
    df=df_novos,
    table_name=table_name,
    database=database
)

# Ler a tabela Iceberg para verificar
df_iceberg_atualizado = iceberg_ops.read_table(table_name, database)
print("\nDados da tabela Iceberg após inserção:")
df_iceberg_atualizado.show()

## Operação UPDATE

Vamos atualizar alguns dados na tabela Iceberg.

In [None]:
# Atualizar o preço do Produto B
iceberg_ops.update_data(
    table_name=table_name,
    database=database,
    condition="nome = 'Produto B'",
    update_expr={"preco": "99.90"}
)

# Atualizar o nome e o preço do Produto E
iceberg_ops.update_data(
    table_name=table_name,
    database=database,
    condition="nome = 'Produto E'",
    update_expr={
        "nome": "'Produto E Premium'",
        "preco": "149.90"
    }
)

# Ler a tabela Iceberg para verificar
df_iceberg_atualizado = iceberg_ops.read_table(table_name, database)
print("\nDados da tabela Iceberg após atualização:")
df_iceberg_atualizado.show()

## Operação DELETE

Vamos excluir alguns dados da tabela Iceberg.

In [None]:
# Excluir produtos da categoria Alimentos com preço menor que 30
iceberg_ops.delete_data(
    table_name=table_name,
    database=database,
    condition="categoria = 'Alimentos' AND preco < 30"
)

# Ler a tabela Iceberg para verificar
df_iceberg_atualizado = iceberg_ops.read_table(table_name, database)
print("\nDados da tabela Iceberg após exclusão:")
df_iceberg_atualizado.show()

## Operação MERGE

Vamos realizar uma operação MERGE para atualizar e inserir dados em uma única operação.

In [None]:
# Criar um DataFrame com dados para atualização e inserção
df_update = create_sample_update_dataframe(spark)
print("\nDados para MERGE:")
df_update.show()

# Realizar a operação MERGE
iceberg_ops.merge_data(
    source_df=df_update,
    table_name=table_name,
    database=database,
    merge_condition="target.id = source.id",
    matched_update={
        "nome": "source.nome",
        "preco": "source.preco"
    },
    not_matched_insert=True
)

# Ler a tabela Iceberg para verificar
df_iceberg_atualizado = iceberg_ops.read_table(table_name, database)
print("\nDados da tabela Iceberg após MERGE:")
df_iceberg_atualizado.show()

## Time Travel

Vamos explorar o recurso de Time Travel do Apache Iceberg para acessar versões anteriores dos dados.

In [None]:
# Obter os snapshots da tabela
snapshots_df = iceberg_ops.get_snapshots(table_name, database)
print("\nSnapshots da tabela Iceberg:")
snapshots_df.select("snapshot_id", "timestamp", "operation").show(truncate=False)

# Obter o ID do primeiro snapshot
first_snapshot_id = snapshots_df.select("snapshot_id").first()[0]

# Acessar o primeiro snapshot da tabela
df_primeiro_snapshot = iceberg_ops.time_travel(table_name, database, snapshot_id=first_snapshot_id)
print(f"\nDados do primeiro snapshot (ID: {first_snapshot_id}) da tabela Iceberg:")
df_primeiro_snapshot.show()

## Evolução de Esquema

Vamos demonstrar a evolução de esquema no Apache Iceberg, adicionando uma nova coluna à tabela.

In [None]:
# Adicionar uma nova coluna à tabela
full_table_name = f"{iceberg_ops.catalog}.{database}.{table_name}"
spark.sql(f"ALTER TABLE {full_table_name} ADD COLUMN desconto DOUBLE")

# Atualizar alguns registros com valores para a nova coluna
iceberg_ops.update_data(
    table_name=table_name,
    database=database,
    condition="categoria = 'Eletrônicos'",
    update_expr={"desconto": "preco * 0.1"}
)

# Ler a tabela Iceberg para verificar
df_iceberg_atualizado = iceberg_ops.read_table(table_name, database)
print("\nDados da tabela Iceberg após evolução de esquema:")
df_iceberg_atualizado.show()

## Conclusão

Neste notebook, demonstramos as operações básicas de INSERT, UPDATE e DELETE usando Apache Iceberg com Apache Spark. Também exploramos recursos avançados como MERGE, Time Travel e Evolução de Esquema.

O Apache Iceberg oferece várias vantagens para o gerenciamento de dados em data lakes:

1. **Formato de Tabela Aberta**: Iceberg é um formato de tabela aberto que permite que diferentes motores de processamento acessem os mesmos dados.
2. **Transações ACID**: Garantem a consistência dos dados mesmo em caso de falhas.
3. **Time Travel**: Permite acessar versões anteriores dos dados para auditoria ou rollback.
4. **Evolução de Esquema**: Permite alterar o esquema dos dados sem afetar os consumidores.
5. **Operações SQL Expressivas**: Suporte a operações como UPDATE, DELETE e MERGE, que não são nativas em formatos tradicionais de data lake.
6. **Particionamento Oculto**: O particionamento é gerenciado pelo Iceberg, não pelo usuário, o que simplifica o uso.