# ü•â Camada Bronze ‚Äî Openbrewery

Este notebook implementa a camada **Bronze** da arquitetura **Medallion** no Databricks utilizando:

- Auto Loader (cloudFiles)
- Unity Catalog Volumes
- Delta Lake
- Streaming incremental com checkpoint

O objetivo da Bronze √© armazenar os dados **exatamente como chegam da origem**, sem qualquer transforma√ß√£o.


## üì• Origem dos dados

O Databricks Community Edition n√£o permite chamadas externas para APIs.

Por isso, os dados da Open Brewery API s√£o coletados externamente e adicionados ao Volume do Unity Catalog: `/Volumes/bees-teste-jp/default/landing/openbrewery/`

No meu cen√°rio extrai os dados utilizando o Google Colab atrav√©s do crawler_coleta_openbrewery em anexo no reposit√≥rio. Devido a limita√ß√µes do Databricks Free Edition, n√£o foi possivel consumir diretamente na bronze e nem coletar externamente e disponibilizar no volume desejado.

C√≥digo usado para coletar do dados:

```python
import requests
import json
from datetime import datetime
import time

data_hoje = datetime.now().strftime("%Y-%m-%d")
all_data = []
page = 1
max_retries = 3

print(f"üöÄ Iniciando extra√ß√£o: {data_hoje}")

while True:
    url = f"https://api.openbrewerydb.org/v1/breweries?page={page}&per_page=200"

    success = False
    for attempt in range(max_retries):
        try:
            resp = requests.get(url, timeout=10)
            resp.raise_for_status()
            data = resp.json()
            success = True
            break

        except requests.exceptions.RequestException as e:
            print(f"‚ö†Ô∏è Erro na p√°gina {page} (Tentativa {attempt + 1}): {e}")
            time.sleep(2)

    if not success:
        print(f"‚ùå Falha cr√≠tica ao acessar a p√°gina {page}. Abortando.")
        break

    if not data:
        break

    all_data.extend(data)
    print(f"‚úÖ P√°gina {page} processada. Total acumulado: {len(all_data)}")
    page += 1

if all_data:
    nome_arquivo = f"breweries_raw_{data_hoje}.json"
    try:
        with open(nome_arquivo, "w") as f:
            json.dump(all_data, f)

        from google.colab import files
        files.download(nome_arquivo)
        print(f"üèÅ Extra√ß√£o conclu√≠da! {len(all_data)} registros salvos.")
    except Exception as e:
        print(f"‚ùå Erro ao salvar o arquivo: {e}")
else:
    print("‚ö†Ô∏è Nenhum dado foi extra√≠do.")
```



Cada execu√ß√£o da API gera um novo arquivo JSON nessa pasta.

Isso simula um cen√°rio real de Data Lake, onde arquivos chegam em uma √°rea de *landing*.



## üöÄ Por que usar Auto Loader?

Em vez de usar `spark.read.json`, utilizamos o **Auto Loader**:

`spark.readStream.format("cloudFiles")`

Vantagens:

- Processa apenas arquivos novos (incremental)
- Mant√©m estado via checkpoint
- Suporta evolu√ß√£o de schema
- Escal√°vel para grandes volumes de arquivos

## üìñ Leitura de JSON com `multiLine=true`

O JSON da Open Brewery vem no formato de **array √∫nico**:

[
  {...},
  {...}
]

O Spark por padr√£o espera **um JSON por linha**.

Por isso utilizamos:

`.option("multiLine", "true")`




## üèóÔ∏è C√≥digo de leitura da Bronze

> ‚ö†Ô∏è **Observa√ß√£o importante sobre a estrat√©gia de carga (FULL SNAPSHOT)**

A API OpenBrewery **n√£o disponibiliza nenhum campo de controle incremental** (como `updated_at`, `last_modified`, CDC, etc.) que permita identificar apenas os registros novos ou alterados a cada execu√ß√£o.

Na pr√°tica, a cada dia a API retorna **o dataset completo novamente** (snapshot do estado atual).

Por esse motivo, **n√£o utilizamos a estrat√©gia tradicional de append na Bronze**, pois isso faria a tabela crescer indefinidamente com dados duplicados, gerando:

- Aumento desnecess√°rio de storage
- Necessidade de deduplica√ß√£o complexa na Silver
- Maior custo computacional
- Dificuldade para identificar o ‚Äúestado atual‚Äù da fonte

---

### ‚úÖ Estrat√©gia adotada

Optamos por tratar essa ingest√£o como **FULL SNAPSHOT di√°rio**, aplicando o seguinte padr√£o:

1. Antes da carga, a tabela Bronze √© **truncada**
2. O Auto Loader grava o snapshot completo do dia na Bronze
3. Cada carga gera uma **nova vers√£o da tabela Delta**

Dessa forma:

- A Bronze sempre representa **o estado mais atual da fonte**
- Evitamos duplica√ß√µes desnecess√°rias
- Mantemos o hist√≥rico de vers√µes atrav√©s do **Delta Time Travel**
- O storage permanece est√°vel, crescendo apenas quando a fonte realmente cresce

---

### üîé Consulta a vers√µes anteriores (Delta Time Travel)

Caso seja necess√°rio consultar o estado da Bronze em dias anteriores:

```sql
SELECT * 
FROM bronze_openbrewery
VERSION AS OF <vers√£o>;


In [0]:
%sql
USE CATALOG `bees-teste-jp`;
USE SCHEMA default;

In [0]:
%sql
TRUNCATE TABLE `bees-teste-jp`.default.bronze_openbrewery;

In [0]:
df_bronze = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("multiLine", "true")
    .option("cloudFiles.schemaLocation", "/Volumes/bees-teste-jp/default/checkpoints/openbrewery_schema")
    .load("/Volumes/bees-teste-jp/default/landing/openbrewery/")
)

## üíæ Escrita na tabela Delta Bronze

Os dados s√£o escritos em uma tabela Delta utilizando streaming.

O `checkpointLocation` √© o que garante que:

- O Auto Loader saiba quais arquivos j√° foram ingeridos
- O streaming possa continuar de onde parou

## ‚è±Ô∏è Uso do `trigger(availableNow=True)`

Esse modo executa o streaming como um **batch inteligente**:

- Processa todos os arquivos novos dispon√≠veis
- Finaliza a execu√ß√£o
- Mant√©m o estado salvo no checkpoint

Na pr√≥xima execu√ß√£o, apenas novos arquivos ser√£o lidos.


In [0]:
(df_bronze.writeStream
    .format("delta")
    .option("checkpointLocation", "/Volumes/bees-teste-jp/default/checkpoints/openbrewery_checkpoint")
    .trigger(availableNow=True)
    .toTable("`bees-teste-jp`.default.bronze_openbrewery")
)
