***
## Projeto de Disciplina: IBGE - Lavouras Temporárias e Permanentes  

## Resumo:  
Esse projeto tem como objetivo a criação de um Data Warehouse utilizando dados do IBGE no período de 2021 a 2024 sobre a área plantada, área colhida e valor de produção de culturas como arroz, feijão, café e laranja a fim de facilitar a análise dessas métricas.


# Camada Bronze: Ingestão de Dados (Produção Agrícola Municipal)

Esta etapa é responsável pela extração dos dados brutos da API do IBGE, tratando as limitações de requisição e consolidando a base histórica para o Data Warehouse.

## 1. Fonte de Dados
Os dados são extraídos da **Pesquisa Agrícola Municipal (PAM)**, tabela 5457, através da API do SIDRA (Sistema IBGE de Recuperação Automática)

## 2. Escopo da Extração
A extração foi parametrizada para atender aos requisitos de análise do projeto:

* **Período:** 2021 a 2024 (Extraídos em lotes).
* **Produtos (Culturas):**
    * Arroz (em casca)
    * Feijão (em grão)
    * Café (em grão) Total
    * Laranja.
* **Variáveis (Métricas):**
    * Área plantada ou destinada à colheita (Hectares).
    * Área colhida (Hectares)
    * Valor da produção (Mil Reais)

## 3. Extração
A API do SIDRA impõe um limite máximo de registros por consulta, uma extração direta é inviável. Logo, para extrair os dados necessários foi preciso dividir as informações necessárias em várias requisições.

1.  **Período:** Os dados são solicitados em blocos de anos (`2021,2022` e `2023,2024`) para reduzir o volume de dados por lote.
2.  **Variável:** Dentro de cada bloco de tempo, o script itera sobre as variáveis (Área Plantada, Área Colhida, Valor) individualmente.
3.  **Fusão (Merge) e Concatenação:**
    * **Merge:** As variáveis de um mesmo período são unificadas horizontalmente (`pd.merge`), transformando o formato de linhas para colunas.
    * **Concat:** Os diferentes blocos temporais são empilhados verticalmente (`pd.concat`), criando um histórico contínuo.

In [0]:
import requests
import pandas as pd
import os

# Extração dos dados da API do IBGE

def load_ibge_data():
    
    # Extração dividida em várias requisições por periodo, produto e variáveis
    periodos = ['2021,2022', '2023,2024']
    cod_variaveis = ['8331', '216', '215']
    mapa_variaveis = {
        '8331': 'Área Plantada',
        '216': 'Área Colhida',
        '215': 'Valor-Produção'
    }
    dfs_periodo = []
    for periodo in periodos:
        dfs = []
        for cod in cod_variaveis:
            url = f"https://apisidra.ibge.gov.br/values/t/5457/n6/all/v/{cod}/p/{periodo}/c782/40102,40112,40139,40151/f/n"
            response = requests.get(url)
            data = response.json()
            df = pd.DataFrame(data)
            df = df.drop(['NN', 'MN'], axis=1)
            new_header = df.iloc[0].tolist()
            df.columns = new_header
            df = df.rename(columns={'Valor': mapa_variaveis[cod], 'Produto das lavouras temporárias e permanentes': 'Produto'})
            df = df[1:]
            df = df.drop("Variável", axis=1)
            dfs.append(df)
        df_merged = dfs[0]
        for df in dfs[1:]:
            df_merged = pd.merge(df_merged, df, on=['Município', 'Ano', 'Produto'])
        dfs_periodo.append(df_merged)
    df_final = pd.concat(dfs_periodo, ignore_index=True)

    col = df_final.columns
    new_order = [col[1], col[2], col[3], col[0], col[4], col[5]]
    df_final = df_final[new_order]
    df_final = df_final.sort_values(by=['Município', 'Ano', 'Produto'])

    return df_final

In [0]:
# Carregando a tabela
df = load_ibge_data()

display(df)

# Camada Prata: Tratamento e Padronização de Dados

