In [0]:
# Importação de Bibliotecas
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import DecimalType, IntegerType, TimestampType, DateType

print("Bibliotecas importadas com sucesso.")

# Configuração dos Schemas
catalogo = "medalhao"
bronze_db_name = "bronze"
silver_db_name = "silver"

In [0]:
# Garantir que o catálogo e o schema silver existam
spark.sql(f"USE CATALOG {catalogo}")
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {silver_db_name}")

print(f"Contexto configurado para Catálogo: {catalogo}, Schema Silver: {silver_db_name}")

In [0]:
# Lendo todas as tabelas que serão transformadas
try:
    df_consumidores_bronze = spark.table(f"{bronze_db_name}.ft_consumidores")
    df_pedidos_bronze = spark.table(f"{bronze_db_name}.ft_pedidos")
    df_itens_pedidos_bronze = spark.table(f"{bronze_db_name}.ft_itens_pedidos")
    df_pagamentos_bronze = spark.table(f"{bronze_db_name}.ft_pagamentos_pedidos")
    df_avaliacoes_bronze = spark.table(f"{bronze_db_name}.ft_avaliacoes_pedidos")
    df_produtos_bronze = spark.table(f"{bronze_db_name}.ft_produtos")
    df_vendedores_bronze = spark.table(f"{bronze_db_name}.ft_vendedores")
    df_categoria_traducao_bronze = spark.table(f"{bronze_db_name}.dim_categoria_produtos_traducao")
    df_cotacao_bronze = spark.table(f"{bronze_db_name}.dm_cotacao_dolar")
    
    print("Todas as tabelas Bronze foram lidas com sucesso.")
except Exception as e:
    print(f"Erro ao ler tabelas Bronze: {e}")

In [0]:
# Olhando a leitura
df_consumidores_bronze.display()
# Verificando os detalhes do dataframe
df_consumidores_bronze.printSchema()

In [0]:
# Tratamento para consumidores
print("Transformando ft_consumidores...")
df_consumidores_silver = (
    df_consumidores_bronze
    .select(
        F.col("customer_id").alias("id_consumidor"),
        F.col("customer_zip_code_prefix").alias("prefixo_cep"),
        F.upper(F.col("customer_city")).alias("cidade"),
        F.upper(F.col("customer_state")).alias("estado")
    )
    .dropDuplicates(["id_consumidor"])
)

In [0]:
# Tratamento de pedidos
print("Transformando ft_pedidos...")
# Mapeamento do status do pedido
map_status_pedidos = {
    "delivered": "entregue",
    "invoiced": "faturado",
    "shipped": "enviado",
    "processing": "em processamento",
    "unavailable": "indisponivel",
    "canceled": "cancelado",
    "created": "criado",
    "approved": "aprovado"
}
map_status_expr = F.create_map([F.lit(k) for k in map_status_pedidos for v in (k, map_status_pedidos[k])])
df_pedidos_silver_raw = (
    df_pedidos_bronze
    .withColumn("status", map_status_expr[F.col("order_status")])
    .select(
        F.col("order_id").alias("id_pedido"),
        F.col("customer_id").alias("id_consumidor"),
        F.col("status"),
        F.col("order_purchase_timestamp").cast(TimestampType()).alias("pedido_compra_timestamp"),
        F.col("order_approved_at").cast(TimestampType()).alias("pedido_aprovado_timestamp"),
        F.col("order_delivered_carrier_date").cast(TimestampType()).alias("pedido_carregado_timestamp"),
        F.col("order_delivered_customer_date").cast(TimestampType()).alias("pedido_entregue_timestamp"),
        F.col("order_estimated_delivery_date").cast(TimestampType()).alias("pedido_estimativa_entrega_timestamp")
    )
)
# Criação das colunas derivadas (tempo e prazo)
df_pedidos_silver = df_pedidos_silver_raw.withColumn(
    "tempo_entrega_dias",
    F.datediff(F.col("pedido_entregue_timestamp"), F.col("pedido_compra_timestamp"))
).withColumn(
    "tempo_entrega_estimado_dias",
    F.datediff(F.col("pedido_estimativa_entrega_timestamp"), F.col("pedido_compra_timestamp"))
).withColumn(
    "diferenca_entrega_dias",
    F.col("tempo_entrega_estimado_dias") - F.col("tempo_entrega_dias")
).withColumn(
    "entrega_no_prazo",
    F.when(F.col("pedido_entregue_timestamp").isNull(), "Não Entregue")
     .when(F.col("diferenca_entrega_dias") >= 0, "Sim")
     .otherwise("Não")
)

