Skip to content

Commit

Permalink
Merge branch 'master' into fix/unit8co#1101
Browse files Browse the repository at this point in the history
  • Loading branch information
hrzn committed Aug 9, 2022
2 parents c9914e3 + 5ceac68 commit fe9fbc4
Show file tree
Hide file tree
Showing 4 changed files with 390 additions and 28 deletions.
48 changes: 40 additions & 8 deletions darts/models/forecasting/arima.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@
from statsmodels.tsa.arima.model import ARIMA as staARIMA

from darts.logging import get_logger
from darts.models.forecasting.forecasting_model import DualCovariatesForecastingModel
from darts.models.forecasting.forecasting_model import (
TransferableDualCovariatesForecastingModel,
)
from darts.timeseries import TimeSeries

logger = get_logger(__name__)


class ARIMA(DualCovariatesForecastingModel):
class ARIMA(TransferableDualCovariatesForecastingModel):
def __init__(
self,
p: int = 12,
Expand Down Expand Up @@ -66,11 +68,14 @@ def __str__(self):
return f"SARIMA{self.order}x{self.seasonal_order}"

def _fit(self, series: TimeSeries, future_covariates: Optional[TimeSeries] = None):

super()._fit(series, future_covariates)

# storing to restore the statsmodels model results object
self.training_historic_future_covariates = future_covariates

m = staARIMA(
self.training_series.values(),
exog=future_covariates.values() if future_covariates else None,
series.values(copy=False),
exog=future_covariates.values(copy=False) if future_covariates else None,
order=self.order,
seasonal_order=self.seasonal_order,
trend=self.trend,
Expand All @@ -82,6 +87,8 @@ def _fit(self, series: TimeSeries, future_covariates: Optional[TimeSeries] = Non
def _predict(
self,
n: int,
series: Optional[TimeSeries] = None,
historic_future_covariates: Optional[TimeSeries] = None,
future_covariates: Optional[TimeSeries] = None,
num_samples: int = 1,
) -> TimeSeries:
Expand All @@ -93,18 +100,43 @@ def _predict(
"your model."
)

super()._predict(n, future_covariates, num_samples)
super()._predict(
n, series, historic_future_covariates, future_covariates, num_samples
)

# updating statsmodels results object state with the new ts and covariates
if series is not None:
self.model = self.model.apply(
series.values(copy=False),
exog=historic_future_covariates.values(copy=False)
if historic_future_covariates
else None,
)

if num_samples == 1:
forecast = self.model.forecast(
steps=n, exog=future_covariates.values() if future_covariates else None
steps=n,
exog=future_covariates.values(copy=False)
if future_covariates
else None,
)
else:
forecast = self.model.simulate(
nsimulations=n,
repetitions=num_samples,
initial_state=self.model.states.predicted[-1, :],
exog=future_covariates.values() if future_covariates else None,
exog=future_covariates.values(copy=False)
if future_covariates
else None,
)

# restoring statsmodels results object state
if series is not None:
self.model = self.model.apply(
self._orig_training_series.values(copy=False),
exog=self.training_historic_future_covariates.values(copy=False)
if self.training_historic_future_covariates
else None,
)

return self._build_forecast_series(forecast)
Expand Down
145 changes: 140 additions & 5 deletions darts/models/forecasting/forecasting_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -1083,7 +1083,7 @@ class DualCovariatesForecastingModel(ForecastingModel, ABC):
Among other things, it lets Darts forecasting models wrap around statsmodels models
having a `future_covariates` parameter, which corresponds to future-known covariates.
All implementations have to implement the `fit()` and `predict()` methods defined below.
All implementations have to implement the `_fit()` and `_predict()` methods defined below.
"""

_expect_covariate = False
Expand Down Expand Up @@ -1137,6 +1137,7 @@ def predict(
n: int,
future_covariates: Optional[TimeSeries] = None,
num_samples: int = 1,
**kwargs,
) -> TimeSeries:
"""Forecasts values for `n` time steps after the end of the training series.
Expand All @@ -1159,8 +1160,7 @@ def predict(
TimeSeries, a single time series containing the `n` next points after then end of the training series.
"""

if future_covariates is None:
super().predict(n, num_samples)
super().predict(n, num_samples)

if self._expect_covariate and future_covariates is None:
raise_log(
Expand All @@ -1170,6 +1170,12 @@ def predict(
)
)

raise_if(
not self._expect_covariate and future_covariates is not None,
"The model has been trained without `future_covariates` variable, but the "
"`future_covariates` parameter provided to `predict()` is not None.",
)

if future_covariates is not None:
start = self.training_series.end_time() + self.training_series.freq

Expand All @@ -1194,13 +1200,13 @@ def predict(
]

raise_if_not(
len(future_covariates) == n and self._expect_covariate,
len(future_covariates) == n,
invalid_time_span_error,
logger,
)

return self._predict(
n, future_covariates=future_covariates, num_samples=num_samples
n, future_covariates=future_covariates, num_samples=num_samples, **kwargs
)

@abstractmethod
Expand Down Expand Up @@ -1234,3 +1240,132 @@ def _predict_wrapper(
return self.predict(
n, future_covariates=future_covariates, num_samples=num_samples
)


class TransferableDualCovariatesForecastingModel(DualCovariatesForecastingModel, ABC):
"""The base class for the forecasting models that are not global, but support future covariates, and can
additionally be applied to new data unrelated to the original series used for fitting the model. Currently,
all the derived classes wrap statsmodels models.
All implementations have to implement the `_fit()`, `_predict()` methods.
"""

def predict(
self,
n: int,
series: Optional[TimeSeries] = None,
future_covariates: Optional[TimeSeries] = None,
num_samples: int = 1,
**kwargs,
) -> TimeSeries:
"""If the `series` parameter is not set, forecasts values for `n` time steps after the end of the training
series. If some future covariates were specified during the training, they must also be specified here.
If the `series` parameter is set, forecasts values for `n` time steps after the end of the new target
series. If some future covariates were specified during the training, they must also be specified here.
Parameters
----------
n
Forecast horizon - the number of time steps after the end of the series for which to produce predictions.
series
Optionally, a new target series whose future values will be predicted. Defaults to `None`, meaning that the
model will forecast the future value of the training series.
future_covariates
The time series of future-known covariates which can be fed as input to the model. It must correspond to
the covariate time series that has been used with the :func:`fit()` method for training.
If `series` is not set, it must contain at least the next `n` time steps/indices after the end of the
training target series. If `series` is set, it must contain at least the time steps/indices corresponding
to the new target series (historic future covariates), plus the next `n` time steps/indices after the end.
num_samples
Number of times a prediction is sampled from a probabilistic model. Should be left set to 1
for deterministic models.
Returns
-------
TimeSeries, a single time series containing the `n` next points after then end of the training series.
"""

if self._expect_covariate and future_covariates is None:
raise_log(
ValueError(
"The model has been trained with `future_covariates` variable. Some matching "
"`future_covariates` variables have to be provided to `predict()`."
)
)

historic_future_covariates = None

if series is not None and future_covariates:
raise_if_not(
future_covariates.start_time() <= series.start_time()
and future_covariates.end_time() >= series.end_time() + n * series.freq,
"The provided `future_covariates` related to the new target series must contain at least the same time"
"steps/indices as the target `series` + `n`.",
logger,
)
# splitting the future covariates
(
historic_future_covariates,
future_covariates,
) = future_covariates.split_after(series.end_time())

# in case future covariate have more values on the left end side that we don't need
if not series.has_same_time_as(historic_future_covariates):
historic_future_covariates = historic_future_covariates.slice_intersect(
series
)

# DualCovariatesForecastingModel performs some checks on self.training_series. We temporary replace that with
# the new ts
if series is not None:
self._orig_training_series = self.training_series
self.training_series = series

result = super().predict(
n=n,
series=series,
historic_future_covariates=historic_future_covariates,
future_covariates=future_covariates,
num_samples=num_samples,
**kwargs,
)

# restoring the original training ts
if series is not None:
self.training_series = self._orig_training_series

return result

@abstractmethod
def _predict(
self,
n: int,
series: Optional[TimeSeries] = None,
historic_future_covariates: Optional[TimeSeries] = None,
future_covariates: Optional[TimeSeries] = None,
num_samples: int = 1,
) -> TimeSeries:
"""Forecasts values for a certain number of time steps after the end of the series.
TransferableDualCovariatesForecastingModel must implement the predict logic in this method.
"""
pass

def _predict_wrapper(
self,
n: int,
series: TimeSeries,
past_covariates: Optional[TimeSeries],
future_covariates: Optional[TimeSeries],
num_samples: int,
) -> TimeSeries:
return self.predict(
n=n,
series=series,
future_covariates=future_covariates,
num_samples=num_samples,
)

def _supports_non_retrainable_historical_forecasts(self) -> bool:
return True

0 comments on commit fe9fbc4

Please sign in to comment.