### Avaliação Técnica – Engenheiro de Dados
### THALES DIAS BRAGA - thales89dias@gmail.com

Início da criação do pipeline.

Configuração dos paths de cada camada e leitura do arquivo XML.

In [0]:
# Configuração de caminhos 
import shutil

# Paths para cada camadas
path_bronze = "/Volumes/workspace/tmp/tdb_vol/bronze"
path_silver = "/Volumes/workspace/tmp/tdb_vol/silver"
path_gold   = "/Volumes/workspace/tmp/tdb_vol/gold"

# Limpeza dos paths para o caso de uma re-execução 
dbutils.fs.rm(path_bronze, True)
dbutils.fs.rm(path_silver, True)
dbutils.fs.rm(path_gold, True)

# Caminho do arquivo enviado 
input_path = "/Volumes/workspace/tmp/tdb_vol/all.xml" 


Camada Bronze : Foi necessário nesta fase de ingestão, transformar os dados de XML para o formato Delta. Este procedimento ajuda na performance.

In [0]:
from pyspark.sql.functions import col, lit, current_timestamp

# Definição do elemento raiz
row_tag = "ValorDescritoPorSuasDimensoes"

# Leitura do XML
df_raw = spark.read \
    .format("xml") \
    .option("rowTag", row_tag) \
    .load(input_path)

# Adicionar metadados de ingestão
df_bronze = df_raw.withColumn("ingestion_date", current_timestamp()) \
                  .withColumn("source_file", lit(input_path))

# Gravação na Camada Bronze em formato Delta
df_bronze.write.format("delta").mode("overwrite").save(path_bronze)

print("Camada Bronze processada com sucesso.")
display(df_bronze.limit(5))


Camada Bronze processada com sucesso.


D1C,D1N,D2C,D2N,D3C,D3N,D4C,D4N,D5C,D5N,D6C,D6N,D7C,D7N,D8C,D8N,D9C,D9N,MC,MN,NC,NN,V,ingestion_date,source_file
Unidade da Federação (Código),Unidade da Federação,Ano (Código),Ano,Variável (Código),Variável,List(true),List(true),List(true),List(true),List(true),List(true),List(true),List(true),List(true),List(true),List(true),List(true),Unidade de Medida (Código),Unidade de Medida,Nível Territorial (Código),Nível Territorial,Valor,2026-01-31T16:21:51.346Z,/Volumes/workspace/tmp/tdb_vol/all.xml
11,Rondônia,2001,2001,9324,População residente estimada,List(true),List(true),List(true),List(true),List(true),List(true),List(true),List(true),List(true),List(true),List(true),List(true),45,Pessoas,3,Unidade da Federação,1407886,2026-01-31T16:21:51.346Z,/Volumes/workspace/tmp/tdb_vol/all.xml
11,Rondônia,2002,2002,9324,População residente estimada,List(true),List(true),List(true),List(true),List(true),List(true),List(true),List(true),List(true),List(true),List(true),List(true),45,Pessoas,3,Unidade da Federação,1431777,2026-01-31T16:21:51.346Z,/Volumes/workspace/tmp/tdb_vol/all.xml
11,Rondônia,2003,2003,9324,População residente estimada,List(true),List(true),List(true),List(true),List(true),List(true),List(true),List(true),List(true),List(true),List(true),List(true),45,Pessoas,3,Unidade da Federação,1455907,2026-01-31T16:21:51.346Z,/Volumes/workspace/tmp/tdb_vol/all.xml
11,Rondônia,2004,2004,9324,População residente estimada,List(true),List(true),List(true),List(true),List(true),List(true),List(true),List(true),List(true),List(true),List(true),List(true),45,Pessoas,3,Unidade da Federação,1562085,2026-01-31T16:21:51.346Z,/Volumes/workspace/tmp/tdb_vol/all.xml


Camada Silver : Renomear colunas, remover colunas desnecessárias, remover valores nulos e redefinir tipos de dados.

In [0]:
from pyspark.sql.functions import col, when

# Leitura da Bronze
df_b = spark.read.format("delta").load(path_bronze)

# Renomeando colunas : D1 -> Estado/UF, D2 -> Ano, D3 -> Variável, V -> Valor
df_silver = df_b.select(
    col("D1C").alias("cod_uf"),
    col("D1N").alias("nome_uf"),
    col("D2N").alias("ano"),
    col("D3N").alias("variavel"),
    col("V").alias("valor"),
    col("MC").alias("cod_medida"),
    col("MN").alias("unidade_medida")
)

# 1. Remover linha que é apenas cabeçalho
# 2. Converter tipos de dados
# 3. Filtrar e remover valores NULL especificamente para o ano de 2001, com o operador '~' 
df_silver_clean = df_silver \
    .filter(col("ano") != "Ano") \
    .withColumn("ano", col("ano").cast("integer")) \
    .withColumn("valor", col("valor").cast("long")) \
    .withColumn("cod_uf", col("cod_uf").cast("integer")) \
    .filter(~((col("ano") == 2001) & (col("valor").isNull()))) 



