# # 🧪 Notebook: Data Quality Checks
# 
# Este notebook executa verificações de qualidade de dados nas tabelas processadas.

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

In [0]:
spark = SparkSession.builder.getOrCreate()

# ## 📏 Definição das regras de qualidade
# 
### Cada tabela tem um conjunto de colunas esperadas que serão validadas.
### Caso alguma coluna não exista, será mostrado um aviso em vez de erro.

In [0]:
rules = {
    "default.posts_creator": ["creator_id", "title", "likes", "views", "post_id"],
    "default.users_yt": ["user_id", "wiki_page"],
    "default.creators_scrape_wiki": ["page_name", "url"]
}

# ## 🔍 Execução das validações
# 
### Para cada tabela:
### - Verifica se existe no catálogo.  
### - Checa valores nulos por coluna.  
### - Se `post_id` não existir, cria um ID sintético.

###Deixei uma com colunas erradas para exemplificar

In [0]:
for table, checks in rules.items():
    print(f"\n🔎 Validando tabela: {table}")
    try:
        # Carregar a tabela
        df = spark.table(table)
        
        # Se precisar materializar em serverless, evitar .cache()/.persist()
        df = df.localCheckpoint()  
        
        # Criar post_id sintético se necessário
        if "post_id" in checks and "post_id" not in df.columns:
            print("⚠️ Coluna post_id não encontrada, criando identificador sintético...")
            df = df.withColumn("post_id", F.monotonically_increasing_id())
        
        # Executa verificações de colunas
        for col in checks:
            if col in df.columns:
                nulls = df.filter(F.col(col).isNull()).count()
                print(f"   - Coluna {col}: {nulls} valores nulos")
            else:
                print(f"   ⚠️ Coluna {col} não encontrada nesta tabela. Colunas disponíveis: {df.columns}")
                
        # Exemplo extra: contagem de registros duplicados por post_id (se existir)
        if "post_id" in df.columns:
            dups = df.groupBy("post_id").count().filter("count > 1").count()
            print(f"   - Registros duplicados (post_id): {dups}")
            
    except Exception as e:
        print(f"❌ Erro ao validar {table}: {e}")


🔎 Validando tabela: default.posts_creator
⚠️ Coluna post_id não encontrada, criando identificador sintético...
   - Coluna creator_id: 0 valores nulos
   - Coluna title: 0 valores nulos
   - Coluna likes: 0 valores nulos
   - Coluna views: 0 valores nulos
   - Coluna post_id: 0 valores nulos
   - Registros duplicados (post_id): 0

🔎 Validando tabela: default.users_yt
   - Coluna user_id: 0 valores nulos
   - Coluna wiki_page: 0 valores nulos

🔎 Validando tabela: default.creators_scrape_wiki
   ⚠️ Coluna page_name não encontrada nesta tabela. Colunas disponíveis: ['wiki_page']
   ⚠️ Coluna url não encontrada nesta tabela. Colunas disponíveis: ['wiki_page']
