In [0]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

In [0]:
# Session handle used by every Python cell in this notebook. getOrCreate()
# returns the already-active SparkSession when there is one; otherwise it
# builds a new session carrying this application name.
spark = (
    SparkSession.builder
    .appName("AtividadePraticaSpark2")
    .getOrCreate()
)

In [0]:
%sql
-- Bootstrap the medallion layout: one catalog (medalhao) that holds the
-- bronze, silver and gold schemas. The CREATE statements all use
-- IF NOT EXISTS, so running this cell again does not fail on existing objects.
CREATE CATALOG IF NOT EXISTS medalhao;

-- Switch the current catalog so the unqualified schema names below are
-- created inside medalhao. Must run before the CREATE SCHEMA statements.
USE CATALOG medalhao;

-- One schema per medallion layer.
-- NOTE(review): the ingestion cell reads from /Volumes/medalhao/bronze/arquivos_raw/,
-- which is not created here — presumably the volume is provisioned and loaded
-- outside this notebook; confirm.
CREATE SCHEMA IF NOT EXISTS bronze;
CREATE SCHEMA IF NOT EXISTS silver;
CREATE SCHEMA IF NOT EXISTS gold;

[0;31m---------------------------------------------------------------------------[0m
[0;31mParseException[0m                            Traceback (most recent call last)
File [0;32m<command-6354239496042959>, line 1[0m
[0;32m----> 1[0m get_ipython()[38;5;241m.[39mrun_cell_magic([38;5;124m'[39m[38;5;124msql[39m[38;5;124m'[39m, [38;5;124m'[39m[38;5;124m'[39m, [38;5;124m'[39m[38;5;124mUSE CATALOG medalhao;[39m[38;5;130;01m\n[39;00m[38;5;124mCREATE OR REPLACE SCHEMA medalhao.bronze;[39m[38;5;130;01m\n[39;00m[38;5;124mCREATE OR REPLACE SCHEMA medalhao.silver;[39m[38;5;130;01m\n[39;00m[38;5;124mCREATE OR REPLACE SCHEMA medalhao.gold;[39m[38;5;130;01m\n[39;00m[38;5;124m'[39m)

File [0;32m/databricks/python/lib/python3.12/site-packages/IPython/core/interactiveshell.py:2541[0m, in [0;36mInteractiveShell.run_cell_magic[0;34m(self, magic_name, line, cell)[0m
[1;32m   2539[0m [38;5;28;01mwith[39;00m [38;5;28mself[39m[38;5;241m.[39mbuiltin_trap:


In [0]:
# Directory holding the raw CSV files to ingest. Plain string literal: the
# original f-string had no placeholders. The value (including its trailing
# slash) is unchanged, so any code concatenating onto it keeps working.
landing_path = "/Volumes/medalhao/bronze/arquivos_raw/"

# Source CSV file name -> destination table, written as "schema.table".
# Every target lives in the bronze schema; iteration order is the order below.
file_to_table_map = {
    "olist_customers_dataset.csv": "bronze.ft_consumidores",
    "olist_geolocation_dataset.csv": "bronze.ft_geolocalizacao",
    "olist_order_items_dataset.csv": "bronze.ft_itens_pedidos",
    "olist_order_payments_dataset.csv": "bronze.ft_pagamentos_pedidos",
    "olist_order_reviews_dataset.csv": "bronze.ft_avaliacoes_pedidos",
    "olist_orders_dataset.csv": "bronze.ft_pedidos",
    "olist_products_dataset.csv": "bronze.ft_produtos",
    "olist_sellers_dataset.csv": "bronze.ft_vendedores",
    "product_category_name_translation.csv": "bronze.dm_categoria_produtos_traducao",
}

In [0]:
# Load every mapped CSV from the landing directory into its bronze Delta table.
# Files are processed independently: when one fails, the error is printed and
# the loop continues with the next file.
# NOTE(review): files are parsed with Spark's default CSV options apart from
# header/inferSchema. If any file has quoted fields containing line breaks,
# multiLine=True would be required — confirm against the data.

# landing_path ends with "/", so joining with another "/" produced "//" in the
# file path. Strip the trailing separator once, outside the loop.
landing_dir = landing_path.rstrip("/")

for csv_file, table_name in file_to_table_map.items():
    try:
        file_path = f"{landing_dir}/{csv_file}"
        df_raw = spark.read.csv(file_path, header=True, inferSchema=True)
        # isEmpty() only needs to find one row, whereas count() scanned the
        # entire file just to compare the total with zero.
        if df_raw.isEmpty():
            raise ValueError(f"O arquivo {csv_file} está vazio ou não pôde ser lido.")
        # Add the load time as an extra column on every row.
        df_bronze = df_raw.withColumn("ingestion_timestamp", F.current_timestamp())
        # mode("overwrite") replaces the table contents on each run.
        df_bronze.write.format("delta").mode("overwrite").saveAsTable(table_name)
    except Exception as e:
        # Deliberately best-effort: report the failure and keep going so one
        # bad file does not stop the remaining loads.
        print(f"Erro ao processar {table_name}: {e}")



In [0]:
%sql
-- Earliest and latest purchase date found in the bronze orders table.
WITH datas_compra AS (
  SELECT to_date(order_purchase_timestamp) AS data_compra
  FROM bronze.ft_pedidos
)
SELECT
  min(data_compra) AS primeira_data_pedido,
  max(data_compra) AS ultima_data_pedido
FROM datas_compra

primeira_data_pedido,ultima_data_pedido
2016-09-04,2018-10-17


In [0]:
import requests
import json
from datetime import datetime

# Query period, exposed as notebook widgets. The widget labels state the
# expected format: MM-DD-YYYY ("MM-DD-AAAA").
dbutils.widgets.text("data_inicio", "09-03-2016", "Data Início (MM-DD-AAAA)")
dbutils.widgets.text("data_fim", "10-18-2018", "Data Fim (MM-DD-AAAA)")

data_inicio_formatada = dbutils.widgets.get("data_inicio")
data_fim_formatada = dbutils.widgets.get("data_fim")

print(f"Buscando cotação para o período: {data_inicio_formatada} até {data_fim_formatada}")

# OData request for the period, selecting only dataHoraCotacao and
# cotacaoCompra and asking for a JSON response.
url = f"https://olinda.bcb.gov.br/olinda/servico/PTAX/versao/v1/odata/CotacaoDolarPeriodo(dataInicial=@dataInicial,dataFinalCotacao=@dataFinalCotacao)?@dataInicial='{data_inicio_formatada}'&@dataFinalCotacao='{data_fim_formatada}'&$select=dataHoraCotacao,cotacaoCompra&$format=json"

try:
    # The widget values are free-form text that was interpolated into the URL
    # above. Validate them before any request is sent: strptime raises
    # ValueError unless the text is a real date in the MM-DD-YYYY format.
    # Failures are reported by the generic handler below, like any other error.
    inicio_dt = datetime.strptime(data_inicio_formatada, "%m-%d-%Y")
    fim_dt = datetime.strptime(data_fim_formatada, "%m-%d-%Y")
    if inicio_dt > fim_dt:
        raise ValueError(
            f"data_inicio ({data_inicio_formatada}) é posterior a data_fim ({data_fim_formatada})."
        )

    # Without a timeout, requests waits indefinitely on an unresponsive server
    # and the cell never finishes. 30 seconds bounds the connect and read waits.
    response = requests.get(url, timeout=30)
    # Turn 4xx/5xx responses into HTTPError (handled below).
    response.raise_for_status()

    data = response.json()
    # Records are read from the "value" key; a missing key is treated as no data.
    cotacoes_list = data.get("value", [])

    if not cotacoes_list:
        print("API não retornou dados.")

    else:
        print(f"API retornou {len(cotacoes_list)} registros.")
        # NOTE(review): the schema is inferred from the returned dicts; an
        # explicit schema would pin the column types — confirm whether needed.
        df_cotacao_raw = spark.createDataFrame(cotacoes_list)
        # Add the load time as an extra column on every row.
        df_cotacao_bronze = df_cotacao_raw.withColumn("ingestion_timestamp", F.current_timestamp())
        # mode("overwrite") replaces the table contents on each run.
        df_cotacao_bronze.write.format("delta").mode("overwrite").saveAsTable("bronze.dm_cotacao_dolar")
        display(df_cotacao_bronze.limit(10))


except requests.exceptions.HTTPError as http_err:
    print(f"❌ ERRO de HTTP: {http_err}")
except Exception as e:
    # Covers invalid widget dates, timeouts/connection errors, JSON decoding
    # and Spark failures. Reported without failing the cell, as before.
    print(f"❌ ERRO ao processar API: {e}")

Buscando cotação para o período: 09-01-2016 até 10-23-2018
API retornou 536 registros.


cotacaoCompra,dataHoraCotacao,ingestion_timestamp
3.2466,2016-09-01 13:10:10.989,2025-11-13T02:19:27.461Z
3.2425,2016-09-02 13:05:51.688,2025-11-13T02:19:27.461Z
3.2715,2016-09-05 13:09:55.659,2025-11-13T02:19:27.461Z
3.2446,2016-09-06 13:02:39.984,2025-11-13T02:19:27.461Z
3.1928,2016-09-08 13:03:53.968,2025-11-13T02:19:27.461Z
3.2632,2016-09-09 13:14:00.885,2025-11-13T02:19:27.461Z
3.2848,2016-09-12 13:08:01.541,2025-11-13T02:19:27.461Z
3.2966,2016-09-13 13:03:56.534,2025-11-13T02:19:27.461Z
3.3256,2016-09-14 13:05:51.819,2025-11-13T02:19:27.461Z
3.332,2016-09-15 13:08:34.825,2025-11-13T02:19:27.461Z