Esta etapa é responsável por limpar os dados brutos vindos da Camada Bronze, realizando tratamento de nulos e convertendo os tipos de dados para formatos adequados a cálculos numéricos.

## 1. Tratamento de Inconsistências (Padrão SIDRA/IBGE)
A API do IBGE retorna caracteres especiais para indicar situações específicas de coleta que impedem a análise direta. Foram aplicadas as seguintes regras de limpeza:

### 1.1. Zero Absoluto (`-`)
O símbolo de hífen (`-`) é utilizado pelo IBGE para indicar a inexistência de produção ou área.
* **Ação:** Substituição pelo caractere `'0'` para permitir a conversão numérica posterior.

### 1.2. Remoção de Registros Inválidos (`..` e `...`)
Existem registros que não possuem valor quantitativo válido:
* `..` ("Não se aplica"): Ocorre quando a unidade de medida não permite agregação (ex: somar toneladas com litros).
* `...` ("Informação não disponível"): Dados perdidos ou não coletados.
* **Ação:** As linhas que contiverem qualquer um destes símbolos nas colunas de métricas (`Área_plantada`, `Área_colhida` ou `Valor`) são removidas do dataset para não comprometerem a tabela fato.

## 2. Casting
Originalmente, os dados são extraídos como texto (*String*). Nesta etapa, é realizado o *casting* explícito para garantir a precisão aritmética na Camada Ouro:

* **(`Integer`):** Aplicado às colunas `Área Plantada` e `Área Colhida`.
* **(`Double`):** Aplicado à coluna `Valor-Produção` (Mil Reais)

## Projeto do Data Warehouse
As transformações serão realizadas com base no seguinte formato de projeto:
![](diagrama_dw.png)  

Algo que pode ser observado no diagrama é a formação de duas hierarquias:  
* Em dim_local, nos valores município -> estado/sigla -> região
* Em dim_produto, nos valores produto -> tipo_produto -> tipo_lavoura

In [0]:
from pyspark.sql.functions import col, when, trim

# Convertendo o df para o spark
spark_df = spark.createDataFrame(df)

# Limpeza dos valores nulos '-'
df_tratado = spark_df.withColumn(
    'Área Plantada', when(col('Área Plantada') == '-', '0').otherwise(col('Área Plantada'))) \
    .withColumn('Área Colhida', when(col('Área Colhida') == '-', '0').otherwise(col('Área Colhida'))) \
    .withColumn('Valor-Produção', when(col('Valor-Produção') == '-', '0').otherwise(col('Valor-Produção')))

valores_invalidos = ["..", "...", "-"]

# Filtrando dados inválidos
filtered_df = df_tratado.filter(
    (~col('Área Plantada').isin(valores_invalidos)) & 
    (~col('Área Colhida').isin(valores_invalidos)) & 
    (~col('Valor-Produção').isin(valores_invalidos))
)

# Convertendo tipos
df_limpo = filtered_df.withColumn('Área Plantada', col('Área Plantada').cast('int')) \
                          .withColumn('Área Colhida', col('Área Colhida').cast('int')) \
                          .withColumn('Valor-Produção', col('Valor-Produção').cast('double'))

display(df_limpo)

## Dimensão Local
Responsável por listar todos os municipios presentes no DW. Ademais, é preciso buscar seu UF na mesma consulta e mapear seu distrito pelo estado

### Lógica de Construção:
1.  **Extração de Valores Únicos:** Identificação de todos os municípios distintos presentes no conjunto de dados tratado (`df_limpo`). Como é possível haver dois municípios pelo mesmo nome, é preciso fazer o Join na tabela fato tanto pelo município quanto pelo UF
2.  **Recuperação UF:** Como cada município vem com a identificação do seu UF, é possível recuperar os dois juntos. Exemplo: Ariquemes - RO
2.  **Região:** Mapeamento dos estados com sua respectiva região através da API do Sidra
3.  **Geração de Chave Substituta (SK):** Criação da coluna `chave_local` (Primary Key), um identificador numérico artificial gerado pelo sistema para otimizar os relacionamentos com a tabela Fato.



