#Imports

In [0]:
import numpy as np
import pandas as pd
from unidecode import unidecode 
from sklearn.metrics import make_scorer, mean_absolute_error, mean_absolute_percentage_error
from sklearn.model_selection import cross_val_score, KFold
from sklearn.tree import DecisionTreeRegressor
from sklearn.linear_model import LinearRegression
from sklearn.ensemble import RandomForestRegressor
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import Pipeline
from xgboost import XGBRegressor
from lightgbm import LGBMRegressor
from catboost import CatBoostRegressor
import optuna
import joblib
import json

#Funçoes

In [0]:
def prepare_data(X):
    X = X.copy()
    for col in ['fumante', 'regiao', 'facebook', 'classe']:
        if col in X.columns:
            # Converter para float antes de categoria
            X[col] = pd.to_numeric(X[col], errors='coerce').astype('category')
    return X

class SafeColumnTransformer(BaseEstimator, TransformerMixin):
    def __init__(self, transformer, columns):
        self.transformer = transformer
        self.columns = columns
        self.columns_present_fit_ = []

    def fit(self, X, y=None):
        self.columns_present_fit_ = [col for col in self.columns if col in X.columns]
        if self.columns_present_fit_:
            self.transformer.fit(X[self.columns_present_fit_], y)
        return self

    def transform(self, X):
        # Garante que as colunas que existiam no fit estejam presentes
        cols = self.columns_present_fit_
        missing_cols = [col for col in cols if col not in X.columns]
        
        # Cria DataFrame com as colunas faltantes preenchidas com NaN
        if missing_cols:
            for col in missing_cols:
                X[col] = np.nan
        
        # Ordena colunas conforme o fit
        X_ordered = X[cols]

        return self.transformer.transform(X_ordered)

    def get_feature_names_out(self, input_features=None):
        try:
            return self.transformer.get_feature_names_out()
        except AttributeError:
            return np.array(self.columns_present_fit_)


In [0]:
def quantile_loss_metric(y_true, y_pred, q=0.8):
    y_true = np.array(y_true).flatten()
    y_pred = np.array(y_pred).flatten()
    residual = y_true - y_pred
    return np.mean(np.maximum(q * residual, (q - 1) * residual))

def bias_metric(y_true, y_pred):
    return np.mean(y_pred - y_true)

def percentual_subestimativas(y_true, y_pred):
    return np.mean(y_pred < y_true)

def avaliar_modelo(modelo, X_train, y_train, X_test, y_test, nome_modelo):
    # Fit
    modelo.fit(X_train, y_train)

    # Previsões
    y_pred_train = modelo.predict(X_train)
    y_pred_test = modelo.predict(X_test)

    # Métricas treino
    met_train = {
        "modelo": nome_modelo,
        "dataset": "train",
        "quantile_loss": quantile_loss_metric(y_train, y_pred_train),
        "mae": mean_absolute_error(y_train, y_pred_train),
        "mape": mean_absolute_percentage_error(y_train, y_pred_train),
        "bias": bias_metric(y_train, y_pred_train),
        "pct_subestimado": percentual_subestimativas(y_train, y_pred_train)
    }

    # Métricas teste
    met_test = {
        "modelo": nome_modelo,
        "dataset": "test",
        "quantile_loss": quantile_loss_metric(y_test, y_pred_test),
        "mae": mean_absolute_error(y_test, y_pred_test),
        "mape": mean_absolute_percentage_error(y_test, y_pred_test),
        "bias": bias_metric(y_test, y_pred_test),
        "pct_subestimado": percentual_subestimativas(y_test, y_pred_test)
    }

    return pd.DataFrame([met_train, met_test])


In [0]:
class EarlyStoppingCallback:
    def __init__(self, patience: int):
        self.patience = patience
        self.best_score = None
        self.num_no_improvement = 0

    def __call__(self, study: optuna.study.Study, trial: optuna.trial.FrozenTrial):
        current_score = study.best_value
        if self.best_score is None or current_score > self.best_score:
            self.best_score = current_score
            self.num_no_improvement = 0
        else:
            self.num_no_improvement += 1

        if self.num_no_improvement >= self.patience:
            study.stop()


#Lendo e preparando os dados

In [0]:
x_train = pd.read_parquet('data/x_train_prepared.parquet')
y_train = pd.read_excel('data/Seguro Saúde - Modelagem.xlsx', sheet_name='MODELAGEM')
df_test = pd.read_excel('data/Seguro Saúde - Teste Final.xlsx')

# Função que deixa colunas minúsculas
def normalize_columns(df):
    df.columns = [unidecode(col).lower() for col in df.columns]
    return df

