In [0]:
from pyspark.sql import functions as F, Window
from pyspark.sql.functions import to_date

# garante que os bancos existem
spark.sql("CREATE DATABASE IF NOT EXISTS bronze")
spark.sql("CREATE DATABASE IF NOT EXISTS silver")
spark.catalog.setCurrentDatabase("silver")

# ft_consumidores
df = spark.table("bronze.ft_consumidores").dropDuplicates(["customer_id"])
df = (df
    .filter(F.col("customer_id").isNotNull())
    .withColumnRenamed("customer_id", "id_consumidor")
    .withColumnRenamed("customer_zip_code_prefix", "prefixo_cep")
    .withColumnRenamed("customer_city", "cidade")
    .withColumnRenamed("customer_state", "estado")
    .withColumn("cidade", F.upper("cidade"))
    .withColumn("estado", F.upper("estado")))
df.write.mode("overwrite").saveAsTable("silver.ft_consumidores")

# ft_pedidos
status_map = {
    "delivered": "entregue", "invoiced": "faturado", "shipped": "enviado",
    "processing": "em processamento", "unavailable": "indisponível",
    "canceled": "cancelado", "created": "criado", "approved": "aprovado"
}
map_expr = F.create_map(*[F.lit(i) for kv in status_map.items() for i in kv])

df = spark.table("bronze.ft_pedidos")
df = (df
    .filter(F.col("order_purchase_timestamp").isNotNull())
    .filter(F.col("order_id").isNotNull())
    .withColumnRenamed("order_id","id_pedido")
    .withColumnRenamed("customer_id","id_consumidor")
    .withColumnRenamed("order_purchase_timestamp","pedido_compra_timestamp")
    .withColumnRenamed("order_approved_at","pedido_aprovado_timestamp")
    .withColumnRenamed("order_delivered_carrier_date","pedido_carregado_timestamp")
    .withColumnRenamed("order_delivered_customer_date","pedido_entregue_timestamp")
    .withColumnRenamed("order_estimated_delivery_date","pedido_estimativa_entrega_timestamp")
    .withColumn("status", F.coalesce(map_expr[F.col("order_status")], F.lit("desconhecido")))
    .withColumn("pedido_compra_timestamp", F.try_to_timestamp("pedido_compra_timestamp"))
    .withColumn("pedido_aprovado_timestamp", F.try_to_timestamp("pedido_aprovado_timestamp"))
    .withColumn("pedido_carregado_timestamp", F.try_to_timestamp("pedido_carregado_timestamp"))
    .withColumn("pedido_entregue_timestamp", F.try_to_timestamp("pedido_entregue_timestamp"))
    .withColumn("pedido_estimativa_entrega_timestamp", F.try_to_timestamp("pedido_estimativa_entrega_timestamp"))
    .withColumn("tempo_entrega_dias", F.datediff("pedido_entregue_timestamp","pedido_compra_timestamp"))
    .withColumn("tempo_entrega_estimado_dias", F.datediff("pedido_estimativa_entrega_timestamp","pedido_compra_timestamp"))
    .withColumn("diferenca_entrega_dias", F.col("tempo_entrega_estimado_dias") - F.col("tempo_entrega_dias"))
    .withColumn("entrega_no_prazo",
        F.when(F.col("pedido_entregue_timestamp").isNull(),"Não Entregue")
         .when(F.col("diferenca_entrega_dias") >= 0,"Sim")
         .otherwise("Não"))
)
df = df.filter(F.col("id_consumidor").isNotNull())
df.write.mode("overwrite").saveAsTable("silver.ft_pedidos")

# ft_itens_pedidos
df = spark.table("bronze.ft_itens_pedidos")
df = (df
    .filter(F.col("order_id").isNotNull())
    .withColumnRenamed("order_id","id_pedido")
    .withColumnRenamed("order_item_id","id_item")
    .withColumnRenamed("product_id","id_produto")
    .withColumnRenamed("seller_id","id_vendedor")
    .withColumnRenamed("price","preco_BRL")
    .withColumnRenamed("freight_value","preco_frete")
    .withColumn("preco_BRL", F.col("preco_BRL").cast("decimal(12,2)"))
    .withColumn("preco_frete", F.col("preco_frete").cast("decimal(12,2)")))
df = df.dropna(subset=["id_pedido","id_item","id_produto"])
df.write.mode("overwrite").saveAsTable("silver.ft_itens_pedidos")

# ft_pagamentos
map_pag = {
    "credit_card": "Cartão de Crédito",
    "boleto": "Boleto",
    "voucher": "Voucher",
    "debit_card": "Cartão de Débito"
}
map_expr = F.create_map(*[F.lit(i) for kv in map_pag.items() for i in kv])

df = spark.table("bronze.ft_pagamentos_pedidos")
df = (df
    .filter(F.col("order_id").isNotNull())
    .withColumnRenamed("order_id","id_pedido")
    .withColumnRenamed("payment_sequential","codigo_pagamento")
    .withColumnRenamed("payment_type","forma_pagamento")
    .withColumnRenamed("payment_installments","parcelas")
    .withColumnRenamed("payment_value","valor_pagamento")
    .withColumn("forma_pagamento", F.coalesce(map_expr[F.col("forma_pagamento")], F.lit("Outro")))
    .withColumn("valor_pagamento", F.col("valor_pagamento").cast("decimal(12,2)")))
df.write.mode("overwrite").saveAsTable("silver.ft_pagamentos")

