In [4]:
import numpy as np
from catboost import CatBoostRegressor
from pykalman import KalmanFilter
import matplotlib.pyplot as plt
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score

class HybridCatBoostKalman:
    def _init_(self, catboost_params=None, kf_params=None, sigma_multiplier=2):
        """
        catboost_params: dict con parámetros para CatBoostRegressor
        kf_params: dict con parámetros para KalmanFilter
        sigma_multiplier: para intervalos de predicción (2 ≈ 95%)
        """
        self.catboost_params = catboost_params or {"iterations":300, "learning_rate":0.05, "depth":6, "verbose":0}
        self.kf_params = kf_params or {"transition_matrices":[0.8],
                                       "observation_matrices":[1.0],
                                       "initial_state_mean":0,
                                       "initial_state_covariance":1,
                                       "transition_covariance":0.5,
                                       "observation_covariance":0.3}
        self.sigma_multiplier = sigma_multiplier
        self.model = CatBoostRegressor(**self.catboost_params)
        self.kf = None
        self.state_means_train = None
        self.state_covs_train = None

    def fit(self, X_train, y_train):
        # Entrenar CatBoost
        self.model.fit(X_train, y_train)
        residuals_train = y_train - self.model.predict(X_train)
        # Entrenar Kalman Filter sobre residuos
        self.kf = KalmanFilter(**self.kf_params)
        self.state_means_train, self.state_covs_train = self.kf.filter(residuals_train.values)
        return self

    def predict(self, X, y_train_residuals=None):
        """
        X: features
        y_train_residuals: residuos de train si se quiere usar el filtro sobre test
        """
        y_pred_boost = self.model.predict(X)
        if self.kf is not None and y_train_residuals is not None:
            state_means_test, state_covs_test = self.kf.filter(y_train_residuals.values)
            y_pred_hybrid = y_pred_boost + state_means_test.flatten()
            intervals = self.compute_intervals(state_means_test, state_covs_test)
        else:
            y_pred_hybrid = y_pred_boost
            intervals = None
        return y_pred_hybrid, intervals

    def compute_intervals(self, state_means, state_covs):
        sigma = np.sqrt(state_covs.flatten())
        lower = state_means.flatten() - self.sigma_multiplier * sigma
        upper = state_means.flatten() + self.sigma_multiplier * sigma
        return lower, upper

    def evaluate(self, y_true, y_pred, label=""):
        rmse = mean_squared_error(y_true, y_pred, squared=False)
        mae = mean_absolute_error(y_true, y_pred)
        r2 = r2_score(y_true, y_pred)
        print(f"{label} -> RMSE: {rmse:.3f}, MAE: {mae:.3f}, R²: {r2:.3f}")
        return {"rmse": rmse, "mae": mae, "r2": r2}

    def coverage_rate(self, y_true, lower, upper):
        return np.mean((y_true >= lower) & (y_true <= upper))
    
    def plot_prediction(self, y_true, y_pred, lower=None, upper=None, title="Híbrido CatBoost + Kalman"):
        plt.figure(figsize=(12,6))
        plt.plot(y_true.index, y_true, label="Observado", color="black")
        plt.plot(y_true.index, y_pred, label="Predicción", color="blue")
        if lower is not None and upper is not None:
            plt.fill_between(y_true.index, lower, upper, color="blue", alpha=0.2)
        plt.legend()
        plt.title(title)

plt.show()
