-
Notifications
You must be signed in to change notification settings - Fork 1
/
base.py
117 lines (90 loc) · 3.63 KB
/
base.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
# -*- coding: utf-8 -*-
import pandas as pd
import numpy as np
def index_to_dict(idx):
    """Convert a pandas index into a ``{name: values}`` dictionary.

    Args:
        idx (pd.Index or pd.MultiIndex): index to convert.

    Returns:
        dict: for a plain index, a single entry mapping ``idx.name`` to the
        list of its entries. For a ``pd.MultiIndex``, one entry per level,
        mapping the level's name to the list of that level's values (as
        exposed by ``idx.levels``).
    """
    # Guard clause: anything that is not a MultiIndex has exactly one name.
    if not isinstance(idx, pd.MultiIndex):
        return {idx.name: list(idx)}

    return dict((level.name, list(level)) for level in idx.levels)
def remove_none_values_from_dict(dict_):
    """Return a copy of ``dict_`` without its "empty" values.

    A value counts as empty when it is ``None`` or a float NaN (such as
    ``np.nan``). All other values, including falsy ones like ``0`` or ``''``,
    are kept. The input dictionary is not modified.

    Args:
        dict_ (dict): dictionary to filter.

    Returns:
        dict: new dictionary holding only the non-empty items.
    """
    cleaned = {}

    for key, value in dict_.items():
        if value is None:
            continue

        # Only floats are tested for NaN; other types never reach np.isnan.
        if isinstance(value, float) and np.isnan(value):
            continue

        cleaned[key] = value

    return cleaned
# ------------------------------------------------------------------------------
# ProbabilisticModel
# ------------------------------------------------------------------------------
class ProbabilisticModel:
    """Base class for models that answer probability queries.

    Queries can be written as strings such as ``'I,G=g1|D,L=l0'``. The class
    converts between that notation and its four components; subclasses
    provide the actual inference by overriding :meth:`compute_posterior`.
    """

    @staticmethod
    def _parse_terms(terms_str):
        """Split one side of a query into ``([names], {name: value})``."""
        dist, values = [], {}

        for term in terms_str.split(','):
            term = term.strip()

            # Skip blanks produced by an empty side or a stray/trailing comma.
            if not term:
                continue

            if '=' in term:
                # Split on the first '=' only, so values may contain '='.
                key, _, value = term.partition('=')
                values[key.strip()] = value.strip()
            else:
                dist.append(term)

        return dist, values

    @classmethod
    def parse_query_string(cls, query_string):
        """Parse a query string into its four components.

        The query ``P(I,G=g1|D,L=l0)`` would imply::

            query_dist = ['I']
            query_values = {'G': 'g1'}
            evidence_dist = ['D']
            evidence_values = {'L': 'l0'}

        Whitespace around names, values and separators is ignored, and empty
        terms (e.g. from a trailing comma) are dropped. Values are returned
        as strings.

        Args:
            query_string (str): query of the form ``'<query>|<evidence>'``;
                the ``'|<evidence>'`` part is optional.

        Returns:
            tuple: ``(query_dist, query_values, evidence_dist,
            evidence_values)`` as ``(list, dict, list, dict)``.

        Raises:
            ValueError: if ``query_string`` contains more than one ``'|'``.
        """
        query_str, _, given_str = query_string.partition('|')

        if '|' in given_str:
            raise ValueError(
                f"query string may contain at most one '|': {query_string!r}"
            )

        return cls._parse_terms(query_str) + cls._parse_terms(given_str)

    @classmethod
    def create_query_string(cls, qd=None, qv=None, ed=None, ev=None):
        """Generate a query string from its four components.

        Args:
            qd (list): query distributions: names of the RVs to query.
            qv (dict): query values: ``{RV: value}`` pairs to extract.
            ed (list): evidence distributions: names of conditioning RVs.
            ev (dict): evidence values: ``{RV: value}`` pairs set as evidence.

        Returns:
            str: ``'<query>|<evidence>'``. Empty parts are left out, together
            with their separators; if the query side is empty only the
            evidence terms are returned, without a leading ``'|'``.
        """
        qd_str = ','.join(qd) if qd else ''
        qv_str = ','.join(f'{k}={v}' for k, v in qv.items()) if qv else ''
        ed_str = ','.join(ed) if ed else ''
        ev_str = ','.join(f'{k}={v}' for k, v in ev.items()) if ev else ''

        # Join only the non-empty pieces so no dangling ',' or '|' appears.
        query = ','.join(part for part in (qd_str, qv_str) if part)
        evidence = ','.join(part for part in (ed_str, ev_str) if part)

        return '|'.join(side for side in (query, evidence) if side)

    def compute_posterior(self, qd, qv, ed, ev):
        """Compute the (posterior) probability of query given evidence.

        This base implementation is a placeholder that always raises;
        subclasses are expected to override it.

        The query ``P(I,G=g1|D,L=l0)`` would imply::

            qd = ['I']
            qv = {'G': 'g1'}
            ed = ['D']
            ev = {'L': 'l0'}

        Args:
            qd (list): query distributions: RVs to query
            qv (dict): query values: RV-values to extract
            ed (list): evidence distributions: conditioning RVs to include
            ev (dict): evidence values: values to set as evidence.

        Returns:
            CPT

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError

    def P(self, query_string):
        """Return the probability as queried by ``query_string``.

        ``P('I,G=g1|D,L=l0')`` is equivalent to calling
        :meth:`compute_posterior` with::

            qd = ['I']
            qv = {'G': 'g1'}
            ed = ['D']
            ev = {'L': 'l0'}

        Args:
            query_string (str): query in the notation accepted by
                :meth:`parse_query_string`.

        Returns:
            Whatever :meth:`compute_posterior` returns for the parsed query.
        """
        qd, qv, ed, ev = self.parse_query_string(query_string)
        return self.compute_posterior(qd, qv, ed, ev)
# def MAP(self, query_dist, evidence_values, include_probability=True):
# """Perform a Maximum a Posteriori query."""
# d = self.compute_posterior(query_dist, {}, [], evidence_values)
# evidence_vars = [e for e in evidence_values.keys() if e in d.scope]
#
# d = d.droplevel(evidence_vars)
#
# if include_probability:
# return d.idxmax(), d.max()
#
# return d.idxmax()