Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add time series forecasting support #611

Open
wants to merge 19 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 29 additions & 1 deletion frameworks/FEDOT/__init__.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,20 @@
from amlb.benchmark import TaskConfig
from amlb.data import Dataset
from amlb.data import Dataset, DatasetType
from amlb.utils import call_script_in_same_dir
from copy import deepcopy


def setup(*args, **kwargs):
call_script_in_same_dir(__file__, "setup.sh", *args, **kwargs)


def run(dataset: Dataset, config: TaskConfig):
if dataset.type == DatasetType.timeseries:
return run_fedot_timeseries(dataset, config)
return run_fedot_tabular(dataset, config)


def run_fedot_tabular(dataset: Dataset, config: TaskConfig):
from frameworks.shared.caller import run_in_venv

data = dict(
Expand All @@ -23,3 +30,24 @@ def run(dataset: Dataset, config: TaskConfig):

return run_in_venv(__file__, "exec.py",
input_data=data, dataset=dataset, config=config)


def run_fedot_timeseries(dataset: Dataset, config: TaskConfig):
from frameworks.shared.caller import run_in_venv
dataset = deepcopy(dataset)

data = dict(
train_path=dataset.train.path,
test_path=dataset.test.path,
target=dataset.target.name,
id_column=dataset.id_column,
timestamp_column=dataset.timestamp_column,
forecast_horizon_in_steps=dataset.forecast_horizon_in_steps,
freq=dataset.freq,
seasonality=dataset.seasonality,
repeated_abs_seasonal_error=dataset.repeated_abs_seasonal_error,
repeated_item_id=dataset.repeated_item_id,
)

return run_in_venv(__file__, "exec_ts.py",
input_data=data, dataset=dataset, config=config)
43 changes: 24 additions & 19 deletions frameworks/FEDOT/exec.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,29 +14,14 @@ def run(dataset, config):
log.info("\n**** FEDOT ****\n")

is_classification = config.type == 'classification'
# Mapping of benchmark metrics to FEDOT metrics
metrics_mapping = dict(
acc='acc',
auc='roc_auc',
f1='f1',
logloss='logloss',
mae='mae',
mse='mse',
msle='msle',
r2='r2',
rmse='rmse'
)
scoring_metric = metrics_mapping.get(config.metric, None)

if scoring_metric is None:
log.warning("Performance metric %s not supported.", config.metric)
scoring_metric = get_fedot_metrics(config)

training_params = {"preset": "best_quality", "n_jobs": config.cores}
training_params |= {k: v for k, v in config.framework_params.items() if not k.startswith('_')}
training_params.update({k: v for k, v in config.framework_params.items() if not k.startswith('_')})
n_jobs = training_params["n_jobs"]

log.info('Running FEDOT with a maximum time of %ss on %s cores, optimizing %s.',
config.max_runtime_seconds, n_jobs, scoring_metric)
log.info(f"Running FEDOT with a maximum time of {config.max_runtime_seconds}s on {n_jobs} cores, \
optimizing {scoring_metric}")
runtime_min = config.max_runtime_seconds / 60

fedot = Fedot(problem=config.type, timeout=runtime_min, metric=scoring_metric, seed=config.seed,
Expand Down Expand Up @@ -64,6 +49,26 @@ def run(dataset, config):
predict_duration=predict.duration)


def get_fedot_metrics(config):
metrics_mapping = dict(
acc='accuracy',
auc='roc_auc',
f1='f1',
logloss='neg_log_loss',
mae='mae',
mse='mse',
msle='msle',
r2='r2',
rmse='rmse',
)
scoring_metric = metrics_mapping.get(config.metric, None)

if scoring_metric is None:
log.warning(f"Performance metric {config.metric} not supported.")

return scoring_metric


def save_artifacts(automl, config):

artifacts = config.framework_params.get('_save_artifacts', [])
Expand Down
149 changes: 149 additions & 0 deletions frameworks/FEDOT/exec_ts.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
import logging
import os
from pathlib import Path
import numpy as np

from fedot.api.main import Fedot
from fedot.core.repository.tasks import Task, TaskTypesEnum, TsForecastingParams
from fedot.core.data.data import InputData
from fedot.core.repository.dataset_types import DataTypesEnum

from frameworks.shared.callee import call_run, result, output_subdir
from frameworks.shared.utils import Timer, load_timeseries_dataset

log = logging.getLogger(__name__)


def run(dataset, config):
log.info("\n**** FEDOT ****\n")

scoring_metric = get_fedot_metrics(config)

training_params = {"preset": "best_quality", "n_jobs": config.cores}
training_params.update({k: v for k, v in config.framework_params.items() if not k.startswith('_')})
n_jobs = training_params["n_jobs"]

log.info(f"Running FEDOT with a maximum time of {config.max_runtime_seconds}s on {n_jobs} cores, \
optimizing {scoring_metric}")
runtime_min = config.max_runtime_seconds / 60

task = Task(
TaskTypesEnum.ts_forecasting,
TsForecastingParams(forecast_length=dataset.forecast_horizon_in_steps)
)

train_df, test_df = load_timeseries_dataset(dataset)
id_column = dataset.id_column

log.info('Predicting on the test set.')
training_duration, predict_duration = 0, 0
models_count = 0
truth_only = test_df[dataset.target].values
predictions = []


for label, train_subdf in train_df.groupby(id_column, sort=False):
train_series = train_subdf[dataset.target].to_numpy()
train_input = InputData(
idx=np.arange(len(train_series)),
features=train_series,
target=train_series,
task=task,
data_type=DataTypesEnum.ts
)

test_sub_df = test_df[test_df[id_column] == label].drop(columns=[id_column], axis=1)
horizon = len(test_sub_df[dataset.target])

fedot = Fedot(
problem=TaskTypesEnum.ts_forecasting.value,
task_params=task.task_params,
timeout=runtime_min,
metric=scoring_metric,
seed=config.seed,
max_pipeline_fit_time=runtime_min / 10,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is / 10 necessary here?

Copy link
Author

@Lopa10ko Lopa10ko Mar 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

generally speaking, this is a small safety measure to ensure that the training time of one pipeline is exactly within the total timeout. the classification and regression #563 uses the same empirical approach. it should be patched in the future

**training_params
)

with Timer() as training:
fedot.fit(train_input)
training_duration += training.duration

with Timer() as predict:
try:
prediction = fedot.forecast(train_input, horizon=horizon)
except Exception as e:
log.info('Pipeline crashed. Using no-op forecasting')
prediction = np.full(horizon, train_series[-1])

predict_duration += predict.duration

predictions.append(prediction)
models_count += fedot.current_pipeline.length

optional_columns = dict(
repeated_item_id=np.load(dataset.repeated_item_id),
repeated_abs_seasonal_error=np.load(dataset.repeated_abs_seasonal_error),
)
save_artifacts(fedot, config)
return result(output_file=config.output_predictions_file,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's necessary to return dataset.repeated_item_id and dataset.repeated_abs_seasonal_error as optional_columns in the result for MASE computation to work correctly (see https://github.com/openml/automlbenchmark/blob/master/frameworks/AutoGluon/exec_ts.py#L63C1-L67C1).
This is a rather ugly hack that is necessary to make history-dependent metrics like MASE compatible with the AMLB results API.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

predictions=np.hstack(predictions),
truth=truth_only,
target_is_encoded=False,
models_count=models_count,
training_duration=training_duration,
predict_duration=predict_duration,
optional_columns=optional_columns)


def get_fedot_metrics(config):
metrics_mapping = dict(
mape='mape',
smape='smape',
mase='mase',
mse='mse',
rmse='rmse',
mae='mae',
r2='r2',
)
scoring_metric = metrics_mapping.get(config.metric, None)

if scoring_metric is None:
log.warning(f"Performance metric {config.metric} not supported.")

return scoring_metric


def save_artifacts(automl, config):

artifacts = config.framework_params.get('_save_artifacts', [])
if 'models' in artifacts:
try:
models_dir = output_subdir('models', config)
models_file = os.path.join(models_dir, 'model.json')
automl.current_pipeline.save(models_file)
except Exception as e:
log.info(f"Error when saving 'models': {e}.", exc_info=True)

if 'info' in artifacts:
try:
info_dir = output_subdir("info", config)
if automl.history:
automl.history.save(os.path.join(info_dir, 'history.json'))
else:
log.info(f"There is no optimization history info to save.")
except Exception as e:
log.info(f"Error when saving info about optimisation history: {e}.", exc_info=True)

if 'leaderboard' in artifacts:
try:
leaderboard_dir = output_subdir("leaderboard", config)
if automl.history:
lb = automl.history.get_leaderboard()
Path(os.path.join(leaderboard_dir, "leaderboard.csv")).write_text(lb)
except Exception as e:
log.info(f"Error when saving 'leaderboard': {e}.", exc_info=True)


if __name__ == '__main__':
call_run(run)