Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 23 additions & 12 deletions ads/opctl/operator/lowcode/forecast/model/arima.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from ..operator_config import ForecastOperatorConfig
import traceback
from .forecast_datasets import ForecastDatasets, ForecastOutput
from ..const import ForecastOutputColumns
from ..const import ForecastOutputColumns, SupportedModels


class ArimaOperatorModel(ForecastOperatorBaseModel):
Expand Down Expand Up @@ -80,8 +80,10 @@ def _train_model(self, i, target, df):
if len(additional_regressors):
X_in = data_i.drop(target, axis=1)

# Build and fit model
model = pm.auto_arima(y=y, X=X_in, **self.spec.model_kwargs)
model = self.loaded_models[target] if self.loaded_models is not None else None
if model is None:
# Build and fit model
model = pm.auto_arima(y=y, X=X_in, **self.spec.model_kwargs)

self.fitted_values[target] = model.predict_in_sample(X=X_in)
self.actual_values[target] = y
Expand Down Expand Up @@ -119,7 +121,17 @@ def _train_model(self, i, target, df):
)
self.outputs[target] = forecast

self.models_dict[target] = model
if self.loaded_models is None:
self.models[target] = model

params = vars(model).copy()
for param in ['arima_res_', 'endog_index_']:
if param in params:
params.pop(param)
self.model_parameters[target] = {
"framework": SupportedModels.Arima,
**params,
}

logger.debug("===========Done===========")
except Exception as e:
Expand All @@ -133,12 +145,12 @@ def _build_model(self) -> pd.DataFrame:
confidence_interval_width=self.spec.confidence_interval_width
)

self.models = dict()
self.outputs = dict()
self.outputs_legacy = []
self.fitted_values = dict()
self.actual_values = dict()
self.dt_columns = dict()
self.models_dict = dict()
self.errors_dict = dict()

Parallel(n_jobs=-1, require="sharedmem")(
Expand All @@ -148,13 +160,15 @@ def _build_model(self) -> pd.DataFrame:
)
)

self.models = [self.models_dict[target] for target in self.target_columns]
if self.loaded_models is not None:
self.models = self.loaded_models

# Merge the outputs from each model into 1 df with all outputs by target and category
col = self.original_target_column
output_col = pd.DataFrame()
yhat_upper_name = ForecastOutputColumns.UPPER_BOUND
yhat_lower_name = ForecastOutputColumns.LOWER_BOUND

for cat in self.categories:
output_i = pd.DataFrame()
output_i["Date"] = self.dt_columns[f"{col}_{cat}"]
Expand Down Expand Up @@ -183,8 +197,8 @@ def _generate_report(self):

sec5_text = dp.Text(f"## ARIMA Model Parameters")
blocks = [
dp.HTML(m.summary().as_html(), label=self.target_columns[i])
for i, m in enumerate(self.models)
dp.HTML(m.summary().as_html(), label=target)
for i, (target, m) in enumerate(self.models.items())
]
sec5 = dp.Select(blocks=blocks) if len(blocks) > 1 else blocks[0]
all_sections = [sec5_text, sec5]
Expand All @@ -196,7 +210,6 @@ def _generate_report(self):
datetime_col_name=self.spec.datetime_column.name,
explain_predict_fn=self._custom_predict_arima,
)