In [0]:
# Tratamento de itens pedidos
print("Transformando ft_itens_pedidos...")
df_itens_pedidos_silver = (
    df_itens_pedidos_bronze
    .select(
        F.col("order_id").alias("id_pedido"),
        F.col("order_item_id").cast(IntegerType()).alias("id_item"),
        F.col("product_id").alias("id_produto"),
        F.col("seller_id").alias("id_vendedor"),
        F.col("price").cast(DecimalType(10, 2)).alias("preco_brl"),
        F.col("freight_value").cast(DecimalType(10, 2)).alias("preco_frete")
    )
)

In [0]:
# Tratamento de pagamentos
print("Transformando ft_pagamentos...")
# Mapeamento das formas de pagamento
map_payment_type = {
    "credit_card": "Cartão de Crédito",
    "boleto": "Boleto",
    "voucher": "Voucher",
    "debit_card": "Cartão de Débito"
}
map_payment_expr = F.create_map([F.lit(k) for k in map_payment_type for v in (k, map_payment_type[k])])

df_pagamentos_silver = (
    df_pagamentos_bronze
    .withColumn("forma_pagamento", 
                F.when(F.map_contains_key(map_payment_expr, F.col("payment_type")), map_payment_expr[F.col("payment_type")])
                 .otherwise("Outro")
    )
    .select(
        F.col("order_id").alias("id_pedido"),
        F.col("payment_sequential").cast(IntegerType()).alias("codigo_pagamento"),
        F.col("forma_pagamento"),
        F.col("payment_installments").cast(IntegerType()).alias("parcelas"),
        F.col("payment_value").cast(DecimalType(10, 2)).alias("valor_pagamento")
    )
)

In [0]:
# Tratamento de avaliações
print("Transformando ft_avaliacoes_pedidos...")

# Adiciona a data de hoje para verificação de data futura
data_processamento = F.current_timestamp()

# Cast inicial e adição da coluna de verificação (motivo_rejeicao)
df_avaliacoes_verificadas = (
    df_avaliacoes_bronze
    .withColumn("data_comentario_cast", F.expr("TRY_CAST(review_creation_date AS TIMESTAMP)"))
    .withColumn("motivo_rejeicao",
        F.when(F.col("review_id").isNull(), "ID da avaliação está nulo")
         .when(F.length(F.col("review_id")) <= 30, "ID da avaliação inválido (formato incorreto)")
         .when(F.col("review_creation_date").isNull(), "Data de criação está nula (origem)") # Data nula na bronze
         .when(F.col("data_comentario_cast").isNull(), "Data de criação inválida (formato inconsistente)") # Falha no cast
         .when(F.col("data_comentario_cast") > data_processamento, "Data de criação está no futuro")
         .otherwise(None) # Linha Válida
    )
)

# Relatório de registros inválidos
print("Verificando avaliações inválidas...")
df_invalidos = (
    df_avaliacoes_verificadas
    .filter(F.col("motivo_rejeicao").isNotNull())
    .groupBy("motivo_rejeicao")
    .count()
)

# Coleta e exibe o sumário de rejeições, se houver
invalidos_report = df_invalidos.collect()
if invalidos_report:
    print("ATENÇÃO: Registros de avaliação removidos pelos seguintes motivos:")
    for row in invalidos_report:
        print(f"- {row['motivo_rejeicao']}: {row['count']} registros")
else:
    print("Nenhum registro de avaliação inválido encontrado.")

