In [0]:
import pyspark.sql.functions as F

catalogo = "medalhao"
bronze_db_name = "bronze"
silver_db_name = "silver"

# Consumidores


Data Profiling

In [0]:
df_consumidores = spark.table(f"{catalogo}.{bronze_db_name}.ft_consumidores")
display(df_consumidores.limit(10))
df_consumidores.printSchema()
print(f"Total: {df_consumidores.count()}")

In [0]:
# Contagem de valores nulos
nulos_consumidores = df_consumidores.select([
    F.count(F.when(F.col(c).isNull(), c)).alias(c)
    for c in df_consumidores.columns
])
print(" Valores nulos por coluna:")
display(nulos_consumidores)

In [0]:
# Duplicidades
duplicados_consumidores = df_consumidores.groupBy("customer_id").count().filter("count > 1").count()
print(f"Registros duplicados (customer_id): {duplicados_consumidores}")


In [0]:
# Distribui√ß√£o por estado
print("üåé Distribui√ß√£o por estado:")
display(df_consumidores.groupBy("customer_state").count().orderBy("count", ascending=False))

# Quantidade de cidades
print("üåé Quantidade de cidades:")
display(df_consumidores.groupBy("customer_city").count().filter("count > 1").count())

# Distribui√ß√£o por cidade (top 10)
print("üèôÔ∏è Top 10 cidades:")
display(df_consumidores.groupBy("customer_city").count().orderBy("count", ascending=False).limit(10))

**Camada Silver**

In [0]:
df_consumidores_silver = (
    df_consumidores
    .select(
        F.col("customer_id").alias("id_consumidor"),
        # F.col("customer_unique_id").alias("id_consumidor_unico")   
        F.lpad(F.col("customer_zip_code_prefix").cast("string"), 5, "0").alias("prefixo_cep"), # Cast para garantia dos 5 digitos
        F.upper(F.col("customer_city")).alias("cidade"),
        F.upper(F.col("customer_state")).alias("estado")
    )
    .filter(F.col("id_consumidor").isNotNull()) # N√£o possui nulos mas por seguran√ßa
    .dropDuplicates(["id_consumidor"]) # N√£o existem mas por seguran√ßa
    .withColumn("data_ingestao", F.current_timestamp())
)


df_consumidores_silver.write.format("delta").mode("overwrite").saveAsTable(f"{catalogo}.{silver_db_name}.ft_consumidores")

print("‚úÖ Tabela silver.ft_consumidores criada com sucesso!")
display(df_consumidores_silver.limit(5))
df_consumidores_silver.printSchema()

# Pedidos

In [0]:
df_pedidos = spark.table(f"{catalogo}.{bronze_db_name}.ft_pedidos")
display(df_pedidos.limit(10))
df_pedidos.printSchema()
print(f"Total: {df_pedidos.count()}")

In [0]:
# Contagem de valores nulos
nulos_pedidos = df_pedidos.select([
    F.count(F.when(F.col(c).isNull(), c)).alias(c)
    for c in df_pedidos.columns
])
print(" Valores nulos por coluna:")
display(nulos_pedidos)

In [0]:
# Agrupamento por order_status
print("üöö Distribui√ß√£o por status:")
display(df_pedidos.groupBy("order_status").count().orderBy("count", ascending=False))

In [0]:
# Select de todos com order_approved_at nulo
display(df_pedidos.filter(F.col("order_approved_at").isNull()).select(*df_pedidos.columns))

In [0]:
df_pedidos.filter(F.col("order_delivered_carrier_date").isNull()) \
    .groupBy("order_status") \
    .count() \
    .orderBy(F.desc("count")) \
    .show()




In [0]:
display(
    df_pedidos
        .filter(
            F.col("order_approved_at").isNull() &
            F.col("order_delivered_carrier_date").isNull() &
            F.col("order_delivered_customer_date").isNull()
        )
        .select(*df_pedidos.columns).limit(10)
)

In [0]:


df_pedidos.filter(
    F.col("order_approved_at").isNull() &
    F.col("order_delivered_carrier_date").isNull() &
    F.col("order_delivered_customer_date").isNull()
).groupBy("order_status") \
 .count() \
 .orderBy(F.desc("count")) \
 .show()


In [0]:
# Select de todos com order_delivered_carrier_date nulo
display(df_pedidos.filter(F.col("order_delivered_carrier_date").isNull()).select(*df_pedidos.columns))

