
### Modelo Hedônico IPTU — Treinamento Multifaixas com CatBoost

Este notebook realiza o treinamento de um **modelo hedônico** para estimar o valor de transação (ITBI/IPTU),
utilizando **CatBoostRegressor** com variáveis cadastrais do imóvel e **agregados mensais defasados (lag-1)**
por **setor fiscal** e **grupo de uso** (TERRENO vs EDIFICADO).

A abordagem é **multifaixas**: treina-se um modelo por combinação de:
- `grupo_uso` ∈ {`TERRENO`, `EDIFICADO`}
- `faixa` ∈ faixas de valor (<=500k, 500k–2mi, 2mi–10mi, >10mi)

A variável-alvo é modelada em **log** (`log1p(valor / escala)`) para estabilizar variância e melhorar ajuste.

%% [markdown]
### 1) Imports

- **glob / Path**: varredura recursiva dos Parquets na camada staging.
- **pandas / numpy**: manipulação de dados e engenharia de atributos.
- **catboost**: regressão com suporte nativo a variáveis categóricas.
- **sklearn**: validação cruzada (KFold) e métricas (MAE e R²).

In [2]:
!pip install -U scikit-learn catboost -q

In [1]:
import os
import glob
import json
import numpy as np
import pandas as pd
from pathlib import Path
from catboost import CatBoostRegressor, Pool
from sklearn.model_selection import KFold
from sklearn.metrics import mean_absolute_error, r2_score


### 2) Configurações

- `STAGING_PATH`: caminho para os Parquets já padronizados (camada staging).
- `ARTEFATOS_PATH`: diretório para salvar modelos e metadados do treinamento.
- `TARGET_SCALE`: escala aplicada antes do log para melhorar estabilidade numérica.

#### Multifaixas
As faixas são definidas por `FAIXAS_BINS` e `FAIXAS_LABELS`, utilizadas para:
- segmentar o treinamento por nível de preço
- permitir modelos especializados em regimes distintos (ex.: imóveis baratos vs caros)

#### Variáveis categóricas
`CAT_COLS` são passadas explicitamente ao CatBoost via `cat_features` (sem necessidade de one-hot).

In [2]:
BRONZE_PATH = "../data/bronze/itbi"
ARTEFATOS_PATH = "../data/artefatos_modelo_multifaixas"
Path(ARTEFATOS_PATH).mkdir(exist_ok=True)

TARGET_SCALE = 1000.0

FAIXAS_BINS = [0, 500_000, 2_000_000, 10_000_000, 30_000_000, 100_000_000, np.inf]
FAIXAS_LABELS = [
  "faixa1_<=500k",
  "faixa2_500k_2mi",
  "faixa3_2mi_10mi",
  "faixa4_10mi_30mi",
  "faixa5_30mi_100mi",
  "faixa6_>100mi"
]


CAT_COLS = [
    "setor_fiscal",
    "descricao_uso_iptu",
    "descricao_padrao_iptu"
]

### 3) Carregar Parquets (camada staging)

- Varre recursivamente todos os arquivos `.parquet` em `STAGING_PATH`.
- Concatena em um único DataFrame para treinamento.

In [10]:
print(">> Carregando parquets...")
files = glob.glob(f"{BRONZE_PATH}/**/*.parquet", recursive=True)
df = pd.concat([pd.read_parquet(f) for f in files], ignore_index=True)
print("Base carregada:", len(df), "linhas")

>> Carregando parquets...
Base carregada: 2600551 linhas


### 4) Filtros de qualidade e recorte do domínio

#### Objetivo
Aplicar regras de elegibilidade e consistência para reduzir ruído e manter apenas registros comparáveis ao fenômeno modelado (transações válidas e atributos coerentes).

#### Estratégia
Os filtros são aplicados em **etapas**, e a cada etapa registramos:
- linhas **antes / depois**
- linhas **perdidas**
- **critério** aplicado

Esse log é retornado como um dicionário `audit` (auditoria) e pode ser salvo em CSV/JSON.

#### Regras aplicadas (por etapa)

- **TN (natureza da transação)**  
  Mantém **compra e venda**; opcionalmente inclui **permuta** (`include_permuta=True`).

- **STATUS (situação do SQL)**  
  Mantém `situacao_sql ∈ {Ativo Territorial}`.

- **VALOR (faixa de valor declarado)**  
  Mantém `min_valor ≤ valor_transacao_declarado ≤ max_valor`.