# Função para normalizar os valores das colunas de texto
def normalize_text_values(df):
    for col in df.select_dtypes(include=['object']).columns:
        df[col] = df[col].apply(lambda x: unidecode(x).lower() if isinstance(x, str) else x)
    return df

df_test = normalize_columns(df_test) 
df_test = normalize_text_values(df_test)
x_test = df_test.drop(['nascimento', 'valor'], axis=1)
y_test = df_test[['matricula', 'valor']]

y_train = normalize_columns(y_train) 
y_train = normalize_text_values(y_train)
y_train = y_train[['matricula', 'valor']]

display(x_train.head())
display(y_train.head())
display(x_test.head())
display(y_test.head())

In [0]:
ranked_features = pd.read_csv('artifacts/boruta_feature_ranking.csv')

sel_features = ranked_features[ranked_features.selected == True]['feature']
x_train_sel = x_train[sel_features]

y_train = y_train[['valor']]

display(x_train_sel.head())
display(y_train.head())

In [0]:
full_pipeline = joblib.load("artifacts/pipeline_completo.pkl")

x_test_prep = full_pipeline.transform(x_test)

feature_names = full_pipeline.named_steps['preprocess'].get_feature_names_out()
x_test_prep = pd.DataFrame(x_test_prep, columns=feature_names, index=x_test.index)

display(x_test_prep.head())

In [0]:
x_test_prep_sel = x_test_prep[sel_features]

y_test = y_test[['valor']]

display(x_test_prep_sel.head())
display(y_test.head())

#Escolha do modelo campeao

In [0]:
modelos = {
    "LinearRegression": LinearRegression(),
    "DecisionTree": DecisionTreeRegressor(random_state=42),
    "RandomForest": RandomForestRegressor(n_jobs=-1, random_state=42),
    "XGBoost": XGBRegressor(n_jobs=-1, random_state=42),
    "LightGBM": LGBMRegressor(n_jobs=-1, random_state=42),
    "CatBoost": CatBoostRegressor(verbose=0, random_state=42)
}

# Avaliação de todos os modelos
relatorios = []
y_train = y_train.squeeze()
y_test = y_test.squeeze()

for nome, modelo in modelos.items():
    df_metrics = avaliar_modelo(modelo, x_train_sel, y_train, x_test_prep_sel, y_test, nome)
    relatorios.append(df_metrics)

# Concatenar tudo
df_resultados = pd.concat(relatorios, ignore_index=True)

In [0]:
display(df_resultados.sort_values(by=["dataset", "quantile_loss"]))


Tendo em conta que para um planejamento anual é preferível que o modelo errw superestimando o custo anual, uma vez que se errar para menos é mais arriscado pra companhia de saúde, preferimos o erro superestimado. Por isso uma Q-loss com penalizaçao para erros negativos (y_pred < y_real). Com base nisso e na análise dos resultados no conjunto de teste, definimos o catboost como o modelo vencedor uma vez que:
- Tem o melhor Q-loss, o que evita subestimar custo anual
- Tem o segundo menor MAPE 
- Tem o segundo menor MAE
- Tem o menor bias positivo que indica que ele erra pra mais mas nao tanto quanto os outros

#Hypertunning do CatBoost

In [0]:
q_loss_scorer = make_scorer(quantile_loss_metric, greater_is_better=False)

def objective_catboost(trial):
    params = {
        "iterations": 10000,
        "learning_rate": trial.suggest_float("learning_rate", 0.01, 0.3, log=True),
        "depth": trial.suggest_int("depth", 4, 10),
        "l2_leaf_reg": trial.suggest_float("l2_leaf_reg", 1e-2, 100.0, log=True),
        "bagging_temperature": trial.suggest_float("bagging_temperature", 0.0, 1.0),
        "border_count": trial.suggest_int("border_count", 32, 255),
        "min_data_in_leaf": trial.suggest_int("min_data_in_leaf", 1, 20),
        "random_strength": trial.suggest_float("random_strength", 1e-3, 10.0, log=True),
        "loss_function": "Quantile:alpha=0.8",
        "early_stopping_rounds": 50,
        "random_seed": 42,
        "verbose": 0
    }

    model = CatBoostRegressor(**params)

    kf = KFold(n_splits=10, shuffle=True, random_state=42)
    scores = cross_val_score(model, x_train_sel, y_train.squeeze(),
                             scoring=q_loss_scorer, cv=kf, n_jobs=-1)
    
    return scores.mean()


In [0]:
early_stopping = EarlyStoppingCallback(patience=20)

study = optuna.create_study(direction="maximize", study_name="catboost_q_loss_opt")
study.optimize(objective_catboost, n_trials=200, callbacks=[early_stopping], show_progress_bar=True)