# Seleção final apenas dos registros válidos
df_avaliacoes_silver = (
    df_avaliacoes_verificadas
    .filter(F.col("motivo_rejeicao").isNull()) # Mantém apenas as linhas válidas
    .select(
        F.col("review_id").alias("id_avaliacao"),
        F.col("order_id").alias("id_pedido"),
        F.col("review_score").cast(IntegerType()).alias("avaliacao"),
        F.col("review_comment_title").alias("titulo_comentario"),
        F.col("review_comment_message").alias("comentario"),
        F.col("data_comentario_cast").alias("data_comentario"), # Usa a coluna já castada
        F.col("review_answer_timestamp").cast(TimestampType()).alias("data_resposta")
    )
)

In [0]:
# Tratamento dos Produtos
print("Transformando ft_produtos...")
df_produtos_silver = (
    df_produtos_bronze
    .select(
        F.col("product_id").alias("id_produto"),
        F.col("product_category_name").alias("categoria_produto"),
        F.col("product_weight_g").cast(IntegerType()).alias("peso_produto_gramas"),
        F.col("product_length_cm").cast(IntegerType()).alias("comprimento_centimetros"),
        F.col("product_height_cm").cast(IntegerType()).alias("altura_centimetros"),
        F.col("product_width_cm").cast(IntegerType()).alias("largura_centimetros")
    )
)

In [0]:
# Tratamento dos Vendedores
print("Transformando ft_vendedores...")
df_vendedores_silver = (
    df_vendedores_bronze
    .select(
        F.col("seller_id").alias("id_vendedor"),
        F.col("seller_zip_code_prefix").alias("prefixo_cep"),
        F.upper(F.col("seller_city")).alias("cidade"),
        F.upper(F.col("seller_state")).alias("estado") 
    )
)

In [0]:
# Tratamento Categoria_Produtos_Tradução
print("Transformando dim_categoria_produtos_traducao...")
df_categoria_traducao_silver = (
    df_categoria_traducao_bronze
    .select(
        F.col("product_category_name").alias("nome_produto_pt"),
        F.col("product_category_name_english").alias("nome_produto_en")
    )
)

In [0]:
# Tratamento Cotação do Dólar
print("Transformando dm_cotacao_dolar...")
df_cotacao_silver = (
    df_cotacao_bronze
    .select(
        F.to_date(F.col("dataHoraCotacao")).alias("data"),
        F.col("cotacaoCompra").cast("float").alias("cotacao_dolar")
    )
    .dropDuplicates(["data"])
)

In [0]:
# Verificação de Integridade Referencial
print("Iniciando verificação de integridade referencial...")

# Todos os pedidos devem possuir um consumidor válido
df_pedidos_orfos = df_pedidos_silver.join(df_consumidores_silver, "id_consumidor", "left_anti")
count_pedidos_orfos = df_pedidos_orfos.count()

if count_pedidos_orfos > 0:
    print(f"ATENÇÃO: Encontrados {count_pedidos_orfos} pedidos órfãos (sem consumidor). Removendo-os.")
    # Remove os órfãos usando left_semi join (mantém apenas os que têm correspondência)
    df_pedidos_silver = df_pedidos_silver.join(df_consumidores_silver, "id_consumidor", "left_semi")
else:
    print("Verificação 1/2 (Pedidos vs Consumidores): OK. Nenhum pedido órfão encontrado.")

# Todos os itens de pedidos devem estar associados a um pedido existente
df_itens_orfos = df_itens_pedidos_silver.join(df_pedidos_silver, "id_pedido", "left_anti")
count_itens_orfos = df_itens_orfos.count()

if count_itens_orfos > 0:
    print(f"ATENÇÃO: Encontrados {count_itens_orfos} itens órfãos (sem pedido). Removendo-os.")
    # Remove os órfãos
    df_itens_pedidos_silver = df_itens_pedidos_silver.join(df_pedidos_silver, "id_pedido", "left_semi")