- **PROP (proporção transmitida)**  
  Mantém `0 < proporcao_transmitida_percent ≤ 100`.

- **AREAS (consistência área vs tipo do imóvel)** *(heurística via `descricao_uso_iptu`)*  
  - Terreno → `area_terreno_m2 > 0`  
  - Edificado → `area_construida_m2 > 0`  
  - Opcional: se `allow_land_on_edificado=False`, remove edificados com `area_terreno_m2 > 0` (não recomendado na maioria dos casos).

- **ANO_CONS (ano de conclusão da construção)**  
  Mantém `ano_conclusao_construcao_iptu ∈ [ano_min, ano_max]` ou NA.

- **DATA (data da transação plausível)**  
  Mantém `data_transacao ∈ [2000..hoje]` ou NA.

- **DEDUP (deduplicação)**  
  Remove duplicatas por `['cadastro_sql', 'data_transacao', 'valor_transacao_declarado']` (quando presentes).


In [11]:
import numpy as np
import pandas as pd
from datetime import datetime

def filtrar_base(
    df: pd.DataFrame,
    *,
    include_permuta: bool = False,
    min_valor: float = 5_000.0,
    max_valor: float = 50_000_000.0,
    ano_min: int = 1900,
    ano_max: int | None = None,
    situacao = ["Ativo Predial", "Ativo Territorial"],
    allow_land_on_edificado: bool = True,
    verbose: bool = True,
) -> tuple[pd.DataFrame, dict]:


    if ano_max is None:
        ano_max = datetime.today().year

    def _count(stage, df_before, df_after, why, stats):
        b, a = len(df_before), len(df_after)
        drop = b - a
        stats.append({"etapa": stage, "antes": b, "depois": a, "perdidos": drop, "criterio": why})
        if verbose:
            print(f"[{stage}] {why} | antes={b:,} → depois={a:,} | perdidos={drop:,}")

    stats: list[dict] = []
    df0 = df.copy()
    _count("START", df0, df0, "inicial", stats)

    # -------- 1) natureza_transacao
    if "natureza_transacao" in df0.columns:
        df_before = df0
        tn = df0["natureza_transacao"].astype(str).str.strip()

        if include_permuta:
            mask = tn.eq("1.Compra e venda") | tn.str.contains("permuta", case=False, na=False)
            why = 'natureza_transacao ∈ {"1.Compra e venda", "*permuta*"}'
        else:
            mask = tn.eq("1.Compra e venda")
            why = 'natureza_transacao == "1.Compra e venda"'

        df0 = df0[mask]
        _count("TN", df_before, df0, why, stats)

    # -------- 2) situacao_sql (DOIS casos)
    if "situacao_sql" in df0.columns:
        df_before = df0
        st = df0["situacao_sql"].astype(str).str.strip()
        mask = st.isin(situacao)
        df0 = df0[mask]
        _count("STATUS", df_before, df0, 'situacao_sql ∈ {"Ativo Predial","Ativo Territorial"}', stats)

    # -------- 3) valor_transacao_declarado
    if "valor_transacao_declarado" in df0.columns:
        df_before = df0
        val = pd.to_numeric(df0["valor_transacao_declarado"], errors="coerce")
        mask = (val >= min_valor) & (val <= max_valor)
        df0 = df0[mask]
        _count("VALOR", df_before, df0, f"{min_valor:,.0f} ≤ valor_transacao_declarado ≤ {max_valor:,.0f}", stats)

    # -------- 4) proporcao_transmitida_percent (qualidade, não trava em 100)
    # Mantém apenas proporções plausíveis quando a coluna existir
    if "proporcao_transmitida_percent" in df0.columns:
        df_before = df0
        prop = pd.to_numeric(df0["proporcao_transmitida_percent"], errors="coerce")
        mask = (prop > 0) & (prop <= 100) | prop.isna()
        df0 = df0[mask]
        _count("PROP", df_before, df0, "0 < proporcao_transmitida_percent ≤ 100 (ou NA)", stats)

    # -------- 5) áreas coerentes por uso (descricao_uso_iptu)
    # terreno -> area_terreno_m2>0 ; edificado -> area_construida_m2>0
    # (opcional) permitir land_area em edificados (default True)
    df_before = df0
    built = pd.to_numeric(df0.get("area_construida_m2", pd.Series(index=df0.index)), errors="coerce").fillna(0)
    land  = pd.to_numeric(df0.get("area_terreno_m2",  pd.Series(index=df0.index)), errors="coerce").fillna(0)
    usage = df0.get("descricao_uso_iptu", pd.Series("", index=df0.index)).astype(str)

    is_terreno = usage.str.contains("terren", case=False, na=False)
    is_edificado = ~is_terreno

    mask_ok_terreno = (~is_terreno) | (land > 0)
    mask_ok_edif    = (~is_edificado) | (built > 0)
    mask = mask_ok_terreno & mask_ok_edif

    if not allow_land_on_edificado:
        mask = mask & (~(is_edificado & (land > 0)))
        why_land = " e (edificado → area_terreno_m2 = 0)"
    else:
        why_land = ""

    df0 = df0[mask]
    _count("AREAS", df_before, df0, f"terreno → land>0; edificado → built>0{why_land}", stats)

    # -------- 6) ano_conclusao_construcao_iptu plausível (ou NA)
    if "ano_conclusao_construcao_iptu" in df0.columns:
        df_before = df0
        ano = pd.to_numeric(df0["ano_conclusao_construcao_iptu"], errors="coerce")
        ok = (ano >= ano_min) & (ano <= ano_max)
        df0 = df0[ok | ano.isna()]
        _count("ANO_CONS", df_before, df0, f"ano_conclusao_construcao_iptu ∈ [{ano_min}, {ano_max}] ou NA", stats)

    # -------- 7) data_transacao plausível (ou NA)
    if "data_transacao" in df0.columns:
        df_before = df0
        td = pd.to_datetime(df0["data_transacao"], errors="coerce")
        mask = td.between("2000-01-01", pd.Timestamp.today()) | td.isna()
        df0 = df0[mask]
        _count("DATA", df_before, df0, "data_transacao ∈ [2000..hoje] ou NA", stats)

    # -------- 8) dedup por (cadastro_sql, data_transacao, valor_transacao_declarado)
    df_before = df0
    keys = ["cadastro_sql", "data_transacao", "valor_transacao_declarado"]
    keys = [k for k in keys if k in df0.columns and df0[k].notna().any()]

    if keys:
        df0 = df0.sort_values(keys).drop_duplicates(subset=keys, keep="last")
        _count("DEDUP", df_before, df0, f"drop_duplicates por {keys}", stats)
    else:
        _count("DEDUP", df_before, df0, "chaves ausentes; sem dedup", stats)

    audit = {
        "etapas": stats,
        "linhas_iniciais": len(df),
        "linhas_finais": len(df0),
        "percentual_perdido": 0.0 if len(df) == 0 else (len(df) - len(df0)) / len(df) * 100.0,
    }

    if verbose:
        print(f"[FIM] linhas iniciais={len(df):,} | finais={len(df0):,} | perda={audit['percentual_perdido']:.2f}%")

    return df0, audit


