diff --git a/ads/opctl/operator/lowcode/forecast/model/arima.py b/ads/opctl/operator/lowcode/forecast/model/arima.py
index 8e1a544ae..3e432aea4 100644
--- a/ads/opctl/operator/lowcode/forecast/model/arima.py
+++ b/ads/opctl/operator/lowcode/forecast/model/arima.py
@@ -7,6 +7,7 @@
 import pandas as pd
 import numpy as np
 import pmdarima as pm
+from joblib import Parallel, delayed
 
 from ads.opctl import logger
 
@@ -29,32 +30,31 @@ def __init__(self, config: ForecastOperatorConfig, datasets: ForecastDatasets):
         self.formatted_global_explanation = None
         self.formatted_local_explanation = None
 
-    def _build_model(self) -> pd.DataFrame:
-        full_data_dict = self.datasets.full_data_dict
-
-        # Extract the Confidence Interval Width and convert to arima's equivalent - alpha
-        if self.spec.confidence_interval_width is None:
-            self.spec.confidence_interval_width = 1 - self.spec.model_kwargs.get(
-                "alpha", 0.05
-            )
-        model_kwargs = self.spec.model_kwargs
-        model_kwargs["alpha"] = 1 - self.spec.confidence_interval_width
-        if "error_action" not in model_kwargs.keys():
-            model_kwargs["error_action"] = "ignore"
+    def _train_model(self, i, target, df):
+        """Trains the ARIMA model for a given target.
 
-        models = []
-        self.datasets.datetime_col = self.spec.datetime_column.name
-        self.forecast_output = ForecastOutput(
-            confidence_interval_width=self.spec.confidence_interval_width
-        )
+        Parameters
+        ----------
+        i: int
+            The index of the target
+        target: str
+            The name of the target
+        df: pd.DataFrame
+            The dataframe containing the target data
+        """
+        try:
+            # Extract the Confidence Interval Width and convert to arima's equivalent - alpha
+            if self.spec.confidence_interval_width is None:
+                self.spec.confidence_interval_width = 1 - self.spec.model_kwargs.get(
+                    "alpha", 0.05
+                )
+            model_kwargs = self.spec.model_kwargs
+            model_kwargs["alpha"] = 1 - self.spec.confidence_interval_width
+            if "error_action" not in model_kwargs.keys():
+                model_kwargs["error_action"] = "ignore"
 
-        outputs = dict()
-        outputs_legacy = []
-        fitted_values = dict()
-        actual_values = dict()
-        dt_columns = dict()
-
-        for i, (target, df) in enumerate(full_data_dict.items()):
             # format the dataframe for this target. Dropping NA on target[df] will remove all future data
             le, df_encoded = utils._label_encode_dataframe(
                 df, no_encode={self.spec.datetime_column.name, target}
             )
@@ -72,9 +72,7 @@ def _build_model(self) -> pd.DataFrame:
                 target,
                 self.spec.datetime_column.name,
             }
-            logger.debug(
-                f"Additional Regressors Detected {list(additional_regressors)}"
-            )
+            logger.debug(f"Additional Regressors Detected {list(additional_regressors)}")
 
             # Split data into X and y for arima tune method
             y = data_i[target]
@@ -85,9 +83,9 @@ def _build_model(self) -> pd.DataFrame:
             # Build and fit model
             model = pm.auto_arima(y=y, X=X_in, **self.spec.model_kwargs)
 
-            fitted_values[target] = model.predict_in_sample(X=X_in)
-            actual_values[target] = y
-            actual_values[target].index = pd.to_datetime(y.index)
+            self.fitted_values[target] = model.predict_in_sample(X=X_in)
+            self.actual_values[target] = y
+            self.actual_values[target].index = pd.to_datetime(y.index)
 
             # Build future dataframe
             start_date = y.index.values[-1]
@@ -95,9 +93,7 @@ def _build_model(self) -> pd.DataFrame:
             if len(additional_regressors):
                 X = df_clean[df_clean[target].isnull()].drop(target, axis=1)
             else:
-                X = pd.date_range(
-                    start=start_date, periods=n_periods, freq=self.spec.freq
-                )
+                X = pd.date_range(start=start_date, periods=n_periods, freq=self.spec.freq)
 
             # Predict and format forecast
             yhat, conf_int = model.predict(
@@ -108,7 +104,7 @@ def _build_model(self) -> pd.DataFrame:
             )
             yhat_clean = pd.DataFrame(yhat, index=yhat.index, columns=["yhat"])
 
-            dt_columns[target] = df_encoded[self.spec.datetime_column.name]
+            self.dt_columns[target] = df_encoded[self.spec.datetime_column.name]
             conf_int_clean = pd.DataFrame(
                 conf_int, index=yhat.index, columns=["yhat_lower", "yhat_upper"]
             )
@@ -117,15 +113,42 @@ def _build_model(self) -> pd.DataFrame:
             logger.debug(forecast[["yhat", "yhat_lower", "yhat_upper"]].tail())
 
             # Collect all outputs
-            models.append(model)
-            outputs_legacy.append(
+            self.outputs_legacy.append(
                 forecast.reset_index().rename(columns={"index": "ds"})
             )
-            outputs[target] = forecast
+            self.outputs[target] = forecast
+
+            self.models_dict[target] = model
 
-        self.models = models
+            logger.debug("===========Done===========")
+        except Exception as e:
+            self.errors_dict[target] = {"model_name": self.spec.model, "error": str(e)}
+
+    def _build_model(self) -> pd.DataFrame:
+        full_data_dict = self.datasets.full_data_dict
+
+        self.datasets.datetime_col = self.spec.datetime_column.name
+        self.forecast_output = ForecastOutput(
+            confidence_interval_width=self.spec.confidence_interval_width
+        )
+
+        self.outputs = dict()
+        self.outputs_legacy = []
+        self.fitted_values = dict()
+        self.actual_values = dict()
+        self.dt_columns = dict()
+        self.models_dict = dict()
+        self.errors_dict = dict()
+
+        Parallel(n_jobs=-1, require="sharedmem")(
+            delayed(self._train_model)(i, target, df)
+            for i, (target, df) in enumerate(full_data_dict.items())
+        )
 
-        logger.debug("===========Done===========")
+        self.models = [self.models_dict[target] for target in self.target_columns]
 
         # Merge the outputs from each model into 1 df with all outputs by target and category
         col = self.original_target_column
@@ -134,15 +157,15 @@ def _build_model(self) -> pd.DataFrame:
         yhat_lower_name = ForecastOutputColumns.LOWER_BOUND
         for cat in self.categories:
             output_i = pd.DataFrame()
-            output_i["Date"] = dt_columns[f"{col}_{cat}"]
+            output_i["Date"] = self.dt_columns[f"{col}_{cat}"]
             output_i["Series"] = cat
             output_i = output_i.set_index("Date")
-            output_i["input_value"] = actual_values[f"{col}_{cat}"]
-            output_i["fitted_value"] = fitted_values[f"{col}_{cat}"]
-            output_i["forecast_value"] = outputs[f"{col}_{cat}"]["yhat"]
-            output_i[yhat_upper_name] = outputs[f"{col}_{cat}"]["yhat_upper"]
-            output_i[yhat_lower_name] = outputs[f"{col}_{cat}"]["yhat_lower"]
+            output_i["input_value"] = self.actual_values[f"{col}_{cat}"]
+            output_i["fitted_value"] = self.fitted_values[f"{col}_{cat}"]
+            output_i["forecast_value"] = self.outputs[f"{col}_{cat}"]["yhat"]
+            output_i[yhat_upper_name] = self.outputs[f"{col}_{cat}"]["yhat_upper"]
+            output_i[yhat_lower_name] = self.outputs[f"{col}_{cat}"]["yhat_lower"]
             output_i = output_i.reset_index(drop=False)
             output_col = pd.concat([output_col, output_i])
 
@@ -252,7 +275,7 @@ def _custom_predict_arima(self, data):
         """
         date_col = self.spec.datetime_column.name
-        data[date_col] = pd.to_datetime(data[date_col], unit='s')
+        data[date_col] = pd.to_datetime(data[date_col], unit="s")
         data = data.set_index(date_col)
         # Get the index of the current series id
         series_index = self.target_columns.index(self.series_id)
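Note for reviewers: the per-series fan-out above uses joblib's shared-memory mode, and the interval conversion is simply `alpha = 1 - confidence_interval_width` (a 0.95 width passes `alpha=0.05` to pmdarima). Below is a minimal, self-contained sketch of the joblib pattern, under the same structure as `_train_model`/`_build_model`; `SeriesTrainer` and `fake_fit` are illustrative stand-ins, not part of the operator.

```python
# With require="sharedmem", joblib uses the threading backend, so workers
# share the parent's memory and can write results straight into dicts on the
# instance -- the same way _train_model populates models_dict / errors_dict.
from joblib import Parallel, delayed


def fake_fit(values):
    # stand-in for pm.auto_arima
    return sum(values) / len(values)


class SeriesTrainer:
    def __init__(self, data):
        self.data = data  # {series_id: [values]}
        self.models_dict = {}
        self.errors_dict = {}

    def _train_model(self, target, values):
        try:
            self.models_dict[target] = fake_fit(values)
        except Exception as e:
            # one failing series must not abort the whole run
            self.errors_dict[target] = {"model_name": "arima", "error": str(e)}

    def _build_model(self):
        Parallel(n_jobs=-1, require="sharedmem")(
            delayed(self._train_model)(target, values)
            for target, values in self.data.items()
        )


trainer = SeriesTrainer({"sales_a": [1.0, 2.0, 3.0], "sales_b": []})
trainer._build_model()
assert "sales_a" in trainer.models_dict   # trained
assert "sales_b" in trainer.errors_dict   # empty series recorded, not raised
```

Forcing threads rather than processes avoids pickling fitted models back to the parent, at the cost of GIL contention in pure-Python sections; native-code fitting routines still overlap reasonably well.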
diff --git a/ads/opctl/operator/lowcode/forecast/model/automlx.py b/ads/opctl/operator/lowcode/forecast/model/automlx.py
index eaa32587e..e0f36f995 100644
--- a/ads/opctl/operator/lowcode/forecast/model/automlx.py
+++ b/ads/opctl/operator/lowcode/forecast/model/automlx.py
@@ -45,10 +45,15 @@ def __init__(self, config: ForecastOperatorConfig, datasets: ForecastDatasets):
         ),
     )
     def _build_model(self) -> pd.DataFrame:
+        from automl import init
         from sktime.forecasting.model_selection import temporal_train_test_split
 
-        init(engine="local", check_deprecation_warnings=False)
+        init(
+            engine="local",
+            engine_opts={"n_jobs": -1, "model_n_jobs": -1},
+            check_deprecation_warnings=False,
+        )
 
         full_data_dict = self.datasets.full_data_dict
 
@@ -63,6 +68,7 @@ def _build_model(self) -> pd.DataFrame:
         self.forecast_output = ForecastOutput(
             confidence_interval_width=self.spec.confidence_interval_width
         )
+        self.errors_dict = dict()
 
         # Clean up kwargs for pass through
         model_kwargs_cleaned = self.spec.model_kwargs.copy()
@@ -80,81 +86,84 @@ def _build_model(self) -> pd.DataFrame:
         ] = self.spec.preprocessing or model_kwargs_cleaned.get("preprocessing", True)
 
         for i, (target, df) in enumerate(full_data_dict.items()):
-            logger.debug("Running automl for {} at position {}".format(target, i))
-            series_values = df[df[target].notna()]
-            # drop NaNs for the time period where data wasn't recorded
-            series_values.dropna(inplace=True)
-            df[date_column] = pd.to_datetime(
-                df[date_column], format=self.spec.datetime_column.format
-            )
-            df = df.set_index(date_column)
-            # if len(df.columns) > 1:
-            # when additional columns are present
-            y_train, y_test = temporal_train_test_split(df, test_size=horizon)
-            forecast_x = y_test.drop(target, axis=1)
-            # else:
-            #     y_train = df
-            #     forecast_x = None
-            logger.debug(
-                "Time Index is" + ""
-                if y_train.index.is_monotonic
-                else "NOT" + "monotonic."
-            )
-            model = automl.Pipeline(
-                task="forecasting",
-                **model_kwargs_cleaned,
-            )
-            model.fit(
-                X=y_train.drop(target, axis=1),
-                y=pd.DataFrame(y_train[target]),
-                time_budget=time_budget,
-            )
-            logger.debug("Selected model: {}".format(model.selected_model_))
-            logger.debug(
-                "Selected model params: {}".format(model.selected_model_params_)
-            )
-            summary_frame = model.forecast(
-                X=forecast_x,
-                periods=horizon,
-                alpha=1 - (self.spec.confidence_interval_width / 100),
-            )
-            input_values = pd.Series(
-                y_train[target].values,
-                name="input_value",
-                index=y_train.index,
-            )
-            fitted_values_raw = model.predict(y_train.drop(target, axis=1))
-            fitted_values = pd.Series(
-                fitted_values_raw[target].values,
-                name="fitted_value",
-                index=y_train.index,
-            )
-
-            summary_frame = pd.concat(
-                [input_values, fitted_values, summary_frame], axis=1
-            )
-
-            # Collect Outputs
-            selected_models[target] = {
-                "series_id": target,
-                "selected_model": model.selected_model_,
-                "model_params": model.selected_model_params_,
-            }
-            models[target] = model
-            summary_frame = summary_frame.rename_axis("ds").reset_index()
-            summary_frame = summary_frame.rename(
-                columns={
-                    f"{target}_ci_upper": "yhat_upper",
-                    f"{target}_ci_lower": "yhat_lower",
-                    f"{target}": "yhat",
-                }
-            )
-            # In case of Naive model, model.forecast function call does not return confidence intervals.
-            if "yhat_upper" not in summary_frame:
-                summary_frame["yhat_upper"] = np.NAN
-                summary_frame["yhat_lower"] = np.NAN
-            outputs[target] = summary_frame
-            # outputs_legacy[target] = summary_frame
+            try:
+                logger.debug("Running automl for {} at position {}".format(target, i))
+                series_values = df[df[target].notna()]
+                # drop NaNs for the time period where data wasn't recorded
+                series_values.dropna(inplace=True)
+                df[date_column] = pd.to_datetime(
+                    df[date_column], format=self.spec.datetime_column.format
+                )
+                df = df.set_index(date_column)
+                # if len(df.columns) > 1:
+                # when additional columns are present
+                y_train, y_test = temporal_train_test_split(df, test_size=horizon)
+                forecast_x = y_test.drop(target, axis=1)
+                # else:
+                #     y_train = df
+                #     forecast_x = None
+                logger.debug(
+                    "Time Index is monotonic."
+                    if y_train.index.is_monotonic
+                    else "Time Index is NOT monotonic."
+                )
+                model = automl.Pipeline(
+                    task="forecasting",
+                    **model_kwargs_cleaned,
+                )
+                model.fit(
+                    X=y_train.drop(target, axis=1),
+                    y=pd.DataFrame(y_train[target]),
+                    time_budget=time_budget,
+                )
+                logger.debug("Selected model: {}".format(model.selected_model_))
+                logger.debug(
+                    "Selected model params: {}".format(model.selected_model_params_)
+                )
+                summary_frame = model.forecast(
+                    X=forecast_x,
+                    periods=horizon,
+                    alpha=1 - (self.spec.confidence_interval_width / 100),
+                )
+                input_values = pd.Series(
+                    y_train[target].values,
+                    name="input_value",
+                    index=y_train.index,
+                )
+                fitted_values_raw = model.predict(y_train.drop(target, axis=1))
+                fitted_values = pd.Series(
+                    fitted_values_raw[target].values,
+                    name="fitted_value",
+                    index=y_train.index,
+                )
+
+                summary_frame = pd.concat(
+                    [input_values, fitted_values, summary_frame], axis=1
+                )
+
+                # Collect Outputs
+                selected_models[target] = {
+                    "series_id": target,
+                    "selected_model": model.selected_model_,
+                    "model_params": model.selected_model_params_,
+                }
+                models[target] = model
+                summary_frame = summary_frame.rename_axis("ds").reset_index()
+                summary_frame = summary_frame.rename(
+                    columns={
+                        f"{target}_ci_upper": "yhat_upper",
+                        f"{target}_ci_lower": "yhat_lower",
+                        f"{target}": "yhat",
+                    }
+                )
+                # In case of Naive model, model.forecast function call does not return confidence intervals.
+                if "yhat_upper" not in summary_frame:
+                    summary_frame["yhat_upper"] = np.NAN
+                    summary_frame["yhat_lower"] = np.NAN
+                outputs[target] = summary_frame
+                # outputs_legacy[target] = summary_frame
+            except Exception as e:
+                self.errors_dict[target] = {"model_name": self.spec.model, "error": str(e)}
 
         logger.debug("===========Forecast Generated===========")
         outputs_merged = pd.DataFrame()
""" from shap import PermutationExplainer + exp_start_time = time.time() global_ex_time = 0 local_ex_time = 0 - logger.info(f"Calculating explanations using {self.spec.explanations_accuracy_mode} mode") + logger.info( + f"Calculating explanations using {self.spec.explanations_accuracy_mode} mode" + ) for series_id in self.target_columns: self.series_id = series_id self.dataset_cols = ( @@ -592,10 +616,13 @@ def explain_model(self, datetime_col_name, explain_predict_fn) -> dict: datetime_col_name ) data = self.bg_data[list(self.dataset_cols)][: -self.spec.horizon][ - list(self.dataset_cols)] + list(self.dataset_cols) + ] ratio = SpeedAccuracyMode.ratio[self.spec.explanations_accuracy_mode] data_trimmed = data.tail(max(int(len(data) * ratio), 100)).reset_index() - data_trimmed[datetime_col_name] = data_trimmed[datetime_col_name].apply(lambda x: x.timestamp()) + data_trimmed[datetime_col_name] = data_trimmed[datetime_col_name].apply( + lambda x: x.timestamp() + ) kernel_explnr = PermutationExplainer( model=explain_predict_fn, masker=data_trimmed, @@ -624,8 +651,12 @@ def explain_model(self, datetime_col_name, explain_predict_fn) -> dict: kernel_explnr, series_id=series_id, datetime_col_name=datetime_col_name ) local_ex_time = local_ex_time + time.time() - exp_end_time - logger.info("Global explanations generation completed in %s seconds", global_ex_time) - logger.info("Local explanations generation completed in %s seconds", local_ex_time) + logger.info( + "Global explanations generation completed in %s seconds", global_ex_time + ) + logger.info( + "Local explanations generation completed in %s seconds", local_ex_time + ) def local_explainer(self, kernel_explainer, series_id, datetime_col_name) -> None: """ @@ -637,7 +668,7 @@ def local_explainer(self, kernel_explainer, series_id, datetime_col_name) -> Non """ # Get the data for the series ID and select the relevant columns # data = self.full_data_dict.get(series_id).set_index(datetime_col_name) - data_horizon = self.bg_data[-self.spec.horizon:][list(self.dataset_cols)] + data_horizon = self.bg_data[-self.spec.horizon :][list(self.dataset_cols)] data = data_horizon.reset_index() data[datetime_col_name] = data[datetime_col_name].apply(lambda x: x.timestamp()) # Generate local SHAP values using the kernel explainer diff --git a/ads/opctl/operator/lowcode/forecast/model/neuralprophet.py b/ads/opctl/operator/lowcode/forecast/model/neuralprophet.py index e2303e113..8c6f743b5 100644 --- a/ads/opctl/operator/lowcode/forecast/model/neuralprophet.py +++ b/ads/opctl/operator/lowcode/forecast/model/neuralprophet.py @@ -7,6 +7,7 @@ import numpy as np import optuna import pandas as pd +from joblib import Parallel, delayed from torch import Tensor from torchmetrics.regression import ( MeanAbsoluteError, @@ -71,33 +72,30 @@ def __init__(self, config: ForecastOperatorConfig, datasets: ForecastDatasets): self.train_metrics = True self.forecast_col_name = "yhat1" - def _build_model(self) -> pd.DataFrame: - from neuralprophet import NeuralProphet - full_data_dict = self.datasets.full_data_dict - models = [] - outputs = dict() - outputs_legacy = [] - - # Extract the Confidence Interval Width and - # convert to neural prophets equivalent - quantiles - model_kwargs = self.spec.model_kwargs - - if self.spec.confidence_interval_width is None: - quantiles = model_kwargs.get("quantiles", [0.05, 0.95]) - self.spec.confidence_interval_width = float(quantiles[1]) - float( - quantiles[0] - ) - else: - boundaries = round((1 - self.spec.confidence_interval_width) / 2, 2) - 
diff --git a/ads/opctl/operator/lowcode/forecast/model/neuralprophet.py b/ads/opctl/operator/lowcode/forecast/model/neuralprophet.py
index e2303e113..8c6f743b5 100644
--- a/ads/opctl/operator/lowcode/forecast/model/neuralprophet.py
+++ b/ads/opctl/operator/lowcode/forecast/model/neuralprophet.py
@@ -7,6 +7,7 @@
 import numpy as np
 import optuna
 import pandas as pd
+from joblib import Parallel, delayed
 from torch import Tensor
 from torchmetrics.regression import (
     MeanAbsoluteError,
@@ -71,33 +72,30 @@ def __init__(self, config: ForecastOperatorConfig, datasets: ForecastDatasets):
         self.train_metrics = True
         self.forecast_col_name = "yhat1"
 
-    def _build_model(self) -> pd.DataFrame:
-        from neuralprophet import NeuralProphet
-
-        full_data_dict = self.datasets.full_data_dict
-        models = []
-        outputs = dict()
-        outputs_legacy = []
-
-        # Extract the Confidence Interval Width and
-        # convert to neural prophets equivalent - quantiles
-        model_kwargs = self.spec.model_kwargs
-
-        if self.spec.confidence_interval_width is None:
-            quantiles = model_kwargs.get("quantiles", [0.05, 0.95])
-            self.spec.confidence_interval_width = float(quantiles[1]) - float(
-                quantiles[0]
-            )
-        else:
-            boundaries = round((1 - self.spec.confidence_interval_width) / 2, 2)
-            quantiles = [boundaries, self.spec.confidence_interval_width + boundaries]
+    def _train_model(self, i, target, df):
+        try:
+            from neuralprophet import NeuralProphet
 
-        model_kwargs["quantiles"] = quantiles
-        self.forecast_output = ForecastOutput(
-            confidence_interval_width=self.spec.confidence_interval_width
-        )
+            # Extract the Confidence Interval Width and
+            # convert to neural prophets equivalent - quantiles
+            model_kwargs = self.spec.model_kwargs
+
+            if self.spec.confidence_interval_width is None:
+                self.quantiles = model_kwargs.get("quantiles", [0.05, 0.95])
+                self.spec.confidence_interval_width = float(self.quantiles[1]) - float(
+                    self.quantiles[0]
+                )
+            else:
+                boundaries = round((1 - self.spec.confidence_interval_width) / 2, 2)
+                self.quantiles = [
+                    boundaries,
+                    self.spec.confidence_interval_width + boundaries,
+                ]
+
+            model_kwargs["quantiles"] = self.quantiles
+            self.forecast_output = ForecastOutput(
+                confidence_interval_width=self.spec.confidence_interval_width
+            )
 
-        for i, (target, df) in enumerate(full_data_dict.items()):
             le, df_encoded = utils._label_encode_dataframe(
                 df, no_encode={self.spec.datetime_column.name, target}
             )
@@ -212,14 +210,34 @@ def objective(trial):
             forecast = model.predict(future)
             logger.debug(f"-----------------Model {i}----------------------")
             logger.debug(forecast.tail())
-            models.append(model)
-            outputs[target] = forecast
-            outputs_legacy.append(forecast)
+            self.outputs_dict[target] = forecast
+            self.outputs_legacy.append(forecast)
+
+            self.models_dict[target] = model
+            self.outputs = self.outputs_legacy
+
+            logger.debug("===========Done===========")
+        except Exception as e:
+            self.errors_dict[target] = {"model_name": self.spec.model, "error": str(e)}
+
+    def _build_model(self) -> pd.DataFrame:
+        full_data_dict = self.datasets.full_data_dict
+        self.models_dict = dict()
+        self.outputs_dict = dict()
+        self.outputs_legacy = []
+        self.errors_dict = dict()
+
+        Parallel(n_jobs=-1, require="sharedmem")(
+            delayed(self._train_model)(i, target, df)
+            for i, (target, df) in enumerate(full_data_dict.items())
+        )
 
-        self.models = models
-        self.outputs = outputs_legacy
+        self.models = [self.models_dict[target] for target in self.target_columns]
 
-        logger.debug("===========Done===========")
 
         # Merge the outputs from each model into 1 df with all outputs by target and category
         col = self.original_target_column
@@ -229,7 +247,7 @@ def objective(trial):
 
         for cat in self.categories:
             output_i = pd.DataFrame()
-            output_i["Date"] = outputs[f"{col}_{cat}"]["ds"]
+            output_i["Date"] = self.outputs_dict[f"{col}_{cat}"]["ds"]
             output_i["Series"] = cat
             output_i[f"input_value"] = full_data_dict[f"{col}_{cat}"][f"{col}_{cat}"]
@@ -240,18 +258,18 @@ def objective(trial):
             output_i.iloc[
                 : -self.spec.horizon, output_i.columns.get_loc(f"fitted_value")
-            ] = (outputs[f"{col}_{cat}"]["yhat1"].iloc[: -self.spec.horizon].values)
+            ] = (
+                self.outputs_dict[f"{col}_{cat}"]["yhat1"]
+                .iloc[: -self.spec.horizon]
+                .values
+            )
             output_i.iloc[
                 -self.spec.horizon :,
                 output_i.columns.get_loc(f"forecast_value"),
             ] = (
-                outputs[f"{col}_{cat}"]["yhat1"].iloc[-self.spec.horizon :].values
+                self.outputs_dict[f"{col}_{cat}"]["yhat1"].iloc[-self.spec.horizon :].values
             )
             output_i.iloc[
                 -self.spec.horizon :,
                 output_i.columns.get_loc(yhat_upper_name),
             ] = (
-                outputs[f"{col}_{cat}"][f"yhat1 {quantiles[1]*100}%"]
+                self.outputs_dict[f"{col}_{cat}"][f"yhat1 {self.quantiles[1]*100}%"]
                 .iloc[-self.spec.horizon :]
                 .values
             )
@@ -259,7 +277,7 @@ def objective(trial):
                 -self.spec.horizon :,
                 output_i.columns.get_loc(yhat_lower_name),
             ] = (
-                outputs[f"{col}_{cat}"][f"yhat1 {quantiles[0]*100}%"]
+                self.outputs_dict[f"{col}_{cat}"][f"yhat1 {self.quantiles[0]*100}%"]
                 .iloc[-self.spec.horizon :]
                 .values
             )
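Note for reviewers: `quantiles` moves onto `self.quantiles` because the merge loop in `_build_model` now runs outside `_train_model` and still needs the values to name the prediction columns. A small worked example of the width-to-quantiles conversion; `width_to_quantiles` is an illustrative name, not part of the operator.

```python
# NeuralProphet takes a quantile pair rather than an interval width. The diff
# derives the pair as below, then slices the forecast frame by columns named
# after those quantiles (nominally "yhat1 5.0%" / "yhat1 95.0%" -- the same
# float feeds both the model kwargs and the column lookup, so they agree).
import math


def width_to_quantiles(confidence_interval_width):
    boundaries = round((1 - confidence_interval_width) / 2, 2)
    return [boundaries, confidence_interval_width + boundaries]


quantiles = width_to_quantiles(0.90)
assert math.isclose(quantiles[0], 0.05) and math.isclose(quantiles[1], 0.95)

# column labels used when slicing the forecast frame, as in the diff
upper_col = f"yhat1 {quantiles[1] * 100}%"
lower_col = f"yhat1 {quantiles[0] * 100}%"
```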
diff --git a/ads/opctl/operator/lowcode/forecast/model/prophet.py b/ads/opctl/operator/lowcode/forecast/model/prophet.py
index d1e204235..2b865a17e 100644
--- a/ads/opctl/operator/lowcode/forecast/model/prophet.py
+++ b/ads/opctl/operator/lowcode/forecast/model/prophet.py
@@ -7,6 +7,7 @@
 import numpy as np
 import optuna
 import pandas as pd
+from joblib import Parallel, delayed
 
 from ads.common.decorator.runtime_dependency import runtime_dependency
 from ads.opctl import logger
 from ads.opctl.operator.lowcode.forecast.operator_config import ForecastOperatorConfig
@@ -45,29 +46,25 @@ def __init__(self, config: ForecastOperatorConfig, datasets: ForecastDatasets):
         self.global_explanation = {}
         self.local_explanation = {}
 
-    def _build_model(self) -> pd.DataFrame:
-        from prophet import Prophet
-        from prophet.diagnostics import cross_validation, performance_metrics
+    def _train_model(self, i, target, df):
 
-        full_data_dict = self.datasets.full_data_dict
-        models = []
-        outputs = dict()
-        outputs_legacy = []
-
-        # Extract the Confidence Interval Width and convert to prophet's equivalent - interval_width
-        if self.spec.confidence_interval_width is None:
-            self.spec.confidence_interval_width = 1 - self.spec.model_kwargs.get(
-                "alpha", 0.90
-            )
+        try:
+            from prophet import Prophet
+            from prophet.diagnostics import cross_validation, performance_metrics
 
-        model_kwargs = self.spec.model_kwargs
-        model_kwargs["interval_width"] = self.spec.confidence_interval_width
+            # Extract the Confidence Interval Width and convert to prophet's equivalent - interval_width
+            if self.spec.confidence_interval_width is None:
+                self.spec.confidence_interval_width = 1 - self.spec.model_kwargs.get(
+                    "alpha", 0.90
+                )
 
-        self.forecast_output = ForecastOutput(
-            confidence_interval_width=self.spec.confidence_interval_width
-        )
+            model_kwargs = self.spec.model_kwargs
+            model_kwargs["interval_width"] = self.spec.confidence_interval_width
+
+            self.forecast_output = ForecastOutput(
+                confidence_interval_width=self.spec.confidence_interval_width
+            )
 
-        for i, (target, df) in enumerate(full_data_dict.items()):
             le, df_encoded = utils._label_encode_dataframe(
                 df, no_encode={self.spec.datetime_column.name, target}
             )
@@ -195,14 +192,35 @@ def objective(trial):
             )
 
             # Collect Outputs
-            models.append(model)
-            outputs[target] = forecast
-            outputs_legacy.append(forecast)
+            self.outputs_dict[target] = forecast
+            self.outputs_legacy.append(forecast)
 
-        self.models = models
-        self.outputs = outputs_legacy
+            self.models_dict[target] = model
+            self.outputs = self.outputs_legacy
+
+            logger.debug("===========Done===========")
+        except Exception as e:
+            self.errors_dict[target] = {"model_name": self.spec.model, "error": str(e)}
+
+    def _build_model(self) -> pd.DataFrame:
+        from prophet import Prophet
+        from prophet.diagnostics import cross_validation, performance_metrics
+
+        full_data_dict = self.datasets.full_data_dict
+        self.models_dict = dict()
+        self.outputs_dict = dict()
+        self.outputs_legacy = []
+        self.errors_dict = dict()
+
+        Parallel(n_jobs=-1, require="sharedmem")(
+            delayed(self._train_model)(i, target, df)
+            for i, (target, df) in enumerate(full_data_dict.items())
+        )
 
-        logger.debug("===========Done===========")
+        self.models = [self.models_dict[target] for target in self.target_columns]
 
         # Merge the outputs from each model into 1 df with all outputs by target and category
         col = self.original_target_column
@@ -212,7 +230,7 @@ def objective(trial):
 
         for cat in self.categories:
             output_i = pd.DataFrame()
-            output_i["Date"] = outputs[f"{col}_{cat}"][PROPHET_INTERNAL_DATE_COL]
+            output_i["Date"] = self.outputs_dict[f"{col}_{cat}"][PROPHET_INTERNAL_DATE_COL]
             output_i["Series"] = cat
             output_i["input_value"] = full_data_dict[f"{col}_{cat}"][f"{col}_{cat}"]
@@ -223,22 +241,22 @@ def objective(trial):
             output_i.iloc[
                 : -self.spec.horizon, output_i.columns.get_loc(f"fitted_value")
-            ] = (outputs[f"{col}_{cat}"]["yhat"].iloc[: -self.spec.horizon].values)
+            ] = (
+                self.outputs_dict[f"{col}_{cat}"]["yhat"]
+                .iloc[: -self.spec.horizon]
+                .values
+            )
             output_i.iloc[
                 -self.spec.horizon :,
                 output_i.columns.get_loc(f"forecast_value"),
             ] = (
-                outputs[f"{col}_{cat}"]["yhat"].iloc[-self.spec.horizon :].values
+                self.outputs_dict[f"{col}_{cat}"]["yhat"].iloc[-self.spec.horizon :].values
             )
             output_i.iloc[
                 -self.spec.horizon :, output_i.columns.get_loc(yhat_upper_name)
             ] = (
-                outputs[f"{col}_{cat}"]["yhat_upper"].iloc[-self.spec.horizon :].values
+                self.outputs_dict[f"{col}_{cat}"]["yhat_upper"].iloc[-self.spec.horizon :].values
             )
             output_i.iloc[
                 -self.spec.horizon :, output_i.columns.get_loc(yhat_lower_name)
             ] = (
-                outputs[f"{col}_{cat}"]["yhat_lower"].iloc[-self.spec.horizon :].values
+                self.outputs_dict[f"{col}_{cat}"]["yhat_lower"].iloc[-self.spec.horizon :].values
             )
             output_col = pd.concat([output_col, output_i])
             self.forecast_output.add_category(
diff --git a/ads/opctl/operator/lowcode/forecast/operator_config.py b/ads/opctl/operator/lowcode/forecast/operator_config.py
index 9565fd453..8a07dbbb6 100644
--- a/ads/opctl/operator/lowcode/forecast/operator_config.py
+++ b/ads/opctl/operator/lowcode/forecast/operator_config.py
@@ -132,6 +132,7 @@ def __post_init__(self):
             self.local_explanation_filename or "local_explanation.csv"
         )
         self.target_column = self.target_column or "Sales"
+        self.errors_dict_filename = "errors.csv"
         self.model_kwargs = self.model_kwargs or dict()
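Note for reviewers: end to end, the new failure handling surfaces as one extra artifact next to the operator's other configured outputs. Since `_save_report` writes the errors file with `format="csv"`, the default filename is kept consistent as `errors.csv` here. A sketch of what lands on disk when a series fails; all values below are illustrative.

```python
# _save_report flattens errors_dict (series id -> recorded failure) into a
# two-column frame and writes it as errors.csv. An empty errors_dict skips
# the write and logs that no model failures were found.
import pandas as pd

errors_dict = {
    "Sales_Store1": {
        "model_name": "prophet",
        "error": "Dataframe has less than 2 non-NaN rows.",
    },
}

errors_df = pd.DataFrame(errors_dict.items(), columns=["series_id", "error"])
errors_df.to_csv("errors.csv", index=True)
```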