In [0]:
# Utilizando a API do IBGE para obter a sigla, nome e região de cada estado

url = "https://servicodados.ibge.gov.br/api/v1/localidades/regioes/1|2|3|4|5/estados"

response = requests.get(url)
data = response.json()

df_regioes = pd.DataFrame([{
    "sigla": item["sigla"],
    "estado": item["nome"],
    "regiao": item["regiao"]["nome"]
} for item in data])

# Separando o nome de cada município e sigla de seu estado,
# Utilizando a sigla para juntar com o df obtido na célula anterior
# Para montar um df com nome do município, sigla e nome de seu estado e qual região pertence

municipios = pd.DataFrame(df['Município'].unique())

municipios['municipio'] = municipios[0].str.split(' - ').str[0]
municipios['sigla'] = municipios[0].str.split(' - ').str[-1]

municipios = municipios.drop(columns=[0])

municipios = municipios.merge(df_regioes, on='sigla')

display(municipios)

In [0]:
from pyspark.sql.functions import monotonically_increasing_id, col
import pandas as pd

# Convertendo para Spark DataFrame
df_dim_local_spark = spark.createDataFrame(municipios)

# Criando a Chave Primária artificial
df_dim_local_spark = df_dim_local_spark.withColumn("chave_local", monotonically_increasing_id())

# Organizando as colunas para o padrão do Data Warehouse (PK primeiro)
df_dim_local = df_dim_local_spark.select(
    "chave_local",
    col("municipio"),
    "estado",
    "regiao",
    "sigla",
)

# Salvando a tabela
spark.sql("DROP TABLE IF EXISTS workspace.default.dim_local")
df_dim_local.write.format("delta").mode("overwrite").saveAsTable("workspace.default.dim_local")

display(df_dim_local)

%md
## Dimensão Tempo

A tabela **Dimensão Tempo** é responsável por catalogar os períodos temporais disponíveis para análise. No contexto deste projeto, a granularidade temporal é **anual**.

### Lógica de Construção:
1.  **Extração de Valores Únicos:** Identificação de todos os anos distintos presentes no conjunto de dados tratado (`df_limpo`).
2.  **Ordenação:** Organização dos anos em ordem cronológica crescente.
3.  **Geração de Chave Substituta (SK):** Criação da coluna `chave_tempo` (Primary Key), um identificador numérico artificial gerado pelo sistema para otimizar os relacionamentos com a tabela Fato.

In [0]:
# Criando a dimensão Tempo
from pyspark.sql.functions import monotonically_increasing_id

# Filtrando os anos e anexando um id para cada ano
df_dim_tempo = df_limpo.select('Ano').distinct().orderBy('Ano').withColumn('chave_tempo', monotonically_increasing_id())

spark.sql("DROP TABLE IF EXISTS workspace.default.dim_tempo")

# Salvando como tabela delta
df_dim_tempo.write.format("delta").mode("overwrite").saveAsTable("workspace.default.dim_tempo")

# Verificar
display(spark.table("workspace.default.dim_tempo"))

%md
## Dimensão Produto
Responsável por listar os produtos análisados no DW, além de classificá-los pelo tipo da lavoura (Temporária e Permanente) e pelo tipo do produto (Cereal, Leguminosa, Fruta e Grão)

### Lógica de Construção:
1.  **Extração de Valores Únicos:** Identificação de todos os produtos distintos presentes no conjunto de dados tratado (`df_limpo`). Nesse caso pelo escopo do projeto temos Laranja, Café (em grão) Total, Arroz (em casca) e Feijão (em grão)
2.  **Classificação de Lavoura:** Mapeamento das lavouras permanentes e temporárias.
2.  **Classificação do Produto:** Mapeamento dos produtos classificados como (Cereal, Leguminosa, Fruta e Grão).
3.  **Geração de Chave Substituta (SK):** Criação da coluna `chave_produto` (Primary Key), um identificador numérico artificial gerado pelo sistema para otimizar os relacionamentos com a tabela Fato.



