In [0]:
from settings import create_spark_session
from pyspark.sql.functions import col, to_date, lit, trim
import logging
from datetime import datetime

In [0]:
def save_table(df, table_name, silver_schema, partition_col):
    df.write.format("delta") \
        .mode("append") \
        .partitionBy(partition_col) \
        .saveAsTable(f"{silver_schema}.{table_name}")

def process_customers(logger, spark, bronze_schema, silver_schema, ingest_date):
    try:
        logger.info("Lendo dados da camada Bronze - customers...")

        customers = spark.table(f"{bronze_schema}.customers") \
            .filter(col("ingest_date") == ingest_date)

        if customers.count() == 0:
            logger.warning(f"Nenhum dado encontrado para ingest_date = {ingest_date}.")
            return
        else:
            logger.info("Transformando e limpando dados dos clientes...")
            customers_cleaned = (
                customers
                .dropDuplicates(["customer_id"])
                .filter(col("email").isNotNull())
                .filter(col("created_at") <= to_date(lit(ingest_date), "yyyy-MM-dd"))
                .withColumn("address", trim(col("address")))
            )

            logger.info("Salvando clientes limpos na camada Silver como tabela...")
            save_table(customers_cleaned, "customers_cleaned", silver_schema, "ingest_date")

    except Exception as e:
        logger.error(f"Erro ao processar customers: {e}")

def process_products(logger, spark, bronze_schema, silver_schema, ingest_date):
    try:
        logger.info("Lendo dados da camada Bronze - products...")

        products = spark.table(f"{bronze_schema}.products") \
            .filter(col("ingest_date") == ingest_date)
        
        if products.count() == 0:
            logger.warning(f"Nenhum dado encontrado para ingest_date = {ingest_date}.")
        else:
            logger.info("Transformando e limpando dados dos produtos...")
            products_cleaned = (
                products
                .dropDuplicates(["product_id"])
                .filter(col("price") > 0)
                .filter(col("category") != "")
            )

            logger.info("Salvando produtos limpos na camada Silver como tabela...")
            save_table(products_cleaned, "products_cleaned", silver_schema, "ingest_date")

    except Exception as e:
        logger.error(f"Erro ao processar products: {e}")

def process_inventory_updates(logger, spark, bronze_schema, silver_schema, ingest_date):
    try:
        logger.info("Lendo dados da camada Bronze - inventory_updates...")

        inventory = spark.table(f"{bronze_schema}.inventory_updates") \
            .filter(col("ingest_date") == ingest_date)
        
        if inventory.count() == 0:
            logger.warning(f"Nenhum dado encontrado para ingest_date = {ingest_date}.")
        else:
            logger.info("Transformando e limpando dados de inventário...")
            inventory_cleaned = (
                inventory
                .filter(col("change") != 0)
                .filter(col("timestamp") <= lit(datetime.today().strftime('%Y-%m-%d %H:%M:%S')))
            )

            logger.info("Salvando inventário limpo na camada Silver como tabela...")
            save_table(inventory_cleaned, "inventory_cleaned", silver_schema, "ingest_date")

    except Exception as e:
        logger.error(f"Erro ao processar inventory_updates: {e}")

def process_orders(logger, spark, bronze_schema, silver_schema, ingest_date):
    try:
        logger.info("Lendo dados da camada Bronze - orders...")

        orders = spark.table(f"{bronze_schema}.orders") \
            .filter(col("ingest_date") == ingest_date)
        
        if orders.count() == 0:
            logger.warning(f"Nenhum dado encontrado para ingest_date = {ingest_date}.")
        else:
            logger.info("Transformando e limpando dados dos pedidos...")

            orders_cleaned = (
                orders
                .withColumn("order_date", to_date(col("order_date"), "yyyy-MM-dd HH:mm:ss"))
                .filter(col("status").isin(["completed", "processing"]))
                .dropDuplicates(["order_id"])
            )

            logger.info("Salvando pedidos limpos na camada Silver como tabela...")
            save_table(orders_cleaned, "orders_cleaned", silver_schema, "ingest_date")

    except Exception as e:
        logger.error(f"Erro ao processar orders: {e}")

def process_order_items(logger, spark, bronze_schema, silver_schema, ingest_date):
    try:
        logger.info("Lendo dados da camada Bronze - order_items...")

        items = spark.table(f"{bronze_schema}.order_items") \
            .filter(col("ingest_date") == ingest_date)
        
        if items.count() == 0:
            logger.warning(f"Nenhum dado encontrado para ingest_date = {ingest_date}.")
        else:
            logger.info("Transformando e limpando dados dos itens dos pedidos...")
            items_cleaned = (
                items
                .filter(col("quantity") > 0)
                .filter(col("unit_price") > 0)
                .withColumn("total_price", col("quantity") * col("unit_price"))
            )

            logger.info("Salvando itens de pedido limpos na camada Silver como tabela...")
            save_table(items_cleaned, "order_items_cleaned", silver_schema, "ingest_date")

    except Exception as e:
        logger.error(f"Erro ao processar order_items: {e}")

