Notebook 01 | Silver Transformations

Objectives:
- Load the Bronze tables stored under `/FileStore/bronze`
- Join Sales, Products, Stores, and Exchange Rates
- Convert all monetary values to USD
- Handle inconsistencies, nulls, and data types
- Write the Silver layer as a Delta table ready for modeling

## 1. Environment Setup

Import the required libraries and define the database names and the Silver storage path.

In [0]:
from pyspark.sql.functions import col, to_date, regexp_replace
from pyspark.sql.types import DecimalType

bronze_db = "bronze"
silver_db = "silver"
silver_path = "/FileStore/silver"

## 2. Loading the Bronze Tables

Read the Delta tables already registered in the `bronze` database.


In [0]:
sales_bronze       = spark.table(f"{bronze_db}.sales_bronze")
products_bronze    = spark.table(f"{bronze_db}.products_bronze")
stores_bronze      = spark.table(f"{bronze_db}.stores_bronze")
exchange_bronze    = spark.table(f"{bronze_db}.exchange_rates_bronze")

## 3. Initial Inspection

Check the schema of each table, plus a sample of `sales_bronze`, to confirm the inputs are consistent.

In [0]:
sales_bronze.printSchema()
sales_bronze.show(5, truncate=False)

products_bronze.printSchema()
stores_bronze.printSchema()
exchange_bronze.printSchema()

root
 |-- order_number: integer (nullable = true)
 |-- line_item: integer (nullable = true)
 |-- order_date: date (nullable = true)
 |-- delivery_date: date (nullable = true)
 |-- customerkey: integer (nullable = true)
 |-- storekey: integer (nullable = true)
 |-- productkey: integer (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- currency_code: string (nullable = true)

+------------+---------+----------+-------------+-----------+--------+----------+--------+-------------+
|order_number|line_item|order_date|delivery_date|customerkey|storekey|productkey|quantity|currency_code|
+------------+---------+----------+-------------+-----------+--------+----------+--------+-------------+
|366000      |1        |2016-01-01|NULL         |265598     |10      |1304      |1       |CAD          |
|366001      |1        |2016-01-01|2016-01-13   |1269051    |0       |1048      |2       |USD          |
|366001      |2        |2016-01-01|2016-01-13   |1269051    |0       |2007      |1   
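
The manual inspection above can be complemented with an automated guard. The sketch below is a plain-Python illustration and not part of the original notebook: `missing_columns` is a hypothetical helper, and the required column list is taken from the `sales_bronze` schema printed above. In the notebook, `sales_bronze.columns` would supply the actual column names.

```python
# Hypothetical helper: report which required columns are absent from a table.
def missing_columns(actual, required):
    """Return the required column names not present in `actual`, in order."""
    present = set(actual)
    return [name for name in required if name not in present]

# Columns the later joins and calculations rely on (from the printed schema).
REQUIRED_SALES_COLUMNS = ["order_date", "storekey", "productkey",
                          "quantity", "currency_code"]

# Stand-in for `sales_bronze.columns`.
sales_columns = ["order_number", "line_item", "order_date", "delivery_date",
                 "customerkey", "storekey", "productkey", "quantity",
                 "currency_code"]

print(missing_columns(sales_columns, REQUIRED_SALES_COLUMNS))  # []
```

An empty list means every column needed downstream is present; any name returned points at a schema change in the Bronze layer.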

## 4. Joins

- `sales_bronze` ↔ `products_bronze` on `productkey`  
- result ↔ `stores_bronze` on `storekey`  
- for currency normalization, join with `exchange_bronze` on `order_date` and `currency_code`
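
A left join preserves the number of sales rows only when each key appears at most once on the right-hand side. The plain-Python sketch below illustrates that check on made-up keys; `duplicated_keys` is a hypothetical helper, not something the notebook defines. In Spark, an equivalent check could group the dimension table by its key and look for counts above one.

```python
from collections import Counter

# Hypothetical helper: find join keys that occur more than once.
def duplicated_keys(keys):
    """Return the sorted keys that appear at least twice in `keys`."""
    counts = Counter(keys)
    return sorted(key for key, n in counts.items() if n > 1)

# Made-up product keys; 1048 appears twice and would double matching sales rows.
product_keys = [1304, 1048, 2007, 1048]
print(duplicated_keys(product_keys))  # [1048]
```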



In [0]:
# Sales + Products
df = sales_bronze.alias("s") \
    .join(products_bronze.alias("p"), col("s.productkey") == col("p.productkey"), "left") \
    .select("s.*", "p.category", "p.subcategory", "p.unit_price_usd")

# + Stores
df = df.join(stores_bronze.alias("st"), df["storekey"] == col("st.storekey"), "left") \
       .select(df["*"], "st.country", "st.state", "st.square_meters")

# Normalize the date columns used as keys in the exchange-rate join
df = df.withColumn("order_date", to_date(col("order_date"), "yyyy-MM-dd"))
exchange = exchange_bronze.withColumn("date", to_date(col("date"), "yyyy-MM-dd"))



## 5. Currency Conversion to USD

- Join with `exchange` to obtain the exchange rate  
- Fix the type of the price column (`unit_price_usd`)  
- Compute `total_sales_usd = quantity * unit_price_usd * exchange`
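
As a worked instance of the formula above, the sketch below cleans a price string and applies a rate using Python's `decimal` module. The price string `"$68.00"` and the rate `1.3884` are assumed values chosen for illustration: they are consistent with the first row shown in section 7 (unit price 68.0, quantity 1, total 94.41) but are not read from the Bronze tables. `parse_price` and `line_total` are hypothetical helpers.

```python
import re
from decimal import Decimal, ROUND_HALF_UP

def parse_price(text):
    """Strip currency symbols, thousands separators, and whitespace."""
    return Decimal(re.sub(r"[$,\s]", "", text))

def line_total(quantity, unit_price, rate):
    """Return quantity * unit_price * rate, rounded half up to two decimals."""
    total = Decimal(quantity) * unit_price * rate
    return total.quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)

price = parse_price("$68.00")  # assumed raw value
rate = Decimal("1.3884")       # assumed rate, for illustration only

print(line_total(1, price, rate))  # 94.41
```

Because `unit_price_usd` is already labeled as a USD amount, it is worth confirming the quote direction of `exchange`: multiplying a USD price by a rate quoted as local currency per USD yields a local-currency amount rather than USD.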


In [0]:
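from pyspark.sql.functions import coalesce, lit

# Join with the exchange rates on order date and currency
df = df.join(exchange.alias("e"),
             (df.order_date == col("e.date")) & (df.currency_code == col("e.currency")),
             "left")

# Strip the dollar sign and thousands separators, then cast unit_price_usd to double
df = df.withColumn("unit_price_usd", regexp_replace("unit_price_usd", "[$,]", "").cast("double"))

# Compute total_sales_usd. A missing rate falls back to 1.0 here, because
# filling `exchange` after this step would leave the total null.
df = df.withColumn(
    "total_sales_usd",
    (col("quantity") * col("unit_price_usd") * coalesce(col("e.exchange"), lit(1.0)))
        .cast(DecimalType(18, 2))
)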



## 6. Data Cleaning

- Fill or remove critical nulls  
- Adjust column types where necessary  
- Drop intermediate columns used only during processing
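
One design question behind the first bullet is what a missing exchange rate should become. A blanket default of 1.0 is only neutral for rows already in USD; for any other currency it would silently misstate the amount. The sketch below shows one possible policy in plain Python. `resolve_rate` is a hypothetical helper and the policy itself is an assumption, not what the notebook implements.

```python
def resolve_rate(currency_code, rate):
    """Return a usable rate, or None when the row needs manual review."""
    if rate is not None:
        return rate
    if currency_code == "USD":
        return 1.0  # USD to USD needs no conversion
    return None     # unknown rate for a foreign currency: do not guess

print(resolve_rate("USD", None))    # 1.0
print(resolve_rate("CAD", None))    # None
print(resolve_rate("CAD", 1.3884))  # 1.3884
```

Rows that come back as `None` can then be counted or quarantined instead of being priced with an arbitrary rate.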



In [0]:
# Fill null values left by the joins
df = df.fillna({
    "category": "unknown",
    "subcategory": "unknown",
    "e.exchange": 1.0
})

df = df.withColumn("square_meters", col("square_meters").cast("integer"))

# Drop helper columns. drop() matches string arguments against plain column
# names, so alias-qualified names such as "e.date" are silently ignored.
df = df.drop("currency_code", "date", "currency", "exchange")

## 7. Final Check

Inspect the cleaned data before writing.


In [0]:
df.printSchema()
df.select("order_date", "unit_price_usd", "quantity", "total_sales_usd").show(10, truncate=False)

root
 |-- order_number: integer (nullable = true)
 |-- line_item: integer (nullable = true)
 |-- order_date: date (nullable = true)
 |-- delivery_date: date (nullable = true)
 |-- customerkey: integer (nullable = true)
 |-- storekey: integer (nullable = true)
 |-- productkey: integer (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- category: string (nullable = false)
 |-- subcategory: string (nullable = false)
 |-- unit_price_usd: double (nullable = true)
 |-- country: string (nullable = true)
 |-- state: string (nullable = true)
 |-- square_meters: integer (nullable = true)
 |-- total_sales_usd: decimal(18,2) (nullable = true)

+----------+--------------+--------+---------------+
|order_date|unit_price_usd|quantity|total_sales_usd|
+----------+--------------+--------+---------------+
|2016-01-01|68.0          |1       |94.41          |
|2016-01-01|427.0    
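
Beyond eyeballing the sample, a count of nulls in the critical columns helps confirm that the cleaning step did its job. The sketch below uses plain Python dictionaries as stand-ins for rows; `null_counts` is a hypothetical helper and the second sample row is made up.

```python
def null_counts(rows, columns):
    """Count, per column, how many rows hold None or lack the key entirely."""
    return {c: sum(1 for r in rows if r.get(c) is None) for c in columns}

rows = [
    {"quantity": 1, "unit_price_usd": 68.0, "total_sales_usd": 94.41},
    {"quantity": 2, "unit_price_usd": None, "total_sales_usd": None},  # made up
]

print(null_counts(rows, ["quantity", "unit_price_usd", "total_sales_usd"]))
# {'quantity': 0, 'unit_price_usd': 1, 'total_sales_usd': 1}
```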

## 8. Writing the Silver Layer

Write the data as a Delta table and register it in the `silver` SQL database.


In [0]:
# Write to DBFS
df.write \
  .format("delta") \
  .mode("overwrite") \
  .save(f"{silver_path}/sales_silver")

# Register in the catalog
spark.sql(f"CREATE DATABASE IF NOT EXISTS {silver_db}")
spark.sql(f"""
  CREATE TABLE IF NOT EXISTS {silver_db}.sales_silver
  USING DELTA
  LOCATION '{silver_path}/sales_silver'
""")


DataFrame[]

## 9. Next Steps

- Begin feature preparation and aggregations in the Gold layer
