# Copyright 2018 The TensorFlow Probability Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""Methods for forecasting in StructuralTimeSeries models."""

# Dependency imports
import tensorflow.compat.v2 as tf
from tensorflow_probability.python.distributions import categorical
from tensorflow_probability.python.distributions import independent
from tensorflow_probability.python.distributions import mixture_same_family
from tensorflow_probability.python.distributions import mvn_tril
from tensorflow_probability.python.experimental import util as tfe_util
from tensorflow_probability.python.internal import distribution_util as dist_util
from tensorflow_probability.python.internal import prefer_static as ps
from tensorflow_probability.python.sts.internal import util as sts_util
from tensorflow.python.util import deprecation # pylint: disable=g-direct-tensorflow-import


def _prefer_static_event_ndims(distribution):
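  """Returns the event rank statically if known, else as a scalar `Tensor`."""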
if distribution.event_shape.ndims is not None:
return distribution.event_shape.ndims
else:
return tf.size(distribution.event_shape_tensor())


@deprecation.deprecated_arg_values(
'2021-12-31',
    'Predictive distributions returned by `tfp.sts.one_step_predictive` will '
'soon compute per-timestep probabilities (treating timesteps as part of '
'the batch shape) instead of a single probability for an entire series '
'(the current approach, in which timesteps are treated as event shape). '
'Please update your code to pass `timesteps_are_event_shape=False` (this '
'will soon be the default) and to explicitly sum over the per-timestep log '
'probabilities if this is required.',
timesteps_are_event_shape=True)
def one_step_predictive(model, observed_time_series, parameter_samples,
timesteps_are_event_shape=True):
"""Compute one-step-ahead predictive distributions for all timesteps.
Given samples from the posterior over parameters, return the predictive
distribution over observations at each time `T`, given observations up
through time `T-1`.

  Args:
model: An instance of `StructuralTimeSeries` representing a
time-series model. This represents a joint distribution over
time-series and their parameters with batch shape `[b1, ..., bN]`.
observed_time_series: `float` `Tensor` of shape
`concat([sample_shape, model.batch_shape, [num_timesteps, 1]])` where
`sample_shape` corresponds to i.i.d. observations, and the trailing `[1]`
dimension may (optionally) be omitted if `num_timesteps > 1`. Any `NaN`s
      are interpreted as missing observations; missingness may also be
explicitly specified by passing a `tfp.sts.MaskedTimeSeries` instance.
parameter_samples: Python `list` of `Tensors` representing posterior samples
of model parameters, with shapes `[concat([[num_posterior_draws],
param.prior.batch_shape, param.prior.event_shape]) for param in
model.parameters]`. This may optionally also be a map (Python `dict`) of
parameter names to `Tensor` values.
    timesteps_are_event_shape: Deprecated, for backwards compatibility only.
      If `False`, the predictive distribution will return per-timestep
      probabilities.
      Default value: `True`.

  Returns:
    predictive_dist: a `tfd.MixtureSameFamily` instance with event shape
      `[num_timesteps] if timesteps_are_event_shape else []` and
      batch shape `concat([sample_shape, model.batch_shape,
      [] if timesteps_are_event_shape else [num_timesteps]])`, with
`num_posterior_draws` mixture components. The `t`th step represents the
forecast distribution `p(observed_time_series[t] |
observed_time_series[0:t-1], parameter_samples)`.

  #### Examples

  Suppose we've built a model and fit it to data using HMC:

  ```python
day_of_week = tfp.sts.Seasonal(
num_seasons=7,
observed_time_series=observed_time_series,
name='day_of_week')
local_linear_trend = tfp.sts.LocalLinearTrend(
observed_time_series=observed_time_series,
name='local_linear_trend')
model = tfp.sts.Sum(components=[day_of_week, local_linear_trend],
observed_time_series=observed_time_series)
samples, kernel_results = tfp.sts.fit_with_hmc(model, observed_time_series)
```
Passing the posterior samples into `one_step_predictive`, we construct a
one-step-ahead predictive distribution:
```python
one_step_predictive_dist = tfp.sts.one_step_predictive(
model, observed_time_series, parameter_samples=samples)
predictive_means = one_step_predictive_dist.mean()
predictive_scales = one_step_predictive_dist.stddev()
```

  If using variational inference instead of HMC, we'd construct the predictive
  distribution using samples from the variational posterior:
```python
surrogate_posterior = tfp.sts.build_factored_surrogate_posterior(
model=model)
loss_curve = tfp.vi.fit_surrogate_posterior(
target_log_prob_fn=model.joint_distribution(observed_time_series).log_prob,
surrogate_posterior=surrogate_posterior,
optimizer=tf.optimizers.Adam(learning_rate=0.1),
num_steps=200)
samples = surrogate_posterior.sample(30)
one_step_predictive_dist = tfp.sts.one_step_predictive(
model, observed_time_series, parameter_samples=samples)
```
We can visualize the forecast by plotting:
```python
  import numpy as np
  from matplotlib import pylab as plt
def plot_one_step_predictive(observed_time_series,
forecast_mean,
forecast_scale):
plt.figure(figsize=(12, 6))
num_timesteps = forecast_mean.shape[-1]
c1, c2 = (0.12, 0.47, 0.71), (1.0, 0.5, 0.05)
plt.plot(observed_time_series, label="observed time series", color=c1)
plt.plot(forecast_mean, label="one-step prediction", color=c2)
plt.fill_between(np.arange(num_timesteps),
forecast_mean - 2 * forecast_scale,
forecast_mean + 2 * forecast_scale,
alpha=0.1, color=c2)
    plt.legend()

  plot_one_step_predictive(observed_time_series,
forecast_mean=predictive_means,
forecast_scale=predictive_scales)
```
To detect anomalous timesteps, we check whether the observed value at each
step is within a 95% predictive interval, i.e., two standard deviations from
the mean:
```python
z_scores = ((observed_time_series[..., 1:] - predictive_means[..., :-1])
/ predictive_scales[..., :-1])
anomalous_timesteps = tf.boolean_mask(
tf.range(1, num_timesteps),
tf.abs(z_scores) > 2.0)
```
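
  As sketched in the deprecation notice above, passing
  `timesteps_are_event_shape=False` (soon to be the default) yields
  per-timestep probabilities; summing the per-timestep log probabilities
  recovers a single series-level value. A minimal sketch, assuming
  `observed_time_series` is a scalar series of shape `[num_timesteps]`:

  ```python
  per_step_dist = tfp.sts.one_step_predictive(
      model, observed_time_series, parameter_samples=samples,
      timesteps_are_event_shape=False)
  # One log probability per timestep (timesteps are now batch shape).
  per_step_log_probs = per_step_dist.log_prob(observed_time_series)
  # Summing over the time axis recovers the series-level log probability.
  total_log_prob = tf.reduce_sum(per_step_log_probs, axis=-1)
  ```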
"""
with tf.name_scope('one_step_predictive'):
[
observed_time_series,
is_missing
] = sts_util.canonicalize_observed_time_series_with_mask(
observed_time_series)
# Run filtering over the training timesteps to extract the
# predictive means and variances.
num_timesteps = ps.dimension_size(observed_time_series, -2)
lgssm = tfe_util.JitPublicMethods(
model.make_state_space_model(num_timesteps=num_timesteps,
param_vals=parameter_samples),
trace_only=True) # Avoid eager overhead w/o introducing XLA dependence.
(_, _, _, _, _, observation_means, observation_covs
) = lgssm.forward_filter(observed_time_series, mask=is_missing)
# Squeeze dims to convert from LGSSM's event shape `[num_timesteps, 1]`
# to a scalar time series.
predictive_dist = sts_util.mix_over_posterior_draws(
means=observation_means[..., 0],
variances=observation_covs[..., 0, 0])
if timesteps_are_event_shape:
predictive_dist = independent.Independent(
predictive_dist, reinterpreted_batch_ndims=1)
return predictive_dist


def forecast(model,
observed_time_series,
parameter_samples,
num_steps_forecast,
include_observation_noise=True):
"""Construct predictive distribution over future observations.
Given samples from the posterior over parameters, return the predictive
distribution over future observations for num_steps_forecast timesteps.

  Args:
model: An instance of `StructuralTimeSeries` representing a
time-series model. This represents a joint distribution over
time-series and their parameters with batch shape `[b1, ..., bN]`.
observed_time_series: `float` `Tensor` of shape
`concat([sample_shape, model.batch_shape, [num_timesteps, 1]])` where
`sample_shape` corresponds to i.i.d. observations, and the trailing `[1]`
dimension may (optionally) be omitted if `num_timesteps > 1`. Any `NaN`s
      are interpreted as missing observations; missingness may also be
explicitly specified by passing a `tfp.sts.MaskedTimeSeries` instance.
parameter_samples: Python `list` of `Tensors` representing posterior samples
of model parameters, with shapes `[concat([[num_posterior_draws],
param.prior.batch_shape, param.prior.event_shape]) for param in
model.parameters]`. This may optionally also be a map (Python `dict`) of
parameter names to `Tensor` values.
num_steps_forecast: scalar `int` `Tensor` number of steps to forecast.
include_observation_noise: Python `bool` indicating whether the forecast
      distribution should include uncertainty from observation noise. If
      `True`, the forecast is over future observations; if `False`, the
      forecast is over future values of the latent noise-free time series.
Default value: `True`.

  Returns:
    forecast_dist: a `tfd.MixtureSameFamily` instance with event shape
      `[num_steps_forecast, 1]` and batch shape
`concat([sample_shape, model.batch_shape])`, with `num_posterior_draws`
mixture components.

  #### Examples

  Suppose we've built a model and fit it to data using HMC:

  ```python
day_of_week = tfp.sts.Seasonal(
num_seasons=7,
observed_time_series=observed_time_series,
name='day_of_week')
local_linear_trend = tfp.sts.LocalLinearTrend(
observed_time_series=observed_time_series,
name='local_linear_trend')
model = tfp.sts.Sum(components=[day_of_week, local_linear_trend],
observed_time_series=observed_time_series)
samples, kernel_results = tfp.sts.fit_with_hmc(model, observed_time_series)
```
Passing the posterior samples into `forecast`, we construct a forecast
distribution:
```python
forecast_dist = tfp.sts.forecast(model, observed_time_series,
parameter_samples=samples,
num_steps_forecast=50)
forecast_mean = forecast_dist.mean()[..., 0] # shape: [50]
forecast_scale = forecast_dist.stddev()[..., 0] # shape: [50]
forecast_samples = forecast_dist.sample(10)[..., 0] # shape: [10, 50]
```
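
  To forecast the latent noise-free series rather than future observations, we
  can pass `include_observation_noise=False`; a minimal sketch, reusing the
  `model`, `observed_time_series`, and `samples` from above:

  ```python
  noise_free_dist = tfp.sts.forecast(model, observed_time_series,
                                     parameter_samples=samples,
                                     num_steps_forecast=50,
                                     include_observation_noise=False)
  # Uncertainty here reflects only the latent series, not observation noise.
  noise_free_scale = noise_free_dist.stddev()[..., 0]
  ```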
If using variational inference instead of HMC, we'd construct a forecast using
samples from the variational posterior:
```python
surrogate_posterior = tfp.sts.build_factored_surrogate_posterior(
model=model)
loss_curve = tfp.vi.fit_surrogate_posterior(
target_log_prob_fn=model.joint_distribution(observed_time_series).log_prob,
surrogate_posterior=surrogate_posterior,
optimizer=tf.optimizers.Adam(learning_rate=0.1),
num_steps=200)
samples = surrogate_posterior.sample(30)
forecast_dist = tfp.sts.forecast(model, observed_time_series,
parameter_samples=samples,
num_steps_forecast=50)
```
We can visualize the forecast by plotting:
```python
  import numpy as np
  from matplotlib import pylab as plt
def plot_forecast(observed_time_series,
forecast_mean,
forecast_scale,
forecast_samples):
plt.figure(figsize=(12, 6))
num_steps = observed_time_series.shape[-1]
num_steps_forecast = forecast_mean.shape[-1]
num_steps_train = num_steps - num_steps_forecast
c1, c2 = (0.12, 0.47, 0.71), (1.0, 0.5, 0.05)
plt.plot(np.arange(num_steps), observed_time_series,
lw=2, color=c1, label='ground truth')
forecast_steps = np.arange(num_steps_train,
num_steps_train+num_steps_forecast)
plt.plot(forecast_steps, forecast_samples.T, lw=1, color=c2, alpha=0.1)
plt.plot(forecast_steps, forecast_mean, lw=2, ls='--', color=c2,
label='forecast')
plt.fill_between(forecast_steps,
forecast_mean - 2 * forecast_scale,
forecast_mean + 2 * forecast_scale, color=c2, alpha=0.2)
plt.xlim([0, num_steps])
    plt.legend()

  plot_forecast(observed_time_series,
forecast_mean=forecast_mean,
forecast_scale=forecast_scale,
forecast_samples=forecast_samples)
```
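
  Because `forecast_dist` is an ordinary distribution, it can also score
  held-out data directly. A minimal sketch, where `held_out_series` is a
  hypothetical `float` `Tensor` of shape `[50, 1]` matching the forecast
  event shape:

  ```python
  held_out_log_prob = forecast_dist.log_prob(held_out_series)
  ```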
"""
with tf.name_scope('forecast'):
[
observed_time_series,
mask
] = sts_util.canonicalize_observed_time_series_with_mask(
observed_time_series)
# Run filtering over the observed timesteps to extract the
# latent state posterior at timestep T+1 (i.e., the final
# filtering distribution, pushed through the transition model).
# This is the prior for the forecast model ("today's prior
# is yesterday's posterior").
num_observed_steps = ps.dimension_size(observed_time_series, -2)
observed_data_ssm = tfe_util.JitPublicMethods(
model.make_state_space_model(num_timesteps=num_observed_steps,
param_vals=parameter_samples),
trace_only=True) # Avoid eager overhead w/o introducing XLA dependence.
(_, _, _, predictive_mean, predictive_cov, _, _
) = observed_data_ssm.forward_filter(observed_time_series,
mask=mask,
final_step_only=True)
# Build a batch of state-space models over the forecast period. Because
# we'll use MixtureSameFamily to mix over the posterior draws, we need to
# do some shenanigans to move the `[num_posterior_draws]` batch dimension
# from the leftmost to the rightmost side of the model's batch shape.
# TODO(b/120245392): enhance `MixtureSameFamily` to reduce along an
# arbitrary axis, and eliminate `move_dimension` calls here.
parameter_samples = model._canonicalize_param_vals_as_map(parameter_samples) # pylint: disable=protected-access
parameter_samples_with_reordered_batch_dimension = {
param.name: dist_util.move_dimension(
parameter_samples[param.name],
0, -(1 + _prefer_static_event_ndims(param.prior)))
for param in model.parameters}
forecast_prior = mvn_tril.MultivariateNormalTriL(
loc=dist_util.move_dimension(predictive_mean, 0, -2),
scale_tril=tf.linalg.cholesky(
dist_util.move_dimension(predictive_cov, 0, -3)))
# Ugly hack: because we moved `num_posterior_draws` to the trailing (rather
# than leading) dimension of parameters, the parameter batch shapes no
# longer broadcast against the `constant_offset` attribute used in `sts.Sum`
# models. We fix this by manually adding an extra broadcasting dim to
# `constant_offset` if present.
# The root cause of this hack is that we mucked with param dimensions above
# and are now passing params that are 'invalid' in the sense that they don't
# match the shapes of the model's param priors. The fix (as above) will be
# to update MixtureSameFamily so we can avoid changing param dimensions
# altogether.
# TODO(b/120245392): enhance `MixtureSameFamily` to reduce along an
# arbitrary axis, and eliminate this hack.
kwargs = {}
if hasattr(model, 'constant_offset'):
kwargs['constant_offset'] = tf.convert_to_tensor(
value=model.constant_offset,
dtype=forecast_prior.dtype)[..., tf.newaxis, :]
if not include_observation_noise:
parameter_samples_with_reordered_batch_dimension[
'observation_noise_scale'] = tf.zeros_like(
parameter_samples_with_reordered_batch_dimension[
'observation_noise_scale'])
# We assume that any STS model that has a `constant_offset` attribute
# will allow it to be overridden as a kwarg. This is currently just
# `sts.Sum`.
# TODO(b/120245392): when kwargs hack is removed, switch back to calling
# the public version of `_make_state_space_model`.
forecast_ssm = model._make_state_space_model( # pylint: disable=protected-access
num_timesteps=num_steps_forecast,
param_map=parameter_samples_with_reordered_batch_dimension,
initial_state_prior=forecast_prior,
initial_step=num_observed_steps,
**kwargs)
# Avoid eager-mode loops when querying the forecast.
forecast_ssm = tfe_util.JitPublicMethods(forecast_ssm, trace_only=True)
num_posterior_draws = (
tf.compat.dimension_value(forecast_ssm.batch_shape[-1]))
if num_posterior_draws is None:
num_posterior_draws = (
dist_util.prefer_static_value(forecast_ssm.batch_shape_tensor()[-1]))
return mixture_same_family.MixtureSameFamily(
mixture_distribution=categorical.Categorical(
logits=tf.zeros([num_posterior_draws], dtype=forecast_ssm.dtype)),
components_distribution=forecast_ssm)


@deprecation.deprecated_arg_values(
'2021-12-31',
    'Imputed distributions returned by `tfp.sts.impute_missing_values` will '
'soon compute per-timestep probabilities (treating timesteps as part of '
'the batch shape) instead of a single probability for an entire series '
'(the current approach, in which timesteps are treated as event shape). '
'Please update your code to pass `timesteps_are_event_shape=False` (this '
'will soon be the default) and to explicitly sum over the per-timestep log '
'probabilities if this is required.',
timesteps_are_event_shape=True)
def impute_missing_values(model,
observed_time_series,
parameter_samples,
include_observation_noise=False,
timesteps_are_event_shape=True):
"""Runs posterior inference to impute the missing values in a time series.
This method computes the posterior marginals `p(latent state | observations)`,
given the time series at observed timesteps (a missingness mask should
be specified using `tfp.sts.MaskedTimeSeries`). It pushes this posterior back
through the observation model to impute a predictive distribution on the
observed time series. At unobserved steps, this is an imputed value; at other
steps it is interpreted as the model's estimate of the underlying noise-free
series.

  Args:
model: `tfp.sts.Sum` instance defining an additive STS model.
observed_time_series: `float` `Tensor` of shape
`concat([sample_shape, model.batch_shape, [num_timesteps, 1]])` where
`sample_shape` corresponds to i.i.d. observations, and the trailing `[1]`
dimension may (optionally) be omitted if `num_timesteps > 1`. Any `NaN`s
      are interpreted as missing observations; missingness may also be
explicitly specified by passing a `tfp.sts.MaskedTimeSeries` instance.
parameter_samples: Python `list` of `Tensors` representing posterior
samples of model parameters, with shapes `[concat([
[num_posterior_draws], param.prior.batch_shape,
param.prior.event_shape]) for param in model.parameters]`. This may
optionally also be a map (Python `dict`) of parameter names to
`Tensor` values.
include_observation_noise: If `False`, the imputed uncertainties
represent the model's estimate of the noise-free time series at each
timestep. If `True`, they represent the model's estimate of the range of
values that could be *observed* at each timestep, including any i.i.d.
observation noise.
Default value: `False`.
    timesteps_are_event_shape: Deprecated, for backwards compatibility only.
      If `False`, the predictive distribution will return per-timestep
      probabilities.
      Default value: `True`.

  Returns:
    imputed_series_dist: a `tfd.MixtureSameFamily` instance with event shape
      `[num_timesteps] if timesteps_are_event_shape else []` and
      batch shape `concat([sample_shape, model.batch_shape,
      [] if timesteps_are_event_shape else [num_timesteps]])`, with
`num_posterior_draws` mixture components.

  #### Example

  To specify a time series with missing values, use `tfp.sts.MaskedTimeSeries`:

  ```python
time_series_with_nans = [-1., 1., np.nan, 2.4, np.nan, 5]
observed_time_series = tfp.sts.MaskedTimeSeries(
time_series=time_series_with_nans,
is_missing=tf.math.is_nan(time_series_with_nans))
```

  Masked time series can be passed to `tfp.sts` methods in place of an
  `observed_time_series` `Tensor`:
```python
# Build model using observed time series to set heuristic priors.
linear_trend_model = tfp.sts.LocalLinearTrend(
observed_time_series=observed_time_series)
model = tfp.sts.Sum([linear_trend_model],
observed_time_series=observed_time_series)
# Fit model to data
parameter_samples, _ = tfp.sts.fit_with_hmc(model, observed_time_series)
```

  After fitting a model, `impute_missing_values` will return a distribution:

  ```python
# Impute missing values
imputed_series_distribution = tfp.sts.impute_missing_values(
model, observed_time_series, parameter_samples=parameter_samples)
print('imputed means and stddevs: ',
imputed_series_distribution.mean(),
imputed_series_distribution.stddev())
```
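
  By default the imputed distribution reflects the noise-free series; to
  instead describe the range of values that could be *observed* at each
  timestep, we can pass `include_observation_noise=True`. A minimal sketch,
  reusing the fitted `model` and `parameter_samples` from above:

  ```python
  observable_series_distribution = tfp.sts.impute_missing_values(
      model, observed_time_series,
      parameter_samples=parameter_samples,
      include_observation_noise=True)  # Includes i.i.d. observation noise.
  ```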
"""
with tf.name_scope('impute_missing_values'):
[
observed_time_series,
mask
] = sts_util.canonicalize_observed_time_series_with_mask(
observed_time_series)
# Run smoothing over the training timesteps to extract the
# predictive means and variances.
num_timesteps = ps.dimension_size(observed_time_series, -2)
lgssm = tfe_util.JitPublicMethods(
model.make_state_space_model(num_timesteps=num_timesteps,
param_vals=parameter_samples),
trace_only=True) # Avoid eager overhead w/o introducing XLA dependence.
posterior_means, posterior_covs = lgssm.posterior_marginals(
observed_time_series, mask=mask)
observation_means, observation_covs = lgssm.latents_to_observations(
latent_means=posterior_means,
latent_covs=posterior_covs)
if not include_observation_noise:
# Extract just the variance of observation noise by pushing forward
# zero-variance latents.
_, observation_noise_covs = lgssm.latents_to_observations(
latent_means=posterior_means,
latent_covs=tf.zeros_like(posterior_covs))
# Subtract out the observation noise that was added in the original
# pushforward. Note that this could cause numerical issues if the
# observation noise is very large. If this becomes an issue we could
# avoid the subtraction by plumbing `include_observation_noise` through
# `lgssm.latents_to_observations`.
observation_covs -= observation_noise_covs
# Squeeze dims to convert from LGSSM's event shape `[num_timesteps, 1]`
# to a scalar time series.
imputed_values_dist = sts_util.mix_over_posterior_draws(
means=observation_means[..., 0],
variances=observation_covs[..., 0, 0])
if timesteps_are_event_shape:
imputed_values_dist = independent.Independent(
imputed_values_dist, reinterpreted_batch_ndims=1)
return imputed_values_dist