Add time series forecasting support #611

Open · wants to merge 19 commits into base: master
Changes from 14 commits
30 changes: 29 additions & 1 deletion frameworks/FEDOT/__init__.py
@@ -1,13 +1,20 @@
from amlb.benchmark import TaskConfig
from amlb.data import Dataset
from amlb.data import Dataset, DatasetType
from amlb.utils import call_script_in_same_dir
from copy import deepcopy


def setup(*args, **kwargs):
    call_script_in_same_dir(__file__, "setup.sh", *args, **kwargs)


def run(dataset: Dataset, config: TaskConfig):
    if dataset.type == DatasetType.timeseries:
        return run_fedot_timeseries(dataset, config)
    return run_fedot_tabular(dataset, config)


def run_fedot_tabular(dataset: Dataset, config: TaskConfig):
    from frameworks.shared.caller import run_in_venv

    data = dict(
@@ -23,3 +30,24 @@ def run(dataset: Dataset, config: TaskConfig):

    return run_in_venv(__file__, "exec.py",
                       input_data=data, dataset=dataset, config=config)


def run_fedot_timeseries(dataset: Dataset, config: TaskConfig):
    from frameworks.shared.caller import run_in_venv
    dataset = deepcopy(dataset)

    data = dict(
        train_path=dataset.train.path,
        test_path=dataset.test.path,
        target=dataset.target.name,
        id_column=dataset.id_column,
        timestamp_column=dataset.timestamp_column,
        forecast_horizon_in_steps=dataset.forecast_horizon_in_steps,
        freq=dataset.freq,
        seasonality=dataset.seasonality,
        repeated_abs_seasonal_error=dataset.repeated_abs_seasonal_error,
        repeated_item_id=dataset.repeated_item_id,
    )

    return run_in_venv(__file__, "exec_ts.py",
                       input_data=data, dataset=dataset, config=config)
43 changes: 24 additions & 19 deletions frameworks/FEDOT/exec.py
@@ -14,29 +14,14 @@ def run(dataset, config):
    log.info("\n**** FEDOT ****\n")

    is_classification = config.type == 'classification'
    # Mapping of benchmark metrics to FEDOT metrics
    metrics_mapping = dict(
        acc='acc',
        auc='roc_auc',
        f1='f1',
        logloss='logloss',
        mae='mae',
        mse='mse',
        msle='msle',
        r2='r2',
        rmse='rmse'
    )
    scoring_metric = metrics_mapping.get(config.metric, None)

    if scoring_metric is None:
        log.warning("Performance metric %s not supported.", config.metric)
    scoring_metric = get_fedot_metrics(config)

    training_params = {"preset": "best_quality", "n_jobs": config.cores}
    training_params |= {k: v for k, v in config.framework_params.items() if not k.startswith('_')}
    training_params.update({k: v for k, v in config.framework_params.items() if not k.startswith('_')})
    n_jobs = training_params["n_jobs"]

    log.info('Running FEDOT with a maximum time of %ss on %s cores, optimizing %s.',
             config.max_runtime_seconds, n_jobs, scoring_metric)
log.info(f"Running FEDOT with a maximum time of {config.max_runtime_seconds}s on {n_jobs} cores, \
optimizing {scoring_metric}")
    runtime_min = config.max_runtime_seconds / 60

    fedot = Fedot(problem=config.type, timeout=runtime_min, metric=scoring_metric, seed=config.seed,
@@ -64,6 +49,26 @@ def run(dataset, config):
                  predict_duration=predict.duration)


def get_fedot_metrics(config):
    metrics_mapping = dict(
        acc='accuracy',
        auc='roc_auc',
        f1='f1',
        logloss='neg_log_loss',
        mae='mae',
        mse='mse',
        msle='msle',
        r2='r2',
        rmse='rmse',
    )
    scoring_metric = metrics_mapping.get(config.metric, None)

    if scoring_metric is None:
        log.warning(f"Performance metric {config.metric} not supported.")

    return scoring_metric


def save_artifacts(automl, config):

    artifacts = config.framework_params.get('_save_artifacts', [])
155 changes: 155 additions & 0 deletions frameworks/FEDOT/exec_ts.py
@@ -0,0 +1,155 @@
import logging
import os
from pathlib import Path
import numpy as np

from fedot.api.main import Fedot
from fedot.core.repository.tasks import Task, TaskTypesEnum, TsForecastingParams
from fedot.core.data.data import InputData
from fedot.core.repository.dataset_types import DataTypesEnum

from frameworks.shared.callee import call_run, result, output_subdir
from frameworks.shared.utils import Timer, load_timeseries_dataset

log = logging.getLogger(__name__)


def run(dataset, config):
    log.info("\n**** FEDOT ****\n")

    scoring_metric = get_fedot_metrics(config)

    training_params = {"preset": "best_quality", "n_jobs": config.cores}
    training_params.update({k: v for k, v in config.framework_params.items() if not k.startswith('_')})
    n_jobs = training_params["n_jobs"]

log.info(f"Running FEDOT with a maximum time of {config.max_runtime_seconds}s on {n_jobs} cores, \
optimizing {scoring_metric}")
    runtime_min = config.max_runtime_seconds / 60

    task = Task(
        TaskTypesEnum.ts_forecasting,
        TsForecastingParams(forecast_length=dataset.forecast_horizon_in_steps)
    )

    train_df, test_df = load_timeseries_dataset(dataset)
    id_column = dataset.id_column

    log.info('Predicting on the test set.')
    training_duration, predict_duration = 0, 0
    models_count = 0
    truth_only = test_df[dataset.target].values
    predictions = []


    for label in train_df[id_column].unique():
        train_sub_df = train_df[train_df[id_column] == label].drop(columns=[id_column], axis=1)
Collaborator:
This can be very inefficient for large dataframes with many time series (it scales as O(N^2)). A better option would be to use groupby, e.g.

for label, ts in train_df.groupby(id_column, sort=False):
    train_series = ts[dataset.target].to_numpy()

Author:
Thanks, tried that. In my opinion using zip may be inappropriate here, so I would stick with the janky bad-asymptotic solution to ensure that the labels are matching. (commit e6f19e7)
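For illustration only (not part of the PR), a sketch of how groupby could still keep the train and test labels explicitly matched, assuming both frames contain the same set of id_column values:

# Hypothetical alternative to the .unique() filtering above: group each frame once
# instead of re-scanning the whole dataframe per label (roughly O(N) instead of O(N^2)).
test_groups = {label: ts for label, ts in test_df.groupby(id_column, sort=False)}
for label, train_sub_df in train_df.groupby(id_column, sort=False):
    test_sub_df = test_groups[label]  # explicit label matching
    train_series = train_sub_df[dataset.target].to_numpy()
    test_series = test_sub_df[dataset.target].to_numpy()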

        train_series = np.array(train_sub_df[dataset.target])
        train_input = InputData(
            idx=train_sub_df.index.to_numpy(),
Collaborator:
This code uses the range index [0, 1, 2, ..., n-1] instead of the timestamps of the time series. Should this be train_sub_df[timestamp_column] instead, or does FEDOT ignore the timestamps?

Author:
FEDOT ignores timestamps, so that idx should be fine.

            features=train_series,
            target=train_series,
            task=task,
            data_type=DataTypesEnum.ts
        )
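Picking up the indexing question in the thread above, a minimal illustration (not part of the PR) of the two options discussed, for a hypothetical train_sub_df:

# Positional index, as used in the PR (FEDOT ignores timestamps):
idx_positional = train_sub_df.index.to_numpy()
# Actual timestamps, if a library did consume them (column name comes from the dataset config):
idx_timestamps = train_sub_df[dataset.timestamp_column].to_numpy()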

        test_sub_df = test_df[test_df[id_column] == label].drop(columns=[id_column], axis=1)
        test_series = np.array(test_sub_df[dataset.target])
Collaborator:
This contains the future values of the time series; I doubt that these values need to be fed to the model at prediction time.

For example, if the training data contains time steps [1, 2, 3, ..., T] and the goal is to predict [T+1, ..., T+H], then based on your current code

  • train_series contains timesteps [1, 2, 3, ..., T]
  • test_series contains timesteps [T+1, ..., T+H]

My guess is that we need to pass train_series as input to both fit() and predict().

Author:
In essence predict and forecast are the same thing, but we could pass test_input with features from train_series and target from test_series to get the prediction horizon in a predict call.

Changed to a clear forecast method. (commit a357726)
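A small numeric illustration (assumed values, not taken from the PR) of the windowing described above: the model should only ever see the history, while the horizon is held back as ground truth for scoring.

import numpy as np

series = np.arange(1.0, 101.0)   # hypothetical series with T + H = 100 points
horizon = 12                     # e.g. dataset.forecast_horizon_in_steps
history, future = series[:-horizon], series[-horizon:]
# history covers time steps [1, ..., T] (model input); future covers [T+1, ..., T+H] (truth only)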

        test_input = InputData(
            idx=test_sub_df.index.to_numpy(),
            features=train_series,
            target=test_series,
            task=task,
            data_type=DataTypesEnum.ts
        )

        fedot = Fedot(
            problem=TaskTypesEnum.ts_forecasting.value,
            task_params=task.task_params,
            timeout=runtime_min,
            metric=scoring_metric,
            seed=config.seed,
            max_pipeline_fit_time=runtime_min / 10,
Collaborator:
Why is / 10 necessary here?

Author (@Lopa10ko, Mar 25, 2024):
Generally speaking, this is a small safety measure to ensure that the training time of any single pipeline stays within the total timeout. The classification and regression runner (#563) uses the same empirical approach; it should be patched in the future.
            **training_params
        )
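To make the safety margin discussed above concrete, a worked example with an assumed one-hour budget (the numbers are illustrative, not taken from any benchmark config):

# Hypothetical budget of one hour for the whole search:
max_runtime_seconds = 3600
runtime_min = max_runtime_seconds / 60         # 60 minutes for the overall FEDOT run
max_pipeline_fit_time = runtime_min / 10       # any single candidate pipeline gets at most 6 minutes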

        with Timer() as training:
            fedot.fit(train_input)
        training_duration += training.duration

        with Timer() as predict:
            try:
                prediction = fedot.predict(test_input)
            except Exception as e:
                log.info('Pipeline crashed. Using no-op forecasting')
                prediction = np.full(len(test_series), train_series[-1])

        predict_duration += predict.duration

        predictions.append(prediction)
        models_count += fedot.current_pipeline.length

    save_artifacts(fedot, config)
    return result(output_file=config.output_predictions_file,
Collaborator:
It's necessary to return dataset.repeated_item_id and dataset.repeated_abs_seasonal_error as optional_columns in the result for MASE computation to work correctly (see https://github.com/openml/automlbenchmark/blob/master/frameworks/AutoGluon/exec_ts.py#L63C1-L67C1).
This is a rather ugly hack that is necessary to make history-dependent metrics like MASE compatible with the AMLB results API.

                  predictions=np.hstack(predictions),
                  truth=truth_only,
                  target_is_encoded=False,
                  models_count=models_count,
                  training_duration=training_duration,
                  predict_duration=predict_duration)
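A hedged sketch of the optional_columns suggestion above, modeled on the linked AutoGluon exec_ts.py; it assumes the repeated_* attributes arrive as paths to serialized numpy arrays and that result() accepts an optional_columns argument, as it does for that runner:

import pandas as pd  # not currently imported in exec_ts.py

# Illustration only; the exact loading depends on how AMLB serializes these fields into the venv.
optional_columns = pd.DataFrame({
    'repeated_item_id': np.load(dataset.repeated_item_id),
    'repeated_abs_seasonal_error': np.load(dataset.repeated_abs_seasonal_error),
})
# ...which would then be passed as optional_columns=optional_columns to result(...) above.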
Collaborator:
Nit: I think it would be cleaner to remove the line training_duration, predict_duration = 0, 0 above and just return training_duration=training.duration, predict_duration=predict.duration here.

Author:
Since we train a FEDOT model for each series (for each label), the training.duration value isn't cumulative and will only reflect the time spent on the last iteration.

Collaborator:
My bad, I didn't realize this was happening inside the loop over individual series.


def get_fedot_metrics(config):
    metrics_mapping = dict(
        mape='mape',
        smape='smape',
        mase='mase',
        mse='mse',
        rmse='rmse',
        mae='mae',
        r2='r2',
    )
    scoring_metric = metrics_mapping.get(config.metric, None)

    if scoring_metric is None:
        log.warning(f"Performance metric {config.metric} not supported.")

    return scoring_metric


def save_artifacts(automl, config):

    artifacts = config.framework_params.get('_save_artifacts', [])
    if 'models' in artifacts:
        try:
            models_dir = output_subdir('models', config)
            models_file = os.path.join(models_dir, 'model.json')
            automl.current_pipeline.save(models_file)
        except Exception as e:
            log.info(f"Error when saving 'models': {e}.", exc_info=True)

    if 'info' in artifacts:
        try:
            info_dir = output_subdir("info", config)
            if automl.history:
                automl.history.save(os.path.join(info_dir, 'history.json'))
            else:
                log.info(f"There is no optimization history info to save.")
        except Exception as e:
            log.info(f"Error when saving info about optimisation history: {e}.", exc_info=True)

    if 'leaderboard' in artifacts:
        try:
            leaderboard_dir = output_subdir("leaderboard", config)
            if automl.history:
                lb = automl.history.get_leaderboard()
                Path(os.path.join(leaderboard_dir, "leaderboard.csv")).write_text(lb)
        except Exception as e:
            log.info(f"Error when saving 'leaderboard': {e}.", exc_info=True)


if __name__ == '__main__':
    call_run(run)