9 changes: 8 additions & 1 deletion python/chronos/src/bigdl/chronos/autots/autotsestimator.py
@@ -256,7 +256,14 @@ def fit(self,
             scheduler_params=scheduler_params
         )
 
-        return TSPipeline(best_model=self._get_best_automl_model(),
+        best_model = self._get_best_automl_model()
+
+        return TSPipeline(model=best_model.model,
+                          loss=best_model.criterion,
+                          optimizer=best_model.optimizer,
+                          model_creator=best_model.model_creator,
+                          loss_creator=best_model.loss_creator,
+                          optimizer_creator=best_model.optimizer_creator,
                           best_config=self.get_best_config(),
                           scaler=self._scaler,
                           scaler_index=self._scaler_index)
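With this change, AutoTSEstimator.fit unpacks the best automl model into its raw torch components (model, criterion, optimizer, plus their creators) before handing them to TSPipeline. A minimal usage sketch; the estimator and dataset names are assumed for illustration, not taken from this diff:

# hypothetical names: auto_estimator, tsdata_train, tsdata_val
tsppl = auto_estimator.fit(data=tsdata_train,
                           validation_data=tsdata_val)
# tsppl is a TSPipeline built from best_model.model, .criterion and .optimizer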
162 changes: 121 additions & 41 deletions python/chronos/src/bigdl/chronos/autots/tspipeline.py
@@ -15,9 +15,12 @@
 #
 
 import os
+import torch
+import types
 
 from bigdl.chronos.data import TSDataset
-from bigdl.orca.automl.metrics import Evaluator
+from bigdl.chronos.metric.forecast_metrics import Evaluator
+from bigdl.nano.pytorch.trainer import Trainer
 
 DEFAULT_MODEL_INIT_DIR = "model_init.ckpt"
 DEFAULT_BEST_MODEL_DIR = "best_model.ckpt"
@@ -34,15 +37,35 @@ class TSPipeline:
 
     2. Deploy the model to their scenario. (save, load)
     '''
-    def __init__(self, best_model, best_config, **kwargs):
-        self._best_model = best_model
+    def __init__(self,
+                 model,
+                 loss,
+                 optimizer,
+                 model_creator,
+                 loss_creator,
+                 optimizer_creator,
+                 best_config,
+                 **kwargs):
+
+        # for runtime fit/predict/evaluate
+        self._best_model = Trainer.compile(model=model,
+                                           loss=loss,
+                                           optimizer=optimizer,
+                                           onnx=True)
         self._best_config = best_config
+
+        # for data postprocessing
         self._scaler = None
         self._scaler_index = None
         if "scaler" in kwargs.keys():
            self._scaler = kwargs["scaler"]
            self._scaler_index = kwargs["scaler_index"]
 
+        # for save/load
+        self.model_creator = model_creator
+        self.loss_creator = loss_creator
+        self.optimizer_creator = optimizer_creator
+
     def evaluate(self, data, metrics=['mse'], multioutput="uniform_average", batch_size=32):
         '''
         Evaluate the time series pipeline.
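TSPipeline.__init__ now compiles the raw model, loss and optimizer into one trainable module through bigdl-nano, with an ONNX inference path attached. A minimal sketch of the same call in isolation; my_torch_model and my_optimizer are placeholders:

import torch
from bigdl.nano.pytorch.trainer import Trainer

lit_model = Trainer.compile(model=my_torch_model,    # placeholder torch.nn.Module
                            loss=torch.nn.MSELoss(),
                            optimizer=my_optimizer,  # placeholder torch optimizer
                            onnx=True)               # enable the onnx backend used below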
@@ -60,14 +83,15 @@ def evaluate(self, data, metrics=['mse'], multioutput="uniform_average", batch_size=32):
         '''
         # predict
         x, y = self._tsdataset_to_numpy(data, is_predict=False)
-        yhat = self._best_model.predict(x, batch_size=batch_size)
+        yhat = self._best_model.inference(torch.from_numpy(x),
+                                          batch_size=batch_size,
+                                          backend=None).numpy()
         yhat = self._tsdataset_unscale(yhat)
         # unscale
         y = self._tsdataset_unscale(y)
         # evaluate
-        eval_result = [Evaluator.evaluate(m, y_true=y, y_pred=yhat,
-                                          multioutput=multioutput)
-                       for m in metrics]
+        aggregate = 'mean' if multioutput == 'uniform_average' else None
+        eval_result = Evaluator.evaluate(metrics, y, yhat, aggregate=aggregate)
         return eval_result
 
     def evaluate_with_onnx(self, data, metrics=['mse'], multioutput="uniform_average",
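Note the metric backend swap: the chronos Evaluator takes the whole metric list in one call, and multioutput='uniform_average' is translated to aggregate='mean'. A hypothetical call, with pipeline and dataset names assumed:

eval_result = tsppl.evaluate(tsdata_test, metrics=['mse', 'smape'])
# eval_result carries one entry per requested metric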
@@ -88,14 +112,15 @@ def evaluate_with_onnx(self, data, metrics=['mse'], multioutput="uniform_average",
         '''
         # predict with onnx
         x, y = self._tsdataset_to_numpy(data, is_predict=False)
-        yhat = self._best_model.predict_with_onnx(x, batch_size=batch_size)
+        yhat = self._best_model.inference(x,
+                                          batch_size=batch_size,
+                                          backend="onnx")
         yhat = self._tsdataset_unscale(yhat)
         # unscale
         y = self._tsdataset_unscale(y)
         # evaluate
-        eval_result = [Evaluator.evaluate(m, y_true=y, y_pred=yhat,
-                                          multioutput=multioutput)
-                       for m in metrics]
+        aggregate = 'mean' if multioutput == 'uniform_average' else None
+        eval_result = Evaluator.evaluate(metrics, y, yhat, aggregate=aggregate)
         return eval_result
 
     def predict(self, data, batch_size=32):
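The ONNX path changes the same way: the numpy input goes to inference with backend="onnx" instead of the removed predict_with_onnx. A hypothetical call (names assumed):

eval_result = tsppl.evaluate_with_onnx(tsdata_test, metrics=['mse'], batch_size=64)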
@@ -110,7 +135,9 @@ def predict(self, data, batch_size=32):
                effective when data is a TSDataset. The value defaults to 32.
         '''
         x, _ = self._tsdataset_to_numpy(data, is_predict=True)
-        yhat = self._best_model.predict(x, batch_size=batch_size)
+        yhat = self._best_model.inference(torch.from_numpy(x),
+                                          batch_size=batch_size,
+                                          backend=None)
         yhat = self._tsdataset_unscale(yhat)
         return yhat
 
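A hypothetical predict call (names assumed); with backend=None the compiled module runs plain PyTorch inference, and the prediction is unscaled before being returned:

yhat = tsppl.predict(tsdata_test, batch_size=32)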
@@ -126,29 +153,49 @@ def predict_with_onnx(self, data, batch_size=32):
                effective when data is a TSDataset. The value defaults to 32.
         '''
         x, _ = self._tsdataset_to_numpy(data, is_predict=True)
-        yhat = self._best_model.predict_with_onnx(x, batch_size=batch_size)
+        yhat = self._best_model.inference(x,
+                                          batch_size=batch_size,
+                                          backend="onnx")
         yhat = self._tsdataset_unscale(yhat)
         return yhat
 
-    def fit(self, data, validation_data=None, epochs=1, metric="mse"):
+    def fit(self,
+            data,
+            validation_data=None,
+            epochs=1,
+            batch_size=None,
+            **kwargs):
         '''
         Incremental fitting
 
-        :param data: data can be a TSDataset or data creator(will be supported).
-               the TSDataset should follow the same operations as the training
-               TSDataset used in AutoTSEstimator.fit.
+        :param data: The data supports the following formats:
+
+               | 1. data creator (TO BE SUPPORTED):
+               | a function that takes a config dictionary as parameter and
+               | returns a PyTorch DataLoader.
+               |
+               | 2. a bigdl.chronos.data.TSDataset:
+               | the TSDataset should follow the same operations as the training
+               | TSDataset used in `AutoTSEstimator.fit`.
+
         :param validation_data: validation data, same format as data.
         :param epochs: incremental fitting epochs. The value defaults to 1.
-        :param metric: evaluate metric.
+        :param batch_size: batch size, defaults to None, which takes the searched best batch_size.
+        :param **kwargs: args to be passed to the bigdl-nano trainer.
         '''
-        x, y = self._tsdataset_to_numpy(data, is_predict=False)
-        if validation_data is None:
-            x_val, y_val = x, y
-        else:
-            x_val, y_val = self._tsdataset_to_numpy(validation_data, is_predict=False)
+        train_loader = None
+        valid_loader = None
+        if batch_size is None:
+            batch_size = self._best_config["batch_size"]
+        train_loader = self._tsdataset_to_loader(data, batch_size=batch_size)
+        if validation_data is not None:
+            valid_loader = self._tsdataset_to_loader(validation_data, batch_size=batch_size)
 
-        res = self._best_model.fit_eval(data=(x, y), validation_data=(x_val, y_val), metric=metric)
-        return res
+        self.trainer = Trainer(max_epochs=epochs, **kwargs)
+        self.trainer.fit(self._best_model,
+                         train_dataloaders=train_loader,
+                         val_dataloaders=valid_loader)
 
     def save(self, file_path):
         '''
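The rewritten fit no longer round-trips through numpy: it builds torch DataLoaders straight from the TSDataset and delegates training to a fresh bigdl-nano Trainer, so any extra keyword arguments flow into that Trainer. A hypothetical incremental fit (names assumed):

tsppl.fit(tsdata_train,
          validation_data=tsdata_val,
          epochs=5)  # batch_size=None reuses the searched best batch_size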
@@ -163,9 +210,9 @@ def save(self, file_path):
         model_path = os.path.join(file_path, DEFAULT_BEST_MODEL_DIR)
         data_process_path = os.path.join(file_path, DEFAULT_DATA_PROCESS_DIR)
         best_config_path = os.path.join(file_path, DEFAULT_BEST_CONFIG_DIR)
-        model_init = {"model_creator": self._best_model.model_creator,
-                      "optimizer_creator": self._best_model.optimizer_creator,
-                      "loss_creator": self._best_model.loss_creator}
+        model_init = {"model_creator": self.model_creator,
+                      "optimizer_creator": self.optimizer_creator,
+                      "loss_creator": self.loss_creator}
         data_process = {"scaler": self._scaler,
                         "scaler_index": self._scaler_index}
         with open(model_init_path, "wb") as f:
@@ -174,7 +221,8 @@ def save(self, file_path):
             pickle.dump(data_process, f)
         with open(best_config_path, "wb") as f:
             pickle.dump(self._best_config, f)
-        self._best_model.save(model_path)
+        # self._best_model.save(model_path)
+        torch.save(self._best_model.model.state_dict(), model_path)
 
     @staticmethod
     def load(file_path):
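save now pickles only the creator functions, scaler state and best config, and stores the weights with torch.save(state_dict); load, below, rebuilds the pipeline from those pieces. A hypothetical round trip (path assumed):

tsppl.save("/tmp/my_tsppl")
tsppl2 = TSPipeline.load("/tmp/my_tsppl")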
@@ -194,21 +242,53 @@ def load(file_path):
             data_process = pickle.load(f)
         with open(best_config_path, "rb") as f:
             best_config = pickle.load(f)
-        from bigdl.orca.automl.model.base_pytorch_model import PytorchBaseModel
-        best_model = PytorchBaseModel(**model_init)
-        best_model.restore(model_path)
-        return TSPipeline(best_model, best_config, **data_process)
 
-    def _tsdataset_to_numpy(self, data, is_predict=False):
-        if isinstance(data, TSDataset):
-            lookback = self._best_config["past_seq_len"]
-            horizon = 0 if is_predict else self._best_config["future_seq_len"]
-            selected_features = self._best_config["selected_features"]
-            data.roll(lookback=lookback, horizon=horizon, feature_col=selected_features)
-            x, y = data.to_numpy()
+        model_creator = model_init["model_creator"]
+        optimizer_creator = model_init["optimizer_creator"]
+        loss_creator = model_init["loss_creator"]
+
+        model = model_creator(best_config)
+        model.load_state_dict(torch.load(model_path))
+
+        if isinstance(optimizer_creator, types.FunctionType):
+            optimizer = optimizer_creator(model, best_config)
+        else:
+            optimizer = optimizer_creator(model.parameters(),
+                                          lr=best_config.get('lr', 0.001))
+
+        if isinstance(loss_creator, torch.nn.modules.loss._Loss):
+            loss = loss_creator
         else:
-            raise NotImplementedError("Data creator has not been supported now.")
-        return x, y
+            loss = loss_creator(best_config)
+
+        return TSPipeline(model=model,
+                          loss=loss,
+                          optimizer=optimizer,
+                          model_creator=model_creator,
+                          loss_creator=loss_creator,
+                          optimizer_creator=optimizer_creator,
+                          best_config=best_config,
+                          **data_process)
+
+    def _tsdataset_to_loader(self, data, is_predict=False, batch_size=32):
+        lookback = self._best_config["past_seq_len"]
+        horizon = 0 if is_predict else self._best_config["future_seq_len"]
+        selected_features = self._best_config["selected_features"]
+        data_loader = data.to_torch_data_loader(batch_size=batch_size,
+                                                roll=True,
+                                                lookback=lookback,
+                                                horizon=horizon,
+                                                feature_col=selected_features)
+        return data_loader
+
+    def _tsdataset_to_numpy(self, data, is_predict=False):
+        lookback = self._best_config["past_seq_len"]
+        horizon = 0 if is_predict else self._best_config["future_seq_len"]
+        selected_features = self._best_config["selected_features"]
+        data.roll(lookback=lookback,
+                  horizon=horizon,
+                  feature_col=selected_features)
+        return data.to_numpy()
 
     def _tsdataset_unscale(self, y):
         if self._scaler:
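The isinstance checks in load accept two shapes for each creator. A sketch of the function-style contracts it assumes, for illustration only:

import torch

def optimizer_creator(model, config):
    # function form: (model, config) -> optimizer
    return torch.optim.Adam(model.parameters(), lr=config.get('lr', 0.001))

def loss_creator(config):
    # creator form: config -> loss module
    return torch.nn.MSELoss()

# Alternatively, optimizer_creator may be an optimizer class such as torch.optim.Adam
# (called as optimizer_creator(model.parameters(), lr=...)), and loss_creator an
# already-instantiated torch.nn loss.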