## Apache Iceberg com pySpark

### Iniciando container:
Abra o docker desktop para iniciar a docker engine e use esse comando no terminal (neste diretório) pra iniciar um container do docker com as configurações do arquivo docker-compose.yml:

```docker-compose up```

### Instalando delta-spark
Ative novamente o ambiente virtual e instale o pacote delta-spark no python:<br>
```venv\Scripts\activate``` <br>
```pip install delta-spark```

### Iniciando sessão do Spark
Após a inicialização do container, rode esse bloco de código para iniciar a sessão do pyspark:

In [None]:
from pyspark.sql import SparkSession
from delta.tables import *
from pyspark.sql.functions import *
from delta.pip_utils import configure_spark_with_delta_pip
from pyspark.dbutils import DBUtils


spark = (
    SparkSession
    .builder.master("spark://spark:7077")
    .appName("DeltaLakeFundamentals")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)

spark = configure_spark_with_delta_pip(spark).getOrCreate()

### Criando uma tabela
Rode o bloco de código a seguir para criar uma tabela com o comando salvo no arquivo comando-criar-tabela.txt. <br>
O arquivo contem um script importando os tipos o do pyspark e criando um esboço para o DataFrame, usada pelo método spark.createDataFrame para criar um DataFrame vazio que em seguida é salvo com o formato delta no diretório /data/delta/SRAG2024/.

In [None]:
with open('comando-criar-tabela.txt', 'r') as arquivo:
    comando_criar_tabela = arquivo.read()
exec(comando_criar_tabela)

### Inserindo dados na tabela
Rode o bloco de código a seguir para inserir dados na tabela com o comando salvo no arquivo comando-inserir.txt. <br>
O arquivo contem um script criando um spark DataFrame usando o schema da tabela com os dados a serem inseridos e usa o mesmo método que foi usado para criar a tabela, porém com .mode("append") em vez de .mode("overwrite") para somente adicionar os dados (útil caso a tabela já apresente dados inseridos)

In [None]:
with open('comando-insert.txt', 'r') as arquivo:
    comando_insert = arquivo.read()
exec(comando_insert)

Caso deseje inserir mais dados na tabela, você pode modificar o seguinte bloco de código (o número de dados nas linhas deve ser equivalente ao número de colunas):

In [None]:
data = [
    ("dado", "dado", "dado"), # linha 1
    ("dado", "dado", "dado") # linha n
  ]
df = spark.createDataFrame(data, schema)
df\
    .write.format("delta")\
    .mode("append")\
    .save("/data/delta/SRAG2024/")

### Visualizando tabela:

In [None]:
df.show(5)

### Atualizando dados da tabela:

O script a seguir atualiza o valor na coluna ```ID_MUNICIP``` para ```'Criciuma'``` onde o ```ID_MUNICIP``` é igual a ```'CRICIUMA'```.

In [None]:
deltaTable = DeltaTable.forPath(spark, '/data/delta/SRAG2024')

deltaTable.update(
  condition = "ID_MUNICIP = 'CRICIUMA'",
  set = { "ID_MUNICIP": "'Criciuma'" }
)

### Apagando dados da tabela

O script a seguir deleta todas as linhas onde  o ```ID_MUNICIP``` é igual a ```'CRICIUMA'```.

In [None]:
deltaTable.delete("ID_MUNICIP = 'Criciuma'")

### Alterando uma tabela:

O script a seguir adiciona a coluna ```nova_coluna```, com todos os dados iguais a ```''```, à tabela SRAG2024, sobrescrevendo ela.

In [None]:
spark.read.format("delta").load('/data/delta/SRAG2024')\
    .withColumn("nova_coluna", lit(''))\
    .write\
    .format("delta")\
    .mode("overwrite")\
    .option("overwriteSchema", "true")\
    .save('/data/delta/SRAG2024')


### Apagando uma tabela:

O script a seguir usa o comando ```VACUUM``` para remover fisicamente os arquivos da tabela inseridos a mais de que 0 horas atrás (todos) e em seguida usa um objeto de ```DBUtils``` para remover o diretório da tabela.

In [None]:
spark = SparkSession.builder.getOrCreate()
dbutils = DBUtils(spark)

spark.sql("VACUUM 'data/delta/SRAG2024' RETAIN 0 HOURS")

dbutils.fs.rm('data/delta/SRAG2024', recurse=True)