# Create a markdown text block for the global explanation section
global_explanation_text = dp.Text(
f"## Global Explanation of Models \n "
Expand Down Expand Up @@ -277,10 +290,8 @@ def _custom_predict_arima(self, data):
date_col = self.spec.datetime_column.name
data[date_col] = pd.to_datetime(data[date_col], unit="s")
data = data.set_index(date_col)
# Get the index of the current series id
series_index = self.target_columns.index(self.series_id)

# Use the ARIMA model to predict the values
predictions = self.models[series_index].predict(X=data, n_periods=len(data))
predictions = self.models[self.series_id].predict(X=data, n_periods=len(data))

return predictions
86 changes: 53 additions & 33 deletions ads/opctl/operator/lowcode/forecast/model/automlx.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from ads.common.decorator.runtime_dependency import runtime_dependency
from ads.opctl.operator.lowcode.forecast.const import (
AUTOMLX_METRIC_MAP,
ForecastOutputColumns,
ForecastOutputColumns, SupportedModels,
)
from ads.opctl import logger

Expand Down Expand Up @@ -60,7 +60,6 @@ def _build_model(self) -> pd.DataFrame:
models = dict()
outputs = dict()
outputs_legacy = dict()
selected_models = dict()
date_column = self.spec.datetime_column.name
horizon = self.spec.horizon
self.datasets.datetime_col = date_column
Expand All @@ -71,19 +70,22 @@ def _build_model(self) -> pd.DataFrame:
self.errors_dict = dict()

# Clean up kwargs for pass through
model_kwargs_cleaned = self.spec.model_kwargs.copy()
model_kwargs_cleaned["n_algos_tuned"] = model_kwargs_cleaned.get(
"n_algos_tuned", AUTOMLX_N_ALGOS_TUNED
)
model_kwargs_cleaned["score_metric"] = AUTOMLX_METRIC_MAP.get(
self.spec.metric,
model_kwargs_cleaned.get("score_metric", AUTOMLX_DEFAULT_SCORE_METRIC),
)
model_kwargs_cleaned.pop("task", None)
time_budget = model_kwargs_cleaned.pop("time_budget", 0)
model_kwargs_cleaned[
"preprocessing"
] = self.spec.preprocessing or model_kwargs_cleaned.get("preprocessing", True)
model_kwargs_cleaned = None

if self.loaded_models is None:
model_kwargs_cleaned = self.spec.model_kwargs.copy()
model_kwargs_cleaned["n_algos_tuned"] = model_kwargs_cleaned.get(
"n_algos_tuned", AUTOMLX_N_ALGOS_TUNED
)
model_kwargs_cleaned["score_metric"] = AUTOMLX_METRIC_MAP.get(
self.spec.metric,
model_kwargs_cleaned.get("score_metric", AUTOMLX_DEFAULT_SCORE_METRIC),
)
model_kwargs_cleaned.pop("task", None)
time_budget = model_kwargs_cleaned.pop("time_budget", 0)
model_kwargs_cleaned[
"preprocessing"
] = self.spec.preprocessing or model_kwargs_cleaned.get("preprocessing", True)

for i, (target, df) in enumerate(full_data_dict.items()):
try:
Expand All @@ -107,15 +109,18 @@ def _build_model(self) -> pd.DataFrame:
if y_train.index.is_monotonic
else "NOT" + "monotonic."
)
model = automl.Pipeline(
task="forecasting",
**model_kwargs_cleaned,
)
model.fit(
X=y_train.drop(target, axis=1),
y=pd.DataFrame(y_train[target]),
time_budget=time_budget,
)
model = self.loaded_models[target] if self.loaded_models is not None else None

if model is None:
model = automl.Pipeline(
task="forecasting",
**model_kwargs_cleaned,
)
model.fit(
X=y_train.drop(target, axis=1),
y=pd.DataFrame(y_train[target]),
time_budget=time_budget,
)
logger.debug("Selected model: {}".format(model.selected_model_))
logger.debug(
"Selected model params: {}".format(model.selected_model_params_)
Expand All @@ -142,12 +147,8 @@ def _build_model(self) -> pd.DataFrame:
)

# Collect Outputs
selected_models[target] = {
"series_id": target,
"selected_model": model.selected_model_,
"model_params": model.selected_model_params_,
}
models[target] = model
if self.loaded_models is None:
models[target] = model
summary_frame = summary_frame.rename_axis("ds").reset_index()
summary_frame = summary_frame.rename(
columns={
Expand All @@ -162,6 +163,24 @@ def _build_model(self) -> pd.DataFrame:
summary_frame["yhat_lower"] = np.NAN
outputs[target] = summary_frame
# outputs_legacy[target] = summary_frame

self.model_parameters[target] = {
"framework": SupportedModels.AutoMLX,
"score_metric": model.score_metric,
"random_state": model.random_state,
"model_list": model.model_list,
"n_algos_tuned": model.n_algos_tuned,
"adaptive_sampling": model.adaptive_sampling,
"min_features": model.min_features,
"optimization": model.optimization,
"preprocessing": model.preprocessing,
"search_space": model.search_space,
"time_series_period": model.time_series_period,
"min_class_instances": model.min_class_instances,
"max_tuning_trials": model.max_tuning_trials,
"selected_model": model.selected_model_,
"selected_model_params": model.selected_model_params_,
}
except Exception as e:
self.errors_dict[target] = {"model_name": self.spec.model, "error": str(e)}

Expand Down Expand Up @@ -191,7 +210,8 @@ def _build_model(self) -> pd.DataFrame:
# output_col = output_col.reset_index(drop=True)
# outputs_merged = pd.concat([outputs_merged, output_col], axis=1)

self.models = models
self.models = models if self.loaded_models is None else self.loaded_models

return outputs_merged

@runtime_dependency(
Expand Down Expand Up @@ -262,7 +282,7 @@ def _generate_report(self):
global_explanation_df = pd.DataFrame(self.global_explanation)

self.formatted_global_explanation = (
global_explanation_df / global_explanation_df.sum(axis=0) * 100
global_explanation_df / global_explanation_df.sum(axis=0) * 100
)

# Create a markdown section for the global explainability
Expand All @@ -285,7 +305,7 @@ def _generate_report(self):
dp.DataTable(
local_ex_df.div(local_ex_df.abs().sum(axis=1), axis=0) * 100,
label=s_id,
)
)
for s_id, local_ex_df in self.local_explanation.items()
]
local_explanation_section = (
Expand Down
Loading