# Gravação na Camada Silver
df_silver_clean.write.format("delta").mode("overwrite").save(path_silver)

print("Camada Silver processada e limpa (NULLs de 2001 removidos).")
display(df_silver_clean)



Camada Silver processada e limpa (NULLs de 2001 removidos).


cod_uf,nome_uf,ano,variavel,valor,cod_medida,unidade_medida
11,Rondônia,2001,População residente estimada,1407886,45,Pessoas
11,Rondônia,2002,População residente estimada,1431777,45,Pessoas
11,Rondônia,2003,População residente estimada,1455907,45,Pessoas
11,Rondônia,2004,População residente estimada,1562085,45,Pessoas
11,Rondônia,2005,População residente estimada,1534594,45,Pessoas
11,Rondônia,2006,População residente estimada,1562417,45,Pessoas
11,Rondônia,2008,População residente estimada,1493565,45,Pessoas
11,Rondônia,2009,População residente estimada,1503928,45,Pessoas
11,Rondônia,2011,População residente estimada,1576455,45,Pessoas
11,Rondônia,2012,População residente estimada,1590011,45,Pessoas


Camada Gold : Focada em criar análises.

Análise 1: Calcular o crescimento ano a ano por estado. Crescimento absoluto e percentual. 

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import lag, round, col  # Adicionado 'col' para o filtro

# Leitura da Silver
df_s = spark.read.format("delta").load(path_silver)

# Análise 1: Calcular o crescimento ano a ano por estado. Crescimento absoluto e percentual. 
window_spec = Window.partitionBy("nome_uf").orderBy("ano")

# Limpando valores NULL
df_gold_growth = df_s.withColumn("valor_ano_anterior", lag("valor").over(window_spec)) \
                     .withColumn("crescimento_absoluto", col("valor") - col("valor_ano_anterior")) \
                     .withColumn("crescimento_pct", round(((col("valor") - col("valor_ano_anterior")) / col("valor_ano_anterior")) * 100, 2)) \
                     .select("nome_uf", "ano", "valor", "crescimento_absoluto", "crescimento_pct") \
                     .filter(col("crescimento_pct").isNotNull()) 


# Gravação na Camada Gold
df_gold_growth.write.format("delta").mode("overwrite").save(path_gold)

print("Camada Gold gerada com indicadores de crescimento (linhas com NULL removidas).")
display(df_gold_growth)



Camada Gold gerada com indicadores de crescimento (linhas com NULL removidas).


nome_uf,ano,valor,crescimento_absoluto,crescimento_pct
Acre,2002,586942,12587,2.19
Acre,2003,600595,13653,2.33
Acre,2004,630328,29733,4.95
Acre,2005,669736,39408,6.25
Acre,2006,686652,16916,2.53
Acre,2008,680075,-6577,-0.96
Acre,2009,691132,11057,1.63
Acre,2011,746386,55254,7.99
Acre,2012,758786,12400,1.66
Acre,2013,776463,17677,2.33


### Pipeline criado e pronto para consumir os dados diretamente no Databricks, via spark sql.

Análise 2: Nível populacional por estado.

In [0]:
# Análise 2: Nível populacional por estado.

from pyspark.sql.functions import col, max as spark_max

df_silver = spark.read.format("delta").load(path_silver)
max_ano = df_silver.agg(spark_max(col("ano")).alias("max_ano")).collect()[0]["max_ano"]

result = df_silver.filter(col("ano") == max_ano) \
    .select(col("nome_uf"), col("valor").alias("populacao_atual")) \
    .orderBy(col("valor").desc())

display(result)


nome_uf,populacao_atual
São Paulo,46081801
Minas Gerais,21393441
Rio de Janeiro,17223547
Bahia,14870907
Paraná,11890517
Rio Grande do Sul,11233263
Pernambuco,9562007
Ceará,9268836
Pará,8711196
Santa Catarina,8187029


Análise 3: Qual percentual de evolução populacional de um estado específico?

In [0]:
# Análise 3: Qual a evolução temporal de um estado específico (ex:DF)?

from pyspark.sql.functions import col

df_gold = spark.read.format("delta").load(path_gold)
result = df_gold.filter(col("nome_uf") == "Distrito Federal") \
    .select(col("ano"), col("valor").alias("populacao"), col("crescimento_pct").alias("variacao_percentual")) \
    .orderBy(col("ano"))

display(result)


ano,populacao,variacao_percentual
2002,2145839,2.31
2003,2189789,2.05
2004,2282049,4.21
2005,2333108,2.24
2006,2383784,2.17
2008,2557159,7.27
2009,2606885,1.94
2011,2609998,0.12
2012,2648532,1.48
2013,2789761,5.33