In [0]:
# Criando a tabela de produtos
from pyspark.sql.functions import when

# Selecionar os produtos 
df_dim_produtos = df_limpo.select('Produto').distinct()

# Mapeamento para tipo de lavoura 
df_dim_produtos = df_dim_produtos.withColumn("tipo_lavoura",
    when(col("Produto") == "Arroz (em casca)", "Temporária")
    .when(col("Produto") == "Café (em grão) Total", "Permanente")
    .when(col("Produto") == "Feijão (em grão)", "Temporária")
    .when(col("Produto") == "Laranja", "Permanente")
    .otherwise("Não mapeado")
)

# Mapeando para o tipo do produto
df_dim_produtos = df_dim_produtos.withColumn("tipo_produto",
    when(col("Produto") == "Arroz (em casca)", "Cereal")
    .when(col("Produto") == "Feijão (em grão)", "Leguminosa")
    .when(col("Produto") == "Café (em grão) Total", "Grão")
    .when(col("Produto") == "Laranja", "Fruta")
    .otherwise("Não Mapeado")
)

# Adicionando o id incremental para cada produto
df_dim_produtos = df_dim_produtos.withColumn('chave_produto', monotonically_increasing_id())

spark.sql("DROP TABLE IF EXISTS workspace.default.dim_produtos")

# Salvando a tabela no formato delta
df_dim_produtos.write.format("delta").mode("overwrite").saveAsTable("workspace.default.dim_produtos")

display(df_dim_produtos)

# Camada Ouro: Tabela Fato

## 1. Estratégia de Junção (Joins)

Para construir a fato, realizamos o cruzamento (`JOIN`) entre os dados tratados da Camada Prata (`df_limpo`) e as três dimensões carregadas.

### 2.1 Preparação das Chaves de Ligação
Como na fonte de dados original, a informação geográfica vem concatenada no formato `"Município - UF"`, enquanto na `dim_local` estes campos estão separados. Aplicamos uma transformação prévia (`df_fato_prep`) para separar o Município da UF e remover espaços em branco (`trim`). Isso garante que a chave de junção seja compatível.

### 2.2 Lógica dos Relacionamentos
Os cruzamentos foram feitos utilizando `INNER JOIN` para garantir que apenas registros com dimensões válidas sejam carregados:

* **Dimensão Tempo:** Junção pelo `Ano`.
* **Dimensão Produto:** Junção pelo `Produto`.
* **Dimensão Local:** Junção **Composta**.
    * `Município (Fato) == Município (Dim)` **E** `UF (Fato) == UF (Dim)`.
  

## 2. Estrutura Final

O resultado final é um DataFrame otimizado para análise OLAP, contendo apenas IDs e Números:

| Tipo | Coluna | Descrição |
| :--- | :--- | :--- |
| **FK** | `chave_local` | Referência para `dim_local` |
| **FK** | `chave_produto` | Referência para `dim_produto` |
| **FK** | `chave_tempo` | Referência para `dim_tempo` |
| **Métrica** | `area_plantada` | Área destinada ao plantio (Hectares) |
| **Métrica** | `area_colhida` | Área efetivamente colhida (Hectares) |
| **Métrica** | `valor` | Valor total da produção (Mil Reais) |

In [0]:
# Criando a tabela Fato 
from pyspark.sql.functions import col, split, element_at, trim, regexp_replace

# Carregando as dimensoes
df_tempo = spark.table("workspace.default.dim_tempo")
df_local = spark.table("workspace.default.dim_local")
df_produtos = spark.table("workspace.default.dim_produtos")

# Ajustando a df_limpo para ter colunas de junção compatíveis
df_fato_prep = df_limpo.withColumn("UF_join", trim(element_at(split(col("Município"), " - "), 2))) \
                          .withColumn("Município_join", trim(regexp_replace(col("Município"), " - .*", "")))