In [0]:
print("üìä Pedidos que nunca foram aprovados, despachados ou entregues:")
df_pedidos.filter(
    F.col("order_approved_at").isNull() &
    F.col("order_delivered_carrier_date").isNull() &
    F.col("order_delivered_customer_date").isNull()
).groupBy("order_status") \
 .count() \
 .orderBy(F.desc("count")) \
 .show()

print("\nüìä Pedidos sem data de aprova√ß√£o (order_approved_at nulo):")
df_pedidos.filter(
    F.col("order_approved_at").isNull()
).groupBy("order_status") \
 .count() \
 .orderBy(F.desc("count")) \
 .show()

print("\nüìä Pedidos sem aprova√ß√£o e sem despacho (approved_at e carrier_date nulos):")
df_pedidos.filter(
    F.col("order_approved_at").isNull() &
    F.col("order_delivered_carrier_date").isNull()
).groupBy("order_status") \
 .count() \
 .orderBy(F.desc("count")) \
 .show()


print("\nüìä Pedidos sem aprova√ß√£o e sem entrega (approved_at e customer_date nulos):")
df_pedidos.filter(
    F.col("order_approved_at").isNull() &
    F.col("order_delivered_customer_date").isNull()
).groupBy("order_status") \
 .count() \
 .orderBy(F.desc("count")) \
 .show()

print("\nüìä Pedidos nunca despachados nem entregues (carrier_date e customer_date nulos):")
df_pedidos.filter(
    F.col("order_delivered_carrier_date").isNull() &
    F.col("order_delivered_customer_date").isNull()
).groupBy("order_status") \
 .count() \
 .orderBy(F.desc("count")) \
 .show()


print("\nüìä Pedidos sem data de envio (order_delivered_carrier_date nulo):")
df_pedidos.filter(
    F.col("order_delivered_carrier_date").isNull()
).groupBy("order_status") \
 .count() \
 .orderBy(F.desc("count")) \
 .show()

print("\nüìä Pedidos sem data de entrega ao cliente (order_delivered_customer_date nulo):")
df_pedidos.filter(
    F.col("order_delivered_customer_date").isNull()
).groupBy("order_status") \
 .count() \
 .orderBy(F.desc("count")) \
 .show()


In [0]:
# Duplicados
Duplicados_pedidos = (
    df_pedidos
    .groupBy("order_id")
    .count()
    .filter(F.col("count") > 1)
    .count()
)
print(f"Registros duplicados (order_id): {Duplicados_pedidos}")

Camada Silver

In [0]:
df_pedidos_silver = (
    df_pedidos
    .select(
        F.col("order_id").alias("id_pedido"),
        F.col("customer_id").alias("id_consumidor"),
        F.col("order_status").alias("status_original"),
        F.col("order_purchase_timestamp").alias("pedido_compra_timestamp"),
        F.col("order_approved_at").alias("pedido_aprovado_timestamp"),
        F.col("order_delivered_carrier_date").alias("pedido_carregado_timestamp"),
        F.col("order_delivered_customer_date").alias("pedido_entregue_timestamp"),
        F.to_date("order_estimated_delivery_date").alias("pedido_estimativa_entrega_data"),
        F.col("ingestion_timestamp").alias("data_ingestao")
    )
    .withColumn(
        "status_original",
        F.when(F.col("status_original") == "delivered", "entregue")
         .when(F.col("status_original") == "invoiced", "faturado")
         .when(F.col("status_original") == "shipped", "enviado")
         .when(F.col("status_original") == "processing", "em processamento")
         .when(F.col("status_original") == "unavailable", "indispon√≠vel")
         .when(F.col("status_original") == "canceled", "cancelado")
         .when(F.col("status_original") == "created", "criado")
         .when(F.col("status_original") == "approved", "aprovado")
         .otherwise(F.col("status_original"))
    )
    .withColumn("tempo_entrega_dias",
                F.datediff("pedido_entregue_timestamp", "pedido_compra_timestamp"))
    .withColumn(
    "tempo_entrega_estimado_dias",
    F.datediff("pedido_estimativa_entrega_data", "pedido_compra_timestamp")
    )
    .withColumn("diferenca_entrega_dias",
                F.col("tempo_entrega_dias") - F.col("tempo_entrega_estimado_dias"))
    .withColumn(
        "entrega_no_prazo",
        F.when(F.col("pedido_entregue_timestamp").isNull(), "N√£o Entregue")
         .when(F.col("diferenca_entrega_dias") <= 0, "Sim")
         .otherwise("N√£o")
    )
    .withColumn(
        "status_integridade",
        F.when(
            (F.col("status_original").isin("canceled", "created")) &
            F.col("pedido_entregue_timestamp").isNotNull(),
            "ERRO_CANCELADO_COM_DATA_ENTREGA"
        )
        .when(
            (F.col("status_original") == "delivered") &
            (
                F.col("pedido_aprovado_timestamp").isNull() |
                F.col("pedido_carregado_timestamp").isNull()
            ),
            "ERRO_ENTREGA_SEM_APROVACAO"
        )
        .when(
            (F.col("status_original").isin("approved", "invoiced")) &
            F.col("pedido_carregado_timestamp").isNull(),
            "PENDENTE_ENVIO"
        )
        .when(
            (F.col("status_original") == "shipped") &
            F.col("pedido_entregue_timestamp").isNull(),
            "EM_TRANSITO"
        )
        .when(
            (F.col("status_original") == "delivered") &
            (F.col("diferenca_entrega_dias") > 0),
            "ATRASADO"
        )
        .when(
            (F.col("status_original") == "unavailable"),
            "INDISPONIVEL"
        )
        .otherwise("OK")
    )
)

