From d9bab094a1c0d08f4d7c8031815143d75c902e2b Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Wed, 3 Jan 2024 05:26:03 +0000 Subject: [PATCH 1/8] make the model parallel, allow object parallel access, some restructuring and formatting --- .../operator/lowcode/forecast/model/arima.py | 188 ++++++++++-------- 1 file changed, 104 insertions(+), 84 deletions(-) diff --git a/ads/opctl/operator/lowcode/forecast/model/arima.py b/ads/opctl/operator/lowcode/forecast/model/arima.py index 8e1a544ae..65936e8ee 100644 --- a/ads/opctl/operator/lowcode/forecast/model/arima.py +++ b/ads/opctl/operator/lowcode/forecast/model/arima.py @@ -7,6 +7,7 @@ import pandas as pd import numpy as np import pmdarima as pm +from joblib import Parallel, delayed from ads.opctl import logger @@ -29,8 +30,18 @@ def __init__(self, config: ForecastOperatorConfig, datasets: ForecastDatasets): self.formatted_global_explanation = None self.formatted_local_explanation = None - def _build_model(self) -> pd.DataFrame: - full_data_dict = self.datasets.full_data_dict + def _train_model(self, i, target, df): + """Trains the ARIMA model for a given target. + + Parameters + ---------- + i: int + The index of the target + target: str + The name of the target + df: pd.DataFrame + The dataframe containing the target data + """ # Extract the Confidence Interval Width and convert to arima's equivalent - alpha if self.spec.confidence_interval_width is None: @@ -42,90 +53,99 @@ def _build_model(self) -> pd.DataFrame: if "error_action" not in model_kwargs.keys(): model_kwargs["error_action"] = "ignore" - models = [] - self.datasets.datetime_col = self.spec.datetime_column.name - self.forecast_output = ForecastOutput( - confidence_interval_width=self.spec.confidence_interval_width + # models = [] + + # format the dataframe for this target. 
Dropping NA on target[df] will remove all future data + le, df_encoded = utils._label_encode_dataframe( + df, no_encode={self.spec.datetime_column.name, target} ) - outputs = dict() - outputs_legacy = [] - fitted_values = dict() - actual_values = dict() - dt_columns = dict() + df_encoded[self.spec.datetime_column.name] = pd.to_datetime( + df_encoded[self.spec.datetime_column.name], + format=self.spec.datetime_column.format, + ) + df_clean = df_encoded.set_index(self.spec.datetime_column.name) + data_i = df_clean[df_clean[target].notna()] + + # Assume that all columns passed in should be used as additional data + additional_regressors = set(data_i.columns) - { + target, + self.spec.datetime_column.name, + } + logger.debug(f"Additional Regressors Detected {list(additional_regressors)}") + + # Split data into X and y for arima tune method + y = data_i[target] + X_in = None + if len(additional_regressors): + X_in = data_i.drop(target, axis=1) + + # Build and fit model + model = pm.auto_arima(y=y, X=X_in, **self.spec.model_kwargs) + + self.fitted_values[target] = model.predict_in_sample(X=X_in) + self.actual_values[target] = y + self.actual_values[target].index = pd.to_datetime(y.index) + + # Build future dataframe + start_date = y.index.values[-1] + n_periods = self.spec.horizon + if len(additional_regressors): + X = df_clean[df_clean[target].isnull()].drop(target, axis=1) + else: + X = pd.date_range(start=start_date, periods=n_periods, freq=self.spec.freq) + + # Predict and format forecast + yhat, conf_int = model.predict( + n_periods=n_periods, + X=X, + return_conf_int=True, + alpha=model_kwargs["alpha"], + ) + yhat_clean = pd.DataFrame(yhat, index=yhat.index, columns=["yhat"]) - for i, (target, df) in enumerate(full_data_dict.items()): - # format the dataframe for this target. 
Dropping NA on target[df] will remove all future data - le, df_encoded = utils._label_encode_dataframe( - df, no_encode={self.spec.datetime_column.name, target} - ) + self.dt_columns[target] = df_encoded[self.spec.datetime_column.name] + conf_int_clean = pd.DataFrame( + conf_int, index=yhat.index, columns=["yhat_lower", "yhat_upper"] + ) + forecast = pd.concat([yhat_clean, conf_int_clean], axis=1) + logger.debug(f"-----------------Model {i}----------------------") + logger.debug(forecast[["yhat", "yhat_lower", "yhat_upper"]].tail()) + + # Collect all outputs + # models.append(model) + self.outputs_legacy.append( + forecast.reset_index().rename(columns={"index": "ds"}) + ) + self.outputs[target] = forecast - df_encoded[self.spec.datetime_column.name] = pd.to_datetime( - df_encoded[self.spec.datetime_column.name], - format=self.spec.datetime_column.format, - ) - df_clean = df_encoded.set_index(self.spec.datetime_column.name) - data_i = df_clean[df_clean[target].notna()] - - # Assume that all columns passed in should be used as additional data - additional_regressors = set(data_i.columns) - { - target, - self.spec.datetime_column.name, - } - logger.debug( - f"Additional Regressors Detected {list(additional_regressors)}" - ) + self.models_dict[target] = model - # Split data into X and y for arima tune method - y = data_i[target] - X_in = None - if len(additional_regressors): - X_in = data_i.drop(target, axis=1) - - # Build and fit model - model = pm.auto_arima(y=y, X=X_in, **self.spec.model_kwargs) - - fitted_values[target] = model.predict_in_sample(X=X_in) - actual_values[target] = y - actual_values[target].index = pd.to_datetime(y.index) - - # Build future dataframe - start_date = y.index.values[-1] - n_periods = self.spec.horizon - if len(additional_regressors): - X = df_clean[df_clean[target].isnull()].drop(target, axis=1) - else: - X = pd.date_range( - start=start_date, periods=n_periods, freq=self.spec.freq - ) + logger.debug("===========Done===========") - # Predict and format forecast - yhat, conf_int = model.predict( - n_periods=n_periods, - X=X, - return_conf_int=True, - alpha=model_kwargs["alpha"], - ) - yhat_clean = pd.DataFrame(yhat, index=yhat.index, columns=["yhat"]) + def _build_model(self) -> pd.DataFrame: + full_data_dict = self.datasets.full_data_dict - dt_columns[target] = df_encoded[self.spec.datetime_column.name] - conf_int_clean = pd.DataFrame( - conf_int, index=yhat.index, columns=["yhat_lower", "yhat_upper"] - ) - forecast = pd.concat([yhat_clean, conf_int_clean], axis=1) - logger.debug(f"-----------------Model {i}----------------------") - logger.debug(forecast[["yhat", "yhat_lower", "yhat_upper"]].tail()) - - # Collect all outputs - models.append(model) - outputs_legacy.append( - forecast.reset_index().rename(columns={"index": "ds"}) - ) - outputs[target] = forecast + self.datasets.datetime_col = self.spec.datetime_column.name + self.forecast_output = ForecastOutput( + confidence_interval_width=self.spec.confidence_interval_width + ) - self.models = models + self.outputs = dict() + self.outputs_legacy = [] + self.fitted_values = dict() + self.actual_values = dict() + self.dt_columns = dict() + self.models_dict = dict() + + Parallel(n_jobs=-1, require="sharedmem")( + delayed(ArimaOperatorModel._train_model)(self, i, target, df) + for self, (i, (target, df)) in zip( + [self] * len(full_data_dict), enumerate(full_data_dict.items()) + ) + ) - logger.debug("===========Done===========") + self.models = [self.models_dict[target] for target in self.target_columns] # Merge 
the outputs from each model into 1 df with all outputs by target and category col = self.original_target_column @@ -134,15 +154,15 @@ def _build_model(self) -> pd.DataFrame: yhat_lower_name = ForecastOutputColumns.LOWER_BOUND for cat in self.categories: output_i = pd.DataFrame() - output_i["Date"] = dt_columns[f"{col}_{cat}"] + output_i["Date"] = self.dt_columns[f"{col}_{cat}"] output_i["Series"] = cat output_i = output_i.set_index("Date") - output_i["input_value"] = actual_values[f"{col}_{cat}"] - output_i["fitted_value"] = fitted_values[f"{col}_{cat}"] - output_i["forecast_value"] = outputs[f"{col}_{cat}"]["yhat"] - output_i[yhat_upper_name] = outputs[f"{col}_{cat}"]["yhat_upper"] - output_i[yhat_lower_name] = outputs[f"{col}_{cat}"]["yhat_lower"] + output_i["input_value"] = self.actual_values[f"{col}_{cat}"] + output_i["fitted_value"] = self.fitted_values[f"{col}_{cat}"] + output_i["forecast_value"] = self.outputs[f"{col}_{cat}"]["yhat"] + output_i[yhat_upper_name] = self.outputs[f"{col}_{cat}"]["yhat_upper"] + output_i[yhat_lower_name] = self.outputs[f"{col}_{cat}"]["yhat_lower"] output_i = output_i.reset_index(drop=False) output_col = pd.concat([output_col, output_i]) @@ -252,7 +272,7 @@ def _custom_predict_arima(self, data): """ date_col = self.spec.datetime_column.name - data[date_col] = pd.to_datetime(data[date_col], unit='s') + data[date_col] = pd.to_datetime(data[date_col], unit="s") data = data.set_index(date_col) # Get the index of the current series id series_index = self.target_columns.index(self.series_id) From 2c53fa2998489be3b20ad8efc86cafddd773096a Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Wed, 3 Jan 2024 07:44:08 +0000 Subject: [PATCH 2/8] add failed model to error dict and write it to a file in case of failure --- .../operator/lowcode/forecast/model/arima.py | 152 +++++++++--------- .../lowcode/forecast/model/base_model.py | 44 ++++- .../lowcode/forecast/operator_config.py | 1 + 3 files changed, 116 insertions(+), 81 deletions(-) diff --git a/ads/opctl/operator/lowcode/forecast/model/arima.py b/ads/opctl/operator/lowcode/forecast/model/arima.py index 65936e8ee..5f4145d5c 100644 --- a/ads/opctl/operator/lowcode/forecast/model/arima.py +++ b/ads/opctl/operator/lowcode/forecast/model/arima.py @@ -31,6 +31,7 @@ def __init__(self, config: ForecastOperatorConfig, datasets: ForecastDatasets): self.formatted_local_explanation = None def _train_model(self, i, target, df): + # TODO : wrap it in try except and populate a errors_dict with the model name and the error """Trains the ARIMA model for a given target. 
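        The fitted model, its in-sample predictions, and its forecast are
        written to shared dictionaries on the instance rather than returned,
        so the method can safely run inside joblib workers.
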
Parameters @@ -42,86 +43,88 @@ def _train_model(self, i, target, df): df: pd.DataFrame The dataframe containing the target data """ + try: + # Extract the Confidence Interval Width and convert to arima's equivalent - alpha + if self.spec.confidence_interval_width is None: + self.spec.confidence_interval_width = 1 - self.spec.model_kwargs.get( + "alpha", 0.05 + ) + model_kwargs = self.spec.model_kwargs + model_kwargs["alpha"] = 1 - self.spec.confidence_interval_width + if "error_action" not in model_kwargs.keys(): + model_kwargs["error_action"] = "ignore" - # Extract the Confidence Interval Width and convert to arima's equivalent - alpha - if self.spec.confidence_interval_width is None: - self.spec.confidence_interval_width = 1 - self.spec.model_kwargs.get( - "alpha", 0.05 - ) - model_kwargs = self.spec.model_kwargs - model_kwargs["alpha"] = 1 - self.spec.confidence_interval_width - if "error_action" not in model_kwargs.keys(): - model_kwargs["error_action"] = "ignore" - - # models = [] + # models = [] - # format the dataframe for this target. Dropping NA on target[df] will remove all future data - le, df_encoded = utils._label_encode_dataframe( - df, no_encode={self.spec.datetime_column.name, target} - ) + # format the dataframe for this target. Dropping NA on target[df] will remove all future data + le, df_encoded = utils._label_encode_dataframe( + df, no_encode={self.spec.datetime_column.name, target} + ) - df_encoded[self.spec.datetime_column.name] = pd.to_datetime( - df_encoded[self.spec.datetime_column.name], - format=self.spec.datetime_column.format, - ) - df_clean = df_encoded.set_index(self.spec.datetime_column.name) - data_i = df_clean[df_clean[target].notna()] - - # Assume that all columns passed in should be used as additional data - additional_regressors = set(data_i.columns) - { - target, - self.spec.datetime_column.name, - } - logger.debug(f"Additional Regressors Detected {list(additional_regressors)}") - - # Split data into X and y for arima tune method - y = data_i[target] - X_in = None - if len(additional_regressors): - X_in = data_i.drop(target, axis=1) - - # Build and fit model - model = pm.auto_arima(y=y, X=X_in, **self.spec.model_kwargs) - - self.fitted_values[target] = model.predict_in_sample(X=X_in) - self.actual_values[target] = y - self.actual_values[target].index = pd.to_datetime(y.index) - - # Build future dataframe - start_date = y.index.values[-1] - n_periods = self.spec.horizon - if len(additional_regressors): - X = df_clean[df_clean[target].isnull()].drop(target, axis=1) - else: - X = pd.date_range(start=start_date, periods=n_periods, freq=self.spec.freq) - - # Predict and format forecast - yhat, conf_int = model.predict( - n_periods=n_periods, - X=X, - return_conf_int=True, - alpha=model_kwargs["alpha"], - ) - yhat_clean = pd.DataFrame(yhat, index=yhat.index, columns=["yhat"]) + df_encoded[self.spec.datetime_column.name] = pd.to_datetime( + df_encoded[self.spec.datetime_column.name], + format=self.spec.datetime_column.format, + ) + df_clean = df_encoded.set_index(self.spec.datetime_column.name) + data_i = df_clean[df_clean[target].notna()] + + # Assume that all columns passed in should be used as additional data + additional_regressors = set(data_i.columns) - { + target, + self.spec.datetime_column.name, + } + logger.debug(f"Additional Regressors Detected {list(additional_regressors)}") + + # Split data into X and y for arima tune method + y = data_i[target] + X_in = None + if len(additional_regressors): + X_in = data_i.drop(target, axis=1) + + # Build and 
fit model + model = pm.auto_arima(y=y, X=X_in, **self.spec.model_kwargs) + + self.fitted_values[target] = model.predict_in_sample(X=X_in) + self.actual_values[target] = y + self.actual_values[target].index = pd.to_datetime(y.index) + + # Build future dataframe + start_date = y.index.values[-1] + n_periods = self.spec.horizon + if len(additional_regressors): + X = df_clean[df_clean[target].isnull()].drop(target, axis=1) + else: + X = pd.date_range(start=start_date, periods=n_periods, freq=self.spec.freq) + + # Predict and format forecast + yhat, conf_int = model.predict( + n_periods=n_periods, + X=X, + return_conf_int=True, + alpha=model_kwargs["alpha"], + ) + yhat_clean = pd.DataFrame(yhat, index=yhat.index, columns=["yhat"]) - self.dt_columns[target] = df_encoded[self.spec.datetime_column.name] - conf_int_clean = pd.DataFrame( - conf_int, index=yhat.index, columns=["yhat_lower", "yhat_upper"] - ) - forecast = pd.concat([yhat_clean, conf_int_clean], axis=1) - logger.debug(f"-----------------Model {i}----------------------") - logger.debug(forecast[["yhat", "yhat_lower", "yhat_upper"]].tail()) - - # Collect all outputs - # models.append(model) - self.outputs_legacy.append( - forecast.reset_index().rename(columns={"index": "ds"}) - ) - self.outputs[target] = forecast + self.dt_columns[target] = df_encoded[self.spec.datetime_column.name] + conf_int_clean = pd.DataFrame( + conf_int, index=yhat.index, columns=["yhat_lower", "yhat_upper"] + ) + forecast = pd.concat([yhat_clean, conf_int_clean], axis=1) + logger.debug(f"-----------------Model {i}----------------------") + logger.debug(forecast[["yhat", "yhat_lower", "yhat_upper"]].tail()) + + # Collect all outputs + # models.append(model) + self.outputs_legacy.append( + forecast.reset_index().rename(columns={"index": "ds"}) + ) + self.outputs[target] = forecast - self.models_dict[target] = model + self.models_dict[target] = model - logger.debug("===========Done===========") + logger.debug("===========Done===========") + except Exception as e: + self.errors_dict[target] = {"model_name": self.spec.model, "error": str(e)} def _build_model(self) -> pd.DataFrame: full_data_dict = self.datasets.full_data_dict @@ -137,6 +140,7 @@ def _build_model(self) -> pd.DataFrame: self.actual_values = dict() self.dt_columns = dict() self.models_dict = dict() + self.errors_dict = dict() Parallel(n_jobs=-1, require="sharedmem")( delayed(ArimaOperatorModel._train_model)(self, i, target, df) diff --git a/ads/opctl/operator/lowcode/forecast/model/base_model.py b/ads/opctl/operator/lowcode/forecast/model/base_model.py index e0f16eca1..404b38a2a 100644 --- a/ads/opctl/operator/lowcode/forecast/model/base_model.py +++ b/ads/opctl/operator/lowcode/forecast/model/base_model.py @@ -4,6 +4,7 @@ # Copyright (c) 2023 Oracle and/or its affiliates. # Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/ +import json import os import tempfile import time @@ -20,7 +21,12 @@ from ads.opctl import logger from .. 
import utils -from ..const import SUMMARY_METRICS_HORIZON_LIMIT, SupportedMetrics, SupportedModels, SpeedAccuracyMode +from ..const import ( + SUMMARY_METRICS_HORIZON_LIMIT, + SupportedMetrics, + SupportedModels, + SpeedAccuracyMode, +) from ..operator_config import ForecastOperatorConfig, ForecastOperatorSpec from ads.common.decorator.runtime_dependency import runtime_dependency from .forecast_datasets import ForecastDatasets, ForecastOutput @@ -529,6 +535,20 @@ def _save_report( f"The outputs have been successfully " f"generated and placed into the directory: {output_dir}." ) + if self.errors_dict: + utils._write_data( + data=pd.DataFrame(self.errors_dict.items(), columns=["model", "error"]), + filename=os.path.join( + output_dir, self.spec.errors_dict_filename + ), + format="csv", + storage_options=storage_options, + index=True, + ) + else: + logger.warn( + f"Attempted to write model errors for the {self.spec.errors_dict_filename} file, but an issue writing to storage." + ) def _preprocess(self, data, ds_column, datetime_format): """The method that needs to be implemented on the particular model level.""" @@ -575,10 +595,13 @@ def explain_model(self, datetime_col_name, explain_predict_fn) -> dict: The keys are the feature names and the values are the average absolute SHAP values. """ from shap import PermutationExplainer + exp_start_time = time.time() global_ex_time = 0 local_ex_time = 0 - logger.info(f"Calculating explanations using {self.spec.explanations_accuracy_mode} mode") + logger.info( + f"Calculating explanations using {self.spec.explanations_accuracy_mode} mode" + ) for series_id in self.target_columns: self.series_id = series_id self.dataset_cols = ( @@ -592,10 +615,13 @@ def explain_model(self, datetime_col_name, explain_predict_fn) -> dict: datetime_col_name ) data = self.bg_data[list(self.dataset_cols)][: -self.spec.horizon][ - list(self.dataset_cols)] + list(self.dataset_cols) + ] ratio = SpeedAccuracyMode.ratio[self.spec.explanations_accuracy_mode] data_trimmed = data.tail(max(int(len(data) * ratio), 100)).reset_index() - data_trimmed[datetime_col_name] = data_trimmed[datetime_col_name].apply(lambda x: x.timestamp()) + data_trimmed[datetime_col_name] = data_trimmed[datetime_col_name].apply( + lambda x: x.timestamp() + ) kernel_explnr = PermutationExplainer( model=explain_predict_fn, masker=data_trimmed, @@ -624,8 +650,12 @@ def explain_model(self, datetime_col_name, explain_predict_fn) -> dict: kernel_explnr, series_id=series_id, datetime_col_name=datetime_col_name ) local_ex_time = local_ex_time + time.time() - exp_end_time - logger.info("Global explanations generation completed in %s seconds", global_ex_time) - logger.info("Local explanations generation completed in %s seconds", local_ex_time) + logger.info( + "Global explanations generation completed in %s seconds", global_ex_time + ) + logger.info( + "Local explanations generation completed in %s seconds", local_ex_time + ) def local_explainer(self, kernel_explainer, series_id, datetime_col_name) -> None: """ @@ -637,7 +667,7 @@ def local_explainer(self, kernel_explainer, series_id, datetime_col_name) -> Non """ # Get the data for the series ID and select the relevant columns # data = self.full_data_dict.get(series_id).set_index(datetime_col_name) - data_horizon = self.bg_data[-self.spec.horizon:][list(self.dataset_cols)] + data_horizon = self.bg_data[-self.spec.horizon :][list(self.dataset_cols)] data = data_horizon.reset_index() data[datetime_col_name] = data[datetime_col_name].apply(lambda x: x.timestamp()) # 
Generate local SHAP values using the kernel explainer diff --git a/ads/opctl/operator/lowcode/forecast/operator_config.py b/ads/opctl/operator/lowcode/forecast/operator_config.py index 9565fd453..0832349c5 100644 --- a/ads/opctl/operator/lowcode/forecast/operator_config.py +++ b/ads/opctl/operator/lowcode/forecast/operator_config.py @@ -132,6 +132,7 @@ def __post_init__(self): self.local_explanation_filename or "local_explanation.csv" ) self.target_column = self.target_column or "Sales" + self.errors_dict_filename = "errors_dict.json" self.model_kwargs = self.model_kwargs or dict() From 4394f67c0aa982cde863719854cf1c482a092e07 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Thu, 4 Jan 2024 17:40:58 +0000 Subject: [PATCH 3/8] add the joblib parallelisation for the prophet, update the automlx param to inter-model and intra models parallel process --- .../lowcode/forecast/model/automlx.py | 155 +++++++++--------- .../lowcode/forecast/model/base_model.py | 5 +- .../lowcode/forecast/model/prophet.py | 83 ++++++---- 3 files changed, 136 insertions(+), 107 deletions(-) diff --git a/ads/opctl/operator/lowcode/forecast/model/automlx.py b/ads/opctl/operator/lowcode/forecast/model/automlx.py index eaa32587e..e0f36f995 100644 --- a/ads/opctl/operator/lowcode/forecast/model/automlx.py +++ b/ads/opctl/operator/lowcode/forecast/model/automlx.py @@ -45,10 +45,15 @@ def __init__(self, config: ForecastOperatorConfig, datasets: ForecastDatasets): ), ) def _build_model(self) -> pd.DataFrame: + from automl import init from sktime.forecasting.model_selection import temporal_train_test_split - init(engine="local", check_deprecation_warnings=False) + init( + engine="local", + engine_opts={"n_jobs": -1, "model_n_jobs": -1}, + check_deprecation_warnings=False, + ) full_data_dict = self.datasets.full_data_dict @@ -63,6 +68,7 @@ def _build_model(self) -> pd.DataFrame: self.forecast_output = ForecastOutput( confidence_interval_width=self.spec.confidence_interval_width ) + self.errors_dict = dict() # Clean up kwargs for pass through model_kwargs_cleaned = self.spec.model_kwargs.copy() @@ -80,81 +86,84 @@ def _build_model(self) -> pd.DataFrame: ] = self.spec.preprocessing or model_kwargs_cleaned.get("preprocessing", True) for i, (target, df) in enumerate(full_data_dict.items()): - logger.debug("Running automl for {} at position {}".format(target, i)) - series_values = df[df[target].notna()] - # drop NaNs for the time period where data wasn't recorded - series_values.dropna(inplace=True) - df[date_column] = pd.to_datetime( - df[date_column], format=self.spec.datetime_column.format - ) - df = df.set_index(date_column) - # if len(df.columns) > 1: - # when additional columns are present - y_train, y_test = temporal_train_test_split(df, test_size=horizon) - forecast_x = y_test.drop(target, axis=1) - # else: - # y_train = df - # forecast_x = None - logger.debug( - "Time Index is" + "" - if y_train.index.is_monotonic - else "NOT" + "monotonic." 
- ) - model = automl.Pipeline( - task="forecasting", - **model_kwargs_cleaned, - ) - model.fit( - X=y_train.drop(target, axis=1), - y=pd.DataFrame(y_train[target]), - time_budget=time_budget, - ) - logger.debug("Selected model: {}".format(model.selected_model_)) - logger.debug( - "Selected model params: {}".format(model.selected_model_params_) - ) - summary_frame = model.forecast( - X=forecast_x, - periods=horizon, - alpha=1 - (self.spec.confidence_interval_width / 100), - ) - input_values = pd.Series( - y_train[target].values, - name="input_value", - index=y_train.index, - ) - fitted_values_raw = model.predict(y_train.drop(target, axis=1)) - fitted_values = pd.Series( - fitted_values_raw[target].values, - name="fitted_value", - index=y_train.index, - ) + try: + logger.debug("Running automl for {} at position {}".format(target, i)) + series_values = df[df[target].notna()] + # drop NaNs for the time period where data wasn't recorded + series_values.dropna(inplace=True) + df[date_column] = pd.to_datetime( + df[date_column], format=self.spec.datetime_column.format + ) + df = df.set_index(date_column) + # if len(df.columns) > 1: + # when additional columns are present + y_train, y_test = temporal_train_test_split(df, test_size=horizon) + forecast_x = y_test.drop(target, axis=1) + # else: + # y_train = df + # forecast_x = None + logger.debug( + "Time Index is" + "" + if y_train.index.is_monotonic + else "NOT" + "monotonic." + ) + model = automl.Pipeline( + task="forecasting", + **model_kwargs_cleaned, + ) + model.fit( + X=y_train.drop(target, axis=1), + y=pd.DataFrame(y_train[target]), + time_budget=time_budget, + ) + logger.debug("Selected model: {}".format(model.selected_model_)) + logger.debug( + "Selected model params: {}".format(model.selected_model_params_) + ) + summary_frame = model.forecast( + X=forecast_x, + periods=horizon, + alpha=1 - (self.spec.confidence_interval_width / 100), + ) + input_values = pd.Series( + y_train[target].values, + name="input_value", + index=y_train.index, + ) + fitted_values_raw = model.predict(y_train.drop(target, axis=1)) + fitted_values = pd.Series( + fitted_values_raw[target].values, + name="fitted_value", + index=y_train.index, + ) - summary_frame = pd.concat( - [input_values, fitted_values, summary_frame], axis=1 - ) + summary_frame = pd.concat( + [input_values, fitted_values, summary_frame], axis=1 + ) - # Collect Outputs - selected_models[target] = { - "series_id": target, - "selected_model": model.selected_model_, - "model_params": model.selected_model_params_, - } - models[target] = model - summary_frame = summary_frame.rename_axis("ds").reset_index() - summary_frame = summary_frame.rename( - columns={ - f"{target}_ci_upper": "yhat_upper", - f"{target}_ci_lower": "yhat_lower", - f"{target}": "yhat", + # Collect Outputs + selected_models[target] = { + "series_id": target, + "selected_model": model.selected_model_, + "model_params": model.selected_model_params_, } - ) - # In case of Naive model, model.forecast function call does not return confidence intervals. 
- if "yhat_upper" not in summary_frame: - summary_frame["yhat_upper"] = np.NAN - summary_frame["yhat_lower"] = np.NAN - outputs[target] = summary_frame - # outputs_legacy[target] = summary_frame + models[target] = model + summary_frame = summary_frame.rename_axis("ds").reset_index() + summary_frame = summary_frame.rename( + columns={ + f"{target}_ci_upper": "yhat_upper", + f"{target}_ci_lower": "yhat_lower", + f"{target}": "yhat", + } + ) + # In case of Naive model, model.forecast function call does not return confidence intervals. + if "yhat_upper" not in summary_frame: + summary_frame["yhat_upper"] = np.NAN + summary_frame["yhat_lower"] = np.NAN + outputs[target] = summary_frame + # outputs_legacy[target] = summary_frame + except Exception as e: + self.errors_dict[target] = {"model_name": self.spec.model, "error": str(e)} logger.debug("===========Forecast Generated===========") outputs_merged = pd.DataFrame() diff --git a/ads/opctl/operator/lowcode/forecast/model/base_model.py b/ads/opctl/operator/lowcode/forecast/model/base_model.py index 404b38a2a..c6667a5b7 100644 --- a/ads/opctl/operator/lowcode/forecast/model/base_model.py +++ b/ads/opctl/operator/lowcode/forecast/model/base_model.py @@ -62,6 +62,7 @@ def __init__(self, config: ForecastOperatorConfig, datasets: ForecastDatasets): # "outputs" is a list of outputs generated by the models. These should only be generated when the framework requires the original output for plotting self.outputs = None self.forecast_output = None + self.errors_dict = dict() self.train_metrics = False self.forecast_col_name = "yhat" @@ -546,8 +547,8 @@ def _save_report( index=True, ) else: - logger.warn( - f"Attempted to write model errors for the {self.spec.errors_dict_filename} file, but an issue writing to storage." 
+ logger.info( + f"No model failures found for the selected forecasting model" ) def _preprocess(self, data, ds_column, datetime_format): diff --git a/ads/opctl/operator/lowcode/forecast/model/prophet.py b/ads/opctl/operator/lowcode/forecast/model/prophet.py index d1e204235..02b02fea9 100644 --- a/ads/opctl/operator/lowcode/forecast/model/prophet.py +++ b/ads/opctl/operator/lowcode/forecast/model/prophet.py @@ -7,6 +7,7 @@ import numpy as np import optuna import pandas as pd +from joblib import Parallel, delayed from ads.common.decorator.runtime_dependency import runtime_dependency from ads.opctl import logger from ads.opctl.operator.lowcode.forecast.operator_config import ForecastOperatorConfig @@ -45,29 +46,26 @@ def __init__(self, config: ForecastOperatorConfig, datasets: ForecastDatasets): self.global_explanation = {} self.local_explanation = {} - def _build_model(self) -> pd.DataFrame: - from prophet import Prophet - from prophet.diagnostics import cross_validation, performance_metrics + def _train_model(self, i, target, df): - full_data_dict = self.datasets.full_data_dict - models = [] - outputs = dict() - outputs_legacy = [] - - # Extract the Confidence Interval Width and convert to prophet's equivalent - interval_width - if self.spec.confidence_interval_width is None: - self.spec.confidence_interval_width = 1 - self.spec.model_kwargs.get( - "alpha", 0.90 - ) + try: + from prophet import Prophet + from prophet.diagnostics import cross_validation, performance_metrics - model_kwargs = self.spec.model_kwargs - model_kwargs["interval_width"] = self.spec.confidence_interval_width + # Extract the Confidence Interval Width and convert to prophet's equivalent - interval_width + if self.spec.confidence_interval_width is None: + self.spec.confidence_interval_width = 1 - self.spec.model_kwargs.get( + "alpha", 0.90 + ) - self.forecast_output = ForecastOutput( - confidence_interval_width=self.spec.confidence_interval_width - ) + model_kwargs = self.spec.model_kwargs + model_kwargs["interval_width"] = self.spec.confidence_interval_width + + self.forecast_output = ForecastOutput( + confidence_interval_width=self.spec.confidence_interval_width + ) - for i, (target, df) in enumerate(full_data_dict.items()): + # for i, (target, df) in enumerate(full_data_dict.items()): le, df_encoded = utils._label_encode_dataframe( df, no_encode={self.spec.datetime_column.name, target} ) @@ -117,8 +115,8 @@ def objective(trial): ) # Manual workaround because pandas 1.x dropped support for M and Y - interval = self.spec.horizon.interval - unit = self.spec.horizon.interval_unit + interval = self.spec.horizon + unit = "D" if unit == "M": unit = "D" interval = interval * 30.5 @@ -195,14 +193,35 @@ def objective(trial): ) # Collect Outputs - models.append(model) - outputs[target] = forecast - outputs_legacy.append(forecast) + # models.append(model) + self.outputs_dict[target] = forecast + self.outputs_legacy.append(forecast) - self.models = models - self.outputs = outputs_legacy + self.models_dict[target] = model + self.outputs = self.outputs_legacy + + logger.debug("===========Done===========") + except Exception as e: + self.errors_dict[target] = {"model_name": self.spec.model, "error": str(e)} + + def _build_model(self) -> pd.DataFrame: + from prophet import Prophet + from prophet.diagnostics import cross_validation, performance_metrics + + full_data_dict = self.datasets.full_data_dict + self.models_dict = dict() + self.outputs_dict = dict() + self.outputs_legacy = [] + self.errors_dict = dict() + + 
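+        # require="sharedmem" keeps joblib on a thread-based backend, so each
+        # worker fills models_dict, outputs_dict, and errors_dict on this
+        # shared object in place instead of returning results.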
Parallel(n_jobs=-1, require="sharedmem")(
+            delayed(ProphetOperatorModel._train_model)(self, i, target, df)
+            for self, (i, (target, df)) in zip(
+                [self] * len(full_data_dict), enumerate(full_data_dict.items())
+            )
+        )

-        logger.debug("===========Done===========")
+        self.models = [self.models_dict[target] for target in self.target_columns]

         # Merge the outputs from each model into 1 df with all outputs by target and category
         col = self.original_target_column
@@ -212,7 +231,7 @@ def objective(trial):

         for cat in self.categories:
             output_i = pd.DataFrame()
-            output_i["Date"] = outputs[f"{col}_{cat}"][PROPHET_INTERNAL_DATE_COL]
+            output_i["Date"] = self.outputs_dict[f"{col}_{cat}"][PROPHET_INTERNAL_DATE_COL]
             output_i["Series"] = cat

             output_i["input_value"] = full_data_dict[f"{col}_{cat}"][f"{col}_{cat}"]
@@ -223,22 +242,22 @@ def objective(trial):

             output_i.iloc[
                 : -self.spec.horizon, output_i.columns.get_loc(f"fitted_value")
-            ] = (outputs[f"{col}_{cat}"]["yhat"].iloc[: -self.spec.horizon].values)
+            ] = (self.outputs_dict[f"{col}_{cat}"]["yhat"].iloc[: -self.spec.horizon].values)
             output_i.iloc[
                 -self.spec.horizon :,
                 output_i.columns.get_loc(f"forecast_value"),
             ] = (
-                outputs[f"{col}_{cat}"]["yhat"].iloc[-self.spec.horizon :].values
+                self.outputs_dict[f"{col}_{cat}"]["yhat"].iloc[-self.spec.horizon :].values
             )
             output_i.iloc[
                 -self.spec.horizon :, output_i.columns.get_loc(yhat_upper_name)
             ] = (
-                outputs[f"{col}_{cat}"]["yhat_upper"].iloc[-self.spec.horizon :].values
+                self.outputs_dict[f"{col}_{cat}"]["yhat_upper"].iloc[-self.spec.horizon :].values
             )
             output_i.iloc[
                 -self.spec.horizon :, output_i.columns.get_loc(yhat_lower_name)
             ] = (
-                outputs[f"{col}_{cat}"]["yhat_lower"].iloc[-self.spec.horizon :].values
+                self.outputs_dict[f"{col}_{cat}"]["yhat_lower"].iloc[-self.spec.horizon :].values
             )
             output_col = pd.concat([output_col, output_i])
             self.forecast_output.add_category(

From 177fa40183f361b2dfac600998dd6cae66f56b0e Mon Sep 17 00:00:00 2001
From: Ubuntu
Date: Thu, 4 Jan 2024 18:03:24 +0000
Subject: [PATCH 4/8] revert the prophet horizon interval change

---
 ads/opctl/operator/lowcode/forecast/model/prophet.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/ads/opctl/operator/lowcode/forecast/model/prophet.py b/ads/opctl/operator/lowcode/forecast/model/prophet.py
index 02b02fea9..e9dfecd53 100644
--- a/ads/opctl/operator/lowcode/forecast/model/prophet.py
+++ b/ads/opctl/operator/lowcode/forecast/model/prophet.py
@@ -115,8 +115,8 @@ def objective(trial):
         )

         # Manual workaround because pandas 1.x dropped support for M and Y
-        interval = self.spec.horizon
-        unit = "D"
+        interval = self.spec.horizon.interval
+        unit = self.spec.horizon.interval_unit
         if unit == "M":
             unit = "D"
             interval = interval * 30.5

From 5242f1d904aabc4b6b6fbebcc6c765a48383dbbd Mon Sep 17 00:00:00 2001
From: Ubuntu
Date: Fri, 5 Jan 2024 06:38:20 +0000
Subject: [PATCH 5/8] make neural prophet model training parallel

---
 .../lowcode/forecast/model/neuralprophet.py   | 231 +++++++++++++++---
 1 file changed, 196 insertions(+), 35 deletions(-)

diff --git a/ads/opctl/operator/lowcode/forecast/model/neuralprophet.py b/ads/opctl/operator/lowcode/forecast/model/neuralprophet.py
index e2303e113..e58032e2f 100644
--- a/ads/opctl/operator/lowcode/forecast/model/neuralprophet.py
+++ b/ads/opctl/operator/lowcode/forecast/model/neuralprophet.py
@@ -7,6 +7,7 @@
 import numpy as np
 import optuna
 import pandas as pd
+from joblib import Parallel, delayed
 from torch import Tensor
 from torchmetrics.regression import (
     MeanAbsoluteError,
@@ -71,33 +72,31 @@ def 
__init__(self, config: ForecastOperatorConfig, datasets: ForecastDatasets): self.train_metrics = True self.forecast_col_name = "yhat1" - def _build_model(self) -> pd.DataFrame: - from neuralprophet import NeuralProphet - full_data_dict = self.datasets.full_data_dict - models = [] - outputs = dict() - outputs_legacy = [] - - # Extract the Confidence Interval Width and - # convert to neural prophets equivalent - quantiles - model_kwargs = self.spec.model_kwargs - - if self.spec.confidence_interval_width is None: - quantiles = model_kwargs.get("quantiles", [0.05, 0.95]) - self.spec.confidence_interval_width = float(quantiles[1]) - float( - quantiles[0] - ) - else: - boundaries = round((1 - self.spec.confidence_interval_width) / 2, 2) - quantiles = [boundaries, self.spec.confidence_interval_width + boundaries] + def _train_model(self, i, target, df): - model_kwargs["quantiles"] = quantiles - self.forecast_output = ForecastOutput( - confidence_interval_width=self.spec.confidence_interval_width - ) + try: + from neuralprophet import NeuralProphet + + # Extract the Confidence Interval Width and + # convert to neural prophets equivalent - quantiles + model_kwargs = self.spec.model_kwargs + + if self.spec.confidence_interval_width is None: + self.quantiles = model_kwargs.get("quantiles", [0.05, 0.95]) + self.spec.confidence_interval_width = float(self.quantiles[1]) - float( + self.quantiles[0] + ) + else: + boundaries = round((1 - self.spec.confidence_interval_width) / 2, 2) + self.quantiles = [boundaries, self.spec.confidence_interval_width + boundaries] + + model_kwargs["quantiles"] = self.quantiles + self.forecast_output = ForecastOutput( + confidence_interval_width=self.spec.confidence_interval_width + ) - for i, (target, df) in enumerate(full_data_dict.items()): + # for i, (target, df) in enumerate(full_data_dict.items()): le, df_encoded = utils._label_encode_dataframe( df, no_encode={self.spec.datetime_column.name, target} ) @@ -212,14 +211,176 @@ def objective(trial): forecast = model.predict(future) logger.debug(f"-----------------Model {i}----------------------") logger.debug(forecast.tail()) - models.append(model) - outputs[target] = forecast - outputs_legacy.append(forecast) + # models.append(model) + self.outputs_dict[target] = forecast + self.outputs_legacy.append(forecast) + + self.models_dict[target] = model + self.outputs = self.outputs_legacy + + logger.debug("===========Done===========") + except Exception as e: + self.errors_dict[target] = {"model_name": self.spec.model, "error": str(e)} + + def _build_model(self) -> pd.DataFrame: + # from neuralprophet import NeuralProphet - self.models = models - self.outputs = outputs_legacy + full_data_dict = self.datasets.full_data_dict + self.models_dict = dict() + self.outputs_dict = dict() + self.outputs_legacy = [] + self.errors_dict = dict() + + Parallel(n_jobs=-1, require="sharedmem")( + delayed(NeuralProphetOperatorModel._train_model)(self, i, target, df) + for self, (i, (target, df)) in zip( + [self] * len(full_data_dict), enumerate(full_data_dict.items()) + ) + ) - logger.debug("===========Done===========") + self.models = [self.models_dict[target] for target in self.target_columns] + + # # Extract the Confidence Interval Width and + # # convert to neural prophets equivalent - quantiles + # model_kwargs = self.spec.model_kwargs + + # if self.spec.confidence_interval_width is None: + # quantiles = model_kwargs.get("quantiles", [0.05, 0.95]) + # self.spec.confidence_interval_width = float(quantiles[1]) - float( + # quantiles[0] + # ) 
+ # else: + # boundaries = round((1 - self.spec.confidence_interval_width) / 2, 2) + # quantiles = [boundaries, self.spec.confidence_interval_width + boundaries] + + # model_kwargs["quantiles"] = quantiles + # self.forecast_output = ForecastOutput( + # confidence_interval_width=self.spec.confidence_interval_width + # ) + + # for i, (target, df) in enumerate(full_data_dict.items()): + # le, df_encoded = utils._label_encode_dataframe( + # df, no_encode={self.spec.datetime_column.name, target} + # ) + # model_kwargs_i = model_kwargs.copy() + + # # format the dataframe for this target. Dropping NA on target[df] will remove all future data + # df_clean = self._preprocess( + # df_encoded, + # self.spec.datetime_column.name, + # self.spec.datetime_column.format, + # ) + # data_i = df_clean[df_clean[target].notna()] + # data_i.rename({target: "y"}, axis=1, inplace=True) + + # # Assume that all columns passed in should be used as additional data + # additional_regressors = set(data_i.columns) - {"y", "ds"} + # training_data = data_i[["y", "ds"] + list(additional_regressors)] + + # if self.perform_tuning: + + # def objective(trial): + # params = { + # # 'seasonality_mode': trial.suggest_categorical('seasonality_mode', ['additive', 'multiplicative']), + # # 'seasonality_reg': trial.suggest_float('seasonality_reg', 0.1, 500, log=True), + # # 'learning_rate': trial.suggest_float('learning_rate', 0.0001, 0.1, log=True), + # "newer_samples_start": trial.suggest_float( + # "newer_samples_start", 0.001, 0.999 + # ), + # "newer_samples_weight": trial.suggest_float( + # "newer_samples_weight", 0, 100 + # ), + # "changepoints_range": trial.suggest_float( + # "changepoints_range", 0.8, 0.95 + # ), + # } + # # trend_reg, trend_reg_threshold, ar_reg, impute_rolling/impute_linear, + # params.update(model_kwargs_i) + + # folds = NeuralProphet(**params).crossvalidation_split_df( + # data_i, k=3 + # ) + # test_metrics_total_i = [] + # for df_train, df_test in folds: + # m, accepted_regressors = _fit_model( + # data=df_train, + # params=params, + # additional_regressors=additional_regressors, + # select_metric=self.spec.metric, + # ) + # df_test = df_test[["y", "ds"] + accepted_regressors] + + # test_forecast_i = m.predict(df=df_test) + # fold_metric_i = ( + # m.metrics[self.spec.metric] + # .forward( + # Tensor(test_forecast_i["yhat1"]), + # Tensor(test_forecast_i["y"]), + # ) + # .item() + # ) + # test_metrics_total_i.append(fold_metric_i) + # logger.debug( + # f"----------------------{np.asarray(test_metrics_total_i).mean()}----------------------" + # ) + # return np.asarray(test_metrics_total_i).mean() + + # study = optuna.create_study(direction="minimize") + # m_params = NeuralProphet().parameters() + # study.enqueue_trial( + # { + # # 'seasonality_mode': m_params['seasonality_mode'], + # # 'seasonality_reg': m_params['seasonality_reg'], + # # 'learning_rate': m_params['learning_rate'], + # "newer_samples_start": m_params["newer_samples_start"], + # "newer_samples_weight": m_params["newer_samples_weight"], + # "changepoints_range": m_params["changepoints_range"], + # } + # ) + # study.optimize( + # objective, + # n_trials=self.spec.tuning.n_trials + # if self.spec.tuning + # else DEFAULT_TRIALS, + # n_jobs=-1, + # ) + + # selected_params = study.best_params + # selected_params.update(model_kwargs_i) + # model_kwargs_i = selected_params + + # # Build and fit model + # model, accepted_regressors = _fit_model( + # data=training_data, + # params=model_kwargs_i, + # additional_regressors=additional_regressors, + # 
select_metric=self.spec.metric, + # ) + # logger.debug( + # f"Found the following additional data columns: {additional_regressors}" + # ) + # logger.debug( + # f"While fitting the model, some additional data may have been " + # f"discarded. Only using the columns: {accepted_regressors}" + # ) + + # # Build future dataframe + # future = df_clean.reset_index(drop=True) + # future["y"] = None + # future = future[["y", "ds"] + list(accepted_regressors)] + + # # Forecast model and collect outputs + # forecast = model.predict(future) + # logger.debug(f"-----------------Model {i}----------------------") + # logger.debug(forecast.tail()) + # models.append(model) + # outputs[target] = forecast + # outputs_legacy.append(forecast) + + # self.models = models + # self.outputs = outputs_legacy + + # logger.debug("===========Done===========") # Merge the outputs from each model into 1 df with all outputs by target and category col = self.original_target_column @@ -229,7 +390,7 @@ def objective(trial): for cat in self.categories: output_i = pd.DataFrame() - output_i["Date"] = outputs[f"{col}_{cat}"]["ds"] + output_i["Date"] = self.outputs_dict[f"{col}_{cat}"]["ds"] output_i["Series"] = cat output_i[f"input_value"] = full_data_dict[f"{col}_{cat}"][f"{col}_{cat}"] @@ -240,18 +401,18 @@ def objective(trial): output_i.iloc[ : -self.spec.horizon, output_i.columns.get_loc(f"fitted_value") - ] = (outputs[f"{col}_{cat}"]["yhat1"].iloc[: -self.spec.horizon].values) + ] = (self.outputs_dict[f"{col}_{cat}"]["yhat1"].iloc[: -self.spec.horizon].values) output_i.iloc[ -self.spec.horizon :, output_i.columns.get_loc(f"forecast_value"), ] = ( - outputs[f"{col}_{cat}"]["yhat1"].iloc[-self.spec.horizon :].values + self.outputs_dict[f"{col}_{cat}"]["yhat1"].iloc[-self.spec.horizon :].values ) output_i.iloc[ -self.spec.horizon :, output_i.columns.get_loc(yhat_upper_name), ] = ( - outputs[f"{col}_{cat}"][f"yhat1 {quantiles[1]*100}%"] + self.outputs_dict[f"{col}_{cat}"][f"yhat1 {self.quantiles[1]*100}%"] .iloc[-self.spec.horizon :] .values ) @@ -259,7 +420,7 @@ def objective(trial): -self.spec.horizon :, output_i.columns.get_loc(yhat_lower_name), ] = ( - outputs[f"{col}_{cat}"][f"yhat1 {quantiles[0]*100}%"] + self.outputs_dict[f"{col}_{cat}"][f"yhat1 {self.quantiles[0]*100}%"] .iloc[-self.spec.horizon :] .values ) From 577cb23c96073fe31ab31f1c085a8f805645ca4e Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Fri, 5 Jan 2024 06:40:07 +0000 Subject: [PATCH 6/8] removed completed todo --- ads/opctl/operator/lowcode/forecast/model/arima.py | 1 - 1 file changed, 1 deletion(-) diff --git a/ads/opctl/operator/lowcode/forecast/model/arima.py b/ads/opctl/operator/lowcode/forecast/model/arima.py index 5f4145d5c..3e432aea4 100644 --- a/ads/opctl/operator/lowcode/forecast/model/arima.py +++ b/ads/opctl/operator/lowcode/forecast/model/arima.py @@ -31,7 +31,6 @@ def __init__(self, config: ForecastOperatorConfig, datasets: ForecastDatasets): self.formatted_local_explanation = None def _train_model(self, i, target, df): - # TODO : wrap it in try except and populate a errors_dict with the model name and the error """Trains the ARIMA model for a given target. 
Parameters From 9e65a1671f9da89e14e77fda2e93cff4bde00ac1 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Fri, 5 Jan 2024 06:40:53 +0000 Subject: [PATCH 7/8] removed commented code --- .../lowcode/forecast/model/neuralprophet.py | 142 ------------------ 1 file changed, 142 deletions(-) diff --git a/ads/opctl/operator/lowcode/forecast/model/neuralprophet.py b/ads/opctl/operator/lowcode/forecast/model/neuralprophet.py index e58032e2f..c69f90c5e 100644 --- a/ads/opctl/operator/lowcode/forecast/model/neuralprophet.py +++ b/ads/opctl/operator/lowcode/forecast/model/neuralprophet.py @@ -240,148 +240,6 @@ def _build_model(self) -> pd.DataFrame: self.models = [self.models_dict[target] for target in self.target_columns] - # # Extract the Confidence Interval Width and - # # convert to neural prophets equivalent - quantiles - # model_kwargs = self.spec.model_kwargs - - # if self.spec.confidence_interval_width is None: - # quantiles = model_kwargs.get("quantiles", [0.05, 0.95]) - # self.spec.confidence_interval_width = float(quantiles[1]) - float( - # quantiles[0] - # ) - # else: - # boundaries = round((1 - self.spec.confidence_interval_width) / 2, 2) - # quantiles = [boundaries, self.spec.confidence_interval_width + boundaries] - - # model_kwargs["quantiles"] = quantiles - # self.forecast_output = ForecastOutput( - # confidence_interval_width=self.spec.confidence_interval_width - # ) - - # for i, (target, df) in enumerate(full_data_dict.items()): - # le, df_encoded = utils._label_encode_dataframe( - # df, no_encode={self.spec.datetime_column.name, target} - # ) - # model_kwargs_i = model_kwargs.copy() - - # # format the dataframe for this target. Dropping NA on target[df] will remove all future data - # df_clean = self._preprocess( - # df_encoded, - # self.spec.datetime_column.name, - # self.spec.datetime_column.format, - # ) - # data_i = df_clean[df_clean[target].notna()] - # data_i.rename({target: "y"}, axis=1, inplace=True) - - # # Assume that all columns passed in should be used as additional data - # additional_regressors = set(data_i.columns) - {"y", "ds"} - # training_data = data_i[["y", "ds"] + list(additional_regressors)] - - # if self.perform_tuning: - - # def objective(trial): - # params = { - # # 'seasonality_mode': trial.suggest_categorical('seasonality_mode', ['additive', 'multiplicative']), - # # 'seasonality_reg': trial.suggest_float('seasonality_reg', 0.1, 500, log=True), - # # 'learning_rate': trial.suggest_float('learning_rate', 0.0001, 0.1, log=True), - # "newer_samples_start": trial.suggest_float( - # "newer_samples_start", 0.001, 0.999 - # ), - # "newer_samples_weight": trial.suggest_float( - # "newer_samples_weight", 0, 100 - # ), - # "changepoints_range": trial.suggest_float( - # "changepoints_range", 0.8, 0.95 - # ), - # } - # # trend_reg, trend_reg_threshold, ar_reg, impute_rolling/impute_linear, - # params.update(model_kwargs_i) - - # folds = NeuralProphet(**params).crossvalidation_split_df( - # data_i, k=3 - # ) - # test_metrics_total_i = [] - # for df_train, df_test in folds: - # m, accepted_regressors = _fit_model( - # data=df_train, - # params=params, - # additional_regressors=additional_regressors, - # select_metric=self.spec.metric, - # ) - # df_test = df_test[["y", "ds"] + accepted_regressors] - - # test_forecast_i = m.predict(df=df_test) - # fold_metric_i = ( - # m.metrics[self.spec.metric] - # .forward( - # Tensor(test_forecast_i["yhat1"]), - # Tensor(test_forecast_i["y"]), - # ) - # .item() - # ) - # test_metrics_total_i.append(fold_metric_i) - # logger.debug( - # 
f"----------------------{np.asarray(test_metrics_total_i).mean()}----------------------" - # ) - # return np.asarray(test_metrics_total_i).mean() - - # study = optuna.create_study(direction="minimize") - # m_params = NeuralProphet().parameters() - # study.enqueue_trial( - # { - # # 'seasonality_mode': m_params['seasonality_mode'], - # # 'seasonality_reg': m_params['seasonality_reg'], - # # 'learning_rate': m_params['learning_rate'], - # "newer_samples_start": m_params["newer_samples_start"], - # "newer_samples_weight": m_params["newer_samples_weight"], - # "changepoints_range": m_params["changepoints_range"], - # } - # ) - # study.optimize( - # objective, - # n_trials=self.spec.tuning.n_trials - # if self.spec.tuning - # else DEFAULT_TRIALS, - # n_jobs=-1, - # ) - - # selected_params = study.best_params - # selected_params.update(model_kwargs_i) - # model_kwargs_i = selected_params - - # # Build and fit model - # model, accepted_regressors = _fit_model( - # data=training_data, - # params=model_kwargs_i, - # additional_regressors=additional_regressors, - # select_metric=self.spec.metric, - # ) - # logger.debug( - # f"Found the following additional data columns: {additional_regressors}" - # ) - # logger.debug( - # f"While fitting the model, some additional data may have been " - # f"discarded. Only using the columns: {accepted_regressors}" - # ) - - # # Build future dataframe - # future = df_clean.reset_index(drop=True) - # future["y"] = None - # future = future[["y", "ds"] + list(accepted_regressors)] - - # # Forecast model and collect outputs - # forecast = model.predict(future) - # logger.debug(f"-----------------Model {i}----------------------") - # logger.debug(forecast.tail()) - # models.append(model) - # outputs[target] = forecast - # outputs_legacy.append(forecast) - - # self.models = models - # self.outputs = outputs_legacy - - # logger.debug("===========Done===========") - # Merge the outputs from each model into 1 df with all outputs by target and category col = self.original_target_column output_col = pd.DataFrame() From bccdd308936f0c124d4db296649f017fc4cb59bc Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Tue, 9 Jan 2024 07:43:39 +0000 Subject: [PATCH 8/8] resolve pr comments: --- ads/opctl/operator/lowcode/forecast/model/neuralprophet.py | 1 - ads/opctl/operator/lowcode/forecast/model/prophet.py | 1 - ads/opctl/operator/lowcode/forecast/operator_config.py | 2 +- 3 files changed, 1 insertion(+), 3 deletions(-) diff --git a/ads/opctl/operator/lowcode/forecast/model/neuralprophet.py b/ads/opctl/operator/lowcode/forecast/model/neuralprophet.py index c69f90c5e..8c6f743b5 100644 --- a/ads/opctl/operator/lowcode/forecast/model/neuralprophet.py +++ b/ads/opctl/operator/lowcode/forecast/model/neuralprophet.py @@ -96,7 +96,6 @@ def _train_model(self, i, target, df): confidence_interval_width=self.spec.confidence_interval_width ) - # for i, (target, df) in enumerate(full_data_dict.items()): le, df_encoded = utils._label_encode_dataframe( df, no_encode={self.spec.datetime_column.name, target} ) diff --git a/ads/opctl/operator/lowcode/forecast/model/prophet.py b/ads/opctl/operator/lowcode/forecast/model/prophet.py index e9dfecd53..2b865a17e 100644 --- a/ads/opctl/operator/lowcode/forecast/model/prophet.py +++ b/ads/opctl/operator/lowcode/forecast/model/prophet.py @@ -65,7 +65,6 @@ def _train_model(self, i, target, df): confidence_interval_width=self.spec.confidence_interval_width ) - # for i, (target, df) in enumerate(full_data_dict.items()): le, df_encoded = utils._label_encode_dataframe( 
df, no_encode={self.spec.datetime_column.name, target} ) diff --git a/ads/opctl/operator/lowcode/forecast/operator_config.py b/ads/opctl/operator/lowcode/forecast/operator_config.py index 0832349c5..8a07dbbb6 100644 --- a/ads/opctl/operator/lowcode/forecast/operator_config.py +++ b/ads/opctl/operator/lowcode/forecast/operator_config.py @@ -132,7 +132,7 @@ def __post_init__(self): self.local_explanation_filename or "local_explanation.csv" ) self.target_column = self.target_column or "Sales" - self.errors_dict_filename = "errors_dict.json" + self.errors_dict_filename = "errors.json" self.model_kwargs = self.model_kwargs or dict()
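
For reference, the per-series parallel training pattern these commits converge on (ARIMA, Prophet, NeuralProphet) reduces to the sketch below. It is illustrative only: SeriesTrainer and its naive mean "model" are hypothetical stand-ins for the operator classes and their pmdarima/Prophet/NeuralProphet fitting calls; only joblib is assumed.

from joblib import Parallel, delayed


class SeriesTrainer:
    """Hypothetical stand-in for the operator models changed above."""

    def __init__(self, data_dict):
        self.data_dict = data_dict  # {target name: list of observations}
        self.models_dict = {}
        self.errors_dict = {}

    def _train_model(self, i, target, series):
        try:
            # Stand-in for pm.auto_arima(...) / Prophet().fit(...): a naive mean.
            self.models_dict[target] = sum(series) / len(series)
        except Exception as e:
            # Mirror the errors_dict handling added in PATCH 2: record the
            # failure and keep training the remaining series.
            self.errors_dict[target] = {"model_name": "naive-mean", "error": str(e)}

    def train_all(self):
        # require="sharedmem" forces a thread-based backend, so every worker
        # mutates this same object's dictionaries; n_jobs=-1 uses all cores.
        Parallel(n_jobs=-1, require="sharedmem")(
            delayed(self._train_model)(i, target, series)
            for i, (target, series) in enumerate(self.data_dict.items())
        )
        return self.models_dict, self.errors_dict


models, errors = SeriesTrainer({"Sales_A": [1.0, 2.0, 3.0], "Sales_B": []}).train_all()
print(models)  # {'Sales_A': 2.0}
print(errors)  # {'Sales_B': {'model_name': 'naive-mean', 'error': 'division by zero'}}

Calling the bound method directly, as here, is equivalent to the patches' zip([self] * len(full_data_dict), ...) construction, which threads self through the unbound class method; both rely on the shared-memory backend to make the in-place writes visible to the parent object.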