# Copyright 2018 The TensorFlow Probability Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""A Transformed Distribution class."""
import functools
import tensorflow.compat.v2 as tf
from tensorflow_probability.python.bijectors import ldj_ratio
from tensorflow_probability.python.distributions import batch_broadcast
from tensorflow_probability.python.distributions import distribution as distribution_lib
from tensorflow_probability.python.distributions import kullback_leibler
from tensorflow_probability.python.distributions import log_prob_ratio
from tensorflow_probability.python.internal import auto_composite_tensor
from tensorflow_probability.python.internal import distribution_util as dist_util
from tensorflow_probability.python.internal import parameter_properties
from tensorflow_probability.python.internal import prefer_static as ps
from tensorflow_probability.python.internal import tensorshape_util
__all__ = [
'TransformedDistribution',
]
JAX_MODE = False
def _default_kwargs_split_fn(kwargs):
"""Default `kwargs` `dict` getter."""
return (kwargs.get('distribution_kwargs', {}),
kwargs.get('bijector_kwargs', {}))
class _TransformedDistribution(distribution_lib.Distribution):
"""A Transformed Distribution.
A `TransformedDistribution` models `p(y)` given a base distribution `p(x)`,
and a deterministic, invertible, differentiable transform, `Y = g(X)`. The
transform is typically an instance of the `Bijector` class and the base
distribution is typically an instance of the `Distribution` class.
A `Bijector` is expected to implement the following functions:
- `forward`,
- `inverse`,
- `inverse_log_det_jacobian`.
The semantics of these functions are outlined in the `Bijector` documentation.
We now describe how a `TransformedDistribution` alters the input/outputs of a
`Distribution` associated with a random variable (rv) `X`.
Write `cdf(Y=y)` for an absolutely continuous cumulative distribution function
of random variable `Y`; write the probability density function
  `pdf(Y=y) := d^k / (dy_1,...,dy_k) cdf(Y=y)` for its derivative with respect to `y`
evaluated at `y`. Assume that `Y = g(X)` where `g` is a deterministic
diffeomorphism, i.e., a non-random, continuous, differentiable, and invertible
function. Write the inverse of `g` as `X = g^{-1}(Y)` and `(J o g)(x)` for
the Jacobian of `g` evaluated at `x`.
A `TransformedDistribution` implements the following operations:
* `sample`
Mathematically: `Y = g(X)`
Programmatically: `bijector.forward(distribution.sample(...))`
* `log_prob`
Mathematically: `(log o pdf)(Y=y) = (log o pdf o g^{-1})(y)
+ (log o abs o det o J o g^{-1})(y)`
Programmatically: `(distribution.log_prob(bijector.inverse(y))
+ bijector.inverse_log_det_jacobian(y))`
* `log_cdf`
Mathematically: `(log o cdf)(Y=y) = (log o cdf o g^{-1})(y)`
    Programmatically: `distribution.log_cdf(bijector.inverse(y))`
* and similarly for: `cdf`, `prob`, `log_survival_function`,
`survival_function`.
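
  For example, under `Y = exp(X)` with `X` standard Normal, `log_prob(y)` is
  the base log-prob evaluated at `x = log(y)` plus the inverse
  log-det-Jacobian of `Exp` at `y`. A minimal sketch of this equivalence
  (variable names here are illustrative, not part of the API):

  ```python
  tfd = tfp.distributions
  tfb = tfp.bijectors
  log_normal = tfd.TransformedDistribution(tfd.Normal(0., 1.), tfb.Exp())
  y = 2.
  # Agrees with log_normal.log_prob(y) up to float tolerance:
  manual = (tfd.Normal(0., 1.).log_prob(tf.math.log(y))
            + tfb.Exp().inverse_log_det_jacobian(y, event_ndims=0))
  ```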
Kullback-Leibler divergence is also well defined for `TransformedDistribution`
instances that have matching bijectors. Bijector matching is performed via
the `Bijector.__eq__` method, e.g., `td1.bijector == td2.bijector`, as part
of the KL calculation. If the underlying bijectors do not match, a
`NotImplementedError` is raised when calling `kl_divergence`. This is the
same behavior as calling `kl_divergence` when two distributions do not have
a registered KL divergence.
  **Note:** Due to current constraints on bijector equality testing,
  `kl_divergence` may behave differently in eager-mode vs. traced
  computation. For example, if a TD Bijector's parameters are `Tensor`s
  derived from, e.g., a `tf.Variable`, a stateful operation, or an argument
  to a `tf.function`, then Bijector equality cannot be determined during the
  call to `kl_divergence`, and the bijectors are assumed unequal.
In this case, calling `kl_divergence` may raise an exception in
graph / tf.function mode, but work just fine in eager / numpy mode.
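
  As a minimal sketch of the matching-bijector case (when the bijectors
  compare equal, the KL divergence reduces to that of the base
  distributions):

  ```python
  tfd = tfp.distributions
  tfb = tfp.bijectors
  exp = tfb.Exp()
  p = tfd.TransformedDistribution(tfd.Normal(loc=0., scale=1.), exp)
  q = tfd.TransformedDistribution(tfd.Normal(loc=1., scale=2.), exp)
  # Shared bijector instance, so this equals
  # tfd.kl_divergence(p.distribution, q.distribution).
  kl = tfd.kl_divergence(p, q)
  ```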
A simple example constructing a Log-Normal distribution from a Normal
distribution:
```python
tfd = tfp.distributions
tfb = tfp.bijectors
log_normal = tfd.TransformedDistribution(
distribution=tfd.Normal(loc=0., scale=1.),
bijector=tfb.Exp(),
name='LogNormalTransformedDistribution')
```
A `LogNormal` made from callables:
```python
tfd = tfp.distributions
tfb = tfp.bijectors
log_normal = tfd.TransformedDistribution(
distribution=tfd.Normal(loc=0., scale=1.),
    bijector=tfb.Inline(
        forward_fn=tf.exp,
        inverse_fn=tf.math.log,
        inverse_log_det_jacobian_fn=lambda y: -tf.math.log(y),
        forward_min_event_ndims=0),
    name='LogNormalTransformedDistribution')
```
Another example constructing a Normal from a StandardNormal:
```python
tfd = tfp.distributions
tfb = tfp.bijectors
normal = tfd.TransformedDistribution(
distribution=tfd.Normal(loc=0., scale=1.),
bijector=tfb.Shift(shift=-1.)(tfb.Scale(scale=2.)),
name='NormalTransformedDistribution')
```
A `TransformedDistribution`'s `batch_shape` is derived by *broadcasting* the
batch shapes of the base distribution and the bijector. The base distribution
is then itself implicitly lifted to the broadcast batch shape. For example, in
```python
tfd = tfp.distributions
tfb = tfp.bijectors
batch_normal = tfd.TransformedDistribution(
distribution=tfd.Normal(loc=0., scale=1.),
bijector=tfb.Shift(shift=[-1., 0., 1.]),
name='BatchNormalTransformedDistribution')
```
the base distribution has batch shape `[]`, and the bijector applied to this
  distribution contributes a batch shape of `[3]` (obtained as
  `bijector.experimental_batch_shape(x_event_ndims=0)`, since the base
  distribution has scalar events), yielding the broadcast shape
  `batch_normal.batch_shape == [3]`. Although sampling from the base
distribution would ordinarily return just a single value, calling
`batch_normal.sample()` will return a Tensor of 3 independent values, just as
if the base distribution had explicitly followed the broadcast batch shape.
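
  Continuing the example, a short sketch of the resulting shapes:

  ```python
  batch_normal.batch_shape                    # [3]
  batch_normal.sample().shape                 # [3]
  batch_normal.log_prob([0., 0., 0.]).shape   # [3]
  ```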
The `event_shape` of a `TransformedDistribution` is the `forward_event_shape`
of the bijector applied to the `event_shape` of the base distribution.
`tfd.Sample` or `tfd.Independent` may be used to add extra IID dimensions to
the `event_shape` of the base distribution before the bijector operates on it.
The following example demonstrates how to construct a multivariate Normal as a
`TransformedDistribution`, by adding a rank-1 IID dimension to the
`event_shape` of a standard Normal and applying `tfb.ScaleMatvecTriL`.
```python
tfd = tfp.distributions
tfb = tfp.bijectors
# We will create two MVNs with batch_shape = event_shape = 2.
mean = [[-1., 0], # batch:0
[0., 1]] # batch:1
chol_cov = [[[1., 0],
[0, 1]], # batch:0
[[1, 0],
[2, 2]]] # batch:1
mvn1 = tfd.TransformedDistribution(
distribution=tfd.Sample(
tfd.Normal(loc=[0., 0], scale=1.), # base_dist.batch_shape == [2]
          sample_shape=[2]),                # base_dist.event_shape == [2]
      bijector=tfb.Shift(shift=mean)(tfb.ScaleMatvecTriL(scale_tril=chol_cov)))
  mvn2 = tfd.MultivariateNormalTriL(loc=mean, scale_tril=chol_cov)
# mvn1.log_prob(x) == mvn2.log_prob(x)
```
"""
def __init__(self,
distribution,
bijector,
kwargs_split_fn=_default_kwargs_split_fn,
validate_args=False,
parameters=None,
name=None):
"""Construct a Transformed Distribution.
Args:
distribution: The base distribution instance to transform. Typically an
instance of `Distribution`.
bijector: The object responsible for calculating the transformation.
Typically an instance of `Bijector`.
kwargs_split_fn: Python `callable` which takes a kwargs `dict` and returns
a tuple of kwargs `dict`s for each of the `distribution` and `bijector`
parameters respectively.
Default value: `_default_kwargs_split_fn` (i.e.,
`lambda kwargs: (kwargs.get('distribution_kwargs', {}),
kwargs.get('bijector_kwargs', {}))`)
validate_args: Python `bool`, default `False`. When `True` distribution
parameters are checked for validity despite possibly degrading runtime
performance. When `False` invalid inputs may silently render incorrect
outputs.
parameters: Locals dict captured by subclass constructor, to be used for
copy/slice re-instantiation operations.
name: Python `str` name prefixed to Ops created by this class. Default:
`bijector.name + distribution.name`.
"""
parameters = dict(locals()) if parameters is None else parameters
name = name or (('' if bijector is None else bijector.name) +
(distribution.name or ''))
with tf.name_scope(name) as name:
self._distribution = distribution
self._bijector = bijector
self._kwargs_split_fn = (_default_kwargs_split_fn
if kwargs_split_fn is None
else kwargs_split_fn)
# For convenience we define some handy constants.
self._zero = tf.constant(0, dtype=tf.int32, name='zero')
# We don't just want to check isinstance(JointDistribution) because
# TransformedDistributions with multipart bijectors are effectively
# joint but don't inherit from JD. The 'duck-type' test is that
# JDs have a structured dtype.
dtype = self.bijector.forward_dtype(self.distribution.dtype)
self._is_joint = tf.nest.is_nested(dtype)
super(_TransformedDistribution, self).__init__(
dtype=dtype,
reparameterization_type=self._distribution.reparameterization_type,
validate_args=validate_args,
allow_nan_stats=self._distribution.allow_nan_stats,
parameters=parameters,
name=name)
@property
def distribution(self):
"""Base distribution, p(x)."""
return self._distribution
@property
def bijector(self):
"""Function transforming x => y."""
return self._bijector
@property
def experimental_is_sharded(self):
raise NotImplementedError # TODO(b/175084455): Handle bijector sharding.
@classmethod
def _parameter_properties(cls, dtype, num_classes=None):
return dict(
distribution=parameter_properties.BatchedComponentProperties(),
bijector=parameter_properties.BatchedComponentProperties(
event_ndims=lambda td: tf.nest.map_structure( # pylint: disable=g-long-lambda
tensorshape_util.rank, td.distribution.event_shape),
event_ndims_tensor=lambda td: tf.nest.map_structure( # pylint: disable=g-long-lambda
ps.rank_from_shape, td.distribution.event_shape_tensor())))
def _event_shape_tensor(self):
return self.bijector.forward_event_shape_tensor(
self.distribution.event_shape_tensor())
def _event_shape(self):
    # Since the `bijector` may change the `event_shape`, we forward what we
    # know to the bijector, allowing it the final say on the `event_shape`.
return self.bijector.forward_event_shape(self.distribution.event_shape)
def _batch_shape_tensor(self):
base_batch_shape_tensor = self.distribution.batch_shape_tensor()
if tf.nest.is_nested(base_batch_shape_tensor) and self._is_joint:
# Pass-through rudimentary support for JDs with structured batch shape.
# TODO(b/194742372): remove support for structured batch shape.
return tf.nest.pack_sequence_as(
self.dtype, tf.nest.flatten(base_batch_shape_tensor))
return super()._batch_shape_tensor()
def _batch_shape(self):
batch_shape = self.distribution.batch_shape
if tf.nest.is_nested(batch_shape) and self._is_joint:
# Pass-through rudimentary support for JDs with structured batch shape.
# TODO(b/194742372): remove support for structured batch shape.
return tf.nest.pack_sequence_as(
self.dtype, tf.nest.flatten(batch_shape))
return super()._batch_shape()
def _maybe_broadcast_distribution_batch_shape(self):
"""Returns the base distribution broadcast to the TD's full batch shape."""
distribution_batch_shape = self.distribution.batch_shape
if (tf.nest.is_nested(distribution_batch_shape) or
tf.nest.is_nested(self.distribution.dtype)):
# TODO(b/191674464): Support joint distributions in BatchBroadcast.
return self.distribution
overall_batch_shape = self.batch_shape
if (tensorshape_util.is_fully_defined(overall_batch_shape) and
distribution_batch_shape == overall_batch_shape):
# No need to broadcast if the distribution already has full batch shape.
return self.distribution
if not tensorshape_util.is_fully_defined(overall_batch_shape):
overall_batch_shape = self.batch_shape_tensor()
return batch_broadcast.BatchBroadcast(
self.distribution, with_shape=overall_batch_shape)
def _call_sample_n(self, sample_shape, seed, **kwargs):
# We override `_call_sample_n` rather than `_sample_n` so we can ensure that
# the result of `self.bijector.forward` is not modified (and thus caching
# works).
distribution_kwargs, bijector_kwargs = self._kwargs_split_fn(kwargs)
# First, generate samples from the base distribution.
x = self._maybe_broadcast_distribution_batch_shape().sample(
sample_shape=sample_shape, seed=seed, **distribution_kwargs)
# Apply the bijector's forward transformation. For caching to
# work, it is imperative that this is the last modification to the
# returned result.
return self.bijector.forward(x, **bijector_kwargs)
def _sample_and_log_prob(self, sample_shape, seed, **kwargs):
if not self.bijector._is_injective: # pylint: disable=protected-access
# Computing log_prob with a non-injective bijector requires an explicit
# inverse to get all points in the inverse image, so we can't get by
# with just doing the forward pass.
return super()._sample_and_log_prob(sample_shape, seed=seed, **kwargs)
distribution_kwargs, bijector_kwargs = self._kwargs_split_fn(kwargs)
x, base_distribution_log_prob = (
self._maybe_broadcast_distribution_batch_shape(
).experimental_sample_and_log_prob(
sample_shape, seed, **distribution_kwargs))
y = self.bijector.forward(x, **bijector_kwargs)
fldj = self.bijector.forward_log_det_jacobian(
x,
event_ndims=tf.nest.map_structure(
ps.rank_from_shape,
self.distribution.event_shape_tensor()),
**bijector_kwargs)
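    # Change of variables: log p_Y(y) = log p_X(x) - fldj(x), with y = g(x).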
return y, (base_distribution_log_prob -
tf.cast(fldj, base_distribution_log_prob.dtype))
def _log_prob(self, y, **kwargs):
if self.bijector._is_injective: # pylint: disable=protected-access
log_prob, _ = self.experimental_local_measure(
y, backward_compat=True, **kwargs)
return log_prob
# TODO(b/197680518): Support base measure handling for non-injective
# bijectors.
distribution_kwargs, bijector_kwargs = self._kwargs_split_fn(kwargs)
# For caching to work, it is imperative that the bijector is the first to
# modify the input.
x = self.bijector.inverse(y, **bijector_kwargs)
event_ndims = tf.nest.map_structure(
ps.rank_from_shape,
self._event_shape_tensor(),
self.event_shape)
ildj = self.bijector.inverse_log_det_jacobian(
y, event_ndims=event_ndims, **bijector_kwargs)
# Compute log_prob on each element of the inverse image.
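    # For non-injective g, p(Y=y) = sum_i p(X=x_i) * exp(ildj_i(y)), summed
    # over the points x_i in the preimage of y, so we combine the per-fiber
    # log-probs with logsumexp.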
lp_on_fibers = []
for x_i, ildj_i in zip(x, ildj):
base_log_prob = self.distribution.log_prob(x_i, **distribution_kwargs)
lp_on_fibers.append(base_log_prob + tf.cast(ildj_i, base_log_prob.dtype))
return tf.reduce_logsumexp(tf.stack(lp_on_fibers), axis=0)
def _prob(self, y, **kwargs):
if not hasattr(self.distribution, '_prob') or self.bijector._is_injective: # pylint: disable=protected-access
return tf.exp(self._log_prob(y, **kwargs))
distribution_kwargs, bijector_kwargs = self._kwargs_split_fn(kwargs)
x = self.bijector.inverse(y, **bijector_kwargs)
event_ndims = tf.nest.map_structure(
ps.rank_from_shape,
self._event_shape_tensor(),
self.event_shape
)
ildj = self.bijector.inverse_log_det_jacobian(
y, event_ndims=event_ndims, **bijector_kwargs)
# Compute prob on each element of the inverse image.
prob_on_fibers = []
for x_i, ildj_i in zip(x, ildj):
base_prob = self.distribution.prob(x_i, **distribution_kwargs)
prob_on_fibers.append(
base_prob * tf.exp(tf.cast(ildj_i, base_prob.dtype)))
return sum(prob_on_fibers)
def _log_cdf(self, y, **kwargs):
if not self.bijector._is_injective: # pylint: disable=protected-access
raise NotImplementedError('`log_cdf` is not implemented when '
'`bijector` is not injective.')
distribution_kwargs, bijector_kwargs = self._kwargs_split_fn(kwargs)
x = self.bijector.inverse(y, **bijector_kwargs)
dist = self.distribution
# TODO(b/141130733): Check/fix any gradient numerics issues.
return ps.smart_where(
self.bijector._internal_is_increasing(**bijector_kwargs), # pylint: disable=protected-access
lambda: dist.log_cdf(x, **distribution_kwargs),
lambda: dist.log_survival_function(x, **distribution_kwargs))
def _cdf(self, y, **kwargs):
if not self.bijector._is_injective: # pylint: disable=protected-access
raise NotImplementedError('`cdf` is not implemented when '
'`bijector` is not injective.')
distribution_kwargs, bijector_kwargs = self._kwargs_split_fn(kwargs)
x = self.bijector.inverse(y, **bijector_kwargs)
# TODO(b/141130733): Check/fix any gradient numerics issues.
return ps.smart_where(
self.bijector._internal_is_increasing(**bijector_kwargs), # pylint: disable=protected-access
lambda: self.distribution.cdf(x, **distribution_kwargs),
lambda: self.distribution.survival_function(x, **distribution_kwargs))
def _log_survival_function(self, y, **kwargs):
if not self.bijector._is_injective: # pylint: disable=protected-access
raise NotImplementedError('`log_survival_function` is not implemented '
'when `bijector` is not injective.')
distribution_kwargs, bijector_kwargs = self._kwargs_split_fn(kwargs)
x = self.bijector.inverse(y, **bijector_kwargs)
dist = self.distribution
# TODO(b/141130733): Check/fix any gradient numerics issues.
return ps.smart_where(
self.bijector._internal_is_increasing(**bijector_kwargs), # pylint: disable=protected-access
lambda: dist.log_survival_function(x, **distribution_kwargs),
lambda: dist.log_cdf(x, **distribution_kwargs))
def _survival_function(self, y, **kwargs):
if not self.bijector._is_injective: # pylint: disable=protected-access
raise NotImplementedError('`survival_function` is not implemented when '
'`bijector` is not injective.')
distribution_kwargs, bijector_kwargs = self._kwargs_split_fn(kwargs)
x = self.bijector.inverse(y, **bijector_kwargs)
# TODO(b/141130733): Check/fix any gradient numerics issues.
return ps.smart_where(
self.bijector._internal_is_increasing(**bijector_kwargs), # pylint: disable=protected-access
lambda: self.distribution.survival_function(x, **distribution_kwargs),
lambda: self.distribution.cdf(x, **distribution_kwargs))
def _quantile(self, value, **kwargs):
if not self.bijector._is_injective: # pylint: disable=protected-access
raise NotImplementedError('`quantile` is not implemented when '
'`bijector` is not injective.')
distribution_kwargs, bijector_kwargs = self._kwargs_split_fn(kwargs)
value = ps.smart_where(
self.bijector._internal_is_increasing(**bijector_kwargs), # pylint: disable=protected-access
lambda: value,
lambda: 1 - value)
# x_q is the "qth quantile" of X iff q = P[X <= x_q]. Now, since X =
# g^{-1}(Y), q = P[X <= x_q] = P[g^{-1}(Y) <= x_q] = P[Y <= g(x_q)],
    # which implies the qth quantile of Y is g(x_q) when g is increasing (the
    # decreasing case is handled by flipping `value` to `1 - value` above).
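    # For example, with g = exp and X ~ Normal(0, 1), the median (q = 0.5) of
    # Y = exp(X) is exp(0.) = 1.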
inv_cdf = self.distribution.quantile(value, **distribution_kwargs)
return self.bijector.forward(inv_cdf, **bijector_kwargs)
def _mode(self, **kwargs):
return self._mean_mode_impl('mode', kwargs)
def _mean(self, **kwargs):
return self._mean_mode_impl('mean', kwargs)
def _mean_mode_impl(self, attr, kwargs):
if not self.bijector.is_constant_jacobian:
raise NotImplementedError(
f'`{attr}` is not implemented for non-affine `bijectors`.')
distribution_kwargs, bijector_kwargs = self._kwargs_split_fn(kwargs)
x = getattr(self.distribution, attr)(**distribution_kwargs)
y = self.bijector.forward(x, **bijector_kwargs)
sample_shape = tf.convert_to_tensor([], dtype=tf.int32, name='sample_shape')
y = self._set_sample_static_shape(y, sample_shape, **kwargs)
return y
def _stddev(self, **kwargs):
if not self.bijector.is_constant_jacobian:
raise NotImplementedError('`stddev` is not implemented for non-affine '
'`bijectors`.')
if not self.bijector._is_injective: # pylint: disable=protected-access
raise NotImplementedError('`stddev` is not implemented when '
'`bijector` is not injective.')
if not (self.bijector._is_scalar # pylint: disable=protected-access
or self.bijector._is_permutation): # pylint: disable=protected-access
raise NotImplementedError('`stddev` is not implemented when `bijector` '
'is a multivariate transformation.')
# A scalar affine bijector is of the form `forward(x) = scale * x + shift`,
# where the standard deviation is invariant to the shift, so we extract the
# shift and subtract it.
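    # Equivalently, stddev(Y) = abs(scale) * stddev(X), which the
    # abs(forward(stddev(X)) - forward(0)) computation below recovers.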
distribution_kwargs, bijector_kwargs = self._kwargs_split_fn(kwargs)
x_stddev = self.distribution.stddev(**distribution_kwargs)
y_stddev_plus_shift = self.bijector.forward(x_stddev, **bijector_kwargs)
shift = self.bijector.forward(
tf.nest.map_structure(
tf.zeros_like, x_stddev),
**bijector_kwargs)
return tf.nest.map_structure(
tf.abs,
tf.nest.map_structure(tf.subtract, y_stddev_plus_shift, shift))
def _covariance(self, **kwargs):
if not self.bijector.is_constant_jacobian:
raise NotImplementedError(
'`covariance` is not implemented for non-affine `bijectors`.')
if not self.bijector._is_injective: # pylint: disable=protected-access
raise NotImplementedError(
'`covariance` is not implemented when `bijector` is not injective.')
if (tf.nest.is_nested(self.bijector.forward_min_event_ndims) or
self.bijector.forward_event_ndims(1) != 1):
raise NotImplementedError(
'`covariance` is only implemented when `bijector` takes vector '
'inputs and produces vector outputs.')
# An affine bijector is of the form `forward(x) = scale @ x + shift`,
# where the covariance is invariant to the shift, so we extract the
# shift and subtract it.
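    # For Y = scale @ X + shift, cov(Y) = scale @ cov(X) @ transpose(scale),
    # so we apply `forward` (with the shift removed) once along each of the
    # two event dimensions of the base covariance matrix.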
distribution_kwargs, bijector_kwargs = self._kwargs_split_fn(kwargs)
cov = self.distribution.covariance(**distribution_kwargs)
zero_vector = tf.zeros_like(cov[..., 0])
shift = self.bijector.forward(zero_vector, **bijector_kwargs)
if shift is zero_vector: # Short-circuit if bijector is tfb.Identity.
return cov
# Broadcast `cov` to full batch rank so we can treat its rows as an
# additional batch dim. Note that we can't just call `forward(cov)` directly
# because the user presumably lined up the bijector batch dimensions to work
# when transforming vectors, not matrices.
cov = tf.broadcast_to(
cov, ps.broadcast_shape(ps.shape(cov),
ps.concat([ps.ones_like(ps.shape(shift)),
[1]], axis=0)))
ndims = ps.rank(cov)
cov_rows = dist_util.move_dimension( # No-op if cov has no batch dims.
cov, source_idx=-2, dest_idx=0)
tmp = self.bijector.forward( # scale @ transpose(cov).
cov_rows, **bijector_kwargs) - shift
# Swap leftmost batch dim (rows) with event dim (columns).
tmp_transpose = tf.transpose( # cov @ transpose(scale).
tmp, perm=ps.concat([[ndims - 1], ps.range(1, ndims - 1), [0]], axis=0))
result_rows = self.bijector.forward( # scale @ cov @ transpose(scale).
tmp_transpose, **bijector_kwargs) - shift
return dist_util.move_dimension( # No-op if result has no batch dims.
result_rows, source_idx=0, dest_idx=-2)
def _entropy(self, **kwargs):
if not self.bijector.is_constant_jacobian:
raise NotImplementedError('`entropy` is not implemented.')
if not self.bijector._is_injective: # pylint: disable=protected-access
raise NotImplementedError('`entropy` is not implemented when '
'`bijector` is not injective.')
distribution_kwargs, bijector_kwargs = self._kwargs_split_fn(kwargs)
# Suppose Y = g(X) where g is a diffeomorphism and X is a continuous rv. It
# can be shown that:
# H[Y] = H[X] + E_X[(log o abs o det o J o g)(X)].
# If is_constant_jacobian then:
# E_X[(log o abs o det o J o g)(X)] = (log o abs o det o J o g)(c)
    # where c can be anything.
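    # For example, for the affine map Y = 2 X + 1, the ILDJ is -log(2), so
    # H[Y] = H[X] + log(2).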
entropy = self.distribution.entropy(**distribution_kwargs)
# Create a dummy event of zeros to pass to
# `bijector.inverse_log_det_jacobian` to extract the constant Jacobian.
event_shape_tensor = self._event_shape_tensor()
event_ndims = tf.nest.map_structure(
ps.rank_from_shape,
event_shape_tensor, self.event_shape)
dummy = tf.nest.map_structure(
ps.zeros, event_shape_tensor, self.dtype)
ildj = self.bijector.inverse_log_det_jacobian(
dummy, event_ndims=event_ndims, **bijector_kwargs)
entropy = entropy - tf.cast(ildj, entropy.dtype)
tensorshape_util.set_shape(entropy, self.batch_shape)
return entropy
# pylint: disable=not-callable
def _default_event_space_bijector(self):
if self.distribution.experimental_default_event_space_bijector() is None:
return None
return self.bijector(
self.distribution.experimental_default_event_space_bijector())
# pylint: enable=not-callable
def experimental_local_measure(self, y, backward_compat=False, **kwargs):
distribution_kwargs, bijector_kwargs = self._kwargs_split_fn(kwargs)
# For caching to work, it is imperative that the bijector is the first to
# modify the input.
x = self.bijector.inverse(y, **bijector_kwargs)
event_ndims = self.bijector.inverse_event_ndims(
tf.nest.map_structure(ps.rank_from_shape, self._event_shape_tensor(),
self.event_shape), **bijector_kwargs)
if self.bijector._is_injective: # pylint: disable=protected-access
local_measure_fn = self.distribution.experimental_local_measure
density_corr_fn = self.bijector.experimental_compute_density_correction
base_log_prob, tangent_space = local_measure_fn(
x, backward_compat=backward_compat, **distribution_kwargs)
correction, new_tangent_space = density_corr_fn(
x,
tangent_space,
backward_compat=backward_compat,
event_ndims=event_ndims,
**bijector_kwargs)
log_prob = base_log_prob - tf.cast(correction, base_log_prob.dtype)
return log_prob, new_tangent_space
else:
raise NotImplementedError
class _TransformedDistributionMeta(distribution_lib._DistributionMeta): # pylint: disable=protected-access
"""Metaclass for TransformedDistribution.
This metaclass ensures that subclasses of TransformedDistribution are
AutoCompositeTensors. TransformedDistribution itself is not an
AutoCompositeTensor, since we define its type spec differently depending on
  whether `kwargs_split_fn` is the default (in which case we omit it from the
type spec, making the spec serializable).
"""
def __new__(mcs, classname, baseclasses, attrs): # pylint: disable=bad-classmethod-argument
cls = super(_TransformedDistributionMeta, mcs).__new__(
mcs, classname, baseclasses, attrs)
if ((cls.__module__ ==
'tensorflow_probability.python.distributions.transformed_distribution')
and classname == 'TransformedDistribution'):
return cls
# Ensure that subclasses of TransformedDistribution are
# AutoCompositeTensors.
if 'tensorflow_probability.python.distributions' in cls.__module__:
module_name = 'tfp.distributions'
elif ('tensorflow_probability.python.experimental.distributions'
in cls.__module__):
module_name = 'tfp.experimental.distributions'
else:
module_name = cls.__module__
return auto_composite_tensor.auto_composite_tensor(
cls,
omit_kwargs=('parameters',),
non_identifying_kwargs=('name',),
module_name=module_name)
class TransformedDistribution(
_TransformedDistribution, tf.__internal__.CompositeTensor,
metaclass=_TransformedDistributionMeta):
def __new__(cls, *args, **kwargs):
"""Maybe return a non-`CompositeTensor` `_TransformedDistribution`."""
if cls is TransformedDistribution:
if args:
distribution = args[0]
else:
distribution = kwargs.get('distribution')
if len(args) > 1:
bijector = args[1]
else:
bijector = kwargs.get('bijector')
if not (auto_composite_tensor.is_composite_tensor(distribution)
and auto_composite_tensor.is_composite_tensor(bijector)):
return _TransformedDistribution(*args, **kwargs)
return super(TransformedDistribution, cls).__new__(cls)
@property
def _type_spec(self):
# If `kwargs_split_fn` is the default, omit it so the type spec is
# serializable.
if self._kwargs_split_fn is _default_kwargs_split_fn:
omit_kwargs = ('parameters', 'kwargs_split_fn')
else:
omit_kwargs = ('parameters',)
return _TransformedDistributionSpec.from_instance(
self, omit_kwargs=omit_kwargs, non_identifying_kwargs=('name',))
def _convert_variables_to_tensors(self):
return auto_composite_tensor.convert_variables_to_tensors(self)
@auto_composite_tensor.type_spec_register(
'tfp.distributions.TransformedDistributionSpec')
class _TransformedDistributionSpec(
auto_composite_tensor._AutoCompositeTensorTypeSpec): # pylint: disable=protected-access
@property
def value_type(self):
return TransformedDistribution
if JAX_MODE:
from jax import tree_util # pylint: disable=g-import-not-at-top
tree_util.register_pytree_node(
TransformedDistribution,
auto_composite_tensor.pytree_flatten,
functools.partial(auto_composite_tensor.pytree_unflatten,
TransformedDistribution))
TransformedDistribution.__doc__ = _TransformedDistribution.__doc__ + '\n' + (
'If both `distribution` and `bijector` are `CompositeTensor`s, then the '
'resulting `TransformedDistribution` instance is a `CompositeTensor` as '
'well. Otherwise, a non-`CompositeTensor` `_TransformedDistribution` '
'instance is created instead. Distribution subclasses that inherit from '
'`TransformedDistribution` will also inherit from `CompositeTensor`.')
@kullback_leibler.RegisterKL(
_TransformedDistribution, _TransformedDistribution)
def _kl_transformed_transformed(a, b, name=None):
"""Calculate the batched KL divergence KL(a || b) with a and b Transformed.
Args:
a: instance of a TransformedDistribution object.
b: instance of a TransformedDistribution object.
name: Name to use for created operations.
      Default value: `None` (i.e., `'kl_transformed_transformed'`).
Returns:
kl_div: Batchwise KL(a || b)
Raises:
NotImplementedError: If `a.bijector != b.bijector`.
"""
with tf.name_scope(name or 'kl_transformed_transformed'):
if a.bijector == b.bijector:
return kullback_leibler.kl_divergence(a.distribution, b.distribution)
raise NotImplementedError(
'Unable to calculate KL divergence between {} and {} because '
'their bijectors are not equal: {} vs. {}'.format(
a, b, a.bijector, b.bijector))
@log_prob_ratio.RegisterLogProbRatio(_TransformedDistribution)
def _transformed_log_prob_ratio(p, x, q, y, name=None):
"""Computes p.log_prob(x) - q.log_prob(y) for p and q both TDs."""
with tf.name_scope(name or 'transformed_log_prob_ratio'):
x_ = p.bijector.inverse(x)
y_ = q.bijector.inverse(y)
base_log_prob_ratio = log_prob_ratio.log_prob_ratio(
p.distribution, x_, q.distribution, y_)
event_ndims = tf.nest.map_structure(
ps.rank_from_shape,
p.event_shape_tensor,
tf.nest.map_structure(tensorshape_util.merge_with,
p.event_shape, q.event_shape))
ildj_ratio = ldj_ratio.inverse_log_det_jacobian_ratio(
p.bijector, x, q.bijector, y, event_ndims)
return base_log_prob_ratio + tf.cast(ildj_ratio, base_log_prob_ratio.dtype)