In [2]:
%load_ext autoreload
%autoreload 2
import sys
from pathlib import Path
path = str(Path.cwd().parent)
print(path)
sys.path.insert(1, path)

import numpy as np
import pandas as pd
import skforecast

print(skforecast.__version__)

/home/joaquin/Documents/GitHub/skforecast
0.19.0


In [47]:
from aeon.forecasting.stats import ARIMA, AutoARIMA
import numpy as np
from statsmodels.tsa.stattools import adfuller, kpss

from skforecast.datasets import fetch_dataset

In [137]:
data = fetch_dataset(name='fuel_consumption', raw=True)
data = data[['Fecha', 'Gasolinas']]
data = data.rename(columns={'Fecha':'date', 'Gasolinas':'litters'})
data['date'] = pd.to_datetime(data['date'], format='%Y-%m-%d')
data = data.set_index('date')
data = data.loc[:'1990-01-01 00:00:00']
data = data.asfreq('MS')
data = data['litters']

In [198]:
import warnings
import numpy as np
import pandas as pd
from statsmodels.tsa.stattools import adfuller, kpss
from statsmodels.tsa.arima.model import ARIMA as SMARIMA


def ndiffs_adf(y, alpha=0.05, max_d=2):
    """Estimate number of differences using ADF test (H0: unit root = non-stationary)."""

    for i in range(max_d + 1):
        try:
            pvalue = adfuller(np.diff(y, i), autolag="AIC")[1]
        except Exception:
            # Fall back if test fails (short/constant series)
            return i if i > 0 else 0
        if pvalue < alpha:
            return i
    return max_d

def ndiffs_kpss(y, alpha=0.05, max_d=2):
    """Estimate number of differences using KPSS test (H0: stationary)."""

    for i in range(max_d + 1):
        try:
            with warnings.catch_warnings():
                warnings.simplefilter("ignore")
                _, pvalue, *_ = kpss(np.diff(y, i), nlags="auto")
        except Exception:
            # Fall back if test fails (short/constant series)
            return i if i > 0 else 0
        if pvalue >= alpha:
            return i
    return max_d

def estimate_d(y, max_d=2):
    """Combine ADF and KPSS heuristically (Hyndman 2008)."""
    try:
        adf_d = ndiffs_adf(y, max_d=max_d)
        kpss_d = ndiffs_kpss(y, max_d=max_d)
        return max(adf_d, kpss_d)
    except Exception:
        return 1  # safe fallback


def auto_arima_hyndman(
    y,
    p_max=5,
    q_max=5,
    max_d=2,
    seasonal=False,
    verbose=True,
    maxiter=100,
):
    """
    Fast stepwise Auto-ARIMA (non-seasonal).
    - Stepwise neighbors + occasional drop-to-zero moves.
    - Uses AICc for selection.
    """
    if seasonal:
        raise NotImplementedError("Seasonal search not implemented in this fast version.")

    y_arr = y.to_numpy() if isinstance(y, pd.Series) else np.asarray(y, dtype=float)
    n = len(y_arr)
    if n < 10:
        warnings.warn("Series is very short; results may be unstable.")
    d = estimate_d(y_arr, max_d=max_d)

    # Trend rule: include constant for d in {0,1}, else no constant
    if d == 0:
        trend = "c"
    elif d == 1:
        trend = "t"
    else:
        trend = "n"

    # Search state
    tried = {}
    def fit_try(p, q):

        if not (0 <= p <= p_max and 0 <= q <= q_max):
            return None, np.inf
        key = (p, d, q, trend)
        if key in tried:
            return tried[key]
        try:
            model = SMARIMA(
                y_arr,
                order=(p, d, q),
                trend=trend,
                enforce_stationarity=False,
                enforce_invertibility=False,
            )
            res = model.fit(method_kwargs={"warn_convergence": False, "maxiter": maxiter})
            aicc = res.aic
            tried[key] = (res, aicc)
            return res, aicc
        except Exception as e:
            print(f"Failed to fit ARIMA({p},{d},{q}) with trend='{trend}': {e}")
            tried[key] = (None, np.inf)
            return None, np.inf

    # Start from (1, d, 1) but clamp to ranges
    p = min(max(1, 0), p_max)
    q = min(max(1, 0), q_max)

    best_res, best_aicc = fit_try(p, q)
    best_order = (p, d, q)
    if verbose and np.isfinite(best_aicc):
        print(f"Start: AICc={best_aicc:.2f}, order={best_order}")

    improved = True
    while improved:
        improved = False
        candidates = []

        # Neighbor moves
        for dp, dq in [(1,0), (-1,0), (0,1), (0,-1)]:
            np_, nq_ = p + dp, q + dq
            res, aicc = fit_try(np_, nq_)
            candidates.append((aicc, (np_, d, nq_), res))

        # Occasional drop-to-zero moves (fast exploration)
        if p > 0:
            res, aicc = fit_try(0, q)
            candidates.append((aicc, (0, d, q), res))
        if q > 0:
            res, aicc = fit_try(p, 0)
            candidates.append((aicc, (p, d, 0), res))

        # Pick best among candidates
        if candidates:
            aicc, order, res = min(candidates, key=lambda x: x[0])
            if aicc + 1e-6 < best_aicc:
                best_aicc = aicc
                best_order = order
                best_res = res
                p, _, q = order
                improved = True
                if verbose:
                    print(f"Improved: AICc={aicc:.2f}, order={order}")

    if verbose:
        print(f"Best model: AICc={best_aicc:.2f}, order={best_order}")
    return best_res, best_order, best_aicc