# Fazendo o join das FKs
# Nota: junção em local é com base no município e UF
df_fato = df_fato_prep.join(df_tempo, df_fato_prep.Ano == df_tempo.Ano, "inner") \
                      .join(df_produtos, df_fato_prep.Produto == df_produtos.Produto, "inner") \
                      .join(df_local, 
                           (trim(df_fato_prep.Município_join) == trim(df_local.municipio)) & 
                           (trim(df_fato_prep.UF_join) == trim(df_local.sigla)), 
                           "inner")

# Selecionando as colunas da tabela fato e renomando elas
dim_df_fato = df_fato.select(
    "chave_local",
    "chave_produto",
    "chave_tempo",
    col("Área Plantada").alias("area_plantada"),
    col("Área Colhida").alias("area_colhida"),
    col("Valor-Produção").alias("valor")
)

spark.sql("DROP TABLE IF EXISTS workspace.default.fato")

# Salvando a tabela no formato delta
dim_df_fato.write.format("delta").mode("overwrite").saveAsTable("workspace.default.fato")

display(dim_df_fato)


---
### Dessa forma, todo o fluxo de ETL pode ser observado abaixo:
![](fluxo_etl.png)

# Consultas SQL

### 1) Área de produção perdida (diferença entre área plantada e colhida) em cada um dos estados nos anos de 2023 e 2024:

In [0]:
%sql
SELECT
  l.sigla,
  t.ano,
  SUM(f.area_plantada) - SUM(f.area_colhida) AS area_perdida
FROM
  workspace.default.fato f
  JOIN workspace.default.dim_local l ON f.chave_local = l.chave_local
  JOIN workspace.default.dim_tempo t ON f.chave_tempo = t.chave_tempo
WHERE
  t.ano IN (2023, 2024)
GROUP BY
  l.sigla, t.ano
ORDER BY
  l.sigla, t.ano;



Databricks visualization. Run in Databricks to view.

In [0]:
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
import pandas as pd

df = _sqldf.toPandas()

fig, ax = plt.subplots(figsize=(14,7))

anos = sorted(df['ano'].unique())
largura = 0.35
x = pd.Index(df['sigla'].unique())
x_pos = range(len(x))

for i, ano in enumerate(anos):
    dados_ano = df[df['ano'] == ano].set_index('sigla').reindex(x)['area_perdida']
    ax.bar([p + i*largura for p in x_pos], dados_ano, width=largura, label=str(ano))

ax.set_xticks([p + largura/2 for p in x_pos])
ax.set_xticklabels(x, rotation=0)

ax.set_xlabel("Estado")
ax.set_ylabel("Área Perdida (hectares)")
ax.set_title("Área de Produção Perdida por Estado - 2023 e 2024")
ax.legend(title="Ano")

ax.yaxis.grid(True)
ax.yaxis.set_major_formatter(ticker.StrMethodFormatter('{x:,.0f}'))

plt.tight_layout()
plt.show()

Nessa consulta, é fácil ver que dois estados se destacam muito, além de possuirem uma grande diferença de perda de produção de um ano para o próximo, sendo eles:
* Pernambuco (PE) com 60000 mil hectares perdidos de área de produção no ano de 2023 e aproximadamente 26000 mil hectares em 2024;
* Rio Grande do Sul (RS) com 16000 mil hectares perdidos no ano de 2023 e 48000 no ano de 2024.

### 2) Lucro total de cada cultura em cada um dos anos:

In [0]:
%sql
SELECT
  p.produto,
  t.ano,
  SUM(f.valor) AS valor_total_mil_reais
FROM
  workspace.default.fato f
  JOIN workspace.default.dim_produtos p ON f.chave_produto = p.chave_produto
  JOIN workspace.default.dim_tempo t ON f.chave_tempo = t.chave_tempo
GROUP BY
  p.produto, t.ano
ORDER BY
  t.ano DESC, valor_total_mil_reais DESC;



Databricks visualization. Run in Databricks to view.

In [0]:
df = _sqldf.toPandas()

fig, ax = plt.subplots(figsize=(16,8))

anos = sorted(df['ano'].unique())
largura = 0.15
produtos = df['produto'].unique()
x_pos = range(len(produtos))