def main(spark, bronze_schema, silver_schema, ingest_date):
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s - %(levelname)s - %(message)s"
    )
    logger = logging.getLogger("SilverLayer")

    # Cria o schema Silver
    spark.sql(f"CREATE SCHEMA IF NOT EXISTS {silver_schema}")

    logger.info("Iniciando o processamento da camada Silver.")

    process_customers(logger, spark, bronze_schema, silver_schema, ingest_date)
    process_inventory_updates(logger, spark, bronze_schema, silver_schema, ingest_date)
    process_order_items(logger, spark, bronze_schema, silver_schema, ingest_date)
    process_orders(logger, spark, bronze_schema, silver_schema, ingest_date)
    process_products(logger, spark, bronze_schema, silver_schema, ingest_date)

    logger.info("Finalizando sessão spark.")
    spark.stop()
    logger.info("Processamento da camada Silver finalizado com sucesso.")


In [0]:
silver_config = {
    "spark.sql.shuffle.partitions": "200",
    "spark.databricks.delta.optimizeWrite.enabled": "true",
    "spark.databricks.delta.autoCompact.enabled": "true",
    "spark.sql.autoBroadcastJoinThreshold": "104857600",
    "spark.default.parallelism": "200",
    "spark.sql.parquet.filterPushdown": "true",
    "spark.databricks.delta.merge.enabled": "true",
}

app_name = "SilverLayer"
spark = create_spark_session(app_name, silver_config)

Explicação das configs para a camada silver
  - **spark.sql.shuffle.partitions = 200**
    - Aumenta a paralelização de operações como groupBy, join, repartition.
    - Ideal para datasets maiores, onde ocorrem limpezas e agregações.

  - **spark.databricks.delta.optimizeWrite.enabled = true**
    - Durante as transformações e escrita intermediária, garante menos arquivos pequenos.
    - Evita overhead futuro na leitura.

  - **spark.databricks.delta.autoCompact.enabled = true**
    - Compacta arquivos pequenos automaticamente após escrita.
    - Mantém a Silver eficiente para leitura na Gold.
  
  - **spark.sql.autoBroadcastJoinThreshold = 104857600**
    - Permite que joins sejam feitos por broadcast quando a tabela for menor que 100 MB.
    - Evita shuffles desnecessários e acelera joins.
  
  - **spark.default.parallelism = 200**
    - Ajusta o nível de paralelismo padrão para tarefas como leitura/escrita paralela, sem shuffle.
    - Coerente com o número de partitions.

  - **spark.sql.parquet.filterPushdown = true**
    - Usa filtros na leitura de arquivos Parquet, economizando leitura de dados não relevantes.
    - Já útil na Silver ao reprocessar ou validar dados.

  - **spark.databricks.delta.merge.enabled = true**
    - Habilita suporte interno para comandos MERGE INTO, comuns na Silver.
    - Útil caso aplique upserts (ex: merge de batches ou dados CDC).

Essa configuração é ideal para a camada Silver, pois:
  - Aumenta a eficiência das transformações.
  - Mantém a escrita estruturada e otimizada.
  - Suporta operações como deduplicação, joins e merge com performance.

In [0]:
#VARIAVEIS
bronze_schema = "lakehouse.a_bronze"
silver_schema = "lakehouse.a_silver"
ingest_date = "2025-06-15" # Define a data de ingestão a ser processada (ex: '2025-06-12')

# Chama a função principal da camada Silver
main(spark, bronze_schema, silver_schema, ingest_date)

2025-06-15 13:47:17,313 - INFO - Iniciando o processamento da camada Silver.
2025-06-15 13:47:17,315 - INFO - Lendo dados da camada Bronze - customers...
2025-06-15 13:47:17,750 - INFO - Transformando e limpando dados dos clientes...
2025-06-15 13:47:17,751 - INFO - Salvando clientes limpos na camada Silver como tabela...
2025-06-15 13:47:21,523 - INFO - Lendo dados da camada Bronze - inventory_updates...
2025-06-15 13:47:21,838 - INFO - Transformando e limpando dados de inventário...
2025-06-15 13:47:21,841 - INFO - Salvando inventário limpo na camada Silver como tabela...
2025-06-15 13:47:24,240 - INFO - Lendo dados da camada Bronze - order_items...
2025-06-15 13:47:24,544 - INFO - Transformando e limpando dados dos itens dos pedidos...
2025-06-15 13:47:24,545 - INFO - Salvando itens de pedido limpos na camada Silver como tabela...
2025-06-15 13:47:29,198 - INFO - Lendo dados da camada Bronze - orders...
2025-06-15 13:47:29,533 - INFO - Transformando e limpando dados dos pedidos...
2