In [199]:
model, order, aicc = auto_arima_hyndman(data, p_max=5, q_max=5, max_d=2, seasonal=False, verbose=True)

Start: AICc=6177.17, order=(1, 2, 1)
Improved: AICc=6157.34, order=(1, 2, 2)
Improved: AICc=6132.00, order=(1, 2, 3)
Improved: AICc=6113.75, order=(2, 2, 3)
Improved: AICc=6035.78, order=(2, 2, 4)
Improved: AICc=5973.42, order=(2, 2, 5)
Best model: AICc=5973.42, order=(2, 2, 5)


In [266]:
import numpy as np
import pandas as pd
import optuna
from statsmodels.tsa.arima.model import ARIMA
import warnings
from functools import partial

def estimate_d(y, max_d=2, alpha=0.05):
    """
    Estimate minimal non-seasonal differencing order d using ADF+KPSS.
    """
    y = np.asarray(y, dtype=float)
    for d in range(max_d + 1):
        diff_y = np.diff(y, n=d)
        if len(diff_y) < 10:
            break
        try:
            with warnings.catch_warnings():
                warnings.simplefilter("ignore")
                adf_p = adfuller(diff_y, autolag="AIC")[1]
                kpss_p = kpss(diff_y, nlags="auto")[1]
            if adf_p < alpha and kpss_p > alpha:
                return d
        except Exception:
            continue
    warnings.warn(f"No stationary differencing found up to d={max_d}. Using d={max_d}.")
    
    return max_d


def estimate_D(y, m, max_D=1, alpha=0.05):
    """
    Estimate seasonal differencing order D (0 or 1) using KPSS.
    """
    y = np.asarray(y, dtype=float)
    for D in range(max_D + 1):
        if D == 0:
            diff_y = y
        else:
            diff_y = y[m:] - y[:-m]
        if len(diff_y) < 10:
            break
        try:
            with warnings.catch_warnings():
                warnings.simplefilter("ignore")
                kpss_p = kpss(diff_y, nlags="auto")[1]
            if kpss_p > alpha:
                return D
        except Exception:
            continue
    return max_D

class BayesianAutoArimaStatsModels:
    """
    Auto-ARIMA using Bayesian Optimization (via Optuna).

    Parameters
    ----------
    p_max : int, default=5
        Maximum autoregressive (AR) order to search.
    d_max : int or None, default=None
        Maximum differencing order to search. If None, estimate using ADF+KPSS.
    q_max : int, default=5
        Maximum moving-average (MA) order to search.
    P_max : int, default=0
        Maximum seasonal autoregressive (SAR) order to search.
    D_max : int or None, default=None
        Maximum seasonal differencing order to search. If None, estimate using KPSS.
    Q_max : int, default=0
        Maximum seasonal moving-average (SMA) order to search.
    m : int, default=0
        Seasonal period.
    criterion : {"aic", "aicc", "bic"}, default="aic"
        Information criterion to minimize.
    n_trials : int, default=100
        Number of trials for Optuna optimization.
    early_stopping_rounds : int, default=10
        Stop optimization if no improvement in this many trials.
    random_state : int or None
        Random seed for reproducibility.
    verbose : bool, default=True
        If True, show progress messages.
    show_progress_bar : bool, default=True
        If True, display Optuna's progress bar.

    Attributes
    ----------
    best_model_ : ARIMAResults
        Fitted ARIMA model with best found parameters.
    best_params_ : dict
        Best (p,d,q) parameters found.
    study_ : optuna.study.Study
        Optuna study object after optimization.
    """

    def __init__(
        self,
        p_max=5,
        q_max=5,
        d_max=2,
        P_max=2,
        Q_max=2,
        D_max=1,
        m=0,
        use_stat_tests=False,
        criterion="aic",
        early_stopping_rounds=10,
        random_state=None,
        verbose=True,
        show_progress_bar=False,
    ):
        self.p_max = p_max
        self.d_max = d_max
        self.q_max = q_max
        self.P_max = P_max
        self.D_max = D_max
        self.Q_max = Q_max
        self.m = m
        self.use_stat_tests = use_stat_tests
        self.criterion = criterion
        self.early_stopping_rounds = early_stopping_rounds
        self.random_state = random_state
        self.verbose = verbose
        self.show_progress_bar = show_progress_bar

        self.d_ = None
        self.D_ = None
        self.best_model_ = None
        self.best_params_ = None
        self.study_ = None

        total_combinations = (
            max(1, self.p_max) * max(1, self.q_max) * max(1, self.P_max) * max(1, self.Q_max)
        )
        if self.use_stat_tests:
            total_combinations += 2
        else:
            total_combinations = total_combinations * max(1, self.d_max) * max(1, self.D_max)
        self.n_trials = total_combinations

    def _criterion_value(self, res, n):
        """
        Compute information criterion.

        Parameters
        ----------
        res : ARIMAResults
            Fitted ARIMA model result.
        n : int
            Number of observations.

        Returns
        -------
        float
            Value of the selected information criterion.
        """
        if self.criterion == "aic":
            return res.aic
        elif self.criterion == "bic":
            return res.bic
        elif self.criterion == "aicc":
            k = res.params.size
            return res.aic + (2 * k * (k + 1)) / (n - k - 1)
        else:
            raise ValueError("criterion must be 'aic', 'aicc', or 'bic'")

    def fit(self, y):
        """
        Run Bayesian optimization to select best (p,d,q).

        Parameters
        ----------
        y : array-like of shape (n_samples,)
            Univariate time series data.

        Returns
        -------
        None
        """
        y_arr = y.to_numpy() if isinstance(y, pd.Series) else y
        n = len(y_arr)

        if self.use_stat_tests:
            D = 0
            if self.m > 1:
                D = estimate_D(y_arr, m=self.m, max_D=self.D_max)
            self.D_ = D

            if D > 0:
                y_diff = y_arr[self.m:] - y_arr[:-self.m]
            else:
                y_diff = y_arr
            d = estimate_d(y_diff, max_d=self.d_max)
            self.d_ = d

        def objective_broad(trial):
            p = trial.suggest_int("p", 0, self.p_max)
            q = trial.suggest_int("q", 0, self.q_max)

            if self.m > 1:
                P = trial.suggest_int("P", 0, self.P_max)
                Q = trial.suggest_int("Q", 0, self.Q_max)
            else:
                P = trial.suggest_int("P", 0, 0)
                Q = trial.suggest_int("Q", 0, 0)

            if self.use_stat_tests:
                d = trial.suggest_categorical("d", [self.d_])
                D = trial.suggest_categorical("D", [self.D_])
            else:
                d = trial.suggest_int("d", 0, self.d_max)
                D = trial.suggest_int("D", 0, self.D_max)

            m = trial.suggest_int("m", self.m, self.m)
            
            try:
                order = (p, d, q)
                seasonal_order = (P, D, Q, m) if m > 1 else (0, 0, 0, 0)
                model = ARIMA(y_arr, order=order, seasonal_order=seasonal_order)
                with warnings.catch_warnings():
                    warnings.simplefilter("ignore")
                    res = model.fit()
                objective_value = self._criterion_value(res, n)
                if self.verbose:
                    print(f"ARIMA({p},{d},{q}) ({P},{D},{Q},{self.m}) : {self.criterion.upper()}={objective_value:.4f}")
                return objective_value
            except Exception as e:
                if self.verbose:
                    print(f"Trial ({p},{d},{q}) ({P},{D},{Q},{self.m}): {e}")
                return np.inf
        
        def objective_fine(trial):
            p = trial.suggest_int("p", max(0, best_trial.params["p"] - 1), min(self.p_max, best_trial.params["p"] + 1))
            q = trial.suggest_int("q", max(0, best_trial.params["q"] - 1), min(self.q_max, best_trial.params["q"] + 1))
            
            if self.m > 1:
                P = trial.suggest_int("P", max(0, best_trial.params["P"] - 1), min(self.P_max, best_trial.params["P"] + 1))
                Q = trial.suggest_int("Q", max(0, best_trial.params["Q"] - 1), min(self.Q_max, best_trial.params["Q"] + 1))
            else:
                P = 0
                Q = 0

            if self.use_stat_tests:
                d = trial.suggest_categorical("d", [self.d_])
                D = trial.suggest_categorical("D", [self.D_])
            else:
                d = trial.suggest_int("d", max(0, best_trial.params["d"] - 1), min(self.d_max, best_trial.params["d"] + 1))
                D = trial.suggest_int("D", max(0, best_trial.params["D"] - 1), min(self.D_max, best_trial.params["D"] + 1))

            m = trial.suggest_int("m", self.m, self.m)      
            
            try:
                order = (p, d, q)
                seasonal_order = (P, D, Q, m) if m > 1 else (0, 0, 0, 0)
                model = ARIMA(y_arr, order=order, seasonal_order=seasonal_order)
                with warnings.catch_warnings():
                    warnings.simplefilter("ignore")
                    res = model.fit()
                objective_value = self._criterion_value(res, n)
                if self.verbose:
                    print(f"ARIMA({p},{d},{q}) ({P},{D},{Q},{self.m}) : {self.criterion.upper()}={objective_value:.4f}")
                return objective_value
            except Exception as e:
                if self.verbose:
                    print(f"Trial ({p},{d},{q}) ({P},{D},{Q},{self.m}): {e}")
                return np.inf
            
        def _early_stop_callback(study, trial, early_stopping_rounds, label=""):
            if study.best_trial.number == trial.number:
                study.set_user_attr("last_improvement", trial.number)
                return
            last = study.user_attrs.get("last_improvement", trial.number)
            if trial.number - last >= early_stopping_rounds:
                if self.verbose:
                    print(f"‚èπÔ∏è  Early stopping in {label} after {early_stopping_rounds} trials without improvement.")
                study.stop()


        # First search phase: broad exploration
        optuna.logging.set_verbosity(optuna.logging.WARNING)
        study = optuna.create_study(
            direction="minimize",
            sampler=optuna.samplers.TPESampler(seed=self.random_state),
        )
        # study.enqueue_trial({"p": 1, "d": self.d_, "q": 1, "P": 0, "D": self.D_, "Q": 0, 'm':self.m})
        # study.enqueue_trial({"p": 0, "d": self.d_, "q": 1, "P": 0, "D": self.D_, "Q": 0, 'm':self.m})
        # study.enqueue_trial({"p": 2, "d": self.d_, "q": 2, "P": 0, "D": self.D_, "Q": 0, 'm':self.m})

        if self.verbose:
            print(f"üîç Starting Bayesian Auto-ARIMA search to minimize {self.criterion}")
        
        study.optimize(
            objective_broad,
            n_trials=self.n_trials,
            show_progress_bar=self.show_progress_bar,
            callbacks=[partial(_early_stop_callback, early_stopping_rounds=self.early_stopping_rounds, label="broad-search")]
        )
        
        # Second search phase: fine-tune around best found
        best_trial = study.best_trial
        study = optuna.create_study(
            direction="minimize",
            sampler=optuna.samplers.TPESampler(seed=self.random_state),
        )
        study.optimize(
            objective_fine,
            n_trials=self.n_trials,
            show_progress_bar=self.show_progress_bar,
            callbacks=[partial(_early_stop_callback, early_stopping_rounds=self.early_stopping_rounds, label="fine-tune")]
        )

        self.study_ = study
        self.best_params_ = study.best_params
        best_order = (self.best_params_["p"], self.best_params_["d"], self.best_params_["q"])

        # Refit best model
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            order = (self.best_params_["p"], self.best_params_["d"], self.best_params_["q"])
            seasonal_order = (self.best_params_["P"], self.best_params_["D"], self.best_params_["Q"], self.m) if self.m > 1 else (0, 0, 0, 0)
            self.best_model_ = ARIMA(y_arr, order=order, seasonal_order=seasonal_order).fit()
        if self.verbose:
            print(f"‚úÖ Best model: ARIMA({order})({seasonal_order}) with {self.criterion}={self._criterion_value(self.best_model_, n):.4f}")

        return

    def predict(self, n_periods=1):
        """Forecast n_periods ahead."""
        if self.best_model_ is None:
            raise RuntimeError("Model not fitted yet. Call .fit(y) first.")
        return self.best_model_.forecast(steps=n_periods)

    def summary(self):
        """Return the summary of the best ARIMA model."""
        if self.best_model_ is None:
            raise RuntimeError("Model not fitted yet.")
        return self.best_model_.summary()


In [268]:
autoarima = BayesianAutoArimaStatsModels(p_max=3, d_max=2, q_max=3, D_max=0, P_max=0, Q_max=0, m=0, use_stat_tests=False,
                              early_stopping_rounds=5, verbose=True, show_progress_bar=False)
autoarima.fit(data)

üîç Starting Bayesian Auto-ARIMA search to minimize aic
ARIMA(3,0,3) (0,0,0,0) : AIC=6221.0948
ARIMA(2,0,2) (0,0,0,0) : AIC=6210.6880
ARIMA(1,0,0) (0,0,0,0) : AIC=6235.4588
ARIMA(3,1,3) (0,0,0,0) : AIC=6127.7331
ARIMA(2,2,2) (0,0,0,0) : AIC=6211.1224
ARIMA(3,0,0) (0,0,0,0) : AIC=6230.9133
ARIMA(0,2,0) (0,0,0,0) : AIC=6398.2669
ARIMA(0,2,1) (0,0,0,0) : AIC=6204.5860
ARIMA(2,2,0) (0,0,0,0) : AIC=6293.8540
‚èπÔ∏è  Early stopping in broad-search after 5 trials without improvement.
ARIMA(2,2,2) (0,0,0,0) : AIC=6211.1224
ARIMA(2,0,3) (0,0,0,0) : AIC=6219.5370
ARIMA(3,1,3) (0,0,0,0) : AIC=6127.7331
ARIMA(3,0,2) (0,0,0,0) : AIC=6216.8974
ARIMA(3,0,2) (0,0,0,0) : AIC=6216.8974
ARIMA(2,0,2) (0,0,0,0) : AIC=6210.6880
ARIMA(2,1,2) (0,0,0,0) : AIC=6141.9084
ARIMA(2,0,2) (0,0,0,0) : AIC=6210.6880
‚èπÔ∏è  Early stopping in fine-tune after 5 trials without improvement.
‚úÖ Best model: ARIMA((3, 1, 3))((0, 0, 0, 0)) with aic=6127.7331


In [269]:
import numpy as np
import pandas as pd
import optuna
from aeon.forecasting.stats import ARIMA as AeonARIMA
import warnings
from functools import partial

def estimate_d(y, max_d=2, alpha=0.05):
    """
    Estimate minimal non-seasonal differencing order d using ADF+KPSS.
    """
    y = np.asarray(y, dtype=float)
    for d in range(max_d + 1):
        diff_y = np.diff(y, n=d)
        if len(diff_y) < 10:
            break
        try:
            with warnings.catch_warnings():
                warnings.simplefilter("ignore")
                adf_p = adfuller(diff_y, autolag="AIC")[1]
                kpss_p = kpss(diff_y, nlags="auto")[1]
            if adf_p < alpha and kpss_p > alpha:
                return d
        except Exception:
            continue
    warnings.warn(f"No stationary differencing found up to d={max_d}. Using d={max_d}.")
    
    return max_d


def estimate_D(y, m, max_D=1, alpha=0.05):
    """
    Estimate seasonal differencing order D (0 or 1) using KPSS.
    """
    y = np.asarray(y, dtype=float)
    for D in range(max_D + 1):
        if D == 0:
            diff_y = y
        else:
            diff_y = y[m:] - y[:-m]
        if len(diff_y) < 10:
            break
        try:
            with warnings.catch_warnings():
                warnings.simplefilter("ignore")
                kpss_p = kpss(diff_y, nlags="auto")[1]
            if kpss_p > alpha:
                return D
        except Exception:
            continue
    return max_D

class BayesianAutoArimaAeon:
    """
    Auto-ARIMA using Bayesian Optimization (via Optuna).

    Parameters
    ----------
    p_max : int, default=5
        Maximum autoregressive (AR) order to search.
    d_max : int or None, default=None
        Maximum differencing order to search. If None, estimate using ADF+KPSS.
    q_max : int, default=5
        Maximum moving-average (MA) order to search.
    P_max : int, default=0
        Maximum seasonal autoregressive (SAR) order to search.
    D_max : int or None, default=None
        Maximum seasonal differencing order to search. If None, estimate using KPSS.
    Q_max : int, default=0
        Maximum seasonal moving-average (SMA) order to search.
    m : int, default=0
        Seasonal period.
    criterion : {"aic", "aicc", "bic"}, default="aic"
        Information criterion to minimize.
    n_trials : int, default=100
        Number of trials for Optuna optimization.
    early_stopping_rounds : int, default=10
        Stop optimization if no improvement in this many trials.
    random_state : int or None
        Random seed for reproducibility.
    verbose : bool, default=True
        If True, show progress messages.
    show_progress_bar : bool, default=True
        If True, display Optuna's progress bar.

    Attributes
    ----------
    best_model_ : ARIMAResults
        Fitted ARIMA model with best found parameters.
    best_params_ : dict
        Best (p,d,q) parameters found.
    study_ : optuna.study.Study
        Optuna study object after optimization.
    """

    def __init__(
        self,
        p_max=5,
        q_max=5,
        d_max=2,
        P_max=2,
        Q_max=2,
        D_max=1,
        m=0,
        use_stat_tests=False,
        criterion="aic",
        early_stopping_rounds=10,
        random_state=None,
        verbose=True,
        show_progress_bar=False,
    ):
        self.p_max = p_max
        self.d_max = d_max
        self.q_max = q_max
        self.P_max = P_max
        self.D_max = D_max
        self.Q_max = Q_max
        self.m = m
        self.use_stat_tests = use_stat_tests
        self.criterion = criterion
        self.early_stopping_rounds = early_stopping_rounds
        self.random_state = random_state
        self.verbose = verbose
        self.show_progress_bar = show_progress_bar

        self.d_ = None
        self.D_ = None
        self.best_model_ = None
        self.best_params_ = None
        self.study_ = None

        total_combinations = (
            max(1, self.p_max) * max(1, self.q_max) * max(1, self.P_max) * max(1, self.Q_max)
        )
        if self.use_stat_tests:
            total_combinations += 2
        else:
            total_combinations = total_combinations * max(1, self.d_max) * max(1, self.D_max)
        self.n_trials_ = total_combinations

    def fit(self, y):
        """
        Run Bayesian optimization to select best (p,d,q).

        Parameters
        ----------
        y : array-like of shape (n_samples,)
            Univariate time series data.

        Returns
        -------
        None
        """
        y_arr = y.to_numpy() if isinstance(y, pd.Series) else y
        n = len(y_arr)

        if self.use_stat_tests:
            D = 0
            if self.m > 1:
                D = estimate_D(y_arr, m=self.m, max_D=self.D_max)
            self.D_ = D

            if D > 0:
                y_diff = y_arr[self.m:] - y_arr[:-self.m]
            else:
                y_diff = y_arr
            d = estimate_d(y_diff, max_d=self.d_max)
            self.d_ = d

        def objective_broad(trial):
            p = trial.suggest_int("p", 0, self.p_max)
            q = trial.suggest_int("q", 0, self.q_max)

            # if self.m > 1:
            #     P = trial.suggest_int("P", 0, self.P_max)
            #     Q = trial.suggest_int("Q", 0, self.Q_max)
            # else:
            #     P = trial.suggest_int("P", 0, 0)
            #     Q = trial.suggest_int("Q", 0, 0)

            if self.use_stat_tests:
                d = trial.suggest_categorical("d", [self.d_])
                # D = trial.suggest_categorical("D", [self.D_])
            else:
                d = trial.suggest_int("d", 0, self.d_max)
                # D = trial.suggest_int("D", 0, self.D_max)

            # m = trial.suggest_int("m", self.m, self.m)
            
            try:
                # order = (p, d, q)
                # seasonal_order = (P, D, Q, m) if m > 1 else (0, 0, 0, 0)
                model = AeonARIMA(p=p, d=d, q=q)
                with warnings.catch_warnings():
                    warnings.simplefilter("ignore")
                    model.fit(y=y)
                objective_value = model.aic_
                if self.verbose:
                    print(f"ARIMA({p},{d},{q}) ({0},{0},{0},{self.m}) : {self.criterion.upper()}={objective_value:.4f}")
            except Exception as e:
                if self.verbose:
                    print(f"Trial ({p},{d},{q}) ({0},{0},{0},{self.m}): {e}")
                objective_value = np.inf
            
            return objective_value
        
        def objective_fine(trial):
            p = trial.suggest_int("p", max(0, best_trial.params["p"] - 1), min(self.p_max, best_trial.params["p"] + 1))
            q = trial.suggest_int("q", max(0, best_trial.params["q"] - 1), min(self.q_max, best_trial.params["q"] + 1))
            
            # if self.m > 1:
            #     P = trial.suggest_int("P", max(0, best_trial.params["P"] - 1), min(self.P_max, best_trial.params["P"] + 1))
            #     Q = trial.suggest_int("Q", max(0, best_trial.params["Q"] - 1), min(self.Q_max, best_trial.params["Q"] + 1))
            # else:
            #     P = 0
            #     Q = 0

            if self.use_stat_tests:
                d = trial.suggest_categorical("d", [self.d_])
                # D = trial.suggest_categorical("D", [self.D_])
            else:
                d = trial.suggest_int("d", max(0, best_trial.params["d"] - 1), min(self.d_max, best_trial.params["d"] + 1))
                # D = trial.suggest_int("D", max(0, best_trial.params["D"] - 1), min(self.D_max, best_trial.params["D"] + 1))

            # m = trial.suggest_int("m", self.m, self.m)      
            
            try:
                # order = (p, d, q)
                # seasonal_order = (P, D, Q, m) if m > 1 else (0, 0, 0, 0)
                model = AeonARIMA(p=p, d=d, q=q)
                with warnings.catch_warnings():
                    warnings.simplefilter("ignore")
                    model.fit(y)
                objective_value = model.aic_
                if self.verbose:
                    print(f"ARIMA({p},{d},{q}) ({0},{0},{0},{self.m}) : {self.criterion.upper()}={objective_value:.4f}")
            except Exception as e:
                if self.verbose:
                    print(f"Trial ({p},{d},{q}) ({0},{0},{0},{self.m}): {e}")
                objective_value = np.inf
                
            return objective_value
            
        def _early_stop_callback(study, trial, early_stopping_rounds, label=""):
            if study.best_trial.number == trial.number:
                study.set_user_attr("last_improvement", trial.number)
                return
            last = study.user_attrs.get("last_improvement", trial.number)
            if trial.number - last >= early_stopping_rounds:
                if self.verbose:
                    print(f"‚èπÔ∏è  Early stopping in {label} after {early_stopping_rounds} trials without improvement.")
                study.stop()


        # First search phase: broad exploration
        optuna.logging.set_verbosity(optuna.logging.WARNING)
        study = optuna.create_study(
            direction="minimize",
            sampler=optuna.samplers.TPESampler(seed=self.random_state),
        )
        # study.enqueue_trial({"p": 1, "d": self.d_, "q": 1, "P": 0, "D": self.D_, "Q": 0, 'm':self.m})
        # study.enqueue_trial({"p": 0, "d": self.d_, "q": 1, "P": 0, "D": self.D_, "Q": 0, 'm':self.m})
        # study.enqueue_trial({"p": 2, "d": self.d_, "q": 2, "P": 0, "D": self.D_, "Q": 0, 'm':self.m})

        if self.verbose:
            print(f"üîç Starting Bayesian Auto-ARIMA search to minimize {self.criterion}")
        
        study.optimize(
            objective_broad,
            n_trials=self.n_trials_,
            show_progress_bar=self.show_progress_bar,
            callbacks=[partial(_early_stop_callback, early_stopping_rounds=self.early_stopping_rounds, label="broad-search")]
        )
        
        # Second search phase: fine-tune around best found
        best_trial = study.best_trial
        study = optuna.create_study(
            direction="minimize",
            sampler=optuna.samplers.TPESampler(seed=self.random_state),
        )
        study.optimize(
            objective_fine,
            n_trials=self.n_trials_,
            show_progress_bar=self.show_progress_bar,
            callbacks=[partial(_early_stop_callback, early_stopping_rounds=self.early_stopping_rounds, label="fine-search")]
        )

        self.study_ = study
        self.best_params_ = study.best_params
        self.best_order_ = (self.best_params_["p"], self.best_params_["d"], self.best_params_["q"])
        # self.best_seasonal_order_ = (self.best_params_["P"], self.best_params_["D"], self.best_params_["Q"], self.m) if self.m > 1 else (0, 0, 0, 0)

        # Refit best model
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            self.best_model_ = AeonARIMA(p=self.best_order_[0], d=self.best_order_[1], q=self.best_order_[2]).fit(y)
        if self.verbose:
            print(f"‚úÖ Best model: ARIMA({self.best_order_})() with {self.criterion}={self.best_model_.aic_:.4f}")

        return

    def predict(self, n_periods=1):
        """Forecast n_periods ahead."""
        if self.best_model_ is None:
            raise RuntimeError("Model not fitted yet. Call .fit(y) first.")
        return self.best_model_.forecast(steps=n_periods)

    def summary(self):
        """Return the summary of the best ARIMA model."""
        if self.best_model_ is None:
            raise RuntimeError("Model not fitted yet.")
        return self.best_model_.summary()


