Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 68 additions & 45 deletions ads/opctl/operator/lowcode/forecast/model/arima.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import pandas as pd
import numpy as np
import pmdarima as pm
from joblib import Parallel, delayed

from ads.opctl import logger

Expand All @@ -29,32 +30,31 @@ def __init__(self, config: ForecastOperatorConfig, datasets: ForecastDatasets):
self.formatted_global_explanation = None
self.formatted_local_explanation = None

def _build_model(self) -> pd.DataFrame:
full_data_dict = self.datasets.full_data_dict

# Extract the Confidence Interval Width and convert to arima's equivalent - alpha
if self.spec.confidence_interval_width is None:
self.spec.confidence_interval_width = 1 - self.spec.model_kwargs.get(
"alpha", 0.05
)
model_kwargs = self.spec.model_kwargs
model_kwargs["alpha"] = 1 - self.spec.confidence_interval_width
if "error_action" not in model_kwargs.keys():
model_kwargs["error_action"] = "ignore"
def _train_model(self, i, target, df):
"""Trains the ARIMA model for a given target.

models = []
self.datasets.datetime_col = self.spec.datetime_column.name
self.forecast_output = ForecastOutput(
confidence_interval_width=self.spec.confidence_interval_width
)
Parameters
----------
i: int
The index of the target
target: str
The name of the target
df: pd.DataFrame
The dataframe containing the target data
"""
try:
# Extract the Confidence Interval Width and convert to arima's equivalent - alpha
if self.spec.confidence_interval_width is None:
self.spec.confidence_interval_width = 1 - self.spec.model_kwargs.get(
"alpha", 0.05
)
model_kwargs = self.spec.model_kwargs
model_kwargs["alpha"] = 1 - self.spec.confidence_interval_width
if "error_action" not in model_kwargs.keys():
model_kwargs["error_action"] = "ignore"

outputs = dict()
outputs_legacy = []
fitted_values = dict()
actual_values = dict()
dt_columns = dict()
# models = []

for i, (target, df) in enumerate(full_data_dict.items()):
# format the dataframe for this target. Dropping NA on target[df] will remove all future data
le, df_encoded = utils._label_encode_dataframe(
df, no_encode={self.spec.datetime_column.name, target}
Expand All @@ -72,9 +72,7 @@ def _build_model(self) -> pd.DataFrame:
target,
self.spec.datetime_column.name,
}
logger.debug(
f"Additional Regressors Detected {list(additional_regressors)}"
)
logger.debug(f"Additional Regressors Detected {list(additional_regressors)}")

# Split data into X and y for arima tune method
y = data_i[target]
Expand All @@ -85,19 +83,17 @@ def _build_model(self) -> pd.DataFrame:
# Build and fit model
model = pm.auto_arima(y=y, X=X_in, **self.spec.model_kwargs)

fitted_values[target] = model.predict_in_sample(X=X_in)
actual_values[target] = y
actual_values[target].index = pd.to_datetime(y.index)
self.fitted_values[target] = model.predict_in_sample(X=X_in)
self.actual_values[target] = y
self.actual_values[target].index = pd.to_datetime(y.index)

# Build future dataframe
start_date = y.index.values[-1]
n_periods = self.spec.horizon
if len(additional_regressors):
X = df_clean[df_clean[target].isnull()].drop(target, axis=1)
else:
X = pd.date_range(
start=start_date, periods=n_periods, freq=self.spec.freq
)
X = pd.date_range(start=start_date, periods=n_periods, freq=self.spec.freq)

# Predict and format forecast
yhat, conf_int = model.predict(
Expand All @@ -108,7 +104,7 @@ def _build_model(self) -> pd.DataFrame:
)
yhat_clean = pd.DataFrame(yhat, index=yhat.index, columns=["yhat"])

dt_columns[target] = df_encoded[self.spec.datetime_column.name]
self.dt_columns[target] = df_encoded[self.spec.datetime_column.name]
conf_int_clean = pd.DataFrame(
conf_int, index=yhat.index, columns=["yhat_lower", "yhat_upper"]
)
Expand All @@ -117,15 +113,42 @@ def _build_model(self) -> pd.DataFrame:
logger.debug(forecast[["yhat", "yhat_lower", "yhat_upper"]].tail())