df, audit = filtrar_base(
    df,
    include_permuta=False,
    min_valor=100_000,
    max_valor=100_000_000,
    #situacao = ["Ativo Territorial"],
    verbose=True
)

[START] inicial | antes=2,600,551 → depois=2,600,551 | perdidos=0
[TN] natureza_transacao == "1.Compra e venda" | antes=2,600,551 → depois=2,294,635 | perdidos=305,916
[STATUS] situacao_sql ∈ {"Ativo Predial","Ativo Territorial"} | antes=2,294,635 → depois=2,248,884 | perdidos=45,751
[VALOR] 100,000 ≤ valor_transacao_declarado ≤ 100,000,000 | antes=2,248,884 → depois=1,745,860 | perdidos=503,024
[PROP] 0 < proporcao_transmitida_percent ≤ 100 (ou NA) | antes=1,745,860 → depois=1,745,490 | perdidos=370
[AREAS] terreno → land>0; edificado → built>0 | antes=1,745,490 → depois=1,745,490 | perdidos=0
[ANO_CONS] ano_conclusao_construcao_iptu ∈ [1900, 2026] ou NA | antes=1,745,490 → depois=1,745,484 | perdidos=6
[DATA] data_transacao ∈ [2000..hoje] ou NA | antes=1,745,484 → depois=1,745,446 | perdidos=38
[DEDUP] drop_duplicates por ['cadastro_sql', 'data_transacao', 'valor_transacao_declarado'] | antes=1,745,446 → depois=1,684,944 | perdidos=60,502
[FIM] linhas iniciais=2,600,551 | finais=1,68

### 5) Feature engineering (atributos temporais e do imóvel)

