<small style="font-size:10px;">

# 📌 Camada Bronze

### 📖 Importação de Bibliotecas
Carrega as bibliotecas necessárias para ingestão, processamento e armazenamento dos dados, incluindo **requests (requisição de API) e PySpark (tratamento e armazenamento).**

### 📖 Execução de Variáveis Globais
Executa o notebook `Variaveis`, que contém **caminhos do Data Lake** e outras configurações globais usadas no pipeline.

### 📖 Requisição de Dados da API
Extrai os dados da API Open Brewery DB com **tratamento de erros e tentativas automáticas** (`retries`), garantindo que falhas temporárias não interrompam o pipeline.

### 📖 Implementação de Paginação na API
A API retorna **dados paginados** com um limite padrão de **50 registros por requisição**. Para garantir que **todos os dados sejam extraídos**, foi implementada a paginação automática até que **não haja mais páginas disponíveis**.  
**📌 Resultado:** o volume de dados aumentou de **50 para 7407 registros na Bronze**.

### 📖 Definição do Schema
Define a **estrutura dos dados** no Spark usando `StructType`, garantindo **consistência e validação de tipos**. Isso melhora a performance e evita problemas com inferência de schema.

### 📖 Tratamento de Erros e Logs
Implementa um **sistema de logs no Delta Lake**, capturando erros ocorridos durante a ingestão. Caso a requisição falhe, os erros são **registrados para auditoria futura**.

### 📖 Adição de Timestamp de Ingestão
Adiciona a coluna `data_ingestao` ao DataFrame com **`current_timestamp()`**, permitindo **monitorar quando os dados foram carregados**.

### 📖 Salvamento na Camada Bronze
Os dados são salvos no **Delta Lake**, utilizando `overwrite` para manter apenas a versão mais recente e **`mergeSchema=True` para compatibilidade com novas colunas**.

</small>


In [0]:
import requests
import time
import traceback
from pyspark.sql.functions import current_timestamp
from pyspark.sql.types import StructType, StructField, StringType
import uuid
from pyspark.sql.utils import AnalysisException




In [0]:
%run "./Variaveis"

In [0]:
def requisicao_data(url, per_page=100, maximo_tentativas=3, delay=5):
    dados_totais = []
    page = 1 
    while True: 
        url = f"{url}?per_page={per_page}&page={page}"

        for tentativa in range(maximo_tentativas):
            try:
                response = requests.get(url, timeout=10)
                
                if response.raise_for_status():
                    print(f"Erro na requisição: {response.status_code}")
                    return dados_totais  

                  

                dados_atuais = response.json()

                if not dados_atuais:  
                    return dados_totais

                dados_totais.extend(dados_atuais)  
                page += 1  
                break  

            except requests.exceptions.RequestException as e:
                print(f"Erro na requisição: {e}, nova tentativa {tentativa + 1}")
                time.sleep(delay)

        else:
            raise Exception("Erro ao criar DataFrame após múltiplas tentativas")

In [0]:
schema_Cervejaria = StructType([
    StructField("id",StringType(),False),
    StructField("name",StringType(),False),
    StructField("brewery_type",StringType(),False),
    StructField("address_1",StringType(),True),
    StructField("address_2",StringType(),True),
    StructField("address_3",StringType(),True),
    StructField("city",StringType(),False),
    StructField("state_province",StringType(),False),
    StructField("postal_code",StringType(),False),
    StructField("country",StringType(),False),
    StructField("longitude",StringType(),True),
    StructField("latitude",StringType(),True),
    StructField("phone",StringType(),True),
    StructField("website_url",StringType(),True),
    StructField("state",StringType(),True),
    StructField("street",StringType(),True),

])

In [0]:

caminho_erros_logs = f"{camada_bronze}erros_logs"
url = "https://api.openbrewerydb.org/breweries"

erros = []

try:
    dados = requisicao_data(url)
    df_spark = spark.createDataFrame(dados, schema=schema_Cervejaria)
    print("DataFrame criado com sucesso")

except Exception as e:
    erro_id = str(uuid.uuid4()) 
    erro = str(e)
    hora_erro = time.strftime("%Y-%m-%d %H:%M:%S")
    trace = traceback.format_exc()

    erros.append((erro_id, "Camada Bronze", erro, trace, hora_erro))

    df_novo_erro = spark.createDataFrame(erros, ["id", "camada", "erro", "trace", "data_hora"])

    try:
        df_logs_existentes = spark.read.format("delta").load(caminho_erros_logs)

        if df_logs_existentes.count() > 0 :
            df_logs_erros = df_logs_existentes.union(df_novo_erro)
        else:
            df_logs_erros = df_novo_erro


    except AnalysisException:
        df_logs_erros = df_novo_erro

    display(df_logs_erros)

    df_logs_erros.write.format("delta").mode("append").option("mergeSchema", "true").save(caminho_erros_logs)


In [0]:
df_snake = df_spark\
    .withColumnRenamed("id","id_cervejaria") \
    .withColumnRenamed("name","nm_cervejaria") \
    .withColumnRenamed("brewery_type","nm_tipo_cervejaria") \
    .withColumnRenamed("address_1","ds_endereco_1") \
    .withColumnRenamed("address_2","ds_endereco_2") \
    .withColumnRenamed("address_3","ds_endereco_3") \
    .withColumnRenamed("city","nm_cidade") \
    .withColumnRenamed("state_province","nm_estado") \
    .withColumnRenamed("postal_code","cd_cep") \
    .withColumnRenamed("country","nm_pais") \
    .withColumnRenamed("longitude","nr_longitude") \
    .withColumnRenamed("latitude","nr_latitude") \
    .withColumnRenamed("phone","nr_telefone") \
    .withColumnRenamed("website_url","ds_site_web") \
    .withColumnRenamed("state","nm_estado_repetido") \
    .withColumnRenamed("street","nm_rua")

 


In [0]:
df_dataIngestao = df_snake.withColumn("data_ingestao", current_timestamp())


In [0]:
df_dataIngestao.write.format("delta").mode("overwrite").option("mergeSchema", "true").save(f"{camada_bronze}cervejarias")


In [0]:
display(df_dataIngestao)