# Notebook 01 | Transformações Silver

Objetivos:

- Carregar as tabelas Bronze registradas em `/FileStore/bronze`  
- Realizar joins entre Sales, Products, Stores e Exchange Rates  
- Converter todos os valores para USD  
- Tratar inconsistências, nulos e tipos de dados  
- Gravar a camada Silver como Delta Table pronta para modelagem  

## 1. Configuração do Ambiente

Inicializar Spark, importar funções e definir banco e caminho da camada Silver.


In [0]:
# Iniciar Spark e importar funções necessárias
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_date, regexp_replace, col
from pyspark.sql.types import DoubleType, IntegerType

spark = SparkSession.builder.getOrCreate()

# Definir nomes de banco e caminho no DBFS
silver_db   = "silver"
silver_path = "/FileStore/silver/sales_silver"
bronze_path = "/FileStore/bronze"

## 2. Carregamento das Tabelas Bronze

Ler as Delta Tables registradas no catálogo `bronze`.



In [0]:
# Caminhos diretos para os arquivos Delta na Bronze
sales     = spark.read.format("delta").load(f"{bronze_path}/sales")
products  = spark.read.format("delta").load(f"{bronze_path}/products")
stores    = spark.read.format("delta").load(f"{bronze_path}/stores")
exchange  = spark.read.format("delta").load(f"{bronze_path}/exchange_rates")
customers = spark.read.format("delta").load(f"{bronze_path}/customers")

## 3. Inspeção Inicial

Verificar esquema e primeiros registros para garantir consistência.


In [0]:
# Conferir esquemas e amostras
sales.printSchema()
products.printSchema()
stores.printSchema()
exchange.printSchema()
customers.printSchema()