df_pedidos_silver.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable(f"{catalogo}.{silver_db_name}.ft_pedidos")

print("‚úÖ Tabela silver.ft_pedidos criada com sucesso!")


In [0]:
df_pedidos_silver_display = spark.table(f"{catalogo}.{silver_db_name}.ft_pedidos")

display(df_pedidos_silver_display)
df_pedidos_silver_display.printSchema()

# Itens Pedidos

Data profiling

In [0]:
df_itens_pedidos = spark.table(f"{catalogo}.{bronze_db_name}.ft_itens_pedidos")

display(df_itens_pedidos.limit(10))
df_itens_pedidos.printSchema()

In [0]:
nulos_itens_pedidos = df_itens_pedidos.select([
    F.count(F.when(F.col(c).isNull(), c)).alias(c)
    for c in df_itens_pedidos.columns
])
print("Valores nulos por coluna:")
display(nulos_itens_pedidos)

In [0]:
# Verifica duplicados por order_id + order_item_id (chave natural)
duplicados_chave = (
    df_itens_pedidos.groupBy("order_id", "order_item_id")
            .count()
            .filter("count > 1")
            .count()
)
print(f"Duplicados (order_id + order_item_id): {duplicados_chave}")


Camada silver

In [0]:
from pyspark.sql import functions as F

df_itens_pedidos = spark.table(f"{catalogo}.{bronze_db_name}.ft_itens_pedidos")

df_itens_pedidos_silver = (
    df_itens_pedidos.select(
        F.col("order_id").alias("id_pedido"),
        F.col("order_item_id").alias("id_item"),
        F.col("product_id").alias("id_produto"),
        F.col("seller_id").alias("id_vendedor"),
        F.col("price").cast("decimal(12,2)").alias("preco_brl"),    
        F.col("freight_value").cast("decimal(12,2)").alias("preco_frete"), 
        F.current_timestamp().alias("ingestion_timestamp")
    )
)


df_itens_pedidos_silver.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable(
    f"{catalogo}.silver.ft_itens_pedidos"
)

print("‚úÖ Tabela 'silver.ft_itens_pedidos' criada com sucesso")
display(df_itens_pedidos_silver.limit(5))
df_itens_pedidos_silver.printSchema()



# Pagamentos

In [0]:
df_pagamentos = spark.table(f"{catalogo}.{bronze_db_name}.ft_pagamentos_pedidos")
display(df_pagamentos.limit(10))
df_pagamentos.printSchema()

In [0]:
# Linhas onde h√° parcelas > 1 e o pagamento n√£o √© credit_card
parcelas_incorretas = (
    df_pagamentos
    .filter(
        (F.col("payment_installments") > 1) &
        (F.lower(F.col("payment_type")) != "credit_card")
    )
)

# Quantidade total de casos incorretos
qtd_incorretos = parcelas_incorretas.count()
print(f"üö® Registros com parcelas > 1 fora do cart√£o de cr√©dito: {qtd_incorretos}")

# Exibir amostra se existir algum
if qtd_incorretos > 0:
    display(parcelas_incorretas.limit(10))
else:
    print("‚úÖ Nenhum caso incorreto encontrado! Todas as parcelas > 1 s√£o no cart√£o de cr√©dito.")

