In [1]:
# Importar bibliotecas necessárias para Spark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, when, trim, coalesce, create_map, array, approx_percentile
from pyspark.sql.types import DoubleType, IntegerType, StringType, DateType, DecimalType

# --- 1. Configuração de Lakehouses e Schemas ---
# Lakehouse da Camada Bronze (onde seus dados limpos estão)
bronze_lakehouse_name = "Projeto_II_Bronze_"
bronze_schema_name = "Projeto_II_Bronze_" # O schema onde suas tabelas Bronze foram salvas

# Lakehouse da Camada Silver (destino dos dados transformados)
silver_lakehouse_name = "Projeto_II_Silver_"
silver_schema_name = "Projeto_II_Silver_" # O schema de destino dentro do Lakehouse Silver

print(f"Processamento da Camada Bronze -> Silver iniciado.")
print(f"Lendo do Lakehouse Bronze: '{bronze_lakehouse_name}', Schema: '{bronze_schema_name}'")
print(f"Escrevendo no Lakehouse Silver: '{silver_lakehouse_name}', Schema: '{silver_schema_name}'")
print("-" * 80)

# --- 2. Processamento da Tabela 'cities_bronze' -> 'cities_silver' ---
print("\n--- Processando cities_bronze -> cities_silver ---")
try:
    df_cities_bronze = spark.read.format("delta").table(f"{bronze_lakehouse_name}.{bronze_schema_name}.cities_bronze")

    # Dicionário de correção para city_code
    city_code_mapping_dict = {
        "?zmir": "Izmir",
        "Sanl?urfa": "Sanliurfa",
        "Diyarbak?r": "Diyarbakir",
        "Eski?ehir": "Eskisehir",
        "Adapazar?": "Adapazari"
    }

    # Construindo a lista de pares chave-valor para create_map
    city_code_map_args = []
    for k, v in city_code_mapping_dict.items():
        city_code_map_args.append(lit(k))
        city_code_map_args.append(lit(v))
    
    city_code_map_spark = create_map(*city_code_map_args)

    df_cities_silver = df_cities_bronze.select(
        col("store_id"),
        col("storetype_id"), # Mantido em minúsculas
        col("store_size"),
        # Retirada da coluna "city_old_id" por, aparentemente, se tratar de um id antigo
        # Aplicando o mapeamento para city_code e renomeando para city_name
        # coalesce garante que se o valor não estiver no mapa, o original (trimed) seja usado
        coalesce(city_code_map_spark.getItem(col("city_code")), trim(col("city_code"))).alias("city_name"),
        # Foi retirada a coluna de country_id pois todos os dados são referentes ao mesmo país (Turquia)
    ).filter(
        col("store_id").isNotNull() # Remover linhas onde store_id é nulo (chave primária)
    )

    # Escrever a tabela Silver
    df_cities_silver.write \
        .format("delta") \
        .mode("overwrite") \
        .saveAsTable(f"{silver_lakehouse_name}.{silver_schema_name}.cities_silver")

    print(f"Tabela 'cities_silver' criada com sucesso no Lakehouse '{silver_lakehouse_name}'.")
    df_cities_silver.printSchema()
    df_cities_silver.show(5)

except Exception as e:
    print(f"ERRO: Falha ao processar 'cities_silver'. Detalhes do erro: {e}")

# --- 3. Processamento da Tabela 'product_bronze' -> 'product_silver' ---
print("\n--- Processando product_bronze -> product_silver ---")
try:
    df_product_bronze = spark.read.format("delta").table(f"{bronze_lakehouse_name}.{bronze_schema_name}.product_bronze")

    df_product_silver = df_product_bronze.select(
        col("product_id"),
        # Tratamento de nulos em product_length, product_depth, product_width
        # Preenchendo nulos com 0.0 (double)
        coalesce(col("product_length"), lit(0.0)).alias("product_length"),
        coalesce(col("product_depth"), lit(0.0)).alias("product_depth"),
        coalesce(col("product_width"), lit(0.0)).alias("product_width"),
        # Tratamento de NULLs em cluster_id
        when(col("cluster_id").isNull(), "unknown_cluster") # Mantido em minúsculas
        .otherwise(trim(col("cluster_id")))
        .alias("cluster_id"),
        trim(col("hierarchy1_id")).alias("hierarchy1_id"), # Mantido em minúsculas
        trim(col("hierarchy2_id")).alias("hierarchy2_id"), # Mantido em minúsculas
        trim(col("hierarchy3_id")).alias("hierarchy3_id"), # Mantido em minúsculas
        trim(col("hierarchy4_id")).alias("hierarchy4_id"), # Mantido em minúsculas
        trim(col("hierarchy5_id")).alias("hierarchy5_id")  # Mantido em minúsculas
    ).filter(
        col("product_id").isNotNull() # Remover linhas onde product_id é nulo
    ).dropDuplicates(["product_id"]) # Remover duplicatas baseadas em product_id

    # Escrever a tabela Silver
    df_product_silver.write \
        .format("delta") \
        .mode("overwrite") \
        .saveAsTable(f"{silver_lakehouse_name}.{silver_schema_name}.product_silver")

    print(f"Tabela 'product_silver' criada com sucesso no Lakehouse '{silver_lakehouse_name}'.")
    df_product_silver.printSchema()
    df_product_silver.show(5)

