Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 91 additions & 0 deletions ads/opctl/operator/lowcode/common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import tempfile
from typing import List, Union

import cloudpickle
import fsspec
import oracledb
import pandas as pd
Expand Down Expand Up @@ -126,7 +127,26 @@ def load_data(data_spec, storage_options=None, **kwargs):
return data


def _safe_write(fn, **kwargs):
try:
fn(**kwargs)
except Exception:
logger.warning(f'Failed to write file {kwargs.get("filename", "UNKNOWN")}')


def write_data(data, filename, format, storage_options=None, index=False, **kwargs):
return _safe_write(
fn=_write_data,
data=data,
filename=filename,
format=format,
storage_options=storage_options,
index=index,
**kwargs,
)


def _write_data(data, filename, format, storage_options=None, index=False, **kwargs):
disable_print()
if not format:
_, format = os.path.splitext(filename)
Expand All @@ -143,11 +163,24 @@ def write_data(data, filename, format, storage_options=None, index=False, **kwar


def write_json(json_dict, filename, storage_options=None):
return _safe_write(
fn=_write_json,
json_dict=json_dict,
filename=filename,
storage_options=storage_options,
)


def _write_json(json_dict, filename, storage_options=None):
with fsspec.open(filename, mode="w", **storage_options) as f:
f.write(json.dumps(json_dict))


def write_simple_json(data, path):
return _safe_write(fn=_write_simple_json, data=data, path=path)


def _write_simple_json(data, path):
if ObjectStorageDetails.is_oci_path(path):
storage_options = default_signer()
else:
Expand All @@ -156,6 +189,60 @@ def write_simple_json(data, path):
json.dump(data, f, indent=4)


def write_file(local_filename, remote_filename, storage_options, **kwargs):
return _safe_write(
fn=_write_file,
local_filename=local_filename,
remote_filename=remote_filename,
storage_options=storage_options,
**kwargs,
)


def _write_file(local_filename, remote_filename, storage_options, **kwargs):
with open(local_filename) as f1:
with fsspec.open(
remote_filename,
"w",
**storage_options,
) as f2:
f2.write(f1.read())


def load_pkl(filepath):
return _safe_write(fn=_load_pkl, filepath=filepath)


def _load_pkl(filepath):
storage_options = {}
if ObjectStorageDetails.is_oci_path(filepath):
storage_options = default_signer()

with fsspec.open(filepath, "rb", **storage_options) as f:
return cloudpickle.load(f)
return None


def write_pkl(obj, filename, output_dir, storage_options):
return _safe_write(
fn=_write_pkl,
obj=obj,
filename=filename,
output_dir=output_dir,
storage_options=storage_options,
)


def _write_pkl(obj, filename, output_dir, storage_options):
pkl_path = os.path.join(output_dir, filename)
with fsspec.open(
pkl_path,
"wb",
**storage_options,
) as f:
cloudpickle.dump(obj, f)


def merge_category_columns(data, target_category_columns):
result = data.apply(
lambda x: "__".join([str(x[col]) for col in target_category_columns]), axis=1
Expand Down Expand Up @@ -290,4 +377,8 @@ def disable_print():

# Restore
def enable_print():
try:
sys.stdout.close()
except Exception:
pass
sys.stdout = sys.__stdout__
17 changes: 14 additions & 3 deletions ads/opctl/operator/lowcode/forecast/model/automlx.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ def __init__(self, config: ForecastOperatorConfig, datasets: ForecastDatasets):
super().__init__(config, datasets)
self.global_explanation = {}
self.local_explanation = {}
self.explainability_kwargs = {}

def set_kwargs(self):
model_kwargs_cleaned = self.spec.model_kwargs
Expand All @@ -54,6 +55,9 @@ def set_kwargs(self):
self.spec.preprocessing.enabled
or model_kwargs_cleaned.get("preprocessing", True)
)
sample_ratio = model_kwargs_cleaned.pop("sample_to_feature_ratio", None)
if sample_ratio is not None:
self.explainability_kwargs = {"sample_to_feature_ratio": sample_ratio}
return model_kwargs_cleaned, time_budget

def preprocess(self, data, series_id): # TODO: re-use self.le for explanations
Expand Down Expand Up @@ -445,6 +449,7 @@ def explain_model(self):
else None,
pd.DataFrame(data_i[self.spec.target_column]),
task="forecasting",
**self.explainability_kwargs,
)

# Generate explanations for the forecast
Expand Down Expand Up @@ -518,7 +523,9 @@ def get_validation_score_and_metric(self, model):
model_params = model.selected_model_params_
if len(trials) > 0:
score_col = [col for col in trials.columns if "Score" in col][0]
validation_score = trials[trials.Hyperparameters == model_params][score_col].iloc[0]
validation_score = trials[trials.Hyperparameters == model_params][
score_col
].iloc[0]
else:
validation_score = 0
return -1 * validation_score
Expand All @@ -531,8 +538,12 @@ def generate_train_metrics(self) -> pd.DataFrame:
for s_id in self.forecast_output.list_series_ids():
try:
metrics = {self.spec.metric.upper(): self.models[s_id]["score"]}
metrics_df = pd.DataFrame.from_dict(metrics, orient="index", columns=[s_id])
logger.warning("AutoMLX failed to generate training metrics. Recovering validation loss instead")
metrics_df = pd.DataFrame.from_dict(
metrics, orient="index", columns=[s_id]
)
logger.warning(
"AutoMLX failed to generate training metrics. Recovering validation loss instead"
)
total_metrics = pd.concat([total_metrics, metrics_df], axis=1)
except Exception as e:
logger.debug(
Expand Down
26 changes: 14 additions & 12 deletions ads/opctl/operator/lowcode/forecast/model/base_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
from abc import ABC, abstractmethod
from typing import Tuple

import fsspec
import numpy as np
import pandas as pd
import report_creator as rc
Expand All @@ -25,10 +24,13 @@
disable_print,
enable_print,
human_time_friendly,
load_pkl,
merged_category_column_name,
seconds_to_datetime,
write_data,
write_file,
write_json,
write_pkl,
)
from ads.opctl.operator.lowcode.forecast.utils import (
_build_metrics_df,
Expand All @@ -38,8 +40,6 @@
evaluate_train_metrics,
get_auto_select_plot,
get_forecast_plots,
load_pkl,
write_pkl,
)

from ..const import (
Expand Down Expand Up @@ -493,13 +493,11 @@ def _save_report(
enable_print()

report_path = os.path.join(unique_output_dir, self.spec.report_filename)
with open(report_local_path) as f1:
with fsspec.open(
report_path,
"w",
**storage_options,
) as f2:
f2.write(f1.read())
write_file(
local_filename=report_local_path,
remote_filename=report_path,
storage_options=storage_options,
)

# forecast csv report
# todo: add test data into forecast.csv
Expand Down Expand Up @@ -576,7 +574,9 @@ def _save_report(
# Round to 4 decimal places before writing
global_expl_rounded = self.formatted_global_explanation.copy()
global_expl_rounded = global_expl_rounded.apply(
lambda col: np.round(col, 4) if np.issubdtype(col.dtype, np.number) else col
lambda col: np.round(col, 4)
if np.issubdtype(col.dtype, np.number)
else col
)
if self.spec.generate_explanation_files:
write_data(
Expand All @@ -598,7 +598,9 @@ def _save_report(
# Round to 4 decimal places before writing
local_expl_rounded = self.formatted_local_explanation.copy()
local_expl_rounded = local_expl_rounded.apply(
lambda col: np.round(col, 4) if np.issubdtype(col.dtype, np.number) else col
lambda col: np.round(col, 4)
if np.issubdtype(col.dtype, np.number)
else col
)
if self.spec.generate_explanation_files:
write_data(
Expand Down
24 changes: 10 additions & 14 deletions ads/opctl/operator/lowcode/forecast/model/neuralprophet.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,10 @@
from ads.opctl.operator.lowcode.common.utils import (
disable_print,
enable_print,
)
from ads.opctl.operator.lowcode.forecast.utils import (
_select_plot_list,
load_pkl,
write_pkl,
)
from ads.opctl.operator.lowcode.forecast.utils import _select_plot_list

from ..const import DEFAULT_TRIALS, SupportedModels
from ..operator_config import ForecastOperatorConfig
Expand Down Expand Up @@ -159,20 +157,18 @@ def _train_model(self, i, s_id, df, model_kwargs):
upper_bound=self.get_horizon(forecast[upper_bound_col_name]).values,
lower_bound=self.get_horizon(forecast[lower_bound_col_name]).values,
)
core_columns = set(forecast.columns) - set(
[
"y",
"yhat1",
upper_bound_col_name,
lower_bound_col_name,
"future_regressors_additive",
"future_regressors_multiplicative",
]
)
core_columns = set(forecast.columns) - {
"y",
"yhat1",
upper_bound_col_name,
lower_bound_col_name,
"future_regressors_additive",
"future_regressors_multiplicative",
}
exog_variables = set(
filter(lambda x: x.startswith("future_regressor_"), list(core_columns))
)
combine_terms = list(core_columns - exog_variables - set(["ds"]))
combine_terms = list(core_columns - exog_variables - {"ds"})
temp_df = (
forecast[list(core_columns)]
.rename({"ds": "Date"}, axis=1)
Expand Down
19 changes: 12 additions & 7 deletions ads/opctl/operator/lowcode/forecast/model/prophet.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,22 +43,27 @@ def _fit_model(data, params, additional_regressors):
from prophet import Prophet

monthly_seasonality = params.pop("monthly_seasonality", False)
data_floor = params.pop("min", None)
data_cap = params.pop("max", None)
if data_cap or data_floor:

has_min = "min" in params
has_max = "max" in params
if has_min or has_max:
params["growth"] = "logistic"
data_floor = params.pop("min", None)
data_cap = params.pop("max", None)

model = Prophet(**params)
if monthly_seasonality:
model.add_seasonality(name="monthly", period=30.5, fourier_order=5)
params["monthly_seasonality"] = monthly_seasonality
for add_reg in additional_regressors:
model.add_regressor(add_reg)
if data_floor:

if has_min:
data["floor"] = float(data_floor)
params["floor"] = data_floor
if data_cap:
params["min"] = data_floor
if has_max:
data["cap"] = float(data_cap)
params["cap"] = data_cap
params["max"] = data_cap

model.fit(data)
return model
Expand Down
Loading