In [0]:
# Contagem de valores nulos
nulos_pagamentos = df_pagamentos.select([
    F.count(F.when(F.col(c).isNull(), c)).alias(c)
    for c in df_pagamentos.columns
])
print(" Valores nulos por coluna:")
display(nulos_pagamentos)


In [0]:
#Duplicidades
duplicados_pagamentos = df_pagamentos.groupBy("order_id").count().filter("count > 1").count()
print(f"Registros duplicados (order_id):{duplicados_pagamentos}")

In [0]:
df_dup = (
    df_pagamentos.groupBy("order_id")
        .agg(
            F.count("*").alias("qtd_registros"),
            F.countDistinct("payment_sequential").alias("qtd_seq_unicos")
        )
        .filter("qtd_registros > 1")
)

# Se as quantidades forem iguais ‚Üí duplicatas leg√≠timas (parcelas diferentes)
display(df_dup.limit(100))


In [0]:
# Coleta os primeiros 10 order_id duplicados
duplicados_ids = [
    row["order_id"]
    for row in (
        df_pagamentos.groupBy("order_id")
        .count()
        .filter("count > 1")
        .limit(10)
        .select("order_id")
        .collect()
    )
]

# Exibe as linhas completas desses pedidos duplicados
df_pagamentos.filter(F.col("order_id").isin(duplicados_ids)) \
             .orderBy("order_id", "payment_sequential") \
             .display()


In [0]:
df_check = (
    df_pagamentos
    .groupBy("order_id")
    .agg(
        F.count("*").alias("qtd_registros"),
        F.countDistinct("payment_sequential").alias("qtd_seq_unicos"),
        F.collect_set("payment_type").alias("tipos_pagamento")
    )
    .filter("qtd_registros > 1")  # apenas pedidos com mais de 1 pagamento
    .orderBy("qtd_registros", ascending=False)
)

display(df_check.limit(10))

Camada Silver

In [0]:
map_forma_pagamento = F.create_map(
    F.lit("credit_card"), F.lit("Cart√£o de Cr√©dito"),
    F.lit("boleto"),      F.lit("Boleto"),
    F.lit("voucher"),     F.lit("Voucher"),
    F.lit("debit_card"),  F.lit("Cart√£o de D√©bito")
)

In [0]:
df_pagamentos_silver = (
    df_pagamentos.select(
        F.col("order_id").alias("id_pedido"),
        F.col("payment_sequential").cast("int").alias("codigo_pagamento"),
        F.when(
            map_forma_pagamento[F.col("payment_type")].isNotNull(),
            map_forma_pagamento[F.col("payment_type")]
        ).otherwise("Outro").alias("forma_pagamento"),
        F.col("payment_installments").cast("int").alias("parcelas"),
        F.col("payment_value").cast("decimal(12,2)").alias("valor_pagamento"),  
        F.current_timestamp().alias("ingestion_timestamp")  # novo timestamp da Silver
    )
)

df_pagamentos_silver.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable(
    f"{catalogo}.silver.ft_pagamentos_pedidos"
)

print("‚úÖ Tabela 'silver.ft_pagamentos_pedidos' criada com sucesso!")
display(df_pagamentos_silver.limit(10))

# Avalia√ß√µes de pedidos

Data Profiling

In [0]:
df_avaliacoes = spark.table(f"{catalogo}.{bronze_db_name}.ft_avaliacoes_pedidos")

display(df_avaliacoes.limit(100))
df_avaliacoes.printSchema()

In [0]:
nulos_avaliacoes = df_avaliacoes.select([
    F.count(F.when(F.col(c).isNull(), c)).alias(c)
    for c in df_avaliacoes.columns
])
print("Valores nulos por coluna:")
display(nulos_avaliacoes)

In [0]:


# Filtra registros com review_id nulo
df_reviewid_nulo = df_avaliacoes.filter(F.col("review_id").isNull())

# Contagem e amostra
print(f"Total de registros com review_id nulo: {df_reviewid_nulo.count()}")
display(df_reviewid_nulo.limit(10))


In [0]:
duplicados_review = (
    df_avaliacoes.groupBy("review_id")
    .count()
    .filter("count > 1")
    .count()
)
print(f"Duplicados (review_id): {duplicados_review}")

In [0]:
# Encontra review_ids duplicados
dup_ids = (
    df_avaliacoes.groupBy("review_id")
    .count()
    .filter("count > 1")
    .select("review_id")
)

