## MVP Engenharia de Dados - Notebook 2: Camada SILVER

**Objetivo:** Ler os dados brutos da camada Bronze, aplicar regras de qualidade e transformação (T) e preparar os dados para a modelagem.

**Processo de Transformação (T):**

Este notebook resolve os seguintes problemas de qualidade de dados:

1. **Datas e Horas:**
    * **Problema:** A coluna `DATA_HORA_GERACAO` foi lida como `string`.
    * **Solução:** Convertemos para o tipo `timestamp` usando a função `to_timestamp()`.

2. **Valores Monetários:**
    * **Problema:** Colunas como `PRECO_VENDA_UNIDADE` e `VR_DESCONTO` foram lidas como `string`, pois continham vírgulas (ex: "10,90").
    * **Solução:** Usamos `regexp_replace()` para trocar "," por "." e depois `cast(DecimalType)` para converter em um tipo numérico preciso.

3. **Dados de PDV e Sócio (Parsing):**
    * **Problema:** A coluna `INFORMACAO_TRIBUTO` continha informações relevantes e outras nem tanto em um único texto (ex: "POS : 234 ; PDV: Bar Misto;; ALERJ...").
    * **Solução:** Usamos `regexp_extract()` para "quebrar" esta coluna em duas novas colunas: `ID_SOCIO` e `NOME_PDV` contendo somente as informações relevantes para a análise.

4. **Categorização de Produtos (Nova Dimensão):**
    * **Problema:** A tabela de vendas não possui a informação de "Grupo" (Categoria) do produto, o que impede a distinção entre comidas, bebidas... na análise final.
    * **Solução:** Tratamos a tabela auxiliar `bronze_materiais_raw`, selecionando apenas as colunas de vínculo (`Código` e `Grupo`) e renomeando para `COD_PRODUTO` e `NOME_GRUPO`, criando a tabela `silver_dim_grupos` pronta para o cruzamento.

**Carga (Modelagem):**

Ao final, separei os dados limpos em três tabelas para preparar a junção na camada Gold:

* `default.silver_dim_produto` (Dados do Produto vindos da venda)
* `default.silver_fato_vendas` (Dados da Transação)
* `default.silver_dim_grupos` (Dados de Categoria vindos do cadastro)

In [0]:
# 1. Ler a tabela BRONZE
bronze_df = spark.read.table("default.bronze_vendas_raw")

# 2. Mostrar o Schema
bronze_df.printSchema()

In [0]:
from pyspark.sql.functions import col, to_timestamp, regexp_replace, regexp_extract, trim
from pyspark.sql.types import DecimalType

# Definições para Vendas
COLUNA_BAGUNCADA = "INFORMACAO_TRIBUTO"
timestamp_cols = ["DATA_HORA_GERACAO"]
money_cols = ["PRECO_VENDA_UNIDADE", "VR_DESCONTO", "BC"]

# Carregar as tabelas BRONZE
bronze_vendas_df = spark.read.table("default.bronze_vendas_raw")
bronze_grupos_df = spark.read.table("default.bronze_materiais_raw")

print("Processando Vendas...")

# Filtrar somente as notas fiscais emitidas
cleaned_df = bronze_vendas_df.filter(col("ESTADO") == "E")

# Limpar e converter colunas de Data/Hora
for col_name in timestamp_cols:
    cleaned_df = cleaned_df.withColumn(
        col_name, to_timestamp(col(col_name), "yyyy-MM-dd HH:mm:ss.SSS")
    )

# Limpar e converter colunas de Dinheiro
for col_name in money_cols:
    cleaned_df = (
        cleaned_df.withColumn(
            col_name + "_limpo", regexp_replace(col(col_name), ",", ".")
        )
        .withColumn(col_name, col(col_name + "_limpo").cast(DecimalType(10, 2)))
        .drop(col_name + "_limpo")
    )

# Parsing e Extração (Sócio e PDV)
cleaned_df = cleaned_df.withColumn(
    "ID_SOCIO", regexp_extract(col(COLUNA_BAGUNCADA), r"POS : (\d+)", 1)
).withColumn("NOME_PDV", regexp_extract(col(COLUNA_BAGUNCADA), r"PDV: (.*?);", 1))

# Renomear colunas-chave
cleaned_df = (
    cleaned_df.withColumnRenamed("DESCR", "NOME_PRODUTO")
    .withColumnRenamed("QTD", "QUANTIDADE")
    .withColumnRenamed("PRECO_VENDA_UNIDADE", "PRECO_UNITARIO")
    .withColumnRenamed("VR_DESCONTO", "VALOR_DESCONTO")
    .withColumnRenamed("DATA_HORA_GERACAO", "DATA_HORA")
    .withColumnRenamed("BC", "VALOR_TOTAL_ITEM")
)

# Separar: Criar a Dimensão Produto e a Fato Vendas
dim_produto_df = cleaned_df.select("COD_PRODUTO", "NOME_PRODUTO").distinct()

colunas_fato = [
    "NUM_NFCE", "NUM_ITEM", "DATA_HORA", "COD_PRODUTO",
    "QUANTIDADE", "PRECO_UNITARIO", "VALOR_TOTAL_ITEM",
    "VALOR_DESCONTO", "ID_SOCIO", "NOME_PDV"
]
fato_vendas_df = cleaned_df.select(*colunas_fato)


# Definições para Grupos
print("Processando Grupos...")

