In [0]:
# Alteração 

from pyspark.sql.functions import col, upper, trim, to_timestamp, current_timestamp
from pyspark.sql.types import IntegerType, DoubleType
#alteração apenas para subir no git
# Definições de Catálogo e Schema
CATALOG_NAME = "workspace"
SCHEMA_NAME = "projeto_medalhao_ecommerce"

print(f"Iniciando transformações da Camada Bronze para Silver no Schema: {SCHEMA_NAME}")

# ==============================================================================
# 1. Tabela Clientes: Padronização, Limpeza e Remoção de Duplicados
# ==============================================================================
df_customers_bronze = spark.read.table(f"{CATALOG_NAME}.{SCHEMA_NAME}.bronze_customers")

df_customers_silver = (df_customers_bronze
    # --- Passo 1: Limpeza/Tratamento usando colunas EXISTENTES

    .withColumn("name", trim(col("name"))) # Limpa espaços
    
    # Data Quality: Remoção de Duplicados (Usando 'customer_id' como ID único neste dataset)
    .dropDuplicates(["customer_id"])
    
    # --- Passo 2: Padronização de Nomenclatura ---
    .withColumnRenamed("customer_id", "id_cliente")
    .withColumnRenamed("name", "nome_cliente")
    .withColumnRenamed("email", "email_cliente")
    .withColumnRenamed("gender", "genero")
    .withColumnRenamed("signup_date", "data_cadastro")
    .withColumnRenamed("country", "pais")
    
    # Adicionar metadado de processamento Silver
    .withColumn("processamento_silver_ts", current_timestamp())
    
    # Remover metadados da Bronze
    .drop("source_file", "ingestion_timestamp")
)

# Escrita na Camada Silver (Corrigido para evitar erro de gravação)
df_customers_silver.write.format("delta").mode("overwrite").saveAsTable(f"{SCHEMA_NAME}.silver_clientes")
print("-> Tabela silver_clientes criada/atualizada com sucesso.")

# Continue com a Seção 2 (Tabela Produtos) e seguintes.


# ==============================================================================
# 2. Tabela Produtos: Conversão de Tipos e Tratamento de Nulos 
# ==============================================================================
df_products_bronze = spark.read.table(f"{CATALOG_NAME}.{SCHEMA_NAME}.bronze_products")

df_products_silver = (df_products_bronze
    # Padronização de Nomenclatura (USANDO SÓ COLUNAS EXISTENTES: product e category)
    .withColumnRenamed("product_id", "id_produto")
    .withColumnRenamed("product", "nome_produto") 
    .withColumnRenamed("category", "categoria_produto") 
    .withColumnRenamed("price", "preco") # Manter essa renomeação para consistência
    
    # Tratamento de Nulos: Preencher valores nulos (dados ausentes) na categoria
    .fillna({"categoria_produto": "SEM_CATEGORIA"})
    
    # Removendo colunas que não existem e as antigas de metadados
    .drop("tamanho_nome_produto", "tamanho_descricao_produto", "qtd_fotos_produto") 
    .drop("source_file", "ingestion_timestamp")
)

# Escrita na Camada Silver (Corrigido para evitar erro de gravação)
df_products_silver.write.format("delta").mode("overwrite").saveAsTable(f"{SCHEMA_NAME}.silver_produtos")
print("-> Tabela silver_produtos criada/atualizada com sucesso.")

# Continue com a Seção 3 (Tabela Orders) e seguintes.

# ==============================================================================
# 3. Tabela Orders (Pedidos): Tipagem de Datas
# ==============================================================================
df_orders_bronze = spark.read.table(f"{CATALOG_NAME}.{SCHEMA_NAME}.bronze_orders")

date_cols = [
    "order_date"  # USAR APENAS A COLUNA EXISTENTE
]

df_orders_silver = df_orders_bronze
for col_name in date_cols:
    # Conversão de tipo de dado
    df_orders_silver = df_orders_silver.withColumn(col_name, to_timestamp(col(col_name)))

# Padronização e Filtro
df_orders_silver = (df_orders_silver
    .withColumnRenamed("order_id", "id_pedido")
    .withColumnRenamed("customer_id", "id_cliente")
    .withColumnRenamed("order_date", "data_pedido")
    
    # Filtro: Remover pedidos sem ID (Data Quality)
    .filter(col("id_pedido").isNotNull())
    .drop("source_file", "ingestion_timestamp")
)

# Escrita na Camada Silver
df_orders_silver.write.format("delta").mode("overwrite").saveAsTable(f"{SCHEMA_NAME}.silver_pedidos")
print("-> Tabela silver_pedidos criada/atualizada com sucesso.")


# ==============================================================================
# 4. Tabela Order Items (Itens de Pedido) 
# ==============================================================================
df_items_bronze = spark.read.table(f"{CATALOG_NAME}.{SCHEMA_NAME}.bronze_order_items")

df_items_silver = (df_items_bronze
    .withColumnRenamed("order_id", "id_pedido")
    .withColumnRenamed("order_item_id", "id_item_pedido")
    .withColumnRenamed("product_id", "id_produto")
    .withColumnRenamed("seller_id", "id_vendedor")
    
    # Coluna REAL é 'unit_price', renomeada para 'preco'
    .withColumnRenamed("unit_price", "preco") 
    
    # Conversão de tipos de dados numéricos (preco para DoubleType)
    .withColumn("preco", col("preco").cast(DoubleType())) 
    
    
    # Filtro para garantir integridade básica
    .filter(col("preco").isNotNull())
    .drop("source_file", "ingestion_timestamp")
)

# Escrita na Camada Silver
df_items_silver.write.format("delta").mode("overwrite").saveAsTable(f"{SCHEMA_NAME}.silver_itens_pedido")
print("-> Tabela silver_itens_pedido criada/atualizada com sucesso.")

# ==============================================================================
# 5. Tabela Product Reviews (Avaliações)
# ==============================================================================
df_reviews_bronze = spark.read.table(f"{CATALOG_NAME}.{SCHEMA_NAME}.bronze_product_reviews")

df_reviews_silver = (df_reviews_bronze
    
    # TRATAMENTO 1: Tratar a coluna REAL 'rating' para garantir que seja Integer
    .withColumn("rating", col("rating").cast(IntegerType()))
    
    # Padronização de Nomenclatura
    .withColumnRenamed("review_id", "id_avaliacao")
    .withColumnRenamed("order_id", "id_pedido")
    .withColumnRenamed("rating", "score_avaliacao") # Renomear para o padrão português
    
    # Opcional: Remova Reviews sem score ou com dados incompletos
    .filter(col("score_avaliacao").isNotNull()) # Usar o nome JÁ RENOMEADO, pois a transformação acima já foi resolvida
    .drop("source_file", "ingestion_timestamp")
)

# Escrita na Camada Silver
df_reviews_silver.write.format("delta").mode("overwrite").saveAsTable(f"{SCHEMA_NAME}.silver_avaliacoes")
print("-> Tabela silver_avaliacoes criada/atualizada com sucesso.")