# Junta e exibe exemplos completos
df_dups = df_avaliacoes.join(dup_ids, "review_id", "inner") \
                       .orderBy("review_id", "review_answer_timestamp")

display(df_dups.limit(100))

In [0]:
# üßπ Limpeza de review_id: remove nulos, vazios e registros fora do padr√£o alfanum√©rico; identifica duplicidades ap√≥s a filtragem

df_base = df_avaliacoes.filter(
    (F.col("review_id").isNotNull()) &
    (F.col("review_id").rlike("^[0-9A-Za-z]+$"))
)

removidos = df_avaliacoes.count() - df_base.count()
print(f"üöÆ Linhas removidas (review_id nulo, vazio, '-' ou fora do padr√£o alfanum√©rico): {removidos}")
print(f"‚úÖ Total restante ap√≥s limpeza: {df_base.count()}")

dup_ids = (
    df_base.groupBy("review_id")
    .count()
    .filter("count > 1")
    .select("review_id")
)

df_dups = (
    df_base.join(dup_ids, "review_id", "inner")
    .orderBy("review_id", "review_answer_timestamp")
)

print(f"üîÅ Total de review_id duplicados ap√≥s limpeza: {dup_ids.count()}")
display(df_dups.limit(100))


In [0]:
# Base inicial (j√° com review_id v√°lido)
df_base = df_avaliacoes.filter(
    (F.col("review_id").isNotNull()) &
    (F.col("review_id").rlike("^[0-9A-Za-z]+$"))
)

# Limpeza das colunas de data: mant√©m apenas registros no formato "YYYY-MM-DD HH:MM:SS" ou nulos; remove textos e valores inv√°lidos
regex_data = r"^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}$"

# Filtrar registros que tenham datas no formato v√°lido ou nulas
df_final = df_base.filter(
    (
        F.col("review_creation_date").rlike(regex_data) | F.col("review_creation_date").isNull()
    ) &
    (
        F.col("review_answer_timestamp").rlike(regex_data) | F.col("review_answer_timestamp").isNull()
    )
)

# Converter colunas para tipo timestamp (mantendo as nulas)
df_final = (
    df_final
    .withColumn("review_creation_date", F.to_timestamp("review_creation_date"))
    .withColumn("review_answer_timestamp", F.to_timestamp("review_answer_timestamp"))
)

# Exibir resultados
removidos = df_avaliacoes.count() - df_final.count()
print(f"üöÆ Linhas removidas (review_id inv√°lido ou datas fora do padr√£o): {removidos}")
print(f"‚úÖ Total final ap√≥s limpeza: {df_final.count()}")
display(df_final.select("review_id", "review_creation_date", "review_answer_timestamp").limit(20))

In [0]:
# üßπ Limpeza final da coluna review_score: mant√©m apenas valores num√©ricos v√°lidos (1‚Äì5) ou nulos; remove textos, datas e ru√≠dos

from pyspark.sql import functions as F

df_final_score_valid = df_final.filter(
    F.col("review_score").isNull() |               # mant√©m nulos
    F.col("review_score").rlike("^[1-5]$")        # mant√©m valores 1‚Äì5 v√°lidos
)

removidos_score = df_final.count() - df_final_score_valid.count()
print(f"üöÆ Linhas removidas (review_score inv√°lido): {removidos_score}")
print(f"‚úÖ Total restante ap√≥s limpeza: {df_final_score_valid.count()}")

display(
    df_final_score_valid.select("review_id", "review_score", "review_creation_date", "review_answer_timestamp").limit(10)
)


In [0]:

df_avaliacoes_silver = (
    df_final_score_valid
    .select(
        F.col("review_id").alias("id_avaliacao"),
        F.col("order_id").alias("id_pedido"),
        F.col("review_score").cast("int").alias("avaliacao"),
        F.col("review_comment_title").alias("titulo_comentario"),
        F.col("review_comment_message").alias("comentario"),
        F.to_date("review_creation_date", "yyyy-MM-dd").alias("data_comentario"),
        F.to_timestamp("review_answer_timestamp", "yyyy-MM-dd HH:mm:ss").alias("data_resposta"),
        F.current_timestamp().alias("ingestion_timestamp")  # novo timestamp da camada Silver
    )
)

# Escreve no cat√°logo Silver
df_avaliacoes_silver.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable(
    f"{catalogo}.silver.ft_avaliacoes_pedidos"
)