Atributos criados:
- `year`, `month`, `yyyymm`: granularidade temporal e chave mensal
- `imovel_idade`: idade aproximada do imóvel (ano transação - ano conclusão)
- `grupo_uso`: binarização simples de uso: TERRENO vs EDIFICADO

**Nota:** `imovel_idade` é truncada em 0 para evitar negativos (dados inconsistentes).
Valores ausentes são tratados como 0 (estratégia conservadora).

In [12]:
df["data_transacao"] = pd.to_datetime(df["data_transacao"], errors="coerce")
df["year"] = df["data_transacao"].dt.year
df["month"] = df["data_transacao"].dt.month
df["yyyymm"] = df["year"] * 100 + df["month"]

# Idade do imóvel (ano de transação - ano de conclusão)
df["imovel_idade"] = df["year"] - df["ano_conclusao_construcao_iptu"]
df["imovel_idade"] = df["imovel_idade"].clip(lower=0).fillna(0)

# Grupo de uso (heurística por descrição)
df["grupo_uso"] = np.where(
    df["descricao_uso_iptu"].str.contains("terren", case=False, na=False),
    "TERRENO",
    "EDIFICADO"
)

### 6) Agregados mensais por setor fiscal (com defasagem)

Nesta etapa criamos variáveis de contexto de mercado, por **mês**, **setor fiscal** e **grupo de uso**.

Métricas mensais:
- `median_total`: mediana do valor declarado no grupo
- `median_sqm`: (atualmente igual à mediana do total; ver observação abaixo)
- `count`: quantidade de transações no mês (proxy de liquidez)

Em seguida, calculamos `lag-1` (mês anterior) para evitar vazamento temporal:
- `median_total_lag1`, `median_sqm_lag1`, `count_lag1`

**Observação importante:** `median_sqm` está calculado com a mesma expressão de `median_total`.
Se a intenção for mediana do preço por m², o correto seria:
`declared_transaction_value / built_area_sqm` (ou outra área relevante), com tratamento de divisão por zero.

In [13]:

monthly = (
    df.groupby(["grupo_uso", "setor_fiscal", "yyyymm"])
      .agg(
          median_total=("valor_transacao_declarado", "median"),
          median_sqm=("valor_transacao_declarado", lambda x: np.median(x)),
          count=("valor_transacao_declarado", "count")
      )
      .reset_index()
)

monthly["median_total_lag1"] = (
    monthly.sort_values("yyyymm")
           .groupby(["grupo_uso", "setor_fiscal"])["median_total"]
           .shift(1)
)

monthly["median_sqm_lag1"] = (
    monthly.sort_values("yyyymm")
           .groupby(["grupo_uso", "setor_fiscal"])["median_sqm"]
           .shift(1)
)

monthly["count_lag1"] = (
    monthly.sort_values("yyyymm")
           .groupby(["grupo_uso", "setor_fiscal"])["count"]
           .shift(1)
)

df = df.merge(
    monthly[[
        "grupo_uso",
        "setor_fiscal",
        "yyyymm",
        "median_total_lag1",
        "median_sqm_lag1",
        "count_lag1"
    ]],
    on=["grupo_uso", "setor_fiscal", "yyyymm"],
    how="left"
)

### 7) Definição do target (log-transform)

Para reduzir assimetria (cauda longa) típica de valores imobiliários:
- Definimos `target = log1p(valor / TARGET_SCALE)`

No momento da avaliação (predição), desfazemos a transformação:
- `valor_pred = expm1(pred) * TARGET_SCALE`

In [14]:
df["target"] = np.log1p(df["valor_transacao_declarado"] / TARGET_SCALE)

### 8) Criação das faixas de valor

Segmenta cada registro em uma faixa baseada no `valor_transacao_declarado`.
Isso permite treinar modelos especializados por regime de preços.

In [15]:
df["faixa"] = pd.cut(
    df["valor_transacao_declarado"],
    bins=FAIXAS_BINS,
    labels=FAIXAS_LABELS
)

### 9) Treinamento dos modelos (KFold 5x)

Para cada `grupo_uso` e `faixa`:
- Filtra o subconjunto (`sub`)
- Se houver menos de 1000 registros, pula (amostra insuficiente)
- Cria matriz `X` com:
  - Categóricas: `CAT_COLS`
  - Numéricas: área construída, área do terreno, testada, idade, yyyymm, lags do setor
