In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

In [0]:
display(spark.sql("SHOW TABLES IN bronze"))

In [0]:
df = spark.table("bronze.ft_consumidores")

df = df.dropDuplicates(["customer_id"])

df_silver = df.select(
    col("customer_id").alias("id_consumidor"),
    col("customer_zip_code_prefix").alias("prefixo_cep"),
    upper(col("customer_city")).alias("cidade"),
    upper(col("customer_state")).alias("estado")
)

df_silver.write.mode("overwrite").saveAsTable("silver.ft_consumidores")

display(df_silver.limit(5))

In [0]:
df = spark.table("bronze.ft_pedidos")

status_map = {
    "delivered": "entregue",
    "invoiced": "faturado",
    "shipped": "enviado",
    "processing": "em processamento",
    "unavailable": "indisponível",
    "canceled": "cancelado",
    "created": "criado",
    "approved": "aprovado"
}

status_expr = when(col("order_status") == "delivered", "entregue")
for eng, pt in list(status_map.items())[1:]:
    status_expr = status_expr.when(col("order_status") == eng, pt)
status_expr = status_expr.otherwise(col("order_status"))

df_silver = df.select(
    col("order_id").alias("id_pedido"),
    col("customer_id").alias("id_consumidor"),
    status_expr.alias("status"),
    col("order_purchase_timestamp").cast("timestamp").alias("pedido_compra_timestamp"),
    col("order_approved_at").cast("timestamp").alias("pedido_aprovado_timestamp"),
    col("order_delivered_carrier_date").cast("timestamp").alias("pedido_carregado_timestamp"),
    col("order_delivered_customer_date").cast("timestamp").alias("pedido_entregue_timestamp"),
    col("order_estimated_delivery_date").cast("timestamp").alias("pedido_estimativa_entrega_timestamp")
)

df_silver = df_silver.withColumn(
    "tempo_entrega_dias",
    datediff(col("pedido_entregue_timestamp"), col("pedido_compra_timestamp"))
)

df_silver = df_silver.withColumn(
    "tempo_entrega_estimado_dias",
    datediff(col("pedido_estimativa_entrega_timestamp"), col("pedido_compra_timestamp"))
)

df_silver = df_silver.withColumn(
    "diferenca_entrega_dias",
    col("tempo_entrega_dias") - col("tempo_entrega_estimado_dias")
)

df_silver = df_silver.withColumn(
    "entrega_no_prazo",
    when(col("pedido_entregue_timestamp").isNull(), "Não Entregue")
    .when(col("diferenca_entrega_dias") <= 0, "Sim")
    .otherwise("Não")
)

df_silver.write.mode("overwrite").saveAsTable("silver.ft_pedidos")

display(df_silver.limit(5))

In [0]:
df = spark.table("bronze.ft_itens_pedidos")

df_silver = df.select(
    col("order_id").alias("id_pedido"),
    col("order_item_id").alias("id_item"),
    col("product_id").alias("id_produto"),
    col("seller_id").alias("id_vendedor"),
    col("price").cast("decimal(12,2)").alias("preco_BRL"),
    col("freight_value").cast("decimal(12,2)").alias("preco_frete")
)

df_silver.write.mode("overwrite").saveAsTable("silver.ft_itens_pedidos")

display(df_silver.limit(5))

In [0]:
df = spark.table("bronze.ft_pagamentos_pedidos")

df_silver = df.select(
    col("order_id").alias("id_pedido"),
    col("payment_sequential").alias("codigo_pagamento"),
    when(col("payment_type") == "credit_card", "Cartão de Crédito")
    .when(col("payment_type") == "boleto", "Boleto")
    .when(col("payment_type") == "voucher", "Voucher")
    .when(col("payment_type") == "debit_card", "Cartão de Débito")
    .otherwise("Outro").alias("forma_pagamento"),
    col("payment_installments").cast("int").alias("parcelas"),
    col("payment_value").cast("decimal(12,2)").alias("valor_pagamento")
)

df_silver.write.mode("overwrite").saveAsTable("silver.ft_pagamentos_pedidos")

display(df_silver.limit(5))

In [0]:
from pyspark.sql.functions import col, expr, current_date

df = spark.table("bronze.ft_avaliacoes_pedidos")

df_silver = df.select(
    col("review_id").alias("id_avaliacao"),
    col("order_id").alias("id_pedido"),
    expr("try_cast(review_score as int)").alias("avaliacao"),
    col("review_comment_title").alias("titulo_comentario"),
    col("review_comment_message").alias("comentario"),
    expr("try_cast(review_creation_date as date)").alias("data_comentario"),
    expr("try_cast(review_answer_timestamp as timestamp)").alias("data_resposta")
).filter(
    col("id_pedido").isNotNull() &
    col("data_comentario").isNotNull() &
    (col("data_comentario") <= current_date())
)

df_silver.write.mode("overwrite").saveAsTable("silver.ft_avaliacoes_pedidos")

display(df_silver.limit(5))

In [0]:
df = spark.table("bronze.ft_produtos")

df_silver = df.select(
    col("product_id").alias("id_produto"),
    col("product_category_name").alias("categoria_produto"),
    col("product_weight_g").cast("int").alias("peso_produto_gramas"),
    col("product_length_cm").cast("int").alias("comprimento_centimetros"),
    col("product_height_cm").cast("int").alias("altura_centimetros"),
    col("product_width_cm").cast("int").alias("largura_centimetros")
)

df_silver.write.mode("overwrite").saveAsTable("silver.ft_produtos")

display(df_silver.limit(5))

In [0]:
df = spark.table("bronze.ft_vendedores")

df_silver = df.select(
    col("seller_id").alias("id_vendedor"),
    col("seller_zip_code_prefix").alias("prefixo_cep"),
    upper(col("seller_city")).alias("cidade"), 
    upper(col("seller_state")).alias("estado")
)

df_silver.write.mode("overwrite").saveAsTable("silver.ft_vendedores")

display(df_silver.limit(5))

In [0]:
df = spark.table("bronze.dm_categoria_produtos_traducao")

df_silver = df.select(
    col("product_category_name").alias("nome_produto_pt"),
    col("product_category_name_english").alias("nome_produto_en")
)

df_silver.write.mode("overwrite").saveAsTable("silver.dm_categoria_produtos_traducao")

display(df_silver.limit(5))

In [0]:
from pyspark.sql.window import Window

df = spark.table("bronze.dm_cotacao_dolar")

df = df.withColumn("data", to_date(col("dataHoraCotacao")))

windowSpec = Window.orderBy("data").rowsBetween(Window.unboundedPreceding, Window.currentRow)

df_silver = df.select(
    col("cotacaoCompra").cast("decimal(10,4)").alias("cotacao_dolar"),
    col("data")
).withColumn(
    "cotacao_dolar",
    last(col("cotacao_dolar"), ignorenulls=True).over(windowSpec)
)

df_silver = df_silver.dropDuplicates(["data"])

df_silver.write.mode("overwrite").saveAsTable("silver.dm_cotacao_dolar")

display(df_silver.limit(5))

In [0]:
df_pedidos = spark.table("silver.ft_pedidos")
df_consumidores = spark.table("silver.ft_consumidores")

pedidos_orfaos = df_pedidos.join(
    df_consumidores,
    df_pedidos.id_consumidor == df_consumidores.id_consumidor,
    "left_anti"
)

qtd_orfaos_pedidos = pedidos_orfaos.count()
print(f"Quantidade de Pedidos orfaos: {qtd_orfaos_pedidos}")

if qtd_orfaos_pedidos > 0:
    ids_orfaos = [row.id_pedido for row in pedidos_orfaos.select("id_pedido").collect()]
    df_pedidos_validos = df_pedidos.filter(~col("id_pedido").isin(ids_orfaos))
    df_pedidos_validos.write.mode("overwrite").saveAsTable("silver.ft_pedidos")

In [0]:
df_itens = spark.table("silver.ft_itens_pedidos")
df_pedidos = spark.table("silver.ft_pedidos")

itens_orfaos = df_itens.join(
    df_pedidos,
    df_itens.id_pedido == df_pedidos.id_pedido,
    "left_anti"
)

qtd_orfaos_itens = itens_orfaos.count()
print(f"Quantidade de Itens orfaos: {qtd_orfaos_itens}")

if qtd_orfaos_itens > 0:
    df_itens_validos = df_itens.join(
        df_pedidos.select("id_pedido"),
        "id_pedido",
        "inner"
    )
    df_itens_validos.write.mode("overwrite").saveAsTable("silver.ft_itens_pedidos")

In [0]:
df_pedidos = spark.table("silver.ft_pedidos")
df_consumidores = spark.table("silver.ft_consumidores")
df_pagamentos = spark.table("silver.ft_pagamentos_pedidos")
df_cotacao = spark.table("silver.dm_cotacao_dolar")

df_pagamentos_total = df_pagamentos.groupBy("id_pedido").agg(
    sum("valor_pagamento").alias("valor_total_pago_brl")
)

df_final = df_pedidos.join(
    df_consumidores,
    "id_consumidor",
    "inner"
).join(
    df_pagamentos_total,
    "id_pedido",
    "inner"
)

df_final = df_final.withColumn(
    "data_pedido", 
    to_date(col("pedido_compra_timestamp"))
).join(
    df_cotacao,
    col('data_pedido') == df_cotacao.data,
    "left"
)

df_final = df_final.withColumn(
    "valor_total_pago_usd",
    when(col("cotacao_dolar").isNotNull(), 
         col("valor_total_pago_brl") / col("cotacao_dolar")
    ).otherwise(None)
)

df_pedido_total = df_final.select(
    col("data_pedido"),
    col("id_pedido"),
    col("id_consumidor"),
    col("status"),
    col("valor_total_pago_brl").cast("decimal(12,2)"),
    col("valor_total_pago_usd").cast("decimal(12,2)")
)

df_pedido_total.write.mode("overwrite").saveAsTable("silver.ft_pedido_total")

display(df_pedido_total.limit(10))

In [0]:
display(spark.sql("SHOW TABLES IN silver"))