/
deepar.py
179 lines (155 loc) · 5.55 KB
/
deepar.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from typing import Union
import pandas as pd
from etna import SETTINGS
from etna.datasets.tsdataset import TSDataset
from etna.loggers import tslogger
from etna.models.base import Model
from etna.models.base import log_decorator
from etna.transforms import PytorchForecastingTransform
if SETTINGS.torch_required:
import pytorch_lightning as pl
from pytorch_forecasting.data import TimeSeriesDataSet
from pytorch_forecasting.models import DeepAR
from pytorch_lightning import LightningModule
class DeepARModel(Model):
    """Wrapper for :py:class:`pytorch_forecasting.models.deepar.DeepAR`.

    Notes
    -----
    The model reads the :py:class:`pytorch_forecasting.data.timeseries.TimeSeriesDataSet`
    objects that are stored on the ``PytorchForecastingTransform`` found at the end of
    ``ts.transforms``. This is not the intended pattern for using Transforms and TSDataset.
    """

    def __init__(
        self,
        batch_size: int = 64,
        context_length: Optional[int] = None,
        max_epochs: int = 10,
        gpus: Union[int, List[int]] = 0,
        gradient_clip_val: float = 0.1,
        learning_rate: Optional[List[float]] = None,
        cell_type: str = "LSTM",
        hidden_size: int = 10,
        rnn_layers: int = 2,
        dropout: float = 0.1,
        trainer_kwargs: Optional[Dict[str, Any]] = None,
    ):
        """
        Initialize DeepAR wrapper.

        Parameters
        ----------
        batch_size:
            Batch size used for the training dataloader; the prediction dataloader uses twice this value.
        context_length:
            Max encoder length, if None max encoder length is equal to 2 horizons.
            The value is only stored on the instance; the methods of this class do not read it.
        max_epochs:
            Max epochs.
        gpus:
            0 - is CPU, or [n_{i}] - to choose n_{i} GPU from cluster.
        gradient_clip_val:
            Clipping by norm is using, choose 0 to not clip.
        learning_rate:
            Learning rate; ``[0.001]`` is used when None is passed.
        cell_type:
            One of 'LSTM', 'GRU'.
        hidden_size:
            Hidden size of network which can range from 8 to 512.
        rnn_layers:
            Number of recurrent layers.
        dropout:
            Dropout rate.
        trainer_kwargs:
            Additional arguments for pytorch_lightning Trainer. They override the defaults built in ``fit``.
        """
        self.max_epochs = max_epochs
        self.gpus = gpus
        self.gradient_clip_val = gradient_clip_val
        # A fresh list/dict is created per instance so no mutable default is shared.
        self.learning_rate = learning_rate if learning_rate is not None else [0.001]
        self.batch_size = batch_size
        self.context_length = context_length
        self.cell_type = cell_type
        self.hidden_size = hidden_size
        self.rnn_layers = rnn_layers
        self.dropout = dropout
        self.trainer_kwargs = trainer_kwargs if trainer_kwargs is not None else dict()
        # Both attributes stay None until ``fit`` is called.
        self.model: Optional[Union[LightningModule, DeepAR]] = None
        self.trainer: Optional[pl.Trainer] = None

    # The annotations below are string forward references: ``TimeSeriesDataSet`` and
    # ``LightningModule`` are imported only when ``SETTINGS.torch_required`` is truthy,
    # so evaluating them at definition time could raise NameError.
    def _from_dataset(self, ts_dataset: "TimeSeriesDataSet") -> "LightningModule":
        """
        Construct DeepAR.

        Parameters
        ----------
        ts_dataset:
            Dataset passed to ``DeepAR.from_dataset`` together with the hyperparameters stored on the instance.

        Returns
        -------
        DeepAR
            Class instance.
        """
        return DeepAR.from_dataset(
            ts_dataset,
            learning_rate=self.learning_rate,
            cell_type=self.cell_type,
            hidden_size=self.hidden_size,
            rnn_layers=self.rnn_layers,
            dropout=self.dropout,
        )

    @staticmethod
    def _get_pf_transform(ts: TSDataset) -> PytorchForecastingTransform:
        """Get PytorchForecastingTransform from ts.transforms or raise exception if not found.

        Parameters
        ----------
        ts:
            TSDataset whose last transform is expected to be a PytorchForecastingTransform.

        Returns
        -------
        PytorchForecastingTransform
            The last element of ``ts.transforms``.

        Raises
        ------
        ValueError
            If ``ts.transforms`` is None, empty, or does not end with a PytorchForecastingTransform.
        """
        transforms = ts.transforms
        # The length check keeps an empty transforms list on the ValueError path
        # instead of failing with IndexError on ``transforms[-1]``.
        if (
            transforms is not None
            and len(transforms) > 0
            and isinstance(transforms[-1], PytorchForecastingTransform)
        ):
            return transforms[-1]
        raise ValueError(
            "Not valid usage of transforms, please add PytorchForecastingTransform at the end of transforms"
        )

    @log_decorator
    def fit(self, ts: TSDataset) -> "DeepARModel":
        """
        Fit model.

        Parameters
        ----------
        ts:
            TSDataset to fit.

        Returns
        -------
        DeepARModel
            The fitted instance itself.

        Raises
        ------
        ValueError
            If ``ts.transforms`` does not end with a PytorchForecastingTransform.
        """
        pf_transform = self._get_pf_transform(ts)
        self.model = self._from_dataset(pf_transform.pf_dataset_train)

        # Defaults come first; user-supplied trainer_kwargs take precedence over them.
        trainer_kwargs = dict(
            logger=tslogger.pl_loggers,
            max_epochs=self.max_epochs,
            gpus=self.gpus,
            checkpoint_callback=False,
            gradient_clip_val=self.gradient_clip_val,
        )
        trainer_kwargs.update(self.trainer_kwargs)

        self.trainer = pl.Trainer(**trainer_kwargs)

        train_dataloader = pf_transform.pf_dataset_train.to_dataloader(train=True, batch_size=self.batch_size)

        self.trainer.fit(self.model, train_dataloader)

        return self

    @log_decorator
    def forecast(self, ts: TSDataset) -> TSDataset:
        """
        Predict future.

        Parameters
        ----------
        ts:
            TSDataset to forecast. It is modified in place and returned.

        Returns
        -------
        TSDataset
            TSDataset with predictions.

        Raises
        ------
        ValueError
            If the model has not been fitted, if ``ts.transforms`` does not end with a
            PytorchForecastingTransform, or if the future has not been generated.
        """
        # Fail early with a clear message instead of an AttributeError on ``None.predict``.
        if self.model is None:
            raise ValueError("The model is not fitted! Fit the model before calling forecast method!")

        pf_transform = self._get_pf_transform(ts)
        if pf_transform.pf_dataset_predict is None:
            raise ValueError(
                "The future is not generated! Generate future using TSDataset make_future before calling forecast method!"
            )
        prediction_dataloader = pf_transform.pf_dataset_predict.to_dataloader(
            train=False, batch_size=self.batch_size * 2
        )

        predicts = self.model.predict(prediction_dataloader).numpy()  # type: ignore
        # NOTE(review): predicts is presumably shaped (segments, timestamps) - confirm against
        # pytorch_forecasting. After transposing, the last len(ts.df) rows fill the "target" columns.
        ts.loc[:, pd.IndexSlice[:, "target"]] = predicts.T[-len(ts.df) :]

        ts.inverse_transform()
        return ts