root
 |-- order_number: integer (nullable = true)
 |-- line_item: integer (nullable = true)
 |-- order_date: date (nullable = true)
 |-- delivery_date: date (nullable = true)
 |-- customerkey: integer (nullable = true)
 |-- storekey: integer (nullable = true)
 |-- productkey: integer (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- currency_code: string (nullable = true)

root
 |-- productkey: integer (nullable = true)
 |-- product_name: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- color: string (nullable = true)
 |-- unit_cost_usd: string (nullable = true)
 |-- unit_price_usd: string (nullable = true)
 |-- subcategorykey: integer (nullable = true)
 |-- subcategory: string (nullable = true)
 |-- categorykey: integer (nullable = true)
 |-- category: string (nullable = true)

root
 |-- storekey: integer (nullable = true)
 |-- country: string (nullable = true)
 |-- state: string (nullable = true)
 |-- square_meters: integer (nullable = true)
 |-- open_

## 4. Junções (Joins)

- `sales_bronze` ↔ `products_bronze` por `productkey`  
- resultado ↔ `stores_bronze` por `storekey`  
- resultado ↔ `customers_bronze` por `customerkey` (com renomeação de colunas para evitar ambiguidade)  
- para normalização cambial, join com `exchange_bronze` usando `order_date` e `currency_code`  




In [0]:
# Join com produtos
df = (
    sales.alias("s")
         .join(products.alias("p"), col("s.productkey") == col("p.productkey"), "left")
         .select(
             "s.*",
             "p.category",
             "p.subcategory",
             "p.unit_price_usd",
             "p.unit_cost_usd"
         )
)

# Join com lojas
df = (
    df.join(stores.alias("st"), df["storekey"] == col("st.storekey"), "left")
      .select(
          df["*"],
          "st.country",
          "st.state",
          "st.square_meters"
      )
)

# Renomear colunas da dimensão clientes para evitar ambiguidade
customers = customers.select(
    col("customerkey"),
    col("gender").alias("cust_gender"),
    col("city").alias("cust_city"),
    col("state").alias("cust_state"),
    col("country").alias("cust_country"),
    col("birthday").alias("cust_birthday")
)

# Join com clientes
df = (
    df.join(customers.alias("c"), df["customerkey"] == col("c.customerkey"), "left")
      .select(
          df["*"],
          "c.cust_gender",
          "c.cust_city",
          "c.cust_state",
          "c.cust_country",
          "c.cust_birthday"
      )
)

# Padronizar formatos de data
df = df.withColumn("order_date", to_date("order_date", "yyyy-MM-dd"))
exchange = exchange.withColumn("date", to_date("date", "yyyy-MM-dd"))


## 5. Conversão de Moeda para USD

- Como vou usar `unit_price_usd` e `unit_cost_usd` em dolar, não preciso de conversão com `exchange` para obter taxa de câmbio  
- Remover símbolo de cifrão e converter `unit_price_usd` para double  
- Calcular `total_sales_usd = quantity * unit_price_usd`  
- Calcular `total_cost_usd = quantity * unit_cost_usd`  


In [0]:
# Join com exchange usando order_date + currency_code
df = (
    df.join(exchange.alias("e"),
            (df.order_date == col("e.date")) &
            (df.currency_code == col("e.currency")),
            "left")
)

# Limpar unit_price_usd: remover cifrão e vírgula
df = df.withColumn(
    "unit_price_usd",
    regexp_replace("unit_price_usd", "[$,]", "").cast(DoubleType())
)

# Limpar unit_cost_usd: remover cifrão e vírgula
df = df.withColumn(
    "unit_cost_usd",
    regexp_replace("unit_cost_usd", "[$,]", "").cast(DoubleType())
)

# Calcular total_sales_usd
df = df.withColumn(
    "total_sales_usd",
    (col("quantity") * col("unit_price_usd")).cast(DoubleType()))

# Calcular total_cost_usd
df = df.withColumn(
    "total_cost_usd",
    (col("quantity") * col("unit_cost_usd")).cast(DoubleType()))

## 6. Tratamento de Dados

- Preencher valores nulos críticos  
- Ajustar tipos de colunas onde necessário  
- Remover colunas intermediárias  




In [0]:
# Preenchimento de nulos após joins
df = df.fillna({
    "category": "unknown",
    "subcategory": "unknown",
    "exchange": 1.0,
    "cust_gender": "unknown",
    "cust_country": "unknown",
    "cust_state": "unknown",
    "cust_city": "unknown"
})

df = df.withColumn("square_meters", col("square_meters").cast(IntegerType()))

# Remover colunas temporárias
df = df.drop("currency_code", "e.date", "e.currency", "e.exchange")



## 7. Verificação Final

Visualizar esquema e sample de colunas-chave antes da escrita.



In [0]:
df.printSchema()
df.select("order_date", "unit_price_usd", "quantity", "total_sales_usd").show(10, truncate=False)

root
 |-- order_number: integer (nullable = true)
 |-- line_item: integer (nullable = true)
 |-- order_date: date (nullable = true)
 |-- delivery_date: date (nullable = true)
 |-- customerkey: integer (nullable = true)
 |-- storekey: integer (nullable = true)
 |-- productkey: integer (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- category: string (nullable = false)
 |-- subcategory: string (nullable = false)
 |-- unit_price_usd: double (nullable = true)
 |-- unit_cost_usd: double (nullable = true)
 |-- country: string (nullable = true)
 |-- state: string (nullable = true)
 |-- square_meters: integer (nullable = true)
 |-- cust_gender: string (nullable = false)
 |-- cust_city: string (nullable = false)
 |-- cust_state: string (nullable = false)
 |-- cust_country: string (nullable = false)
 |-- cust_birthday: date (nullable = true)
 |-- date: date (nullable = true)
 |-- currency: string (nullable = true)
 |-- exchange: double (nullable = false)
 |-- total_sales_usd: doub

## 8. Escrita da Camada Silver

Gravar como Delta Table e registrar no catálogo `silver`.



In [0]:
# Gravar no DBFS em formato Delta
df.write \
  .format("delta") \
  .mode("overwrite") \
  .option("mergeSchema", "true") \
  .save(silver_path)

# Criar database e tabela no catálogo SQL
spark.sql(f"CREATE DATABASE IF NOT EXISTS {silver_db}")
spark.sql(f"""
  CREATE TABLE IF NOT EXISTS {silver_db}.sales_silver
  USING DELTA
  LOCATION '{silver_path}'
""")


Out[66]: DataFrame[]

## 9. Próximos Passos

- Realizar agregações temporais na camada Gold
- Calcular métricas como `total_sales_usd` por mês e por loja/subcategoria  
- Preparar séries temporais contínuas para modelagem preditiva (SARIMA, Prophet)