# Selecionar e renomear colunas para o padrão Silver
# Código -> COD_PRODUTO (Para fazer o join depois)
# Grupo  -> NOME_GRUPO
dim_grupos_df = bronze_grupos_df.select(
    col("Código").alias("COD_PRODUTO"),
    trim(col("Grupo")).alias("NOME_GRUPO")
).distinct()


# SALVAMENTO
print("Salvando tabelas na Camada SILVER")

# Salvar Fato Vendas
fato_vendas_df.write.format("delta").mode("overwrite").saveAsTable("default.silver_fato_vendas")
print("- silver_fato_vendas: OK")

# Salvar Dim Produto
dim_produto_df.write.format("delta").mode("overwrite").saveAsTable("default.silver_dim_produto")
print("- silver_dim_produto: OK")

# Salvar Dim Grupos (A Nova!)
dim_grupos_df.write.format("delta").mode("overwrite").saveAsTable("default.silver_dim_grupos")
print("- silver_dim_grupos: OK")

print("\nSUCESSO! Todas as 3 tabelas Silver foram atualizadas.")

## Verificação de Qualidade (QA) da Camada Silver

Nesta seção evidenciei que todas as regras de negócio e transformações foram aplicadas com sucesso no pipeline.

Irei inspecionar o Schema (a estrutura) e os Dados (a amostra) das três tabelas que acabei de criar: `silver_fato_vendas`, `silver_dim_produto` e `silver_dim_grupos`.

**Pontos de verificação a observar nas tabelas abaixo:**

1. **Tipos de Dados Corrigidos:**
    * `DATA_HORA` agora é do tipo `timestamp` (não mais `string`).
    * `PRECO_UNITARIO` e `VALOR_TOTAL_ITEM` agora são `decimal` (não mais `string` com vírgula).

2. **Regras de Negócio Aplicadas:**
    * **Parsing (Extração):** As colunas `ID_SOCIO` e `NOME_PDV` foram extraídas com sucesso da fonte original.
    * **Filtro:** Apenas notas com `ESTADO` = 'E' foram processadas (embora isso não seja visível na amostra, o count de linhas seria menor que o da Bronze).
    * **Enriquecimento (Categorização):** A nova tabela `silver_dim_grupos` foi gerada e normalizada para permitir a classificação correta dos produtos (comida, bebida...) na etapa Gold.

3. **Modelo Otimizado (Colunas Removidas):**
    * A `silver_fato_vendas` contém apenas as colunas relevantes para a análise (removendo `ICMS`, `IDORIGEM`, etc.).
    * A `silver_dim_produto` contém apenas `COD_PRODUTO` e `NOME_PRODUTO`.
    * A `silver_dim_grupos` contém apenas `COD_PRODUTO` e `NOME_GRUPO` (removendo colunas desnecessárias do cadastro original).

4. **Verificação de Nulos (Integridade):**
    * Foi validado (através de `display()` ou `count()`) que as colunas métricas essenciais para a análise (como `QUANTIDADE`, `PRECO_UNITARIO`, `DATA_HORA`) não contêm valores nulos. Isso garante a integridade dos cálculos de faturamento  e agrupamentos na camada de Análise.


In [0]:
# Verificação pós-silver
from pyspark.sql.functions import col, count, when

print("Carregando tabelas Silver para verificação")
fato_df = spark.read.table("default.silver_fato_vendas")
dim_produto_df = spark.read.table("default.silver_dim_produto")
dim_grupos_df = spark.read.table("default.silver_dim_grupos")

# Verificação de Nulos
print("="*50)
print("1. VERIFICANDO INTEGRIDADE DA FATO (VENDAS)")
print("="*50)

colunas_criticas = ["NUM_NFCE", "DATA_HORA", "COD_PRODUTO", "VALOR_TOTAL_ITEM"]
null_counts = fato_df.select([count(when(col(c).isNull(), c)).alias(c) for c in colunas_criticas]).first().asDict()

print("Nulos encontrados:")
for c, v in null_counts.items():
    print(f"- {c}: {v}")

# Análise de Parsing
print("\n" + "="*50)
print("2. VERIFICANDO PARSING (SÓCIO/PDV)")
print("="*50)
parsing_check = fato_df.select(
    count("*").alias("Total_Linhas"),
    count(when(col("ID_SOCIO").isNull(), 1)).alias("Sem_Socio"),
    count(when(col("NOME_PDV").isNull(), 1)).alias("Sem_PDV")
)
display(parsing_check)

# Verificação de Schemas
print("\n" + "="*50)
print("3. VERIFICANDO SCHEMAS (ESTRUTURA)")
print("="*50)
print("\nFATO VENDAS")
fato_df.printSchema()
print("\nDIM PRODUTO")
dim_produto_df.printSchema()

# Verificação da Tabela de Grupos
print("\n" + "="*50)
print("4. VERIFICANDO NOVA DIMENSÃO: GRUPOS")
print("="*50)
print("\nSchema (Deve ter COD_PRODUTO e NOME_GRUPO)")
dim_grupos_df.printSchema()

print(f"\nTotal de produtos categorizados: {dim_grupos_df.count()}")

print("\nAmostra dos Grupos")
display(dim_grupos_df.limit(5))

In [0]:
# Lista de todas as tabelas
tabelas = [
    "default.bronze_vendas_raw",
    "default.bronze_materiais_raw",
    "default.silver_fato_vendas",
    "default.silver_dim_produto",
    "default.silver_dim_grupos"
]

# Mostra todas as tabelas
for t in tabelas:
    print(f"\n{'='*50}")
    print(f"AMOSTRA DA TABELA: {t}")
    print(f"{'='*50}")
    display(spark.read.table(t).limit(5))