with open("artifacts/best_catboost_params.json", "w") as f:
    json.dump(study.best_trial.params, f)


In [0]:
with open("artifacts/best_catboost_params.json", "r") as f:
    best_params = json.load(f)

best_params.update({
    "iterations": 10000,
    "loss_function": "Quantile:alpha=0.8",
    "early_stopping_rounds": 50,
    "random_seed": 42,
    "verbose": 0
})

modelo_final = CatBoostRegressor(**best_params)
modelo_final.fit(x_train_sel, y_train.squeeze())

In [0]:
joblib.dump(modelo_final, "artifacts/catboost_model_final.pkl")

In [0]:
feature_names = x_train_sel.columns if isinstance(x_train_sel, pd.DataFrame) else [f"feat_{i}" for i in range(x_train_sel.shape[1])]

importances = modelo_final.get_feature_importance()
plt.figure(figsize=(10, 6))
plt.barh(feature_names, importances)
plt.xlabel("Importance")
plt.title("CatBoost - Feature Importance")
plt.gca().invert_yaxis()
plt.tight_layout()
plt.show()

plt.savefig("plots/catboost_final_feature_importance.png")

# Performance Assessment

In [0]:
y_pred_test = modelo_final.predict(x_test_prep_sel)

relatorio_final = {
    "quantile_loss": quantile_loss_metric(y_test, y_pred_test),
    "mae": mean_absolute_error(y_test, y_pred_test),
    "mape": mean_absolute_percentage_error(y_test, y_pred_test),
    "bias": bias_metric(y_test.squeeze(), y_pred_test),
    "pct_subestimado": percentual_subestimativas(y_test.squeeze(), y_pred_test)
}

df_relatorio_final = pd.DataFrame([relatorio_final])
display(df_relatorio_final)

df_relatorio_final.to_csv("artifacts/catboost_relatorio_final_test.csv", index=False)

In [0]:
erro_abs = np.abs(y_test.squeeze() - y_pred_test)

plt.figure(figsize=(8, 4))
plt.hist(erro_abs, bins=30, edgecolor="k")
plt.title("Distribuição do Erro Absoluto (|y - y_pred|)")
plt.xlabel("Erro Absoluto")
plt.ylabel("Frequência")
plt.grid(True)
plt.tight_layout()
plt.show()

plt.savefig("plots/catboost_final_erro_abs.png")

In [0]:
q_loss_individual = np.maximum(0.8 * (y_test.squeeze() - y_pred_test), (0.8 - 1) * (y_test.squeeze() - y_pred_test))
mape_individual = np.abs((y_test.squeeze() - y_pred_test) / y_test.squeeze())
mae_individual = np.abs(y_test.squeeze() - y_pred_test)

df_erros = pd.DataFrame({
    "Q-Loss": q_loss_individual,
    "MAE": mae_individual,
    "MAPE": mape_individual
})


plt.figure(figsize=(10, 5))
df_erros.boxplot()
plt.title("Distribuição das Métricas Individuais no Conjunto de Teste")
plt.ylabel("Erro")
plt.grid(True)
plt.tight_layout()
plt.show()

plt.savefig("plots/catboost_final_erros_individuais.png")


In [0]:
plt.figure(figsize=(6, 6))
plt.scatter(y_test.squeeze(), y_pred_test, alpha=0.5)
plt.plot([y_test.min(), y_test.max()], [y_test.min(), y_test.max()], 'r--', label="Ideal")
plt.xlabel("Valor Real")
plt.ylabel("Valor Predito")
plt.title("Real vs Predito - Teste")
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.show()

plt.savefig("plots/catboost_final_real_vs_predito.png")

Pelos gráficos, concluimos que:
- Real vs Predito: 
  - A maioria dos pontos segue bem a linha vermelha (ideal).
  - Alguns pontos destoam bastante, especialmente:
    - Para valores reais entre R$ 3000 e R$ 8000: o modelo subestimou ou superestimou significativamente.
  - Alguns outliers extremos (acima de R$ 10.000) foram previsões bem próximas do ideal, o que é ótimo.
- Boxplots:
  - Distribuição concentrada nas métricas (sobretudo Q-Loss e MAPE)
  - Outliers influenciam bastante o MAE — o que é esperado em regressões com caudas longas (gastos de saúde)
  - MAPE muito pequeno para maioria dos casos → a mediana está visivelmente baixa

- Histogram do erro absoluto:
  - A grande maioria dos erros absolutos está abaixo de R$ 1000
  - Poucos casos têm erro muito alto (R$ 3000+), mas eles existem — e explicam o MAE de ~R$ 890

Conclusão: desempenho muito bom na maior parte dos dados, com poucos erros extremos.