### <font color='blue'>Automatizando o Pipeline de Consolidação, Limpeza e Enriquecimento de Dados no Databricks</font>

## Tabela Silver de Produtos

In [None]:
# Define o banco de dados que será usado
spark.sql('use database dsa_db_02_staging')

In [None]:
# Cria a tabela Silver
spark.sql(""" create table if not exists dsa_silver_produtos (
    brand string,
    category string,
    is_active boolean,
    name string,
    price double,
    product_id long,
    rating double,
    stock_quantity string,
    last_updated_at timestamp) """
)

In [None]:
# Extrai a última data de atualização (a maior data)
last_updated_df = spark.sql('select max(last_updated_at) as last_completed from dsa_silver_produtos')

In [None]:
# Extrai a data de atualização
last_updated_time = last_updated_df.collect()[0]['last_completed']

In [None]:
# Se não houver data de atualização, considera a data de 1900-01-01
if last_updated_time is None:
    last_updated_time = '1900-01-01T00:00:00.000+00:00'

In [None]:
# Vamos criar uma view temporária a fim de verificar que o registro recebido na tabela bronze foi ou não processado para a tabela silver
spark.sql(f"""
          create or replace temporary view produto_incremental as
          select * from dsa_db_01_inicial.dsa_bronze_produtos as c where c.recebido_em  > '{last_updated_time}' """)

In [None]:
# Select
spark.sql("select *  from produto_incremental limit 10").show()

In [None]:
# Vamos criar uma view temporária somente para produtos com id e categoria
spark.sql(""" 
          create or replace temporary view vw_silver_produtos_incremental as
          select
          case
            when brand is not null then lower(trim(brand))
            else 'Unknown'
          end as brand,
          case 
            when category is not null then initcap(trim(category))
            else 'Unknown'
          end as category,
          is_active,
          case 
            when name is not null then initcap(trim(name))
            else 'Unknown'
          end as name,
          case
            when price < 0 then 0
            else price
          end as price,
          product_id,
          case 
            when rating < 0 then 0
            when rating > 5 then 5
            else rating
          end as rating,
          case
            when stock_quantity > 0 or stock_quantity is null  then coalesce(stock_quantity, 0)
            else 0
          end as stock_quantity,
          current_timestamp() as last_updated_at 
          from produto_incremental
          where product_id is not null and category is not null
          """)

In [None]:
# Se o produto já havia sido inserido, atualizamos o registro.
# Caso contrário, inserimos.
spark.sql("""
          merge into dsa_silver_produtos target
          using vw_silver_produtos_incremental source
          on target.product_id = source.product_id
          when matched then 
            update set *
          when not matched then
            insert *
            """)

In [None]:
# Contagem de registros
spark.sql(" select count(*) from dsa_silver_produtos limit 10").show()