for i, ano in enumerate(anos):
    dados_ano = df[df['ano'] == ano].set_index('produto').reindex(produtos)['valor_total_mil_reais'].fillna(0)
    ax.bar([p + i*largura for p in x_pos], dados_ano, width=largura, label=str(ano))

ax.set_xticks([p + largura*(len(anos)-1)/2 for p in x_pos])
ax.set_xticklabels(produtos, ha='center', fontsize=16)

ax.set_xlabel("Produto")
ax.set_ylabel("Valor Total (Mil Reais)")
ax.set_title("Culturas Mais Lucrativas por Ano - Brasil")
ax.legend(title="Ano")

ax.yaxis.grid(True)
ax.yaxis.set_major_formatter(ticker.StrMethodFormatter('{x:,.0f}'))

plt.tight_layout()
plt.show()

A partir do gráfico, podemos observar que a cultura de café sempre foi a mais lucrativa, até mesmo quando consideramos que seu menor lucro foi aproximadamente 35.000.000.000 reais no ano de 2021 e que o maior lucro entre as outras culturas foi obtido a partir da laranja, no ano de 2024, com um pouco menos de 30.000.000.000 reais. Além disso, dois outros pontos interessantes são a variação extremamente baixa no lucro do feijão nos quatro anos, e a forma como o lucro da laranja vem sempre aumentando desde o ano de 2021.

### 3) Área plantada, colhida e o valor total de produção das culturas temporárias e das permanentes no ano de 2021:

In [0]:
%sql
-- Área e produção por tipo de lavoura em 2021
SELECT
  p.tipo_lavoura,
  SUM(f.area_plantada) AS total_area_plantada,
  SUM(f.area_colhida) AS total_area_colhida,
  SUM(f.valor) AS total_valor_producao
FROM
  workspace.default.fato f
  JOIN workspace.default.dim_produtos p ON f.chave_produto = p.chave_produto
  JOIN workspace.default.dim_tempo t ON f.chave_tempo = t.chave_tempo
WHERE
  t.ano = 2021
GROUP BY
  p.tipo_lavoura;

Databricks visualization. Run in Databricks to view.

In [0]:
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker

df = _sqldf.toPandas()

ax = df.rename(columns={
    "total_area_plantada": "Área Plantada (Hectares)",
    "total_area_colhida": "Área Colhida (Hectares)",
    "total_valor_producao": "Valor da Produção (Mil Reais)"
}).plot(kind="bar", x="tipo_lavoura", figsize=(12,6))

ax.yaxis.set_major_formatter(ticker.StrMethodFormatter('{x:,.0f}'))

plt.xticks(rotation=0)
plt.ylabel("Valores")
plt.title("Produção por tipo de lavoura (2021)")

for container in ax.containers:
    ax.bar_label(container, fmt='{:,.0f}')
    
plt.show()

O que pode ser visto nesse gráfico é que, no ano de 2021, mesmo que as culturas permanentes utilizaram uma menor área para plantação, 2.413.556 hectares em comparação com os 4.456.803 das temporárias, sua produção ainda foi bem maior sendo 47.367.747.000 reais para as permanentes e 31.190.499.000 para as temporárias.

---
## Considerações Finais
Sobre o desenvolvimento do projeto, algo que foi possível observar para o nosso tema foi a extrema facilidade de utilização dos dados, devido o formato limpo e consistente da API do IBGE, sendo apenas a transformação para obter o estado e região de cada município um pouco mais complicada, que necessitou a utilização de uma outra API do IBGE, mas que mesmo assim não apresentou nenhuma dificuldade.  
Uma pequena limitação do projeto foi a quantidade muito grande de dados e o limite de valores por requisição da API dos SIDRA, que levava bastante tempo. A ideia inicial era utilizar uma variedade maior de produtos de categorias diferentes, mas cada variável, produto e ano diferentes requisitados pela API aumentava muito rapidamente a quantidade de valores enviados pela formatação dos dados, o que fez com que o grupo escolhesse apenas 4 produtos e 3 variáveis.