print("‚úÖ Tabela 'silver.ft_avaliacoes_pedidos' criada com sucesso!")
display(df_avaliacoes_silver.limit(10))

# Produtos

Data Profiling

In [0]:
# Carrega a tabela da camada Bronze
df_produtos = spark.table(f"{catalogo}.{bronze_db_name}.ft_produtos")

# Mostra amostra e schema
display(df_produtos.limit(10))
df_produtos.printSchema()

In [0]:
nulos_produtos = df_produtos.select([
    F.count(F.when(F.col(c).isNull(), c)).alias(c)
    for c in df_produtos.columns
])
print("üìâ Valores nulos por coluna:")
display(nulos_produtos)

In [0]:
duplicados_produtos = (
    df_produtos.groupBy("product_id")
    .count()
    .filter("count > 1")
)
print(f"üîÅ Total de product_id duplicados: {duplicados_produtos.count()}")
display(duplicados_produtos.limit(10))


Camada Silver

In [0]:
# ü™Ñ Cria√ß√£o da camada Silver: renomeia colunas e adiciona ingestion_timestamp

from pyspark.sql import functions as F

df_produtos_silver = (
    df_produtos
    .select(
        F.col("product_id").alias("id_produto"),
        F.col("product_category_name").alias("categoria_produto"),
        F.col("product_weight_g").cast("float").alias("peso_produto_gramas"),
        F.col("product_length_cm").cast("float").alias("comprimento_centimetros"),
        F.col("product_height_cm").cast("float").alias("altura_centimetros"),
        F.col("product_width_cm").cast("float").alias("largura_centimetros"),
        F.current_timestamp().alias("ingestion_timestamp")
    )
)

# Escreve no cat√°logo Silver
df_produtos_silver.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable(
    f"{catalogo}.silver.ft_produtos"
)

print("‚úÖ Tabela 'silver.ft_produtos' criada com sucesso!")
display(df_produtos_silver.limit(10))


# Vendedores

In [0]:
# Carrega a tabela da camada Bronze
df_vendedores = spark.table(f"{catalogo}.{bronze_db_name}.ft_vendedores")

# Mostra amostra e schema
display(df_vendedores.limit(10))
df_vendedores.printSchema()

In [0]:

nulos_vendedores = df_vendedores.select([
    F.count(F.when(F.col(c).isNull(), c)).alias(c)
    for c in df_vendedores.columns
])
print("üìâ Valores nulos por coluna:")
display(nulos_vendedores)

duplicados_vendedores = (
    df_vendedores.groupBy("seller_id")
    .count()
    .filter("count > 1")
)
print(f"üîÅ Total de seller_id duplicados: {duplicados_vendedores.count()}")
display(duplicados_vendedores.limit(10))

In [0]:
# üß© Padroniza cidade e estado para letras mai√∫sculas antes de criar a Silver

from pyspark.sql import functions as F

df_vendedores_upper = (
    df_vendedores
    .withColumn("seller_city", F.upper(F.col("seller_city")))
    .withColumn("seller_state", F.upper(F.col("seller_state")))
)

# ‚úÖ Valida√ß√£o: checa se ainda existe alguma entrada fora do padr√£o
df_valida_upper = df_vendedores_upper.filter(
    (F.col("seller_city") != F.upper(F.col("seller_city"))) |
    (F.col("seller_state") != F.upper(F.col("seller_state")))
)

print(f"üöÄ Linhas fora do padr√£o UPPER CASE (depois da transforma√ß√£o): {df_valida_upper.count()}")
display(df_vendedores_upper.limit(10))


In [0]:
# ü™Ñ Cria√ß√£o da camada Silver ft_vendedores com colunas renomeadas e dados padronizados em UPPER CASE

df_vendedores_silver = (
    df_vendedores_upper
    .select(
        F.col("seller_id").alias("id_vendedor"),
        F.col("seller_zip_code_prefix").alias("prefixo_cep"),
        F.col("seller_city").alias("cidade"),
        F.col("seller_state").alias("estado"),
        F.current_timestamp().alias("ingestion_timestamp")
    )
)

df_vendedores_silver.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable(
    f"{catalogo}.silver.ft_vendedores"
)

print("‚úÖ Tabela 'silver.ft_vendedores' criada com sucesso!")
display(df_vendedores_silver.limit(10))


# Categoria Produtos Tradu√ß√£o

In [0]:
# Carrega a tabela Bronze
df_categoria = spark.table(f"{catalogo}.{bronze_db_name}.dm_categoria_produtos_traducao")