else:
    print("Verificação 2/2 (Itens vs Pedidos): OK. Nenhum item órfão encontrado.")

print("Verificação de integridade concluída.")

In [0]:
# Tabela Final
print("Iniciando criação da tabela final ft_pedidos_total...")

# Agregar pagamentos por pedido
df_pagamentos_agg = df_pagamentos_silver.groupBy("id_pedido").agg(
    F.sum("valor_pagamento").alias("valor_total_pago_brl")
)

# Selecionar campos dos pedidos e extrair a data
df_pedidos_com_data = df_pedidos_silver.select(
    "id_pedido", 
    "id_consumidor", 
    "status",
    F.to_date(F.col("pedido_compra_timestamp")).alias("data_pedido")
)

# Juntar pedidos e pagamentos
df_final_join = df_pedidos_com_data.join(
    df_pagamentos_agg, "id_pedido", "left"
)

# Juntar com a cotação (pode ter nulos nos fins de semana)
df_final_join = df_final_join.join(
    df_cotacao_silver, df_final_join.data_pedido == df_cotacao_silver.data, "left"
).drop(df_cotacao_silver.data)

# Preencher cotação de FDS (Sugestão Técnica)
# Cria a window spec para buscar o último valor não nulo
window_spec_cotacao = (
    Window.orderBy("data_pedido")
    .rowsBetween(Window.unboundedPreceding, 0) # 0 = Window.currentRow
)

# Preenche nulos (fins de semana) com o último valor válido (sexta-feira)
df_final_com_cotacao = df_final_join.withColumn(
    "cotacao_dolar_dia",
    F.last("cotacao_dolar", ignorenulls=True).over(window_spec_cotacao)
)

# Calcular valor em USD e selecionar colunas finais
df_pedidos_total = (
    df_final_com_cotacao
    .withColumn(
        "valor_total_pago_usd",
        (F.col("valor_total_pago_brl") / F.col("cotacao_dolar_dia")).cast("decimal(10,2)")
    )
    .fillna(0, subset=["valor_total_pago_brl", "valor_total_pago_usd"]) # Preenche nulos onde não houve pagamento
    .select(
        "id_pedido",
        "id_consumidor",
        "status",
        F.col("valor_total_pago_brl"),
        F.col("valor_total_pago_usd"),
        F.col("data_pedido")
    )
)

print("Tabela ft_pedidos_total pronta para ser salva.")

In [0]:
# Salvando na Silver
print("Iniciando salvamento das tabelas na camada Silver...")

# Dicionário de DataFrames e nomes de tabelas Silver
tabelas_silver = {
    df_consumidores_silver: "ft_consumidores",
    df_pedidos_silver: "ft_pedidos",
    df_itens_pedidos_silver: "ft_itens_pedidos",
    df_pagamentos_silver: "ft_pagamentos_pedidos",
    df_avaliacoes_silver: "ft_avaliacoes_pedidos",
    df_produtos_silver: "ft_produtos",
    df_vendedores_silver: "ft_vendedores",
    df_categoria_traducao_silver: "dim_categoria_produtos_traducao",
    df_cotacao_silver: "dm_cotacao_dolar",
    df_pedidos_total: "ft_pedidos_total"
}

# Loop para salvar todas as tabelas
for df, nome_tabela in tabelas_silver.items():
    try:
        nome_completo_tabela = f"{catalogo}.{silver_db_name}.{nome_tabela}"
        
        df.write.format("delta").mode("overwrite").option("overwriteSchema", "true").saveAsTable(nome_completo_tabela)
        
        print(f"Tabela Silver salva com sucesso: {nome_completo_tabela}")
    except Exception as e:
        print(f"Erro ao salvar tabela {nome_completo_tabela}: {e}")

print("---")
print(f"Processo de transformação para a camada Silver concluído. Todas as tabelas foram salvas no schema {silver_db_name}.")

In [0]:
# Olhando a leitura
df_consumidores_silver.display()
# Verificando os detalhes do dataframe
df_consumidores_silver.printSchema()