- Realiza validação cruzada KFold (5 folds)
- Métricas por fold:
  - **MAE** em R$ (no espaço original, após inversão do log)
  - **R²** (coeficiente de determinação)

Ao final, salva o último modelo treinado da iteração em:
`artefatos_modelo_multifaixas/modelo_<grupo>_<faixa>.cbm`

In [18]:
CAT_NA_TOKEN = "__MISSING__"
for c in CAT_COLS:
    df[c] = df[c].astype("string").fillna(CAT_NA_TOKEN).astype(str)


modelos = {}

for grupo in df["grupo_uso"].unique():
    modelos[grupo] = {}

    for faixa in FAIXAS_LABELS:
        sub = df[(df["grupo_uso"] == grupo) & (df["faixa"] == faixa)]

        # Evita treinar com pouca amostra
        if len(sub) < 1000:
            continue

        print(f"\nTreinando {grupo} - {faixa} ({len(sub)} linhas)")

        X = sub[CAT_COLS + [
            "area_construida_m2",
            "area_terreno_m2",
            "testada_m",
            "imovel_idade",
            "yyyymm",
            "median_total_lag1",
            "median_sqm_lag1",
            "count_lag1"
        ]]

        y = sub["target"]

        kf = KFold(n_splits=5, shuffle=True, random_state=42)

        maes = []
        r2s = []

        for fold, (train_idx, val_idx) in enumerate(kf.split(X), 1):
            X_train, X_val = X.iloc[train_idx], X.iloc[val_idx]
            y_train, y_val = y.iloc[train_idx], y.iloc[val_idx]

            model = CatBoostRegressor(
                iterations=400,
                depth=6,
                learning_rate=0.05,
                loss_function="RMSE",
                verbose=False
            )

            model.fit(
                Pool(X_train, y_train, cat_features=CAT_COLS),
                eval_set=Pool(X_val, y_val, cat_features=CAT_COLS),
                verbose=False
            )

            # volta do log para R$
            pred = np.expm1(model.predict(X_val)) * TARGET_SCALE
            true = np.expm1(y_val) * TARGET_SCALE

            mae = mean_absolute_error(true, pred)
            r2 = r2_score(true, pred)

            maes.append(mae)
            r2s.append(r2)

            print(f"[Fold {fold}] MAE: R$ {mae:,.0f} | R²: {r2:.3f}")

        print(f">> MÉDIA MAE: R$ {np.mean(maes):,.0f} | R²: {np.mean(r2s):.3f}")

        # guarda e salva o último modelo treinado
        modelos[grupo][faixa] = model

        model.save_model(
            f"{ARTEFATOS_PATH}/modelo_{grupo}_{faixa}.cbm"
        )



Treinando EDIFICADO - faixa1_<=500k (989225 linhas)
[Fold 1] MAE: R$ 61,422 | R²: 0.450
[Fold 2] MAE: R$ 61,619 | R²: 0.447
[Fold 3] MAE: R$ 61,602 | R²: 0.446
[Fold 4] MAE: R$ 61,507 | R²: 0.447
[Fold 5] MAE: R$ 61,444 | R²: 0.448
>> MÉDIA MAE: R$ 61,519 | R²: 0.448

Treinando EDIFICADO - faixa2_500k_2mi (338490 linhas)
[Fold 1] MAE: R$ 195,554 | R²: 0.450
[Fold 2] MAE: R$ 194,489 | R²: 0.453
[Fold 3] MAE: R$ 196,150 | R²: 0.444
[Fold 4] MAE: R$ 194,611 | R²: 0.447
[Fold 5] MAE: R$ 196,797 | R²: 0.441
>> MÉDIA MAE: R$ 195,520 | R²: 0.447

Treinando EDIFICADO - faixa3_2mi_10mi (104190 linhas)
[Fold 1] MAE: R$ 1,272,800 | R²: 0.371
[Fold 2] MAE: R$ 1,268,748 | R²: 0.369
[Fold 3] MAE: R$ 1,277,423 | R²: 0.369
[Fold 4] MAE: R$ 1,263,007 | R²: 0.370
[Fold 5] MAE: R$ 1,263,986 | R²: 0.381
>> MÉDIA MAE: R$ 1,269,193 | R²: 0.372

Treinando EDIFICADO - faixa4_10mi_30mi (78045 linhas)
[Fold 1] MAE: R$ 3,070,967 | R²: 0.502
[Fold 2] MAE: R$ 3,062,681 | R²: 0.510
[Fold 3] MAE: R$ 3,040,863 | R²: