In [0]:
from pyspark.sql.functions import col

# Caminho para a camada Silver
silver_path = "/dbfs/FileStore/tables/data_lake/silver/"

# Lista de tabelas da camada Bronze
tabelas = ["produtos", "pedidos", "pagamentos", "clientes", "categorias", "entregas"]

# Loop para transformar e salvar cada tabela na camada Silver
for tabela in tabelas:
    # Caminho de leitura na camada Bronze
    bronze_path = f"/dbfs/FileStore/tables/data_lake/bronze/{tabela}/"
    
    # Caminho de gravação na camada Silver
    silver_table_path = f"{silver_path}{tabela}/"
    
    # Ler os dados da camada Bronze
    df_bronze = spark.read.format("delta").load(bronze_path)
    
    # Realizar transformações nos dados
    if tabela == "produtos":
        # Remover duplicatas, filtrar produtos com preço positivo e estoque não negativo
        df_silver = df_bronze.dropDuplicates() \
            .filter(df_bronze['preco'].isNotNull()) \
            .filter(df_bronze['preco'] > 0) \
            .filter(df_bronze['estoque'] >= 0)  # Filtra produtos com estoque não negativo
    
    elif tabela == "pedidos":
        # Remover duplicatas e garantir que a coluna 'valor_total' seja válida e positiva
        df_silver = df_bronze.dropDuplicates() \
            .filter(df_bronze['valor_total'].isNotNull()) \
            .filter(df_bronze['valor_total'] > 0) \
            .filter(df_bronze['data_pedido'] >= '2023-01-01')  # Filtra pedidos realizados a partir de 2023
    
    elif tabela == "pagamentos":
        # Remover duplicatas, garantir que o 'valor_pago' seja positivo e 'metodo_pagamento' não nulo
        df_silver = df_bronze.dropDuplicates() \
            .filter(df_bronze['valor_pago'].isNotNull()) \
            .filter(df_bronze['valor_pago'] > 0) \
            .filter(df_bronze['metodo_pagamento'] != '')  # Filtra pagamentos com método de pagamento válido
    
    elif tabela == "clientes":
        # Remover duplicatas, garantir que o 'email' e 'data_criacao' sejam válidos
        df_silver = df_bronze.dropDuplicates() \
            .filter(df_bronze['email'].isNotNull()) \
            .filter(df_bronze['email'] != '') \
            .filter(df_bronze['data_criacao'].isNotNull()) \
            .filter(df_bronze['status'] == 'ativo')  # Filtra clientes ativos
    
    elif tabela == "categorias":
        # Remover duplicatas e garantir que a coluna 'nome' seja não nula
        df_silver = df_bronze.dropDuplicates() \
            .filter(df_bronze['nome'].isNotNull()) \
            .filter(df_bronze['nome'] != '')  # Filtra categorias com nome válido
    
    elif tabela == "entregas":
        # Remover duplicatas, garantir que 'status_entrega' seja válido
        df_silver = df_bronze.dropDuplicates() \
            .filter(df_bronze['status_entrega'].isNotNull()) \
            .filter(df_bronze['status_entrega'] != '')  # Filtra entregas com status válido
    
    # Exemplo de transformação de tipo de dado
    # Garantir que a coluna 'data_pedido' seja do tipo 'timestamp' em 'pedidos'
    if tabela == "pedidos":
        df_silver = df_silver.withColumn("data_pedido", df_silver["data_pedido"].cast("timestamp"))
    
    # Gravar os dados transformados na camada Silver no formato Delta
    df_silver.write.format("delta").mode("overwrite").save(silver_table_path)
    
    print(f"Dados da tabela '{tabela}' transformados e salvos na camada Silver em {silver_table_path}")