# Mostra amostra e schema
display(df_categoria.limit(10))
df_categoria.printSchema()


In [0]:
nulos_categoria = df_categoria.select([
    F.count(F.when(F.col(c).isNull(), c)).alias(c)
    for c in df_categoria.columns
])
print("üìâ Valores nulos por coluna:")
display(nulos_categoria)

duplicados_categoria = (
    df_categoria.groupBy("product_category_name")
    .count()
    .filter("count > 1")
)
print(f"üîÅ Total de product_category_name duplicados: {duplicados_categoria.count()}")
display(duplicados_categoria.limit(10))


In [0]:
# ü™Ñ Cria√ß√£o da camada Silver dm_categoria_produtos_traducao
# Regras aplicadas:
# - Renomeia colunas
# - Remove duplicados
# - Adiciona ingestion_timestamp

df_categoria_silver = (
    df_categoria
    .dropDuplicates(["product_category_name"])
    .select(
        F.col("product_category_name").alias("nome_produto_pt"),
        F.col("product_category_name_english").alias("nome_produto_en"),
        F.current_timestamp().alias("ingestion_timestamp")
    )
)

# Escrita no cat√°logo Silver
df_categoria_silver.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable(
    f"{catalogo}.silver.dm_categoria_produtos_traducao"
)

print("‚úÖ Tabela 'silver.dm_categoria_produtos_traducao' criada com sucesso!")
display(df_categoria_silver.limit(10))


# cota√ß√£o dolar

In [0]:
# Carrega a tabela Bronze
df_cotacao = spark.table(f"{catalogo}.{bronze_db_name}.dm_cotacao_dolar")

# Mostra amostra e schema
display(df_cotacao.limit(10))
df_cotacao.printSchema()

In [0]:
stats_cotacao = df_cotacao.select(
    F.min("cotacaoCompra").alias("min_cotacao"),
    F.max("cotacaoCompra").alias("max_cotacao"),
    F.avg("cotacaoCompra").alias("media_cotacao")
)
print("üìä Estat√≠sticas da cota√ß√£o:")
display(stats_cotacao)

In [0]:
nulos_cotacao = df_cotacao.select([
    F.count(F.when(F.col(c).isNull(), c)).alias(c)
    for c in df_cotacao.columns
])
print("üìâ Valores nulos por coluna:")
display(nulos_cotacao)

duplicados_cotacao = (
    df_cotacao.groupBy("dataHoraCotacao")
    .count()
    .filter("count > 1")
)
print(f"üîÅ Total de datas duplicadas: {duplicados_cotacao.count()}")
display(duplicados_cotacao.limit(10))

In [0]:
from pyspark.sql.window import Window

# Converte dataHoraCotacao para date (sem hora)
df_cotacao = (
    df_cotacao
        .withColumn("data", F.to_date("dataHoraCotacao"))
        .withColumn("cotacaoCompra", F.col("cotacaoCompra").cast("decimal(12,2)"))
)

# Cria uma janela temporal ordenada por data
janela = Window.orderBy("data").rowsBetween(Window.unboundedPreceding, 0)

# Criar calend√°rio completo (inclui finais de semana)
min_data, max_data = df_cotacao.select(
    F.min("data"), 
    F.max("data")
).first()

df_calendario = spark.createDataFrame([(min_data, max_data)], ["start", "end"]) \
    .select(F.explode(F.sequence("start", "end")).alias("data"))

# Join do calend√°rio com as cota√ß√µes
df_join = df_calendario.join(
    df_cotacao.select("data", "cotacaoCompra"),
    "data",
    "left"
)


# Preencher finais de semana com √∫ltima cota√ß√£o v√°lida
df_cotacao_silver = (
    df_join
        .withColumn("cotacao_dolar",
            F.last("cotacaoCompra", ignorenulls=True).over(janela)
        )
        .select(
            "data",
            F.col("cotacao_dolar").cast("decimal(12,2)"),
            F.current_timestamp().alias("ingestion_timestamp")
        )
)

# üîπ Salva na camada Silver
df_cotacao_silver.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable(
    f"{catalogo}.silver.dm_cotacao_dolar"
)

print("‚úÖ Tabela 'silver.dm_cotacao_dolar' criada com sucesso!")
display(df_cotacao_silver.orderBy("data").limit(10))

# VALIDA√á√ïES

In [0]:
# Verifica se h√° PEDIDOS sem CONSUMIDOR

