From 22d959c2e0705e1108f064c76554880a998a1b83 Mon Sep 17 00:00:00 2001 From: Allen Date: Tue, 12 Sep 2023 10:40:10 +0100 Subject: [PATCH 1/8] move imports to local --- ads/opctl/config/merger.py | 4 +- .../operator/lowcode/forecast/model/arima.py | 3 +- .../lowcode/forecast/model/automlx.py | 18 ++-- .../lowcode/forecast/model/base_model.py | 93 +++++++++++++----- .../lowcode/forecast/model/neuralprophet.py | 11 ++- .../lowcode/forecast/model/prophet.py | 18 ++-- ads/opctl/operator/lowcode/forecast/utils.py | 96 +++++++++++++------ 7 files changed, 165 insertions(+), 78 deletions(-) diff --git a/ads/opctl/config/merger.py b/ads/opctl/config/merger.py index ac8992bde..d18c82a94 100644 --- a/ads/opctl/config/merger.py +++ b/ads/opctl/config/merger.py @@ -218,7 +218,7 @@ def _config_flex_shape_details(self): ): raise ValueError( "Parameters `ocpus` and `memory_in_gbs` must be provided for using flex shape. " - "Call `ads opctl config` to specify." + "Call `ads opctl configure` to specify." ) infrastructure["shape_config_details"] = { "ocpus": infrastructure.pop("ocpus"), @@ -239,7 +239,7 @@ def _config_flex_shape_details(self): if parameter not in infrastructure: raise ValueError( f"Parameters {parameter} must be provided for using flex shape. " - "Call `ads opctl config` to specify." + "Call `ads opctl configure` to specify." ) infrastructure["driver_shape_config"] = { "ocpus": infrastructure.pop("driver_shape_ocpus"), diff --git a/ads/opctl/operator/lowcode/forecast/model/arima.py b/ads/opctl/operator/lowcode/forecast/model/arima.py index 698c45338..ef68b4a88 100644 --- a/ads/opctl/operator/lowcode/forecast/model/arima.py +++ b/ads/opctl/operator/lowcode/forecast/model/arima.py @@ -4,7 +4,6 @@ # Copyright (c) 2023 Oracle and/or its affiliates. # Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/ -import datapane as dp import pandas as pd import pmdarima as pm @@ -134,6 +133,8 @@ def _build_model(self) -> pd.DataFrame: def _generate_report(self): """The method that needs to be implemented on the particular model level.""" + import datapane as dp + sec5_text = dp.Text(f"## ARIMA Model Parameters") sec5 = dp.Select( blocks=[ diff --git a/ads/opctl/operator/lowcode/forecast/model/automlx.py b/ads/opctl/operator/lowcode/forecast/model/automlx.py index bb4203946..172323150 100644 --- a/ads/opctl/operator/lowcode/forecast/model/automlx.py +++ b/ads/opctl/operator/lowcode/forecast/model/automlx.py @@ -4,8 +4,6 @@ # Copyright (c) 2023 Oracle and/or its affiliates. # Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/ - -import datapane as dp import pandas as pd import numpy as np from ads.common.decorator.runtime_dependency import runtime_dependency @@ -30,7 +28,8 @@ class AutoMLXOperatorModel(ForecastOperatorBaseModel): ), ) def _build_model(self) -> pd.DataFrame: - from automl import init + from automl import init + init(engine="local", check_deprecation_warnings=False) full_data_dict = self.full_data_dict @@ -61,10 +60,13 @@ def _build_model(self) -> pd.DataFrame: "" if y_train.index.is_monotonic else "NOT", "monotonic.", ) - model = automl.Pipeline(task="forecasting", - n_algos_tuned=n_algos_tuned, - score_metric=AUTOMLX_METRIC_MAP.get(self.spec.metric, - "neg_sym_mean_abs_percent_error")) + model = automl.Pipeline( + task="forecasting", + n_algos_tuned=n_algos_tuned, + score_metric=AUTOMLX_METRIC_MAP.get( + self.spec.metric, "neg_sym_mean_abs_percent_error" + ), + ) model.fit(X=y_train.drop(target, axis=1), y=pd.DataFrame(y_train[target])) logger.info("Selected model: {}".format(model.selected_model_)) logger.info( @@ -127,6 +129,8 @@ def _build_model(self) -> pd.DataFrame: return outputs_merged def _generate_report(self): + import datapane as dp + """The method that needs to be implemented on the particular model level.""" selected_models_text = dp.Text( f"## Selected Models Overview \n " diff --git a/ads/opctl/operator/lowcode/forecast/model/base_model.py b/ads/opctl/operator/lowcode/forecast/model/base_model.py index 67b18e5e1..87bb42f9a 100644 --- a/ads/opctl/operator/lowcode/forecast/model/base_model.py +++ b/ads/opctl/operator/lowcode/forecast/model/base_model.py @@ -10,7 +10,6 @@ from abc import ABC, abstractmethod from typing import Tuple -import datapane as dp import fsspec import numpy as np import pandas as pd @@ -23,6 +22,7 @@ from ..operator_config import ForecastOperatorConfig, ForecastOperatorSpec from .transformations import Transformations + class ForecastOperatorBaseModel(ABC): """The base class for the forecast operator models.""" @@ -60,6 +60,7 @@ def __init__(self, config: ForecastOperatorConfig): def generate_report(self): """Generates the forecasting report.""" + import datapane as dp # load data and build models start_time = time.time() @@ -225,7 +226,11 @@ def generate_report(self): ) # save the report and result CSV - self._save_report(report_sections=report_sections, result_df=result_df, metrics_df=self.test_eval_metrics) + self._save_report( + report_sections=report_sections, + result_df=result_df, + metrics_df=self.test_eval_metrics, + ) def _load_data(self): """Loads forecasting input data.""" @@ -296,14 +301,30 @@ def _test_evaluate_metrics( summary_metrics = pd.DataFrame( { - SupportedMetrics.MEAN_SMAPE: np.mean(total_metrics.loc[SupportedMetrics.SMAPE]), - SupportedMetrics.MEDIAN_SMAPE: np.median(total_metrics.loc[SupportedMetrics.SMAPE]), - SupportedMetrics.MEAN_MAPE: np.mean(total_metrics.loc[SupportedMetrics.MAPE]), - SupportedMetrics.MEDIAN_MAPE: np.median(total_metrics.loc[SupportedMetrics.MAPE]), - SupportedMetrics.MEAN_RMSE: np.mean(total_metrics.loc[SupportedMetrics.RMSE]), - SupportedMetrics.MEDIAN_RMSE: np.median(total_metrics.loc[SupportedMetrics.RMSE]), - SupportedMetrics.MEAN_R2: np.mean(total_metrics.loc[SupportedMetrics.R2]), - SupportedMetrics.MEDIAN_R2: np.median(total_metrics.loc[SupportedMetrics.R2]), + SupportedMetrics.MEAN_SMAPE: np.mean( + total_metrics.loc[SupportedMetrics.SMAPE] + ), + SupportedMetrics.MEDIAN_SMAPE: np.median( + total_metrics.loc[SupportedMetrics.SMAPE] + ), + SupportedMetrics.MEAN_MAPE: np.mean( + total_metrics.loc[SupportedMetrics.MAPE] + ), + SupportedMetrics.MEDIAN_MAPE: np.median( + total_metrics.loc[SupportedMetrics.MAPE] + ), + SupportedMetrics.MEAN_RMSE: np.mean( + total_metrics.loc[SupportedMetrics.RMSE] + ), + SupportedMetrics.MEDIAN_RMSE: np.median( + total_metrics.loc[SupportedMetrics.RMSE] + ), + SupportedMetrics.MEAN_R2: np.mean( + total_metrics.loc[SupportedMetrics.R2] + ), + SupportedMetrics.MEDIAN_R2: np.median( + total_metrics.loc[SupportedMetrics.R2] + ), SupportedMetrics.MEAN_EXPLAINED_VARIANCE: np.mean( total_metrics.loc[SupportedMetrics.EXPLAINED_VARIANCE] ), @@ -315,41 +336,61 @@ def _test_evaluate_metrics( index=["All Targets"], ) - """Calculates Mean sMAPE, Median sMAPE, Mean MAPE, Median MAPE, Mean wMAPE, Median wMAPE values for each horizon + """Calculates Mean sMAPE, Median sMAPE, Mean MAPE, Median MAPE, Mean wMAPE, Median wMAPE values for each horizon if horizon <= 10.""" - if len(data['ds'])<=10: - - metrics_per_horizon = utils._build_metrics_per_horizon(data=data, outputs=outputs, target_columns=target_columns, target_col=target_col) - - summary_metrics = summary_metrics.append( - metrics_per_horizon + if len(data["ds"]) <= 10: + metrics_per_horizon = utils._build_metrics_per_horizon( + data=data, + outputs=outputs, + target_columns=target_columns, + target_col=target_col, ) - new_column_order = [SupportedMetrics.MEAN_SMAPE, SupportedMetrics.MEDIAN_SMAPE, SupportedMetrics.MEAN_MAPE, SupportedMetrics.MEDIAN_MAPE, SupportedMetrics.MEAN_WMAPE, SupportedMetrics.MEDIAN_WMAPE, - SupportedMetrics.MEAN_RMSE, SupportedMetrics.MEDIAN_RMSE, SupportedMetrics.MEAN_R2, SupportedMetrics.MEDIAN_R2, SupportedMetrics.MEAN_EXPLAINED_VARIANCE, SupportedMetrics.MEDIAN_EXPLAINED_VARIANCE, - SupportedMetrics.ELAPSED_TIME] + summary_metrics = summary_metrics.append(metrics_per_horizon) + + new_column_order = [ + SupportedMetrics.MEAN_SMAPE, + SupportedMetrics.MEDIAN_SMAPE, + SupportedMetrics.MEAN_MAPE, + SupportedMetrics.MEDIAN_MAPE, + SupportedMetrics.MEAN_WMAPE, + SupportedMetrics.MEDIAN_WMAPE, + SupportedMetrics.MEAN_RMSE, + SupportedMetrics.MEDIAN_RMSE, + SupportedMetrics.MEAN_R2, + SupportedMetrics.MEDIAN_R2, + SupportedMetrics.MEAN_EXPLAINED_VARIANCE, + SupportedMetrics.MEDIAN_EXPLAINED_VARIANCE, + SupportedMetrics.ELAPSED_TIME, + ] summary_metrics = summary_metrics[new_column_order] return total_metrics, summary_metrics, data - def _save_report(self, report_sections: Tuple, result_df: pd.DataFrame, metrics_df: pd.DataFrame): + def _save_report( + self, report_sections: Tuple, result_df: pd.DataFrame, metrics_df: pd.DataFrame + ): """Saves resulting reports to the given folder.""" + import datapane as dp + if self.spec.output_directory: output_dir = self.spec.output_directory.url else: output_dir = "tmp_fc_operator_result" logger.warn( "Since the output directory was not specified, the output will be saved to {} directory.".format( - output_dir)) + output_dir + ) + ) # datapane html report with tempfile.TemporaryDirectory() as temp_dir: report_local_path = os.path.join(temp_dir, "___report.html") dp.save_report(report_sections, report_local_path) with open(report_local_path) as f1: with fsspec.open( - os.path.join(output_dir, self.spec.report_file_name), - "w", - **default_signer(), + os.path.join(output_dir, self.spec.report_file_name), + "w", + **default_signer(), ) as f2: f2.write(f1.read()) @@ -367,7 +408,7 @@ def _save_report(self, report_sections: Tuple, result_df: pd.DataFrame, metrics_ filename=os.path.join(output_dir, self.spec.metrics_filename), format="csv", storage_options=default_signer(), - index = True + index=True, ) logger.warn( diff --git a/ads/opctl/operator/lowcode/forecast/model/neuralprophet.py b/ads/opctl/operator/lowcode/forecast/model/neuralprophet.py index ceb122ec7..07291648b 100644 --- a/ads/opctl/operator/lowcode/forecast/model/neuralprophet.py +++ b/ads/opctl/operator/lowcode/forecast/model/neuralprophet.py @@ -4,12 +4,9 @@ # Copyright (c) 2023 Oracle and/or its affiliates. # Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/ - -import datapane as dp import numpy as np import optuna import pandas as pd -from neuralprophet import NeuralProphet from torch import Tensor from torchmetrics.regression import ( MeanAbsoluteError, @@ -67,6 +64,8 @@ class NeuralProphetOperatorModel(ForecastOperatorBaseModel): """Class representing NeuralProphet operator model.""" def _build_model(self) -> pd.DataFrame: + from neuralprophet import NeuralProphet + full_data_dict = self.full_data_dict models = [] outputs = dict() @@ -168,7 +167,9 @@ def objective(trial): ) study.optimize( objective, - n_trials=self.spec.tuning.n_trials if self.spec.tuning else DEFAULT_TRIALS, + n_trials=self.spec.tuning.n_trials + if self.spec.tuning + else DEFAULT_TRIALS, n_jobs=-1, ) @@ -239,6 +240,8 @@ def objective(trial): return outputs_merged def _generate_report(self): + import datapane as dp + sec1_text = dp.Text( "## Forecast Overview \nThese plots show your " "forecast in the context of historical data." diff --git a/ads/opctl/operator/lowcode/forecast/model/prophet.py b/ads/opctl/operator/lowcode/forecast/model/prophet.py index 0312e6201..52814e2c9 100644 --- a/ads/opctl/operator/lowcode/forecast/model/prophet.py +++ b/ads/opctl/operator/lowcode/forecast/model/prophet.py @@ -4,15 +4,9 @@ # Copyright (c) 2023 Oracle and/or its affiliates. # Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/ - -import datapane as dp import numpy as np import optuna import pandas as pd -from prophet import Prophet -from prophet.diagnostics import cross_validation, performance_metrics -from prophet.plot import add_changepoints_to_plot - from ads.opctl import logger from ...forecast.const import DEFAULT_TRIALS @@ -25,6 +19,8 @@ def _add_unit(num, unit): def _fit_model(data, params, additional_regressors): + from prophet import Prophet + model = Prophet(**params) for add_reg in additional_regressors: model.add_regressor(add_reg) @@ -36,6 +32,9 @@ class ProphetOperatorModel(ForecastOperatorBaseModel): """Class representing Prophet operator model.""" def _build_model(self) -> pd.DataFrame: + from prophet import Prophet + from prophet.diagnostics import cross_validation, performance_metrics + full_data_dict = self.full_data_dict models = [] outputs = dict() @@ -145,7 +144,9 @@ def objective(trial): ) study.optimize( objective, - n_trials=self.spec.tuning.n_trials if self.spec.tuning else DEFAULT_TRIALS, + n_trials=self.spec.tuning.n_trials + if self.spec.tuning + else DEFAULT_TRIALS, n_jobs=-1, ) @@ -210,6 +211,9 @@ def objective(trial): return outputs_merged def _generate_report(self): + import datapane as dp + from prophet.plot import add_changepoints_to_plot + sec1_text = dp.Text( "## Forecast Overview \n" "These plots show your forecast in the context of historical data." diff --git a/ads/opctl/operator/lowcode/forecast/utils.py b/ads/opctl/operator/lowcode/forecast/utils.py index dce88a224..a5e24270d 100644 --- a/ads/opctl/operator/lowcode/forecast/utils.py +++ b/ads/opctl/operator/lowcode/forecast/utils.py @@ -4,10 +4,8 @@ # Copyright (c) 2023 Oracle and/or its affiliates. # Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/ - import os from ads.opctl import logger -import datapane as dp import fsspec import numpy as np import pandas as pd @@ -46,10 +44,15 @@ def smape(actual, predicted) -> float: ) -def _build_metrics_per_horizon(data: pd.DataFrame, outputs: pd.DataFrame, target_columns: List[str], target_col: str) -> pd.DataFrame: +def _build_metrics_per_horizon( + data: pd.DataFrame, + outputs: pd.DataFrame, + target_columns: List[str], + target_col: str, +) -> pd.DataFrame: """ Calculates Mean sMAPE, Median sMAPE, Mean MAPE, Median MAPE, Mean wMAPE, Median wMAPE for each horizon - + Parameters ------------ data: Pandas Dataframe @@ -72,33 +75,46 @@ def _build_metrics_per_horizon(data: pd.DataFrame, outputs: pd.DataFrame, target totals = actuals_df.sum() wmape_weights = np.array((totals / totals.sum()).values) - metrics_df = pd.DataFrame(columns=[ - SupportedMetrics.MEAN_SMAPE, SupportedMetrics.MEDIAN_SMAPE, - SupportedMetrics.MEAN_MAPE, SupportedMetrics.MEDIAN_MAPE, - SupportedMetrics.MEAN_WMAPE, SupportedMetrics.MEDIAN_WMAPE - ]) - - for y_true, y_pred in zip(actuals_df.itertuples(index=False), forecasts_df.itertuples(index=False)): + metrics_df = pd.DataFrame( + columns=[ + SupportedMetrics.MEAN_SMAPE, + SupportedMetrics.MEDIAN_SMAPE, + SupportedMetrics.MEAN_MAPE, + SupportedMetrics.MEDIAN_MAPE, + SupportedMetrics.MEAN_WMAPE, + SupportedMetrics.MEDIAN_WMAPE, + ] + ) + for y_true, y_pred in zip( + actuals_df.itertuples(index=False), forecasts_df.itertuples(index=False) + ): y_true, y_pred = np.array(y_true), np.array(y_pred) - smapes = np.array([smape(actual=y_t, predicted=y_p) for y_t, y_p in zip(y_true, y_pred)]) - mapes = np.array([mean_absolute_percentage_error(y_true=[y_t], y_pred=[y_p]) for y_t, y_p in zip(y_true, y_pred)]) + smapes = np.array( + [smape(actual=y_t, predicted=y_p) for y_t, y_p in zip(y_true, y_pred)] + ) + mapes = np.array( + [ + mean_absolute_percentage_error(y_true=[y_t], y_pred=[y_p]) + for y_t, y_p in zip(y_true, y_pred) + ] + ) wmapes = np.array([mape * weight for mape, weight in zip(mapes, wmape_weights)]) - + metrics_row = { SupportedMetrics.MEAN_SMAPE: np.mean(smapes), SupportedMetrics.MEDIAN_SMAPE: np.median(smapes), SupportedMetrics.MEAN_MAPE: np.mean(mapes), SupportedMetrics.MEDIAN_MAPE: np.median(mapes), SupportedMetrics.MEAN_WMAPE: np.mean(wmapes), - SupportedMetrics.MEDIAN_WMAPE: np.median(wmapes) + SupportedMetrics.MEDIAN_WMAPE: np.median(wmapes), } - + metrics_df = metrics_df.append(metrics_row, ignore_index=True) - metrics_df.set_index(data['ds'], inplace=True) - + metrics_df.set_index(data["ds"], inplace=True) + return metrics_df @@ -121,6 +137,7 @@ def _load_data(filename, format, storage_options, columns, **kwargs): return data raise ValueError(f"Unrecognized format: {format}") + def _write_data(data, filename, format, storage_options, index=False, **kwargs): if not format: _, format = os.path.splitext(filename) @@ -165,7 +182,9 @@ def _clean_data(data, target_column, datetime_column, target_category_columns=No ) -def _validate_and_clean_data(cat: str, horizon: int, primary: pd.DataFrame, additional: pd.DataFrame): +def _validate_and_clean_data( + cat: str, horizon: int, primary: pd.DataFrame, additional: pd.DataFrame +): """ Checks compatibility between primary and additional dataframe for a category. @@ -190,8 +209,12 @@ def _validate_and_clean_data(cat: str, horizon: int, primary: pd.DataFrame, addi data_add_row_count = additional.shape[0] additional_surplus = data_add_row_count - horizon - data_row_count if additional_surplus < 0: - logger.warn("Forecast for {} will not be generated since additional data has less values({}) than" - " horizon({}) + primary data({})".format(cat, data_add_row_count, horizon, data_row_count)) + logger.warn( + "Forecast for {} will not be generated since additional data has less values({}) than" + " horizon({}) + primary data({})".format( + cat, data_add_row_count, horizon, data_row_count + ) + ) return None, None elif additional_surplus > 0: # Removing surplus future data in additional @@ -201,11 +224,14 @@ def _validate_and_clean_data(cat: str, horizon: int, primary: pd.DataFrame, addi dates_in_data = primary.index.tolist() dates_in_additional = additional.index.tolist() if not set(dates_in_data).issubset(set(dates_in_additional)): - logger.warn("Forecast for {} will not be generated since the dates in primary and additional do not" - " match".format(cat)) + logger.warn( + "Forecast for {} will not be generated since the dates in primary and additional do not" + " match".format(cat) + ) return None, None return primary, additional + def _build_indexed_datasets( data, target_column, @@ -213,7 +239,7 @@ def _build_indexed_datasets( horizon, target_category_columns=None, additional_data=None, - metadata_data=None + metadata_data=None, ): df_by_target = dict() categories = [] @@ -256,7 +282,9 @@ def _build_indexed_datasets( .fillna(0) ) - valid_primary, valid_add = _validate_and_clean_data(cat, horizon, data_by_cat_clean, data_add_by_cat_clean) + valid_primary, valid_add = _validate_and_clean_data( + cat, horizon, data_by_cat_clean, data_add_by_cat_clean + ) if valid_primary is None: invalid_categories.append(cat) data_by_cat_clean = None @@ -268,9 +296,12 @@ def _build_indexed_datasets( new_target_columns = list(df_by_target.keys()) remaining_categories = set(unique_categories) - set(invalid_categories) if not len(remaining_categories): - raise ValueError("Stopping forecast operator as there is no data that meets the validation criteria.") + raise ValueError( + "Stopping forecast operator as there is no data that meets the validation criteria." + ) return df_by_target, new_target_columns, remaining_categories + def _build_metrics_df(y_true, y_pred, column_name): metrics = dict() metrics["sMAPE"] = smape(actual=y_true, predicted=y_pred) @@ -295,6 +326,8 @@ def evaluate_metrics(target_columns, data, outputs, target_col="yhat"): def _select_plot_list(fn, target_columns): + import datapane as dp + return dp.Select( blocks=[dp.Plot(fn(i, col), label=col) for i, col in enumerate(target_columns)] ) @@ -383,7 +416,7 @@ def human_time_friendly(seconds): ("week", 60 * 60 * 24 * 7), ("day", 60 * 60 * 24), ("hour", 60 * 60), - ("min", 60) + ("min", 60), ) if seconds == 0: return "inf" @@ -397,23 +430,24 @@ def human_time_friendly(seconds): accumulator.append("{} secs".format(round(seconds, 2))) return ", ".join(accumulator) + def select_auto_model(columns: List[str]) -> str: """ Selects AutoMLX or Arima model based on column count. If the number of columns is less than or equal to the maximum allowed for AutoMLX, returns 'AutoMLX'. Otherwise, returns 'Arima'. - + Parameters ------------ columns: List The list of columns. - + Returns -------- str The type of the model. """ - if columns!=None and len(columns) > MAX_COLUMNS_AUTOMLX: + if columns != None and len(columns) > MAX_COLUMNS_AUTOMLX: return SupportedModels.Arima - return SupportedModels.AutoMLX \ No newline at end of file + return SupportedModels.AutoMLX From 72796d016ef7d6949cfe8b84a2a6c78d92c596a0 Mon Sep 17 00:00:00 2001 From: Allen Date: Tue, 12 Sep 2023 11:02:03 +0100 Subject: [PATCH 2/8] ODSC-47284: unreferenced var --- ads/opctl/operator/common/utils.py | 1 + 1 file changed, 1 insertion(+) diff --git a/ads/opctl/operator/common/utils.py b/ads/opctl/operator/common/utils.py index 7691edc50..49c9f450a 100644 --- a/ads/opctl/operator/common/utils.py +++ b/ads/opctl/operator/common/utils.py @@ -65,6 +65,7 @@ def from_init(*args: List, **kwargs: Dict) -> "OperatorInfo": """Instantiates the class from the initial operator details config.""" path = kwargs.get("__operator_path__") + operator_readme = None if path: readme_file_path = os.path.join(path, "readme.md") if os.path.exists(readme_file_path): From 4f3f101a556a1de46ca585351e73ebd2644bb075 Mon Sep 17 00:00:00 2001 From: Vikas Pandey Date: Thu, 14 Sep 2023 08:41:28 +0530 Subject: [PATCH 3/8] added the autots multivariate operator, added the report and model output functions, added/updated the autots package requirement --- ads/opctl/operator/lowcode/forecast/const.py | 1 + .../lowcode/forecast/environment.yaml | 1 + .../operator/lowcode/forecast/model/autots.py | 246 ++++++++++++++++++ .../lowcode/forecast/model/factory.py | 2 + .../operator/lowcode/forecast/schema.yaml | 1 + pyproject.toml | 2 +- 6 files changed, 252 insertions(+), 1 deletion(-) create mode 100644 ads/opctl/operator/lowcode/forecast/model/autots.py diff --git a/ads/opctl/operator/lowcode/forecast/const.py b/ads/opctl/operator/lowcode/forecast/const.py index e5ba8daa4..64083e610 100644 --- a/ads/opctl/operator/lowcode/forecast/const.py +++ b/ads/opctl/operator/lowcode/forecast/const.py @@ -14,6 +14,7 @@ class SupportedModels(str, metaclass=ExtendedEnumMeta): Arima = "arima" NeuralProphet = "neuralprophet" AutoMLX = "automlx" + AutoTS = "autots" class SupportedMetrics(str, metaclass=ExtendedEnumMeta): diff --git a/ads/opctl/operator/lowcode/forecast/environment.yaml b/ads/opctl/operator/lowcode/forecast/environment.yaml index 410fc8689..58f19bb48 100644 --- a/ads/opctl/operator/lowcode/forecast/environment.yaml +++ b/ads/opctl/operator/lowcode/forecast/environment.yaml @@ -13,6 +13,7 @@ dependencies: - cerberus - json2table - sktime + - autots['additional'] - optuna==2.9.0 - oracle-automlx==23.2.3 - "--editable=git+https://github.com/oracle/accelerated-data-science.git@feature/forecasting#egg=oracle-ads" diff --git a/ads/opctl/operator/lowcode/forecast/model/autots.py b/ads/opctl/operator/lowcode/forecast/model/autots.py new file mode 100644 index 000000000..04c5d4600 --- /dev/null +++ b/ads/opctl/operator/lowcode/forecast/model/autots.py @@ -0,0 +1,246 @@ +import pandas as pd +import numpy as np + +from ads.opctl import logger +from ads.opctl.operator.lowcode.forecast import utils + +from .. import utils +from .base_model import ForecastOperatorBaseModel +from ads.common.decorator.runtime_dependency import runtime_dependency + + +class AutoTSOperatorModel(ForecastOperatorBaseModel): + """Class representing AutoTS operator model.""" + + @runtime_dependency( + module="autots", + err_msg="Please run `pip3 install autots` to install the required dependencies for autots.", + ) + def _build_model(self) -> pd.DataFrame: + """Builds the AutoTS model and generates forecasts. + + Returns: + pd.DataFrame: AutoTS model forecast dataframe + """ + + # Import necessary libraries + from autots import AutoTS + + models = dict() + outputs = dict() + outputs_legacy = [] + + # Get the name of the datetime column + date_column = self.spec.datetime_column.name + + # Initialize the AutoTS model with specified parameters + model = AutoTS( + forecast_length=self.spec.horizon.periods, + frequency="infer", + prediction_interval=self.spec.confidence_interval_width, + max_generations=2, # 10 + no_negatives=False, + constraint=None, + ensemble=self.spec.model_kwargs.get("ensemble", "auto"), + initial_template="General+Random", + random_seed=2022, + holiday_country="US", + subset=None, + aggfunc="first", + na_tolerance=1, + drop_most_recent=0, + drop_data_older_than_periods=None, + model_list="multivariate", + transformer_list="auto", + transformer_max_depth=6, + models_mode="random", + num_validations="auto", + models_to_validate=0.15, + max_per_model_class=None, + validation_method="backwards", + min_allowed_train_percent=0.5, + remove_leading_zeroes=False, + prefill_na=None, + introduce_na=None, + preclean=None, + model_interrupt=True, + generation_timeout=None, + current_model_file=None, + verbose=1, + n_jobs=-1, + ) + + # Prepare the data for model training + temp_list = [self.full_data_dict[i] for i in self.full_data_dict.keys()] + melt_temp = [ + temp_list[i].melt( + temp_list[i].columns.difference(self.target_columns), + var_name="series_id", + value_name=self.original_target_column, + ) + for i in range(len(self.target_columns)) + ] + full_data_long = pd.concat(melt_temp) + + # Fit the model to the training data + model = model.fit( + full_data_long, + date_col=self.spec.datetime_column.name, + value_col=self.original_target_column, + id_col="series_id", + ) + + # Store the trained model and generate forecasts + self.models = model + logger.info("===========Forecast Generated===========") + self.prediction = model.predict() + outputs + + # Process the forecasts for each target series + for series_idx, series in enumerate(self.target_columns): + # Create a dictionary to store the forecasts for each series + outputs[series] = ( + pd.concat( + [ + self.prediction.forecast.reset_index()[ + ["index", self.target_columns[series_idx]] + ].rename( + columns={ + "index": self.spec.datetime_column.name, + self.target_columns[series_idx]: "yhat", + } + ), + self.prediction.lower_forecast.reset_index()[ + ["index", self.target_columns[series_idx]] + ].rename( + columns={ + "index": self.spec.datetime_column.name, + self.target_columns[series_idx]: "yhat_lower", + } + ), + self.prediction.upper_forecast.reset_index()[ + ["index", self.target_columns[series_idx]] + ].rename( + columns={ + "index": self.spec.datetime_column.name, + self.target_columns[series_idx]: "yhat_upper", + } + ), + ], + axis=1, + ) + .T.drop_duplicates() + .T + ) + + # Store the processed forecasts in a list + self.outputs = [fc for fc in outputs.values()] + + # Re-merge historical datas for processing + data_merged = pd.concat( + [ + v[v[k].notna()].set_index(date_column) + for k, v in self.full_data_dict.items() + ], + axis=1, + ).reset_index() + self.data = data_merged + + outputs_merged = pd.DataFrame() + + col = self.original_target_column + output_col = pd.DataFrame() + yhat_lower_percentage = (100 - self.spec.confidence_interval_width * 100) // 2 + yhat_upper_name = "p" + str(int(100 - yhat_lower_percentage)) + yhat_lower_name = "p" + str(int(yhat_lower_percentage)) + + for cat in self.categories: + output_i = pd.DataFrame() + + output_i["Date"] = outputs[f"{col}_{cat}"][self.spec.datetime_column.name] + output_i["Series"] = cat + output_i[f"forecast_value"] = outputs[f"{col}_{cat}"]["yhat"] + output_i[yhat_upper_name] = outputs[f"{col}_{cat}"]["yhat_upper"] + output_i[yhat_lower_name] = outputs[f"{col}_{cat}"]["yhat_lower"] + output_col = pd.concat([output_col, output_i]) + output_col = output_col.reset_index(drop=True) + outputs_merged = pd.concat([outputs_merged, output_col], axis=1) + + logger.info("===========Done===========") + + return outputs_merged + + def _generate_report(self) -> tuple: + """ + Generates the report for the given function. + + Returns: + tuple: A tuple containing the following elements: + - model_description (dp.Text): A text object containing the description of the AutoTS model. + - other_sections (list): A list of sections to be included in the report. + - forecast_col_name (str): The name of the forecast column. + - train_metrics (bool): A boolean indicating whether to include train metrics. + - ds_column_series (pd.Series): A pandas Series containing the datetime column values. + - ds_forecast_col (pd.Index): A pandas Index containing the forecast column values. + - ci_col_names (list): A list of column names for confidence intervals. + """ + import datapane as dp + + # Section 1: Forecast Overview + sec1_text = dp.Text( + "## Forecast Overview \n" + "These plots show your forecast in the context of historical data." + ) + sec_1 = utils._select_plot_list( + lambda idx, *args: self.prediction.plot( + self.models.df_wide_numeric, + series=self.models.df_wide_numeric.columns[idx], + start_date=self.models.df_wide_numeric.reset_index()[ + self.spec.datetime_column.name + ].min(), + ), + target_columns=self.target_columns, + ) + + # Section 2: AutoTS Model Parameters + sec2_text = dp.Text(f"## AutoTS Model Parameters") + # TODO: Format the parameters better for display in report. + sec2 = dp.Select( + blocks=[ + dp.HTML( + pd.DataFrame( + [self.models.best_model_params["models"][x]["ModelParameters"]] + ).to_html(), + label=self.original_target_column + "_model_" +str(i), + ) + for i, x in enumerate( + list(self.models.best_model_params["models"].keys()) + ) + ] + ) + all_sections = [sec1_text, sec_1, sec2_text, sec2] + + # Model Description + model_description = dp.Text( + "AutoTS is a time series package for Python designed for rapidly deploying high-accuracy forecasts at scale." + "In 2023, AutoTS has won in the M6 forecasting competition," + "delivering the highest performance investment decisions across 12 months of stock market forecasting." + ) + + other_sections = all_sections + forecast_col_name = "yhat" + train_metrics = False + + ds_column_series = pd.to_datetime(self.data[self.spec.datetime_column.name]) + ds_forecast_col = self.outputs[0].index + ci_col_names = ["yhat_lower", "yhat_upper"] + + return ( + model_description, + other_sections, + forecast_col_name, + train_metrics, + ds_column_series, + ds_forecast_col, + ci_col_names, + ) diff --git a/ads/opctl/operator/lowcode/forecast/model/factory.py b/ads/opctl/operator/lowcode/forecast/model/factory.py index a1d930e42..580105c69 100644 --- a/ads/opctl/operator/lowcode/forecast/model/factory.py +++ b/ads/opctl/operator/lowcode/forecast/model/factory.py @@ -8,6 +8,7 @@ from ..operator_config import ForecastOperatorConfig from .arima import ArimaOperatorModel from .automlx import AutoMLXOperatorModel +from .autots import AutoTSOperatorModel from .base_model import ForecastOperatorBaseModel from .neuralprophet import NeuralProphetOperatorModel from .prophet import ProphetOperatorModel @@ -32,6 +33,7 @@ class ForecastOperatorModelFactory: SupportedModels.Arima: ArimaOperatorModel, SupportedModels.NeuralProphet: NeuralProphetOperatorModel, SupportedModels.AutoMLX: AutoMLXOperatorModel, + SupportedModels.AutoTS: AutoTSOperatorModel } @classmethod diff --git a/ads/opctl/operator/lowcode/forecast/schema.yaml b/ads/opctl/operator/lowcode/forecast/schema.yaml index 96627207b..25aac64ec 100644 --- a/ads/opctl/operator/lowcode/forecast/schema.yaml +++ b/ads/opctl/operator/lowcode/forecast/schema.yaml @@ -249,6 +249,7 @@ spec: - arima - neuralprophet - automlx + - autots - auto model_kwargs: diff --git a/pyproject.toml b/pyproject.toml index d6838921e..07486d2b1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -153,7 +153,7 @@ viz = [ "seaborn>=0.11.0", ] forecast = [ - "autots", + "autots[additional]", "datapane", "json2table", "neuralprophet", From 71ab9a6cdb6ea81485cd654832b686d84a70a570 Mon Sep 17 00:00:00 2001 From: Vikas Pandey Date: Thu, 14 Sep 2023 08:42:09 +0530 Subject: [PATCH 4/8] added the autots multivariate operator, added the report and model output functions, added/updated the autots package requirement --- ads/opctl/operator/lowcode/forecast/model/autots.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ads/opctl/operator/lowcode/forecast/model/autots.py b/ads/opctl/operator/lowcode/forecast/model/autots.py index 04c5d4600..ae8babbaf 100644 --- a/ads/opctl/operator/lowcode/forecast/model/autots.py +++ b/ads/opctl/operator/lowcode/forecast/model/autots.py @@ -38,7 +38,7 @@ def _build_model(self) -> pd.DataFrame: forecast_length=self.spec.horizon.periods, frequency="infer", prediction_interval=self.spec.confidence_interval_width, - max_generations=2, # 10 + max_generations=10 no_negatives=False, constraint=None, ensemble=self.spec.model_kwargs.get("ensemble", "auto"), From cd9326cb5919b48bb8423b21480e2f5a9d8e3e60 Mon Sep 17 00:00:00 2001 From: Vikas Pandey Date: Thu, 14 Sep 2023 13:15:56 +0530 Subject: [PATCH 5/8] syntax fix in model params --- ads/opctl/operator/lowcode/forecast/model/autots.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ads/opctl/operator/lowcode/forecast/model/autots.py b/ads/opctl/operator/lowcode/forecast/model/autots.py index ae8babbaf..7a5d4447f 100644 --- a/ads/opctl/operator/lowcode/forecast/model/autots.py +++ b/ads/opctl/operator/lowcode/forecast/model/autots.py @@ -38,7 +38,7 @@ def _build_model(self) -> pd.DataFrame: forecast_length=self.spec.horizon.periods, frequency="infer", prediction_interval=self.spec.confidence_interval_width, - max_generations=10 + max_generations=10, no_negatives=False, constraint=None, ensemble=self.spec.model_kwargs.get("ensemble", "auto"), From 0c4e143d2576327db21d7bf3e5ca64cb65a567fc Mon Sep 17 00:00:00 2001 From: Allen Hosler Date: Thu, 14 Sep 2023 13:46:58 +0100 Subject: [PATCH 6/8] minor spacing in text --- ads/opctl/operator/lowcode/forecast/model/autots.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ads/opctl/operator/lowcode/forecast/model/autots.py b/ads/opctl/operator/lowcode/forecast/model/autots.py index 7a5d4447f..1c47d3b24 100644 --- a/ads/opctl/operator/lowcode/forecast/model/autots.py +++ b/ads/opctl/operator/lowcode/forecast/model/autots.py @@ -222,8 +222,8 @@ def _generate_report(self) -> tuple: # Model Description model_description = dp.Text( - "AutoTS is a time series package for Python designed for rapidly deploying high-accuracy forecasts at scale." - "In 2023, AutoTS has won in the M6 forecasting competition," + "AutoTS is a time series package for Python designed for rapidly deploying high-accuracy forecasts at scale. " + "In 2023, AutoTS has won in the M6 forecasting competition, " "delivering the highest performance investment decisions across 12 months of stock market forecasting." ) From a3112c9c59f0e306eaa03788109e086dfa934f67 Mon Sep 17 00:00:00 2001 From: Allen Hosler Date: Thu, 14 Sep 2023 13:48:44 +0100 Subject: [PATCH 7/8] Update autots.py --- ads/opctl/operator/lowcode/forecast/model/autots.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ads/opctl/operator/lowcode/forecast/model/autots.py b/ads/opctl/operator/lowcode/forecast/model/autots.py index 1c47d3b24..0931ace78 100644 --- a/ads/opctl/operator/lowcode/forecast/model/autots.py +++ b/ads/opctl/operator/lowcode/forecast/model/autots.py @@ -50,8 +50,8 @@ def _build_model(self) -> pd.DataFrame: na_tolerance=1, drop_most_recent=0, drop_data_older_than_periods=None, - model_list="multivariate", - transformer_list="auto", + model_list=self.spec.model_kwargs.get("model_list", "multivariate"), + transformer_list=self.spec.model_kwargs.get("transformer_list", "auto"), transformer_max_depth=6, models_mode="random", num_validations="auto", From f02d4a69c52f433407c2e28fe5bcc48775e81b82 Mon Sep 17 00:00:00 2001 From: Vikas Pandey Date: Wed, 20 Sep 2023 09:40:35 +0530 Subject: [PATCH 8/8] get the possible tunable model params via kwargs --- .../operator/lowcode/forecast/model/autots.py | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/ads/opctl/operator/lowcode/forecast/model/autots.py b/ads/opctl/operator/lowcode/forecast/model/autots.py index 0931ace78..aba5fb021 100644 --- a/ads/opctl/operator/lowcode/forecast/model/autots.py +++ b/ads/opctl/operator/lowcode/forecast/model/autots.py @@ -38,13 +38,13 @@ def _build_model(self) -> pd.DataFrame: forecast_length=self.spec.horizon.periods, frequency="infer", prediction_interval=self.spec.confidence_interval_width, - max_generations=10, + max_generations=self.spec.model_kwargs.get("max_generations", 10), no_negatives=False, constraint=None, ensemble=self.spec.model_kwargs.get("ensemble", "auto"), - initial_template="General+Random", + initial_template=self.spec.model_kwargs.get("initial_template", "General+Random"), random_seed=2022, - holiday_country="US", + holiday_country=self.spec.model_kwargs.get("holiday_country", "US"), subset=None, aggfunc="first", na_tolerance=1, @@ -52,13 +52,13 @@ def _build_model(self) -> pd.DataFrame: drop_data_older_than_periods=None, model_list=self.spec.model_kwargs.get("model_list", "multivariate"), transformer_list=self.spec.model_kwargs.get("transformer_list", "auto"), - transformer_max_depth=6, - models_mode="random", - num_validations="auto", - models_to_validate=0.15, + transformer_max_depth=self.spec.model_kwargs.get("transformer_max_depth", 6), + models_mode=self.spec.model_kwargs.get("models_mode", "random"), + num_validations=self.spec.model_kwargs.get("num_validations", "auto"), + models_to_validate=self.spec.model_kwargs.get("models_to_validate", 0.15), max_per_model_class=None, - validation_method="backwards", - min_allowed_train_percent=0.5, + validation_method=self.spec.model_kwargs.get("validation_method", "backwards"), + min_allowed_train_percent=self.spec.model_kwargs.get("min_allowed_train_percent", 0.5), remove_leading_zeroes=False, prefill_na=None, introduce_na=None, @@ -204,7 +204,7 @@ def _generate_report(self) -> tuple: # Section 2: AutoTS Model Parameters sec2_text = dp.Text(f"## AutoTS Model Parameters") - # TODO: Format the parameters better for display in report. + # TODO: ODSC-47612 Format the parameters better for display in report. sec2 = dp.Select( blocks=[ dp.HTML(