diff --git a/python/chronos/src/bigdl/chronos/autots/autotsestimator.py b/python/chronos/src/bigdl/chronos/autots/autotsestimator.py
index 9f07a742dc6..9bd0e561c23 100644
--- a/python/chronos/src/bigdl/chronos/autots/autotsestimator.py
+++ b/python/chronos/src/bigdl/chronos/autots/autotsestimator.py
@@ -256,7 +256,14 @@ def fit(self,
             scheduler_params=scheduler_params
         )
 
-        return TSPipeline(best_model=self._get_best_automl_model(),
+        best_model = self._get_best_automl_model()
+
+        return TSPipeline(model=best_model.model,
+                          loss=best_model.criterion,
+                          optimizer=best_model.optimizer,
+                          model_creator=best_model.model_creator,
+                          loss_creator=best_model.loss_creator,
+                          optimizer_creator=best_model.optimizer_creator,
                           best_config=self.get_best_config(),
                           scaler=self._scaler,
                           scaler_index=self._scaler_index)
diff --git a/python/chronos/src/bigdl/chronos/autots/tspipeline.py b/python/chronos/src/bigdl/chronos/autots/tspipeline.py
index b8988dda4cf..8c536adba34 100644
--- a/python/chronos/src/bigdl/chronos/autots/tspipeline.py
+++ b/python/chronos/src/bigdl/chronos/autots/tspipeline.py
@@ -15,9 +15,12 @@
 #
 
 import os
+import torch
+import types
 
 from bigdl.chronos.data import TSDataset
-from bigdl.orca.automl.metrics import Evaluator
+from bigdl.chronos.metric.forecast_metrics import Evaluator
+from bigdl.nano.pytorch.trainer import Trainer
 
 DEFAULT_MODEL_INIT_DIR = "model_init.ckpt"
 DEFAULT_BEST_MODEL_DIR = "best_model.ckpt"
@@ -34,15 +37,35 @@ class TSPipeline:
     2. Deploy the model to their scenario. (save, load)
     '''
 
-    def __init__(self, best_model, best_config, **kwargs):
-        self._best_model = best_model
+    def __init__(self,
+                 model,
+                 loss,
+                 optimizer,
+                 model_creator,
+                 loss_creator,
+                 optimizer_creator,
+                 best_config,
+                 **kwargs):
+
+        # for runtime fit/predict/evaluate
+        self._best_model = Trainer.compile(model=model,
+                                           loss=loss,
+                                           optimizer=optimizer,
+                                           onnx=True)
         self._best_config = best_config
+
+        # for data postprocessing
         self._scaler = None
         self._scaler_index = None
         if "scaler" in kwargs.keys():
             self._scaler = kwargs["scaler"]
             self._scaler_index = kwargs["scaler_index"]
 
+        # for save/load
+        self.model_creator = model_creator
+        self.loss_creator = loss_creator
+        self.optimizer_creator = optimizer_creator
+
     def evaluate(self, data, metrics=['mse'], multioutput="uniform_average", batch_size=32):
         '''
         Evaluate the time series pipeline.
@@ -60,14 +83,15 @@ def evaluate(self, data, metrics=['mse'], multioutput="uniform_average", batch_s
         '''
         # predict
         x, y = self._tsdataset_to_numpy(data, is_predict=False)
-        yhat = self._best_model.predict(x, batch_size=batch_size)
+        yhat = self._best_model.inference(torch.from_numpy(x),
+                                          batch_size=batch_size,
+                                          backend=None).numpy()
         yhat = self._tsdataset_unscale(yhat)
         # unscale
         y = self._tsdataset_unscale(y)
         # evaluate
-        eval_result = [Evaluator.evaluate(m, y_true=y, y_pred=yhat,
-                                          multioutput=multioutput)
-                       for m in metrics]
+        aggregate = 'mean' if multioutput == 'uniform_average' else None
+        eval_result = Evaluator.evaluate(metrics, y, yhat, aggregate=aggregate)
         return eval_result
 
     def evaluate_with_onnx(self, data, metrics=['mse'], multioutput="uniform_average",
@@ -88,14 +112,15 @@ def evaluate_with_onnx(self, data, metrics=['mse'], multioutput="uniform_average
         '''
         # predict with onnx
         x, y = self._tsdataset_to_numpy(data, is_predict=False)
-        yhat = self._best_model.predict_with_onnx(x, batch_size=batch_size)
+        yhat = self._best_model.inference(x,
+                                          batch_size=batch_size,
+                                          backend="onnx")
         yhat = self._tsdataset_unscale(yhat)
         # unscale
         y = self._tsdataset_unscale(y)
         # evaluate
-        eval_result = [Evaluator.evaluate(m, y_true=y, y_pred=yhat,
-                                          multioutput=multioutput)
-                       for m in metrics]
+        aggregate = 'mean' if multioutput == 'uniform_average' else None
+        eval_result = Evaluator.evaluate(metrics, y, yhat, aggregate=aggregate)
         return eval_result
 
     def predict(self, data, batch_size=32):
@@ -110,7 +135,9 @@ def predict(self, data, batch_size=32):
             effective when data is a TSDataset. The value defaults to 32.
         '''
         x, _ = self._tsdataset_to_numpy(data, is_predict=True)
-        yhat = self._best_model.predict(x, batch_size=batch_size)
+        yhat = self._best_model.inference(torch.from_numpy(x),
+                                          batch_size=batch_size,
+                                          backend=None)
         yhat = self._tsdataset_unscale(yhat)
         return yhat
 
@@ -126,29 +153,49 @@ def predict_with_onnx(self, data, batch_size=32):
             effective when data is a TSDataset. The value defaults to 32.
         '''
         x, _ = self._tsdataset_to_numpy(data, is_predict=True)
-        yhat = self._best_model.predict_with_onnx(x, batch_size=batch_size)
+        yhat = self._best_model.inference(x,
+                                          batch_size=batch_size,
+                                          backend="onnx")
         yhat = self._tsdataset_unscale(yhat)
         return yhat
 
-    def fit(self, data, validation_data=None, epochs=1, metric="mse"):
+    def fit(self,
+            data,
+            validation_data=None,
+            epochs=1,
+            batch_size=None,
+            **kwargs):
         '''
         Incremental fitting
 
-        :param data: data can be a TSDataset or data creator(will be supported).
-               the TSDataset should follow the same operations as the training
-               TSDataset used in AutoTSEstimator.fit.
+        :param data: The data supports the following formats:
+
+               | 1. data creator (TO BE SUPPORTED):
+               | a function that takes a config dictionary as parameter and
+               | returns a PyTorch DataLoader.
+               |
+               | 2. a bigdl.chronos.data.TSDataset:
+               | the TSDataset should follow the same operations as the training
+               | TSDataset used in `AutoTSEstimator.fit`.
+
         :param validation_data: validation data, same format as data.
         :param epochs: incremental fitting epochs. The value defaults to 1.
-        :param metric: evaluate metric.
+        :param batch_size: batch size, defaults to None, which takes the searched best batch_size.
+        :param **kwargs: args to be passed to the bigdl-nano Trainer.
        '''
-        x, y = self._tsdataset_to_numpy(data, is_predict=False)
-        if validation_data is None:
-            x_val, y_val = x, y
-        else:
-            x_val, y_val = self._tsdataset_to_numpy(validation_data, is_predict=False)
+        train_loader = None
+        valid_loader = None
+        if batch_size is None:
+            batch_size = self._best_config["batch_size"]
+        train_loader = self._tsdataset_to_loader(data, batch_size=batch_size)
+        if validation_data is not None:
+            valid_loader = self._tsdataset_to_loader(validation_data, batch_size=batch_size)
 
-        res = self._best_model.fit_eval(data=(x, y), validation_data=(x_val, y_val), metric=metric)
-        return res
+        self.trainer = Trainer(max_epochs=epochs, **kwargs)
+        self.trainer.fit(self._best_model,
+                         train_dataloaders=train_loader,
+                         val_dataloaders=valid_loader)
 
     def save(self, file_path):
         '''
@@ -163,9 +210,9 @@ def save(self, file_path):
         model_path = os.path.join(file_path, DEFAULT_BEST_MODEL_DIR)
         data_process_path = os.path.join(file_path, DEFAULT_DATA_PROCESS_DIR)
         best_config_path = os.path.join(file_path, DEFAULT_BEST_CONFIG_DIR)
-        model_init = {"model_creator": self._best_model.model_creator,
-                      "optimizer_creator": self._best_model.optimizer_creator,
-                      "loss_creator": self._best_model.loss_creator}
+        model_init = {"model_creator": self.model_creator,
+                      "optimizer_creator": self.optimizer_creator,
+                      "loss_creator": self.loss_creator}
         data_process = {"scaler": self._scaler,
                         "scaler_index": self._scaler_index}
         with open(model_init_path, "wb") as f:
@@ -174,7 +221,8 @@ def save(self, file_path):
             pickle.dump(data_process, f)
         with open(best_config_path, "wb") as f:
             pickle.dump(self._best_config, f)
-        self._best_model.save(model_path)
+        torch.save(self._best_model.model.state_dict(), model_path)
 
     @staticmethod
     def load(file_path):
@@ -194,21 +242,53 @@ def load(file_path):
             data_process = pickle.load(f)
         with open(best_config_path, "rb") as f:
             best_config = pickle.load(f)
-        from bigdl.orca.automl.model.base_pytorch_model import PytorchBaseModel
-        best_model = PytorchBaseModel(**model_init)
-        best_model.restore(model_path)
-        return TSPipeline(best_model, best_config, **data_process)
-
-    def _tsdataset_to_numpy(self, data, is_predict=False):
-        if isinstance(data, TSDataset):
-            lookback = self._best_config["past_seq_len"]
-            horizon = 0 if is_predict else self._best_config["future_seq_len"]
-            selected_features = self._best_config["selected_features"]
-            data.roll(lookback=lookback, horizon=horizon, feature_col=selected_features)
-            x, y = data.to_numpy()
+        model_creator = model_init["model_creator"]
+        optimizer_creator = model_init["optimizer_creator"]
+        loss_creator = model_init["loss_creator"]
+
+        model = model_creator(best_config)
+        model.load_state_dict(torch.load(model_path))
+
+        if isinstance(optimizer_creator, types.FunctionType):
+            optimizer = optimizer_creator(model, best_config)
+        else:
+            optimizer = optimizer_creator(model.parameters(),
+                                          lr=best_config.get('lr', 0.001))
+
+        if isinstance(loss_creator, torch.nn.modules.loss._Loss):
+            loss = loss_creator
         else:
-            raise NotImplementedError("Data creator has not been supported now.")
-        return x, y
+            loss = loss_creator(best_config)
+
+        return TSPipeline(model=model,
+                          loss=loss,
+                          optimizer=optimizer,
+                          model_creator=model_creator,
+                          loss_creator=loss_creator,
+                          optimizer_creator=optimizer_creator,
+                          best_config=best_config,
+                          **data_process)
+
+    def _tsdataset_to_loader(self, data, is_predict=False, batch_size=32):
+        lookback = self._best_config["past_seq_len"]
+        horizon = 0 if is_predict else self._best_config["future_seq_len"]
+        selected_features = self._best_config["selected_features"]
+        data_loader = data.to_torch_data_loader(batch_size=batch_size,
+                                                roll=True,
+                                                lookback=lookback,
+                                                horizon=horizon,
+                                                feature_col=selected_features)
+        return data_loader
+
+    def _tsdataset_to_numpy(self, data, is_predict=False):
+        lookback = self._best_config["past_seq_len"]
+        horizon = 0 if is_predict else self._best_config["future_seq_len"]
+        selected_features = self._best_config["selected_features"]
+        data.roll(lookback=lookback,
+                  horizon=horizon,
+                  feature_col=selected_features)
+        return data.to_numpy()
 
     def _tsdataset_unscale(self, y):
         if self._scaler: