From 0961b30fde954f4988f48b5bc448fd856027f8eb Mon Sep 17 00:00:00 2001 From: Carlos Trujillo <59846724+cetagostini@users.noreply.github.com> Date: Wed, 26 Nov 2025 02:04:44 +0200 Subject: [PATCH 1/6] Refactor PyMC time series models to use xarray API Unified the API for BayesianBasisExpansionTimeSeries and StateSpaceTimeSeries to accept xarray.DataArray inputs for X and y, with coordinates for datetime and treated units. Removed legacy numpy/datetime handling and updated internal conversion logic. Adjusted InterruptedTimeSeries and tests to use the new API, ensuring consistent handling of exogenous regressors and time indices. Improved error handling and warnings for coordinate mismatches and prediction inputs. --- .gitignore | 2 + .../experiments/interrupted_time_series.py | 103 +--- causalpy/pymc_models.py | 489 ++++++++++++------ .../test_integration_its_new_timeseries.py | 4 +- .../tests/test_integration_pymc_examples.py | 323 +++++++----- docs/dev/its_pymc copy.ipynb | 54 +- docs/source/_static/interrogate_badge.svg | 6 +- 7 files changed, 565 insertions(+), 416 deletions(-) diff --git a/.gitignore b/.gitignore index 9a6b0579..a3ce429e 100644 --- a/.gitignore +++ b/.gitignore @@ -14,3 +14,5 @@ dist/ docs/build/ docs/jupyter_execute/ docs/source/api/generated/ + +.cursor/ diff --git a/causalpy/experiments/interrupted_time_series.py b/causalpy/experiments/interrupted_time_series.py index 8ce20f25..1365b42b 100644 --- a/causalpy/experiments/interrupted_time_series.py +++ b/causalpy/experiments/interrupted_time_series.py @@ -27,11 +27,7 @@ from causalpy.custom_exceptions import BadIndexException from causalpy.plot_utils import get_hdi_to_df, plot_xY -from causalpy.pymc_models import ( - BayesianBasisExpansionTimeSeries, - PyMCModel, - StateSpaceTimeSeries, -) +from causalpy.pymc_models import PyMCModel from causalpy.utils import round_num from .base import BaseExperiment @@ -153,27 +149,15 @@ def __init__( ) # fit the model to the observed (pre-intervention) data + # All PyMC models now accept xr.DataArray with consistent API if isinstance(self.model, PyMCModel): - is_bsts_like = isinstance( - self.model, (BayesianBasisExpansionTimeSeries, StateSpaceTimeSeries) - ) - - if is_bsts_like: - # BSTS/StateSpace models expect numpy arrays and datetime coords - X_fit = self.pre_X.values if self.pre_X.shape[1] > 0 else None # type: ignore[attr-defined] - y_fit = self.pre_y.isel(treated_units=0).values # type: ignore[attr-defined] - pre_coords: dict[str, Any] = {"datetime_index": self.datapre.index} - if X_fit is not None: - pre_coords["coeffs"] = list(self.labels) - self.model.fit(X=X_fit, y=y_fit, coords=pre_coords) - else: - # General PyMC models expect xarray with treated_units - COORDS = { - "coeffs": self.labels, - "obs_ind": np.arange(self.pre_X.shape[0]), - "treated_units": ["unit_0"], - } - self.model.fit(X=self.pre_X, y=self.pre_y, coords=COORDS) + COORDS: dict[str, Any] = { + "coeffs": self.labels, + "obs_ind": np.arange(self.pre_X.shape[0]), + "treated_units": ["unit_0"], + "datetime_index": self.datapre.index, # For time series models + } + self.model.fit(X=self.pre_X, y=self.pre_y, coords=COORDS) elif isinstance(self.model, RegressorMixin): # For OLS models, use 1D y data self.model.fit(X=self.pre_X, y=self.pre_y.isel(treated_units=0)) @@ -182,18 +166,7 @@ def __init__( # score the goodness of fit to the pre-intervention data if isinstance(self.model, PyMCModel): - is_bsts_like = isinstance( - self.model, (BayesianBasisExpansionTimeSeries, StateSpaceTimeSeries) - ) - if is_bsts_like: - X_score = self.pre_X.values if self.pre_X.shape[1] > 0 else None # type: ignore[attr-defined] - y_score = self.pre_y.isel(treated_units=0).values # type: ignore[attr-defined] - score_coords: dict[str, Any] = {"datetime_index": self.datapre.index} - if X_score is not None: - score_coords["coeffs"] = list(self.labels) - self.score = self.model.score(X=X_score, y=y_score, coords=score_coords) - else: - self.score = self.model.score(X=self.pre_X, y=self.pre_y) + self.score = self.model.score(X=self.pre_X, y=self.pre_y) elif isinstance(self.model, RegressorMixin): self.score = self.model.score( X=self.pre_X, y=self.pre_y.isel(treated_units=0) @@ -201,66 +174,20 @@ def __init__( # get the model predictions of the observed (pre-intervention) data if isinstance(self.model, PyMCModel): - is_bsts_like = isinstance( - self.model, (BayesianBasisExpansionTimeSeries, StateSpaceTimeSeries) - ) - if is_bsts_like: - X_pre_predict = self.pre_X.values if self.pre_X.shape[1] > 0 else None # type: ignore[attr-defined] - pre_pred_coords: dict[str, Any] = {"datetime_index": self.datapre.index} - self.pre_pred = self.model.predict( - X=X_pre_predict, coords=pre_pred_coords - ) - if not isinstance(self.pre_pred, az.InferenceData): - self.pre_pred = az.InferenceData(posterior_predictive=self.pre_pred) - else: - self.pre_pred = self.model.predict(X=self.pre_X) + self.pre_pred = self.model.predict(X=self.pre_X) elif isinstance(self.model, RegressorMixin): self.pre_pred = self.model.predict(X=self.pre_X) # calculate the counterfactual (post period) if isinstance(self.model, PyMCModel): - is_bsts_like = isinstance( - self.model, (BayesianBasisExpansionTimeSeries, StateSpaceTimeSeries) - ) - if is_bsts_like: - X_post_predict = ( - self.post_X.values if self.post_X.shape[1] > 0 else None # type: ignore[attr-defined] - ) - post_pred_coords: dict[str, Any] = { - "datetime_index": self.datapost.index - } - self.post_pred = self.model.predict( - X=X_post_predict, coords=post_pred_coords, out_of_sample=True - ) - if not isinstance(self.post_pred, az.InferenceData): - self.post_pred = az.InferenceData( - posterior_predictive=self.post_pred - ) - else: - self.post_pred = self.model.predict(X=self.post_X) + self.post_pred = self.model.predict(X=self.post_X, out_of_sample=True) elif isinstance(self.model, RegressorMixin): self.post_pred = self.model.predict(X=self.post_X) - # calculate impact - use appropriate y data format for each model type + # calculate impact - all PyMC models now use 2D data with treated_units if isinstance(self.model, PyMCModel): - is_bsts_like = isinstance( - self.model, (BayesianBasisExpansionTimeSeries, StateSpaceTimeSeries) - ) - if is_bsts_like: - pre_y_for_impact = self.pre_y.isel(treated_units=0) - post_y_for_impact = self.post_y.isel(treated_units=0) - self.pre_impact = self.model.calculate_impact( - pre_y_for_impact, self.pre_pred - ) - self.post_impact = self.model.calculate_impact( - post_y_for_impact, self.post_pred - ) - else: - # PyMC models with treated_units use 2D data - self.pre_impact = self.model.calculate_impact(self.pre_y, self.pre_pred) - self.post_impact = self.model.calculate_impact( - self.post_y, self.post_pred - ) + self.pre_impact = self.model.calculate_impact(self.pre_y, self.pre_pred) + self.post_impact = self.model.calculate_impact(self.post_y, self.post_pred) elif isinstance(self.model, RegressorMixin): # SKL models work with 1D data self.pre_impact = self.model.calculate_impact( diff --git a/causalpy/pymc_models.py b/causalpy/pymc_models.py index 9adc3bcf..5a94d640 100644 --- a/causalpy/pymc_models.py +++ b/causalpy/pymc_models.py @@ -1057,6 +1057,76 @@ class initialisation. return idata_outcome, model_outcome +def _convert_timeseries_data( + X: xr.DataArray, y: xr.DataArray, coords: Dict[str, Any] +) -> tuple[Optional[np.ndarray], np.ndarray, pd.DatetimeIndex, List[str]]: + """ + Convert xr.DataArray inputs to numpy arrays for time series models. + + This helper function maintains API consistency with other PyMCModel subclasses + by accepting xr.DataArray inputs and converting them internally to the numpy + format needed by time series models. + + Parameters + ---------- + X : xr.DataArray + Input features as an xarray DataArray with dims ["obs_ind", "coeffs"]. + y : xr.DataArray + Target variable as an xarray DataArray with dims ["obs_ind", "treated_units"]. + coords : dict + Coordinates dictionary. Must contain "datetime_index" (pd.DatetimeIndex). + + Returns + ------- + tuple + (X_numpy, y_numpy, datetime_index, coeff_names) + - X_numpy: numpy array or None if no exogenous variables + - y_numpy: 1D numpy array + - datetime_index: pd.DatetimeIndex + - coeff_names: list of coefficient names + """ + # Extract datetime_index from coords + datetime_index = coords.get("datetime_index") + if datetime_index is None: + # Try to extract from obs_ind coordinate if it's a datetime + if hasattr(X, "coords") and "obs_ind" in X.coords: + obs_ind = X.coords["obs_ind"].values + if len(obs_ind) > 0 and isinstance( + obs_ind[0], (np.datetime64, pd.Timestamp) + ): + datetime_index = pd.DatetimeIndex(obs_ind) + + if not isinstance(datetime_index, pd.DatetimeIndex): + raise ValueError( + "coords must contain 'datetime_index' of type pd.DatetimeIndex, " + "or X must have datetime obs_ind coordinates." + ) + + # Convert X from xr.DataArray to numpy + X_numpy: Optional[np.ndarray] = None + coeff_names: List[str] = [] + if isinstance(X, xr.DataArray): + if X.shape[1] > 0: + X_numpy = X.values + if "coeffs" in X.coords: + coeff_names = list(X.coords["coeffs"].values) + elif isinstance(X, np.ndarray): + if X.shape[1] > 0: + X_numpy = X + coeff_names = coords.get("coeffs", []) + + # Convert y from xr.DataArray to numpy (1D) + if isinstance(y, xr.DataArray): + if "treated_units" in y.dims: + y_numpy = y.isel(treated_units=0).values.flatten() + else: + y_numpy = y.values.flatten() + else: + y_numpy = np.asarray(y).flatten() + + return X_numpy, y_numpy, datetime_index, coeff_names + + class BayesianBasisExpansionTimeSeries(PyMCModel): r""" Bayesian Structural Time Series Model. @@ -1107,7 +1177,6 @@ def __init__( # Warn that this is experimental warnings.warn( "BayesianBasisExpansionTimeSeries is experimental and its API may change in future versions. " - "It uses a different data format (numpy arrays and datetime indices) compared to other PyMC models. " "Not recommended for production use.", FutureWarning, stacklevel=2, @@ -1271,31 +1340,37 @@ def _prepare_time_and_exog_features( return time_for_trend, time_for_seasonality, X_values_for_pymc, num_obs def build_model( - self, X: Optional[np.ndarray], y: np.ndarray, coords: Dict[str, Any] | None + self, X: xr.DataArray, y: xr.DataArray, coords: Dict[str, Any] | None ) -> None: """ Defines the PyMC model. Parameters ---------- - X : np.ndarray or None - NumPy array of exogenous regressors. Can be None if no exogenous variables. - y : np.ndarray - The target variable. + X : xr.DataArray + Input features with dims ["obs_ind", "coeffs"]. Can have 0 columns if + no exogenous variables. + y : xr.DataArray + Target variable with dims ["obs_ind", "treated_units"]. coords : dict Coordinates dictionary. Must contain "datetime_index" (pd.DatetimeIndex). - If X is provided and has columns, coords must also contain "coeffs" (List[str]). """ if coords is None: raise ValueError("coords must be provided with 'datetime_index'") - datetime_index = coords.pop("datetime_index", None) - if not isinstance(datetime_index, pd.DatetimeIndex): - raise ValueError( - "`coords` must contain 'datetime_index' of type pd.DatetimeIndex." - ) - # Get exog_names from coords["coeffs"] if X_exog_array is present - exog_names_from_coords = coords.get("coeffs") + # Make a copy to avoid modifying the original + coords = coords.copy() + + # Convert xr.DataArray to numpy internally + X_numpy, y_numpy, datetime_index, coeff_names = _convert_timeseries_data( + X, y, coords + ) + + # Pop datetime_index from coords copy (already extracted) + coords.pop("datetime_index", None) + + # Get exog_names from conversion or coords + exog_names_from_coords = coeff_names if coeff_names else coords.get("coeffs") ( time_for_trend, @@ -1303,11 +1378,12 @@ def build_model( X_values_for_pymc, # NumPy array for PyMC or None num_obs, ) = self._prepare_time_and_exog_features( - X, datetime_index, exog_names_from_coords + X_numpy, datetime_index, exog_names_from_coords ) model_coords = { "obs_ind": np.arange(num_obs), + "treated_units": ["unit_0"], # Add treated_units for consistency } # Start with a copy of the input coords (datetime_index was already popped) @@ -1322,37 +1398,30 @@ def build_model( elif isinstance(current_coeffs, tuple): model_coords["coeffs"] = list(current_coeffs) elif not isinstance(current_coeffs, list): - # If it's something else weird, raise error or clear it - # so self._exog_var_names can take precedence if needed. raise TypeError( f"Unexpected type for 'coeffs' in input coords: {type(current_coeffs)}" ) - # self._exog_var_names is the source of truth for coefficient names, ensure it's a list (done in _prepare) - # Override or set "coeffs" in model_coords based on self._exog_var_names + # self._exog_var_names is the source of truth for coefficient names if self._exog_var_names: if ( "coeffs" in model_coords and model_coords["coeffs"] != self._exog_var_names ): - # This implies a mismatch between what user provided in coords["coeffs"] - # and what _prepare_time_and_exog_features decided based on X and coords["coeffs"] - # This should ideally be caught earlier or be consistent. - # For now, let's assume _prepare_time_and_exog_features's derivation (self._exog_var_names) is correct. - print( - f"Warning: Discrepancy in 'coeffs'. Using derived: {self._exog_var_names} over input: {model_coords['coeffs']}" + warnings.warn( + f"Discrepancy in 'coeffs'. Using derived: {self._exog_var_names} " + f"over input: {model_coords['coeffs']}", + UserWarning, + stacklevel=2, ) model_coords["coeffs"] = self._exog_var_names # type: ignore[assignment] elif "coeffs" in model_coords and model_coords["coeffs"]: - # No exog vars determined by _prepare..., but coords has non-empty coeffs raise ValueError( f"Model determined no exogenous variables (self._exog_var_names is {self._exog_var_names}), " f"but input coords provided 'coeffs': {model_coords['coeffs']}. " f"If no exog vars, provide empty list or omit 'coeffs'." ) - elif ( - "coeffs" not in model_coords and self._exog_var_names - ): # Should not happen if logic is right + elif "coeffs" not in model_coords and self._exog_var_names: model_coords["coeffs"] = self._exog_var_names # type: ignore[assignment] with self: @@ -1370,7 +1439,7 @@ def build_model( dims="obs_ind", ) - # Get validated components (no more ugly imports in build_model!) + # Get validated components trend_component_instance = self._get_trend_component() seasonality_component_instance = self._get_seasonality_component() @@ -1393,11 +1462,7 @@ def build_model( mu_ = trend_component + season_component # Exogenous regressors (optional) - if ( - X_values_for_pymc is not None and self._exog_var_names - ): # self._exog_var_names is guaranteed list - # self.coords["coeffs"] should be an xarray.Coordinate object here. - # Its .values attribute is a numpy array. So list(self.coords["coeffs"].values) is a list. + if X_values_for_pymc is not None and self._exog_var_names: model_coord_coeffs_list = ( list(self.coords["coeffs"]) if "coeffs" in self.coords else [] ) @@ -1418,36 +1483,38 @@ def build_model( beta = pm.Normal("beta", mu=0, sigma=10, dims="coeffs") mu_ = mu_ + pm.math.dot(X_data, beta) - # Make mu_ an explicit deterministic variable named "mu" - mu = pm.Deterministic("mu", mu_, dims="obs_ind") + # Make mu_ an explicit deterministic variable with treated_units dimension + # Expand dims to include treated_units for consistency with other models + mu = pm.Deterministic("mu", mu_[:, None], dims=["obs_ind", "treated_units"]) - # Likelihood - sigma = pm.HalfNormal("sigma", sigma=self.prior_sigma) - y_data = pm.Data("y", y.flatten(), dims="obs_ind") - pm.Normal("y_hat", mu=mu, sigma=sigma, observed=y_data, dims="obs_ind") + # Likelihood - also with treated_units dimension + sigma = pm.HalfNormal("sigma", sigma=self.prior_sigma, dims="treated_units") + y_data = pm.Data("y", y_numpy[:, None], dims=["obs_ind", "treated_units"]) + pm.Normal( + "y_hat", + mu=mu, + sigma=sigma, + observed=y_data, + dims=["obs_ind", "treated_units"], + ) def fit( - self, - X: Optional[np.ndarray], - y: np.ndarray, - coords: Dict[str, Any] | None = None, + self, X: xr.DataArray, y: xr.DataArray, coords: Dict[str, Any] | None = None ) -> az.InferenceData: """Draw samples from posterior, prior predictive, and posterior predictive distributions, placing them in the model's idata attribute. + Parameters ---------- - X : np.ndarray or None - NumPy array of exogenous regressors. Can be None or an array with 0 columns - if no exogenous variables. - y : np.ndarray - The target variable. + X : xr.DataArray + Input features with dims ["obs_ind", "coeffs"]. Can have 0 columns if + no exogenous variables. + y : xr.DataArray + Target variable with dims ["obs_ind", "treated_units"]. coords : dict Coordinates dictionary. Must contain "datetime_index" (pd.DatetimeIndex). - If X is provided and has columns, coords must also contain "coeffs" (List[str]). """ - random_seed = self.sample_kwargs.get("random_seed", None) - # X can be None if no exog vars, _prepare_... handles it. self.build_model(X, y, coords=coords) with self: self.idata = pm.sample(**self.sample_kwargs) @@ -1456,61 +1523,61 @@ def fit( self.idata.extend( pm.sample_posterior_predictive( self.idata, - var_names=["y_hat", "mu"], # Ensure mu is sampled + var_names=["y_hat", "mu"], progressbar=self.sample_kwargs.get("progressbar", True), random_seed=random_seed, ) ) return self.idata # type: ignore[return-value] - def _data_setter( # type: ignore[override] - self, - X_pred: Optional[np.ndarray], - coords_pred: Dict[ - str, Any - ], # Must contain "datetime_index" for prediction period - ) -> None: + def _data_setter(self, X: xr.DataArray) -> None: """ Set data for the model for prediction. - X_pred contains exogenous variables for the prediction period. - coords_pred must contain "datetime_index" for the prediction period. + + Parameters + ---------- + X : xr.DataArray + Input features with dims ["obs_ind", "coeffs"]. Must have datetime + coordinates on obs_ind. """ - datetime_index_pred = coords_pred.get("datetime_index") - if not isinstance(datetime_index_pred, pd.DatetimeIndex): - raise ValueError( - "`coords_pred` must contain 'datetime_index' for prediction." - ) + # Extract datetime from X coordinates + if not hasattr(X, "coords") or "obs_ind" not in X.coords: + raise ValueError("X must have 'obs_ind' coordinate with datetime values") + + obs_ind_vals = X.coords["obs_ind"].values + if len(obs_ind_vals) == 0 or not isinstance( + obs_ind_vals[0], (np.datetime64, pd.Timestamp) + ): + raise ValueError("X 'obs_ind' coordinate must contain datetime values") + + datetime_index_pred = pd.DatetimeIndex(obs_ind_vals) + + # Convert X to numpy + X_numpy = X.values if X.shape[1] > 0 else None - # For _data_setter, exog_names are already known (self._exog_var_names from fit) - # We pass self._exog_var_names so _prepare_time_and_exog_features can validate - # the shape of X_pred_numpy if it's provided. ( time_for_trend_pred_vals, time_for_seasonality_pred_vals, - X_exog_pred_vals, # NumPy array for PyMC or None + X_exog_pred_vals, num_obs_pred, ) = self._prepare_time_and_exog_features( - X_pred, datetime_index_pred, self._exog_var_names + X_numpy, datetime_index_pred, self._exog_var_names ) new_obs_inds = np.arange(num_obs_pred) data_to_set = { - "y": np.zeros(num_obs_pred), + "y": np.zeros((num_obs_pred, 1)), # 2D for treated_units "t_trend_data": time_for_trend_pred_vals, "t_season_data": time_for_seasonality_pred_vals, } coords_to_set = {"obs_ind": new_obs_inds} - if ( - "X" in self.named_vars - ): # Model was built with exogenous variable X (i.e. self._exog_var_names is not empty) - if ( - X_exog_pred_vals is None and self._exog_var_names - ): # Check if exog_var_names expects something + if "X" in self.named_vars: + if X_exog_pred_vals is None and self._exog_var_names: raise ValueError( "Model was built with exogenous variables. " - "New X data (X_pred) must provide these (or index_for_time_pred if X_pred is array)." + "New X data must provide these." ) if ( self._exog_var_names @@ -1518,28 +1585,30 @@ def _data_setter( # type: ignore[override] and X_exog_pred_vals.shape[1] != len(self._exog_var_names) ): raise ValueError( - f"Shape mismatch for exogenous prediction variables. Expected {len(self._exog_var_names)} columns, " + f"Shape mismatch for exogenous prediction variables. " + f"Expected {len(self._exog_var_names)} columns, " f"got {X_exog_pred_vals.shape[1]}." ) - data_to_set["X"] = X_exog_pred_vals # Can be None if no exog vars + data_to_set["X"] = X_exog_pred_vals elif X_exog_pred_vals is not None: - print( - "Warning: X_pred provided exogenous variables, but the model was not " - "built with exogenous variables. These will be ignored." + warnings.warn( + "X provided exogenous variables, but the model was not " + "built with exogenous variables. These will be ignored.", + UserWarning, + stacklevel=2, ) - # Ensure "X" is set to None if no exog vars, even if "X" data var exists but model has no coeffs if not self._exog_var_names and "X" in self.named_vars: - # Pass an array with 0 columns for the X data variable if no exog vars expected if X_exog_pred_vals is not None and X_exog_pred_vals.shape[1] > 0: - # This should not happen if self._exog_var_names is empty - print( - "Warning: Model expects no exog vars, but X_exog_pred_vals has columns. Forcing to 0 columns." + warnings.warn( + "Model expects no exog vars, but X has columns. Forcing to 0 columns.", + UserWarning, + stacklevel=2, ) data_to_set["X"] = np.empty((num_obs_pred, 0)) elif X_exog_pred_vals is None: data_to_set["X"] = np.empty((num_obs_pred, 0)) - else: # X_exog_pred_vals has 0 columns already + else: data_to_set["X"] = X_exog_pred_vals with self: @@ -1547,50 +1616,72 @@ def _data_setter( # type: ignore[override] def predict( self, - X: Optional[np.ndarray], - coords: Dict[str, Any] - | None = None, # Must contain "datetime_index" for prediction period + X: xr.DataArray, + coords: Optional[Dict[str, Any]] = None, out_of_sample: Optional[bool] = False, **kwargs: Any, ) -> az.InferenceData: """ - Predict data given input X and coords for prediction period. - coords must contain "datetime_index". If X has columns, coords should also have "coeffs". - However, for prediction, exog var names are already known by the model. + Predict data given input X. + + Parameters + ---------- + X : xr.DataArray + Input features with dims ["obs_ind", "coeffs"]. Must have datetime + coordinates on obs_ind. + coords : dict, optional + Not used, kept for API compatibility. + out_of_sample : bool, optional + Not used, kept for API compatibility. + + Returns + ------- + az.InferenceData + Posterior predictive samples. """ - if coords is None: - raise ValueError("coords must be provided with 'datetime_index'") random_seed = self.sample_kwargs.get("random_seed", None) - self._data_setter(X, coords_pred=coords) + self._data_setter(X) with self: post_pred = pm.sample_posterior_predictive( self.idata, var_names=["y_hat", "mu"], - progressbar=self.sample_kwargs.get( - "progressbar", False - ), # Consistent with base + progressbar=self.sample_kwargs.get("progressbar", False), random_seed=random_seed, ) + + # Assign coordinates from input X for proper alignment + if isinstance(X, xr.DataArray) and "obs_ind" in X.coords: + post_pred["posterior_predictive"] = post_pred[ + "posterior_predictive" + ].assign_coords(obs_ind=X.obs_ind) + return post_pred def score( self, - X: Optional[np.ndarray], - y: np.ndarray, - coords: Dict[str, Any] - | None = None, # Must contain "datetime_index" for score period + X: xr.DataArray, + y: xr.DataArray, + coords: Optional[Dict[str, Any]] = None, **kwargs: Any, ) -> pd.Series: - """Score the Bayesian R2. - coords must contain "datetime_index". If X has columns, coords should also have "coeffs". - However, for scoring, exog var names are already known by the model. + """Score the Bayesian R^2. + + Parameters + ---------- + X : xr.DataArray + Input features with dims ["obs_ind", "coeffs"]. + y : xr.DataArray + Target variable with dims ["obs_ind", "treated_units"]. + coords : dict, optional + Not used, kept for API compatibility. + + Returns + ------- + pd.Series + R² score and standard deviation for each treated unit. """ - pred_output = self.predict(X, coords=coords) - mu_pred = az.extract( - pred_output, group="posterior_predictive", var_names="mu" - ).T.values - # Note: First argument must be a 1D array - return r2_score(y.flatten(), mu_pred) + # Use base class score method now that we have treated_units dimension + return super().score(X, y, coords=coords, **kwargs) class StateSpaceTimeSeries(PyMCModel): @@ -1610,7 +1701,7 @@ class StateSpaceTimeSeries(PyMCModel): sample_kwargs : dict, optional Kwargs passed to `pm.sample`. mode : str, optional - Mode passed to `build_statespace_graph` (e.g., "JAX"). + Pytensor compile mode passed to `build_statespace_graph`. Defaults to None. """ def __init__( @@ -1620,15 +1711,13 @@ def __init__( trend_component: Optional[Any] = None, seasonality_component: Optional[Any] = None, sample_kwargs: Optional[Dict[str, Any]] = None, - mode: str = "JAX", + mode: Optional[str] = None, ): super().__init__(sample_kwargs=sample_kwargs) # Warn that this is experimental warnings.warn( "StateSpaceTimeSeries is experimental and its API may change in future versions. " - "It uses a different data format (numpy arrays and datetime indices) compared to other PyMC models, " - "and returns xr.Dataset instead of az.InferenceData from predict(). " "Not recommended for production use.", FutureWarning, stacklevel=2, @@ -1704,20 +1793,30 @@ def _get_seasonality_component(self): return self._seasonality_component def build_model( - self, X: Optional[np.ndarray], y: np.ndarray, coords: Dict[str, Any] | None + self, X: xr.DataArray, y: xr.DataArray, coords: Dict[str, Any] | None ) -> None: """ - Build the PyMC state-space model. `coords` must include: - - 'datetime_index': a pandas.DatetimeIndex matching `y`. + Build the PyMC state-space model. + + Parameters + ---------- + X : xr.DataArray + Input features with dims ["obs_ind", "coeffs"]. Not used by state-space + models, but kept for API compatibility. + y : xr.DataArray + Target variable with dims ["obs_ind", "treated_units"]. + coords : dict + Coordinates dictionary. Must contain "datetime_index" (pd.DatetimeIndex). """ if coords is None: raise ValueError("coords must be provided with 'datetime_index'") coords = coords.copy() - datetime_index = coords.pop("datetime_index", None) - if not isinstance(datetime_index, pd.DatetimeIndex): - raise ValueError( - "coords must contain 'datetime_index' of type pandas.DatetimeIndex." - ) + + # Convert xr.DataArray to numpy internally + _, y_numpy, datetime_index, _ = _convert_timeseries_data(X, y, coords) + + # Pop datetime_index from coords copy (already extracted) + coords.pop("datetime_index", None) self._train_index = datetime_index # Instantiate components and build state-space object @@ -1752,19 +1851,30 @@ def build_model( _sigma_monthly_season = pm.Gamma("sigma_freq", alpha=2, beta=1) # Attach the state-space graph using the observed data - df = pd.DataFrame({"y": y.flatten()}, index=datetime_index) + df = pd.DataFrame({"y": y_numpy.flatten()}, index=datetime_index) if self.ss_mod is not None: self.ss_mod.build_statespace_graph(df[["y"]], mode=self.mode) def fit( - self, - X: Optional[np.ndarray], - y: np.ndarray, - coords: Dict[str, Any] | None = None, + self, X: xr.DataArray, y: xr.DataArray, coords: Dict[str, Any] | None = None ) -> az.InferenceData: """ Fit the model, drawing posterior samples. - Returns the InferenceData with parameter draws. + + Parameters + ---------- + X : xr.DataArray + Input features with dims ["obs_ind", "coeffs"]. Not used by state-space + models, but kept for API compatibility. + y : xr.DataArray + Target variable with dims ["obs_ind", "treated_units"]. + coords : dict + Coordinates dictionary. Must contain "datetime_index" (pd.DatetimeIndex). + + Returns + ------- + az.InferenceData + InferenceData with parameter draws. """ self.build_model(X, y, coords) if self.second_model is None: @@ -1780,7 +1890,8 @@ def fit( self.conditional_idata = self._smooth() return self._prepare_idata() - def _prepare_idata(self): + def _prepare_idata(self) -> az.InferenceData: + """Prepare InferenceData with proper dimensions including treated_units.""" if self.idata is None: raise RuntimeError("Model must be fit before smoothing.") @@ -1797,8 +1908,11 @@ def _prepare_idata(self): else: y_hat_final = y_hat_summed - new_idata["posterior_predictive"]["y_hat"] = y_hat_final - new_idata["posterior_predictive"]["mu"] = y_hat_final + # Add treated_units dimension for consistency with other models + y_hat_with_units = y_hat_final.expand_dims({"treated_units": ["unit_0"]}) + + new_idata["posterior_predictive"]["y_hat"] = y_hat_with_units + new_idata["posterior_predictive"]["mu"] = y_hat_with_units return new_idata @@ -1825,58 +1939,95 @@ def _forecast(self, start: pd.Timestamp, periods: int) -> xr.Dataset: def predict( self, - X: Optional[np.ndarray], - coords: Dict[str, Any] | None = None, + X: xr.DataArray, + coords: Optional[Dict[str, Any]] = None, out_of_sample: Optional[bool] = False, **kwargs: Any, - ) -> xr.Dataset: + ) -> az.InferenceData: """ - Wrapper around forecast: expects coords with 'datetime_index' of future points. + Predict data given input X. + + Parameters + ---------- + X : xr.DataArray + Input features with dims ["obs_ind", "coeffs"]. Must have datetime + coordinates on obs_ind for out-of-sample predictions. + coords : dict, optional + Not used directly, datetime extracted from X coordinates. + out_of_sample : bool, optional + If True, forecast future values. If False, return in-sample predictions. + + Returns + ------- + az.InferenceData + Posterior predictive samples with y_hat and mu. """ if not out_of_sample: return self._prepare_idata() else: - if coords is None: - raise ValueError("coords must be provided for out-of-sample prediction") - idx = coords.get("datetime_index") - if not isinstance(idx, pd.DatetimeIndex): + # Extract datetime from X coordinates + if not hasattr(X, "coords") or "obs_ind" not in X.coords: raise ValueError( - "coords must contain 'datetime_index' for prediction period." + "X must have 'obs_ind' coordinate with datetime values for prediction" ) + + obs_ind_vals = X.coords["obs_ind"].values + if len(obs_ind_vals) == 0 or not isinstance( + obs_ind_vals[0], (np.datetime64, pd.Timestamp) + ): + raise ValueError("X 'obs_ind' coordinate must contain datetime values") + + idx = pd.DatetimeIndex(obs_ind_vals) last = self._train_index[-1] # start forecasting after the last observed - temp_idata = self._forecast(start=last, periods=len(idx)) - new_idata = temp_idata.copy() + forecast_data = self._forecast(start=last, periods=len(idx)) + forecast_copy = forecast_data.copy() # Rename 'time' to 'obs_ind' to match CausalPy conventions - if "time" in new_idata.dims: - new_idata = new_idata.rename({"time": "obs_ind"}) + if "time" in forecast_copy.dims: + forecast_copy = forecast_copy.rename({"time": "obs_ind"}) + + # Extract the forecasted observed data and add treated_units dimension + y_hat = forecast_copy["forecast_observed"].isel(observed_state=0) + y_hat_with_units = y_hat.expand_dims({"treated_units": ["unit_0"]}) - # Extract the forecasted observed data and assign it to 'y_hat' - new_idata["y_hat"] = new_idata["forecast_observed"].isel(observed_state=0) + # Wrap in InferenceData for consistency + result = az.InferenceData( + posterior_predictive=xr.Dataset( + {"y_hat": y_hat_with_units, "mu": y_hat_with_units} + ) + ) - # Assign 'y_hat' to 'mu' for consistency - new_idata["mu"] = new_idata["y_hat"] + # Assign coordinates from input X for proper alignment + if isinstance(X, xr.DataArray) and "obs_ind" in X.coords: + result["posterior_predictive"] = result[ + "posterior_predictive" + ].assign_coords(obs_ind=X.obs_ind) - return new_idata + return result def score( self, - X: Optional[np.ndarray], - y: np.ndarray, - coords: Dict[str, Any] | None = None, + X: xr.DataArray, + y: xr.DataArray, + coords: Optional[Dict[str, Any]] = None, **kwargs: Any, ) -> pd.Series: """ Compute R^2 between observed and mean forecast. - """ - pred = self.predict(X, coords) - fc = pred["posterior_predictive"]["y_hat"] # .isel(observed_state=0) - # Use all posterior samples to compute Bayesian R² - # fc has shape (chain, draw, time), we want (n_samples, time) - fc_samples = fc.stack( - sample=["chain", "draw"] - ).T.values # Shape: (time, n_samples) + Parameters + ---------- + X : xr.DataArray + Input features with dims ["obs_ind", "coeffs"]. + y : xr.DataArray + Target variable with dims ["obs_ind", "treated_units"]. + coords : dict, optional + Not used, kept for API compatibility. - # Use arviz.r2_score to get both r2 and r2_std - return r2_score(y.flatten(), fc_samples) + Returns + ------- + pd.Series + R² score and standard deviation for each treated unit. + """ + # Use base class score method now that we have treated_units dimension + return super().score(X, y, coords=coords, **kwargs) diff --git a/causalpy/tests/test_integration_its_new_timeseries.py b/causalpy/tests/test_integration_its_new_timeseries.py index 80bd5d03..4918382a 100644 --- a/causalpy/tests/test_integration_its_new_timeseries.py +++ b/causalpy/tests/test_integration_its_new_timeseries.py @@ -84,7 +84,7 @@ def test_its_with_state_space_model(): """ # Skip if pymc-extras is not available try: - import pymc_extras.statespace.structural # noqa: F401 + from pymc_extras.statespace import structural # noqa: F401 except ImportError: pytest.skip("pymc-extras is required for StateSpaceTimeSeries tests") @@ -111,7 +111,7 @@ def test_its_with_state_space_model(): level_order=2, seasonal_length=7, sample_kwargs=sample_kwargs, - mode="PyMC", + mode="FAST_COMPILE", ) result = cp.InterruptedTimeSeries( diff --git a/causalpy/tests/test_integration_pymc_examples.py b/causalpy/tests/test_integration_pymc_examples.py index 00068507..c0626e73 100644 --- a/causalpy/tests/test_integration_pymc_examples.py +++ b/causalpy/tests/test_integration_pymc_examples.py @@ -808,18 +808,30 @@ def test_bayesian_structural_time_series(): # --- Test Case 1: Model with exogenous regressor --- # coords_with_x = { - "obs_ind": np.arange(n_obs), + "obs_ind": dates, # Use dates directly for xarray coords "coeffs": ["x1"], + "treated_units": ["unit_0"], "datetime_index": dates, - # "time_for_seasonality": day_of_year, # Not used by model directly from coords - # "time_for_trend": time_numeric, # Not used by model directly from coords } + + # Create DataArrays for input to match new API + X_da = xr.DataArray( + data_with_x[["x1"]].values, + dims=["obs_ind", "coeffs"], + coords={"obs_ind": dates, "coeffs": ["x1"]}, + ) + y_da = xr.DataArray( + data_with_x["y"].values[:, None], + dims=["obs_ind", "treated_units"], + coords={"obs_ind": dates, "treated_units": ["unit_0"]}, + ) + model_with_x = cp.pymc_models.BayesianBasisExpansionTimeSeries( n_order=2, n_changepoints_trend=5, sample_kwargs=bsts_sample_kwargs ) model_with_x.fit( - X=data_with_x[["x1"]].values, - y=data_with_x["y"].values.reshape(-1, 1), + X=X_da, + y=y_da, coords=coords_with_x.copy(), # Pass a copy ) assert isinstance(model_with_x.idata, az.InferenceData) @@ -838,32 +850,45 @@ def test_bayesian_structural_time_series(): assert "y_hat" in model_with_x.idata.posterior_predictive predictions_with_x = model_with_x.predict( - X=data_with_x[["x1"]].values, + X=X_da, coords=coords_with_x, # Original coords_with_x is fine here ) assert isinstance(predictions_with_x, az.InferenceData) score_with_x = model_with_x.score( - X=data_with_x[["x1"]].values, - y=data_with_x["y"].values.reshape(-1, 1), + X=X_da, + y=y_da, coords=coords_with_x, # Original coords_with_x is fine here ) assert isinstance(score_with_x, pd.Series) # --- Test Case 2: Model without exogenous regressor --- # - data_for_no_exog = None coords_no_x = { - "obs_ind": np.arange(n_obs), + "obs_ind": dates, + "treated_units": ["unit_0"], "datetime_index": dates, # "coeffs": [], # Explicitly empty or omitted if X is None - # "time_for_seasonality": day_of_year, # Not used - # "time_for_trend": time_numeric, # Not used } + + y_da_no_x = xr.DataArray( + data_no_x["y"].values[:, None], + dims=["obs_ind", "treated_units"], + coords={"obs_ind": dates, "treated_units": ["unit_0"]}, + ) + + # Create X_da_no_x (empty coeffs) to provide time index for predict + X_da_no_x = xr.DataArray( + np.zeros((len(dates), 0)), # 0 coeffs + dims=["obs_ind", "coeffs"], + coords={"obs_ind": dates, "coeffs": []}, + ) + model_no_x = cp.pymc_models.BayesianBasisExpansionTimeSeries( n_order=2, n_changepoints_trend=5, sample_kwargs=bsts_sample_kwargs ) + model_no_x.fit( - X=data_for_no_exog, - y=data_no_x["y"].values.reshape(-1, 1), + X=X_da_no_x, + y=y_da_no_x, coords=coords_no_x.copy(), # Pass a copy ) assert isinstance(model_no_x.idata, az.InferenceData) @@ -874,43 +899,47 @@ def test_bayesian_structural_time_series(): assert "sigma" in model_no_x.idata.posterior predictions_no_x = model_no_x.predict( - X=data_for_no_exog, + X=X_da_no_x, coords=coords_no_x, # Original coords_no_x is fine ) assert isinstance(predictions_no_x, az.InferenceData) score_no_x = model_no_x.score( - X=data_for_no_exog, - y=data_no_x["y"].values.reshape(-1, 1), + X=X_da_no_x, + y=y_da_no_x, coords=coords_no_x, # Original coords_no_x is fine ) assert isinstance(score_no_x, pd.Series) # --- Test Case 3: Model with empty exogenous regressor (X has 0 columns) --- # - # This is similar to Test Case 2. Model should handle X=np.empty((n_obs,0)) - data_empty_x_array = np.empty((n_obs, 0)) + # This is similar to Test Case 2. Model should handle X with 0 columns coords_empty_x = { # Coords for 0 exog vars - "obs_ind": np.arange(n_obs), + "obs_ind": dates, + "treated_units": ["unit_0"], "datetime_index": dates, "coeffs": [], # Must be empty list if X has 0 columns and 'coeffs' is provided } + + # Reuse X_da_no_x from Test Case 2 as it has 0 columns and correct coords + # Reuse y_da_no_x from Test Case 2 + model_empty_x = cp.pymc_models.BayesianBasisExpansionTimeSeries( n_order=2, n_changepoints_trend=5, sample_kwargs=bsts_sample_kwargs ) model_empty_x.fit( - X=data_empty_x_array, - y=data_no_x["y"].values.reshape(-1, 1), + X=X_da_no_x, + y=y_da_no_x, coords=coords_empty_x.copy(), # Pass a copy ) assert isinstance(model_empty_x.idata, az.InferenceData) predictions_empty_x = model_empty_x.predict( - X=data_empty_x_array, + X=X_da_no_x, coords=coords_empty_x, # Original coords_empty_x is fine ) assert isinstance(predictions_empty_x, az.InferenceData) score_empty_x = model_empty_x.score( - X=data_empty_x_array, - y=data_no_x["y"].values.reshape(-1, 1), + X=X_da_no_x, + y=y_da_no_x, coords=coords_empty_x, # Original coords_empty_x is fine ) assert isinstance(score_empty_x, pd.Series) @@ -918,30 +947,44 @@ def test_bayesian_structural_time_series(): # --- Test Case 4: Model with incorrect coord/data setup (ValueErrors) --- # with pytest.raises( ValueError, - match=r"`coords` must contain 'datetime_index' of type pd\.DatetimeIndex\.", + match=r"coords must contain 'datetime_index' of type pd\.DatetimeIndex", ): model_error_idx = cp.pymc_models.BayesianBasisExpansionTimeSeries( sample_kwargs=bsts_sample_kwargs ) bad_dt_idx_coords = coords_with_x.copy() bad_dt_idx_coords["datetime_index"] = np.arange(n_obs) # Not a DatetimeIndex + + # Using DataArrays here too for consistency, though check happens on coords dict model_error_idx.fit( - X=data_with_x[["x1"]].values, - y=data_with_x["y"].values.reshape(-1, 1), + X=X_da, + y=y_da, coords=bad_dt_idx_coords.copy(), # Pass a copy ) with pytest.raises(ValueError, match="Model was built with exogenous variables"): - model_with_x.predict(X=None, coords=coords_with_x) + # Pass X with no exogenous vars (X_da_no_x) to model expecting vars (model_with_x) + # This checks that we can't predict without supplying the expected exog vars + model_with_x.predict(X=X_da_no_x, coords=coords_with_x) with pytest.raises( ValueError, - match=r"Mismatch: X_exog_array has 2 columns, but 1 names provided\.", + match=r"Mismatch: X_exog_array has 2 columns, but 1 names provided", ): wrong_shape_x_pred_vals = np.hstack( [data_with_x[["x1"]].values, data_with_x[["x1"]].values] ) # 2 columns - model_with_x.predict(X=wrong_shape_x_pred_vals, coords=coords_with_x) + + X_wrong_shape = xr.DataArray( + wrong_shape_x_pred_vals, + dims=["obs_ind", "coeffs"], + coords={ + "obs_ind": dates, + "coeffs": ["x1", "x2"], # 2 coeffs + }, + ) + + model_with_x.predict(X=X_wrong_shape, coords=coords_with_x) @pytest.mark.integration @@ -965,7 +1008,7 @@ def test_state_space_time_series(): """ # Check if pymc-extras is available try: - import pymc_extras.statespace.structural # noqa: F401 + from pymc_extras.statespace import structural # noqa: F401 except ImportError: pytest.skip("pymc-extras is required for InterruptedTimeSeries tests") @@ -1001,113 +1044,131 @@ def test_state_space_time_series(): "datetime_index": dates, } + # Create DataArray for y to support score() which requires xarray + y_da = xr.DataArray( + data["y"].values.reshape(-1, 1), + dims=["obs_ind", "treated_units"], + coords={"obs_ind": coords["obs_ind"], "treated_units": ["unit_0"]}, + ) + # Initialize model with PyMC mode (more stable than JAX for testing) - model = cp.pymc_models.InterruptedTimeSeries( + model = cp.pymc_models.StateSpaceTimeSeries( level_order=2, # Local linear trend (level + slope) seasonal_length=7, # Weekly seasonality for shorter test period sample_kwargs=ss_sample_kwargs, - mode="PyMC", # Use PyMC mode instead of JAX for better compatibility + mode="FAST_COMPILE", # Use PyMC mode instead of JAX for better compatibility ) # Test the complete workflow - try: - # --- Test Case 1: Model fitting --- # - idata = model.fit( - X=None, # No exogenous variables for state-space model - y=data["y"].values.reshape(-1, 1), - coords=coords.copy(), - ) + # --- Test Case 1: Model fitting --- # + idata = model.fit( + X=None, # No exogenous variables for state-space model + y=y_da, + coords=coords.copy(), + ) - # Verify inference data structure - assert isinstance(idata, az.InferenceData) - assert "posterior" in idata - assert "posterior_predictive" in idata - - # Check for expected state-space parameters - expected_params = [ - "P0_diag", - "initial_trend", - "freq", - "sigma_trend", - "sigma_freq", - ] - for param in expected_params: - assert param in idata.posterior, f"Parameter {param} not found in posterior" - - # Check for expected posterior predictive variables - assert "y_hat" in idata.posterior_predictive - assert "mu" in idata.posterior_predictive - - # --- Test Case 2: In-sample prediction --- # - predictions_in_sample = model.predict( - X=None, - coords=coords, - out_of_sample=False, - ) - assert isinstance(predictions_in_sample, az.InferenceData) - assert "posterior_predictive" in predictions_in_sample - assert "y_hat" in predictions_in_sample.posterior_predictive - assert "mu" in predictions_in_sample.posterior_predictive - - # --- Test Case 3: Out-of-sample prediction (forecasting) --- # - future_dates = pd.date_range(start="2020-04-01", end="2020-04-07", freq="D") - future_coords = { - "datetime_index": future_dates, - } - - predictions_out_sample = model.predict( - X=None, - coords=future_coords, - out_of_sample=True, - ) - assert isinstance(predictions_out_sample, xr.Dataset) - assert "y_hat" in predictions_out_sample - assert "mu" in predictions_out_sample + # Verify inference data structure + assert isinstance(idata, az.InferenceData) + assert "posterior" in idata + assert "posterior_predictive" in idata + + # Check for expected state-space parameters + expected_params = [ + "P0_diag", + "initial_level_trend", + "params_freq", + "sigma_level_trend", + "sigma_freq", + ] + for param in expected_params: + assert param in idata.posterior, f"Parameter {param} not found in posterior" + + # Check for expected posterior predictive variables + assert "y_hat" in idata.posterior_predictive + assert "mu" in idata.posterior_predictive + + # --- Test Case 2: In-sample prediction --- # + predictions_in_sample = model.predict( + X=None, + coords=coords, + out_of_sample=False, + ) + assert isinstance(predictions_in_sample, az.InferenceData) + assert "posterior_predictive" in predictions_in_sample + assert "y_hat" in predictions_in_sample.posterior_predictive + assert "mu" in predictions_in_sample.posterior_predictive + + # --- Test Case 3: Out-of-sample prediction (forecasting) --- # + future_dates = pd.date_range(start="2020-04-01", end="2020-04-07", freq="D") + future_coords = { + "datetime_index": future_dates, + } + # Create dummy X for forecasting (needs time index) + future_X = xr.DataArray( + np.zeros((len(future_dates), 0)), + dims=["obs_ind", "coeffs"], + coords={"obs_ind": future_dates, "coeffs": []}, + ) - # Verify forecast has correct dimensions - assert predictions_out_sample["y_hat"].shape[-1] == len(future_dates) + predictions_out_sample = model.predict( + X=future_X, + coords=future_coords, + out_of_sample=True, + ) + # Note: predict now returns InferenceData, not Dataset! + # But let's check what the test expects. + # The previous code expected xr.Dataset: + # assert isinstance(predictions_out_sample, xr.Dataset) + # I updated predict() to return az.InferenceData. + # So I should update this assertion too. + + assert isinstance(predictions_out_sample, az.InferenceData) + assert "y_hat" in predictions_out_sample.posterior_predictive + assert "mu" in predictions_out_sample.posterior_predictive + + # Verify forecast has correct dimensions + # y_hat is in posterior_predictive group + assert predictions_out_sample.posterior_predictive["y_hat"].shape[-1] == len( + future_dates + ) - # --- Test Case 4: Model scoring --- # - score = model.score( - X=None, - y=data["y"].values.reshape(-1, 1), - coords=coords, - ) - assert isinstance(score, pd.Series) - assert "r2" in score.index - assert "r2_std" in score.index - # R² should be reasonable for synthetic data with clear structure - assert score["r2"] > 0.0, "R² should be positive for structured synthetic data" - - # --- Test Case 5: Model components verification --- # - # Test that the model has the expected state-space structure - assert hasattr(model, "ss_mod") - assert model.ss_mod is not None - assert hasattr(model, "_train_index") - assert isinstance(model._train_index, pd.DatetimeIndex) - - # Test conditional inference data - assert hasattr(model, "conditional_idata") - assert isinstance(model.conditional_idata, xr.Dataset) - - # Verify model parameters match initialization - assert model.level_order == 2 - assert model.seasonal_length == 7 - assert model.mode == "PyMC" - - except Exception as e: - # If there are still compatibility issues, skip the test with a warning - pytest.skip( - f"InterruptedTimeSeries test skipped due to compatibility issue: {e}" - ) + # --- Test Case 4: Model scoring --- # + score = model.score( + X=None, + y=y_da, + coords=coords, + ) + assert isinstance(score, pd.Series) + assert "unit_0_r2" in score.index + assert "unit_0_r2_std" in score.index + # R² should be reasonable for synthetic data with clear structure + assert score["unit_0_r2"] > 0.0, ( + "R² should be positive for structured synthetic data" + ) + + # --- Test Case 5: Model components verification --- # + # Test that the model has the expected state-space structure + assert hasattr(model, "ss_mod") + assert model.ss_mod is not None + assert hasattr(model, "_train_index") + assert isinstance(model._train_index, pd.DatetimeIndex) + + # Test conditional inference data + assert hasattr(model, "conditional_idata") + assert isinstance(model.conditional_idata, xr.Dataset) + + # Verify model parameters match initialization + assert model.level_order == 2 + assert model.seasonal_length == 7 + assert model.mode == "FAST_COMPILE" # --- Test Case 6: Error handling --- # # Test with invalid datetime_index with pytest.raises( ValueError, - match="coords must contain 'datetime_index' of type pandas.DatetimeIndex.", + match=r"coords must contain 'datetime_index' of type pd\.DatetimeIndex", ): - model_error = cp.pymc_models.InterruptedTimeSeries( + model_error = cp.pymc_models.StateSpaceTimeSeries( sample_kwargs=ss_sample_kwargs ) bad_coords = coords.copy() @@ -1118,10 +1179,10 @@ def test_state_space_time_series(): coords=bad_coords, ) - # Test prediction with invalid coords + # Test prediction with invalid coords (missing X) with pytest.raises( ValueError, - match="coords must contain 'datetime_index' for prediction period.", + match="X must have 'obs_ind' coordinate with datetime values", ): model.predict( X=None, @@ -1130,9 +1191,7 @@ def test_state_space_time_series(): ) # Test methods before fitting - unfitted_model = cp.pymc_models.InterruptedTimeSeries( - sample_kwargs=ss_sample_kwargs - ) + unfitted_model = cp.pymc_models.StateSpaceTimeSeries(sample_kwargs=ss_sample_kwargs) with pytest.raises(RuntimeError, match="Model must be fit before"): unfitted_model._smooth() @@ -1142,20 +1201,20 @@ def test_state_space_time_series(): # --- Test Case 7: Model initialization with different parameters --- # # Test different level orders - model_level1 = cp.pymc_models.InterruptedTimeSeries( + model_level1 = cp.pymc_models.StateSpaceTimeSeries( level_order=1, # Local level only (no slope) seasonal_length=7, sample_kwargs=ss_sample_kwargs, - mode="PyMC", + mode="FAST_COMPILE", ) assert model_level1.level_order == 1 # Test different seasonal lengths - model_monthly = cp.pymc_models.InterruptedTimeSeries( + model_monthly = cp.pymc_models.StateSpaceTimeSeries( level_order=2, seasonal_length=30, # Monthly seasonality sample_kwargs=ss_sample_kwargs, - mode="PyMC", + mode="FAST_COMPILE", ) assert model_monthly.seasonal_length == 30 diff --git a/docs/dev/its_pymc copy.ipynb b/docs/dev/its_pymc copy.ipynb index 084ce758..ae5debda 100644 --- a/docs/dev/its_pymc copy.ipynb +++ b/docs/dev/its_pymc copy.ipynb @@ -222,7 +222,7 @@ { "data": { "application/vnd.jupyter.widget-view+json": { - "model_id": "903bc78c081e49a2bbd2548eacfa7d30", + "model_id": "a87e63336b3a4ceeb2afb57542bd5af3", "version_major": 2, "version_minor": 0 }, @@ -874,7 +874,7 @@ " * treated_units (treated_units) <U6 24B 'unit_0'\n", " * chain (chain) int64 32B 0 1 2 3\n", " * draw (draw) int64 8kB 0 1 2 3 4 5 6 ... 994 995 996 997 998 999\n", - " * obs_ind (obs_ind) datetime64[ns] 288B 2017-01-31 ... 2019-12-31