In [270]:
autoarima = BayesianAutoArimaAeon(p_max=3, d_max=2, q_max=3, D_max=0, P_max=0, Q_max=0, m=0, use_stat_tests=False,
                              early_stopping_rounds=5, verbose=True, show_progress_bar=False)
autoarima.fit(data)

üîç Starting Bayesian Auto-ARIMA search to minimize aic
ARIMA(3,0,0) (0,0,0,0) : AIC=6227.9880
ARIMA(1,1,3) (0,0,0,0) : AIC=6171.1550
ARIMA(2,1,2) (0,0,0,0) : AIC=6153.0577
ARIMA(3,1,1) (0,0,0,0) : AIC=6175.1738
ARIMA(2,2,1) (0,0,0,0) : AIC=6187.1939
ARIMA(1,1,3) (0,0,0,0) : AIC=6171.1550
ARIMA(2,0,1) (0,0,0,0) : AIC=6195.6066
ARIMA(3,1,0) (0,0,0,0) : AIC=6199.3345
‚èπÔ∏è  Early stopping in broad-search after 5 trials without improvement.
ARIMA(1,0,2) (0,0,0,0) : AIC=6204.4802
ARIMA(2,2,1) (0,0,0,0) : AIC=6187.1939
ARIMA(3,1,2) (0,0,0,0) : AIC=6124.4584
ARIMA(2,2,3) (0,0,0,0) : AIC=6166.9821
ARIMA(3,1,2) (0,0,0,0) : AIC=6124.4584
ARIMA(1,2,2) (0,0,0,0) : AIC=6139.1149
ARIMA(3,1,2) (0,0,0,0) : AIC=6124.4584
ARIMA(3,1,3) (0,0,0,0) : AIC=6115.2469
ARIMA(1,1,2) (0,0,0,0) : AIC=6173.9552
ARIMA(2,2,2) (0,0,0,0) : AIC=6190.0845
ARIMA(3,0,3) (0,0,0,0) : AIC=6183.0485
ARIMA(3,1,3) (0,0,0,0) : AIC=6115.2469
ARIMA(3,0,3) (0,0,0,0) : AIC=6183.0485
‚èπÔ∏è  Early stopping in fine-search after 5 tri

In [271]:
autoarima = AutoARIMA(max_p=3, max_d=2, max_q=3)
autoarima.fit(data)
print(autoarima.final_model_.aic_)
autoarima.final_model_

6145.565923976397


0,1,2
,p,np.int64(3)
,d,1
,q,np.int64(3)
,use_constant,np.True_
,iterations,200
