-
Notifications
You must be signed in to change notification settings - Fork 20
/
pdfs.py
1310 lines (1053 loc) · 52.4 KB
/
pdfs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# Copyright (c) 2010 Matej Laitl <matej@laitl.cz>
# Distributed under the terms of the GNU General Public License v2 or any
# later version of the license, at your option.
"""
This module contains models of common probability density functions, abbreviated
as pdfs.
All classes from this module are currently imported to top-level pybayes module,
so instead of ``from pybayes.pdfs import Pdf`` you can type ``from pybayes import
Pdf``.
"""
from copy import deepcopy
import math
from numpy import random
import wrappers._linalg as linalg
import wrappers._numpy as np
class RVComp(object):
"""Atomic component of a random variable.
:var int dimension: dimension; do not change unless you know what you are doing
:var str name: name; can be changed as long as it remains a string (warning:
parent RVs are not updated)
"""
def __init__(self, dimension, name = None):
"""Initialise new component of a random variable :class:`RV`.
:param dimension: number of vector components this component occupies
:type dimension: positive integer
:param name: name of the component; default: None for anonymous component
:type name: string or None
:raises TypeError: non-integer dimension or non-string name
:raises ValueError: non-positive dimension
"""
if name is not None and not isinstance(name, str):
raise TypeError("name must be either None or a string")
self.name = name
if not isinstance(dimension, int):
raise TypeError("dimension must be integer (int)")
if dimension < 1:
raise ValueError("dimension must be non-zero positive")
self.dimension = dimension
#def __eq__(self, other):
#"""We want RVComp have to be hashable
#(http://docs.python.org/glossary.html#term-hashable), but default __eq__()
#and __hash__() implementations suffice, as they are instance-based.
#"""
class RV(object):
"""Representation of a random variable made of one or more components. Each component is
represented by :class:`RVComp` class.
:var int dimension: cummulative dimension; do not change
:var str name: pretty name, can be changed but needs to be a string
:var list components: list of RVComps; do not change
*Please take into account that all RVComp comparisons inside RV are
instance-based and component names are purely informational. To demonstrate:*
>>> rv = RV(RVComp(1, "a"))
>>> ...
>>> rv.contains(RVComp(1, "a"))
False
Right way to do this would be:
>>> a = RVComp(1, "arbitrary pretty name for a")
>>> rv = RV(a)
>>> ...
>>> rv.contains(a)
True
"""
def __init__(self, *components):
"""Initialise random variable meta-representation.
:param \*components: components that should form the random variable. You may
also pass another RVs which is a shotrcut for adding all their components.
:type \*components: :class:`RV`, :class:`RVComp` or a sequence of :class:`RVComp` items
:raises TypeError: invalid object passed (neither a :class:`RV` or a :class:`RVComp`)
Usual way of creating a RV could be:
>>> x = RV(RVComp(1, 'x_1'), RVComp(1, 'x_2'))
>>> x.name
'[x_1, x_2]'
>>> xy = RV(x, RVComp(2, 'y'))
>>> xy.name
'[x_1, x_2, y]'
"""
self.dimension = 0
self.components = []
if len(components) is 0:
self.name = '[]'
return
self.name = '['
for component in components:
if isinstance(component, RVComp):
self._add_component(component)
elif isinstance(component, RV):
for subcomp in component.components:
self._add_component(subcomp)
else:
try:
for subcomp in component:
self._add_component(subcomp)
except TypeError:
raise TypeError('component ' + str(component) + ' is neither an instance '
+ 'of RVComp or RV and is not iterable of RVComps')
self.name = self.name[:-2] + ']'
def __copy__(self):
ret = type(self).__new__(type(self))
ret.name = self.name
ret.dimension = self.dimension
ret.components = self.components
return ret
def __deepcopy__(self, memo):
ret = type(self).__new__(type(self))
ret.name = self.name # no need to deepcopy - string is immutable
ret.dimension = self.dimension # ditto
# Following shallow copy is special behaviour of RV:
ret.components = self.components[:]
return ret
def __str__(self):
return "<pybayes.pdfs.RV '{0}' dim={1} {2}>".format(self.name, self.dimension, self.components)
def _add_component(self, component):
"""Add new component to this random variable.
Internal function, do not use outside of RV."""
if not isinstance(component, RVComp):
raise TypeError("component is not of type RVComp")
self.components.append(component)
self.dimension += component.dimension
self.name = '{0}{1}, '.format(self.name, component.name)
return True
def contains(self, component):
"""Return True if this random variable contains the exact same instance of
the **component**.
:param component: component whose presence is tested
:type component: :class:`RVComp`
:rtype: bool
"""
return component in self.components
def contains_all(self, test_components):
"""Return True if this RV contains all RVComps from sequence
**test_components**.
:param test_components: list of components whose presence is checked
:type test_components: sequence of :class:`RVComp` items
"""
for test_comp in test_components:
if not self.contains(test_comp):
return False
return True;
def contains_any(self, test_components):
"""Return True if this RV contains any of **test_components**.
:param test_components: sequence of components whose presence is tested
:type test_components: sequence of :class:`RVComp` items
"""
for test_comp in test_components:
if self.contains(test_comp):
return True
return False
def contained_in(self, test_components):
"""Return True if sequence **test_components** contains all components
from this RV (and perhaps more).
:param test_components: set of components whose presence is checked
:type test_components: sequence of :class:`RVComp` items
"""
for component in self.components:
if component not in test_components:
return False
return True
def indexed_in(self, super_rv):
"""Return index array such that this rv is indexed in **super_rv**, which
must be a superset of this rv. Resulting array can be used with :func:`numpy.take`
and :func:`numpy.put`.
:param super_rv: returned indices apply to this rv
:type super_rv: :class:`RV`
:rtype: 1D :class:`numpy.ndarray` of ints with dimension = self.dimension
"""
ret = np.empty(self.dimension, dtype=int)
ret_ind = 0 # current index in returned index array
# process each component from target rv
for comp in self.components:
# find associated component in source_rv components:
src_ind = 0 # index in source vector
for source_comp in super_rv.components:
if source_comp is comp:
ret[ret_ind:] = np.arange(src_ind, src_ind + comp.dimension)
ret_ind += comp.dimension
break;
src_ind += source_comp.dimension
else:
raise AttributeError("Cannont find component "+str(comp)+" in source_rv.components.")
return ret
class CPdf(object):
r"""Base class for all Conditional (in general) Probability Density Functions.
When you evaluate a CPdf the result generally also depends on a condition
(vector) named `cond` in PyBayes. For a CPdf that is a :class:`Pdf` this is
not the case, the result is unconditional.
Every CPdf takes (apart from others) 2 optional arguments to constructor:
**rv** and **cond_rv**. (both :class:`RV` or a sequence of :class:`RVComp` objects) When
specified, they denote that the CPdf is associated with a particular random variable
(respectively its condition is associated with a particular random variable); when unspecified,
*anonymous* random variable is assumed (exceptions exist, see :class:`ProdPdf`).
It is an error to pass RV whose dimension is not same as CPdf's dimension
(or cond dimension respectively).
:var RV rv: associated random variable (always set in constructor, contains
at least one RVComp)
:var RV cond_rv: associated condition random variable (set in constructor to
potentially empty RV)
*While you can assign different rv and cond_rv to a CPdf, you should be
cautious because sanity checks are only performed in constructor.*
While entire idea of random variable associations may not be needed in simple
cases, it allows you to express more complicated situations. Assume the state
variable is composed of 2 components :math:`x_t = [a_t, b_t]` and following
probability density function has to be modelled:
.. math::
p(x_t|x_{t-1}) &= p_1(a_t|a_{t-1}, b_t) p_2(b_t|b_{t-1}) \\
p_1(a_t|a_{t-1}, b_t) &= \mathcal{N}(a_{t-1}, b_t) \\
p_2(b_t|b_{t-1}) &= \mathcal{N}(b_{t-1}, 0.0001)
This is done in PyBayes with associated RVs:
>>> a_t, b_t = RVComp(1, 'a_t'), RVComp(1, 'b_t') # create RV components
>>> a_tp, b_tp = RVComp(1, 'a_{t-1}'), RVComp(1, 'b_{t-1}') # t-1 case
>>> p1 = LinGaussCPdf(1., 0., 1., 0., [a_t], [a_tp, b_t])
>>> # params for p2:
>>> cov, A, b = np.array([[0.0001]]), np.array([[1.]]), np.array([0.])
>>> p2 = MLinGaussCPdf(cov, A, b, [b_t], [b_tp])
>>> p = ProdCPdf((p1, p2), [a_t, b_t], [a_tp, b_tp])
>>> p.sample(np.array([1., 2.]))
>>> p.eval_log()
"""
def shape(self):
"""Return pdf shape = number of dimensions of the random variable.
:meth:`mean` and :meth:`variance` methods must return arrays of this shape.
Default implementation (which you should not override) just returns
self.rv.dimension.
:rtype: int
"""
return self.rv.dimension
def cond_shape(self):
"""Return condition shape = number of dimensions of the conditioning variable.
Default implementation (which you should not override) just returns
self.cond_rv.dimension.
:rtype: int
"""
return self.cond_rv.dimension
def mean(self, cond = None):
"""Return (conditional) mean value of the pdf.
:rtype: :class:`numpy.ndarray`
"""
raise NotImplementedError("Derived classes must implement this function")
def variance(self, cond = None):
"""Return (conditional) variance (diagonal elements of covariance).
:rtype: :class:`numpy.ndarray`
"""
raise NotImplementedError("Derived classes must implement this function")
def eval_log(self, x, cond = None):
"""Return logarithm of (conditional) likelihood function in point x.
:param x: point which to evaluate the function in
:type x: :class:`numpy.ndarray`
:rtype: double
"""
raise NotImplementedError("Derived classes must implement this function")
def sample(self, cond = None):
"""Return one random (conditional) sample from this distribution
:rtype: :class:`numpy.ndarray`"""
raise NotImplementedError("Derived classes must implement this function")
def samples(self, n, cond = None):
"""Return n samples in an array. A convenience function that just calls
:meth:`shape` multiple times.
:param int n: number of samples to return
:rtype: 2D :class:`numpy.ndarray` of shape (*n*, m) where m is pdf
dimension"""
ret = np.empty((n, self.shape()))
for i in range(n):
ret[i] = self.sample(cond)
return ret
def _check_cond(self, cond):
"""Return True if cond has correct type and shape, raise Error otherwise.
:raises TypeError: cond is not of correct type
:raises ValueError: cond doesn't have appropriate shape
:rtype: bool
"""
if cond is None: # cython-specific
raise TypeError("cond must be numpy.ndarray")
if cond.ndim != 1:
raise ValueError("cond must be 1D numpy array (a vector)")
if cond.shape[0] != self.cond_shape():
raise ValueError("cond must be of shape ({0},) array of shape ({1},) given".format(self.cond_shape(), cond.shape[0]))
return True
def _check_x(self, x):
"""Return True if x has correct type and shape (determined by shape()),
raise Error otherwise.
:raises TypeError: cond is not of correct type
:raises ValueError: cond doesn't have appropriate shape
:rtype: bool
"""
if x is None: # cython-specific
raise TypeError("x must be numpy.ndarray")
if x.ndim != 1:
raise ValueError("x must be 1D numpy array (a vector)")
if x.shape[0] != self.shape():
raise ValueError("x must be of shape ({0},) array of shape ({1},) given".format(self.shape(), x.shape[0]))
return True
def _set_rvs(self, exp_shape, rv, exp_cond_shape, cond_rv):
"""Internal heper to check and set rv and cond_rv.
:param int exp_shape: expected random variable shape
:param rv: associated random variable or None to have it auto-created
:type rv: :class:`RV` or a sequence of :class:`RVComp` objects
:param int exp_cond_shape: expected conditioning variable shape
:param cond_rv: associated conditioning variable or None to have it auto-created
:type cond_rv: :class:`RV` or a sequence of :class:`RVComp` objects
:raises TypeError: rv or cond_rv doesnt have right type
:raises ValueError: dimensions do not match
"""
if rv is None:
self.rv = RV(RVComp(exp_shape)) # create RV with one anonymous component
else:
if isinstance(rv, RV):
self.rv = rv
else:
self.rv = RV(rv) # assume that rv is a sequence of components
if self.rv.dimension != exp_shape:
raise ValueError("rv has wrong dimension {0}, {1} expected".format(rv.dimension, exp_shape))
if cond_rv is None:
if exp_cond_shape is 0:
self.cond_rv = RV() # create empty RV to denote empty condition
else:
self.cond_rv = RV(RVComp(exp_cond_shape)) # create RV with one anonymous component
else:
if isinstance(cond_rv, RV):
self.cond_rv = cond_rv
else:
self.cond_rv = RV(cond_rv)
if self.cond_rv.dimension is not exp_cond_shape:
raise ValueError("cond_rv has wrong dimension {0}, {1} expected".format(cond_rv.dimension, exp_cond_shape))
return True
class Pdf(CPdf):
"""Base class for all unconditional (static) multivariate Probability Density
Functions. Subclass of :class:`CPdf`.
As in CPdf, constructor of every Pdf takes optional **rv** (:class:`RV`)
keyword argument (and no *cond_rv* argument as it would make no sense). For
discussion about associated random variables see :class:`CPdf`.
"""
def cond_shape(self):
"""Return zero as Pdfs have no condition."""
return 0
def _set_rv(self, exp_shape, rv):
"""Internal helper - shortcut to :meth:`~CPdf._set_rvs`"""
return self._set_rvs(exp_shape, rv, 0, None)
class UniPdf(Pdf):
r"""Simple uniform multivariate probability density function. Extends
:class:`Pdf`.
.. math:: f(x) = \Theta(x - a) \Theta(b - x) \prod_{i=1}^n \frac{1}{b_i-a_i}
:var a: left border
:type a: :class:`numpy.ndarray`
:var b: right border
:type b: :class:`numpy.ndarray`
You may modify these attributes as long as you don't change their shape and
assumption **a** < **b** still holds.
"""
def __init__(self, a, b, rv = None):
"""Initialise uniform distribution.
:param a: left border
:type a: :class:`numpy.ndarray`
:param b: right border
:type b: :class:`numpy.ndarray`
**b** must be greater (in each dimension) than **a**
"""
self.a = np.asarray(a)
self.b = np.asarray(b)
if a.ndim != 1 or b.ndim != 1:
raise ValueError("both a and b must be 1D numpy arrays (vectors)")
if a.shape[0] != b.shape[0]:
raise ValueError("a must have same shape as b")
if np.any(self.b <= self.a):
raise ValueError("b must be greater than a in each dimension")
self._set_rv(a.shape[0], rv)
def mean(self, cond = None):
return (self.a+self.b)/2. # element-wise division
def variance(self, cond = None):
return ((self.b-self.a)**2)/12. # element-wise power and division
def eval_log(self, x, cond = None):
self._check_x(x)
if np.any(x <= self.a) or np.any(x >= self.b):
return float('-inf')
return -math.log(np.prod(self.b-self.a))
def sample(self, cond = None):
return random.uniform(-0.5, 0.5, self.shape()) * (self.b-self.a) + self.mean()
class AbstractGaussPdf(Pdf):
r"""Abstract base for all Gaussian-like pdfs - the ones that take vector mean
and matrix covariance parameters. Extends :class:`Pdf`.
:var mu: mean value
:type mu: 1D :class:`numpy.ndarray`
:var R: covariance matrix
:type R: 2D :class:`numpy.ndarray`
You can modify object parameters only if you are absolutely sure that you
pass allowable values - parameters are only checked once in constructor.
"""
def __copy__(self):
"""Make a shallow copy of AbstractGaussPdf (or its derivative provided
that is doesn't add class variables)"""
# we cannont use AbstractGaussPdf statically - this method may be called
# by derived class
ret = type(self).__new__(type(self)) # optimisation TODO: currently slower than PY_NEW()
ret.mu = self.mu
ret.R = self.R
ret.rv = self.rv
ret.cond_rv = self.cond_rv
return ret
def __deepcopy__(self, memo):
"""Make a deep copy of AbstractGaussPdf (or its derivative provided
that is doesn't add class variables)"""
# we cannont use AbstractGaussPdf statically - this method may be called
# by derived class
ret = type(self).__new__(type(self)) # optimisation TODO: currently slower than PY_NEW()
ret.mu = deepcopy(self.mu, memo)
ret.R = deepcopy(self.R, memo)
ret.rv = deepcopy(self.rv, memo)
ret.cond_rv = deepcopy(self.cond_rv, memo)
return ret
class GaussPdf(AbstractGaussPdf):
r"""Unconditional Gaussian (normal) probability density function. Extends
:class:`AbstractGaussPdf`.
.. math:: f(x) \propto \exp \left( - \left( x-\mu \right)' R^{-1} \left( x-\mu \right) \right)
"""
def __init__(self, mean, cov, rv = None):
r"""Initialise Gaussian pdf.
:param mean: mean value; stored in **mu** attribute
:type mean: 1D :class:`numpy.ndarray`
:param cov: covariance matrix; stored in **R** arrtibute
:type cov: 2D :class:`numpy.ndarray`
Covariance matrix **cov** must be *positive definite*. This is not checked during
initialisation; it fail or give incorrect results in :meth:`~CPdf.eval_log` or
:meth:`~CPdf.sample`. To create standard normal distribution:
>>> # note that cov is a matrix because of the double [[ and ]]
>>> norm = GaussPdf(np.array([0.]), np.array([[1.]]))
"""
if not isinstance(mean, np.ndarray):
raise TypeError("mean must be numpy.ndarray")
if not isinstance(cov, np.ndarray):
raise TypeError("cov must be numpy.ndarray")
if mean.ndim != 1:
raise ValueError("mean must be one-dimensional (" + str(mean.ndim) + " dimensions encountered)")
n = mean.shape[0]
if cov.ndim != 2:
raise ValueError("cov must be two-dimensional")
if cov.shape[0] != n or cov.shape[1] != n:
raise ValueError("cov must have shape (" + str(n) + ", " + str(n) + "), " +
str(cov.shape) + " given")
if np.any(cov != cov.T):
raise ValueError("cov must be symmetric (complex covariance not supported)")
self.mu = mean
self.R = cov
self._set_rv(mean.shape[0], rv)
def __str__(self):
return "<pybayes.pdfs.GaussPdf mu={0} R={1}>".format(self.mu, self.R)
def mean(self, cond = None):
return self.mu
def variance(self, cond = None):
return np.diag(self.R)
def eval_log(self, x, cond = None):
self._check_x(x)
# compute logarithm of normalization constant (can be cached somehow in future)
# log(2*Pi) = 1.83787706640935
# we ignore sign (first part of slogdet return value) as it must be positive
log_norm = -1/2. * (self.mu.shape[0]*1.83787706640935 + linalg.slogdet(self.R)[1])
# part that actually depends on x
log_val = -1/2. * np.dotvv(x - self.mu, np.dot(linalg.inv(self.R), x - self.mu))
return log_norm + log_val # = log(norm*val)
def sample(self, cond = None):
# in univariate case, random.normal() can be used directly:
if self.mu.shape[0] == 1:
return random.normal(loc=self.mu[0], scale=math.sqrt(self.R[0,0]), size=1)
z = random.normal(size=self.mu.shape[0]);
# NumPy's cholesky(R) is equivalent to Matlab's chol(R).transpose()
return self.mu + np.dot(linalg.cholesky(self.R), z);
class LogNormPdf(AbstractGaussPdf):
r"""Unconditional log-normal probability density function. Extends
:class:`AbstractGaussPdf`.
More precisely, the density of random variable :math:`Y` where
:math:`Y = exp(X); ~ X \sim \mathcal{N}(\mu, R)`
"""
def __init__(self, mean, cov, rv = None):
r"""Initialise log-normal pdf.
:param mean: mean value of the **logarithm** of the associated random variable
:type mean: 1D :class:`numpy.ndarray`
:param cov: covariance matrix of the **logarithm** of the associated random variable
:type cov: 2D :class:`numpy.ndarray`
A current limitation is that LogNormPdf is only univariate. To create
standard log-normal distribution:
>>> lognorm = LogNormPdf(np.array([0.]), np.array([[1.]])) # note the shape of covariance
"""
if not isinstance(mean, np.ndarray):
raise TypeError("mean must be numpy.ndarray")
if not isinstance(cov, np.ndarray):
raise TypeError("cov must be numpy.ndarray")
if mean.ndim != 1:
raise ValueError("mean must be one-dimensional (" + str(mean.ndim) + " dimensions encountered)")
n = mean.shape[0]
if n != 1:
raise ValueError("LogNormPdf is currently limited to univariate random variables")
if cov.ndim != 2:
raise ValueError("cov must be two-dimensional")
if cov.shape[0] != n or cov.shape[1] != n:
raise ValueError("cov must have shape (" + str(n) + ", " + str(n) + "), " +
str(cov.shape) + " given")
if cov[0,0] <= 0.:
raise ValueError("cov must be positive")
self.mu = mean
self.R = cov
self._set_rv(1, rv)
def mean(self, cond = None):
return np.exp(self.mu + self.R[0]/2.)
def variance(self, cond = None):
return (np.exp(self.R[0])[0] - 1.)*np.exp(2*self.mu + self.R[0])
def eval_log(self, x, cond = None):
self._check_x(x)
if x[0] <= 0: # log-normal pdf support = (0, +inf)
return float('-inf')
# 1/2.*log(2*pi) = 0.91893853320467
return -((math.log(x[0]) - self.mu[0])**2)/(2.*self.R[0,0]) - math.log(x[0]*math.sqrt(self.R[0,0])) - 0.91893853320467
def sample(self, cond = None):
# size parameter ( = 1) makes lognormal() return a np.ndarray
return random.lognormal(self.mu[0], math.sqrt(self.R[0,0]), 1)
class AbstractEmpPdf(Pdf):
r"""An abstraction of empirical probability density functions that provides common methods such
as weight normalisation. Extends :class:`Pdf`.
:var numpy.ndarray weights: 1D array of particle weights
:math:`\omega_i >= 0 \forall i; \quad \sum \omega_i = 1`
"""
def normalise_weights(self):
r"""Multiply weights by appropriate constant so that :math:`\sum \omega_i = 1`
:raise AttributeError: when :math:`\exists i: \omega_i < 0` or
:math:`\forall i: \omega_i = 0`
"""
if np.any(self.weights < 0.):
raise AttributeError("Weights must not be negative")
wsum = sum(self.weights)
if wsum == 0:
raise AttributeError("Sum of weights == 0: weights cannot be normalised")
self.weights *= 1./wsum
return True
def get_resample_indices(self):
r"""Calculate first step of resampling process (dropping low-weight particles and replacing
them with more weighted ones.
:return: integer array of length n: :math:`(a_1, a_2 \dots a_n)` where :math:`a_i` means
that particle at ith place should be replaced with particle number :math:`a_i`
:rtype: :class:`numpy.ndarray` of ints
*This method doesnt modify underlying pdf in any way - it merely calculates how particles
should be replaced.*
"""
n = self.weights.shape[0]
cum_weights = np.cumsum(self.weights)
u = (np.arange(n, dtype=float) + random.uniform()) / n
# u[i] = (i + fuzz) / n
# calculate number of babies for each particle
baby_indeces = np.zeros(n, dtype=int) # index array: a[i] contains index of
# original particle that should be at i-th place in new particle array
j = 0
for i in range(n):
while u[i] > cum_weights[j]:
j += 1
baby_indeces[i] = j
return baby_indeces
class EmpPdf(AbstractEmpPdf):
r"""Weighted empirical probability density function. Extends :class:`AbstractEmpPdf`.
.. math::
p(x) &= \sum_{i=1}^n \omega_i \delta(x - x^{(i)}) \\
\text{where} \quad x^{(i)} &\text{ is value of the i}^{th} \text{ particle} \\
\omega_i \geq 0 &\text{ is weight of the i}^{th} \text{ particle} \quad \sum \omega_i = 1
:var numpy.ndarray particles: 2D array of particles; shape: (n, m) where n
is the number of particles, m dimension of this pdf
You may alter particles and weights, but you must ensure that their shapes
match and that weight constraints still hold. You can use
:meth:`~AbstractEmpPdf.normalise_weights` to do some work for you.
"""
def __init__(self, init_particles, rv = None):
r"""Initialise empirical pdf.
:param init_particles: 2D array of initial particles; shape (*n*, *m*)
determines that *n* *m*-dimensioned particles will be used. *Warning:
EmpPdf does not copy the particles - it rather uses passed array
through its lifetime, so it is not safe to reuse it for other
purposes.*
:type init_particles: :class:`numpy.ndarray`
"""
if not isinstance(init_particles, np.ndarray) or init_particles.ndim != 2:
raise TypeError("init_particles must be 2D numpy.ndarray")
self.particles = init_particles
# set n weights to 1/n
self.weights = np.ones(self.particles.shape[0]) / self.particles.shape[0]
self._set_rv(init_particles.shape[1], rv)
def mean(self, cond = None):
ret = np.zeros(self.particles.shape[1])
for i in range(self.particles.shape[0]):
ret += self.weights[i] * self.particles[i]
return ret
def variance(self, cond = None):
ret = np.zeros(self.particles.shape[1])
for i in range(self.particles.shape[0]):
ret += self.weights[i] * self.particles[i]**2
return ret - self.mean()**2
def eval_log(self, x, cond = None):
raise NotImplementedError("eval_log doesn't make sense for discrete distribution")
def sample(self, cond = None):
raise NotImplementedError("Sample for empirical pdf not (yet?) implemented")
def resample(self):
"""Drop low-weight particles, replace them with copies of more weighted
ones. Also reset weights to uniform."""
self.particles = self.particles[self.get_resample_indices()]
self.weights[:] = 1./self.weights.shape[0]
return True
class MarginalizedEmpPdf(AbstractEmpPdf):
r"""An extension to empirical pdf (:class:`EmpPdf`) used as posterior density by
:class:`~pybayes.filters.MarginalizedParticleFilter`. Extends :class:`AbstractEmpPdf`.
Assume that random variable :math:`x` can be divided into 2 independent
parts :math:`x = [a, b]`, then probability density function can be written as
.. math::
p(a, b) &= \sum_{i=1}^n \omega_i \Big[ \mathcal{N}\left(\hat{a}^{(i)}, P^{(i)}\right) \Big]_a
\delta(b - b^{(i)}) \\
\text{where } \quad \hat{a}^{(i)} &\text{ and } P^{(i)} \text{ is mean and
covariance of i}^{th} \text{ gauss pdf} \\
b^{(i)} &\text{ is value of the (second part of the) i}^{th} \text{ particle} \\
\omega_i \geq 0 &\text{ is weight of the i}^{th} \text{ particle} \quad \sum \omega_i = 1
:var numpy.ndarray gausses: 1D array that holds :class:`GaussPdf`
for each particle; shape: (n) where n is the number of particles
:var numpy.ndarray particles: 2D array of particles; shape: (n, m) where n
is the number of particles, m dimension of the "empirical" part of random variable
You may alter particles and weights, but you must ensure that their shapes
match and that weight constraints still hold. You can use
:meth:`~AbstractEmpPdf.normalise_weights` to do some work for you.
*Note: this pdf could have been coded as ProdPdf of EmpPdf and a mixture of GaussPdfs. However
it is implemented explicitly for simplicity and speed reasons.*
"""
def __init__(self, init_gausses, init_particles, rv = None):
r"""Initialise marginalized empirical pdf.
:param init_gausses: 1D array of :class:`GaussPdf` objects, all must have
the dimension
:type init_gausses: :class:`numpy.ndarray`
:param init_particles: 2D array of initial particles; shape (*n*, *m*)
determines that *n* particles whose *empirical* part will have dimension *m*
:type init_particles: :class:`numpy.ndarray`
*Warning: MarginalizedEmpPdf does not copy the particles - it rather uses
both passed arrays through its lifetime, so it is not safe to reuse them
for other purposes.*
"""
if not isinstance(init_gausses, np.ndarray) or init_gausses.ndim != 1:
raise TypeError("init_gausses must be 1D numpy.ndarray")
if not isinstance(init_particles, np.ndarray) or init_particles.ndim != 2:
raise TypeError("init_particles must be 2D numpy.ndarray")
if init_gausses.shape[0] != init_particles.shape[0] or init_gausses.shape[0] < 1:
raise ValueError("init_gausses count must be same as init_particles count and both must be positive")
gauss_shape = 0
for gauss in init_gausses:
if not isinstance(gauss, GaussPdf):
raise TypeError("all init_gausses items must be (subclasses of) GaussPdf")
if gauss_shape == 0:
gauss_shape = gauss.shape() # guaranteed to be non-zero
elif gauss.shape() != gauss_shape:
raise ValueError("all init_gausses items must have same shape")
self.gausses = init_gausses
self.particles = init_particles
# set n weights to 1/n
self.weights = np.ones(self.particles.shape[0]) / self.particles.shape[0]
self._gauss_shape = self.gausses[0].shape() # shape of the gaussian component
part_shape = self.particles.shape[1] # shape of the empirical component
self._set_rv(self._gauss_shape + part_shape, rv)
def mean(self, cond = None):
ret = np.zeros(self.shape())
temp = np.empty(self.shape())
for i in range(self.particles.shape[0]):
temp[0:self._gauss_shape] = self.gausses[i].mean()
temp[self._gauss_shape:] = self.particles[i]
ret += self.weights[i] * temp
return ret
def variance(self, cond = None):
# first, compute 2nd non-central moment
mom2 = np.zeros(self.shape())
temp = np.empty(self.shape())
for i in range(self.particles.shape[0]):
# set gauss part of temp to \mu_i^2 + \sigma_i^2
temp[0:self._gauss_shape] = self.gausses[i].mean()**2 + self.gausses[i].variance()**2
# set empirical part of temp to x_i^2
temp[self._gauss_shape:] = self.particles[i]**2
# finaly scale by \omega_i and add to 2nd non-central moment we are computing
mom2 += self.weights[i] * temp # cython limitation: cannot compile: array_a[0:n] += array_b
# return 2nd central moment by subtracting square of mean value
return mom2 - self.mean()**2
def eval_log(self, x, cond = None):
raise NotImplementedError("eval_log doesn't make sense for (partially) discrete distribution")
def sample(self, cond = None):
raise NotImplementedError("Drawing samples from MarginalizesEmpPdf is not supported")
class ProdPdf(Pdf):
r"""Unconditional product of multiple unconditional pdfs.
You can for example create a pdf that has uniform distribution with regards
to x-axis and normal distribution along y-axis. The caller (you) must ensure
that individial random variables are independent, otherwise their product may
have no mathematical sense. Extends :class:`Pdf`.
.. math:: f(x_1 x_2 x_3) = f_1(x_1) f_2(x_2) f_3(x_3)
"""
def __init__(self, factors, rv = None):
r"""Initialise product of unconditional pdfs.
:param factors: sequence of sub-distributions
:type factors: sequence of :class:`Pdf`
As an exception from the general rule, ProdPdf does not create anonymous
associated random variable if you do not supply it in constructor - it
rather reuses components of underlying factor pdfs. (You can of course
override this behaviour by bassing custom **rv**.)
Usual way of creating ProdPdf could be:
>>> prod = ProdPdf((UniPdf(...), GaussPdf(...))) # note the double (( and ))
"""
if rv is None:
rv_comps = [] # prepare to construnct associated rv
else:
rv_comps = None
if len(factors) is 0:
raise ValueError("at least one factor must be passed")
self.factors = np.array(factors, dtype=Pdf)
self.shapes = np.zeros(self.factors.shape[0], dtype=int) # array of factor shapes
for i in range(self.factors.shape[0]):
if not isinstance(self.factors[i], Pdf):
raise TypeError("all records in factors must be (subclasses of) Pdf")
self.shapes[i] = self.factors[i].shape()
if rv_comps is not None:
rv_comps.extend(self.factors[i].rv.components) # add components of child rvs
# pre-calclate shape
shape = sum(self.shapes)
# associate with a rv (needs to be after _shape calculation)
if rv_comps is None:
self._set_rv(shape, rv)
else:
self._set_rv(shape, RV(*rv_comps))
def mean(self, cond = None):
curr = 0
ret = np.zeros(self.shape())
for i in range(self.factors.shape[0]):
ret[curr:curr + self.shapes[i]] = self.factors[i].mean()
curr += self.shapes[i]
return ret;
def variance(self, cond = None):
curr = 0
ret = np.zeros(self.shape())
for i in range(self.factors.shape[0]):
ret[curr:curr + self.shapes[i]] = self.factors[i].variance()
curr += self.shapes[i]
return ret;
def eval_log(self, x, cond = None):
self._check_x(x)
curr = 0
ret = 0. # 1 is neutral element in multiplication; log(1) = 0
for i in range(self.factors.shape[0]):
ret += self.factors[i].eval_log(x[curr:curr + self.shapes[i]]) # log(x*y) = log(x) + log(y)
curr += self.shapes[i]
return ret;
def sample(self, cond = None):
curr = 0
ret = np.zeros(self.shape())
for i in range(self.factors.shape[0]):
ret[curr:curr + self.shapes[i]] = self.factors[i].sample()
curr += self.shapes[i]
return ret;
class MLinGaussCPdf(CPdf):
r"""Conditional Gaussian pdf whose mean is a linear function of condition.
Extends :class:`CPdf`.
.. math::
f(x|c) \propto \exp \left( - \left( x-\mu \right)' R^{-1} \left( x-\mu \right) \right)
\quad \quad \text{where} ~ \mu = A c + b
"""
def __init__(self, cov, A, b, rv = None, cond_rv = None, base_class = None):
r"""Initialise Mean-Linear Gaussian conditional pdf.
:param cov: covariance of underlying Gaussian pdf
:type cov: 2D :class:`numpy.ndarray`
:param A: given condition :math:`c`, :math:`\mu = Ac + b`
:type A: 2D :class:`numpy.ndarray`
:param b: see above
:type b: 1D :class:`numpy.ndarray`
:param class base_class: class whose instance is created as a base pdf for this
cpdf. Must be a subclass of :class:`AbstractGaussPdf` and the default is
:class:`GaussPdf`. One alternative is :class:`LogNormPdf` for example.
"""
if base_class is None:
self.gauss = GaussPdf(np.zeros(cov.shape[0]), cov)
else:
if not issubclass(base_class, AbstractGaussPdf):
raise TypeError("base_class must be a class (not an instance) and subclass of AbstractGaussPdf")
self.gauss = base_class(np.zeros(cov.shape[0]), cov)
self.A = np.asarray(A)
self.b = np.asarray(b)
if self.A.ndim != 2:
raise ValueError("A must be 2D numpy.ndarray (matrix)")
if self.b.ndim != 1:
raise ValueError("b must be 1D numpy.ndarray (vector)")
if self.b.shape[0] != self.gauss.shape():
raise ValueError("b must have same number of cols as covariance")
if self.A.shape[0] != self.b.shape[0]:
raise ValueError("A must have same number of rows as covariance")
self._set_rvs(self.b.shape[0], rv, self.A.shape[1], cond_rv)
def mean(self, cond = None):
# note: it may not be true that gauss.mu == gauss.mean() for all AbstractGaussPdf
# classes. One such example is LogNormPdf
self._set_mean(cond)
return self.gauss.mean()
def variance(self, cond = None):
# note: for some AbstractGaussPdf variance may depend on mu
self._set_mean(cond)
return self.gauss.variance()
def eval_log(self, x, cond = None):
# x is checked in self.gauss
self._set_mean(cond)
return self.gauss.eval_log(x)
def sample(self, cond = None):
self._set_mean(cond)
return self.gauss.sample()
def _set_mean(self, cond):