# Arquitetura Medalhão — Camada Bronze
## Notebook: landing_to_bronze

Este notebook realiza a **ingestão dos dados brutos (Landing Layer)** para a **Camada Bronze** do catálogo `medalhao`.

A camada Bronze é responsável por armazenar os dados **exatamente como vieram da fonte**, apenas adicionando metadados técnicos, como o `ingestion_timestamp`.  
Todas as tabelas são armazenadas em formato **Delta Lake**, dentro do schema `bronze`.

---


## Configuração inicial

Nesta etapa, são configurados o catálogo e o schema (`medalhao.bronze`) onde as tabelas serão criadas.  
Se o database ainda não existir, ele será criado automaticamente.


In [0]:
%sql
--DROP CATALOG IF EXISTS medalhao CASCADE;

In [0]:
%sql
--CREATE CATALOG IF NOT EXISTS medalhao

In [0]:
from pyspark.sql import functions as F
from pyspark.sql import Window

catalogo = "medalhao"
bronze_db_name = "bronze"

spark.sql(f"CREATE DATABASE IF NOT EXISTS {catalogo}.{bronze_db_name}")
print(f"Database {catalogo}.{bronze_db_name} criado/verificado com sucesso.\n")

Database medalhao.bronze criado/verificado com sucesso.



## Função de ingestão de CSVs

A função `ingest_csv()` é responsável por:
- Ler os arquivos CSV da camada Landing;
- Validar se o arquivo não está vazio;
- Adicionar a coluna `ingestion_timestamp` com o horário da carga;
- Gravar os dados no formato Delta na tabela correspondente da camada Bronze.

Essa função será utilizada para os 9 arquivos do dataset **Olist**.


In [0]:
def ingest_csv(nome_arquivo, nome_tabela):
    try:
        landing_path = f"/Volumes/{catalogo}/default/landing/{nome_arquivo}"
        table_name = nome_tabela
        
        df = spark.read.csv(landing_path, header=True, inferSchema=True)

        #se arquivo estiver vazio, levanta uma exceção
        if df.count() == 0:
            raise ValueError(f"O arquivo {nome_arquivo} está vazio ou não pôde ser lido.")

        # adiciona timestamp
        df_with_metadata = df.withColumn("ingestion_timestamp", F.current_timestamp())

        #formato delta
        df_with_metadata.write.format("delta") \
            .mode("overwrite") \
            .saveAsTable(f"{catalogo}.{bronze_db_name}.{table_name}")

        print(f"Tabela {bronze_db_name}.{nome_tabela} criada com sucesso\n")

    except Exception as e:
        print(f"Erro ao processar {nome_tabela}: {str(e)}\n")

## Ingestão dos arquivos Landing → Bronze

Aqui são definidos os nomes dos arquivos CSV e suas tabelas correspondentes na camada Bronze, conforme o enunciado da atividade.

Cada arquivo será processado pela função `ingest_csv()`, resultando em uma tabela Delta dentro de `medalhao.bronze`.

Após a execução, podemos verificar usando o script de SQL, no fim do notebook.


In [0]:
arquivos = [
    ("olist_customers_dataset.csv", "ft_consumidores"),
    ("olist_geolocation_dataset.csv", "ft_geolocalizacao"),
    ("olist_order_items_dataset.csv", "ft_itens_pedidos"),
    ("olist_order_payments_dataset.csv", "ft_pagamentos_pedidos"),
    ("olist_order_reviews_dataset.csv", "ft_avaliacoes_pedidos"),
    ("olist_orders_dataset.csv", "ft_pedidos"),
    ("olist_products_dataset.csv", "ft_produtos"),
    ("olist_sellers_dataset.csv", "ft_vendedores"),
    ("product_category_name_translation.csv", "dm_categoria_produtos_traducao")
]

#loop para ingestão dos 9 arquivos do dataset
for nome_arquivo, nome_tabela in arquivos:
    ingest_csv(nome_arquivo, nome_tabela)

print("Ingestão da camada Bronze concluída com sucesso")


Tabela bronze.ft_consumidores criada com sucesso

Tabela bronze.ft_geolocalizacao criada com sucesso

Tabela bronze.ft_itens_pedidos criada com sucesso

Tabela bronze.ft_pagamentos_pedidos criada com sucesso

Tabela bronze.ft_avaliacoes_pedidos criada com sucesso

Tabela bronze.ft_pedidos criada com sucesso

Tabela bronze.ft_produtos criada com sucesso

Tabela bronze.ft_vendedores criada com sucesso

Tabela bronze.dm_categoria_produtos_traducao criada com sucesso