except Exception as e:
    print(f"ERRO: Falha ao processar 'product_silver'. Detalhes do erro: {e}")


# --- 4. Processamento da Tabela 'sales_bronze' -> 'sales_silver' ---
print("\n--- Processando sales_bronze -> sales_silver ---")
try:
    df_sales_bronze = spark.read.format("delta").table(f"{bronze_lakehouse_name}.{bronze_schema_name}.sales_bronze")

    # PRÉ-PASSO CRÍTICO: Remover a coluna '_c0' ANTES do select principal
    if "_c0" in df_sales_bronze.columns:
        df_sales_bronze_cleaned = df_sales_bronze.drop("_c0")
    else:
        df_sales_bronze_cleaned = df_sales_bronze # Se _c0 já não existe, não faz nada
    
    # --- CALCULAR A MEDIANA DE 'price' ANTES DAS TRANSFORMAÇÕES ---
    # É importante calcular a mediana a partir dos valores válidos (não nulos)
    # Cast para DoubleType antes de calcular a média para garantir que a agregação funcione corretamente
    median_price_value = df_sales_bronze_cleaned.select(approx_percentile(col("price").cast(DoubleType()),0.5)).collect()[0][0]

    # Tratar o caso em que a mediana pode ser None (se a coluna for toda nula)
    # Se a média for None, podemos definir um valor padrão, por exemplo, 0.0 ou levantar um erro
    if median_price_value is None:
        print("Aviso: A coluna 'price' contém apenas valores nulos. Usando 0.0 como média de fallback.")
        median_price_value = 0.0 # Valor padrão se não houver valores para calcular a média
    
    print(f"Mediana calculada para a coluna 'price': {median_price_value}")

    # Construindo as expressões de coluna de forma mais verbosa e explícita
    # Isso isola cada etapa e evita potenciais ambiguidades para o otimizador do Spark

    # Numeric Columns
    sales_col_raw = col("sales")
    sales_cast_double = sales_col_raw.cast(DoubleType())
    sales_final = when(sales_cast_double.isNotNull(), sales_cast_double).otherwise(lit(0.0)).alias("sales")

    revenue_col_raw = col("revenue")
    revenue_cast_double = revenue_col_raw.cast(DoubleType())
    revenue_final = when(revenue_cast_double.isNotNull(), revenue_cast_double).otherwise(lit(0.0)).alias("revenue")

    stock_col_raw = col("stock")
    stock_cast_double = stock_col_raw.cast(DoubleType())
    stock_final = when(stock_cast_double.isNotNull(), stock_cast_double).otherwise(lit(0.0)).alias("stock")

 # Numeric Column (Imputação com a MEDIANA para price)
    price_col_raw = col("price")
    price_cast_double = price_col_raw.cast(DoubleType())
    # Usar o valor da média calculada
    price_final = when(price_cast_double.isNotNull(), price_cast_double).otherwise(lit(median_price_value)).alias("price")

    promo_discount_2_col_raw = col("promo_discount_2")
    promo_discount_2_cast_double = promo_discount_2_col_raw.cast(DoubleType())
    promo_discount_2_final = when(promo_discount_2_cast_double.isNotNull(), promo_discount_2_cast_double).otherwise(lit(0.0)).alias("promo_discount_2")

    # String/Categorical Columns
    promo_type_1_raw = col("promo_type_1")
    promo_type_1_trimmed_lower = trim(promo_type_1_raw)
    promo_type_1_final = when(promo_type_1_trimmed_lower == "na", "no_promo").otherwise(promo_type_1_trimmed_lower).alias("promo_type_1")

    promo_bin_1_raw = col("promo_bin_1")
    promo_bin_1_trimmed_lower = trim(promo_bin_1_raw)
    promo_bin_1_final = when(promo_bin_1_trimmed_lower == "na", "no_bin").otherwise(promo_bin_1_trimmed_lower).alias("promo_bin_1")

    promo_type_2_raw = col("promo_type_2")
    promo_type_2_trimmed_lower = trim(promo_type_2_raw)
    promo_type_2_final = when(promo_type_2_trimmed_lower == "na", "no_promo").otherwise(promo_type_2_trimmed_lower).alias("promo_type_2")

    promo_bin_2_raw = col("promo_bin_2")
    promo_bin_2_trimmed_lower = trim(promo_bin_2_raw)
    promo_bin_2_final = when(promo_bin_2_trimmed_lower == "na", "no_bin").otherwise(promo_bin_2_trimmed_lower).alias("promo_bin_2")

    promo_discount_type_2_raw = col("promo_discount_type_2")
    promo_discount_type_2_trimmed_lower = trim(promo_discount_type_2_raw)
    promo_discount_type_2_final = when(promo_discount_type_2_trimmed_lower == "na", "no_discount_type").otherwise(promo_discount_type_2_trimmed_lower).alias("promo_discount_type_2")


    # Lista final de colunas para o select, usando as variáveis explicitamente definidas
    sales_silver_columns_definitions = [
        col("store_id"),
        col("product_id"),
        col("date"), # 'date' já está como DateType

        sales_final,
        revenue_final,
        stock_final,
        price_final,
        
        promo_type_1_final,
        promo_bin_1_final,
        promo_type_2_final,
        promo_bin_2_final,
        
        promo_discount_2_final,
        promo_discount_type_2_final
    ]

    # Aplicar as transformações usando a lista de definições de colunas NO DATAFRAME LIMPO
    df_sales_silver = df_sales_bronze_cleaned.select(*sales_silver_columns_definitions).filter(
        # Remover linhas onde store_id, product_id ou date são nulos, pois são chaves essenciais
        col("store_id").isNotNull() &
        col("product_id").isNotNull() &
        col("date").isNotNull()
    ) # Opcional: .dropDuplicates(["store_id", "product_id", "date"]) se a combinação for única por dia

    # Escrever a tabela Silver
    df_sales_silver.write \
        .format("delta") \
        .mode("overwrite") \
        .saveAsTable(f"{silver_lakehouse_name}.{silver_schema_name}.sales_silver")

    print(f"Tabela 'sales_silver' criada com sucesso no Lakehouse '{silver_lakehouse_name}'.")
    df_sales_silver.printSchema()
    df_sales_silver.show(5)