# Carrega as tabelas Silver
df_pedidos = spark.table(f"{catalogo}.silver.ft_pedidos")
df_consumidores = spark.table(f"{catalogo}.silver.ft_consumidores")

# Faz o join anti (retorna pedidos sem consumidor correspondente)
pedidos_orfaos = df_pedidos.join(
    df_consumidores,
    df_pedidos["id_consumidor"] == df_consumidores["id_consumidor"],
    "left_anti"
)

# Conta quantos pedidos est√£o √≥rf√£os
qtd_pedidos_orfaos = pedidos_orfaos.count()
print(f"üö® Pedidos √≥rf√£os (sem consumidor): {qtd_pedidos_orfaos}")

# Se existirem, remove-os do dataset principal
if qtd_pedidos_orfaos > 0:
    df_pedidos = df_pedidos.join(
        df_consumidores,
        "id_consumidor",
        "inner"
    )
    print("üßπ Pedidos √≥rf√£os removidos com sucesso!")
else:
    print("‚úÖ Nenhum pedido √≥rf√£o encontrado.")


# 2Ô∏è‚É£ Verifica se h√° ITENS sem PEDIDO

df_itens = spark.table(f"{catalogo}.silver.ft_itens_pedidos")

# Join anti: itens sem correspond√™ncia de pedido
itens_orfaos = df_itens.join(
    df_pedidos,
    df_itens["id_pedido"] == df_pedidos["id_pedido"],
    "left_anti"
)

# Conta quantos itens est√£o √≥rf√£os
qtd_itens_orfaos = itens_orfaos.count()
print(f"üö® Itens √≥rf√£os (sem pedido): {qtd_itens_orfaos}")

# Se existirem, remove-os
if qtd_itens_orfaos > 0:
    df_itens = df_itens.join(df_pedidos, "id_pedido", "inner")
    print("üßπ Itens √≥rf√£os removidos com sucesso!")
else:
    print("‚úÖ Nenhum item √≥rf√£o encontrado.")


# ============================================
# 3Ô∏è‚É£ Mostra amostras finais e confirma limpeza
# ============================================

print("\nüì¶ Tabelas ap√≥s valida√ß√£o:")
print(f"- ft_pedidos: {df_pedidos.count()} registros v√°lidos")
print(f"- ft_itens_pedidos: {df_itens.count()} registros v√°lidos")

display(df_pedidos.limit(5))
display(df_itens.limit(5))


# TABELA PEDIDO TOTAL

In [0]:
df_pedidos = spark.table(f"{catalogo}.silver.ft_pedidos")
df_consumidores = spark.table(f"{catalogo}.silver.ft_consumidores")
df_pagamentos = spark.table(f"{catalogo}.silver.ft_pagamentos_pedidos")
df_cotacao = spark.table(f"{catalogo}.silver.dm_cotacao_dolar")

# Agrupar pagamentos por pedido
df_pagamentos_agg = (
    df_pagamentos
      .groupBy("id_pedido")
      .agg(F.sum("valor_pagamento").alias("valor_total_pago_brl"))
)

# Join principal
df_pedido_total = (
    df_pedidos
      .join(df_pagamentos_agg, "id_pedido", "left")
      .join(df_consumidores, "id_consumidor", "left")
)

# Data do pedido
df_pedido_total = df_pedido_total.withColumn(
    "data_pedido", F.to_date("pedido_compra_timestamp")
)

# Join com cota√ß√£o
df_pedido_total = df_pedido_total.join(
    df_cotacao,
    df_pedido_total["data_pedido"] == df_cotacao["data"],
    "left"
)

# Convers√£o para USD
df_pedido_total = df_pedido_total.withColumn(
    "valor_total_pago_usd",
    (F.col("valor_total_pago_brl") / F.col("cotacao_dolar")).cast("decimal(12,2)")
)

# Criar campo status
df_pedido_total = df_pedido_total.withColumnRenamed("status_original", "status")

# Sele√ß√£o final
df_pedido_total = df_pedido_total.select(
    "id_pedido",
    "id_consumidor",
    "status",
    F.col("valor_total_pago_brl").cast("decimal(12,2)"),
    "valor_total_pago_usd",
    "data_pedido",
    F.current_timestamp().alias("ingestion_timestamp")
)

# Salvar
df_pedido_total.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable(
    f"{catalogo}.silver.ft_pedido_total"
)

display(df_pedido_total)
print(f"Total: {df_pedido_total.count()}")