Ingestão da camada Bronze concluída com sucesso


##Ingestão da cotação do dólar via API do Banco Central

Nesta etapa:
- São definidos os parâmetros de data inicio/fim;
- O endpoint é montado dinamicamente;
- Os dados são requisitados em formato JSON e convertidos em df Spark;
- Adiciona-se a coluna `ingestion_timestamp`;
- O resultado em `medalhao.bronze.dm_cotacao_dolar`.


In [0]:
import requests
import json
from datetime import datetime

data_inicio_formatada = "01-01-2017"
data_fim_formatada = "12-31-2018"

url = f"https://olinda.bcb.gov.br/olinda/servico/PTAX/versao/v1/odata/" \
      f"CotacaoDolarPeriodo(dataInicial=@dataInicial,dataFinalCotacao=@dataFinalCotacao)" \
      f"?@dataInicial='{data_inicio_formatada}'&@dataFinalCotacao='{data_fim_formatada}'" \
      f"&$select=dataHoraCotacao,cotacaoCompra&$format=json"

response = requests.get(url) #resposta da API

if response.status_code != 200:
    raise Exception(f"Erro ao acessar a API: {response.status_code} - {response.text}") # verifica se a requisição foi bem-sucedida

data_json = response.json() # converte a resposta em um objeto JSON

dados = data_json.get("value", []) # extrai os dados da chave "value"
if not dados:
    raise ValueError("Nenhum dado retornado pela API do Banco Central.") # verifica se há dados

df_cotacao = spark.createDataFrame(dados) # cria um DataFrame Spark a partir dos dados

df_cotacao = ( # renomeia as colunas e adiciona o timestamp
    df_cotacao
    .withColumnRenamed("dataHoraCotacao", "dataHoraCotacao")
    .withColumnRenamed("cotacaoCompra", "cotacaoCompra")
    .withColumn("ingestion_timestamp", F.current_timestamp())
)

df_cotacao.write.format("delta") \
    .mode("overwrite") \
    .saveAsTable(f"{catalogo}.{bronze_db_name}.dm_cotacao_dolar") #formato delta

print("Tabela bronze.dm_cotacao_dolar criada com sucesso!\n")
df_cotacao.show(5)


Tabela bronze.dm_cotacao_dolar criada com sucesso!

+-------------+--------------------+--------------------+
|cotacaoCompra|     dataHoraCotacao| ingestion_timestamp|
+-------------+--------------------+--------------------+
|       3.2723|2017-01-02 13:07:...|2025-11-11 23:34:...|
|       3.2626|2017-01-03 13:07:...|2025-11-11 23:34:...|
|       3.2327|2017-01-04 13:11:...|2025-11-11 23:34:...|
|       3.2123|2017-01-05 13:04:...|2025-11-11 23:34:...|
|       3.2051|2017-01-06 13:13:...|2025-11-11 23:34:...|
+-------------+--------------------+--------------------+
only showing top 5 rows


In [0]:
df_cotacao.select( # mostrando a quantidade total de registros 
    F.min("dataHoraCotacao").alias("primeira_data"),
    F.max("dataHoraCotacao").alias("ultima_data"),
    F.count("*").alias("total_registros")
).show()


+--------------------+--------------------+---------------+
|       primeira_data|         ultima_data|total_registros|
+--------------------+--------------------+---------------+
|2017-01-02 13:07:...|2018-12-31 11:04:...|            499|
+--------------------+--------------------+---------------+



In [0]:
%sql
SHOW TABLES IN medalhao.bronze;


database,tableName,isTemporary
bronze,dm_categoria_produtos_traducao,False
bronze,dm_cotacao_dolar,False
bronze,ft_avaliacoes_pedidos,False
bronze,ft_consumidores,False
bronze,ft_geolocalizacao,False
bronze,ft_itens_pedidos,False
bronze,ft_pagamentos_pedidos,False
bronze,ft_pedidos,False
bronze,ft_produtos,False
bronze,ft_vendedores,False


## Conclusão

Todas as tabelas foram criadas com sucesso na camada **Bronze**:

- ft_consumidores  
- ft_geolocalizacao  
- ft_itens_pedidos  
- ft_pagamentos_pedidos  
- ft_avaliacoes_pedidos  
- ft_pedidos  
- ft_produtos  
- ft_vendedores  
- dm_categoria_produtos_traducao  
- dm_cotacao_dolar  

A próxima etapa consiste em transformar, padronizar e enriquecer esses dados na camada **Silver**, que será implementada no notebook `bronze_to_silver`.
