In [0]:
# Setup de Importação 
import sys
import os

# Pega o caminho do notebook atual
caminho_notebook = dbutils.notebook.entry_point.getDbutils().notebook().getContext().notebookPath().get()
caminho_workspace = f"/Workspace{caminho_notebook}"

# Sobe dois níveis para achar a Raiz
pasta_notebooks = os.path.dirname(caminho_workspace)
raiz_projeto = os.path.dirname(pasta_notebooks)

#. Adiciona ao Path
sys.path.append(raiz_projeto)
print(f"Raiz do projeto adicionada: {raiz_projeto}")

In [0]:
import sys, os
from pyspark.sql.functions import col, split, trim, substring, length, expr, when, lit, current_timestamp, max
from pyspark.sql.types import IntegerType
from src.utils.dataframe_utils import *
from src.utils.delta_utils import salvar_delta_merge
from pyspark.sql.functions import size, element_at

In [0]:
# Definição dos Widgets
dbutils.widgets.text("camada_origem", "silver", "1. Schema Origem")
dbutils.widgets.text("camada_destino", "gold", "2. Schema Destino")
dbutils.widgets.text("tabela", "api_ibge_6579", "3. Nome Tabela")

# Captura dos valores
schema_origem = dbutils.widgets.get("camada_origem")
schema_destino = dbutils.widgets.get("camada_destino")
nome_tabela = dbutils.widgets.get("tabela")

# Define caminhos completos
tabela_silver = f"{schema_origem}.{nome_tabela}"
tabela_gold = f"{schema_destino}.{nome_tabela}"

print(f"Schema Origem: {schema_origem}")
print(f"Schema Destino: {schema_destino}")
print(f"Tabela: {nome_tabela}")
print(f"Tabela silver: {tabela_silver}")
print(f"Tabela gold: {tabela_gold}")

In [0]:
# Leitura da Silver
df_silver = spark.read.table("silver.api_ibge_6579") # Ajuste o nome se salvou diferente

In [0]:
# Seleção das colunas descritivas
df_dim_base = df_silver.select(
    "id_ibge", 
    "nome_local", 
    "cod_nivel_territorial", 
    "desc_nivel_territorial"
).distinct()

# Extrair UF
df_dim_localidade = df_dim_base.withColumn(
    "split_nome", split(col("nome_local"), " - ") 
).withColumn(
    "nome_limpo", trim(col("split_nome")[0]) 
).withColumn(
    "sigla_uf", 
    when(
        (col("desc_nivel_territorial") == "Município") & (size(col("split_nome")) > 1), 
        trim(col("split_nome")[1]) 
    ) 
    .otherwise(None) 
)

# Seleção Final da Dimensão
df_dim_final = df_dim_localidade.select(
    "id_ibge",
    col("nome_limpo").alias("nome"),
    "sigla_uf",
    "cod_nivel_territorial",
    "desc_nivel_territorial"
)

In [0]:
# Seleção das colunas 
df_fct = df_silver.select(
    col("id_ibge"),      
    col("ano"),          
    col("populacao"),    
    col("data_processamento_silver"), #coluna levada para facilitar manutenções futuras para engenharia
    col("data_ingestao_bronze"), #coluna levada para facilitar manutenções futuras para engenharia
    col("origem") #coluna levada para facilitar manutenções futuras para engenharia
)

# Regra de Qualidade 
df_fct_final = df_fct.filter(
    col("id_ibge").isNotNull() & 
    col("ano").isNotNull() &
    col("populacao").isNotNull()
)

# 3. Enriquecimento Gold
# Adicionamos a data que essa tabela GOLD foi gerada
df_fct_final = df_fct_final.withColumn("data_processamento_gold", current_timestamp())

In [0]:
# Garante schema Gold
spark.sql("CREATE SCHEMA IF NOT EXISTS gold")

In [0]:
# Gravação
salvar_delta_merge(
    df=df_dim_final,
    tabela_destino="gold.dim_localidade",
    chaves_match=["id_ibge"], 
    fazer_update=True
)

In [0]:
# 4. Gravação
salvar_delta_merge(
    df=df_fct_final,
    tabela_destino="gold.fct_populacao",
    chaves_match=["id_ibge", "ano"],
    modo_carga_inicial="overwrite",
    fazer_update=True
)

In [0]:
# Simulação do que um Analista de Dados faria para carregar esses dados no Power BI para construir um relatório

df_dim = spark.read.table("gold.dim_localidade")
df_fct = spark.read.table("gold.fct_populacao")

df_validacao = df_fct.join(df_dim, on="id_ibge", how="inner")

ultimo_ano = df_fct.select(max("ano")).collect()[0][0]

print(f"Gerando ranking de MUNICÍPIOS para o ano de: {ultimo_ano}")

df_relatorio = df_validacao \
    .filter(col("ano") == ultimo_ano) \
    .filter(col("desc_nivel_territorial") == "Município") \
    .select(
        col("nome").alias("Cidade"),
        col("sigla_uf").alias("UF"),
        col("populacao").alias("População"),
        col("desc_nivel_territorial").alias("Tipo")
    ) \
    .orderBy(col("População").desc()) \
    .limit(5)

display(df_relatorio)