# ft_avaliacoes_pedidos
df = spark.table("bronze.ft_avaliacoes_pedidos")
ped = spark.table("bronze.ft_pedidos").select("order_id")
df = (df
    .filter(F.col("review_id").isNotNull())
    .withColumn("review_creation_date", F.try_to_timestamp("review_creation_date"))
    .withColumn("review_answer_timestamp", F.try_to_timestamp("review_answer_timestamp"))
    .filter(F.col("review_creation_date").isNotNull())
    .join(ped, "order_id", "inner")
    .withColumnRenamed("review_id","id_avaliacao")
    .withColumnRenamed("order_id","id_pedido")
    .withColumnRenamed("review_score","avaliacao")
    .withColumnRenamed("review_comment_title","titulo_comentario")
    .withColumnRenamed("review_comment_message","comentario")
    .withColumnRenamed("review_creation_date","data_comentario")
    .withColumnRenamed("review_answer_timestamp","data_resposta"))
df.write.mode("overwrite").saveAsTable("silver.ft_avaliacoes_pedidos")

# ft_produtos
df = spark.table("bronze.ft_produtos")
df = (df
    .filter(F.col("product_id").isNotNull())
    .withColumnRenamed("product_id","id_produto")
    .withColumnRenamed("product_category_name","categoria_produto")
    .withColumnRenamed("product_weight_g","peso_produto_gramas")
    .withColumnRenamed("product_length_cm","comprimento_centimetros")
    .withColumnRenamed("product_height_cm","altura_centimetros")
    .withColumnRenamed("product_width_cm","largura_centimetros"))
df.write.mode("overwrite").saveAsTable("silver.ft_produtos")

# ft_vendedores
df = spark.table("bronze.ft_vendedores")
df = (df
    .filter(F.col("seller_id").isNotNull())
    .withColumnRenamed("seller_id","id_vendedor")
    .withColumnRenamed("seller_zip_code_prefix","prefixo_cep")
    .withColumnRenamed("seller_city","cidade")
    .withColumnRenamed("seller_state","estado")
    .withColumn("cidade", F.upper("cidade"))
    .withColumn("estado", F.upper("estado")))
df.write.mode("overwrite").saveAsTable("silver.ft_vendedores")

# dm_categoria_produtos_traducao
df = spark.table("bronze.dm_categoria_produtos_traducao")
df = (df
    .filter(F.col("product_category_name").isNotNull())
    .withColumnRenamed("product_category_name","nome_produto_pt")
    .withColumnRenamed("product_category_name_english","nome_produto_en"))
df.write.mode("overwrite").saveAsTable("silver.dm_categoria_produtos_traducao")

# dm_cotacao_dolar
df = spark.table("bronze.dm_cotacao_dolar").select("dataHoraCotacao","cotacaoCompra")
df = (df
    .withColumn("data", F.to_date("dataHoraCotacao"))
    .withColumn("cotacao_dolar", F.col("cotacaoCompra").cast("decimal(12,6)"))
    .select("data","cotacao_dolar").distinct().orderBy("data"))
min_date, max_date = df.agg(F.min("data"), F.max("data")).first()
cal = (spark.createDataFrame([(min_date, max_date)], ["start","end"])
    .select(F.explode(F.expr("sequence(start, end, interval 1 day)")).alias("data")))
df = (cal.join(df, "data", "left")
    .withColumn("cotacao_dolar", F.last("cotacao_dolar", True).over(
        Window.orderBy("data").rowsBetween(Window.unboundedPreceding, 0))))
df.write.mode("overwrite").saveAsTable("silver.dm_cotacao_dolar")

# integridade referencial
ped = spark.table("silver.ft_pedidos").select("id_pedido","id_consumidor")
cons = spark.table("silver.ft_consumidores").select("id_consumidor")
itens = spark.table("silver.ft_itens_pedidos").select("id_pedido")

ped = ped.join(cons, "id_consumidor", "inner")
itens = itens.join(ped.select("id_pedido"), "id_pedido", "inner")

ped.write.mode("overwrite").saveAsTable("silver.ft_pedidos")
itens.write.mode("overwrite").saveAsTable("silver.ft_itens_pedidos")

# ft_pedido_total
ped = spark.table("silver.ft_pedidos").alias("ped").select(
    "id_pedido", "id_consumidor", "status", "pedido_compra_timestamp"
)
pag = (spark.table("silver.ft_pagamentos")
    .groupBy("id_pedido")
    .agg(F.sum("valor_pagamento").alias("valor_total_pago_brl"))
)
cot = spark.table("silver.dm_cotacao_dolar").alias("cot")

final = (ped
    .withColumn("data_pedido", F.to_date(F.col("pedido_compra_timestamp")))
    .join(pag, "id_pedido", "left")
    .join(cot, F.col("ped.pedido_compra_timestamp") >= F.col("cot.data"), "left")
    .withColumn("cotacao_dia",
        F.last("cot.cotacao_dolar", True).over(
            Window.partitionBy("ped.id_pedido").orderBy("cot.data").rowsBetween(Window.unboundedPreceding, 0)
        )
    )
    .withColumn(
        "valor_total_pago_usd",
        F.when(
            F.col("valor_total_pago_brl").isNotNull() & F.col("cotacao_dia").isNotNull(),
            F.round(F.col("valor_total_pago_brl") / F.col("cotacao_dia"), 2)
        ).otherwise(None)
    )
    .select(
        F.col("data_pedido"),
        F.col("ped.id_pedido").alias("id_pedido"),
        F.col("ped.id_consumidor").alias("id_consumidor"),
        F.col("ped.status").alias("status"),
        F.col("valor_total_pago_brl"),
        F.col("valor_total_pago_usd")
    )
)
final.write.mode("overwrite").saveAsTable("silver.ft_pedido_total")