# Collect all outputs
models.append(model)
outputs_legacy.append(
# models.append(model)
self.outputs_legacy.append(
forecast.reset_index().rename(columns={"index": "ds"})
)
outputs[target] = forecast
self.outputs[target] = forecast

self.models_dict[target] = model

self.models = models
logger.debug("===========Done===========")
except Exception as e:
self.errors_dict[target] = {"model_name": self.spec.model, "error": str(e)}

def _build_model(self) -> pd.DataFrame:
full_data_dict = self.datasets.full_data_dict

self.datasets.datetime_col = self.spec.datetime_column.name
self.forecast_output = ForecastOutput(
confidence_interval_width=self.spec.confidence_interval_width
)

self.outputs = dict()
self.outputs_legacy = []
self.fitted_values = dict()
self.actual_values = dict()
self.dt_columns = dict()
self.models_dict = dict()
self.errors_dict = dict()

Parallel(n_jobs=-1, require="sharedmem")(
delayed(ArimaOperatorModel._train_model)(self, i, target, df)
for self, (i, (target, df)) in zip(
[self] * len(full_data_dict), enumerate(full_data_dict.items())
)
)

logger.debug("===========Done===========")
self.models = [self.models_dict[target] for target in self.target_columns]

# Merge the outputs from each model into 1 df with all outputs by target and category
col = self.original_target_column
Expand All @@ -134,15 +157,15 @@ def _build_model(self) -> pd.DataFrame:
yhat_lower_name = ForecastOutputColumns.LOWER_BOUND
for cat in self.categories:
output_i = pd.DataFrame()
output_i["Date"] = dt_columns[f"{col}_{cat}"]
output_i["Date"] = self.dt_columns[f"{col}_{cat}"]
output_i["Series"] = cat
output_i = output_i.set_index("Date")

output_i["input_value"] = actual_values[f"{col}_{cat}"]
output_i["fitted_value"] = fitted_values[f"{col}_{cat}"]
output_i["forecast_value"] = outputs[f"{col}_{cat}"]["yhat"]
output_i[yhat_upper_name] = outputs[f"{col}_{cat}"]["yhat_upper"]
output_i[yhat_lower_name] = outputs[f"{col}_{cat}"]["yhat_lower"]
output_i["input_value"] = self.actual_values[f"{col}_{cat}"]
output_i["fitted_value"] = self.fitted_values[f"{col}_{cat}"]
output_i["forecast_value"] = self.outputs[f"{col}_{cat}"]["yhat"]
output_i[yhat_upper_name] = self.outputs[f"{col}_{cat}"]["yhat_upper"]
output_i[yhat_lower_name] = self.outputs[f"{col}_{cat}"]["yhat_lower"]

output_i = output_i.reset_index(drop=False)
output_col = pd.concat([output_col, output_i])
Expand Down Expand Up @@ -252,7 +275,7 @@ def _custom_predict_arima(self, data):

"""
date_col = self.spec.datetime_column.name
data[date_col] = pd.to_datetime(data[date_col], unit='s')
data[date_col] = pd.to_datetime(data[date_col], unit="s")
data = data.set_index(date_col)
# Get the index of the current series id
series_index = self.target_columns.index(self.series_id)
Expand Down
155 changes: 82 additions & 73 deletions ads/opctl/operator/lowcode/forecast/model/automlx.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,15 @@ def __init__(self, config: ForecastOperatorConfig, datasets: ForecastDatasets):
),
)
def _build_model(self) -> pd.DataFrame:

from automl import init
from sktime.forecasting.model_selection import temporal_train_test_split

init(engine="local", check_deprecation_warnings=False)
init(
engine="local",
engine_opts={"n_jobs": -1, "model_n_jobs": -1},
check_deprecation_warnings=False,
)

full_data_dict = self.datasets.full_data_dict

Expand All @@ -63,6 +68,7 @@ def _build_model(self) -> pd.DataFrame:
self.forecast_output = ForecastOutput(
confidence_interval_width=self.spec.confidence_interval_width
)
self.errors_dict = dict()

# Clean up kwargs for pass through
model_kwargs_cleaned = self.spec.model_kwargs.copy()
Expand All @@ -80,81 +86,84 @@ def _build_model(self) -> pd.DataFrame:
] = self.spec.preprocessing or model_kwargs_cleaned.get("preprocessing", True)

for i, (target, df) in enumerate(full_data_dict.items()):
logger.debug("Running automl for {} at position {}".format(target, i))
series_values = df[df[target].notna()]
# drop NaNs for the time period where data wasn't recorded
series_values.dropna(inplace=True)
df[date_column] = pd.to_datetime(
df[date_column], format=self.spec.datetime_column.format
)
df = df.set_index(date_column)
# if len(df.columns) > 1:
# when additional columns are present
y_train, y_test = temporal_train_test_split(df, test_size=horizon)
forecast_x = y_test.drop(target, axis=1)
# else:
# y_train = df
# forecast_x = None
logger.debug(
"Time Index is" + ""
if y_train.index.is_monotonic
else "NOT" + "monotonic."
)
model = automl.Pipeline(
task="forecasting",
**model_kwargs_cleaned,
)
model.fit(
X=y_train.drop(target, axis=1),
y=pd.DataFrame(y_train[target]),
time_budget=time_budget,
)
logger.debug("Selected model: {}".format(model.selected_model_))
logger.debug(
"Selected model params: {}".format(model.selected_model_params_)
)
summary_frame = model.forecast(
X=forecast_x,
periods=horizon,
alpha=1 - (self.spec.confidence_interval_width / 100),
)
input_values = pd.Series(
y_train[target].values,
name="input_value",
index=y_train.index,
)
fitted_values_raw = model.predict(y_train.drop(target, axis=1))
fitted_values = pd.Series(
fitted_values_raw[target].values,
name="fitted_value",
index=y_train.index,
)
try:
logger.debug("Running automl for {} at position {}".format(target, i))
series_values = df[df[target].notna()]
# drop NaNs for the time period where data wasn't recorded
series_values.dropna(inplace=True)
df[date_column] = pd.to_datetime(
df[date_column], format=self.spec.datetime_column.format
)
df = df.set_index(date_column)
# if len(df.columns) > 1:
# when additional columns are present
y_train, y_test = temporal_train_test_split(df, test_size=horizon)
forecast_x = y_test.drop(target, axis=1)
# else:
# y_train = df
# forecast_x = None
logger.debug(
"Time Index is" + ""
if y_train.index.is_monotonic
else "NOT" + "monotonic."
)
model = automl.Pipeline(
task="forecasting",
**model_kwargs_cleaned,
)
model.fit(
X=y_train.drop(target, axis=1),
y=pd.DataFrame(y_train[target]),
time_budget=time_budget,
)
logger.debug("Selected model: {}".format(model.selected_model_))
logger.debug(
"Selected model params: {}".format(model.selected_model_params_)
)
summary_frame = model.forecast(
X=forecast_x,
periods=horizon,
alpha=1 - (self.spec.confidence_interval_width / 100),
)
input_values = pd.Series(
y_train[target].values,
name="input_value",
index=y_train.index,
)
fitted_values_raw = model.predict(y_train.drop(target, axis=1))
fitted_values = pd.Series(
fitted_values_raw[target].values,
name="fitted_value",
index=y_train.index,
)

summary_frame = pd.concat(
[input_values, fitted_values, summary_frame], axis=1
)
summary_frame = pd.concat(
[input_values, fitted_values, summary_frame], axis=1
)

# Collect Outputs
selected_models[target] = {
"series_id": target,
"selected_model": model.selected_model_,
"model_params": model.selected_model_params_,
}
models[target] = model
summary_frame = summary_frame.rename_axis("ds").reset_index()
summary_frame = summary_frame.rename(
columns={
f"{target}_ci_upper": "yhat_upper",
f"{target}_ci_lower": "yhat_lower",
f"{target}": "yhat",
# Collect Outputs
selected_models[target] = {
"series_id": target,
"selected_model": model.selected_model_,
"model_params": model.selected_model_params_,
}
)
# In case of Naive model, model.forecast function call does not return confidence intervals.
if "yhat_upper" not in summary_frame:
summary_frame["yhat_upper"] = np.NAN
summary_frame["yhat_lower"] = np.NAN
outputs[target] = summary_frame
# outputs_legacy[target] = summary_frame
models[target] = model
summary_frame = summary_frame.rename_axis("ds").reset_index()
summary_frame = summary_frame.rename(
columns={
f"{target}_ci_upper": "yhat_upper",
f"{target}_ci_lower": "yhat_lower",
f"{target}": "yhat",
}
)
# In case of Naive model, model.forecast function call does not return confidence intervals.
if "yhat_upper" not in summary_frame:
summary_frame["yhat_upper"] = np.NAN
summary_frame["yhat_lower"] = np.NAN
outputs[target] = summary_frame
# outputs_legacy[target] = summary_frame
except Exception as e:
self.errors_dict[target] = {"model_name": self.spec.model, "error": str(e)}

logger.debug("===========Forecast Generated===========")
outputs_merged = pd.DataFrame()
Expand Down
Loading