diff --git a/ads/opctl/operator/lowcode/forecast/model/arima.py b/ads/opctl/operator/lowcode/forecast/model/arima.py index 3e432aea4..a07d089e4 100644 --- a/ads/opctl/operator/lowcode/forecast/model/arima.py +++ b/ads/opctl/operator/lowcode/forecast/model/arima.py @@ -16,7 +16,7 @@ from ..operator_config import ForecastOperatorConfig import traceback from .forecast_datasets import ForecastDatasets, ForecastOutput -from ..const import ForecastOutputColumns +from ..const import ForecastOutputColumns, SupportedModels class ArimaOperatorModel(ForecastOperatorBaseModel): @@ -80,8 +80,10 @@ def _train_model(self, i, target, df): if len(additional_regressors): X_in = data_i.drop(target, axis=1) - # Build and fit model - model = pm.auto_arima(y=y, X=X_in, **self.spec.model_kwargs) + model = self.loaded_models[target] if self.loaded_models is not None else None + if model is None: + # Build and fit model + model = pm.auto_arima(y=y, X=X_in, **self.spec.model_kwargs) self.fitted_values[target] = model.predict_in_sample(X=X_in) self.actual_values[target] = y @@ -119,7 +121,17 @@ def _train_model(self, i, target, df): ) self.outputs[target] = forecast - self.models_dict[target] = model + if self.loaded_models is None: + self.models[target] = model + + params = vars(model).copy() + for param in ['arima_res_', 'endog_index_']: + if param in params: + params.pop(param) + self.model_parameters[target] = { + "framework": SupportedModels.Arima, + **params, + } logger.debug("===========Done===========") except Exception as e: @@ -133,12 +145,12 @@ def _build_model(self) -> pd.DataFrame: confidence_interval_width=self.spec.confidence_interval_width ) + self.models = dict() self.outputs = dict() self.outputs_legacy = [] self.fitted_values = dict() self.actual_values = dict() self.dt_columns = dict() - self.models_dict = dict() self.errors_dict = dict() Parallel(n_jobs=-1, require="sharedmem")( @@ -148,13 +160,15 @@ def _build_model(self) -> pd.DataFrame: ) ) - self.models = [self.models_dict[target] for target in self.target_columns] + if self.loaded_models is not None: + self.models = self.loaded_models # Merge the outputs from each model into 1 df with all outputs by target and category col = self.original_target_column output_col = pd.DataFrame() yhat_upper_name = ForecastOutputColumns.UPPER_BOUND yhat_lower_name = ForecastOutputColumns.LOWER_BOUND + for cat in self.categories: output_i = pd.DataFrame() output_i["Date"] = self.dt_columns[f"{col}_{cat}"] @@ -183,8 +197,8 @@ def _generate_report(self): sec5_text = dp.Text(f"## ARIMA Model Parameters") blocks = [ - dp.HTML(m.summary().as_html(), label=self.target_columns[i]) - for i, m in enumerate(self.models) + dp.HTML(m.summary().as_html(), label=target) + for i, (target, m) in enumerate(self.models.items()) ] sec5 = dp.Select(blocks=blocks) if len(blocks) > 1 else blocks[0] all_sections = [sec5_text, sec5] @@ -196,7 +210,6 @@ def _generate_report(self): datetime_col_name=self.spec.datetime_column.name, explain_predict_fn=self._custom_predict_arima, ) - # Create a markdown text block for the global explanation section global_explanation_text = dp.Text( f"## Global Explanation of Models \n " @@ -277,10 +290,8 @@ def _custom_predict_arima(self, data): date_col = self.spec.datetime_column.name data[date_col] = pd.to_datetime(data[date_col], unit="s") data = data.set_index(date_col) - # Get the index of the current series id - series_index = self.target_columns.index(self.series_id) # Use the ARIMA model to predict the values - predictions = 
self.models[series_index].predict(X=data, n_periods=len(data)) + predictions = self.models[self.series_id].predict(X=data, n_periods=len(data)) return predictions diff --git a/ads/opctl/operator/lowcode/forecast/model/automlx.py b/ads/opctl/operator/lowcode/forecast/model/automlx.py index e0f36f995..2eac20062 100644 --- a/ads/opctl/operator/lowcode/forecast/model/automlx.py +++ b/ads/opctl/operator/lowcode/forecast/model/automlx.py @@ -10,7 +10,7 @@ from ads.common.decorator.runtime_dependency import runtime_dependency from ads.opctl.operator.lowcode.forecast.const import ( AUTOMLX_METRIC_MAP, - ForecastOutputColumns, + ForecastOutputColumns, SupportedModels, ) from ads.opctl import logger @@ -60,7 +60,6 @@ def _build_model(self) -> pd.DataFrame: models = dict() outputs = dict() outputs_legacy = dict() - selected_models = dict() date_column = self.spec.datetime_column.name horizon = self.spec.horizon self.datasets.datetime_col = date_column @@ -71,19 +70,22 @@ def _build_model(self) -> pd.DataFrame: self.errors_dict = dict() # Clean up kwargs for pass through - model_kwargs_cleaned = self.spec.model_kwargs.copy() - model_kwargs_cleaned["n_algos_tuned"] = model_kwargs_cleaned.get( - "n_algos_tuned", AUTOMLX_N_ALGOS_TUNED - ) - model_kwargs_cleaned["score_metric"] = AUTOMLX_METRIC_MAP.get( - self.spec.metric, - model_kwargs_cleaned.get("score_metric", AUTOMLX_DEFAULT_SCORE_METRIC), - ) - model_kwargs_cleaned.pop("task", None) - time_budget = model_kwargs_cleaned.pop("time_budget", 0) - model_kwargs_cleaned[ - "preprocessing" - ] = self.spec.preprocessing or model_kwargs_cleaned.get("preprocessing", True) + model_kwargs_cleaned = None + + if self.loaded_models is None: + model_kwargs_cleaned = self.spec.model_kwargs.copy() + model_kwargs_cleaned["n_algos_tuned"] = model_kwargs_cleaned.get( + "n_algos_tuned", AUTOMLX_N_ALGOS_TUNED + ) + model_kwargs_cleaned["score_metric"] = AUTOMLX_METRIC_MAP.get( + self.spec.metric, + model_kwargs_cleaned.get("score_metric", AUTOMLX_DEFAULT_SCORE_METRIC), + ) + model_kwargs_cleaned.pop("task", None) + time_budget = model_kwargs_cleaned.pop("time_budget", 0) + model_kwargs_cleaned[ + "preprocessing" + ] = self.spec.preprocessing or model_kwargs_cleaned.get("preprocessing", True) for i, (target, df) in enumerate(full_data_dict.items()): try: @@ -107,15 +109,18 @@ def _build_model(self) -> pd.DataFrame: if y_train.index.is_monotonic else "NOT" + "monotonic." 
) - model = automl.Pipeline( - task="forecasting", - **model_kwargs_cleaned, - ) - model.fit( - X=y_train.drop(target, axis=1), - y=pd.DataFrame(y_train[target]), - time_budget=time_budget, - ) + model = self.loaded_models[target] if self.loaded_models is not None else None + + if model is None: + model = automl.Pipeline( + task="forecasting", + **model_kwargs_cleaned, + ) + model.fit( + X=y_train.drop(target, axis=1), + y=pd.DataFrame(y_train[target]), + time_budget=time_budget, + ) logger.debug("Selected model: {}".format(model.selected_model_)) logger.debug( "Selected model params: {}".format(model.selected_model_params_) @@ -142,12 +147,8 @@ def _build_model(self) -> pd.DataFrame: ) # Collect Outputs - selected_models[target] = { - "series_id": target, - "selected_model": model.selected_model_, - "model_params": model.selected_model_params_, - } - models[target] = model + if self.loaded_models is None: + models[target] = model summary_frame = summary_frame.rename_axis("ds").reset_index() summary_frame = summary_frame.rename( columns={ @@ -162,6 +163,24 @@ def _build_model(self) -> pd.DataFrame: summary_frame["yhat_lower"] = np.NAN outputs[target] = summary_frame # outputs_legacy[target] = summary_frame + + self.model_parameters[target] = { + "framework": SupportedModels.AutoMLX, + "score_metric": model.score_metric, + "random_state": model.random_state, + "model_list": model.model_list, + "n_algos_tuned": model.n_algos_tuned, + "adaptive_sampling": model.adaptive_sampling, + "min_features": model.min_features, + "optimization": model.optimization, + "preprocessing": model.preprocessing, + "search_space": model.search_space, + "time_series_period": model.time_series_period, + "min_class_instances": model.min_class_instances, + "max_tuning_trials": model.max_tuning_trials, + "selected_model": model.selected_model_, + "selected_model_params": model.selected_model_params_, + } except Exception as e: self.errors_dict[target] = {"model_name": self.spec.model, "error": str(e)} @@ -191,7 +210,8 @@ def _build_model(self) -> pd.DataFrame: # output_col = output_col.reset_index(drop=True) # outputs_merged = pd.concat([outputs_merged, output_col], axis=1) - self.models = models + self.models = models if self.loaded_models is None else self.loaded_models + return outputs_merged @runtime_dependency( @@ -262,7 +282,7 @@ def _generate_report(self): global_explanation_df = pd.DataFrame(self.global_explanation) self.formatted_global_explanation = ( - global_explanation_df / global_explanation_df.sum(axis=0) * 100 + global_explanation_df / global_explanation_df.sum(axis=0) * 100 ) # Create a markdown section for the global explainability @@ -285,7 +305,7 @@ def _generate_report(self): dp.DataTable( local_ex_df.div(local_ex_df.abs().sum(axis=1), axis=0) * 100, label=s_id, - ) + ) for s_id, local_ex_df in self.local_explanation.items() ] local_explanation_section = ( diff --git a/ads/opctl/operator/lowcode/forecast/model/autots.py b/ads/opctl/operator/lowcode/forecast/model/autots.py index 2b0b55e25..ad9341197 100644 --- a/ads/opctl/operator/lowcode/forecast/model/autots.py +++ b/ads/opctl/operator/lowcode/forecast/model/autots.py @@ -16,8 +16,7 @@ from ..operator_config import ForecastOperatorConfig from ads.common.decorator.runtime_dependency import runtime_dependency from .forecast_datasets import ForecastDatasets, ForecastOutput -from ..const import ForecastOutputColumns - +from ..const import ForecastOutputColumns, SupportedModels AUTOTS_MAX_GENERATION = 10 AUTOTS_MODELS_TO_VALIDATE = 0.15 @@ -54,59 
+53,60 @@ def _build_model(self) -> pd.DataFrame: self.forecast_output = ForecastOutput( confidence_interval_width=self.spec.confidence_interval_width ) - - # Initialize the AutoTS model with specified parameters - model = AutoTS( - forecast_length=self.spec.horizon, - frequency=self.spec.model_kwargs.get("frequency", "infer"), - prediction_interval=self.spec.confidence_interval_width, - max_generations=self.spec.model_kwargs.get( - "max_generations", AUTOTS_MAX_GENERATION - ), - no_negatives=self.spec.model_kwargs.get("no_negatives", False), - constraint=self.spec.model_kwargs.get("constraint", None), - ensemble=self.spec.model_kwargs.get("ensemble", "auto"), - initial_template=self.spec.model_kwargs.get( - "initial_template", "General+Random" - ), - random_seed=self.spec.model_kwargs.get("random_seed", 2022), - holiday_country=self.spec.model_kwargs.get("holiday_country", "US"), - subset=self.spec.model_kwargs.get("subset", None), - aggfunc=self.spec.model_kwargs.get("aggfunc", "first"), - na_tolerance=self.spec.model_kwargs.get("na_tolerance", 1), - drop_most_recent=self.spec.model_kwargs.get("drop_most_recent", 0), - drop_data_older_than_periods=self.spec.model_kwargs.get( - "drop_data_older_than_periods", None - ), + model = self.loaded_models if self.loaded_models is not None else None + if model is None: + # Initialize the AutoTS model with specified parameters + model = AutoTS( + forecast_length=self.spec.horizon, + frequency=self.spec.model_kwargs.get("frequency", "infer"), + prediction_interval=self.spec.confidence_interval_width, + max_generations=self.spec.model_kwargs.get( + "max_generations", AUTOTS_MAX_GENERATION + ), + no_negatives=self.spec.model_kwargs.get("no_negatives", False), + constraint=self.spec.model_kwargs.get("constraint", None), + ensemble=self.spec.model_kwargs.get("ensemble", "auto"), + initial_template=self.spec.model_kwargs.get( + "initial_template", "General+Random" + ), + random_seed=self.spec.model_kwargs.get("random_seed", 2022), + holiday_country=self.spec.model_kwargs.get("holiday_country", "US"), + subset=self.spec.model_kwargs.get("subset", None), + aggfunc=self.spec.model_kwargs.get("aggfunc", "first"), + na_tolerance=self.spec.model_kwargs.get("na_tolerance", 1), + drop_most_recent=self.spec.model_kwargs.get("drop_most_recent", 0), + drop_data_older_than_periods=self.spec.model_kwargs.get( + "drop_data_older_than_periods", None + ), model_list=self.spec.model_kwargs.get("model_list", "fast_parallel"), - transformer_list=self.spec.model_kwargs.get("transformer_list", "auto"), - transformer_max_depth=self.spec.model_kwargs.get( - "transformer_max_depth", 6 - ), - models_mode=self.spec.model_kwargs.get("models_mode", "random"), - num_validations=self.spec.model_kwargs.get("num_validations", "auto"), - models_to_validate=self.spec.model_kwargs.get( - "models_to_validate", AUTOTS_MODELS_TO_VALIDATE - ), - max_per_model_class=self.spec.model_kwargs.get("max_per_model_class", None), - validation_method=self.spec.model_kwargs.get( - "validation_method", "backwards" - ), - min_allowed_train_percent=self.spec.model_kwargs.get( - "min_allowed_train_percent", 0.5 - ), - remove_leading_zeroes=self.spec.model_kwargs.get( - "remove_leading_zeroes", False - ), - prefill_na=self.spec.model_kwargs.get("prefill_na", None), - introduce_na=self.spec.model_kwargs.get("introduce_na", None), - preclean=self.spec.model_kwargs.get("preclean", None), - model_interrupt=self.spec.model_kwargs.get("model_interrupt", True), - 
generation_timeout=self.spec.model_kwargs.get("generation_timeout", None), - current_model_file=self.spec.model_kwargs.get("current_model_file", None), - verbose=self.spec.model_kwargs.get("verbose", 1), - n_jobs=self.spec.model_kwargs.get("n_jobs", -1), - ) + transformer_list=self.spec.model_kwargs.get("transformer_list", "auto"), + transformer_max_depth=self.spec.model_kwargs.get( + "transformer_max_depth", 6 + ), + models_mode=self.spec.model_kwargs.get("models_mode", "random"), + num_validations=self.spec.model_kwargs.get("num_validations", "auto"), + models_to_validate=self.spec.model_kwargs.get( + "models_to_validate", AUTOTS_MODELS_TO_VALIDATE + ), + max_per_model_class=self.spec.model_kwargs.get("max_per_model_class", None), + validation_method=self.spec.model_kwargs.get( + "validation_method", "backwards" + ), + min_allowed_train_percent=self.spec.model_kwargs.get( + "min_allowed_train_percent", 0.5 + ), + remove_leading_zeroes=self.spec.model_kwargs.get( + "remove_leading_zeroes", False + ), + prefill_na=self.spec.model_kwargs.get("prefill_na", None), + introduce_na=self.spec.model_kwargs.get("introduce_na", None), + preclean=self.spec.model_kwargs.get("preclean", None), + model_interrupt=self.spec.model_kwargs.get("model_interrupt", True), + generation_timeout=self.spec.model_kwargs.get("generation_timeout", None), + current_model_file=self.spec.model_kwargs.get("current_model_file", None), + verbose=self.spec.model_kwargs.get("verbose", 1), + n_jobs=self.spec.model_kwargs.get("n_jobs", -1), + ) # Prepare the data for model training full_data_dict = self.datasets.full_data_dict @@ -149,21 +149,24 @@ def _build_model(self) -> pd.DataFrame: self.future_regressor_train = r_tr.copy() - # Fit the model to the training data - model = model.fit( - self.full_data_long.groupby("series_id") - .head(-self.spec.horizon) - .reset_index(drop=True), - date_col=self.spec.datetime_column.name, - value_col=self.original_target_column, - future_regressor=r_tr.head(-self.spec.horizon) - if self.spec.additional_data - else None, - id_col="series_id", - ) + if self.loaded_models is None: + # Fit the model to the training data + model = model.fit( + self.full_data_long.groupby("series_id") + .head(-self.spec.horizon) + .reset_index(drop=True), + date_col=self.spec.datetime_column.name, + value_col=self.original_target_column, + future_regressor=r_tr.head(-self.spec.horizon) + if self.spec.additional_data + else None, + id_col="series_id", + ) - # Store the trained model and generate forecasts - self.models = copy.deepcopy(model) + # Store the trained model and generate forecasts + self.models = copy.deepcopy(model) + else: + self.models = self.loaded_models logger.debug("===========Forecast Generated===========") self.prediction = model.predict( future_regressor=r_tr.tail(self.spec.horizon) @@ -178,6 +181,22 @@ def _build_model(self) -> pd.DataFrame: yhat_lower_name = ForecastOutputColumns.LOWER_BOUND hist_df = model.back_forecast().forecast + params = vars(model).copy() + for param in [ + "ens_copy", + "df_wide_numeric", + "future_regressor_train", + "initial_results", + "score_per_series", + "validation_results", + "validation_train_indexes", + "validation_test_indexes", + "validation_indexes", + "best_model", + ]: + if param in params: + params.pop(param) + for cat in self.categories: output_i = pd.DataFrame() cat_target = f"{self.original_target_column}_{cat}" @@ -196,8 +215,8 @@ def _build_model(self) -> pd.DataFrame: output_i[yhat_upper_name] = self.prediction.upper_forecast[[cat_target]] 
output_i[yhat_lower_name] = self.prediction.lower_forecast[[cat_target]] output_i.iloc[ - -hist_df.shape[0] - self.spec.horizon : -self.spec.horizon, - output_i.columns.get_loc(f"fitted_value"), + -hist_df.shape[0] - self.spec.horizon : -self.spec.horizon, + output_i.columns.get_loc(f"fitted_value"), ] = hist_df[cat_target] output_i = output_i.reset_index() @@ -206,6 +225,11 @@ def _build_model(self) -> pd.DataFrame: category=cat, target_category_column=cat_target, forecast=output_i ) + self.model_parameters[cat_target] = { + "framework": SupportedModels.AutoTS, + **params, + } + output_col = output_col.reset_index(drop=True) logger.debug("===========Done===========") @@ -277,7 +301,7 @@ def _generate_report(self) -> tuple: ) self.formatted_global_explanation = ( - global_explanation_df / global_explanation_df.sum(axis=0) * 100 + global_explanation_df / global_explanation_df.sum(axis=0) * 100 ) # Create a markdown section for the global explainability @@ -300,7 +324,7 @@ def _generate_report(self) -> tuple: dp.DataTable( local_ex_df.div(local_ex_df.abs().sum(axis=1), axis=0) * 100, label=s_id, - ) + ) for s_id, local_ex_df in self.local_explanation.items() ] local_explanation_section = ( diff --git a/ads/opctl/operator/lowcode/forecast/model/base_model.py b/ads/opctl/operator/lowcode/forecast/model/base_model.py index 890af8394..8da837eba 100644 --- a/ads/opctl/operator/lowcode/forecast/model/base_model.py +++ b/ads/opctl/operator/lowcode/forecast/model/base_model.py @@ -57,8 +57,12 @@ def __init__(self, config: ForecastOperatorConfig, datasets: ForecastDatasets): self.test_eval_metrics = None self.original_target_column = self.spec.target_column + self.model_parameters = dict() + self.loaded_models = None + # these fields are populated in the _build_model() method self.models = None + # "outputs" is a list of outputs generated by the models. 
These should only be generated when the framework requires the original output for plotting self.outputs = None self.forecast_output = None @@ -80,6 +84,10 @@ def generate_report(self): warnings.simplefilter(action="ignore", category=ConvergenceWarning) import datapane as dp + # load models if given + if self.spec.previous_output_dir is not None: + self._load_model() + start_time = time.time() result_df = self._build_model() elapsed_time = time.time() - start_time @@ -224,8 +232,8 @@ def generate_report(self): test_metrics_sections = [] if ( - self.test_eval_metrics is not None - and not self.test_eval_metrics.empty + self.test_eval_metrics is not None + and not self.test_eval_metrics.empty ): sec7_text = dp.Text(f"## Test Data Evaluation Metrics") sec7 = dp.DataTable(self.test_eval_metrics) @@ -255,12 +263,12 @@ def generate_report(self): yaml_appendix_title = dp.Text(f"## Reference: YAML File") yaml_appendix = dp.Code(code=self.config.to_yaml(), language="yaml") report_sections = ( - [title_text, summary] - + forecast_plots - + other_sections - + test_metrics_sections - + train_metrics_sections - + [yaml_appendix_title, yaml_appendix] + [title_text, summary] + + forecast_plots + + other_sections + + test_metrics_sections + + train_metrics_sections + + [yaml_appendix_title, yaml_appendix] ) # save the report and result CSV @@ -272,7 +280,7 @@ def generate_report(self): ) def _test_evaluate_metrics( - self, target_columns, test_filename, output, target_col="yhat", elapsed_time=0 + self, target_columns, test_filename, output, target_col="yhat", elapsed_time=0 ): total_metrics = pd.DataFrame() summary_metrics = pd.DataFrame() @@ -322,11 +330,11 @@ def _test_evaluate_metrics( for date in dates ] y_pred_i = output_forecast_i["forecast_value"].values - y_pred = np.asarray(y_pred_i[-len(y_true) :]) + y_pred = np.asarray(y_pred_i[-len(y_true):]) metrics_df = utils._build_metrics_df( - y_true=y_true[-self.spec.horizon :], - y_pred=y_pred[-self.spec.horizon :], + y_true=y_true[-self.spec.horizon:], + y_pred=y_pred[-self.spec.horizon:], column_name=target_column_i, ) total_metrics = pd.concat([total_metrics, metrics_df], axis=1) @@ -380,7 +388,7 @@ def _test_evaluate_metrics( target_columns_in_output = set(target_columns).intersection(data.columns) if self.spec.horizon <= SUMMARY_METRICS_HORIZON_LIMIT: if set(self.forecast_output.list_target_category_columns()) != set( - target_columns_in_output + target_columns_in_output ): logger.warn( f"Column Mismatch between Forecast Output and Target Columns" @@ -415,11 +423,11 @@ def _test_evaluate_metrics( return total_metrics, summary_metrics, data def _save_report( - self, - report_sections: Tuple, - result_df: pd.DataFrame, - metrics_df: pd.DataFrame, - test_metrics_df: pd.DataFrame, + self, + report_sections: Tuple, + result_df: pd.DataFrame, + metrics_df: pd.DataFrame, + test_metrics_df: pd.DataFrame, ): """Saves resulting reports to the given folder.""" import datapane as dp @@ -451,9 +459,9 @@ def _save_report( report_path = os.path.join(output_dir, self.spec.report_filename) with open(report_local_path) as f1: with fsspec.open( - report_path, - "w", - **storage_options, + report_path, + "w", + **storage_options, ) as f2: f2.write(f1.read()) @@ -532,6 +540,22 @@ def _save_report( logger.warn( "Unable to generate explanations for this model type or for this dataset." 
) + + if self.spec.generate_model_parameters: + # model params + utils._write_data( + data=pd.DataFrame.from_dict(self.model_parameters), + filename=os.path.join(output_dir, "model_params.json"), + format="json", + storage_options=storage_options, + index=True, + indent=4, + ) + + # model pickle + if self.spec.generate_model_pickle: + self._save_model(output_dir, storage_options) + logger.info( f"The outputs have been successfully " f"generated and placed into the directory: {output_dir}." @@ -579,10 +603,26 @@ def _generate_train_metrics(self) -> pd.DataFrame: """ raise NotImplementedError + def _load_model(self): + try: + self.loaded_models = utils.load_pkl(self.spec.previous_output_dir + "/model.pkl") + except: + logger.info("model.pkl is not present") + + def _save_model(self, output_dir, storage_options): + utils.write_pkl( + obj=self.models, + filename="model.pkl", + output_dir=output_dir, + storage_options=storage_options, + ) + + + @runtime_dependency( module="shap", err_msg=( - "Please run `pip3 install shap` to install the required dependencies for model explanation." + "Please run `pip3 install shap` to install the required dependencies for model explanation." ), ) def explain_model(self, datetime_col_name, explain_predict_fn) -> dict: diff --git a/ads/opctl/operator/lowcode/forecast/model/neuralprophet.py b/ads/opctl/operator/lowcode/forecast/model/neuralprophet.py index 8c6f743b5..86c2933a6 100644 --- a/ads/opctl/operator/lowcode/forecast/model/neuralprophet.py +++ b/ads/opctl/operator/lowcode/forecast/model/neuralprophet.py @@ -23,7 +23,7 @@ ) from ads.opctl import logger -from ..const import DEFAULT_TRIALS, ForecastOutputColumns +from ..const import DEFAULT_TRIALS, ForecastOutputColumns, SupportedModels from .. import utils from .base_model import ForecastOperatorBaseModel from ..operator_config import ForecastOperatorConfig @@ -71,6 +71,15 @@ def __init__(self, config: ForecastOperatorConfig, datasets: ForecastDatasets): super().__init__(config=config, datasets=datasets) self.train_metrics = True self.forecast_col_name = "yhat1" + self.loaded_trainers = None + self.trainers = None + + def _load_model(self): + try: + self.loaded_models = utils.load_pkl(self.spec.previous_output_dir + "/model.pkl") + self.loaded_trainers = utils.load_pkl(self.spec.previous_output_dir + "/trainer.pkl") + except: + logger.info("model.pkl/trainer.pkl is not present") def _train_model(self, i, target, df): @@ -80,8 +89,7 @@ def _train_model(self, i, target, df): # Extract the Confidence Interval Width and # convert to neural prophets equivalent - quantiles - model_kwargs = self.spec.model_kwargs - + model_kwargs = None if self.spec.confidence_interval_width is None: self.quantiles = model_kwargs.get("quantiles", [0.05, 0.95]) self.spec.confidence_interval_width = float(self.quantiles[1]) - float( @@ -91,7 +99,9 @@ def _train_model(self, i, target, df): boundaries = round((1 - self.spec.confidence_interval_width) / 2, 2) self.quantiles = [boundaries, self.spec.confidence_interval_width + boundaries] - model_kwargs["quantiles"] = self.quantiles + if self.loaded_models is None: + model_kwargs = self.spec.model_kwargs + model_kwargs["quantiles"] = self.quantiles self.forecast_output = ForecastOutput( confidence_interval_width=self.spec.confidence_interval_width ) @@ -99,7 +109,6 @@ def _train_model(self, i, target, df): le, df_encoded = utils._label_encode_dataframe( df, no_encode={self.spec.datetime_column.name, target} ) - model_kwargs_i = model_kwargs.copy() # format the dataframe for this target. 
Dropping NA on target[df] will remove all future data df_clean = self._preprocess( @@ -114,85 +123,95 @@ def _train_model(self, i, target, df): additional_regressors = set(data_i.columns) - {"y", "ds"} training_data = data_i[["y", "ds"] + list(additional_regressors)] - if self.perform_tuning: - - def objective(trial): - params = { - # 'seasonality_mode': trial.suggest_categorical('seasonality_mode', ['additive', 'multiplicative']), - # 'seasonality_reg': trial.suggest_float('seasonality_reg', 0.1, 500, log=True), - # 'learning_rate': trial.suggest_float('learning_rate', 0.0001, 0.1, log=True), - "newer_samples_start": trial.suggest_float( - "newer_samples_start", 0.001, 0.999 - ), - "newer_samples_weight": trial.suggest_float( - "newer_samples_weight", 0, 100 - ), - "changepoints_range": trial.suggest_float( - "changepoints_range", 0.8, 0.95 - ), - } - # trend_reg, trend_reg_threshold, ar_reg, impute_rolling/impute_linear, - params.update(model_kwargs_i) - - folds = NeuralProphet(**params).crossvalidation_split_df( - data_i, k=3 - ) - test_metrics_total_i = [] - for df_train, df_test in folds: - m, accepted_regressors = _fit_model( - data=df_train, - params=params, - additional_regressors=additional_regressors, - select_metric=self.spec.metric, + model = self.loaded_models[target] if self.loaded_models is not None else None + accepted_regressors = None + if model is None: + model_kwargs_i = model_kwargs.copy() + if self.perform_tuning: + + def objective(trial): + params = { + # 'seasonality_mode': trial.suggest_categorical('seasonality_mode', ['additive', 'multiplicative']), + # 'seasonality_reg': trial.suggest_float('seasonality_reg', 0.1, 500, log=True), + # 'learning_rate': trial.suggest_float('learning_rate', 0.0001, 0.1, log=True), + "newer_samples_start": trial.suggest_float( + "newer_samples_start", 0.001, 0.999 + ), + "newer_samples_weight": trial.suggest_float( + "newer_samples_weight", 0, 100 + ), + "changepoints_range": trial.suggest_float( + "changepoints_range", 0.8, 0.95 + ), + } + # trend_reg, trend_reg_threshold, ar_reg, impute_rolling/impute_linear, + params.update(model_kwargs_i) + + folds = NeuralProphet(**params).crossvalidation_split_df( + data_i, k=3 ) - df_test = df_test[["y", "ds"] + accepted_regressors] - - test_forecast_i = m.predict(df=df_test) - fold_metric_i = ( - m.metrics[self.spec.metric] - .forward( - Tensor(test_forecast_i["yhat1"]), - Tensor(test_forecast_i["y"]), + test_metrics_total_i = [] + for df_train, df_test in folds: + m, accepted_regressors = _fit_model( + data=df_train, + params=params, + additional_regressors=additional_regressors, + select_metric=self.spec.metric, + ) + df_test = df_test[["y", "ds"] + accepted_regressors] + + test_forecast_i = m.predict(df=df_test) + fold_metric_i = ( + m.metrics[self.spec.metric] + .forward( + Tensor(test_forecast_i["yhat1"]), + Tensor(test_forecast_i["y"]), + ) + .item() ) - .item() + test_metrics_total_i.append(fold_metric_i) + logger.debug( + f"----------------------{np.asarray(test_metrics_total_i).mean()}----------------------" ) - test_metrics_total_i.append(fold_metric_i) - logger.debug( - f"----------------------{np.asarray(test_metrics_total_i).mean()}----------------------" + return np.asarray(test_metrics_total_i).mean() + + study = optuna.create_study(direction="minimize") + m_params = NeuralProphet().parameters() + study.enqueue_trial( + { + # 'seasonality_mode': m_params['seasonality_mode'], + # 'seasonality_reg': m_params['seasonality_reg'], + # 'learning_rate': m_params['learning_rate'], + 
"newer_samples_start": m_params["newer_samples_start"], + "newer_samples_weight": m_params["newer_samples_weight"], + "changepoints_range": m_params["changepoints_range"], + } + ) + study.optimize( + objective, + n_trials=self.spec.tuning.n_trials + if self.spec.tuning + else DEFAULT_TRIALS, + n_jobs=-1, ) - return np.asarray(test_metrics_total_i).mean() - - study = optuna.create_study(direction="minimize") - m_params = NeuralProphet().parameters() - study.enqueue_trial( - { - # 'seasonality_mode': m_params['seasonality_mode'], - # 'seasonality_reg': m_params['seasonality_reg'], - # 'learning_rate': m_params['learning_rate'], - "newer_samples_start": m_params["newer_samples_start"], - "newer_samples_weight": m_params["newer_samples_weight"], - "changepoints_range": m_params["changepoints_range"], - } - ) - study.optimize( - objective, - n_trials=self.spec.tuning.n_trials - if self.spec.tuning - else DEFAULT_TRIALS, - n_jobs=-1, - ) - selected_params = study.best_params - selected_params.update(model_kwargs_i) - model_kwargs_i = selected_params + selected_params = study.best_params + selected_params.update(model_kwargs_i) + model_kwargs_i = selected_params + + # Build and fit model + model, accepted_regressors = _fit_model( + data=training_data, + params=model_kwargs_i, + additional_regressors=additional_regressors, + select_metric=self.spec.metric, + ) + else: + accepted_regressors_config = model.config_regressors or dict() + accepted_regressors = list(accepted_regressors_config.keys()) + if self.loaded_trainers is not None: + model.trainer = self.loaded_trainers[target] - # Build and fit model - model, accepted_regressors = _fit_model( - data=training_data, - params=model_kwargs_i, - additional_regressors=additional_regressors, - select_metric=self.spec.metric, - ) logger.debug( f"Found the following additional data columns: {additional_regressors}" ) @@ -211,12 +230,36 @@ def objective(trial): logger.debug(f"-----------------Model {i}----------------------") logger.debug(forecast.tail()) # models.append(model) - self.outputs_dict[target] = forecast - self.outputs_legacy.append(forecast) - - self.models_dict[target] = model - self.outputs = self.outputs_legacy - + self.outputs[target] = forecast + + if self.loaded_models is None: + self.models[target] = model + self.trainers[target] = model.trainer + + self.model_parameters[target] = { + "framework": SupportedModels.NeuralProphet, + "config": model.config, + "config_trend": model.config_trend, + "config_train": model.config_train, + "config_seasonality": model.config_seasonality, + "config_regressors": model.config_regressors, + "config_ar": model.config_ar, + "config_events": model.config_events, + "config_country_holidays": model.config_country_holidays, + "config_lagged_regressors": model.config_lagged_regressors, + "config_normalization": model.config_normalization, + "config_missing": model.config_missing, + "config_model": model.config_model, + "config_normalization": model.config_normalization, + "data_freq": model.data_freq, + "fitted": model.fitted, + "data_params": model.data_params, + "future_periods": model.future_periods, + "predict_steps": model.predict_steps, + "highlight_forecast_step_n": model.highlight_forecast_step_n, + "true_ar_weights": model.true_ar_weights, + } + logger.debug("===========Done===========") except Exception as e: self.errors_dict[target] = {"model_name": self.spec.model, "error": str(e)} @@ -225,9 +268,9 @@ def _build_model(self) -> pd.DataFrame: # from neuralprophet import NeuralProphet 
full_data_dict = self.datasets.full_data_dict - self.models_dict = dict() - self.outputs_dict = dict() - self.outputs_legacy = [] + self.models = dict() + self.trainers = dict() + self.outputs = dict() self.errors_dict = dict() Parallel(n_jobs=-1, require="sharedmem")( @@ -237,7 +280,12 @@ def _build_model(self) -> pd.DataFrame: ) ) - self.models = [self.models_dict[target] for target in self.target_columns] + if self.loaded_models is not None: + self.models = self.loaded_models + + if self.loaded_trainers is not None: + self.trainers = self.loaded_trainers + # Merge the outputs from each model into 1 df with all outputs by target and category col = self.original_target_column @@ -247,7 +295,7 @@ def _build_model(self) -> pd.DataFrame: for cat in self.categories: output_i = pd.DataFrame() - output_i["Date"] = self.outputs_dict[f"{col}_{cat}"]["ds"] + output_i["Date"] = self.outputs[f"{col}_{cat}"]["ds"] output_i["Series"] = cat output_i[f"input_value"] = full_data_dict[f"{col}_{cat}"][f"{col}_{cat}"] @@ -258,18 +306,18 @@ def _build_model(self) -> pd.DataFrame: output_i.iloc[ : -self.spec.horizon, output_i.columns.get_loc(f"fitted_value") - ] = (self.outputs_dict[f"{col}_{cat}"]["yhat1"].iloc[: -self.spec.horizon].values) + ] = (self.outputs[f"{col}_{cat}"]["yhat1"].iloc[: -self.spec.horizon].values) output_i.iloc[ -self.spec.horizon :, output_i.columns.get_loc(f"forecast_value"), ] = ( - self.outputs_dict[f"{col}_{cat}"]["yhat1"].iloc[-self.spec.horizon :].values + self.outputs[f"{col}_{cat}"]["yhat1"].iloc[-self.spec.horizon :].values ) output_i.iloc[ -self.spec.horizon :, output_i.columns.get_loc(yhat_upper_name), ] = ( - self.outputs_dict[f"{col}_{cat}"][f"yhat1 {self.quantiles[1]*100}%"] + self.outputs[f"{col}_{cat}"][f"yhat1 {self.quantiles[1]*100}%"] .iloc[-self.spec.horizon :] .values ) @@ -277,7 +325,7 @@ def _build_model(self) -> pd.DataFrame: -self.spec.horizon :, output_i.columns.get_loc(yhat_lower_name), ] = ( - self.outputs_dict[f"{col}_{cat}"][f"yhat1 {self.quantiles[0]*100}%"] + self.outputs[f"{col}_{cat}"][f"yhat1 {self.quantiles[0]*100}%"] .iloc[-self.spec.horizon :] .values ) @@ -299,30 +347,30 @@ def _generate_report(self): "forecast in the context of historical data." 
) sec1 = utils._select_plot_list( - lambda idx, *args: self.models[idx].plot(self.outputs[idx]), + lambda idx, target, *args: self.models[target].plot(self.outputs[target]), target_columns=self.target_columns, ) sec2_text = dp.Text(f"## Forecast Broken Down by Trend Component") sec2 = utils._select_plot_list( - lambda idx, *args: self.models[idx].plot_components(self.outputs[idx]), + lambda idx, target, *args: self.models[target].plot_components(self.outputs[target]), target_columns=self.target_columns, ) sec3_text = dp.Text(f"## Forecast Parameter Plots") sec3 = utils._select_plot_list( - lambda idx, *args: self.models[idx].plot_parameters(), + lambda idx, target, *args: self.models[target].plot_parameters(), target_columns=self.target_columns, ) sec5_text = dp.Text(f"## Neural Prophet Model Parameters") model_states = [] - for i, m in enumerate(self.models): + for i, (target, m) in enumerate(self.models.items()): model_states.append( pd.Series( m.state_dict(), index=m.state_dict().keys(), - name=self.target_columns[i], + name=target, ) ) all_model_states = pd.concat(model_states, axis=1) @@ -413,6 +461,20 @@ def _generate_report(self): other_sections, ) + def _save_model(self, output_dir, storage_options): + utils.write_pkl( + obj=self.models, + filename="model.pkl", + output_dir=output_dir, + storage_options=storage_options, + ) + utils.write_pkl( + obj=self.trainers, + filename="trainer.pkl", + output_dir=output_dir, + storage_options=storage_options, + ) + def _custom_predict_neuralprophet(self, data): raise NotImplementedError("NeuralProphet does not yet support explanations.") # data_prepped = data.reset_index() diff --git a/ads/opctl/operator/lowcode/forecast/model/prophet.py b/ads/opctl/operator/lowcode/forecast/model/prophet.py index 2b865a17e..51b86b76f 100644 --- a/ads/opctl/operator/lowcode/forecast/model/prophet.py +++ b/ads/opctl/operator/lowcode/forecast/model/prophet.py @@ -12,7 +12,7 @@ from ads.opctl import logger from ads.opctl.operator.lowcode.forecast.operator_config import ForecastOperatorConfig -from ..const import DEFAULT_TRIALS, PROPHET_INTERNAL_DATE_COL, ForecastOutputColumns +from ..const import DEFAULT_TRIALS, PROPHET_INTERNAL_DATE_COL, ForecastOutputColumns, SupportedModels from .. import utils from .base_model import ForecastOperatorBaseModel from ..operator_config import ForecastOperatorConfig @@ -58,8 +58,10 @@ def _train_model(self, i, target, df): "alpha", 0.90 ) - model_kwargs = self.spec.model_kwargs - model_kwargs["interval_width"] = self.spec.confidence_interval_width + model_kwargs = None + if self.loaded_models is None: + model_kwargs = self.spec.model_kwargs + model_kwargs["interval_width"] = self.spec.confidence_interval_width self.forecast_output = ForecastOutput( confidence_interval_width=self.spec.confidence_interval_width @@ -69,7 +71,6 @@ def _train_model(self, i, target, df): df, no_encode={self.spec.datetime_column.name, target} ) - model_kwargs_i = model_kwargs.copy() # format the dataframe for this target. 
Dropping NA on target[df] will remove all future data df_clean = self._preprocess( df_encoded, @@ -84,95 +85,98 @@ def _train_model(self, i, target, df): "y", PROPHET_INTERNAL_DATE_COL, } + model = self.loaded_models[target] if self.loaded_models is not None else None + + if model is None: + model_kwargs_i = model_kwargs.copy() + if self.perform_tuning: + + def objective(trial): + params = { + "seasonality_mode": trial.suggest_categorical( + "seasonality_mode", ["additive", "multiplicative"] + ), + "changepoint_prior_scale": trial.suggest_float( + "changepoint_prior_scale", 0.001, 0.5, log=True + ), + "seasonality_prior_scale": trial.suggest_float( + "seasonality_prior_scale", 0.01, 10, log=True + ), + "holidays_prior_scale": trial.suggest_float( + "holidays_prior_scale", 0.01, 10, log=True + ), + "changepoint_range": trial.suggest_float( + "changepoint_range", 0.8, 0.95 + ), + } + params.update(model_kwargs_i) + + model = _fit_model( + data=data_i, + params=params, + additional_regressors=additional_regressors, + ) - if self.perform_tuning: - - def objective(trial): - params = { - "seasonality_mode": trial.suggest_categorical( - "seasonality_mode", ["additive", "multiplicative"] - ), - "changepoint_prior_scale": trial.suggest_float( - "changepoint_prior_scale", 0.001, 0.5, log=True - ), - "seasonality_prior_scale": trial.suggest_float( - "seasonality_prior_scale", 0.01, 10, log=True - ), - "holidays_prior_scale": trial.suggest_float( - "holidays_prior_scale", 0.01, 10, log=True - ), - "changepoint_range": trial.suggest_float( - "changepoint_range", 0.8, 0.95 - ), - } - params.update(model_kwargs_i) - - model = _fit_model( - data=data_i, - params=params, - additional_regressors=additional_regressors, - ) + # Manual workaround because pandas 1.x dropped support for M and Y + interval = self.spec.horizon.interval + unit = self.spec.horizon.interval_unit + if unit == "M": + unit = "D" + interval = interval * 30.5 + elif unit == "Y": + unit = "D" + interval = interval * 365.25 + horizon = _add_unit(int(self.spec.horizon * interval), unit=unit) + initial = _add_unit((data_i.shape[0] * interval) // 2, unit=unit) + period = _add_unit((data_i.shape[0] * interval) // 4, unit=unit) + + logger.debug( + f"using: horizon: {horizon}. initial:{initial}, period: {period}" + ) - # Manual workaround because pandas 1.x dropped support for M and Y - interval = self.spec.horizon.interval - unit = self.spec.horizon.interval_unit - if unit == "M": - unit = "D" - interval = interval * 30.5 - elif unit == "Y": - unit = "D" - interval = interval * 365.25 - horizon = _add_unit(int(self.spec.horizon * interval), unit=unit) - initial = _add_unit((data_i.shape[0] * interval) // 2, unit=unit) - period = _add_unit((data_i.shape[0] * interval) // 4, unit=unit) - - logger.debug( - f"using: horizon: {horizon}. initial:{initial}, period: {period}" + df_cv = cross_validation( + model, + horizon=horizon, + initial=initial, + period=period, + parallel="threads", + ) + df_p = performance_metrics(df_cv) + try: + return np.mean(df_p[self.spec.metric]) + except KeyError: + logger.warn( + f"Could not find the metric {self.spec.metric} within " + f"the performance metrics: {df_p.columns}. 
Defaulting to `rmse`" + ) + return np.mean(df_p["rmse"]) + + study = optuna.create_study(direction="minimize") + m_temp = Prophet() + study.enqueue_trial( + { + "seasonality_mode": m_temp.seasonality_mode, + "changepoint_prior_scale": m_temp.changepoint_prior_scale, + "seasonality_prior_scale": m_temp.seasonality_prior_scale, + "holidays_prior_scale": m_temp.holidays_prior_scale, + "changepoint_range": m_temp.changepoint_range, + } ) - - df_cv = cross_validation( - model, - horizon=horizon, - initial=initial, - period=period, - parallel="threads", + study.optimize( + objective, + n_trials=self.spec.tuning.n_trials + if self.spec.tuning + else DEFAULT_TRIALS, + n_jobs=-1, ) - df_p = performance_metrics(df_cv) - try: - return np.mean(df_p[self.spec.metric]) - except KeyError: - logger.warn( - f"Could not find the metric {self.spec.metric} within " - f"the performance metrics: {df_p.columns}. Defaulting to `rmse`" - ) - return np.mean(df_p["rmse"]) - - study = optuna.create_study(direction="minimize") - m_temp = Prophet() - study.enqueue_trial( - { - "seasonality_mode": m_temp.seasonality_mode, - "changepoint_prior_scale": m_temp.changepoint_prior_scale, - "seasonality_prior_scale": m_temp.seasonality_prior_scale, - "holidays_prior_scale": m_temp.holidays_prior_scale, - "changepoint_range": m_temp.changepoint_range, - } - ) - study.optimize( - objective, - n_trials=self.spec.tuning.n_trials - if self.spec.tuning - else DEFAULT_TRIALS, - n_jobs=-1, - ) - study.best_params.update(model_kwargs_i) - model_kwargs_i = study.best_params - model = _fit_model( - data=data_i, - params=model_kwargs_i, - additional_regressors=additional_regressors, - ) + study.best_params.update(model_kwargs_i) + model_kwargs_i = study.best_params + model = _fit_model( + data=data_i, + params=model_kwargs_i, + additional_regressors=additional_regressors, + ) # Make future df for prediction if len(additional_regressors): @@ -193,11 +197,20 @@ def objective(trial): # Collect Outputs # models.append(model) - self.outputs_dict[target] = forecast - self.outputs_legacy.append(forecast) - self.models_dict[target] = model - self.outputs = self.outputs_legacy + self.outputs[target] = forecast + + if self.loaded_models is None: + self.models[target] = model + + params = vars(model).copy() + for param in ["history", "history_dates", "stan_fit"]: + if param in params: + params.pop(param) + self.model_parameters[target] = { + "framework": SupportedModels.Prophet, + **params, + } logger.debug("===========Done===========") except Exception as e: @@ -208,9 +221,8 @@ def _build_model(self) -> pd.DataFrame: from prophet.diagnostics import cross_validation, performance_metrics full_data_dict = self.datasets.full_data_dict - self.models_dict = dict() - self.outputs_dict = dict() - self.outputs_legacy = [] + self.models = dict() + self.outputs = dict() self.errors_dict = dict() Parallel(n_jobs=-1, require="sharedmem")( @@ -220,7 +232,8 @@ def _build_model(self) -> pd.DataFrame: ) ) - self.models = [self.models_dict[target] for target in self.target_columns] + if self.loaded_models is not None: + self.models = self.loaded_models # Merge the outputs from each model into 1 df with all outputs by target and category col = self.original_target_column @@ -230,7 +243,7 @@ def _build_model(self) -> pd.DataFrame: for cat in self.categories: output_i = pd.DataFrame() - output_i["Date"] = self.outputs_dict[f"{col}_{cat}"][PROPHET_INTERNAL_DATE_COL] + output_i["Date"] = self.outputs[f"{col}_{cat}"][PROPHET_INTERNAL_DATE_COL] output_i["Series"] = cat 
output_i["input_value"] = full_data_dict[f"{col}_{cat}"][f"{col}_{cat}"] @@ -241,22 +254,22 @@ def _build_model(self) -> pd.DataFrame: output_i.iloc[ : -self.spec.horizon, output_i.columns.get_loc(f"fitted_value") - ] = (self.outputs_dict[f"{col}_{cat}"]["yhat"].iloc[: -self.spec.horizon].values) + ] = (self.outputs[f"{col}_{cat}"]["yhat"].iloc[: -self.spec.horizon].values) output_i.iloc[ -self.spec.horizon :, output_i.columns.get_loc(f"forecast_value"), ] = ( - self.outputs_dict[f"{col}_{cat}"]["yhat"].iloc[-self.spec.horizon :].values + self.outputs[f"{col}_{cat}"]["yhat"].iloc[-self.spec.horizon :].values ) output_i.iloc[ -self.spec.horizon :, output_i.columns.get_loc(yhat_upper_name) ] = ( - self.outputs_dict[f"{col}_{cat}"]["yhat_upper"].iloc[-self.spec.horizon :].values + self.outputs[f"{col}_{cat}"]["yhat_upper"].iloc[-self.spec.horizon :].values ) output_i.iloc[ -self.spec.horizon :, output_i.columns.get_loc(yhat_lower_name) ] = ( - self.outputs_dict[f"{col}_{cat}"]["yhat_lower"].iloc[-self.spec.horizon :].values + self.outputs[f"{col}_{cat}"]["yhat_lower"].iloc[-self.spec.horizon :].values ) output_col = pd.concat([output_col, output_i]) self.forecast_output.add_category( @@ -276,26 +289,26 @@ def _generate_report(self): "These plots show your forecast in the context of historical data." ) sec1 = utils._select_plot_list( - lambda idx, *args: self.models[idx].plot( - self.outputs[idx], include_legend=True + lambda idx, target, *args: self.models[target].plot( + self.outputs[target], include_legend=True ), target_columns=self.target_columns, ) sec2_text = dp.Text(f"## Forecast Broken Down by Trend Component") sec2 = utils._select_plot_list( - lambda idx, *args: self.models[idx].plot_components(self.outputs[idx]), + lambda idx, target, *args: self.models[target].plot_components(self.outputs[target]), target_columns=self.target_columns, ) sec3_text = dp.Text(f"## Forecast Changepoints") sec3_figs = [ - self.models[idx].plot(self.outputs[idx]) - for idx in range(len(self.target_columns)) + self.models[target].plot(self.outputs[target]) + for target in self.target_columns ] [ add_changepoints_to_plot( - sec3_figs[idx].gca(), self.models[idx], self.outputs[idx] + sec3_figs[idx].gca(), self.models[self.target_columns[idx]], self.outputs[self.target_columns[idx]] ) for idx in range(len(self.target_columns)) ] @@ -307,12 +320,12 @@ def _generate_report(self): sec5_text = dp.Text(f"## Prophet Model Seasonality Components") model_states = [] - for i, m in enumerate(self.models): + for i, (target, m) in enumerate(self.models.items()): model_states.append( pd.Series( m.seasonalities, index=pd.Index(m.seasonalities.keys(), dtype="object"), - name=self.target_columns[i], + name=target, dtype="object", ) ) @@ -397,6 +410,6 @@ def _generate_report(self): def _custom_predict_prophet(self, data): data[PROPHET_INTERNAL_DATE_COL] = pd.to_datetime(data[PROPHET_INTERNAL_DATE_COL], unit='s') - return self.models[self.target_columns.index(self.series_id)].predict( + return self.models[self.series_id].predict( data.reset_index() )["yhat"] diff --git a/ads/opctl/operator/lowcode/forecast/operator_config.py b/ads/opctl/operator/lowcode/forecast/operator_config.py index 8a07dbbb6..ff7c19289 100644 --- a/ads/opctl/operator/lowcode/forecast/operator_config.py +++ b/ads/opctl/operator/lowcode/forecast/operator_config.py @@ -93,6 +93,10 @@ class ForecastOperatorSpec(DataClassSerializable): freq: str = None model: str = None model_kwargs: Dict = field(default_factory=dict) + model_parameters: str = None + 
previous_output_dir: str = None + generate_model_parameters: bool = None + generate_model_pickle: bool = None confidence_interval_width: float = None metric: str = None tuning: Tuning = field(default_factory=Tuning) @@ -100,7 +104,7 @@ class ForecastOperatorSpec(DataClassSerializable): def __post_init__(self): """Adjusts the specification details.""" self.metric = (self.metric or "").lower() or SupportedMetrics.SMAPE.lower() - self.model = (self.model or SupportedModels.Auto) + self.model = self.model or SupportedModels.Auto self.confidence_interval_width = self.confidence_interval_width or 0.80 self.report_filename = self.report_filename or "report.html" self.preprocessing = ( @@ -121,6 +125,17 @@ def __post_init__(self): else False ) self.explanations_accuracy_mode = self.explanations_accuracy_mode or SpeedAccuracyMode.FAST_APPROXIMATE + + self.generate_model_parameters = ( + self.generate_model_parameters + if self.generate_model_parameters is not None + else False + ) + self.generate_model_pickle = ( + self.generate_model_pickle + if self.generate_model_pickle is not None + else False + ) self.report_theme = self.report_theme or "light" self.metrics_filename = self.metrics_filename or "metrics.csv" self.test_metrics_filename = self.test_metrics_filename or "test_metrics.csv" diff --git a/ads/opctl/operator/lowcode/forecast/schema.yaml b/ads/opctl/operator/lowcode/forecast/schema.yaml index b38613cd6..c59a59152 100644 --- a/ads/opctl/operator/lowcode/forecast/schema.yaml +++ b/ads/opctl/operator/lowcode/forecast/schema.yaml @@ -296,6 +296,18 @@ spec: type: dict required: false + previous_output_dir: + type: string + required: false + + generate_model_parameters: + type: boolean + required: false + + generate_model_pickle: + type: boolean + required: false + confidence_interval_width: type: float required: false diff --git a/ads/opctl/operator/lowcode/forecast/utils.py b/ads/opctl/operator/lowcode/forecast/utils.py index 8810bd986..9a5ebf721 100644 --- a/ads/opctl/operator/lowcode/forecast/utils.py +++ b/ads/opctl/operator/lowcode/forecast/utils.py @@ -11,6 +11,7 @@ import fsspec import numpy as np import pandas as pd +import cloudpickle import plotly.express as px from plotly import graph_objects as go from sklearn.metrics import ( @@ -168,6 +169,26 @@ def _call_pandas_fsspec(pd_fn, filename, storage_options, **kwargs): return pd_fn(filename, storage_options=storage_options, **kwargs) +def load_pkl(filepath): + storage_options = dict() + if ObjectStorageDetails.is_oci_path(filepath): + storage_options = default_signer() + + with fsspec.open(filepath, "rb", **storage_options) as f: + return cloudpickle.load(f) + return None + + +def write_pkl(obj, filename, output_dir, storage_options): + pkl_path = os.path.join(output_dir, filename) + with fsspec.open( + pkl_path, + "wb", + **storage_options, + ) as f: + cloudpickle.dump(obj, f) + + def _load_data(filename, format, storage_options=None, columns=None, **kwargs): if not format: _, format = os.path.splitext(filename) @@ -194,7 +215,7 @@ def _write_data(data, filename, format, storage_options, index=False, **kwargs): if format in ["json", "clipboard", "excel", "csv", "feather", "hdf"]: write_fn = getattr(data, f"to_{format}") return _call_pandas_fsspec( - write_fn, filename, index=index, storage_options=storage_options + write_fn, filename, index=index, storage_options=storage_options, **kwargs ) raise ForecastInputDataError(f"Unrecognized format: {format}")
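# ----------------------------------------------------------------------------
# Editor's sketch (not part of the patch): how the model.pkl round trip added
# above is expected to behave. The utils.load_pkl / utils.write_pkl signatures
# are taken from the utils.py hunk in this diff; the output directory and the
# toy model dict below are hypothetical stand-ins for illustration only.
import os
from ads.opctl.operator.lowcode.forecast import utils

output_dir = "/tmp/forecast_run_1"          # hypothetical first-run output directory
os.makedirs(output_dir, exist_ok=True)

# First run: _save_model() pickles the per-target model dict into the output dir.
models = {"Sales_CatA": "fitted-model-placeholder"}   # stand-in for fitted models keyed by target
utils.write_pkl(obj=models, filename="model.pkl",
                output_dir=output_dir, storage_options={})   # use default_signer() for oci:// paths

# Second run: pointing spec.previous_output_dir at that directory makes
# _load_model() populate self.loaded_models, so _build_model() reuses the
# already-fitted models instead of re-fitting them.
loaded = utils.load_pkl(os.path.join(output_dir, "model.pkl"))
assert loaded == models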
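# Editor's sketch (not part of the patch): approximate shape of the
# model_params.json file written when spec.generate_model_parameters is enabled.
# _save_report() passes pd.DataFrame.from_dict(self.model_parameters) to
# utils._write_data(..., format="json", index=True, indent=4), which resolves to
# DataFrame.to_json; the framework name and parameter values below are
# illustrative stand-ins only, not values produced by the operator.
import pandas as pd

model_parameters = {
    "Sales_CatA": {"framework": "arima", "order": [1, 1, 1]},
    "Sales_CatB": {"framework": "arima", "order": [2, 0, 1]},
}
pd.DataFrame.from_dict(model_parameters).to_json(
    "model_params.json", index=True, indent=4
)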