except Exception as e:
    print(f"ERRO: Falha ao processar 'sales_silver'. Detalhes do erro: {e}")

print("\n" + "="*80)
print("--- Processamento da Camada Bronze para Silver Concluído! ---")
print("As tabelas Delta foram criadas no Lakehouse 'Projeto_II_Silver_', Schema 'Projeto_II_Silver_'.")
print("="*80)

StatementMeta(, d09098d8-0fb4-4392-b623-961b6bdaa95e, 3, Finished, Available, Finished)

Processamento da Camada Bronze -> Silver iniciado.
Lendo do Lakehouse Bronze: 'Projeto_II_Bronze_', Schema: 'Projeto_II_Bronze_'
Escrevendo no Lakehouse Silver: 'Projeto_II_Silver_', Schema: 'Projeto_II_Silver_'
--------------------------------------------------------------------------------

--- Processando cities_bronze -> cities_silver ---
Tabela 'cities_silver' criada com sucesso no Lakehouse 'Projeto_II_Silver_'.
root
 |-- store_id: string (nullable = true)
 |-- storetype_id: string (nullable = true)
 |-- store_size: integer (nullable = true)
 |-- city_name: string (nullable = true)

+--------+------------+----------+---------+
|store_id|storetype_id|store_size|city_name|
+--------+------------+----------+---------+
|   S0036|        ST04|        21|  Denizli|
|   S0005|        ST04|        19|  Denizli|
|   S0104|        ST04|        47|   Ankara|
|   S0068|        ST03|        14|    Izmir|
|   S0086|        ST03|        12|    Izmir|
+--------+------------+----------+---------+

